Titan in IOP uses TinkerPop's SparkGraphComputer for its graph computing; Titan-Hadoop works with TinkerPop 3's new hadoop-gremlin package for general-purpose OLAP.
Spark is an Apache Software Foundation project focused on general-purpose OLAP data processing. Spark provides a hybrid in-memory/disk-based distributed computing model that is similar to Hadoop's MapReduce model. Spark maintains a fluent function chaining DSL that is arguably easier for developers to work with than native Hadoop MapReduce. Spark-Gremlin provides an implementation of the bulk-synchronous parallel, distributed message passing algorithm within Spark and thus, any VertexProgram can be executed over SparkGraphComputer.
The SparkGraphComputer algorithm leverages Spark's caching abilities to reduce the amount of data shuffled across the wire on each iteration of the VertexProgram. When the graph is loaded as a Spark RDD (Resilient Distributed Dataset) it is immediately cached as graphRDD. The graphRDD is a distributed adjacency list which encodes the vertex, its properties, and all its incident edges. On the first iteration, each vertex (in parallel) is passed through VertexProgram.execute(). This yields an output of the vertex's mutated state (i.e. updated compute keys, e.g. propertyX) and its outgoing messages. This viewOutgoingRDD is then reduced to viewIncomingRDD where the outgoing messages are sent to their respective vertices. If a MessageCombiner exists for the vertex program, then messages are aggregated locally and globally to ultimately yield one incoming message for the vertex. This reduce sequence is the “message pass.” If the vertex program does not terminate on this iteration, then the viewIncomingRDD is joined with the cached graphRDD and the process continues. When there are no more iterations, there is a final join and the resultant RDD is stripped of its edges and messages. This mapReduceRDD is cached and is processed by each MapReduce job in the GraphComputer computation.
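The loop above can be sketched outside Spark with plain Python dictionaries standing in for the RDDs. Note that `execute` and `combine` below are illustrative stand-ins for `VertexProgram.execute()` and `MessageCombiner`, not TinkerPop APIs:

```python
def message_pass(graph, execute, combine, max_iterations):
    """graph: {vertex_id: state}; execute(vid, state, incoming_msgs) ->
    (new_state, [(target_vid, message), ...])."""
    incoming = {}                                  # viewIncomingRDD analogue
    for _ in range(max_iterations):
        outgoing = {}                              # viewOutgoingRDD analogue
        for vid in list(graph):                    # VertexProgram.execute() per vertex
            graph[vid], msgs = execute(vid, graph[vid], incoming.get(vid, []))
            for target, msg in msgs:
                outgoing.setdefault(target, []).append(msg)
        # the "message pass": reduce outgoing messages per target vertex,
        # combining them into one incoming message via the combiner
        incoming = {t: [combine(ms)] for t, ms in outgoing.items()}
    return graph

# toy example: each vertex sends 1 along its out-edges; summing the
# combined incoming messages leaves each vertex holding its in-degree
edges = {'a': ['b', 'c'], 'b': ['c'], 'c': []}

def execute(vid, state, incoming_msgs):
    return sum(incoming_msgs), [(t, 1) for t in edges[vid]]

result = message_pass({v: 0 for v in edges}, execute, sum, 2)
# result == {'a': 0, 'b': 1, 'c': 2}
```

This mirrors the shape of the computation only; in SparkGraphComputer the per-vertex map and the message reduce are distributed Spark transformations over the cached graphRDD.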
|gremlin.hadoop.graphReader||A class for reading a graph-based RDD (e.g. an InputRDD or InputFormat).|
|gremlin.hadoop.graphWriter||A class for writing a graph-based RDD (e.g. an OutputRDD or OutputFormat).|
|gremlin.spark.graphStorageLevel||What StorageLevel to use for the cached graph during job execution (default MEMORY_ONLY).|
|gremlin.spark.persistContext||Whether to create a new SparkContext for every SparkGraphComputer or to reuse an existing one.|
|gremlin.spark.persistStorageLevel||What StorageLevel to use when persisting RDDs via PersistedOutputRDD (default MEMORY_ONLY).|
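For reference, a properties-file fragment using these keys might look as follows (the class names are the stock TinkerPop Gryo ones; MEMORY_AND_DISK is just an example choice that lets larger graphs spill to disk rather than fail):

```
gremlin.hadoop.graphReader=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
gremlin.hadoop.graphWriter=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat
gremlin.spark.graphStorageLevel=MEMORY_AND_DISK
```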
InputRDD and OutputRDD
If the provider/user does not want to use Hadoop InputFormats, it is possible to leverage Spark's RDD constructs directly. An InputRDD provides a read method that takes a SparkContext and returns a graphRDD. Likewise, an OutputRDD is used for writing a graphRDD.
If the graph system provider uses an InputRDD, the RDD should maintain an associated org.apache.spark.Partitioner. By doing so, SparkGraphComputer will not partition the loaded graph across the cluster as it has already been partitioned by the graph system provider. This can save a significant amount of time and space resources. If the InputRDD does not have a registered partitioner, SparkGraphComputer will partition the graph using an org.apache.spark.HashPartitioner with the number of partitions being either the number of existing partitions in the input (i.e. input splits) or the user-specified number of GraphComputer.workers().
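For intuition, a hash partitioner assigns each key to the non-negative remainder of its hash modulo the partition count. A small Python analogue (with Python's `hash` standing in for Java's `hashCode`; this is a sketch of the idea, not Spark's implementation):

```python
def hash_partition(key, num_partitions):
    # analogue of HashPartitioner.getPartition: non-negative remainder
    # of the key's hash modulo the partition count
    return hash(key) % num_partitions  # Python's % is non-negative for a positive modulus

def partition_vertices(vertex_ids, num_partitions):
    # place every vertex id into exactly one partition bucket
    parts = [[] for _ in range(num_partitions)]
    for vid in vertex_ids:
        parts[hash_partition(vid, num_partitions)].append(vid)
    return parts
```

The key property is determinism: the same vertex id always lands in the same partition, so co-partitioned RDDs can be joined without a shuffle.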
The SparkGraphComputer uses MEMORY_ONLY to cache the input graph and the output graph by default. Users should be aware of the impact of different storage levels, since the default settings can quickly lead to memory issues on larger graphs. An overview of Spark's persistence settings is provided in Spark's programming guide.
Using a Persisted Context
It is possible to persist the graph RDD between jobs within the SparkContext (e.g. SparkServer) by leveraging PersistedOutputRDD. Note that gremlin.spark.persistContext should be set to true or else the persisted RDD will be destroyed when the SparkContext closes. The persisted RDD is named by the gremlin.hadoop.outputLocation configuration. Similarly, PersistedInputRDD is used with the respective gremlin.hadoop.inputLocation to retrieve the persisted RDD from the SparkContext.
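Put together, a job that persists its output graph in the SparkContext might use a fragment like the following (the output name myPersistedGraph is an illustrative label, not a required value):

```
gremlin.spark.persistContext=true
gremlin.hadoop.graphWriter=org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD
gremlin.hadoop.outputLocation=myPersistedGraph
```

A later job on the same SparkContext would then set gremlin.hadoop.graphReader to org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD and gremlin.hadoop.inputLocation to myPersistedGraph to read the persisted graph back without touching storage.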
When using a persistent SparkContext, the configuration used by the original Spark Configuration will be inherited by all threaded references to that SparkContext. The exceptions to this rule are those properties which have a specific thread-local effect.
Thread Local Properties
Finally, there is a spark object that can be used to manage persisted RDDs (see Interacting with Spark).
IOP Titan 1.0.0-IBM was implemented based on TinkerPop 3.2.1-IBM and Spark 2.1.0.
Titan environment configuration
|HADOOP_CONF_DIR||Hadoop configuration directory|
|HBASE_CONF_DIR||HBase configuration directory|
|YARN_CONF_DIR||YARN configuration directory|
|SPARK_CONF_DIR||Spark configuration directory|
|CLASSPATH||Java class path|
|IOP_JAVA_OPTIONS||IOP Java options|
|HADOOP_GREMLIN_LIBS||Hadoop-Gremlin library directories. If SparkGraphComputer will be used as the GraphComputer for HadoopGraph, then its lib directory should be specified in HADOOP_GREMLIN_LIBS.|
|TITAN_LOGDIR||Titan log directory|
Titan configuration for spark computing
A Titan graph database cluster consists of one or multiple Titan instances. To open a Titan instance a configuration has to be provided which specifies how Titan should be set up.
A Titan configuration specifies which components Titan should use, controls all operational aspects of a Titan deployment, and provides a number of tuning options to get maximum performance from a Titan cluster.
hadoop-gryo.properties is a sample configuration file for Spark computing; it uses a YARN client to submit Spark applications for the graph.
Titan properties file sample
```
...
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat
gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=data/tinkerpop-modern.kryo
gremlin.hadoop.outputLocation=output
...
spark.master=yarn
spark.submit.deployMode=client
spark.yarn.jars=hdfs://hostname:8020/user/spark/share/lib/spark/*.jar
# the Spark YARN ApplicationManager needs this to resolve the classpath it sends to the executors
spark.yarn.appMasterEnv.JAVA_HOME=/usr/jdk64/java-1.8.0-openjdk-184.108.40.206-0.b13.el6_8.x86_64
spark.yarn.appMasterEnv.HADOOP_CONF_DIR=/usr/iop/current/hadoop-client/conf
spark.yarn.appMasterEnv.SPARK_CONF_DIR=/usr/iop/current/spark2-client/conf
spark.yarn.am.extraJavaOptions=-Diop.version=220.127.116.11-0000 -Djava.library.path=/usr/iop/current/hadoop-client/lib/native
# the Spark Executors (on the worker nodes) need this to resolve the classpath to run Spark tasks
spark.executorEnv.JAVA_HOME=/usr/jdk64/java-1.8.0-openjdk-18.104.22.168-0.b13.el6_8.x86_64
spark.executorEnv.HADOOP_CONF_DIR=/usr/iop/current/hadoop-client/conf
spark.executorEnv.SPARK_CONF_DIR=/usr/iop/current/spark2-client/conf
spark.executor.memory=2g
spark.executor.extraClassPath=/usr/iop/current/hbase-client/conf
spark.serializer=org.apache.spark.serializer.KryoSerializer
...
```
|gremlin.graph||The implementation of the graph factory that will be used by Gremlin Server|
|gremlin.hadoop.graphInputFormat||The format of the input data|
|gremlin.hadoop.graphOutputFormat||The format of the output data|
|gremlin.hadoop.jarsInDistributedCache||Whether to upload the Hadoop-Gremlin jars to a distributed cache (necessary if jars are not on the machines’ classpaths).|
|gremlin.hadoop.inputLocation||The location of the input file(s) for Hadoop-Gremlin to read the graph from.|
|gremlin.hadoop.outputLocation||The location to write the computed HadoopGraph to.|
spark.master specifies the cluster manager to connect to. The following master URLs are supported:
local Run Spark locally with one worker thread (i.e. no parallelism at all).
local[K] Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine).
local[*] Run Spark locally with as many worker threads as logical cores on your machine.
yarn Connect to a YARN cluster in client or cluster mode depending on the value of --deploy-mode. The cluster location will be found based on the HADOOP_CONF_DIR or YARN_CONF_DIR variable.
|spark.submit.deployMode||The deploy mode of the Spark driver program; the default value is “client”, which launches the driver program locally. Titan only supports yarn-client mode.|
|spark.yarn.jars||List of libraries containing Spark code to distribute to YARN containers. By default, Spark on YARN will use Spark jars installed locally, but the Spark jars can also be in a world-readable location on HDFS. This allows YARN to cache it on nodes so that it doesn’t need to be distributed each time an application runs. To point to jars on HDFS, for example, set this configuration to hdfs:///some/path. Globs are allowed.|
|spark.executorEnv.[EnvVarName]||Add the environment variable specified by EnvironmentVariableName to the Executor process. The user can specify multiple of these to set multiple environment variables.|
|spark.executor.memory||Amount of memory to use per executor process (e.g. 2g, 8g).|
|spark.serializer||Class to use for serializing objects that will be sent over the network or need to be cached in serialized form. The default of Java serialization works with any Serializable Java object but is quite slow, so we recommend using org.apache.spark.serializer.KryoSerializer and configuring Kryo serialization when speed is necessary. Can be any subclass of org.apache.spark.Serializer.|
Vertex program with spark
Below are sample usages of vertex programs with Spark computing in Gremlin.
```
hdfs.copyFromLocal('/usr/iop/current/titan-client/data/tinkerpop-modern.kryo', '/user/titan/data/tinkerpop-modern.kryo')
graph = GraphFactory.open('/usr/iop/current/titan-client/conf/hadoop-graph/hadoop-gryo.properties')
g = graph.traversal(computer(SparkGraphComputer))
g.V().valueMap()
```
```
graph = GraphFactory.open('/usr/iop/current/titan-client/conf/hadoop-graph/hadoop-gryo.properties')
prvp = PageRankVertexProgram.build().create()
result = graph.compute(SparkGraphComputer).program(prvp).submit().get()
result.memory().getRuntime()
result.memory().asMap()
g = result.graph().traversal(computer(SparkGraphComputer))
g.V().valueMap('name', PageRankVertexProgram.PAGE_RANK)
```
```
hdfs.copyFromLocal('data/tinkerpop-modern.kryo', 'tinkerpop-modern.kryo')
hdfs.copyFromLocal('/usr/iop/current/titan-client/data/tinkerpop-modern.kryo', '/user/titan/data/tinkerpop-modern.kryo')
graph = GraphFactory.open('/usr/iop/current/titan-client/conf/hadoop-graph/hadoop-gryo.properties')
graph.configuration().setProperty('gremlin.hadoop.graphWriter', 'org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONOutputFormat')
graph.compute(SparkGraphComputer).program(BulkDumperVertexProgram.build().create()).submit().get()
hdfs.ls('output')
hdfs.head('output/~g')
```
```
hdfs.copyFromLocal('/usr/iop/current/titan-client/data/tinkerpop-modern.kryo', '/user/titan/data/tinkerpop-modern.kryo')
graph = GraphFactory.open('/usr/iop/current/titan-client/conf/hadoop-graph/hadoop-gryo.properties')
g = graph.traversal(computer(SparkGraphComputer))
// possible PeerPressure usages:
g.V().peerPressure().by('cluster').valueMap()
g.V().hasLabel('person').peerPressure().by('cluster').group().by('cluster').by('name')
g.V().peerPressure().by('cluster').values('cluster')
g.V().peerPressure().by(outE('knows')).by('cluster').valueMap()
```
Both PeerPressure and PageRank are built into TinkerPop as traversal steps. According to the TinkerPop documentation:
The peerPressure()-step (map/sideEffect) clusters vertices using PeerPressureVertexProgram.
The pageRank()-step (map/sideEffect) calculates PageRank using PageRankVertexProgram.
So these steps are simply an alternative way of invoking the same vertex programs.
IMPORTANT The pageRank()- and peerPressure()-steps are VertexComputing-steps and as such can only be used against a graph that supports GraphComputer (OLAP).
Thus, PageRank can also be used directly as a traversal step rather than through an explicitly constructed vertex program.
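For example, a sketch in the same console style as the samples above (the 'friendRank' property key and the knows edge label follow the TinkerPop documentation's examples and are not Titan-specific):

```
graph = GraphFactory.open('/usr/iop/current/titan-client/conf/hadoop-graph/hadoop-gryo.properties')
g = graph.traversal(computer(SparkGraphComputer))
// store the score under 'friendRank', ranking only over 'knows' edges
g.V().pageRank().by('friendRank').by(outE('knows')).valueMap('name', 'friendRank')
```

This yields the same scores as running PageRankVertexProgram explicitly and then traversing the result graph, but in a single traversal.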