Apache Spark is a fast, general-purpose cluster computing system that is well suited to machine learning workloads. MLlib is the machine learning library provided by Spark, with support for common machine learning algorithms including classification, regression, collaborative filtering, and clustering.

Let us take a look at how you can leverage a machine learning model that has been trained in a Spark application for real-time scoring in IBM Streams.

Overview

The Spark MLLib toolkit provides operators that support a number of MLLib algorithms including:

  • Classification
    • Linear SVM
    • Naive Bayes
  • Clustering
    • KMeans
  • Collaborative Filtering
  • Regression
    • Isotonic
    • Linear
    • Logistic
  • Tree
    • Decision Tree
    • Gradient Boosted Trees
    • Random Forest

The toolkit assumes that a Spark machine learning model has already been trained and saved to the local filesystem or to HDFS. Training the model and saving it must be done outside of IBM Streams using the APIs provided by Spark. For the sample code provided in this guide, the model is trained in a Spark application written in Java.

The saved model can then be loaded by an appropriate operator within the toolkit and used for scoring incoming tuple data.

Prerequisites

The rest of the guide assumes that you have a working knowledge of Spark and the MLLib library API that it provides. In addition, we assume that you are familiar with IBM Streams and proficient in developing Streams applications using SPL.

For more information on Apache Spark, refer to the documentation at https://spark.apache.org/.

If you are new to Streams, you can learn more about Streams using the excellent guides and demos available on Streamsdev.

Setup

To demonstrate how Spark MLLib models can be used in IBM Streams, we will use the K-Means clustering algorithm support that is included in Spark MLlib. For the demo, we have provided two sample CSV files, random_2d_training.csv and random_2d.csv, that contain a list of points on a Cartesian (X-Y) plane. We will be using the random_2d_training.csv file to train and generate a K-Means clustering model in Spark. The trained model will then be used in IBM Streams to predict the cluster number for each point in the random_2d.csv file. Both files are included in the demo code that is attached below.
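
The contents of the CSV files are not reproduced here, but each line holds a single comma-separated x,y pair, which is the format the training code below parses. Purely for illustration, data of the same shape could be generated with a sketch like the following; the class name and output file name are illustrative, and the demo itself should use the attached files.

import java.io.PrintWriter;
import java.util.Random;

// Illustrative only: writes random x,y pairs, one "x,y" pair per line,
// in the same shape as the provided CSV files.
public class Random2DSketch {
	public static void main(String[] args) throws Exception {
		Random rnd = new Random();
		try (PrintWriter out = new PrintWriter("random_2d_sketch.csv")) {
			for (int i = 0; i < 1000; i++) {
				out.println((rnd.nextDouble() * 100) + "," + (rnd.nextDouble() * 100));
			}
		}
	}
}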

The toolkit also requires Apache Spark version 1.4 or higher to be installed from https://spark.apache.org/. Spark must be installed to a location that is accessible when you compile your Streams application. In addition, the Spark install must be accessible or installed on all hardware nodes where the toolkit operators can run. Once installed, you will need to set the SPARK_HOME environment variable to point to the installation location.

To use the operators from the Spark MLlib Toolkit in Streams Studio, you must add the toolkit location <Streams Installation Directory>/toolkits to Studio. See Adding toolkit locations.

To build the application from the command line, you must configure the SPL compiler to find the toolkit root directory. Use one of the following methods:

  • Set the STREAMS_SPLPATH environment variable to the root directory of a toolkit or multiple toolkits (with : as a separator). For example:
    export STREAMS_SPLPATH=$STREAMS_INSTALL/toolkits/com.ibm.streamsx.sparkmllib
  • Specify the -t or --spl-path command parameter when you run the sc command. For example:
    sc -t $STREAMS_INSTALL/toolkits/com.ibm.streamsx.sparkmllib -M MyMain

    where MyMain is the name of the SPL main composite.
    Note: These command parameters override the STREAMS_SPLPATH environment variable.

Training a model in Spark

As mentioned above, in order to use the Spark MLLib Toolkit, we need to train and save an MLLib model in Spark. The code snippets below show the steps to develop a Spark Java application that trains a K-Means clustering model using the random_2d_training.csv file and saves it to the local file system.

The first snippet shows the creation of a JavaSparkContext instance that represents a connection to a local Spark instance or a remote Spark cluster. The JavaSparkContext provides the main entry point into Spark functionality. We use it to read the random_2d_training.csv file as an RDD.

SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkStreamsSampleTrainingApplication");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD lines = jsc.textFile("random_2d_training.csv");

Once the RDD for the CSV file is created, we can parse the file and create a Spark Vector for each x,y point in the file. The K-Means clustering model will take these vectors as input for training the model. The code snippet for vector creation and training of the K-Means model is given below.

JavaRDD parsedData = lines.map(new Function<String, Vector>() {
	@Override
	public Vector call(String s) {
		String[] sarray = s.split(",");
		double[] values = new double[sarray.length];
		for (int i = 0; i < sarray.length; i++) {
			values[i] = Double.parseDouble(sarray[i]);
		}
		return Vectors.dense(values);
	}
});

int numClusters = 10;
int numIterations = 20;
KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations);

The final step is to save the trained model to the file system so that the Spark MLLib toolkit can access it. This can be done using the save API on the KMeansModel instance as shown in the code snippet below.

clusters.save(jsc.sc(), "/some/path/kmeans_model");

In the snippet above, the model is saved to the /some/path/kmeans_model directory on the local file system. To specify a location on HDFS, pass a path that uses the hdfs://... URI scheme to the save API.
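
For example, a minimal sketch of an HDFS save; the namenode host and port shown here are hypothetical and should be replaced with your cluster's configuration.

// Illustrative only: replace the namenode host, port, and path with your HDFS configuration.
clusters.save(jsc.sc(), "hdfs://namenode:8020/some/path/kmeans_model");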

Compiling and running the Java application above will train a Spark K-Means clustering model using the random_2d_training.csv file and save the trained model to the /some/path/kmeans_model location on the local filesystem.
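For reference, here is a sketch of how the snippets above could be assembled into a single standalone training application. The class name is illustrative, and the sample application attached to this guide remains the authoritative version; it may differ in detail.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

public class SparkStreamsSampleTrainingApplication {
	public static void main(String[] args) {
		// Connect to a local Spark instance.
		SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkStreamsSampleTrainingApplication");
		JavaSparkContext jsc = new JavaSparkContext(conf);

		// Read the training file and convert each "x,y" line into a dense vector.
		JavaRDD<String> lines = jsc.textFile("random_2d_training.csv");
		JavaRDD<Vector> parsedData = lines.map(new Function<String, Vector>() {
			@Override
			public Vector call(String s) {
				String[] sarray = s.split(",");
				double[] values = new double[sarray.length];
				for (int i = 0; i < sarray.length; i++) {
					values[i] = Double.parseDouble(sarray[i]);
				}
				return Vectors.dense(values);
			}
		});
		// Cache the parsed data since K-Means iterates over it multiple times.
		parsedData.cache();

		// Train the K-Means model and save it for use by the Spark MLLib toolkit.
		int numClusters = 10;
		int numIterations = 20;
		KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations);
		clusters.save(jsc.sc(), "/some/path/kmeans_model");

		jsc.stop();
	}
}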

Using the trained model in IBM Streams

Once the model has been trained and saved as described above, we can use it for scoring in an IBM Streams application using the Spark MLLib toolkit. Each operator within the toolkit can load the corresponding Spark model from the local filesystem or HDFS. For the demo being developed in this guide, the SparkClusteringKMeans operator will be used. The demo application architecture is shown below.

[Figure: demo application architecture: FileSource -> Functor -> SparkClusteringKMeans -> FileSink]

Let us take a look at the application in greater detail:

  1. The application starts off by reading the random_2d.csv file using a FileSource operator. Each line from the file is then converted into x and y coordinates using the Functor operator and stored in a list<float64> testData tuple attribute. This attribute will be used as input into the scoring model as described in step 2 below.
  2. The tuple data is then passed into the SparkClusteringKMeans operator. The operator takes the x,y coordinates read from the file and scores them against the model generated above (a Java sketch of the equivalent Spark API call follows this list). The result of the scoring is the cluster number, which is output as an int32 analysisResult attribute on the output port.
  3. The resulting tuples are then passed to the FileSink operator where the cluster numbers are saved to the output.txt file.
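
For readers who want to relate step 2 back to the Spark API, the scoring performed by the operator conceptually corresponds to the following Java sketch. Here jsc is a JavaSparkContext as in the training application above, and the coordinates shown are illustrative.

// Load the model saved earlier and predict the cluster number for a single (x, y) point.
KMeansModel model = KMeansModel.load(jsc.sc(), "/some/path/kmeans_model");
int clusterNumber = model.predict(Vectors.dense(2.5, -1.0)); // example coordinates, illustrative only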

The following code snippet shows the SPL code for invoking the SparkClusteringKMeans operator.

(stream<Functor_4_out0, tuple<int32 analysisResult>> SparkClusteringKMeans_3_out0) as
	SparkClusteringKMeans_3 = SparkClusteringKMeans(Functor_4_out0)
{
	param
		testDataAttr : testData ;
		modelPath : "/some/path/kmeans_model" ;
}

As you can see, the operator requires 2 parameters to be set:

  • testDataAttr: This parameter references an attribute on the input port that is used as input data for the MLLib model. As mentioned in step 1 above, the input data is stored in an attribute called testData in this demo.
  • modelPath: This parameter should be set to the location on the local filesystem or on HDFS where the Spark MLLib model has been saved. In the section above, we stored our model in the /some/path/kmeans_model directory on the local filesystem. To specify a location on HDFS, set the model path using the hdfs://... URI scheme.

For more details on the various operators that are available and the parameters they accept, refer to the toolkit documentation on the toolkit's GitHub site at http://ibmstreams.github.io/streamsx.sparkMLLib/.

Conclusion

Compiling and running the Streams application outlined above will load the saved Spark K-Means clustering model and use it to determine the cluster number for each 2D point in the random_2d.csv file. The generated cluster numbers will be stored in the output.txt file.

This guide along with the demo application should help you get started with using the Spark MLLib toolkit in your IBM Streams application. For more information about the toolkit, refer to the github site for the toolkit located at http://ibmstreams.github.io/streamsx.sparkMLLib/.

The sample application code referenced in this guide is available here.

1 Comment on "Getting Started with the Spark MLLib Toolkit"

  1. YMDH_sathish_Palaniappan September 22, 2016

    Can we run this sample in Streams service available in Bluemix?

Join The Discussion