Titan in IBM Open Platform (IOP) uses TinkerPop's SparkGraphComputer for graph computing: Titan-Hadoop works with TinkerPop 3’s new hadoop-gremlin package for general-purpose OLAP.

Introduction

Spark is an Apache Software Foundation project focused on general-purpose OLAP data processing. Spark provides a hybrid in-memory/disk-based distributed computing model that is similar to Hadoop’s MapReduce model. Spark maintains a fluent function-chaining DSL that is arguably easier for developers to work with than native Hadoop MapReduce. Spark-Gremlin implements the bulk-synchronous parallel (BSP), distributed message-passing algorithm within Spark; thus, any VertexProgram can be executed over SparkGraphComputer.

Architecture

The SparkGraphComputer algorithm leverages Spark’s caching abilities to reduce the amount of data shuffled across the wire on each iteration of the VertexProgram. When the graph is loaded as a Spark RDD (Resilient Distributed Dataset), it is immediately cached as graphRDD. The graphRDD is a distributed adjacency list which encodes the vertex, its properties, and all its incident edges. On the first iteration, each vertex (in parallel) is passed through VertexProgram.execute(). This yields an output of the vertex’s mutated state (i.e. updated compute keys such as propertyX) and its outgoing messages. This output, the viewOutgoingRDD, is then reduced to the viewIncomingRDD, where the outgoing messages are sent to their respective vertices. If a MessageCombiner exists for the vertex program, then messages are aggregated locally and globally to ultimately yield one incoming message for the vertex. This reduce sequence is the “message pass.” If the vertex program does not terminate on this iteration, then the viewIncomingRDD is joined with the cached graphRDD and the process continues. When there are no more iterations, there is a final join and the resultant RDD is stripped of its edges and messages. This mapReduceRDD is cached and is processed by each MapReduce job in the GraphComputer computation.
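The iteration described above can be illustrated with a toy, single-process model. This is a sketch only, not SparkGraphComputer code: plain Python dicts stand in for graphRDD, viewOutgoingRDD and viewIncomingRDD, a fixed iteration count replaces VertexProgram.terminate(), and the names GRAPH, run_bsp and in_degree_program are hypothetical. The example program counts in-degrees with a summing combiner.

```python
# Toy, single-process model of the BSP loop (illustration only, not Spark code).
# GRAPH stands in for the cached graphRDD: vertex id -> (properties, out-neighbour ids).
GRAPH = {
    1: ({}, [2, 3, 4]),
    2: ({}, []),
    3: ({}, []),
    4: ({}, [3, 5]),
    5: ({}, []),
    6: ({}, [3]),
}


def run_bsp(graph, execute, combine, max_iterations):
    states = {v: dict(props) for v, (props, _) in graph.items()}
    incoming = {}  # plays the role of viewIncomingRDD: one combined message per vertex
    for iteration in range(max_iterations):
        outgoing = []  # plays the role of viewOutgoingRDD: (target, message) pairs
        for v, (_, edges) in graph.items():
            # "join" the cached adjacency list with the vertex state and its incoming message
            states[v], messages = execute(states[v], edges, incoming.get(v), iteration)
            outgoing.extend(messages)
        # message pass: reduce all messages addressed to a vertex into one via the combiner
        incoming = {}
        for target, message in outgoing:
            incoming[target] = message if target not in incoming else combine(incoming[target], message)
    # edges and messages are dropped; only the computed vertex state is returned
    return states


def in_degree_program(state, edges, message, iteration):
    """Hypothetical vertex program: superstep 0 sends 1 along each out-edge, superstep 1 stores the sum."""
    if iteration == 0:
        return state, [(target, 1) for target in edges]
    return {**state, "in_degree": 0 if message is None else message}, []


result = run_bsp(GRAPH, in_degree_program, combine=lambda a, b: a + b, max_iterations=2)
```

Because the combiner is applied before delivery, each vertex receives a single integer rather than a list of messages, which mirrors why a MessageCombiner reduces shuffle volume.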

Property Description
gremlin.hadoop.graphReader A class for reading a graph-based RDD (e.g. an InputRDD or InputFormat).
gremlin.hadoop.graphWriter A class for writing a graph-based RDD (e.g. an OutputRDD or OutputFormat).
gremlin.spark.graphStorageLevel What StorageLevel to use for the cached graph during job execution (default MEMORY_ONLY).
gremlin.spark.persistContext Whether to create a new SparkContext for every SparkGraphComputer or to reuse an existing one.
gremlin.spark.persistStorageLevel What StorageLevel to use when persisting RDDs via PersistedOutputRDD (default MEMORY_ONLY).

InputRDD and OutputRDD

If the provider/user does not want to use Hadoop InputFormats, it is possible to leverage Spark’s RDD constructs directly. An InputRDD provides a read method that takes a SparkContext and returns a graphRDD. Likewise, an OutputRDD is used for writing a graphRDD.
If the graph system provider uses an InputRDD, the RDD should maintain an associated org.apache.spark.Partitioner. By doing so, SparkGraphComputer will not partition the loaded graph across the cluster as it has already been partitioned by the graph system provider. This can save a significant amount of time and space resources. If the InputRDD does not have a registered partitioner, SparkGraphComputer will partition the graph using an org.apache.spark.HashPartitioner with the number of partitions being either the number of existing partitions in the input (i.e. input splits) or the user-specified number of GraphComputer.workers().
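The fallback partitioning rule can be sketched as follows. Spark's HashPartitioner places a key in partition nonNegativeMod(key.hashCode, numPartitions); this sketch assumes vertex ids are Java longs and emulates Long.hashCode(). The helper choose_num_partitions is hypothetical and simply mirrors the "workers if specified, else input splits" rule stated above.

```python
def java_long_hash(value: int) -> int:
    """Emulate java.lang.Long.hashCode(): (int)(value ^ (value >>> 32))."""
    v = value & 0xFFFFFFFFFFFFFFFF            # reinterpret as an unsigned 64-bit pattern
    h = (v ^ (v >> 32)) & 0xFFFFFFFF          # fold high 32 bits into low 32 bits
    return h - (1 << 32) if h >= (1 << 31) else h  # back to a signed 32-bit int


def partition_for(vertex_id: int, num_partitions: int) -> int:
    # HashPartitioner takes a non-negative modulo of the key's hashCode;
    # Python's % is already non-negative for a positive divisor.
    return java_long_hash(vertex_id) % num_partitions


def choose_num_partitions(input_splits: int, workers=None) -> int:
    # hypothetical helper: user-specified workers win, otherwise keep the input splits
    return workers if workers is not None else input_splits


num_partitions = choose_num_partitions(input_splits=3)
assignment = {vid: partition_for(vid, num_partitions) for vid in range(1, 7)}
```

A provider-supplied Partitioner avoids this reshuffle entirely, which is where the time and space savings mentioned above come from.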

Storage Levels

The SparkGraphComputer uses MEMORY_ONLY to cache the input graph and the output graph by default. Users should be aware of the impact of different storage levels, since the default settings can quickly lead to memory issues on larger graphs. An overview of Spark’s persistence settings is provided in Spark’s programming guide.
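The properties gremlin.spark.graphStorageLevel and gremlin.spark.persistStorageLevel take a Spark storage-level name. The sketch below records the flags of a few standard levels and, following Spark's programming guide, what happens to a partition that does not fit in memory. The names StorageLevel (as a Python tuple), LEVELS and overflow_behaviour are illustrative only, not Spark's API.

```python
from typing import NamedTuple


class StorageLevel(NamedTuple):
    use_disk: bool
    use_memory: bool
    deserialized: bool
    replication: int = 1


# Flags of a few standard Spark storage levels (off-heap and replicated "_2" variants omitted)
LEVELS = {
    "MEMORY_ONLY": StorageLevel(use_disk=False, use_memory=True, deserialized=True),
    "MEMORY_ONLY_SER": StorageLevel(use_disk=False, use_memory=True, deserialized=False),
    "MEMORY_AND_DISK": StorageLevel(use_disk=True, use_memory=True, deserialized=True),
    "MEMORY_AND_DISK_SER": StorageLevel(use_disk=True, use_memory=True, deserialized=False),
    "DISK_ONLY": StorageLevel(use_disk=True, use_memory=False, deserialized=False),
}


def overflow_behaviour(level_name):
    """What Spark does with a cached partition that does not fit in memory at this level."""
    if LEVELS[level_name].use_disk:
        return "stored on disk"
    return "not cached; recomputed when needed"
```

With the default MEMORY_ONLY, partitions of a large graph that do not fit are recomputed on every iteration, so a disk-backed or serialized level may be worth trying on larger graphs.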

Using a Persisted Context

It is possible to persist the graph RDD between jobs within the SparkContext (e.g. SparkServer) by leveraging PersistedOutputRDD. Note that gremlin.spark.persistContext should be set to true, or else the persisted RDD will be destroyed when the SparkContext closes. The persisted RDD is named by the gremlin.hadoop.outputLocation configuration. Similarly, PersistedInputRDD is used with the respective gremlin.hadoop.inputLocation to retrieve the persisted RDD from the SparkContext.
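The naming scheme can be pictured with a toy model in which a context holds named RDDs. Everything here (ToySparkContext, run_job, finish_job) is hypothetical and only mimics the behaviour described: the output is stored under gremlin.hadoop.outputLocation, a later job reads it back through gremlin.hadoop.inputLocation, and the persisted data disappears when the context is not kept alive.

```python
class ToySparkContext:
    """Hypothetical stand-in for a SparkContext holding named, persisted RDDs."""

    def __init__(self):
        self.persisted = {}  # RDD name -> data

    def finish_job(self, persist_context):
        # if the context is not persisted it is closed, and its persisted RDDs go with it
        if not persist_context:
            self.persisted.clear()


def run_job(ctx, conf, compute):
    # PersistedInputRDD: look up the RDD named by gremlin.hadoop.inputLocation
    graph = ctx.persisted[conf["gremlin.hadoop.inputLocation"]]
    result = compute(graph)
    # PersistedOutputRDD: store the result under the name gremlin.hadoop.outputLocation
    ctx.persisted[conf["gremlin.hadoop.outputLocation"]] = result
    ctx.finish_job(conf.get("gremlin.spark.persistContext", "false") == "true")
    return result


def double(values):
    return [2 * v for v in values]


ctx = ToySparkContext()
ctx.persisted["input"] = [1, 2, 3]
run_job(ctx, {"gremlin.hadoop.inputLocation": "input",
              "gremlin.hadoop.outputLocation": "output",
              "gremlin.spark.persistContext": "true"}, double)
# a second job chains on the first job's persisted output
second = run_job(ctx, {"gremlin.hadoop.inputLocation": "output",
                       "gremlin.hadoop.outputLocation": "output2",
                       "gremlin.spark.persistContext": "true"}, double)
```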
When using a persistent SparkContext, the configuration of the original SparkContext is inherited by all threaded references to that SparkContext. The exceptions to this rule are properties that have a thread-local effect.

Thread Local Properties

  1. spark.jobGroup.id
  2. spark.job.description
  3. spark.job.interruptOnCancel
  4. spark.scheduler.pool
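The split between inherited and thread-local settings can be modelled with Python's threading.local. In Spark itself these properties are set with SparkContext.setLocalProperty; the class ToySharedContext below is hypothetical, and unlike Spark (whose local properties are inherited by child threads) Python's threading.local does not copy values into child threads.

```python
import threading

THREAD_LOCAL_KEYS = {
    "spark.jobGroup.id",
    "spark.job.description",
    "spark.job.interruptOnCancel",
    "spark.scheduler.pool",
}


class ToySharedContext:
    """Hypothetical model: one shared configuration, plus per-thread local properties."""

    def __init__(self, conf):
        self._conf = dict(conf)          # inherited by every thread using the context
        self._local = threading.local()  # each thread sees its own attributes here

    def set_local_property(self, key, value):
        if key not in THREAD_LOCAL_KEYS:
            raise KeyError(f"{key} is not a thread-local property")
        if not hasattr(self._local, "props"):
            self._local.props = {}
        self._local.props[key] = value

    def get(self, key):
        local_props = getattr(self._local, "props", {})
        return local_props.get(key, self._conf.get(key))


ctx = ToySharedContext({"spark.executor.memory": "2g"})
seen = {}


def job(name):
    ctx.set_local_property("spark.jobGroup.id", name)
    seen[name] = (ctx.get("spark.jobGroup.id"), ctx.get("spark.executor.memory"))


threads = [threading.Thread(target=job, args=(n,)) for n in ("job-a", "job-b")]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Each thread sees its own job group while sharing spark.executor.memory, and the main thread, which never set a job group, sees none.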

Finally, there is a spark object that can be used to manage persisted RDDs (see Interacting with Spark).

Dependencies

IOP Titan 1.0.0-IBM is built on TinkerPop 3.2.1-IBM and Spark 2.1.0.

Titan environment configuration

Name Description
JAVA_HOME Java home
HADOOP_CONF_DIR Hadoop configuration directory
HBASE_CONF_DIR HBase configuration directory
YARN_HOME YARN home
YARN_CONF_DIR YARN configuration directory
SPARK_HOME Spark home
SPARK_CONF_DIR Spark configuration directory
TITAN_HOME Titan home
CLASSPATH Java class path
IOP_JAVA_OPTIONS IOP Java configuration options
HADOOP_GREMLIN_LIBS Hadoop-Gremlin library directories. If SparkGraphComputer will be used as the GraphComputer for HadoopGraph, its lib directory should be specified in HADOOP_GREMLIN_LIBS.
TITAN_LOGDIR Titan log directory
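Before starting a console session it can help to check that every variable in the table is set. This is a sketch: the list mirrors the table, and spark_gremlin_lib_listed assumes that HADOOP_GREMLIN_LIBS is a colon-separated list and that the SparkGraphComputer lib directory path contains "spark-gremlin". Both helper names are hypothetical.

```python
import os

# Variables listed in the table above
TITAN_ENV_VARS = [
    "JAVA_HOME", "HADOOP_CONF_DIR", "HBASE_CONF_DIR", "YARN_HOME", "YARN_CONF_DIR",
    "SPARK_HOME", "SPARK_CONF_DIR", "TITAN_HOME", "CLASSPATH", "IOP_JAVA_OPTIONS",
    "HADOOP_GREMLIN_LIBS", "TITAN_LOGDIR",
]


def missing_env_vars(env=None):
    """Return the names from TITAN_ENV_VARS that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in TITAN_ENV_VARS if not env.get(name)]


def spark_gremlin_lib_listed(env=None, marker="spark-gremlin"):
    """Assumption: the SparkGraphComputer lib directory path contains 'spark-gremlin'."""
    env = os.environ if env is None else env
    entries = env.get("HADOOP_GREMLIN_LIBS", "").split(":")  # assumed colon-separated
    return any(marker in entry for entry in entries)


if __name__ == "__main__":
    print("missing:", missing_env_vars() or "none")
    print("spark-gremlin lib listed:", spark_gremlin_lib_listed())
```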

Titan configuration for spark computing

A Titan graph database cluster consists of one or multiple Titan instances. To open a Titan instance a configuration has to be provided which specifies how Titan should be set up.
A Titan configuration specifies which components Titan should use, controls all operational aspects of a Titan deployment, and provides a number of tuning options to get maximum performance from a Titan cluster.
hadoop-gryo.properties is a sample configuration file for Spark computing; it uses YARN client mode to submit Spark applications for graph processing.

Titan properties file sample


...
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat
gremlin.hadoop.jarsInDistributedCache=true

gremlin.hadoop.inputLocation=data/tinkerpop-modern.kryo
gremlin.hadoop.outputLocation=output
...
spark.master=yarn
spark.submit.deployMode=client
spark.yarn.jars=hdfs://hostname:8020/user/spark/share/lib/spark/*.jar

# the Spark YARN ApplicationMaster needs this to resolve the classpath it sends to the executors
spark.yarn.appMasterEnv.JAVA_HOME=/usr/jdk64/java-1.8.0-openjdk-1.8.0.121-0.b13.el6_8.x86_64
spark.yarn.appMasterEnv.HADOOP_CONF_DIR=/usr/iop/current/hadoop-client/conf
spark.yarn.appMasterEnv.SPARK_CONF_DIR=/usr/iop/current/spark2-client/conf
spark.yarn.am.extraJavaOptions=-Diop.version=4.2.5.0-0000 -Djava.library.path=/usr/iop/current/hadoop-client/lib/native

# the Spark executors (on the worker nodes) need this to resolve the classpath to run Spark tasks
spark.executorEnv.JAVA_HOME=/usr/jdk64/java-1.8.0-openjdk-1.8.0.121-0.b13.el6_8.x86_64
spark.executorEnv.HADOOP_CONF_DIR=/usr/iop/current/hadoop-client/conf
spark.executorEnv.SPARK_CONF_DIR=/usr/iop/current/spark2-client/conf
spark.executor.memory=2g
spark.executor.extraClassPath=/usr/iop/current/hbase-client/conf
spark.serializer=org.apache.spark.serializer.KryoSerializer
...
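A quick sanity check of such a file can be scripted. The reader below handles only simple key=value lines and comments (no escapes, continuations or ':' separators), and check_yarn_client_conf encodes the yarn-client setup of this sample; both are hypothetical helpers, not part of Titan or TinkerPop.

```python
def parse_properties(text):
    """Minimal .properties reader: key=value lines and '#'/'!' comments only.

    Escapes, line continuations and ':' separators are not handled.
    """
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")) or "=" not in line:
            continue  # blank lines, comments and elision markers such as '...'
        key, value = line.split("=", 1)  # values may themselves contain '='
        props[key.strip()] = value.strip()
    return props


REQUIRED_KEYS = [
    "gremlin.graph",
    "gremlin.hadoop.inputLocation",
    "gremlin.hadoop.outputLocation",
    "spark.master",
]


def check_yarn_client_conf(props):
    """Return a list of problems for the yarn-client setup used in the sample above."""
    problems = [f"missing {key}" for key in REQUIRED_KEYS if key not in props]
    if props.get("spark.master") != "yarn":
        problems.append("spark.master is not yarn")
    if props.get("spark.submit.deployMode", "client") != "client":
        problems.append("spark.submit.deployMode must be client")
    return problems
```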
 
Name Description
gremlin.graph The Graph implementation class that GraphFactory (used by the Gremlin console or Gremlin Server) will instantiate
gremlin.hadoop.graphInputFormat The format of the input data
gremlin.hadoop.graphOutputFormat The format of the output data
gremlin.hadoop.jarsInDistributedCache Whether to upload the Hadoop-Gremlin jars to a distributed cache (necessary if jars are not on the machines’ classpaths).
gremlin.hadoop.inputLocation The location of the input file(s) for Hadoop-Gremlin to read the graph from.
gremlin.hadoop.outputLocation The location to write the computed HadoopGraph to.
spark.master The cluster manager to connect to. Supported master URLs include:
local Run Spark locally with one worker thread (i.e. no parallelism at all).
local[K] Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine).
local[*] Run Spark locally with as many worker threads as logical cores on your machine.
yarn Connect to a YARN cluster in client or cluster mode depending on the value of --deploy-mode (spark.submit.deployMode). The cluster location will be found based on the HADOOP_CONF_DIR or YARN_CONF_DIR variable.
spark.submit.deployMode The deploy mode of the Spark driver program. The default value is “client”, which launches the driver program locally. Titan only supports yarn-client mode.
spark.yarn.jars List of libraries containing Spark code to distribute to YARN containers. By default, Spark on YARN will use Spark jars installed locally, but the Spark jars can also be in a world-readable location on HDFS. This allows YARN to cache it on nodes so that it doesn’t need to be distributed each time an application runs. To point to jars on HDFS, for example, set this configuration to hdfs:///some/path. Globs are allowed.
spark.executorEnv.[EnvironmentVariableName] Add the environment variable specified by EnvironmentVariableName to the executor process. The user can specify multiple of these to set multiple environment variables.
spark.executor.memory Amount of memory to use per executor process (e.g. 2g, 8g).
spark.serializer Class to use for serializing objects that will be sent over the network or need to be cached in serialized form. The default of Java serialization works with any Serializable Java object but is quite slow, so we recommend using org.apache.spark.serializer.KryoSerializer and configuring Kryo serialization when speed is necessary. Can be any subclass of org.apache.spark.Serializer.
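The master URL forms listed under spark.master can be interpreted as shown below. The function local_worker_threads is hypothetical and covers only the forms in the table (Spark accepts others, such as spark://host:port, which this sketch rejects).

```python
import os
import re


def local_worker_threads(master):
    """Worker threads implied by a local master URL, or None for yarn (cluster-managed)."""
    if master == "local":
        return 1  # one worker thread, no parallelism
    m = re.fullmatch(r"local\[(\d+|\*)\]", master)
    if m:
        # local[*] means one thread per logical core; local[K] means K threads
        return os.cpu_count() if m.group(1) == "*" else int(m.group(1))
    if master == "yarn":
        return None  # executors are allocated by YARN; see the spark.executor.* settings
    raise ValueError(f"master URL not covered by this sketch: {master}")
```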

Vertex program with spark

Below are sample usages of vertex programs with Spark computing in the Gremlin console.

Traversal


hdfs.copyFromLocal('/usr/iop/current/titan-client/data/tinkerpop-modern.kryo','/user/titan/data/tinkerpop-modern.kryo')
graph = GraphFactory.open('/usr/iop/current/titan-client/conf/hadoop-graph/hadoop-gryo.properties')
g = graph.traversal(computer(SparkGraphComputer))
g.V().valueMap()

PageRankVertexProgram


graph = GraphFactory.open('/usr/iop/current/titan-client/conf/hadoop-graph/hadoop-gryo.properties')
prvp = PageRankVertexProgram.build().create()
result = graph.compute(SparkGraphComputer).program(prvp).submit().get()
result.memory().getRuntime()
result.memory().asMap()
g = result.graph().traversal(computer(SparkGraphComputer))
g.V().valueMap('name', PageRankVertexProgram.PAGE_RANK)
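To see what PageRankVertexProgram is computing, here is a plain power-iteration PageRank over the out-edges of the TinkerPop "modern" toy graph stored in tinkerpop-modern.kryo. This is a textbook formulation with an assumed damping factor of 0.85 and dangling-vertex mass spread evenly; TinkerPop's implementation may scale ranks differently, so the absolute values will not match the valueMap output above.

```python
EDGES = {  # out-edges ("knows" and "created") of the TinkerPop modern graph
    "marko": ["vadas", "josh", "lop"],
    "vadas": [],
    "lop": [],
    "josh": ["ripple", "lop"],
    "ripple": [],
    "peter": ["lop"],
}


def pagerank(edges, alpha=0.85, iterations=30):
    n = len(edges)
    rank = {v: 1.0 / n for v in edges}
    for _ in range(iterations):
        # rank held by vertices with no out-edges is spread evenly over all vertices
        dangling = sum(rank[v] for v, out in edges.items() if not out)
        nxt = {v: (1 - alpha) / n + alpha * dangling / n for v in edges}
        for v, out in edges.items():
            for w in out:
                # each vertex "messages" an equal share of its rank to each out-neighbour
                nxt[w] += alpha * rank[v] / len(out)
        rank = nxt
    return rank


ranks = pagerank(EDGES)
```

Each superstep corresponds to one pass of the loop: vertices send rank shares as messages, and the sums received become the new ranks.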

BulkDumperVertexProgram


hdfs.copyFromLocal('/usr/iop/current/titan-client/data/tinkerpop-modern.kryo','/user/titan/data/tinkerpop-modern.kryo')
graph = GraphFactory.open('/usr/iop/current/titan-client/conf/hadoop-graph/hadoop-gryo.properties')
graph.configuration().setProperty('gremlin.hadoop.graphWriter', 'org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONOutputFormat')
graph.compute(SparkGraphComputer).program(BulkDumperVertexProgram.build().create()).submit().get()
hdfs.ls('output')
hdfs.head('output/~g')

PeerPressure


hdfs.copyFromLocal('/usr/iop/current/titan-client/data/tinkerpop-modern.kryo','/user/titan/data/tinkerpop-modern.kryo')
graph = GraphFactory.open('/usr/iop/current/titan-client/conf/hadoop-graph/hadoop-gryo.properties')
g = graph.traversal(computer(SparkGraphComputer))
// possible PeerPressure usages (run one at a time):
g.V().peerPressure().by('cluster').valueMap()
g.V().hasLabel('person').peerPressure().by('cluster').group().by('cluster').by('name')
g.V().peerPressure().by('cluster').values('cluster')
g.V().peerPressure().by(outE('knows')).by('cluster').valueMap()
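PeerPressure is a label-propagation style clustering. The sketch below is a simplified synchronous label propagation in that spirit, not TinkerPop's PeerPressureVertexProgram: every vertex starts in its own cluster, splits a vote of strength 1/(degree + 1) between itself and its neighbours, and adopts the cluster with the largest total vote (ties go to the smallest cluster id). The self-vote, the weighting and the function name peer_pressure_like are choices made here for illustration; the graph is assumed undirected with every vertex present as a key.

```python
from collections import defaultdict
from fractions import Fraction


def peer_pressure_like(adjacency, max_iterations=10):
    """Simplified synchronous label propagation (illustrative, not TinkerPop's code)."""
    cluster = {v: v for v in adjacency}  # every vertex starts in its own cluster
    for _ in range(max_iterations):
        votes = {v: defaultdict(Fraction) for v in adjacency}
        for v, neighbours in adjacency.items():
            weight = Fraction(1, len(neighbours) + 1)  # exact arithmetic keeps ties exact
            for target in [v, *neighbours]:
                votes[target][cluster[v]] += weight
        new_cluster = {}
        for v, tally in votes.items():
            best = max(tally.values())
            new_cluster[v] = min(c for c, w in tally.items() if w == best)
        if new_cluster == cluster:
            break  # converged: no vertex changed cluster
        cluster = new_cluster
    return cluster


TWO_TRIANGLES = {1: [2, 3], 2: [1, 3], 3: [1, 2], 4: [5, 6], 5: [4, 6], 6: [4, 5]}
clusters = peer_pressure_like(TWO_TRIANGLES)
```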

Both PeerPressure and PageRank are built into TinkerPop as traversal steps. According to the TinkerPop documentation:
The peerPressure()-step (map/sideEffect) clusters vertices using PeerPressureVertexProgram.
http://tinkerpop.apache.org/docs/current/reference/#peerpressure-step
The pageRank()-step (map/sideEffect) calculates PageRank using PageRankVertexProgram.
http://tinkerpop.apache.org/docs/current/reference/#pagerank-step

In other words, these steps are simply another way of invoking the same vertex programs.
IMPORTANT The pageRank()-step is a VertexComputing-step and as such, can only be used against a graph that supports GraphComputer (OLAP).
IMPORTANT The peerPressure()-step is a VertexComputing-step and as such, can only be used against a graph that supports GraphComputer (OLAP).
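Thus, PageRank can also be computed with the pageRank()-step on a traversal source backed by SparkGraphComputer, such as the g used in the PeerPressure examples.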

