Overview

Skill Level: Beginner

Basic knowledge of Watson IoT Platform, Apache Spark and Scala programming.


Ingredients

Step-by-step

  1. Introduction

    Today, IoT devices generate a huge amount of data that is extremely varied, noisy, time-sensitive, and often confidential. In many scenarios, this data needs to be processed in real time to detect anomalies, filter events, and enrich the data for machine learning and prediction. This requires integration between the IoT platform and Apache Spark. For example, Apache Spark can be used to analyze vehicle telematics data, detect anomalies, and trigger a corresponding action back to the vehicle in real time.

    This recipe explains how to configure the Spark service running in IBM Bluemix to receive data from IBM Watson IoT Platform in a few simple steps, so that the IoT data can be processed in real time. The following diagram shows the various components involved in the integration.

    Where:

    IBM Watson Internet of Things Platform is a fully managed, cloud-hosted service that makes it simple to derive value from Internet of Things (IoT) devices. The platform provides simple, but powerful application access to IoT devices and data.

    Apache Spark is a powerful open source, general-purpose engine for big data processing, built around speed, ease of use, and sophisticated analytics. Apache Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches.

    Spark-Streaming-MQTT enables a Spark Streaming application to receive data from any MQTT source.

    Jupyter Notebook allows one to create and share documents that contain live code, equations, visualizations and explanatory text.

  2. Register your Device(s) In Watson IoT Platform

    In order to receive the device events in Spark, we need to set up the IBM Watson IoT Platform service and register the device(s) first. This section shows how to do that.

    Carry out the steps present in this recipe to register your device(s) in IBM Watson IoT Platform. When the device is registered, you will be provided with the registration details shown below. Make a note of them; we need these details to connect the device to Watson IoT Platform later.

    At this step, we have successfully created the IBM Watson IoT Platform service and registered the device(s) in it.

  3. Publish Device Events

    In order to receive the device events in Apache Spark, we need to configure the device to send the events to IBM Watson IoT Platform. This section shows how to send sample device events to the Watson IoT Platform.

    1. Download and install Maven and Git if not installed already.
    2. Open a command prompt and clone the device-samples project using git clone as follows,
      git clone https://github.com/ibm-messaging/iot-device-samples.git
    3. Navigate to the device-samples project, cd iot-device-samples/java/device-samples

    4. Build the project using maven as follows,
      mvn clean package 

      (This will download the Java Client library for Watson IoT Platform and other required dependencies, and start the build. Once built, the sample can be found in the target directory, for example, target/ibmiot-device-samples-0.0.1.jar)

    5. Modify the device.prop file under the target/classes directory by entering the following device registration details that you noted in the previous step:
      Organization-ID = <Your Organization ID>
      Device-Type = <Your Device Type>
      Device-ID = <Your Device ID>
      Authentication-Method = token
      Authentication-Token = <Your Device Token>
    6. Run the DeviceEventPublishWithCounter sample with the following command:
      mvn exec:java -Dexec.mainClass="com.ibm.iotf.sample.client.device.DeviceEventPublishWithCounter"

      (The sample code is present here)

    7. This code makes the device publish an event every second; each event contains a counter, the device name, and the CPU and memory usage of the process (an illustrative payload is shown below). Observe that the event-count datapoint increases for every new event that gets published.
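
    For reference, the event payload is a small JSON document roughly like the illustrative example below. The cpu and event-count datapoints are referenced later in this recipe; the other field names and all values are only indicative of the shape, not the exact output of the sample:

      {
        "d": {
          "event-count": 42,
          "name": "sample-device",
          "cpu": 8.5,
          "mem": 22.1
        }
      }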

    In this section, we have successfully started a device sample. Let’s consume and process these events by creating the Apache Spark application in the next section.

  4. Setup IBM Data Science Experience and create a Jupyter notebook

    The IBM Data Science Experience (DSX) is an environment that has everything a data scientist needs to be successful. It provides an interactive, collaborative, cloud-based environment where data scientists can use multiple tools to activate their insights. Data scientists can use the best of open source tools such as R and Python, tap into IBM's unique features, grow their capabilities, and share their successes.

    1. Use a supported browser to log in to DSX at http://datascience.ibm.com/.
      Note: If you have a Bluemix ID, you can log in with it.
    2. Set up a new project. Projects create a space for you to collect and share notebooks, connect to data sources, create pipelines, and add data sets all in one place. As shown below, click "+" and then select Create Project to create a new project.
    3. Specify the name and create the project. Note: If no Spark service and Object Storage instance have been created yet, create them before creating the project.
    4. Go to the project and click the "add notebooks" link to create a new Jupyter notebook as shown below. The Jupyter Notebook is a web application that allows one to create and share documents that contain executable code, mathematical formulae, graphics/visualizations (matplotlib), and explanatory text.
    5. Specify a descriptive name for the notebook, select Scala as the language, select 1.6 as the version, and click the Create Notebook button as shown below.
  5. On-Board IoT Device Events to Spark

    In this step, we will create the Scala notebook application to onboard the device events to Apache Spark.

    Generate API Key and Token of Watson IoT Platform

    In order to connect the Apache Spark service to IBM Watson IoT Platform and receive device events, we need to generate the API key and token first. This can be achieved by carrying out the steps in this section – Generate API Key in Watson IoT Platform.

    Note down API Key and Authentication Token. We need these to connect the Spark application to Watson IoT Platform.

    Create the notebook application to receive the device events in the Spark service,

    1. Go to the notebook. In the first cell, enter the following AddJar commands to upload the MQTT and spark-streaming-mqtt jars to the IBM Analytics for Spark service.
      %AddJar https://repo.eclipse.org/content/repositories/paho-releases/org/eclipse/paho/org.eclipse.paho.client.mqttv3/1.0.2/org.eclipse.paho.client.mqttv3-1.0.2.jar -f
      %AddJar http://central.maven.org/maven2/com/google/code/gson/gson/2.2.4/gson-2.2.4.jar -f
      %AddJar https://github.com/sathipal/spark-streaming-mqtt-with-security_2.10-1.3.0/releases/download/0.0.1/spark-streaming-mqtt-security_2.10-1.3.0-0.0.1.jar -f

      (The % before AddJar is a special command, which is currently available, but may be deprecated in an upcoming release. We’ll update this tutorial at that time. The -f forces the download even if the file is already in the cache)

    2. In the next cell, import the necessary classes and create the Apache Spark Streaming context as shown below. In this example, we set the streaming batch interval to 1 second.
      import org.eclipse.paho.client.mqttv3._

      import org.apache.spark.storage.StorageLevel
      import org.apache.spark.streaming.{Seconds, StreamingContext}
      import org.apache.spark.streaming.mqtt._
      import org.apache.spark.SparkConf

      val ssc = new StreamingContext(sc, Seconds(1))
    3. Then, create the MQTT stream using the spark-streaming-mqtt-connector library that is available on GitHub. We customized the connector from Apache Spark and added the following:
      • SSL security, such that the connection between the Spark application and IBM Watson IoT Platform is always secured.
      • The MQTT topic in the RDD, such that one can parse and associate the messages with the appropriate device or device type. For more information, refer to the GitHub repository.
    4. Add the following code that creates the MQTT stream,
      val lines = MQTTUtils.createStream(ssc,                             // Spark Streaming Context
        "ssl://<your-org>.messaging.internetofthings.ibmcloud.com:8883",  // Watson IoT Platform URL
        "iot-2/type/+/id/+/evt/+/fmt/+",                                  // MQTT topic to receive the events
        "a:<your-org>:random",                                            // Unique ID of the application
        "<API-Key>",                                                      // API-Key
        "<Auth-Token>")                                                   // Auth-Token

      (Modify the above code with your Organization ID, API-Key and Auth-Token that you received earlier. The MQTT topic used in this code snippet subscribes to events from all the devices in the given Organization. Refer to the documentation if you want to control the subscription; a narrowed-down example is also shown after this list.)

    5. Next, add the following code to parse the topic and associate each message with its deviceId. Also start the streaming context so that it processes a batch every second.
      /*
       * The topic and payload are separated by a space, so split each message
       * on the first space and keep the deviceId as the key and the payload as the value.
       */
      val deviceMappedLines = lines.map(x => ((x.split(" ", 2)(0)).split("/")(4), x.split(" ", 2)(1)))
      deviceMappedLines.print()

      ssc.start()
      ssc.awaitTermination()
    6. Then run the code all at once by going to Cell->Run All as shown below.
    7. Observe that the Spark application maps the events to the deviceId and outputs an event every second as shown below. In this case, the device ID is SoftwareDevice.
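
    For example, if you want to receive events from only one device rather than from every device in the Organization, you can narrow the MQTT topic filter passed to MQTTUtils.createStream. The sketch below is an illustrative variation of step 4, using the SoftwareDevice device ID seen above and json as the event format; adjust the type, ID, and event name to match your own devices:

      // Illustrative variation of step 4: subscribe only to JSON events
      // from the device whose ID is "SoftwareDevice", for any device type and event name
      val singleDeviceLines = MQTTUtils.createStream(ssc,
        "ssl://<your-org>.messaging.internetofthings.ibmcloud.com:8883",  // Watson IoT Platform URL
        "iot-2/type/+/id/SoftwareDevice/evt/+/fmt/json",                  // narrower MQTT topic filter
        "a:<your-org>:random",                                            // Unique ID of the application
        "<API-Key>",                                                      // API-Key
        "<Auth-Token>")                                                   // Auth-Token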

    In this step, we have successfully created a Scala notebook application and received the events in Apache Spark.

  6. Threshold based Anomaly detection

    In this step, we will see how to detect anomalies based on threshold settings.

    One can use the filter operator to detect threshold-based deviations. The filter transformation removes tuples from the RDD by passing along only those lines that satisfy a user-specified condition.

    • In the existing notebook, remove the line deviceMappedLines.print() and paste the following code, which first maps the JSON payload into a Scala map and then filters the lines where the cpu usage is greater than 10%.
      import com.google.gson.JsonParser
      import scala.collection.mutable

      // Map the JSON payload into a Scala map
      val jsonLines = deviceMappedLines.map(x => {
        val dataMap = mutable.Map[String, Any]()
        val payload = new JsonParser().parse(x._2).getAsJsonObject()
        var deviceObject = payload
        if (deviceObject.has("d")) {
          deviceObject = payload.get("d").getAsJsonObject()
        }
        val itr = deviceObject.entrySet().iterator()
        while (itr.hasNext()) {
          val entry = itr.next()
          try {
            // Store numeric datapoints as Double ...
            dataMap.put(entry.getKey(), entry.getValue().getAsDouble())
          } catch {
            // ... and fall back to String for non-numeric values
            case e: Exception => dataMap.put(entry.getKey(), entry.getValue().getAsString())
          }
        }
        (x._1, dataMap)
      })

      /**
       * Create a simple threshold rule: if the CPU usage is greater than 10, alert.
       */
      val thresholdCrossedLines = jsonLines.filter(x => {
        x._2.get("cpu") match {
          case Some(cpu: Double) => cpu > 10.0
          case _                 => false
        }
      })

      thresholdCrossedLines.print()

      (The complete code is available here)

    • Stop the previous job by clicking on the interrupt & restart kernel buttons.
    • Then run the code all at once by going to Cell->Run All. Observe that the output line is printed only when the CPU usage crosses 10%.

    In this step, we saw how to filter the lines where the CPU usage is greater than 10%. Similarly, you can filter rows based on other datapoints as well, as illustrated in the sketch below.
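
    For instance, a similar rule on memory usage might look like the following sketch. It assumes the payload carries a numeric mem datapoint and uses an illustrative threshold of 50; adjust the key and the threshold to match what your device actually sends:

      // Hypothetical variation: alert when the "mem" datapoint exceeds 50
      val memoryAlertLines = jsonLines.filter(x => {
        x._2.get("mem") match {
          case Some(mem: Double) => mem > 50.0
          case _                 => false
        }
      })
      memoryAlertLines.print()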

  7. Compute Running Average

    In this step, we will see how to compute running average of various data points that the IoT devices send. 

    1. Open a new notebook.
    2. Copy the code that calculates the running average from the GitHub repository and place it in the notebook (a minimal sketch of the idea is shown after this list).
    3. Modify the line that creates the MQTT stream with your Organization ID, API-Key and Auth-Token.

    4. Then run the code all at once by going to Cell->Run All as shown below (you may need to interrupt and restart the kernel if an instance is already running).
    5. Observe that each line prints the average along with the number of events consumed so far.
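
    For reference, the core of the computation is a stateful transformation over the device datapoints. The sketch below is not the GitHub code, only a minimal illustration of the idea: it assumes the jsonLines stream from the previous section, keeps a (count, sum) state per deviceId-datapoint pair with updateStateByKey, and needs a checkpoint directory (the ./checkpoint path is just an example):

      // Stateful operations such as updateStateByKey require checkpointing
      ssc.checkpoint("./checkpoint")

      // Flatten each event into (deviceId-datapoint, value) pairs, keeping numeric values only
      val numericPoints = jsonLines.flatMap(x =>
        x._2.collect { case (k, v: Double) => (x._1 + "-" + k, v) }
      )

      // Keep a running (count, sum) per key and derive the average from it
      val runningAverages = numericPoints.updateStateByKey[(Long, Double)](
        (newValues: Seq[Double], state: Option[(Long, Double)]) => {
          val (count, sum) = state.getOrElse((0L, 0.0))
          Some((count + newValues.size, sum + newValues.sum))
        }
      ).map { case (key, (count, sum)) => (key, sum / count, count) }

      runningAverages.print()

      // Then start the streaming context as in the earlier steps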

    At this step, we have successfully computed the running average of individual datapoints that the device sends.

  8. Conclusion

    In this recipe, we demonstrated how to connect an Apache Spark Streaming application to IBM Watson IoT Platform and receive device events in real time. Developers can look at the code made available in the GitHub repository to understand what's happening under the hood. Developers can consider this recipe as a template for integrating Apache Spark with IBM Watson IoT Platform.

  9. Where to go from here?

     

    1. Go through the IBM Watson IoT Real-Time Insights recipe, which enables one to perform analytics on real-time data from IoT devices and gain diagnostic insights. Though the recipe is written to analyze vehicle data, it can be used as a template to derive insights from any IoT data.
    2. Go through the Engage Machine Learning for detecting anomalous behaviors of things recipe, which showcases how one can use the Predictive Analysis service, available on IBM Bluemix, to discover hidden patterns in the events published by IoT devices on the IBM Watson IoT Platform.

2 Comments on "Spark Streaming + Watson IoT Platform Integration"

  1. KevinLaurier May 11, 2017

    Hi,

    Would it be possible to update the GitHub fork to use the latest version of the Spark Project External MQTT Assembly library?

    https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-mqtt-assembly_2.10

    Regards,

    Kevin

  2. nicholasfay June 26, 2017

    What would be the best way to then return specific analytic results from Spark back to the IoT devices (mobiles phones, smart watches, etc.) in the above pipeline laid out?
