Overview

Skill Level: Intermediate

Connecting Message Hub to Watson™ IoT Platform provides a scalable, high-throughput message bus for historical data storage. Message Hub is built on Apache Kafka, an open-source messaging system designed for handling real-time data feeds.


Step-by-step

  1. Introduction

    IBM Message Hub for Bluemix is a scalable, distributed, high-throughput message bus for wiring microservices together using open protocols. It can be used to stream data into analytics services to derive insights. IBM Message Hub is built on Apache Kafka, a fast, scalable, and durable real-time messaging engine.

    Advantages of using IBM Message Hub

    • Distributed publish-subscribe messaging system designed to be fast, scalable, and durable.
    • High-throughput message bus that can ingest live events from millions of devices.
    • Low-latency platform for handling real-time data feeds.
    • Foundation for building high-performance, scalable streaming analytics solutions.

    In this tutorial, you will learn how to integrate Watson IoT Platform with IBM Message Hub. Events from devices connected to Watson IoT Platform will be posted to a topic in IBM Message Hub.

  2. Architecture

    IBM Message Hub provides multiple interfaces through which messages can be produced and consumed. In this tutorial, we will stream live event data from the IBM Watson IoT Platform to IBM Message Hub using the Message Hub connector provided by Watson IoT Platform, and then use Apache Spark or InfoSphere Streams alongside IBM Message Hub (Kafka) to build high-performance, scalable streaming analytics solutions.

     

    Producers publish data to Message Hub; in this case the producer is the Message Hub connector in Watson IoT Platform, which forwards the device data to Message Hub. Consumers pull the data from Message Hub. IBM Message Hub can be integrated with multiple Bluemix services that consume the data, for example Apache Spark and Streaming Analytics. A minimal sketch of this producer/consumer pattern is shown below.
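    To make the producer and consumer roles concrete, here is a minimal, hypothetical sketch of the same pattern written against the plain Kafka 0.9 client API in Scala. The broker address is a placeholder, and the SASL/SSL login settings that Message Hub requires are omitted for brevity; in this recipe the Watson IoT Platform connector plays the producer role for you, and the MessageHubConfig helper used later hides these details:

      import java.util.{Arrays, Properties}
      import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
      import org.apache.kafka.clients.consumer.KafkaConsumer
      import scala.collection.JavaConversions._

      object PubSubSketch extends App {
        val common = new Properties()
        common.put("bootstrap.servers", "kafka01-prod01.example.net:9093") // placeholder broker
        // Message Hub also requires SASL_SSL security settings, omitted here.

        // Producer side: what the Watson IoT Platform connector does on your behalf.
        val producerProps = new Properties(); producerProps.putAll(common)
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String, String](producerProps)
        producer.send(new ProducerRecord[String, String]("sensor_event", "device-key", """{"d":{"temperature":37}}"""))
        producer.close()

        // Consumer side: what Spark (or any other subscriber) does.
        val consumerProps = new Properties(); consumerProps.putAll(common)
        consumerProps.put("group.id", "demo-group")
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        val consumer = new KafkaConsumer[String, String](consumerProps)
        consumer.subscribe(Arrays.asList("sensor_event"))
        for (record <- consumer.poll(1000)) // a single poll; real consumers loop
          println(s"${record.key} -> ${record.value}")
        consumer.close()
      }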

  3. Connect & publish events to Watson IoT Platform

    Register your device(s) in Watson IoT Platform

    As you work through this recipe, you need to have at least one device registered on the Watson IoT Platform. Refer to this recipe to register devices with Watson IoT Platform. When the device is registered, you will be provided with its registration details.

    Connect your device(s) to the Watson IoT Platform

    Connect your device to Watson IoT Platform and send events. If you do not have a real device, you can use the simulator at http://watson-iot-sensor-simulator.mybluemix.net/ to send events to Watson IoT Platform. The simulated device sends temperature, humidity, and object temperature data, and the values being sent can be changed through the simulator's web UI. The simulator connects to the platform and sends an 'iotsensor' event every 2 seconds while active; keep the tab or window open to continue sending data. A sample event payload is shown below.
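    Based on the sample message shown later in this recipe, the body of an 'iotsensor' event looks roughly like this (the values are illustrative):

      {
        "d": {
          "name": "Device01",
          "temperature": 37,
          "humidity": 79,
          "objectTemp": 23
        }
      }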

    Refer to this IoT recipe to learn how to configure the IoT simulator and fetch the sensor data mentioned above.

  4. Configure the Message Hub Connector in Watson IoT Platform

    Create Message Hub service in Bluemix

    Click this link to create the Message Hub service in Bluemix. Note: The Message Hub service must be created in the same Bluemix space as your Watson IoT Platform service. Alternatively, you can create the service from the command line, as sketched below.
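    If you prefer the Cloud Foundry CLI, something like the following should create an equivalent instance. The service label messagehub and plan standard are assumptions; run cf marketplace to confirm the names available in your region:

      cf login -a https://api.ng.bluemix.net
      cf create-service messagehub standard "Message Hub-iot"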

    Configure the Message Hub connector in Watson IoT Platform

    • On the Watson™ IoT Platform dashboard, click Extensions in the navigation bar.
    • In the Historical Data Storage tile, click Setup.
    • All available Message Hub services within the same Bluemix space as your Watson™ IoT Platform service are listed. Select the Message Hub service that you want to connect to.
    • You will be presented with a list of configuration options.
    • Change the timezone if you want to convert the event data timestamp to the selected timezone.
    • Define a default topic if you want to send all the device data to a single Message Hub topic. For more granular topic assignments, leave the default topic blank and add custom forwarding rules.
    • For example, a forwarding rule can send only the iotsensor events from all devices of the device type iotsample-deviceType to the Message Hub topic sensor_event. Click the tick mark to complete the definition. Note: Change these values to match your own device type and event name.
    • Topic configuration is optional and lets you specify a partition count for specific Message Hub topics. If you do not set a topic configuration for a topic, it is created with a default of two partitions, unless the topic already exists in your Message Hub service.
    • For example, you can configure the connector to create 3 partitions for the sensor_event topic.
    • Click the tick mark to complete the partition setup, then click Done to complete the setup. A popup screen appears requesting your authorization.
    • Click Confirm to authorize.
    • Open the Message Hub service to see the topic.
    • Your device data is now being stored in Message Hub. The following is a sample message published to Message Hub for the above configuration. The first JSON object is the record key (event metadata) and the second is the record value (the device event payload); a short sketch that parses both parts follows this list.
       {"orgId":"9ts1nd","deviceType":"iotsample-deviceType","deviceId":"Device01","eventType":"iotsensor","format":"json","timestamp":"2016-10-18T11:53:29.525Z"},{"d":{"name":"Device01","temperature":37,"humidity":79,"objectTemp":23}}

    In this section, we saw how to configure the Message Hub connector to send device events to a Message Hub topic. In the next section, we will use a Jupyter Notebook to consume the data in Spark and analyze it.

  5. Consume the Device Events in Apache Spark from Message Hub

    In this step, we will learn how to consume the device events in Apache Spark from Message Hub.

    1. Create the Apache Spark service from the Catalog in Bluemix.
      1. Click this link to create the Apache Spark service in Bluemix.
      2. Click Create.
      3. After the service is created, click on the service.
    2. Click Notebooks to run the analytic code in interactive mode.
    3. Click New Notebook to create a new notebook instance.
    4. Copy the following code into the notebook.
      %Addjar http://central.maven.org/maven2/org/apache/kafka/kafka-clients/0.9.0.0/kafka-clients-0.9.0.0.jar
      %Addjar http://central.maven.org/maven2/org/apache/kafka/kafka_2.10/0.9.0.0/kafka_2.10-0.9.0.0.jar
      %Addjar http://central.maven.org/maven2/org/apache/kafka/kafka-log4j-appender/0.9.0.0/kafka-log4j-appender-0.9.0.0.jar
      %Addjar https://github.com/ibm-messaging/message-hub-samples/raw/master/java/message-hub-login-library/messagehub.login-1.0.0.jar
      %Addjar https://github.com/ibm-messaging/iot-messgehub-spark-samples/releases/download/v0.1/streaming-kafka.jar

      import org.apache.spark.streaming.Duration
      import org.apache.spark.streaming.Seconds
      import org.apache.spark.streaming.StreamingContext
      import com.ibm.cds.spark.samples.config.MessageHubConfig
      import com.ibm.cds.spark.samples.dstream.KafkaStreaming.KafkaStreamingContextAdapter
      import org.apache.kafka.common.serialization.Deserializer
      import org.apache.kafka.common.serialization.StringDeserializer

      // Connection details for Message Hub. Replace the "x" placeholders with
      // the values from your service credentials (see step 5 below).
      val kafkaProps = new MessageHubConfig

      kafkaProps.setConfig("bootstrap.servers", "kafka01-prod01.messagehub.services.us-south.bluemix.net:9093,kafka02-prod01.messagehub.services.us-south.bluemix.net:9093,kafka03-prod01.messagehub.services.us-south.bluemix.net:9093,kafka04-prod01.messagehub.services.us-south.bluemix.net:9093,kafka05-prod01.messagehub.services.us-south.bluemix.net:9093")
      kafkaProps.setConfig("kafka.user.name", "xxxxxxxxxxxx")
      kafkaProps.setConfig("kafka.user.password", "xxxxxxxxxx")
      kafkaProps.setConfig("kafka.topic", "sensor_event")
      kafkaProps.setConfig("api_key", "xxxxxxxxxxxxxxx")
      kafkaProps.setConfig("kafka_rest_url", "https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net:443")

      kafkaProps.createConfiguration()

      // Streaming context with a 2-second batch interval.
      val ssc = new StreamingContext(sc, Seconds(2))

      // Kafka DStream over the configured topic; keys and values are strings.
      val stream = ssc.createKafkaStream[String, String, StringDeserializer, StringDeserializer](
        kafkaProps,
        List(kafkaProps.getConfig("kafka.topic")))

      // Print each micro-batch and start the streaming job.
      stream.print()
      ssc.start()
      ssc.awaitTermination()

       

    5. Navigate to the Message Hub Bluemix dashboard and click the Service Credentials tab. Note the kafka_brokers, user, password, and api_key fields (a sample credentials layout is shown at the end of this section).
    6. Replace the "x" placeholders in the notebook code with the credentials from Message Hub.
    7. The code is now ready.
    8. Click Cell -> Run.
    9. The code is now running in Apache Spark, and you will see the events from the devices in the console output.

     

    You have now successfully integrated Apache Spark with Message Hub and are able to receive the device events from Watson IoT Platform.
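    For reference, the Service Credentials for a Message Hub instance typically look something like the following sketch. The exact field names and endpoints can vary by region and plan, so treat this only as a guide to where each value in the notebook code comes from:

      {
        "kafka_brokers_sasl": [
          "kafka01-prod01.messagehub.services.us-south.bluemix.net:9093",
          "kafka02-prod01.messagehub.services.us-south.bluemix.net:9093"
        ],
        "kafka_rest_url": "https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net:443",
        "api_key": "xxxxxxxxxxxxxxx",
        "user": "xxxxxxxxxxxx",
        "password": "xxxxxxxxxx"
      }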

  6. Advanced Spark Example - Running Average

    In this step, you will learn how to compute a running average (rolling average) over the device events.

    1. We will use the code from the previous step as a template for this example.
    2. First we need to decode the payload that we receive from Message Hub. The following code decodes the Base64-encoded "payload" field of each Kafka value:
       val decodedLines = stream.map( x => {
         val value = x._2
         // The Kafka value is JSON; its "payload" field holds the Base64-encoded event.
         val kafkaPayload = new JsonParser().parse(value).getAsJsonObject()
         val encodedPayload: String = kafkaPayload.get("payload").getAsString()
         val payload = Base64.decodeBase64(encodedPayload)
         (x._1, new String(payload))
       })
    3. Then the following code extracts the data points from the device events, keeping numeric fields as Doubles and anything else as Strings:
       val jsonMappedLines = decodedLines.map( x => {
         var dataMap: Map[String, Any] = Map()
         val payload = new JsonParser().parse(x._2).getAsJsonObject()
         val setObj = payload.entrySet()
         val itr = setObj.iterator()
         while (itr.hasNext()) {
           val entry = itr.next()
           try {
             // Numeric fields become Doubles...
             dataMap.put(entry.getKey(), entry.getValue().getAsDouble())
           } catch {
             // ...anything non-numeric is kept as a String.
             case e: Exception => dataMap.put(entry.getKey(), entry.getValue().getAsString())
           }
         }
         (x._1, dataMap)
       })
    4. Then use the following code to compute the running average for each data point; a standalone sanity check of the update formula appears after this list. The state kept per device is a map from field name to its average so far, plus a TotalEventsSoFar counter, and each new value updates the average incrementally as newAvg = oldAvg * (count - 1) / count + value / count:
       val runningAverage = (value: Seq[Map[String, Any]], state: Option[Map[String, Any]]) => {
         var stateMap = state.getOrElse(Map())
         var count = 0
         if (stateMap.contains("TotalEventsSoFar")) {
           count = stateMap.get("TotalEventsSoFar").get.asInstanceOf[Int]
         }

         value.foreach { row => {
           count = count + 1
           row.foreach { entry => {
             try {
               val currentVal = entry._2.asInstanceOf[Double]
               if (stateMap.contains(entry._1)) {
                 // Incremental mean update: no need to keep the full history.
                 val oldAverage = stateMap.get(entry._1).get.asInstanceOf[Double]
                 val newAvg = (oldAverage * (count - 1) / count) + (currentVal / count)
                 stateMap.put(entry._1, newAvg)
               } else {
                 stateMap.put(entry._1, currentVal)
               }
             } catch {
               // Non-numeric fields (for example, the device name) are stored as-is.
               case e: Exception => stateMap.put(entry._1, entry._2.asInstanceOf[String])
             }
           }}
         }}
         stateMap.put("TotalEventsSoFar", count)
         Some(stateMap)
       }
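    As a quick, standalone sanity check of the incremental-mean update used above (a hypothetical snippet, not part of the notebook): feeding in the values 10, 20, and 30 should produce running averages of 10, 15, and 20.

      var avg = 0.0
      var n = 0
      for (x <- Seq(10.0, 20.0, 30.0)) {
        n += 1
        // Same update as in runningAverage: newAvg = oldAvg * (n - 1) / n + x / n
        avg = avg * (n - 1) / n + x / n
        println(s"after $x: avg = $avg")
      }
      // Prints 10.0, then 15.0, then 20.0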

    Final Code

    %Addjar http://central.maven.org/maven2/org/apache/kafka/kafka-clients/0.9.0.0/kafka-clients-0.9.0.0.jar
    %Addjar http://central.maven.org/maven2/org/apache/kafka/kafka_2.10/0.9.0.0/kafka_2.10-0.9.0.0.jar
    %Addjar http://central.maven.org/maven2/org/apache/kafka/kafka-log4j-appender/0.9.0.0/kafka-log4j-appender-0.9.0.0.jar
    %Addjar https://github.com/ibm-messaging/message-hub-samples/raw/master/java/message-hub-login-library/messagehub.login-1.0.0.jar
    %Addjar https://github.com/ibm-messaging/iot-messgehub-spark-samples/releases/download/v0.1/streaming-kafka.jar
    %Addjar http://central.maven.org/maven2/commons-codec/commons-codec/1.2/commons-codec-1.2.jar
    %Addjar http://central.maven.org/maven2/com/google/code/gson/gson/2.6.2/gson-2.6.2.jar

    import org.apache.spark.streaming.Duration
    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.StreamingContext
    import com.ibm.cds.spark.samples.config.MessageHubConfig
    import com.ibm.cds.spark.samples.dstream.KafkaStreaming.KafkaStreamingContextAdapter
    import org.apache.kafka.common.serialization.Deserializer
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.commons.codec.binary.Base64
    import java.util.Map.Entry
    import com.google.gson.JsonElement
    import java.util.Set

    import scala.collection.mutable.Map

    import com.google.gson.JsonObject
    import com.google.gson.JsonParser
    // Configure the connection to Message Hub; replace the "x" placeholders
    // with your service credentials.
    val kafkaProps = new MessageHubConfig

    // State update function: maintains a running average per field for each device key.
    val runningAverage = (value: Seq[Map[String, Any]], state: Option[Map[String, Any]]) => {
      var stateMap = state.getOrElse(Map())
      var count = 0
      if (stateMap.contains("TotalEventsSoFar")) {
        count = stateMap.get("TotalEventsSoFar").get.asInstanceOf[Int]
      }

      value.foreach { row => {
        count = count + 1
        row.foreach { entry => {
          try {
            val currentVal = entry._2.asInstanceOf[Double]
            if (stateMap.contains(entry._1)) {
              val oldAverage = stateMap.get(entry._1).get.asInstanceOf[Double]
              val newAvg = (oldAverage * (count - 1) / count) + (currentVal / count)
              stateMap.put(entry._1, newAvg)
            } else {
              stateMap.put(entry._1, currentVal)
            }
          } catch {
            case e: Exception => stateMap.put(entry._1, entry._2.asInstanceOf[String])
          }
        }}
      }}
      stateMap.put("TotalEventsSoFar", count)
      Some(stateMap)
    }

    kafkaProps.setConfig("bootstrap.servers", "kafka01-prod01.messagehub.services.us-south.bluemix.net:9093,kafka02-prod01.messagehub.services.us-south.bluemix.net:9093,kafka03-prod01.messagehub.services.us-south.bluemix.net:9093,kafka04-prod01.messagehub.services.us-south.bluemix.net:9093,kafka05-prod01.messagehub.services.us-south.bluemix.net:9093")
    kafkaProps.setConfig("kafka.user.name", "xxxxxxxxxxxxxx")
    kafkaProps.setConfig("kafka.user.password", "xxxxxxxxxxxxxxxx")
    kafkaProps.setConfig("kafka.topic", "sensor_event")
    kafkaProps.setConfig("api_key", "xxxxxxxxxxxxxxxxxxx")
    kafkaProps.setConfig("kafka_rest_url", "https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net:443")

    kafkaProps.createConfiguration()

    val ssc = new StreamingContext( sc, Seconds(2) )
    // updateStateByKey requires a checkpoint directory.
    ssc.checkpoint(".")

    val stream = ssc.createKafkaStream[String, String, StringDeserializer, StringDeserializer](
      kafkaProps, List(kafkaProps.getConfig("kafka.topic")))

    //stream.print()

    // Build a per-device key of the form orgId:deviceType:deviceId from the Kafka record key.
    val reducedKey = stream.map( x => {
      val value = x._1
      val key = new JsonParser().parse(value).getAsJsonObject()
      val clientId: String = key.get("orgId").getAsString() + ":" + key.get("deviceType").getAsString() + ":" + key.get("deviceId").getAsString()
      (clientId, x._2)
    })
    //reducedKey.print()

    // Parse the event payload (the "d" object) into a map of field name -> value.
    val jsonMappedLines = reducedKey.map( x => {
      var dataMap: Map[String, Any] = Map()
      val payload = new JsonParser().parse(x._2).getAsJsonObject()
      val deviceObject = payload.get("d").getAsJsonObject()
      val setObj = deviceObject.entrySet()
      val itr = setObj.iterator()
      while (itr.hasNext()) {
        val entry = itr.next()
        try {
          dataMap.put(entry.getKey(), entry.getValue().getAsDouble())
        } catch {
          case e: Exception => dataMap.put(entry.getKey(), entry.getValue().getAsString())
        }
      }
      (x._1, dataMap)
    })
    //jsonMappedLines.print()

    // Compute the running average per device key.
    val avgLines = jsonMappedLines.updateStateByKey(runningAverage)
    avgLines.print()

    ssc.start()
    ssc.awaitTermination()

    Now run this code by clicking Cell -> Run. You will see the running average of all data points from all the devices, as sketched below.
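    The printed output should look roughly like the following. This is a hypothetical illustration of the (key, state map) pairs that avgLines.print() emits; your device IDs and values will differ:

      -------------------------------------------
      Time: 1491481410000 ms
      -------------------------------------------
      (9ts1nd:iotsample-deviceType:Device01,Map(name -> Device01, temperature -> 36.5, humidity -> 78.2, objectTemp -> 23.4, TotalEventsSoFar -> 12))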


     

    Now, you have successfully executed the running average sample in Apache Spark with the events from your devices.

  7. Next Steps

    Now you can stream the data from IBM Message Hub into an analytics platform to build high-performance, scalable streaming analytics solutions.

    We also suggest that you look at the list of analytical and cognitive recipes around Watson IoT Platform to analyze your IoT events.


    For any questions or feedback, please leave your comments below.

13 Comments on "Integrating Watson IoT Platform with Message Hub and Apache Spark"

  1. Thank you! I had been investigating how to publish from my Watson IoT app to Message Hub, and you provided the missing link.

  2. StanfordDad June 29, 2016

    Hello. Thanks for this excellent recipe. I am new to Scala.

    My setup would work when the message is

    (MqttClient:aabbccddee,{"myName": "Arduino DHT22","temperatureF": 80,"temperatureC": 26,"humidity": 23})

    But fails when it is
    (MqttClient:aabbccddee,{ "d": { "myName": "Arduino DHT22", "temperatureF": 80, "temperatureC": 26, "humidity": 24 }})

    Any suggestions on how to tweak the code to make it work with the second message?

    • YMDH_sathish_Palaniappan June 30, 2016

      Thank you @StanfordDad for the comments !!

      Modify the "val jsonMappedLines = decodedLines.map" section to parse the parent object "d" first before the child by using the following code,

      val jsonMappedLines = decodedLines.map( x => {
        var dataMap: Map[String, Any] = Map()
        val payload = new JsonParser().parse(x._2).getAsJsonObject()
        // Parse the parent "d" object first, then iterate over its children.
        val deviceObject = payload.get("d").getAsJsonObject()
        val setObj = deviceObject.entrySet()
        val itr = setObj.iterator()
        while (itr.hasNext()) {
          val entry = itr.next()
          try {
            dataMap.put(entry.getKey(), entry.getValue().getAsDouble())
          } catch {
            case e: Exception => dataMap.put(entry.getKey(), entry.getValue().getAsString())
          }
        }
        (x._1, dataMap)
      })

      Let me know if you still face any issues.

      Thanks !
      Sathish

  3. ibm_tester-01 March 18, 2017

    Hello. I am trying to reproduce this tutorial, however when I try to run the code to consume the Device Events in Apache Spark from Message Hub there are no results.

    Results:

    ——
    Time: XXXX
    —–

    ——
    Time: XXXX
    —–

    I have also tried to consume data from Message Hub in node.js apps and the data is exchanged correctly.

    Any suggestions?

    • mchirukuri April 20, 2017

      Hi,

      I tried the above recipe and I am facing the issue like you. I am also not getting the results from Message Hub to Apache Spark. I consumed data from Message Hub and it’s coming correctly too. Were you able to find a solution to this?

  4. JacquesRoy March 20, 2017

    I seem to have a similar problem as @ibm_tester-01

    I am using data science.ibm.com

    I first tried with a Jupyter/Scala/Spark 2.0 environment and it failed (strangely) when creating the StreamingContext.

    I switched to Jupyter/Scala/Spark1.6
    This time, it failed at:
    %Addjar https://github.com/ibm-messaging/message-hub-samples/raw/master/java/message-hub-login-library/messagehub.login-1.0.0.jar

    If I comment out this command, it runs but does not get anything from MessageHub.
    I know my parameters are fine since I was able to read Message Hub from IBM Streams.

    Something must have changed recently.
    Any help welcome.

  5. I used the same code you used, but I didn't get the output. My output looks like this (the empty batches repeat every 2 seconds):
    Using cached version of kafka-clients-0.9.0.0.jar
    Using cached version of kafka_2.10-0.9.0.0.jar
    Using cached version of kafka-log4j-appender-0.9.0.0.jar
    Using cached version of messagehub.login-1.0.0.jar
    Using cached version of streaming-kafka.jar
    default location of ssl Trust store is: /usr/local/src/spark160master/ibm-java-x86_64-80/jre/lib/security/cacerts
    com/ibm/cds/spark/samples/config/jaas.conf
    Registering JaasConfiguration: /gpfs/fs01/user/s50c-6a587813864c4f-ad7aa014f64a/notebook/tmp/CxpsA1ahB8XF4hFp/jaas.conf
    default location of ssl Trust store is: /usr/local/src/spark160master/ibm-java-x86_64-80/jre/lib/security/cacerts
    -------------------------------------------
    Time: 1491481410000 ms
    -------------------------------------------

    -------------------------------------------
    Time: 1491481412000 ms
    -------------------------------------------

    […output truncated: the same empty batches continue through Time: 1491481568000 ms…]

  6. arikatla – I had the same problem when I was setting the list of brokers, username and password incorrectly. Also, ensure you are using the correct url as stated in the comment above your comment.

  7. I think that using https://github.com/ibm-messaging/iot-messgehub-spark-samples/releases/download/v0.1/streaming-kafka.jar can lead to subtle bugs because the kafka configuration is hidden from you inside that library. Users should be encouraged to read through the code in that library that is in the package ‘com.ibm.cds.spark.samples.**’ to fully understand what the library is doing. For example, the hidden code sets up a kafka poll loop with a duration of 1 second. Anyone trying to use a batch window size smaller than 1 second will encounter issues. Also, kafka configuration parameters are set in the hidden code to values that will not make sense for all use cases (such as auto.offset.reset=latest).
