Overview

Skill Level: Any Skill Level

Requires basic knowledge of IBM Watson IoT Platform, Apache Spark, and Scala programming.


Step-by-step

  1. Introduction

    Today, IoT devices generate a humongous amount of data that is extremely varied, noisy, time-sensitive, and often confidential. In many scenarios, this data needs to be processed in real time to detect anomalies, filter events, and enrich the data for machine learning and prediction. This requires an integration between the IoT platform and Apache Spark. For example, Apache Spark can be used to analyze vehicle telematics data, detect anomalies, and trigger a corresponding action back to the vehicle in real time.

    This recipe explains how to configure the Spark service running in IBM Bluemix to receive data from IBM Watson IoT Platform, in a few simple steps, so that the IoT data can be processed in real time. The following diagram shows the components involved in the integration.

    Where,

    IBM Watson Internet of Things Platform is a fully managed, cloud-hosted service that makes it simple to derive value from Internet of Things (IoT) devices. The platform provides simple, but powerful application access to IoT devices and data.

    Apache Spark is a powerful open source, general-purpose engine for big data processing, built around speed, ease of use, and sophisticated analytics. Apache Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches.

    Spark-Streaming-MQTT is an extension that lets a Spark Streaming application receive data from any MQTT source.

    Jupyter Notebook allows one to create and share documents that contain live code, equations, visualizations and explanatory text.

  2. Register your Device(s) In Watson IoT Platform

    In order to receive the device events in Spark, we need to set up IBM Watson IoT Platform and register the device(s) first. This section shows how to do that.

    Carry out the steps in this recipe to register your device(s) in IBM Watson IoT Platform. When a device is registered, you will be provided with the registration details shown below. Make a note of them; we need these details to connect the device to Watson IoT Platform later.

    At this point, we have successfully created the IBM Watson IoT Platform service and registered the device(s) in it.

  3. Publish Device Events

    In order to receive the device events in Apache Spark, we need to configure the device to send the events to IBM Watson IoT Platform. This section shows how to send sample device events to the Watson IoT Platform.

    1. Download and install Maven and Git if not installed already.
    2. Open a command prompt and clone the device-samples project using git clone as follows,
      git clone https://github.com/ibm-messaging/iot-device-samples.git
    3. Navigate to the device-samples project as follows,
      cd iot-device-samples/java/device-samples

    4. Build the project using maven as follows,
      mvn clean package 

      (This downloads the Java client library for Watson IoT Platform and the other required dependencies, and starts the build. Once built, the sample can be located in the target directory, for example, target/ibmiot-device-samples-0.0.1.jar)

    5. Modify the device.prop file present under the target/classes directory by entering the following device registration details that you noted in the previous step:
      Organization-ID = <Your Organization ID>
      Device-Type = <Your Device Type>
      Device-ID = <Your Device ID>
      Authentication-Method = token
      Authentication-Token = <Your Device Token>
    6. Run the device event publish sample by specifying the following command:
      mvn exec:java -Dexec.mainClass="com.ibm.iotf.sample.client.device.DeviceEventPublishWithCounter"

      (The sample code is present here)

    7. This code makes the device publish an event every second; each event contains a counter, the name, and the CPU and memory usage of the process. Observe that the event-count datapoint increases with every new event that gets published.
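
    For reference, here is a minimal sketch in Scala of what the sample does over MQTT, using the Eclipse Paho client. This is an illustration, not the sample's actual code (the sample uses the Watson IoT Java client library); the client ID, user name, and topic below follow the documented Watson IoT Platform device conventions, and the payload fields and values are placeholders mirroring the sample's datapoints.

      import org.eclipse.paho.client.mqttv3.{MqttClient, MqttConnectOptions, MqttMessage}

      object DeviceEventSketch extends App {
        // Substitute the registration details you noted earlier
        val org        = "<Your Organization ID>"
        val deviceType = "<Your Device Type>"
        val deviceId   = "<Your Device ID>"
        val token      = "<Your Device Token>"

        // Devices connect with a client ID of the form d:<org>:<device-type>:<device-id>
        val client = new MqttClient(
          s"ssl://${org}.messaging.internetofthings.ibmcloud.com:8883",
          s"d:${org}:${deviceType}:${deviceId}")
        val options = new MqttConnectOptions()
        options.setUserName("use-token-auth")   // fixed user name for token authentication
        options.setPassword(token.toCharArray)
        client.connect(options)

        // Publish one JSON event per second on the "status" event topic
        var count = 0
        while (true) {
          count += 1
          val payload = s"""{"d":{"event-count":$count,"name":"sample","cpu":8.5,"mem":12.3}}"""
          client.publish("iot-2/evt/status/fmt/json", new MqttMessage(payload.getBytes("UTF-8")))
          Thread.sleep(1000)
        }
      }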

    In this section, we have successfully started a device sample. Let's consume and process these events by creating the Apache Spark application in the next section.

  4. Create Apache Spark service in Bluemix

    In this step, we will create the Apache Spark service in Bluemix.

    IBM Bluemix is an open standards cloud platform for building, running, and managing apps and services.

    1. Open your favorite browser and go to Bluemix. If you are an existing Bluemix user, log in as usual. If you are new to Bluemix you can sign up for a free 30 day trial.
    2. In Bluemix, click Catalog and select the Apache Spark service under the Data and Analytics section as shown below,
    3. Pick a plan and create the service as shown below,
    4. Observe that the following UI is shown when the service is created.
    5. Notebooks are interactive environments for exploring, analyzing, and visualizing data, integrated for use with IBM Analytics for Apache Spark. Click the NOTEBOOKS button to show existing notebooks, then click the NEW NOTEBOOK button.

    6. Enter a name, select Scala under Language, and click the CREATE NOTEBOOK button as shown below,

    7. Cool, we are all set to run a Spark Streaming application now.

    In this step, we have successfully created a Spark service in Bluemix and created a Scala notebook.

  5. On-Board IoT Device Events to Spark

    In this step, we will create the Scala notebook application to on-board the device events to the Apache Spark service in Bluemix.

    Generate API Key and Token of Watson IoT Platform

    In order to connect the Apache Spark service to IBM Watson IoT Platform to receive device events, we need to generate the API key and token first. This can be achieved by carrying out the steps in this section – Generate API Key in Watson IoT Platform.

    Note down the API Key and Authentication Token; we need these to connect the Spark application to Watson IoT Platform.

    Create the notebook application to receive the device events in the Spark service,

    1. Go to the notebook. In the first cell, enter the following AddJar special command to upload the MQTT and spark-streaming-mqtt JARs to the IBM Analytics for Apache Spark service.
      %AddJar https://repo.eclipse.org/content/repositories/paho-releases/org/eclipse/paho/org.eclipse.paho.client.mqttv3/1.0.2/org.eclipse.paho.client.mqttv3-1.0.2.jar -f
      %AddJar http://central.maven.org/maven2/com/google/code/gson/gson/2.2.4/gson-2.2.4.jar -f
      %AddJar https://github.com/sathipal/spark-streaming-mqtt-with-security_2.10-1.3.0/releases/download/0.0.1/spark-streaming-mqtt-security_2.10-1.3.0-0.0.1.jar -f

      (The % before AddJar is a special command which is currently available but may be deprecated in an upcoming release; we'll update this tutorial at that time. The -f flag forces the download even if the file is already in the cache.)

    2. In the next cell, import the necessary classes and create the Apache Spark Streaming context as shown below. In this example, we set the streaming batch interval to 1 second.
      import org.eclipse.paho.client.mqttv3._

      import org.apache.spark.storage.StorageLevel
      import org.apache.spark.streaming.{Seconds, StreamingContext}
      import org.apache.spark.streaming.mqtt._
      import org.apache.spark.SparkConf

      val ssc = new StreamingContext(sc, Seconds(1))
    3. Then, create the MQTT stream using the spark-streaming-mqtt-connector library that is available on GitHub. We customized the connector from Apache Spark and added the following,
      • SSL security, such that the connection between the Spark application and IBM Watson IoT Platform is always secured.
      • The MQTT topic in the RDD, such that one can parse and associate the messages with the appropriate device or device type. For more information, refer to GitHub.
    4. Add the following code that creates the MQTT stream,
      val lines = MQTTUtils.createStream(ssc,                            // Spark Streaming Context
        "ssl://<your-org>.messaging.internetofthings.ibmcloud.com:8883", // Watson IoT Platform URL
        "iot-2/type/+/id/+/evt/+/fmt/+",                                 // MQTT topic to receive the events
        "a:<your-org>:random",                                           // Unique ID of the application
        "<API-Key>",                                                     // API-Key
        "<Auth-Token>")                                                  // Auth-Token
      (Modify the above code with the Organization ID, API-Key, and Auth-Token that you received earlier. The MQTT topic used in this code snippet subscribes to events from all the devices in the given organization; to narrow the subscription, use a more specific topic, for example iot-2/type/<device-type>/id/+/evt/status/fmt/json to receive only the status events of one device type. Refer to the documentation if you want further control over the subscription.)
    5. In the next step, add the code to parse the topic and associate each message with its deviceId, as shown below (a worked example of the mapping follows this list). Also, start the streaming context so that it runs every second.
      /*
       * The connector emits each message as "<topic> <payload>", separated by a space,
       * so split on the first space and keep the deviceId (taken from the topic) as the
       * key and the payload as the value.
       */
      val deviceMappedLines = lines.map(x => ((x.split(" ", 2)(0)).split("/")(4), x.split(" ", 2)(1)))
      deviceMappedLines.print()

      ssc.start()
      ssc.awaitTermination()
    6. Then run the code all at once by going to Cell->Run All as shown below,
    7. Observe that the Spark application maps the events to the deviceId and outputs the events every second as shown below. In this case, the device ID is SoftwareDevice.
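
    For instance (with illustrative device type, event name, and payload values), a raw line from the stream and the result of the mapping in step 5 would look like the following; the fifth slash-separated token of the topic is the device ID:

      // Raw line from the connector: "<MQTT topic> <payload>"
      //   iot-2/type/iotsample-deviceType/id/SoftwareDevice/evt/status/fmt/json {"d":{"event-count":7,"cpu":8.5}}
      //
      // After the deviceMappedLines map: (deviceId, payload)
      //   ("SoftwareDevice", """{"d":{"event-count":7,"cpu":8.5}}""")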

    In this step, we have successfully created a Scala notebook application and received the events in Apache Spark.

  6. Threshold based Anomaly detection

    In this step, we will see how to detect anomalies based on threshold settings.

    One can use the filter operation to detect threshold-based deviations. The filter transformation removes tuples from the RDD by passing along only those lines that satisfy a user-specified condition.

    • In the existing notebook, remove the line deviceMappedLines.print() and paste the following code, which first maps the JSON payload into a Scala map and then filters the lines where the cpu usage is greater than 10%.
      import java.util.Map.Entry
      import java.util.Set
      import com.google.gson.{JsonElement, JsonParser}

      // Map the JSON payload into a (mutable) Scala map of datapoint name -> value
      val jsonLines = deviceMappedLines.map(x => {
        val dataMap = scala.collection.mutable.Map[String, Any]()
        val payload = new JsonParser().parse(x._2).getAsJsonObject()
        val deviceObject = payload.get("d").getAsJsonObject()
        val itr = deviceObject.entrySet().iterator()
        while (itr.hasNext()) {
          val entry = itr.next()
          try {
            // Treat numeric datapoints as Double ...
            dataMap.put(entry.getKey(), entry.getValue().getAsDouble())
          } catch {
            // ... and fall back to String for everything else
            case e: Exception => dataMap.put(entry.getKey(), entry.getValue().getAsString())
          }
        }
        (x._1, dataMap)
      })

      /*
       * A simple threshold rule: alert if cpu usage is greater than 10%.
       */
      val thresholdCrossedLines = jsonLines.filter(x => {
        x._2.get("cpu") match {
          case Some(cpu: Double) => cpu > 10.0
          case _                 => false
        }
      })

      thresholdCrossedLines.print()

      (The complete code is available here)

    • Stop the previous job by clicking on the interrupt & restart kernel buttons.
    • Then run the code all at once by going to Cell->Run All. Observe that the output line is printed only when the CPU usage crosses 10%.

    In this step, we saw how to filter the lines where the CPU usage is greater than 10%. Similarly, you can filter rows based on other datapoints as well.
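
    For example (an illustrative sketch; "mem" is assumed here to be the memory-usage datapoint name from the sample events, and the 50% threshold is arbitrary), the same pattern filters on memory usage:

      // Alert when memory usage exceeds 50%
      val memAlerts = jsonLines.filter(x => {
        x._2.get("mem") match {
          case Some(mem: Double) => mem > 50.0
          case _                 => false
        }
      })
      memAlerts.print()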

  7. Compute Running Average

    In this step, we will see how to compute the running average of the various datapoints that the IoT devices send.

    1. Open a new notebook.
    2. Copy the code that calculates the running average from GitHub and place it in the notebook (a sketch of one possible approach follows this list).
    3. Modify the line that creates the MQTT stream with your Organization ID, API-Key, and Auth-Token.

    4. Then run the code all at once by going to Cell->Run All as shown below, (you may need to interrupt & restart the kernel if there is an instance running already)
    5. Observe that each line prints the average along with the number of events consumed so far.
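
    The GitHub sample is the reference implementation; as a rough sketch of one possible approach (an assumption on our part, reusing the jsonLines parsing from the previous section and Spark Streaming's updateStateByKey, which requires a checkpoint directory), the running average per device and datapoint could be computed like this:

      // Sketch only: keep (count, mean) as state per (deviceId, datapoint) key
      ssc.checkpoint(".")  // updateStateByKey needs a checkpoint directory

      // Flatten each event into ((deviceId, datapointName), value) pairs for numeric datapoints
      val datapoints = jsonLines.flatMap { case (deviceId, data) =>
        data.collect { case (name, v: Double) => ((deviceId, name), v) }
      }

      val runningAverages = datapoints.updateStateByKey[(Long, Double)] {
        (values: Seq[Double], state: Option[(Long, Double)]) =>
          var (n, mean) = state.getOrElse((0L, 0.0))
          for (v <- values) {
            n += 1
            mean += (v - mean) / n  // incremental mean update
          }
          Some((n, mean))
      }

      runningAverages.print()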

    At this point, we have successfully computed the running average of the individual datapoints that the device sends.

  8. Conclusion

    In this recipe, we demonstrated how to connect the Apache Spark service running in Bluemix to IBM Watson IoT Platform and receive device events in real time. Developers can look at the code made available in the GitHub repository to understand what's happening under the hood, and can use this recipe as a template for integrating Apache Spark with IBM Watson IoT Platform.

  9. Where to go from here?

     

    1. Go through the IBM Watson IoT Real-Time Insights recipe, which enables one to perform analytics on real-time data from IoT devices and gain diagnostic insights. Though the recipe is written to analyze vehicle data, it can be used as a template to derive insights from any IoT data.
    2. Go through the Engage Machine Learning for detecting anomalous behaviors of things recipe, which showcases how one can use the Predictive Analysis service, available on IBM Bluemix, to determine hidden patterns in the events published by IoT devices to the IBM Watson IoT Platform.
