With Apache Spark gaining popularity as the processing framework in the big data world, there is also a growing need to remotely submit and monitor Spark jobs. This can be achieved via REST calls, using Oozie or one of the open source Spark job servers.
Livy is one such open source (Apache-licensed) Spark REST server for submitting and interacting with your Spark jobs from anywhere. This post covers the steps needed to execute Spark jobs in IOP 4.1 using Livy.

Building Livy:

Livy is built using Apache Maven; refer to the Livy GitHub repository for details. To check out and build Livy against Spark 1.5.1 (since the latest IOP ships with that version), run:
%git clone git@github.com:cloudera/livy.git
%cd livy
%mvn -DskipTests -Dspark.version=1.5.1 clean package

Since Livy supports executing snippets of Scala, PySpark, and SparkR code, the following are the typical requirements for running it:

  • mvn (from the maven package or the maven3 tarball)
  • java-1.7.0-openjdk (or Oracle Java 7 JDK) or above
  • Spark 1.4+
  • Python 2.6+
  • R 3.x

Set the environment variables and start the Livy Server:
%export SPARK_HOME=/usr/iop/current/spark-client
%export HADOOP_CONF_DIR=/etc/hadoop/conf
%export LIVY_REPL_JAVA_OPTS=-Diop.version=4.1.0.0
%bin/livy-server
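
Once the server is up, a quick sanity check is to query the sessions endpoint (Livy listens on port 8998, as all the examples below assume); localhost here stands in for whichever host the server was started on:
%curl http://localhost:8998/sessions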

Livy can be used either a) interactively, where a session is created to execute snippets of code, or b) in batch mode, where an entire script is submitted to run on the Spark cluster.

Interactive:
This allows creating an interactive session to execute commands in Spark.

List the sessions:
%curl http://bdavm1611:8998/sessions
{"from":0,"total":0,"sessions":[]}

Start a new session:
%curl -X POST --data '{"kind": "scala"}' -H "Content-Type: application/json" bdavm1611:8998/sessions
{"state":"starting","proxyUser":null,"id":0,"kind":"spark","log":[]}
In the above example, "kind": "scala" requests a new Scala session; note that the response reports the kind as spark, which is Livy's canonical name for a Scala session. Livy supports other kinds such as pyspark and sparkr. To check the status of a running Livy session, you can also use the YARN UI. Every session has an id, which is used to reference that session.
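For instance, a PySpark session can be started the same way by changing the kind in the request:
%curl -X POST --data '{"kind": "pyspark"}' -H "Content-Type: application/json" bdavm1611:8998/sessions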

Wait till the status shows idle before issuing commands:
%curl http://bdavm1611:8998/sessions
{"from":0,"total":1,"sessions":[{"state":"idle","proxyUser":null,"id":0,"kind":"spark","log":[]}]}

Execute commands by referencing the session id:
%curl bdavm1611:8998/sessions/0/statements -X POST -H 'Content-Type: application/json' -d '{"code":"val a= 100;val b=a+1100;b"}'
{"id":0,"state":"running","output":null}
Similar to the id for sessions, each statement has an id, which can be used to retrieve the results for that specific statement.

Get the results:
%curl bdavm1611:8998/sessions/0/statements/0
{"id":0,"state":"available","output":{"status":"ok","execution_count":0,"data":{"text/plain":"a: Int = 100\nb: Int = 1200\nres0: Int = 1200"}}}
Since each session starts its own SparkContext, variables created within a session remain accessible until the session is deleted.
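For instance, a follow-up statement in the same session can reuse the variable b defined above:
%curl bdavm1611:8998/sessions/0/statements -X POST -H 'Content-Type: application/json' -d '{"code":"b * 2"}'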

Delete a session:
%curl -X DELETE http://bdavm1611:8998/sessions/0
{"msg":"deleted"}

Batch:

By default, Livy runs batches in local Spark mode. To ensure the job runs on the YARN cluster, add spark.master yarn-cluster to spark-defaults.conf as a custom property in one of the following two ways:

  • via the Ambari UI -> Spark -> Configs, or
  • by creating a new configuration directory: copy spark-defaults.conf, spark-env.sh, and java-opts from /etc/spark/conf, edit the new spark-defaults.conf to add the spark.master property, and set SPARK_CONF_DIR to this new directory to override the defaults from $SPARK_HOME/conf, as sketched below.
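
A minimal sketch of the second approach, assuming /local0/spark-conf/ as the new configuration directory:
%mkdir -p /local0/spark-conf
%cp /etc/spark/conf/spark-defaults.conf /etc/spark/conf/spark-env.sh /etc/spark/conf/java-opts /local0/spark-conf/
%echo "spark.master yarn-cluster" >> /local0/spark-conf/spark-defaults.conf
%export SPARK_CONF_DIR=/local0/spark-conf/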

Start the Livy Server:
%bin/livy-server
(If already running, use Ctrl+C to kill the server before restarting)

Submit a batch job:
Copy the Python script or Scala jar to HDFS and pass the hdfs:// path as part of the POST request to Livy. As with sessions, each batch submission also returns an id, which can be used to fetch logs and perform other operations.
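For example, the Python script used below could be staged in HDFS as follows (the local file name testpi.py is taken from the submission path below):
%hdfs dfs -mkdir -p /tmp/REST/python
%hdfs dfs -put testpi.py /tmp/REST/python/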

Python:
%curl -X POST -H "Content-Type: application/json" bdavm1611:8998/batches --data '{"file": "hdfs:///tmp/REST/python/testpi.py", "name": "MY HDFS Livy Pi Example", "executorCores":1, "executorMemory":"512m", "driverCores":1, "driverMemory":"512m", "queue":"default", "args":["10"]}'
{"id":0,"state":"running","log":[]}

Scala:
%curl -X POST -H "Content-Type: application/json" bdavm1611:8998/batches --data '{"file": "hdfs:///tmp/REST/scala/spark-examples.jar", "className": "org.apache.spark.examples.SparkPi", "name": "MY HDFS Scala Livy Pi Example", "executorCores":1, "executorMemory":"512m", "driverCores":1, "driverMemory":"512m", "queue":"default", "args":["10"]}'
{"id":1,"state":"running","log":[]}

Retrieve logs from the above batch run:
Use the correct batch id to get the logs for the corresponding execution:
%curl -v -X GET bdavm1611:8998/batches/0/log | python -m json.tool
In the above command, Python's json.tool module is used to pretty-print the retrieved logs.
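
A batch can likewise be stopped before completion with a DELETE on its id, mirroring the session deletion above:
%curl -X DELETE bdavm1611:8998/batches/0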

While the above examples use curl, these calls can of course be made to the Livy Server from any language of choice. Refer to the Livy GitHub site for details on other REST calls.
