Apache Oozie is a workflow scheduler that is used to manage Apache Hadoop jobs. Oozie combines multiple jobs sequentially into one logical unit of work as a directed acyclic graph (DAG) of actions. Oozie is reliable, scalable, extensible, and well integrated with the Hadoop stack, with YARN as its architectural center. It supports several types of Hadoop jobs out of the box, such as Java MapReduce, Pig, Hive, Sqoop, and DistCp, as well as system-specific jobs, such as Java programs and shell scripts.

Apache Spark is a fast, in-memory data processing engine with elegant and expressive development APIs that enable you to efficiently execute streaming, machine learning, or SQL workloads that require fast iterative access to data sets. The Hadoop YARN-based architecture provides the foundation that enables Spark to share a common cluster and data set.

Oozie 4.2.0 introduces a new action type, the Spark action, that runs a Spark job as a step in your workflow. The workflow waits until the Spark job completes before it continues to the next action. This article shows how to use the Spark action to run Spark jobs on IBM Open Platform with Apache Hadoop (IOP) 4.1.

Consider a simple word count application that counts how often each word occurs in a set of text files. Such an application, written with the Spark Java API, is a good example of a Spark job in a workflow. At a high level, the Spark driver must perform the following steps:

  1. Read an input set of text documents.
  2. Count the number of times each word appears.
  3. Sort by word count in descending order and output the result in CSV format.
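
When you test the job on a small sample, it can help to compare its output with a plain-Java version of the same three steps. The following is a minimal sketch that uses only the JDK (Java 9 or later), not Spark; the class name LocalWordCount is hypothetical. It tokenizes the same way as the Spark program in the full listing (split on single spaces, trim, lowercase) and breaks ties between equal counts alphabetically, an ordering that the Spark job does not guarantee.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LocalWordCount {

    // Steps 2 and 3 for an in-memory list of lines: count the words, sort by
    // count in descending order, and format each entry as "word,count".
    public static List<String> wordCount(List<String> lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            if (line == null) {
                continue;
            }
            // Same tokenization as the Spark job: split on single spaces,
            // trim, lowercase, and skip empty tokens.
            for (String word : line.split(" ")) {
                String w = word.trim().toLowerCase();
                if (!w.isEmpty()) {
                    counts.merge(w, 1, Integer::sum);
                }
            }
        }
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
        // Descending by count; ties broken alphabetically for deterministic output.
        entries.sort((a, b) -> {
            int byCount = Integer.compare(b.getValue(), a.getValue());
            return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
        });
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Integer> e : entries) {
            result.add(e.getKey() + "," + e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("the quick brown fox", "The lazy dog", "the fox");
        for (String row : wordCount(lines)) {
            System.out.println(row);
        }
    }
}
```

For the three sample lines in main, the sketch prints the,3 and fox,2 first, followed by the words that occur once.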

The following sections show you how to schedule and launch this Spark application on YARN with Oozie. A full program listing appears at the end of the article.

1. Create a workflow definition: workflow.xml

The following simple workflow definition executes one Spark job:

[code language="xml"] <workflow-app xmlns="uri:oozie:workflow:0.5" name="SparkWordCount">
    <start to="spark-node" />
    <action name="spark-node">
        <spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <!-- Remove output from a previous run: saveAsTextFile fails if the directory already exists -->
                <delete path="${nameNode}/user/${wf:user()}/${examplesRoot}/output-data"/>
            </prepare>
            <master>${master}</master>
            <name>Spark-Wordcount</name>
            <class>com.ibm.biginsights.oozie.examples.WordCountSparkMain</class>
            <jar>${nameNode}/user/${wf:user()}/${examplesRoot}/lib/examples-1.0.jar</jar>
            <spark-opts>--conf spark.driver.extraJavaOptions=-Diop.version=4.1.0.0</spark-opts>
            <arg>${nameNode}/user/${wf:user()}/${examplesRoot}/input-data</arg>
            <arg>${nameNode}/user/${wf:user()}/${examplesRoot}/output-data</arg>
        </spark>
        <ok to="end" />
        <error to="fail" />
    </action>
    <kill name="fail">
        <message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end" />
</workflow-app>
[/code]

Some of these elements are defined as follows:

  • The prepare element specifies a list of paths to delete or create before the job starts. The paths must start with hdfs://host_name:port_number.
  • The master element specifies the URL of the Spark master; for example, spark://host:port, mesos://host:port, yarn-cluster, yarn-client, or local. For Spark on YARN, specify yarn-client or yarn-cluster. In this example, the master parameter in job.properties is set to yarn-cluster.
  • The name element specifies the name of the Spark application.
  • The class element specifies the main class of the Spark application.
  • The jar element specifies a comma-separated list of JAR files.
  • The spark-opts element, if present, contains Spark options that are passed to the Spark driver, such as configuration properties in the form --conf key=value.
  • Each arg element contains one argument that is passed to the Spark application. This example passes the input and output directories.
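
To see how these elements fit together, it can help to compare them with a spark-submit command line. The sketch below is an illustration under that assumption, not Oozie's launcher code: the hypothetical toSparkSubmitArgs method maps master, name, and class to the --master, --name, and --class options, appends the spark-opts tokens, and then adds the jar followed by one application argument per arg element. The job-tracker, name-node, and prepare elements are left out because Oozie uses them itself. The sample values assume the job.properties settings from the next step and the user root.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SparkActionArgs {

    // Hypothetical helper: builds a spark-submit-style argument list from the
    // values of the Spark action's master, name, class, spark-opts, jar, and arg elements.
    public static List<String> toSparkSubmitArgs(String master, String name, String mainClass,
                                                 String sparkOpts, String jar, List<String> appArgs) {
        List<String> args = new ArrayList<>();
        args.add("--master");
        args.add(master);
        args.add("--name");
        args.add(name);
        args.add("--class");
        args.add(mainClass);
        if (sparkOpts != null && !sparkOpts.trim().isEmpty()) {
            // Naive whitespace split; quoted option values are not handled here.
            args.addAll(Arrays.asList(sparkOpts.trim().split("\\s+")));
        }
        args.add(jar);        // the application JAR follows all options
        args.addAll(appArgs); // each <arg> element becomes one application argument
        return args;
    }

    public static void main(String[] args) {
        List<String> submitArgs = toSparkSubmitArgs(
                "yarn-cluster",
                "Spark-Wordcount",
                "com.ibm.biginsights.oozie.examples.WordCountSparkMain",
                "--conf spark.driver.extraJavaOptions=-Diop.version=4.1.0.0",
                "hdfs://nn:8020/user/root/spark-example/lib/examples-1.0.jar",
                List.of("hdfs://nn:8020/user/root/spark-example/input-data",
                        "hdfs://nn:8020/user/root/spark-example/output-data"));
        System.out.println("spark-submit " + String.join(" ", submitArgs));
    }
}
```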

For detailed information about the Spark XML schema in Oozie, see https://oozie.apache.org/docs/4.2.0/DG_SparkActionExtension.html.

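2. Create an Oozie job configuration: job.properties

The following job.properties file defines the job parameters, including the nameNode, jobTracker, master, and examplesRoot values that workflow.xml references:
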
nameNode=hdfs://nn:8020
jobTracker=rm:8050
master=yarn-cluster
queueName=default
examplesRoot=spark-example
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}
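
The values in job.properties can refer to each other with ${...}, as oozie.wf.application.path does here. Before you copy the application to HDFS, you can check where that path resolves. The following sketch loads the properties with java.util.Properties (Java 9 or later) and expands plain ${name} references. The JobPropertiesResolver class is hypothetical: it does not evaluate EL functions such as ${wf:user()}, and it sets user.name to root explicitly, standing in for the submitting user name that the Oozie client normally supplies.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JobPropertiesResolver {

    // Matches simple ${name} references (not EL functions such as ${wf:user()}).
    private static final Pattern VAR = Pattern.compile("\\$\\{([A-Za-z0-9_.]+)\\}");

    // Expands ${name} references from props, repeating until nothing changes
    // (bounded to 20 passes to avoid looping on cyclic definitions).
    public static String resolve(String value, Properties props) {
        String current = value;
        for (int pass = 0; pass < 20; pass++) {
            Matcher m = VAR.matcher(current);
            StringBuilder sb = new StringBuilder();
            boolean changed = false;
            while (m.find()) {
                String replacement = props.getProperty(m.group(1));
                if (replacement == null) {
                    replacement = m.group(0); // leave unknown references untouched
                } else {
                    changed = true;
                }
                m.appendReplacement(sb, Matcher.quoteReplacement(replacement));
            }
            m.appendTail(sb);
            current = sb.toString();
            if (!changed) {
                break;
            }
        }
        return current;
    }

    public static void main(String[] args) throws IOException {
        String jobProperties = String.join("\n",
                "nameNode=hdfs://nn:8020",
                "examplesRoot=spark-example",
                "oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}");
        Properties props = new Properties();
        props.load(new StringReader(jobProperties));
        props.setProperty("user.name", "root"); // assumption: the submitting user
        System.out.println(resolve(props.getProperty("oozie.wf.application.path"), props));
    }
}
```

With these values the program prints hdfs://nn:8020/user/root/spark-example, which is where the application directory must be in HDFS when you submit the job as root.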

3. Create an Oozie application directory

Create an application directory structure with the workflow definition and resources, as shown in the following example:


+-~/spark-example/
  +-job.properties
  +-workflow.xml
  +-lib/
    +-examples-1.0.jar 

The examples-1.0.jar file contains the compiled Spark application (the WordCountSparkMain class). Its name must match the jar element in workflow.xml.

4. Download the spark-assembly.jar file

Download the spark-assembly.jar file by browsing the HDFS NameNode user interface.

  1. From the Ambari console, select HDFS and then Quick Links > NameNode UI.
  2. Click Utilities > Browse the file system.
  3. In the Hadoop file explorer, navigate to /iop/apps/4.1.0.0/spark/jars, select spark-assembly.jar, click Download, and save the file.
  4. Move the downloaded spark-assembly.jar file into the lib directory, which results in the following directory structure:
    
    +-~/spark-example/
      +-job.properties
      +-workflow.xml
      +-lib/
        +-examples-1.0.jar
        +-spark-assembly.jar
    
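
Because the JAR under lib/ must match the jar element in workflow.xml (lib/examples-1.0.jar), a quick local check before uploading can save a failed run. The following is a minimal sketch with a hypothetical AppDirCheck class; the JAR name comes from workflow.xml, the other file names come from the directory listing above, and the application directory is assumed to be ~/spark-example.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

public class AppDirCheck {

    // Files this example expects, relative to the application directory.
    static final List<String> EXPECTED = List.of(
            "job.properties",
            "workflow.xml",
            "lib/examples-1.0.jar",
            "lib/spark-assembly.jar");

    // Returns the expected files that are missing (or are not regular files) under appDir.
    public static List<String> missingFiles(Path appDir) {
        List<String> missing = new ArrayList<>();
        for (String relative : EXPECTED) {
            if (!Files.isRegularFile(appDir.resolve(relative))) {
                missing.add(relative);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Path appDir = Paths.get(System.getProperty("user.home"), "spark-example");
        List<String> missing = missingFiles(appDir);
        System.out.println(missing.isEmpty() ? "Layout OK" : "Missing: " + missing);
    }
}
```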

5. Copy the application to HDFS

Copy the spark-example/ directory to your home directory in HDFS. The relative destination in the following command resolves to /user/user_name/spark-example. Ensure that this location matches the value of oozie.wf.application.path in job.properties.

[code language="shell"] $ hadoop fs -put spark-example spark-example
[/code]

6. Run the example job

Submit the Oozie job by running the following command:

[code language="shell"] $ cd ~/spark-example
$ oozie job -oozie http://oozie-host:11000/oozie -config ./job.properties -run
job: 0000012-151103233206132-oozie-oozi-W
[/code]
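
The oozie command-line client is not the only way to submit the job. Oozie also exposes a Web Services API, where a POST to /oozie/v1/jobs?action=start with the job properties as a Hadoop-style XML configuration creates and starts a workflow job. The sketch below assumes Java 11 or later (for java.net.http), simple (non-Kerberos) authentication, and the same placeholder host, oozie-host. The OozieRestSubmit class is hypothetical, and user.name must be set explicitly because there is no CLI to fill it in. Verify the endpoint against the Web Services API documentation for your Oozie version.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Map;
import java.util.TreeMap;

public class OozieRestSubmit {

    // Escapes the XML special characters in property names and values.
    static String xmlEscape(String s) {
        return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
                .replace("\"", "&quot;").replace("'", "&apos;");
    }

    // Builds a Hadoop-style <configuration> document, with properties sorted by name.
    public static String toConfigurationXml(Map<String, String> props) {
        StringBuilder sb = new StringBuilder("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<configuration>\n");
        for (Map.Entry<String, String> e : new TreeMap<>(props).entrySet()) {
            sb.append("  <property><name>").append(xmlEscape(e.getKey()))
              .append("</name><value>").append(xmlEscape(e.getValue()))
              .append("</value></property>\n");
        }
        return sb.append("</configuration>\n").toString();
    }

    // Submits and starts the workflow; returns the response body (JSON with the job ID).
    public static String submit(String oozieUrl, Map<String, String> props)
            throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(URI.create(oozieUrl + "/v1/jobs?action=start"))
                .header("Content-Type", "application/xml;charset=UTF-8")
                .POST(HttpRequest.BodyPublishers.ofString(toConfigurationXml(props)))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() / 100 != 2) {
            throw new IOException("Submission failed: HTTP " + response.statusCode() + " " + response.body());
        }
        return response.body();
    }

    public static void main(String[] args) throws Exception {
        // Values from job.properties, with the application path already resolved.
        Map<String, String> props = Map.of(
                "user.name", "root", // assumption: the submitting user
                "nameNode", "hdfs://nn:8020",
                "jobTracker", "rm:8050",
                "master", "yarn-cluster",
                "queueName", "default",
                "examplesRoot", "spark-example",
                "oozie.use.system.libpath", "true",
                "oozie.wf.application.path", "hdfs://nn:8020/user/root/spark-example");
        System.out.println(submit("http://oozie-host:11000/oozie", props));
    }
}
```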

Check the workflow job status:


$ oozie job -oozie http://oozie-host:11000/oozie -info 0000012-151103233206132-oozie-oozi-W

Job ID : 0000012-151103233206132-oozie-oozi-W
------------------------------------------------------------------------------------------------------------------------------------
Workflow Name : SparkWordCount
App Path      : hdfs://bdvs1211.svl.ibm.com:8020/user/root/spark-example
Status        : SUCCEEDED
Run           : 0
User          : root
Group         : -
Created       : 2015-11-04 15:19 GMT
Started       : 2015-11-04 15:19 GMT
Last Modified : 2015-11-04 15:23 GMT
Ended         : 2015-11-04 15:23 GMT
CoordAction ID: -

Actions
------------------------------------------------------------------------------------------------------------------------------------
ID                                                Status    Ext ID                 Ext Status Err Code  
------------------------------------------------------------------------------------------------------------------------------------
0000012-151103233206132-oozie-oozi-W@:start:      OK        -                      OK         -         
0000012-151103233206132-oozie-oozi-W@spark-node   OK        job_1446622088718_0022 SUCCEEDED  -         
0000012-151103233206132-oozie-oozi-W@end          OK        -                      OK         -         
------------------------------------------------------------------------------------------------------------------------------------
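
For scripted runs, you may want to wait until the workflow finishes instead of rerunning -info by hand. The following sketch runs the same oozie job -info command through ProcessBuilder every 10 seconds and reads the top-level Status field. It assumes that the oozie CLI is on the PATH and that the output keeps the layout shown above, which is not a stable interface; the OozieInfoStatus class is hypothetical. SUCCEEDED, KILLED, and FAILED are treated as final workflow states.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.Set;

public class OozieInfoStatus {

    // Workflow job states after which the job will not change again.
    static final Set<String> FINAL_STATES = Set.of("SUCCEEDED", "KILLED", "FAILED");

    // Returns the value of the first "Status ... : VALUE" line, which is the
    // workflow-level status in `oozie job -info` output.
    public static Optional<String> workflowStatus(String infoOutput) {
        for (String line : infoOutput.split("\\R")) {
            String trimmed = line.trim();
            int colon = trimmed.indexOf(':');
            if (trimmed.startsWith("Status") && colon >= 0) {
                return Optional.of(trimmed.substring(colon + 1).trim());
            }
        }
        return Optional.empty();
    }

    public static boolean isFinished(String status) {
        return FINAL_STATES.contains(status);
    }

    // Runs `oozie job -oozie <url> -info <jobId>` and returns its combined output.
    static String runInfo(String oozieUrl, String jobId) throws IOException, InterruptedException {
        Process process = new ProcessBuilder("oozie", "job", "-oozie", oozieUrl, "-info", jobId)
                .redirectErrorStream(true)
                .start();
        String output = new String(process.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
        process.waitFor();
        return output;
    }

    public static void main(String[] args) throws Exception {
        String oozieUrl = "http://oozie-host:11000/oozie";     // placeholder host from this article
        String jobId = "0000012-151103233206132-oozie-oozi-W"; // the ID printed by -run
        while (true) {
            String output = runInfo(oozieUrl, jobId);
            String status = workflowStatus(output)
                    .orElseThrow(() -> new IllegalStateException("No Status line in output:\n" + output));
            System.out.println("Workflow status: " + status);
            if (isFinished(status)) {
                break;
            }
            Thread.sleep(10_000); // wait 10 seconds between polls
        }
    }
}
```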

The full Java program

[code language="java"] package com.ibm.biginsights.oozie.examples;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

// Uses the Spark 1.x Java API: in Spark 2.x, FlatMapFunction.call returns an Iterator instead of an Iterable.
public class WordCountSparkMain {
    public static void main(String[] args) {
        if (args.length < 2) {
            System.err.println("Usage: WordCountSparkMain <input-path> <output-path>");
            System.exit(1);
        }
        String inputPath = args[0];
        String outputPath = args[1];
        SparkConf sparkConf = new SparkConf().setAppName("Word count");
        try (JavaSparkContext ctx = new JavaSparkContext(sparkConf)) {
            // Step 1: read the input text documents.
            JavaRDD<String> lines = ctx.textFile(inputPath, 1);

            // Split each line on spaces into trimmed, lowercase words, dropping empty tokens.
            JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Iterable<String> call(String sentence) {
                    List<String> result = new ArrayList<>();
                    if (sentence != null) {
                        String[] words = sentence.split(" ");
                        for (String word : words) {
                            if (word != null && word.trim().length() > 0) {
                                result.add(word.trim().toLowerCase());
                            }
                        }
                    }
                    return result;
                }
            });

            // Step 2: count occurrences by emitting (word, 1) pairs and summing them per word.
            JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Tuple2<String, Integer> call(String s) {
                    return new Tuple2<>(s, 1);
                }
            });
            JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Integer call(Integer a, Integer b) {
                    return a + b;
                }
            }, 2);

            // Step 3: sort by count in descending order by swapping to (count, word),
            // sorting on the key, and swapping back to (word, count).
            JavaPairRDD<Integer, String> countsAfterSwap = counts.mapToPair(
                    new PairFunction<Tuple2<String, Integer>, Integer, String>() {
                        private static final long serialVersionUID = 2267107270683328434L;

                        @Override
                        public Tuple2<Integer, String> call(Tuple2<String, Integer> t) throws Exception {
                            return new Tuple2<>(t._2, t._1);
                        }
                    });
            countsAfterSwap = countsAfterSwap.sortByKey(false);
            counts = countsAfterSwap.mapToPair(
                    new PairFunction<Tuple2<Integer, String>, String, Integer>() {
                        private static final long serialVersionUID = 2267107270683328434L;

                        @Override
                        public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception {
                            return new Tuple2<>(t._2, t._1);
                        }
                    });

            // Format each pair as a "word,count" CSV line and write the result.
            JavaRDD<String> results = counts.map(new Function<Tuple2<String, Integer>, String>() {
                private static final long serialVersionUID = 1L;

                @Override
                public String call(Tuple2<String, Integer> v1) throws Exception {
                    return String.format("%s,%s", v1._1, Integer.toString(v1._2));
                }
            });
            results.saveAsTextFile(outputPath);
        }
    }
}
[/code]

Comments on "How to run a Spark job on YARN with Oozie"

  1. Where can I find the example-1.0.jar file?

  2. Hi,

    I’m trying to run one of the Spark 2 examples on YARN using Oozie, but I’m getting the following error:

    java.lang.NoClassDefFoundError: org/apache/spark/sql/SparkSession$
    at org.apache.spark.examples.SparkPi$.main(SparkPi.scala:28)
    at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:559)
    Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.SparkSession$
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    … 7 more

    I’ve included the assembly JAR in the lib directory, but I still get this error. Any idea which other JARs need to be included in lib for Spark 2?

    • The error means that the classpath does not contain a JAR file with org/apache/spark/sql/SparkSession. Check the Spark version you are running and the Spark JAR in your Maven dependencies. You need the Spark 2 libraries, because SparkSession is part of Spark 2 and is not present in Spark 1.6 or earlier versions.
