Apache Oozie is a workflow scheduler for managing Apache Hadoop jobs. Oozie combines multiple jobs into one logical unit of work, expressed as a directed acyclic graph (DAG) of actions. Oozie is reliable, scalable, extensible, and well integrated with the Hadoop stack, with YARN as its architectural center. It supports several types of Hadoop jobs out of the box, such as Java MapReduce, Pig, Hive, Sqoop, SSH, and DistCp, as well as system-specific jobs, such as Java programs and shell scripts.

Apache Spark is a fast, general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python, and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools, including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming. The Hadoop YARN-based architecture provides the foundation that enables Spark to share a common cluster and data set with other Hadoop workloads.

This article is part 2 of a series that shows you how to use Oozie to schedule various Spark applications (written in Python, SparkR, SystemML, Scala, and SparkSQL) on YARN. Part 2 focuses on SparkSQL and SparkML with Oozie. The Spark version is 2.1.0, and the Oozie version is 4.3.0.

Consider a simple SparkSQL application that is written in the Spark Scala API. The following example uses SparkSQL to query structured data that is stored in a file. A full program listing appears at the end of the article.

Note that the following two data files, which are used by this application, must be placed under /user/${wf:user()}/examples/src/main/resources/ in HDFS. A sample upload command follows the file listings below.

# vi people.json 
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

# vi people.txt 
Michael, 29
Andy, 30
Justin, 19
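
The following commands show one way to upload these files, assuming the workflow is submitted as the oozie user so that ${wf:user()} resolves to oozie:

$ hadoop fs -mkdir -p /user/oozie/examples/src/main/resources
$ hadoop fs -put people.json people.txt /user/oozie/examples/src/main/resources/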
  1. Create a workflow definition (workflow.xml). The following simple workflow definition executes one Spark job:
    <workflow-app xmlns='uri:oozie:workflow:0.5' name='SparkSQL'>
        <start to='spark-node' />
        <action name='spark-node'>
            <spark xmlns="uri:oozie:spark-action:0.1">
                <job-tracker>${jobTracker}</job-tracker>
                <name-node>${nameNode}</name-node>
                <master>${master}</master>
                <name>Spark-SQL</name>
                <class>com.ibm.biginsights.oozie.examples.SparkSQLExample</class>
                <jar>${nameNode}/user/${wf:user()}/${examplesRoot}/apps/sparksql/lib/spark-examples.jar</jar>
                <spark-opts>--conf spark.driver.extraJavaOptions=-Diop.version=4.3.0.0 --conf spark.yarn.archive=hdfs://nn:8020/iop/apps/4.3.0.0-0000/spark2/spark2-iop-yarn-archive.tar.gz</spark-opts>
                <arg>2</arg>
            </spark>
            <ok to="end" />
            <error to="fail" />
        </action>
        <kill name="fail">
            <message>Workflow failed, error
                message[${wf:errorMessage(wf:lastErrorNode())}]
            </message>
        </kill>
        <end name='end' />
    </workflow-app>
    

    Some of the elements in the Oozie Spark action workflow.xml are defined as follows:

    • The master element specifies the URL of the Spark master; for example, spark://host:port, mesos://host:port, yarn-cluster, yarn-client, or local. For Spark on YARN mode, specify yarn-client or yarn-cluster in the master element. In this example, master=yarn-cluster.
    • The name element specifies the name of the Spark application.
    • The jar element specifies a comma-separated list of JAR files.
    • The spark-opts element, if present, contains a list of Spark configuration options that are passed to the Spark driver by specifying '--conf key=value'.
    • The arg element contains arguments that can be passed to the Spark application.

    For detailed information about the Spark XML schema in Oozie, see https://oozie.apache.org/docs/4.3.0/DG_SparkActionExtension.html

  2. Create an Oozie job configuration (job.properties):
    nameNode=hdfs://nn:8020
    jobTracker=rm:8050
    master=yarn-cluster
    queueName=default
    examplesRoot=examples
    oozie.use.system.libpath=true
    oozie.wf.application.path=/user/oozie/examples/apps/sparksql
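
    Setting oozie.use.system.libpath=true makes the Oozie ShareLib available to the Spark action. Before submitting, you can check which Spark libraries are in the ShareLib; the sharelib name spark below is an assumption and may differ on your cluster (for example, spark2):

    $ oozie admin -oozie http://oozie-host:11000/oozie -shareliblist spark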
    
  3. Create an Oozie application directory. Create an application directory structure with the workflow definition and resources, as shown in the following example:
    +-~/sparksql/
      +-job.properties
      +-workflow.xml
      +-lib/
        +-spark-examples.jar
    

    The spark-examples.jar file contains the compiled Spark application.
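
    For example, assuming you have already built the application into spark-examples.jar (with whichever build tool you prefer), you could assemble the directory like this:

    $ mkdir -p ~/sparksql/lib
    $ cp workflow.xml job.properties ~/sparksql/
    $ cp spark-examples.jar ~/sparksql/lib/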

  4. Copy the application to HDFS. Copy the sparksql/ directory to the appropriate location under the user's HOME directory in HDFS, and ensure that the sparksql location matches the value of oozie.wf.application.path in job.properties.
    $ hadoop fs -put sparksql/ /user/oozie/examples/apps
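
    You can then list the directory to confirm that it matches oozie.wf.application.path (the path below assumes the job.properties values shown above):
    $ hadoop fs -ls /user/oozie/examples/apps/sparksql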
    
  5. Run the example job.
    1. Submit the Oozie job by running the following command:
      $ cd ~/sparksql
      $ oozie job -oozie http://oozie-host:11000/oozie -config ./job.properties -run
      job: 0000005-170413230214295-oozie-oozi-W
      
    2. Check the workflow job status:
      $ oozie job -oozie http://oozie-host:11000/oozie -info 0000005-170413230214295-oozie-oozi-W
      
      Job ID : 0000005-170413230214295-oozie-oozi-W
      -------------------------------------------------------------------------------
      Workflow Name : SparkSQL
      App Path      : /user/oozie/examples/apps/sparksql
      Status        : SUCCEEDED
      Run           : 0
      User          : oozie
      Group         : -
      Created       : 2017-04-25 02:28 GMT
      Started       : 2017-04-25 02:28 GMT
      Last Modified : 2017-04-25 02:29 GMT
      Ended         : 2017-04-25 02:29 GMT
      CoordAction ID: -
      
      Actions
      ------------------------------------------------------------------------------------------------------------------------
      ID                                                                            Status    Ext ID                 Ext Status Err Code  
      ------------------------------------------------------------------------------------------------------------------------
      0000005-170413230214295-oozie-oozi-W@:start:                                  OK        -                      OK         -         
      ------------------------------------------------------------------------------------------------------------------------
      0000005-170413230214295-oozie-oozi-W@spark-node                               OK        job_1492070316231_0015 SUCCEEDED  -         
      ------------------------------------------------------------------------------------------------------------------------
      0000005-170413230214295-oozie-oozi-W@end                                      OK        -                      OK         -         
      ------------------------------------------------------------------------------------------------------------------------
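
      If the spark-node action fails, or you want to see the application's console output, you can retrieve the Oozie action log and the YARN container logs (the yarn logs command assumes that YARN log aggregation is enabled; the application ID corresponds to the external ID shown above):

      $ oozie job -oozie http://oozie-host:11000/oozie -log 0000005-170413230214295-oozie-oozi-W
      $ yarn logs -applicationId application_1492070316231_0015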
      

The full Scala program

package com.ibm.biginsights.oozie.examples

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object SparkSQLExample {
  case class Person(name: String, age: Long)
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()
    import spark.implicits._
    runBasicDataFrameExample(spark)
    runDatasetCreationExample(spark)
    runInferSchemaExample(spark)
    runProgrammaticSchemaExample(spark)
    spark.stop()
  }
  // Read JSON data into a DataFrame, run basic DataFrame operations, and query it with SQL
  private def runBasicDataFrameExample(spark: SparkSession): Unit = {
    val df = spark.read.json("examples/src/main/resources/people.json")
    df.show()
    import spark.implicits._
    df.printSchema()
    df.select("name").show()
    df.select($"name", $"age" + 1).show()
    df.filter($"age" > 21).show()
    df.groupBy("age").count().show()
    df.createOrReplaceTempView("people")
    val sqlDF = spark.sql("SELECT * FROM people")
    sqlDF.show()
    df.createGlobalTempView("people")
    spark.sql("SELECT * FROM global_temp.people").show()
    spark.newSession().sql("SELECT * FROM global_temp.people").show()
  }
  // Create Datasets from a case class, from primitives, and from a JSON file
  private def runDatasetCreationExample(spark: SparkSession): Unit = {
    import spark.implicits._
    val caseClassDS = Seq(Person("Andy", 32)).toDS()
    caseClassDS.show()
    val primitiveDS = Seq(1, 2, 3).toDS()
    primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)
    val path = "examples/src/main/resources/people.json"
    val peopleDS = spark.read.json(path).as[Person]
    peopleDS.show()
  }
  // Infer a schema from an RDD of case-class instances and query the result with SQL
  private def runInferSchemaExample(spark: SparkSession): Unit = {
    import spark.implicits._
    val peopleDF = spark.sparkContext
      .textFile("examples/src/main/resources/people.txt")
      .map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .toDF()
    peopleDF.createOrReplaceTempView("people")
    val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
    teenagersDF.map(teenager => "Name: " + teenager(0)).show()
    teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
    implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
    teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
  }
  // Build a schema programmatically and apply it to an RDD of Rows
  private def runProgrammaticSchemaExample(spark: SparkSession): Unit = {
    import spark.implicits._
    val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")
    val schemaString = "name age"
    val fields = schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)
    val rowRDD = peopleRDD
      .map(_.split(","))
      .map(attributes => Row(attributes(0), attributes(1).trim))
    val peopleDF = spark.createDataFrame(rowRDD, schema)
    peopleDF.createOrReplaceTempView("people")
    val results = spark.sql("SELECT name FROM people")
    results.map(attributes => "Name: " + attributes(0)).show()
  }
}

SparkML jobs

You can also schedule a SparkML job by using an Oozie Spark action. The steps are similar to those for a SparkSQL job, except that the program uses the Spark machine learning library (Spark MLlib). You can also write the program in Java and put the Java class name in the Oozie workflow definition.
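
As a minimal sketch of such a program (the package, class name, and sample data below are illustrative and not part of the original example), a Spark ML application that could be scheduled with the same kind of workflow might look like this:

package com.ibm.biginsights.oozie.examples

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

object SparkMLExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Spark ML basic example")
      .getOrCreate()

    // A tiny in-memory training set of (label, features) rows; illustrative data only
    val training = spark.createDataFrame(Seq(
      (1.0, Vectors.dense(0.0, 1.1, 0.1)),
      (0.0, Vectors.dense(2.0, 1.0, -1.0)),
      (0.0, Vectors.dense(2.0, 1.3, 1.0)),
      (1.0, Vectors.dense(0.0, 1.2, -0.5))
    )).toDF("label", "features")

    // Fit a logistic regression model and print what was learned
    val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01)
    val model = lr.fit(training)
    println(s"Coefficients: ${model.coefficients} Intercept: ${model.intercept}")

    spark.stop()
  }
}

You would build this class into the JAR referenced by the jar element and set com.ibm.biginsights.oozie.examples.SparkMLExample as the class value in workflow.xml.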
