Overview

Skill Level: Intermediate

Connecting Message Hub to Watson™ IoT Platform provides a scalable, high-throughput message bus for historical data storage. Message Hub is built on Apache Kafka, an open-source, high-throughput messaging system for handling real-time data feeds.

Ingredients

  • A Bluemix account
  • The Watson IoT Platform service, with at least one registered device (or the device simulator)
  • The Message Hub service, created in the same Bluemix space as the Watson IoT Platform service
  • The Apache Spark service, for the analytics sections

Step-by-step

  1. Introduction

    IBM Message Hub for Bluemix is a scalable, distributed, high-throughput message bus for wiring microservices together using open protocols. It can be used to stream data into analytics services to realize powerful insights. IBM Message Hub is built on Apache Kafka, a fast, scalable, and durable real-time messaging engine.

    Advantages of using IBM Message Hub

    • Distributed publish-subscribe messaging system designed to be fast, scalable, and durable.
    • High-throughput message bus that can consume live events from millions of devices.
    • Low-latency platform for handling real-time data feeds.
    • Foundation for building high-performance, scalable streaming analytics solutions.

    In this tutorial, you will learn how to integrate Watson IoT Platform with IBM Message Hub. Events from devices in Watson IoT Platform will be posted to a topic in IBM Message Hub.

  2. Architecture

    IBM Message Hub provides multiple interfaces through which messages can be produced and consumed. In this tutorial, we will stream live event data from the IBM Watson IoT Platform to IBM Message Hub using the Message Hub connector provided by Watson IoT Platform, and then use the power of Apache Spark and InfoSphere Streams alongside IBM Message Hub (Kafka) to build high-performance, scalable streaming analytics solutions.

    Producers publish data to Message Hub; in this case the producer is the Message Hub connector in Watson IoT Platform, which forwards the IoT device data to Message Hub. Consumers pull the data from Message Hub. IBM Message Hub can be integrated with multiple Bluemix services that consume the data, for example, Apache Spark and Streaming Analytics.
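
    To make the consumer role concrete, the following is a minimal Kafka consumer sketch in Scala, using the Kafka 0.9 client that the notebook steps below also rely on. The broker address and topic name are placeholders, and the SASL_SSL authentication settings that Message Hub requires are omitted here; in the notebook sections they are handled by the MessageHubConfig helper.

    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.consumer.KafkaConsumer

    val props = new Properties()
    // Placeholder broker; use the kafka_brokers list from your service credentials.
    props.put("bootstrap.servers", "kafka01-prod01.messagehub.services.us-south.bluemix.net:9093")
    props.put("group.id", "iot-consumer")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // Note: the SASL_SSL settings required by Message Hub are omitted in this sketch.

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("sensor_event"))
    while (true) {
      // Poll for new records and print each key/value pair.
      val it = consumer.poll(1000).iterator()
      while (it.hasNext()) {
        val record = it.next()
        println(s"${record.key()} -> ${record.value()}")
      }
    }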

  3. Connect & publish events to Watson IoT Platform

    Register your device(s) in Watson IoT Platform

    As you work through this recipe, you need at least one device registered on the Watson IoT Platform. Refer to this recipe to register devices with Watson IoT Platform. When the device is registered, you will be provided with the registration details shown below.

    Connect your device(s) to the Watson IoT Platform

    Connect your device to Watson IoT Platform and send events. If you do not have a real device, you can use the simulator device – http://watson-iot-sensor-simulator.mybluemix.net/ – to send events to Watson IoT Platform. This simulated device can send temperature, humidity, and object temperature data, and the values can be changed through the simulator's web UI. The simulator connects to the platform and sends an ‘iotsensor’ event every 2 seconds while active. Keep the simulator tab or window open to continue sending data.

    Refer to this IoT recipe to learn how to configure the IoT simulator and fetch the sensor data mentioned above.
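
    If you prefer to send events programmatically rather than through the simulator, the sketch below publishes one ‘iotsensor’ event with the Eclipse Paho MQTT client (assumed to be on the classpath). The org ID, device type, device ID, and token are placeholders to replace with your registration details; the client ID, user name, and topic follow the Watson IoT Platform device conventions.

    import org.eclipse.paho.client.mqttv3.{MqttClient, MqttConnectOptions, MqttMessage}

    // Placeholders: substitute the registration details from the previous step.
    val orgId = "xxxxxx"
    val deviceType = "iotsample-deviceType"
    val deviceId = "Device01"
    val authToken = "xxxxxxxx"

    // Devices connect with client ID d:<org>:<type>:<id> and token authentication.
    val client = new MqttClient(
      s"ssl://$orgId.messaging.internetofthings.ibmcloud.com:8883",
      s"d:$orgId:$deviceType:$deviceId")
    val options = new MqttConnectOptions()
    options.setUserName("use-token-auth")
    options.setPassword(authToken.toCharArray)
    client.connect(options)

    // Publish one 'iotsensor' event in the same JSON shape the simulator sends.
    val payload = """{"d":{"name":"Device01","temperature":37,"humidity":79,"objectTemp":23}}"""
    val message = new MqttMessage(payload.getBytes("UTF-8"))
    message.setQos(0)
    client.publish("iot-2/evt/iotsensor/fmt/json", message)
    client.disconnect()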

  4. Configure the Message Hub Connector in Watson IoT Platform

    Create Message Hub service in Bluemix

    Click this link to create the Message Hub service in Bluemix. Note: The Message Hub service must be created in the same Bluemix space as your Watson IoT Platform.

    Configure the Message Hub connector in Watson IoT Platform

    • On the Watson™ IoT Platform dashboard, click Extensions in the navigation bar.
    • In the Historical Data Storage tile, click Setup.
    • All available Message Hub services within the same Bluemix space as your Watson™ IoT Platform service are listed. Select the Message Hub service that you want to connect to.
    • You will be provided with a list of configuration options.
    • Change the timezone if you want to convert the event data timestamp to the selected timezone.
    • Define the default topic if you want to send all the device data to a single Message Hub topic. To use more granular topic assignments, leave the default topic blank and add custom forwarding rules as shown below.
    • As shown, the configuration sends only the iotsensor events from all the devices of the device type iotsample-deviceType to the Message Hub topic sensor_event. Click the tick mark to complete the definition. Note: Change the values according to your device type and event name.
    • Topic configuration is optional and allows you to set a partition count for specific Message Hub topics. If you do not set a topic configuration for a topic, it is created with a default of two partitions, unless it already exists in your Message Hub service.
    • To create 3 partitions for the sensor_event topic, configure the connector as shown below.
    • Click the tick mark to complete the partition setup, and click Done to complete the setup. A popup screen appears requesting your authorization.
    • Click Confirm to authorize.
    • Open the Message Hub service to see the topic.
    • Your device data is now stored in your Message Hub. The following is a sample message that is published to Message Hub for the above configuration:
       {"orgId":"9ts1nd","deviceType":"iotsample-deviceType","deviceId":"Device01","eventType":"iotsensor","format":"json","timestamp":"2016-10-18T11:53:29.525Z"},{"d":{"name":"Device01","temperature":37,"humidity":79,"objectTemp":23}}

    In this section, we saw how to configure the Message Hub connector to send device events to a Message Hub topic. In the next section, we will use a Jupyter notebook to consume the data in Spark and analyze it.

  5. Consume the Device Events in Apache Spark from Message Hub

    In this step, we will learn how to consume the device events in Apache Spark from Message Hub.

    1. Create the Apache Spark service from the catalog in Bluemix.
      1. Click this link to create the Apache Spark service in Bluemix.
      2. Click Create.
      3. After the service is created, click on the service.
    2. Click Notebooks to run the analytic code in interactive mode.
    3. Click New Notebook to create a new instance of the notebook.
    4. Copy the following code into the notebook.
      %Addjar http://central.maven.org/maven2/org/apache/kafka/kafka-clients/0.9.0.0/kafka-clients-0.9.0.0.jar
      %Addjar http://central.maven.org/maven2/org/apache/kafka/kafka_2.10/0.9.0.0/kafka_2.10-0.9.0.0.jar
      %Addjar http://central.maven.org/maven2/org/apache/kafka/kafka-log4j-appender/0.9.0.0/kafka-log4j-appender-0.9.0.0.jar
      %Addjar https://github.com/ibm-messaging/message-hub-samples/raw/master/java/message-hub-login-library/messagehub.login-1.0.0.jar
      %Addjar https://github.com/ibm-messaging/iot-messgehub-spark-samples/releases/download/v0.1/streaming-kafka.jar

      import org.apache.spark.streaming.Duration
      import org.apache.spark.streaming.Seconds
      import org.apache.spark.streaming.StreamingContext
      import com.ibm.cds.spark.samples.config.MessageHubConfig
      import com.ibm.cds.spark.samples.dstream.KafkaStreaming.KafkaStreamingContextAdapter
      import org.apache.kafka.common.serialization.Deserializer
      import org.apache.kafka.common.serialization.StringDeserializer

      // Message Hub (Kafka) connection configuration: replace the x's with
      // the values from your Message Hub service credentials (step 5 below).
      val kafkaProps = new MessageHubConfig

      kafkaProps.setConfig("bootstrap.servers", "kafka01-prod01.messagehub.services.us-south.bluemix.net:9093,kafka02-prod01.messagehub.services.us-south.bluemix.net:9093,kafka03-prod01.messagehub.services.us-south.bluemix.net:9093,kafka04-prod01.messagehub.services.us-south.bluemix.net:9093,kafka05-prod01.messagehub.services.us-south.bluemix.net:9093")
      kafkaProps.setConfig("kafka.user.name","xxxxxxxxxxxx")
      kafkaProps.setConfig("kafka.user.password", "xxxxxxxxxx")
      kafkaProps.setConfig("kafka.topic","sensor_event")
      kafkaProps.setConfig("api_key","xxxxxxxxxxxxxxx")
      kafkaProps.setConfig("kafka_rest_url", "https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net:443")

      kafkaProps.createConfiguration()

      // Create a streaming context with a 2-second batch interval and
      // subscribe to the configured Message Hub topic.
      val ssc = new StreamingContext(sc, Seconds(2))

      val stream = ssc.createKafkaStream[String, String, StringDeserializer, StringDeserializer](
        kafkaProps,
        List(kafkaProps.getConfig("kafka.topic")))

      // Print each batch of received events, then start the streaming job.
      stream.print()
      ssc.start()
      ssc.awaitTermination()

       

    5. Navigate to the Message Hub service's Bluemix dashboard and click the Service Credentials tab. Note the kafka_brokers, user, password, and api_key fields.
    6. Replace the “x” placeholders in the notebook code with the credentials from Message Hub.
    7. Now the code is ready.
    8. Click Cell -> Run.
    9. The code is now running in Apache Spark, and you will be able to see the events from the devices in the console output.

     

    Now you have successfully integrated Apache Spark with Message Hub and can receive the device events from Watson IoT Platform.

  6. Advanced Spark Example - Running Average

    In this step, you will learn how to compute the running average (rolling average) of the device events.

    1. We will use the code from the previous step as a template for this example.
    2. First, we need to decode the payload that we get from Message Hub. We will use the code below to decode the payload.
       val decodedLines = stream.map( x => {
         // The message value wraps the event in a Base64-encoded "payload"
         // field; decode it back to a JSON string.
         val value = x._2
         val kafkaPayload = new JsonParser().parse(value).getAsJsonObject()
         val encodedPayload: String = kafkaPayload.get("payload").getAsString()
         val payload = Base64.decodeBase64(encodedPayload)
         (x._1, new String(payload))
       })
    3. Then we will use the code below to extract the data points from the device events.
       val jsonMappedLines = decodedLines.map( x => {
         // Copy each field of the decoded JSON into a mutable map,
         // numeric fields as Double and everything else as String.
         var dataMap: Map[String, Any] = Map()
         val payload = new JsonParser().parse(x._2).getAsJsonObject()
         val setObj = payload.entrySet()
         val itr = setObj.iterator()
         while (itr.hasNext()) {
           val entry = itr.next()
           try {
             dataMap.put(entry.getKey(), entry.getValue().getAsDouble())
           } catch {
             case e: Exception => dataMap.put(entry.getKey(), entry.getValue().getAsString())
           }
         }
         (x._1, dataMap)
       })
    4. Then use the code below to compute the running average for each data point; a quick standalone check of the update rule follows this list.
       val runningAverage = (value: Seq[Map[String, Any]], state: Option[Map[String, Any]]) => {
         // The state map carries the running average of every numeric field,
         // plus a TotalEventsSoFar counter.
         var stateMap = state.getOrElse(Map())
         var count = 0
         if (stateMap.contains("TotalEventsSoFar")) {
           count = stateMap.get("TotalEventsSoFar").get.asInstanceOf[Int]
         }

         value.foreach { row =>
           count = count + 1
           row.foreach { entry =>
             try {
               val currentVal = entry._2.asInstanceOf[Double]
               if (stateMap.contains(entry._1)) {
                 // Incremental mean: newAvg = oldAvg * (n-1)/n + current/n
                 val oldAverage = stateMap.get(entry._1).get.asInstanceOf[Double]
                 val newAvg = (oldAverage * (count - 1) / count) + (currentVal / count)
                 stateMap.put(entry._1, newAvg)
               } else {
                 stateMap.put(entry._1, currentVal)
               }
             } catch {
               case e: Exception => stateMap.put(entry._1, entry._2.asInstanceOf[String])
             }
           }
         }
         stateMap.put("TotalEventsSoFar", count)
         Some(stateMap)
       }
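
    The update inside runningAverage is the standard incremental mean: newAvg = oldAvg * (n - 1)/n + currentVal/n. As a quick standalone check of that rule, with made-up readings:

    // Folding the incremental-mean rule over hypothetical readings should
    // match the plain arithmetic mean.
    val readings = Seq(20.0, 22.0, 24.0)
    val incremental = readings.zipWithIndex.foldLeft(0.0) { case (avg, (v, i)) =>
      val n = i + 1
      avg * (n - 1) / n + v / n
    }
    println(incremental)                  // 22.0
    println(readings.sum / readings.size) // 22.0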

    Final Code

    %Addjar http://central.maven.org/maven2/org/apache/kafka/kafka-clients/0.9.0.0/kafka-clients-0.9.0.0.jar
    %Addjar http://central.maven.org/maven2/org/apache/kafka/kafka_2.10/0.9.0.0/kafka_2.10-0.9.0.0.jar
    %Addjar http://central.maven.org/maven2/org/apache/kafka/kafka-log4j-appender/0.9.0.0/kafka-log4j-appender-0.9.0.0.jar
    %Addjar https://github.com/ibm-messaging/message-hub-samples/raw/master/java/message-hub-login-library/messagehub.login-1.0.0.jar
    %Addjar https://github.com/ibm-messaging/iot-messgehub-spark-samples/releases/download/v0.1/streaming-kafka.jar
    %Addjar http://central.maven.org/maven2/commons-codec/commons-codec/1.2/commons-codec-1.2.jar
    %Addjar http://central.maven.org/maven2/com/google/code/gson/gson/2.6.2/gson-2.6.2.jar

    import org.apache.spark.streaming.Duration
    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.StreamingContext
    import com.ibm.cds.spark.samples.config.MessageHubConfig
    import com.ibm.cds.spark.samples.dstream.KafkaStreaming.KafkaStreamingContextAdapter
    import org.apache.kafka.common.serialization.Deserializer
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.commons.codec.binary.Base64
    import java.util.Map.Entry
    import com.google.gson.JsonElement
    import java.util.Set

    import scala.collection.mutable.Map

    import com.google.gson.JsonObject
    import com.google.gson.JsonParser
    val kafkaProps = new MessageHubConfig
    // State update function for updateStateByKey: maintains the running
    // average of every numeric field per device, plus an event counter.
    val runningAverage = (value: Seq[Map[String, Any]], state: Option[Map[String, Any]]) => {
      var stateMap = state.getOrElse(Map())
      var count = 0
      if (stateMap.contains("TotalEventsSoFar")) {
        count = stateMap.get("TotalEventsSoFar").get.asInstanceOf[Int]
      }

      value.foreach { row =>
        count = count + 1
        row.foreach { entry =>
          try {
            val currentVal = entry._2.asInstanceOf[Double]
            if (stateMap.contains(entry._1)) {
              // Incremental mean: newAvg = oldAvg * (n-1)/n + current/n
              val oldAverage = stateMap.get(entry._1).get.asInstanceOf[Double]
              val newAvg = (oldAverage * (count - 1) / count) + (currentVal / count)
              stateMap.put(entry._1, newAvg)
            } else {
              stateMap.put(entry._1, currentVal)
            }
          } catch {
            case e: Exception => stateMap.put(entry._1, entry._2.asInstanceOf[String])
          }
        }
      }
      stateMap.put("TotalEventsSoFar", count)
      Some(stateMap)
    }

    kafkaProps.setConfig("bootstrap.servers", "kafka01-prod01.messagehub.services.us-south.bluemix.net:9093,kafka02-prod01.messagehub.services.us-south.bluemix.net:9093,kafka03-prod01.messagehub.services.us-south.bluemix.net:9093,kafka04-prod01.messagehub.services.us-south.bluemix.net:9093,kafka05-prod01.messagehub.services.us-south.bluemix.net:9093")
    kafkaProps.setConfig("kafka.user.name","xxxxxxxxxxxxxx")
    kafkaProps.setConfig("kafka.user.password", "xxxxxxxxxxxxxxxx")
    kafkaProps.setConfig("kafka.topic","sensor_event")
    kafkaProps.setConfig("api_key","xxxxxxxxxxxxxxxxxxx")
    kafkaProps.setConfig("kafka_rest_url", "https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net:443")

    kafkaProps.createConfiguration()


    val ssc = new StreamingContext( sc, Seconds(2) )
    ssc.checkpoint(".")

    val stream = ssc.createKafkaStream[String, String, StringDeserializer, StringDeserializer](
      kafkaProps, List(kafkaProps.getConfig("kafka.topic")))

    //stream.print()

    // Build a per-device key (orgId:deviceType:deviceId) from the Kafka
    // message key, which carries the event metadata.
    val reducedKey = stream.map( x => {
      val value = x._1
      val key = new JsonParser().parse(value).getAsJsonObject()
      val clientId: String = key.get("orgId").getAsString() + ":" +
        key.get("deviceType").getAsString() + ":" + key.get("deviceId").getAsString()
      (clientId, x._2)
    })
    //reducedKey.print()

    // Parse the "d" object of each payload into a map of data points,
    // numeric fields as Double and everything else as String.
    val jsonMappedLines = reducedKey.map( x => {
      var dataMap: Map[String, Any] = Map()
      val payload = new JsonParser().parse(x._2).getAsJsonObject()
      val deviceObject = payload.get("d").getAsJsonObject()
      val setObj = deviceObject.entrySet()
      val itr = setObj.iterator()
      while (itr.hasNext()) {
        val entry = itr.next()
        try {
          dataMap.put(entry.getKey(), entry.getValue().getAsDouble())
        } catch {
          case e: Exception => dataMap.put(entry.getKey(), entry.getValue().getAsString())
        }
      }
      (x._1, dataMap)
    })
    //jsonMappedLines.print()

    // compute the running average
    val avgLines = jsonMappedLines.updateStateByKey(runningAverage)
    avgLines.print()


    ssc.start()
    ssc.awaitTermination()

    Now run this code by clicking Cell -> Run. You will see the running average of all data points from all the devices.


    Now, you have successfully executed the running average sample in Apache Spark with the events from your devices.

  7. Next Steps

    Now you can stream the data from IBM Message Hub to an analytics platform to build high-performance, scalable streaming analytics solutions. You can follow the blogs below to extend this tutorial.

    Also, we suggest that you look at the list of analytical and cognitive recipes around Watson IoT Platform to analyze IoT events.


    For any questions or feedback, please leave your comments below.

7 Comments on "Integrating Watson IoT Platform with Message Hub and Apache Spark"

  1. Thank you! I had been investigating how to publish from my Watson Iot app to MessageHub, and you provided the missing link.

  2. StanfordDad June 29, 2016

    Hello. Thanks for this excellent recipe. I am new to Scala.

    My setup would work when the message is

    (MqttClient:aabbccddee,{"myName": "Arduino DHT22","temperatureF": 80,"temperatureC": 26,"humidity": 23})

    But fails when it is
    (MqttClient:aabbccddee,{ "d": { "myName": "Arduino DHT22", "temperatureF": 80, "temperatureC": 26, "humidity": 24 }})

    Any suggestions on how to tweak the code to make it work with the second message?

    • YMDH_sathish_Palaniappan June 30, 2016

      Thank you @StanfordDad for the comments !!

      Modify the “val jsonMappedLines = decodedLines.map” section to parse the parent object “d” before its children, using the following code:

      val jsonMappedLines = decodedLines.map( x => {
      var dataMap:Map[String, Any] = Map()
      val payload = new JsonParser().parse(x._2).getAsJsonObject()
      val deviceObject = payload.get("d").getAsJsonObject()
      val setObj = deviceObject.entrySet()
      val itr = setObj.iterator()
      while(itr.hasNext()) {
      val entry = itr.next();
      try {
      dataMap.put(entry.getKey(), entry.getValue().getAsDouble())
      } catch {
      case e: Exception => dataMap.put(entry.getKey(), entry.getValue().getAsString())
      }
      }
      (x._1, dataMap)
      })

      Let me know if you still face any issues.

      Thanks !
      Sathish

  3. ibm_tester-01 March 18, 2017

    Hello. I am trying to reproduce this tutorial, however when I try to run the code to consume the Device Events in Apache Spark from Message Hub there are no results.

    Results:

    -------------------------------------------
    Time: XXXX
    -------------------------------------------

    -------------------------------------------
    Time: XXXX
    -------------------------------------------

    I have also tried to consume data from Message Hub in node.js apps and the data is exchanged correctly.

    Any suggestions?

  4. JacquesRoy March 20, 2017

    I seem to have a similar problem as @ibm_tester-01

    I am using datascience.ibm.com

    I first tried with a Jupyter/Scala/Spark 2.0 environment and it failed (strangely) when creating the StreamingContext.

    I switched to Jupyter/Scala/Spark 1.6.
    This time, it failed at:
    %Addjar https://github.com/ibm-messaging/message-hub-samples/raw/master/java/message-hub-login-library/messagehub.login-1.0.0.jar

    If I comment out this command, it runs but does not get anything from MessageHub.
    I know my parameters are fine since I was able to read Message Hub from IBM Streams.

    Something must have changed recently.
    Any help welcome.
