Since this tutorial was published, IBM’s Message Connect service has been discontinued. This tutorial contains a workaround that lets you bypass that service, so you can still complete the steps here. But for the absolute latest, try the new version of this tutorial.

Looking for more notebook examples? Visit the Data Science Experience Learning Center.

In my Sentiment Analysis of Twitter Hashtags tutorial, we explored how to build a Spark Streaming app that uses Watson Tone Analyzer to perform sentiment analysis on a set of Tweets.

In that tutorial, Spark Streaming collects the Twitter data for a finite period. But it doesn’t run streaming analytics in real time. It just accumulates the data into a static Resilient Distributed Dataset (RDD) for analysis in an IPython Notebook.

Here, I’ll show how to rebuild the analytics from that first tutorial for real-time streaming analytics. We’ll also publish the output to a live dashboard web app that displays and continuously updates graphic visualizations.

Note: You don’t need to complete the prior tutorial in order to follow the steps in this tutorial.

Overall Architecture

The following diagram shows the different components of this application:

Spark+Watson+Twitter Part 2 Architecture

  1. Message Connect Service: Streaming service available on Bluemix that connects to the Twitter stream and publishes the tweets to Kafka.
  2. Message Hub Service: High-throughput message bus service powered by Apache Kafka.
  3. Watson Tone Analyzer: Provides sentiment analysis in the form of emotional, social, and writing tone scores.
  4. Spark Streaming application: A Scala library that consumes tweet events from Message Hub, enriches the data with Watson sentiment scores, runs the streaming analytics, and re-publishes the results to Message Hub as separate topics.
  5. Node.js web app: Provides a real-time dashboard that consumes the output of the streaming analytics from Message Hub and visualizes it as charts.

This tutorial explains how to build the app and covers these 5 components in detail.

Initial Setup

You’ll want to have a few things in place before you dive in. First, initiate Apache Spark and Watson Tone Analyzer services on Bluemix (IBM’s open cloud platform for building, running, and managing applications). You’ll also create a Scala notebook where we’ll store credentials and control streaming. Finally, you’ll want to get OAuth credentials from Twitter so you can access actual tweets.

Initiate IBM Analytics for Apache Spark Service and Add a Notebook

  1. Login to Bluemix (or sign up for a free trial).
  2. On your Bluemix dashboard, click Work with Data.
  3. Click New Service.
  4. Find and click Apache Spark, then click Choose Apache Spark.
  5. Click Create.
  6. Click Open.
  7. Click the Object Storage tab.
    add_obj_stor

  8. Click the Add Object Storage button and click Create.
  9. Click the My Notebooks tab.
  10. Click the Create Notebook button.
  11. Click the From URL tab.
  12. Enter any name, and under Notebook URL enter https://github.com/ibm-cds-labs/spark.samples/raw/master/streaming-twitter/notebook/Spark%20Streaming%20Twitter-Watson-MessageHub.ipynb
  13. Click Create Notebook.
  14. Leave this notebook open. You’ll return here in a minute to enter information.

Initiate Watson Tone Analyzer Service

  1. In a new browser tab or window, open Bluemix, go to the top menu, and click Catalog.
  2. In the search box, type Tone Analyzer.
  3. Click the Tone Analyzer tile, then click Create.
  4. On the left side of the screen, click Service Credentials.
    tone_analyz_creds
  5. Copy and paste the username and password values into the Scala Notebook you just created. In the //Watson Tone Analyzer service section, replace the XXXXs for each Watson credential.
     "credentials": {
           "url":"XXXXX",
           "username":"XXXXX",
           "password":"XXXXX"
      }

    Leave this notebook open in a browser window. You’ll add more credentials in a few minutes.

Generate OAuth Credentials for Twitter

You’ll need these OAuth credentials to create the Message Connect Twitter stream.

Create a new app on your Twitter account and configure the OAuth credentials.

  1. Go to https://apps.twitter.com/. Sign in and click the Create New App button.
  2. Complete the required fields:
    • Name and Description can be anything you want.
    • Website. It doesn’t matter what URL you enter here, as long as it’s valid. For example, I used my Bluemix account URL: https://davidtaiebspark.mybluemix.net .
  3. Below the developer agreement, turn on the Yes, I agree check box and click Create your Twitter application.
  4. Click the Keys and Access Tokens tab.
  5. Scroll to the bottom of the page and click the Create My Access Tokens button.
  6. Copy the Consumer Key, Consumer Secret, Access Token, and Access Token Secret. You will need them in a few minutes.
    twitter_keys

Message Hub Service

Now you’re ready to start laying down infrastructure for live streaming. Start by creating a Message Hub instance in Bluemix:

  1. In Bluemix, on the top menu, click Catalog.
  2. At the top of the page, type Message Hub in the search box then click the Message Hub tile that appears.
  3. On the right side of the screen, in the Service Name box type: messagehub-spark.
  4. Accept the other default settings (the Standard plan is currently the only choice; leave the service unbound for now) and click Create.

    The service launches.

The Scala notebook provides a convenient and consistent mechanism to pass credentials for the many services this app uses. To do so, it makes the following setConfig calls in a notebook cell:

    val demo = com.ibm.cds.spark.samples.MessageHubStreamingTwitter
    val config = demo.getConfig()
    ...
    //bootstrap.servers contains the list of brokers referenced in the
    //"kafka_brokers_sasl" field of the credentials json
    config.setConfig("bootstrap.servers","kafka01-prod01.messagehub.services.us-south.bluemix.net:9094,kafka02-prod01.messagehub.services.us-south.bluemix.net:9094,kafka03-prod01.messagehub.services.us-south.bluemix.net:9094,kafka04-prod01.messagehub.services.us-south.bluemix.net:9094,kafka05-prod01.messagehub.services.us-south.bluemix.net:9094")

    config.setConfig("api_key","XXXX")
    config.setConfig("kafka.user.name","XXXX")
    config.setConfig("kafka.user.password","XXXX")
    config.setConfig("kafka_rest_url","https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net:443")

    //"kafka.topic.tweet" contains the name of the topic used to publish the tweets.
    //You'll configure this value in the Message Connect Service section, e.g. twitter-spark
    config.setConfig("kafka.topic.tweet","twitter-spark")

We’ll enter these credentials for Message Hub in a few minutes, after we deploy our dashboard app.

Message Connect Service

Note: Message Connect requires an existing instance of Message Hub. So you must complete the Message Hub section before following these steps.

To create a Message Connect instance:

  1. In Bluemix, go to the top menu and click Catalog.
  2. Scroll down to the bottom of the page and click the Bluemix Labs Catalog link.
  3. At the top of the page, type Message Connect in the search box.
  4. Click the Message Connect tile that appears.
  5. Accept the default settings (the Experimental plan is currently the only choice; leave the service unbound for now) and click Create.

    The service launches.

    IBM Message Connect for Bluemix is a streaming service that can connect to multiple streaming data sources, generate events and publish them to Message Hub. Message Connect supports the following connectors (with more to come in the future):
     

    – Twitter
    – SalesForce
    – MQ Light: Connect to an on-premise MQ Light instance
    – IBM Cloudant

    Something went wrong? If you get an error here, ignore it and move on to the next section. This tutorial provides workarounds for cases where this experimental service may not be available.

  6. Click the Create your first stream button.

  7. Enter a name for your stream like twitter-spark.

  8. Click the Twitter tile. A form opens at the bottom of the screen.

  9. Enter your Consumer Key, Consumer Secret, Access Token, and Access Token Secret (see this tutorial’s Initial Setup section to read how to get your Twitter OAuth credentials).

  10. (optional) If you want to filter the tweets by keywords and users, enter values in those fields.

  11. Click Create Stream.

    Initialization will take a minute; then you’ll see your stream with a status of Running in the Message Connect dashboard.

The Message Connect service automatically detected that a Message Hub instance was created in the same space and started publishing tweets using a topic name based on the name given to the stream. In this case, the topic is twitter-spark (all lowercase, with spaces replaced by dashes).

Note: As you see at the end of the Message Hub section, the Scala notebook specifies the topic name with:

 config.setConfig("kafka.topic.tweet","twitter-spark")

To verify that all pieces are connected, go to your Message Hub dashboard and check that the twitter-spark topic has been created.

Message Hub Topics

Note: Message Connect is still an experimental service and may occasionally be unavailable. If that happens to you, you can use one of these 2 workarounds:

  • The Scala notebook you’ll run in a few minutes has 2 optional cells you can run to temporarily fill in for the Message Connect Service.
  • Or, you can run the KafkaProducerTest from the Scala IDE. The following screenshot shows how to create a run configuration in Eclipse. (You can use this Sample configuration properties file as a starting point.)
    Kafka Producer Run configuration

Spark Streaming Application

Tip: This section talks about code you’ll find in this GitHub directory.

A Word About Spark Streaming

In our first tutorial on Sentiment Analysis with Spark, we showed how to build analytics on the Twitter + Watson data using notebooks. The data was statically stored in a Spark SQL Table or an RDD. For this app, we rebuild these analytics so you can run them continuously on the streaming data received from Twitter. To achieve that, we’ll use Spark Streaming, which is an extension of the core Spark API.

Spark Streaming uses Discretized Streams (DStreams) instead of the RDDs used by core Spark. A DStream represents the streaming data as a continuous series of micro-batch RDDs, as shown in this diagram:

DStream micro-batches
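
If you haven’t used Spark Streaming before, here is a minimal generic sketch (not part of this app’s code) to illustrate the model. It assumes the notebook’s SparkContext is available as sc and counts words arriving on a local TCP socket in 5-second micro-batches:

    // Generic illustration only: count words from a TCP socket per micro-batch
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val ssc = new StreamingContext(sc, Seconds(5))
    val lines = ssc.socketTextStream("localhost", 9999)   // e.g. fed by `nc -lk 9999`

    // The same map/reduce-style transformations you would apply to an RDD,
    // except they run on every 5-second micro-batch
    val counts = lines.flatMap(_.split("\\s+"))
                      .map(word => (word, 1))
                      .reduceByKey(_ + _)

    counts.print()          // action: print a few elements of each micro-batch

    ssc.start()             // nothing runs until the context is started
    ssc.awaitTermination()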

Creating a Streaming Context

When building a Spark Streaming app, the first step is to create a StreamingContext from a SparkContext and specify the batch time interval (after which a new RDD is generated by the DStream). Also, you must enable checkpointing on the StreamingContext, which lets you persist RDD metadata information periodically. Checkpointing lets you:

  1. Resume operations after a restart (normal restart or due to failure)
  2. Enable stateful transformations between micro-batches by using the updateStateByKey API

The following code shows how to create a StreamingContext with a Batch Time Interval of 5 seconds, then set the checkpoint directory:

    val ssc = new StreamingContext( sc, Seconds(5) )
    ssc.checkpoint( kafkaProps.getConfig( MessageHubConfig.CHECKPOINT_DIR_KEY ) )

Configure Spark Streaming Checkpointing to use Swift Object Storage

The checkpoint directory is identified by a URI that must point to a Hadoop-compatible filesystem, which means the filesystem must provide an implementation of the org.apache.hadoop.fs.FileSystem class, like any of the following:

  • file: local filesystem
  • hdfs: Hadoop Distributed File System
  • ftp: File Transfer Protocol
  • s3: Amazon S3
  • swift: OpenStack Object Store, supported by Bluemix and SoftLayer

When running this app on Bluemix, you can use the Object Storage container associated with your Spark instance as the checkpoint directory. Here’s how that works:

  1. The URL must have the following format:
    swift://notebook.<name>/<container>

    where:

    • <name> is an arbitrary string, like spark, that you’ll use later in the Hadoop configuration step
    • <container> is the name of the container or folder where all the files will live, like ssc.
  2. Hadoop configuration for swift: set the following key/values pair in the hadoopConfiguration hashmap, as in this scala code:
    val prefix = "fs.swift.service.<name>"
    val hconf = sc.hadoopConfiguration
    hconf.set(prefix + ".auth.url", "<auth_url>/v2.0/tokens")
    hconf.set(prefix + ".auth.endpoint.prefix", "endpoints")
    hconf.set(prefix + ".tenant", "<project_id>")
    hconf.set(prefix + ".username", "<user_id>")
    hconf.set(prefix + ".password", "<password>")
    hconf.setInt(prefix + ".http.port", 8080)
    hconf.set(prefix + ".region", "<region>")
    hconf.setBoolean(prefix + ".public", true)

    As I mentioned, <name> must match the name specified in the swift URL. Other values, like auth_url, project_id, and so on, come from your Object Storage service credentials in Bluemix.

    Object Storage Credentials

    For convenience and consistency, the application also supports passing the variables directly via the setConfig method in the Scala Notebook:

    val demo = com.ibm.cds.spark.samples.MessageHubStreamingTwitter
    val config = demo.getConfig()
    ...
    config.setConfig("name","spark");
    config.setConfig("auth_url","https://identity.open.softlayer.com");
    config.setConfig("project_id","XXXXXXXXXXXXXXXXXXXXX");
    config.setConfig("region","dallas");
    config.setConfig("user_id","XXXXXXXXXXXXXXXXXX");
    config.setConfig("password","XXXXXXXXXX");

For this tutorial, that’s the method we’ll use. So follow these steps to enter the proper credentials in your Scala Notebook:

  1. In Bluemix, go to your dashboard.
  2. Click your Apache Spark Object Storage service to open it.
  3. Within the menu on the left, click Service Credentials.
  4. Copy and paste the 3 object storage credentials (Project_ID, userId, and password), replacing the XXX’s in your Scala notebook’s //Spark Streaming checkpointing configuration section.
  5. Again, leave this notebook open. You’ll return here to enter one last set of credentials in a minute.

Create a Custom Receiver for MessageHub/Kafka

Apache Spark already provides a Kafka connector for Spark Streaming based on Kafka 0.8, but we can’t use it here because Message Hub requires Kafka 0.9. So, I built a custom Spark Streaming receiver for Message Hub using Kafka 0.9 (see the build.sbt updates that point at the Kafka 0.9 libraries from the Maven repository).
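
For reference, the kind of build.sbt change involved looks roughly like the following. The versions shown are illustrative only, so check the sample project’s build.sbt for the real ones:

    // build.sbt excerpt -- versions are illustrative, not the project's actual ones
    libraryDependencies ++= Seq(
      // Spark is "provided" because the Bluemix Spark service supplies it at runtime
      "org.apache.spark" %% "spark-core"      % "1.6.0" % "provided",
      "org.apache.spark" %% "spark-streaming" % "1.6.0" % "provided",
      // Pull the Kafka 0.9 client directly instead of the spark-streaming-kafka
      // connector, which is still built against Kafka 0.8
      "org.apache.kafka" % "kafka-clients" % "0.9.0.0"
    )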

Tip: You can find the code in KafkaInputDStream.scala.

To create a new receiver, you need to create a Scala class that inherits from org.apache.spark.streaming.dstream.ReceiverInputDStream and overrides the getReceiver method, which returns an instance of org.apache.spark.streaming.receiver.Receiver.
In turn, the Receiver must implement the following lifecycle methods:

  • onStart: called when the receiver is started. Starts a new Thread that will poll MessageHub for new messages and store them in Spark’s memory.
  • onStop: called when the receiver is stopped. Cleans up all resources and stops the Thread.

See the full custom receiver documentation.
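
Here is a stripped-down skeleton of how those two pieces fit together. It’s only a sketch, simplified to String keys and values, and the class names (SimpleKafkaInputDStream, SimpleKafkaReceiver) are illustrative rather than the tutorial’s actual classes; see KafkaInputDStream.scala for the real implementation:

    import java.util.Properties
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.dstream.ReceiverInputDStream
    import org.apache.spark.streaming.receiver.Receiver

    // The DStream subclass only decides which Receiver to hand back to Spark
    class SimpleKafkaInputDStream(
        ssc: StreamingContext,
        kafkaParams: Properties,
        topics: List[String]
      ) extends ReceiverInputDStream[(String, String)](ssc) {

      override def getReceiver(): Receiver[(String, String)] =
        new SimpleKafkaReceiver(kafkaParams, topics)
    }

    // The Receiver implements the onStart/onStop lifecycle described above
    class SimpleKafkaReceiver(kafkaParams: Properties, topics: List[String])
      extends Receiver[(String, String)](StorageLevel.MEMORY_ONLY) {

      def onStart(): Unit = {
        // create the Kafka 0.9 consumer and start a polling thread
        // (see the tutorial's onStart implementation below)
      }

      def onStop(): Unit = {
        // close the consumer so the polling thread exits cleanly
      }
    }

The tutorial’s actual onStart implementation creates a Kafka 0.9 consumer and polls Message Hub from a background thread, storing each record in Spark’s memory: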

 def onStart() {
    ...

    //Create a new kafka consumer and subscribe to the relevant topics
    kafkaConsumer = new KafkaConsumer[K, V](kafkaParams)
    kafkaConsumer.subscribe( topics )

    new Thread( new Runnable {
        def run(){
            try{
                while( kafkaConsumer != null ){
                    var it:Iterator[ConsumerRecord[K, V]] = null;
                    if ( kafkaConsumer != null ){
                        kafkaConsumer.synchronized{
                            //Poll for new events
                            it = kafkaConsumer.poll(1000L).iterator
                            while( it != null && it.hasNext() ){
                                //Get the record and store it
                                val record = it.next();
                                store( (record.key, record.value) )
                            }
                            kafkaConsumer.commitSync
                        }
                    }
                    Thread.sleep( 1000L )
                }
                println("Exiting Thread")
            }catch{
                case e:Throwable => {
                    reportError( "Error in KafkaConsumer thread", e);
                    e.printStackTrace()
                }
            }
        }
    }).start
}

For convenience, I also created an implicit method called createKafkaStream that can be called from a StreamingContext object.
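
I won’t reproduce its signature here, but conceptually, once the implicit is in scope, wiring up the stream reads like the following sketch. The import path and argument list are illustrative assumptions, not the project’s exact API, so check the sample code for the real method:

    // Hypothetical usage sketch -- the real implicit enrichment and its type
    // parameters are defined in the sample project, not shown here
    import com.ibm.cds.spark.samples.dstream._          // assumed import location

    // With the implicit in scope, the StreamingContext gains a createKafkaStream
    // method that returns a DStream of (key, tweet) pairs from Message Hub
    val stream = ssc.createKafkaStream(
      kafkaProps,                                        // Message Hub / Kafka configuration
      List(kafkaProps.getConfig("kafka.topic.tweet"))    // topic(s) to subscribe to, e.g. twitter-spark
    )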

Build the streaming analytics

Tip: To follow the discussion in this section, refer to the code implemented in the runAnalytics method in MessageHubStreamingTwitter.scala.

We want to re-implement the last 2 analytics from our earlier static-data version of this app, but this time in Scala and using the Spark Streaming APIs (DStreams).

Since Watson Tone Analyzer understands only English, the first transformation is to filter the tweets, keeping only those in English:

 val tweets = stream.map( t => t._2)
  .filter { status =>
    Option(status.getUser).flatMap[String] {
      u => Option(u.getLang)
    }.getOrElse("").startsWith("en") && CharMatcher.ASCII.matchesAllOf(status.getText) && ( keys.isEmpty || keys.exists{status.getText.contains(_)})
  }

StreamingAnalytic1

The next transformation invokes Watson Tone Analyzer for each tweet and combines the sentiment scores with the tweet data:

     val rowTweets = tweets.map(status=> {
      lazy val client = PooledHttp1Client()
      val sentiment = ToneAnalyzer.computeSentiment( client, status, broadcastVar )
      var scoreMap : Map[String, Double] = Map()
      if ( sentiment != null ){
        for ( tone <- Option( sentiment.children ).getOrElse( Seq() ) ){
          for ( result <- Option( tone.children ).getOrElse( Seq() ) ){
            scoreMap += ( result.id -> (BigDecimal(result.normalized_score).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble) * 100.0 )
          }
        }
      }

      EnrichedTweet(
          status.getUser.getName,
          status.getCreatedAt.toString,
          status.getUser.getLang,
          status.getText,
          Option(status.getGeoLocation).map{ _.getLatitude}.getOrElse(0.0),
          Option(status.getGeoLocation).map{_.getLongitude}.getOrElse(0.0),
          scoreMap
      )
    })

EnrichedTweet is a helper case class that defines the combined data model: the tweet plus its sentiment scores.

StreamingAnalytics2
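
Based on the constructor call above, the case class presumably looks something like the following reconstruction; the field names are illustrative, and the real definition lives in the sample project:

    // Illustrative reconstruction of the EnrichedTweet data model, inferred
    // from the constructor arguments used above
    case class EnrichedTweet(
      author: String,                       // status.getUser.getName
      date: String,                         // status.getCreatedAt.toString
      lang: String,                         // status.getUser.getLang
      text: String,                         // status.getText
      lat: Double,                          // latitude, 0.0 when there is no geo location
      long: Double,                         // longitude, 0.0 when there is no geo location
      sentimentScores: Map[String, Double]  // Watson tone id -> score (0-100)
    )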

Now, extract the hashtags into a flat map of encoded values:

    val metricsStream = rowTweets.flatMap { eTweet => {
     val retList = ListBuffer[String]()
     for ( tag <- eTweet.text.split("\\s+") ){
       if ( tag.startsWith( "#") && tag.length > 1 ){
           for ( tone <- Option( eTweet.sentimentScores.keys ).getOrElse( Seq() ) ){
               retList += (tag + delimTagTone + tone + delimToneScore + eTweet.sentimentScores.getOrElse( tone, 0.0))
           }
       }
     }
     retList.toList
   }}

The app extracts each hashtag from the tweets and encodes it as a tag-tone-sentiment value.
StreamingAnalytics3
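
To make the encoding concrete, here is a tiny standalone illustration. The delimiter values below are made up for readability; the real delimTagTone and delimToneScore constants are defined in the sample project:

    // Made-up delimiter values, for illustration only
    val delimTagTone   = "~tone~"
    val delimToneScore = "~score~"

    // A tweet containing "#spark" with two tone scores flatMaps into two strings:
    //   "#spark~tone~Cheerfulness~score~75.0"
    //   "#spark~tone~Openness~score~60.0"
    val encoded = "#spark" + delimTagTone + "Cheerfulness" + delimToneScore + 75.0

    // The next transformation splits on delimToneScore to recover the pair
    val parts = encoded.split(delimToneScore)
    // parts(0) == "#spark~tone~Cheerfulness", parts(1).toFloat == 75.0f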

The next transformations are focused on preparing the data for the visualizations:

     .map { fullTag => {
       val split = fullTag.split(delimToneScore);
       (split(0), split(1).toFloat)
    }}

This transformation uses the map function to transform each tag-tone-sentiment value into a (tag-tone, sentiment) pair.

StreamingAnalytic4

Now we can compute the average sentiment score for each tag-tone pair. To do that, we use the combineByKey method, which lets us combine the elements using custom functions. In this case, we want to compute the total sum and count for each tag-tone pair.

combineByKey is a higher-order function that takes 3 functions:

  1. createCombiner creates an initial value when a key is first encountered
  2. mergeValue is invoked when a combiner already exists for the key, to merge the new value into it
  3. mergeCombiner merges together combiners created on different partitions

     .combineByKey(
       (x:Float) => (x,1),  //createCombiner creates a (sum, count) tuple
       (x:(Float,Int), y:Float) => (x._1 + y, x._2+1), //mergeValue
       (x:(Float,Int),y:(Float,Int)) => (x._1 + y._1, x._2 + y._2), //mergeCombiner
       new HashPartitioner(sc.defaultParallelism)
    )

StreamingAnalytics5
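
If the three functions feel abstract, this small standalone example (using a plain RDD and the notebook’s sc) shows the same pattern computing a per-key average, which is exactly what the streaming code does for each tag-tone pair:

    // Standalone illustration of the combineByKey pattern: build (sum, count)
    // per key, then derive the average
    val scores = sc.parallelize(Seq(
      ("#spark-Cheerfulness", 80.0f),
      ("#spark-Cheerfulness", 60.0f),
      ("#spark-Openness",     50.0f)
    ))

    val sumCount = scores.combineByKey(
      (x: Float) => (x, 1),                                            // createCombiner
      (acc: (Float, Int), x: Float) => (acc._1 + x, acc._2 + 1),       // mergeValue
      (a: (Float, Int), b: (Float, Int)) => (a._1 + b._1, a._2 + b._2) // mergeCombiner
    )

    sumCount.mapValues { case (sum, count) => sum / count }
            .collect()
            .foreach(println)
    // (#spark-Cheerfulness,70.0)
    // (#spark-Openness,50.0)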

The next transformation maps the output of the previous one, which is a (tag-tone, (sum, count)) pair, into a (tag, (count, List((sentimentLabel, average)))) pair. Notice that we wrap the (sentimentLabel, average) tuple in a List. This prepares for the next transformation, which reduces by key and aggregates all the (sentimentLabel, average) tuples for a tag into a single list.

     .map[(String,(Long/*count*/, List[(String, Double)]))]{ t => {
     val key = t._1;
     val ab = t._2;
     val split = key.split(delimTagTone)
     (split(0), (ab._2, List((split(1), BigDecimal(ab._1/ab._2).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble ))))
   }}

StreamingAnalytics6

The next transformation reduces the stream by key and aggregates all the associated (tone, average_score) values into a list of tuples. Using the mapValues transformation, we call the unzip function to separate the list of tones from their respective scores (to make it easier to write the code that creates the chart). We end up with a DStream of (tag, (count, list of tones, list of average scores)) pairs.

    .reduceByKey(
        (t,u) => (t._1+u._1, (t._2 ::: u._2)
            .sortWith( (l,r) => l._1.compareTo( r._1 ) < 0 ))
    )
    .mapValues( (item:(Long, List[(String,Double)])) => {
     val unzip = item._2.unzip
     (item._1/(item._2.size), unzip._1, unzip._2)
   })

StreamingAnalytics7

For the last transformation, we need to maintain state within the DStream so that the next micro-batch can include the metrics calculated before. For that, we call updateStateByKey, which maintains arbitrary state data for each key of the DStream. This function applies only to (key, value) pair DStreams, which is what we have here.
The following code calls the closure for each key, passing the previous value to be merged:

    .updateStateByKey( (a:scala.collection.Seq[(Long, List[String], List[Double])], b: Option[(Long, List[String], List[Double])]) => {
        val safeB = b.getOrElse( (0L, List(), List() ) )
        var listTones = safeB._2
        var listScores = safeB._3
        var count = safeB._1
        for( item <- a ){
            count += item._1
            listScores = listScores.zipAll( item._3, 0.0, 0.0).map{ case(a,b)=>(a+b)/2 }.toList
            listTones = item._2
        }

        Some( (count, listTones, listScores) )
   })
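
The zipAll call in the middle is the subtle part: it lines up the running scores with the new micro-batch’s scores, padding with 0.0 when one list is shorter, and keeps a running average. A quick standalone illustration:

    // Running averages kept in the DStream state for one hashtag
    val previousScores = List(70.0, 50.0)
    // Averages computed for the same hashtag in the new micro-batch
    val newScores      = List(80.0, 60.0, 40.0)   // one extra tone this batch

    // zipAll pads the shorter list with 0.0 so the two lists line up,
    // then each pair is averaged
    val merged = previousScores.zipAll(newScores, 0.0, 0.0)
                               .map { case (prev, curr) => (prev + curr) / 2 }
    // merged == List(75.0, 55.0, 20.0)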

Publish the Results to MessageHub/Kafka

We successively apply (chain) the set of transformations described in the previous section to produce the metricsStream DStream. Transformations are lazy, which means that they execute only when an action is called, like collecting output from the DStream.

In the following code, we sort the RDD by occurrence count, take the top 5 records, and publish them in JSON format to Message Hub:

    metricsStream.foreachRDD( rdd =>{
     val topHashTags = rdd.sortBy( f => f._2._1, false ).take(5)
     if ( !topHashTags.isEmpty){
       queue.synchronized{
         queue += (("topHashTags", TweetsMetricJsonSerializer.serialize(topHashTags.map( f => (f._1, f._2._1 )))))
         queue += (("topHashTags.toneScores", ToneScoreJsonSerializer.serialize(topHashTags)))
           try{
             queue.notify
           }catch{
             case e:Throwable=>logError(e.getMessage, e)
           }
       }
     }
   })

Notice that we don’t directly call the Kafka API to send the events. This is because Spark requires transformation closures (anonymous functions passed as arguments to higher-order functions) to be serializable, and unfortunately some of the needed Kafka classes (like org.apache.kafka.clients.producer.ProducerRecord) are not serializable. To work around this issue, we post the JSON records to an asynchronous queue and have a separate thread publish them to Message Hub. The thread is created only on the driver machine, which is fine because the closure passed to the foreachRDD method also executes only on the driver machine.
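
The pattern itself is straightforward. Here is a hedged, self-contained sketch of the idea; the real implementation in MessageHubStreamingTwitter.scala differs in details such as error handling, batching, and the Kafka producer configuration:

    import java.util.Properties
    import scala.collection.mutable.Queue
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    // Illustrative sketch of the driver-side publisher: foreachRDD enqueues
    // (topic, json) pairs, and one long-running thread drains the queue to Kafka.
    object AsyncKafkaPublisher {
      private val queue = new Queue[(String, String)]()

      def enqueue(topic: String, json: String): Unit = queue.synchronized {
        queue += ((topic, json))
        queue.notify()
      }

      // producerProps must contain bootstrap.servers, the SASL settings, and the
      // key/value serializer classes required by KafkaProducer
      def start(producerProps: Properties): Thread = {
        val t = new Thread(new Runnable {
          def run(): Unit = {
            // The producer and its non-serializable ProducerRecord objects only
            // ever exist on the driver, inside this thread
            val producer = new KafkaProducer[String, String](producerProps)
            while (true) {
              val batch = queue.synchronized {
                if (queue.isEmpty) queue.wait(1000L)
                queue.dequeueAll(_ => true)
              }
              batch.foreach { case (topic, json) =>
                producer.send(new ProducerRecord[String, String](topic, json))
              }
            }
          }
        })
        t.setDaemon(true)
        t.start()
        t
      }
    }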

Real-time Dashboard Node.js Web Application

Now that we have completed the streaming analytics, the next step is to display the results in a dashboard that updates continuously.

Tip: To follow this discussion, see the code in this Node.js app’s GitHub repository. Also be sure to deploy your own copy.

Deploy the App

The fastest way to deploy this app to Bluemix is to click the Deploy to Bluemix button.

If you’d rather deploy manually, refer to the readme.

Enter Message Hub Credentials in Notebook

  1. In Bluemix, go to the dashboard.
  2. Click your new Twitter-Spark-Watson-Dashboard app to open it.
  3. In the menu on the left, click Environment Variables.

    You see the following JSON:

    {
       "messagehub": [
          {
             "name": "messagehub-spark",
             "label": "messagehub",
             "plan": "standard",
             "credentials": {
                "api_key": "XXXX",
                "kafka_admin_url": "https://kafka-admin-prod01.messagehub.services.us-south.bluemix.net:443",
                "kafka_rest_url": "https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net:443",
                "kafka_brokers_sasl": [
                   "kafka01-prod01.messagehub.services.us-south.bluemix.net:9093",
                   "kafka02-prod01.messagehub.services.us-south.bluemix.net:9093",
                   "kafka03-prod01.messagehub.services.us-south.bluemix.net:9093",
                   "kafka04-prod01.messagehub.services.us-south.bluemix.net:9093",
                   "kafka05-prod01.messagehub.services.us-south.bluemix.net:9093"
                ],
                "user": "XXXX",
                "password": "XXXX"
             }
          }
       ]
    }
    
  4. Copy and paste the 3 Message Hub credentials (api_key, user, and password), replacing the XXX’s in your Scala notebook’s //Message Hub/Kafka service section.
  5. Again, leave this notebook open. You’ll return here to run some code in a minute.

Mozaik Dashboard Framework

To build this dashboard, we chose the Mozaik framework because it provides an easy-to-use widget framework based on ReactJS components. It also provides automated calling of API endpoints via WebSockets. Mozaik actually supports multiple dashboards. If more than one is defined, the framework automatically rotates between them according to a customizable rotation duration. In this sample app, we define only one dashboard that contains 2 widgets:

  1. A pie chart showing the top 5 hashtags by number of occurrences.
  2. A multi-series bar chart showing the average of all tone scores for tweets containing the top 5 hashtags.

Defining the layout declaratively

With Mozaik, the layout of each dashboard is conveniently defined declaratively in config.js using JSON syntax:

    ...
    dashboards: [
        {
            columns: 5,
            rows: 100,
            widgets: [
                {
                    type: 'sparkTwitter.top_hash_tags',
                    columns: 3, rows: 45,
                    x: 1, y: 4
                },
                {
                    type: 'sparkTwitter.tone_breakdown',
                    columns: 3, rows: 45,
                    x: 1, y: 51
                }
            ]
        }
    ]

The dashboards field is an array of JSON objects that each define a dashboard layout. The columns and rows fields represent the number of columns and rows in the tabular layout. The widgets field contains an array of JSON objects that describe a rectangular area. Each of these objects contains the following information:

  • columns: number of columns taken by the area
  • rows: number of rows taken by the area
  • x: 1-based index indicating the start x-position of the area
  • y: 1-based index indicating the start y-position of the area
  • type: an id that identifies the widget being rendered in the specified area.

The type consists of 2 parts: the extension id and the widget id.

You’ll find the extension id definition in App.jsx:

...
var extensions = {
    sparkTwitter: sparkTwitterComponents
};
...

In turn, the widget type definition is in mozaik-ext-sparkTwitterComponents.js.

module.exports = {
    topHashTags: require('./sparkTwitterTopHashTags.jsx'),
    toneBreakdown: require('./sparkTwitterToneBreakdown.jsx')
};

Note: We declare the id using camel case format in the Node.js module. But in config.js, we need to use snake case format (topHashTags becomes top_hash_tags).

As you see, the widget implementation is in the module referenced by the require statement. For example, the topHashTags widget is implemented in sparkTwitterTopHashTags.jsx. We’ll discuss the lifecycle of these widgets in a minute. For now, please take some time to look over these different files and to consult the documentation for the different frameworks used, like ReactJS and C3/D3.

Access Message Hub Events Using message-hub-rest Node Module

The dashboard uses API endpoints to get regular data updates. In turn, these API endpoints get updates by subscribing to Message Hub using the Kafka REST Proxy APIs. For convenience, this app uses the message-hub-rest module, which provides high-level APIs to create a new consumer instance and publish/subscribe events to/from this instance.

The following code (available in messageHubBridge.js) shows how to create a new instance by reading the credentials from the VCAP_SERVICES environment variable. We first grab the VCAP_SERVICES data that contains the credentials for the Message Hub service. We then use them to create a new MessageHub object, which provides the interface to communicate with Message Hub. In this sample app, we subscribe to 2 topics: topHashTags and topHashTags.toneScores. Note that Message Hub doesn’t auto-create them, so to avoid an error, we call createTopicIfNecessary first, to make sure they exist.

    var consumerInstanceName = "spark_twitter_consumer_instance";
    var topics = ["topHashTags", "topHashTags.toneScores"];

    ...

    var services = process.env.VCAP_SERVICES || configManager.get("DEV_VCAP_CONFIG");
    var instance = new MessageHub(services);
    instance.topics.get()
        .then( function(response){
            console.log("List of topics: ", response);

            //Change in MessageHub on 11/2/2015: topics are not autocreated anymore
            var createTopicIfNecessary = function( topic ){
                if ( !_.find( response, function( t ){ return t === topic || (t.hasOwnProperty("name") && t.name === topic) })){
                    instance.topics.create(topic)
                    .then(function(res){
                        console.log("Successfully created topic " + topic);
                    })
                    .fail( function(error){
                        console.log("Unable to create topic "+topic, error);
                    })
                }
            }

            _.forEach( topics, function(topic){
                createTopicIfNecessary(topic);
            })
            ...
        })
        .fail( function(error){
            console.log("Failed to get list of topics: " + error);
        });

We then call the consumeTopic routine that periodically polls the Message Hub server to get new data:

    //Helper that consumes a topic from MessageHub
    var consumeTopic = function( topic ){
        console.log("Create MessageHub consumer for topic: " + topic );
        instance.consume('consumer_' + topic, consumerInstanceName, { 'auto.offset.reset': 'largest' })
            .then( function( response ){
                var consumerInstance = response[0];
                var inProgress = false;
                //Set the interval for messages consuming
                setInterval( function(){
                    if ( inProgress ){
                        return;
                    }
                    inProgress = true;
                    consumerInstance.get(topic)
                        .then(function(data) {
                            inProgress = false;
                            if ( _.isArray(data) ){
                                if ( data.length > 0 ){
                                    //Take only the last value
                                    try{
                                        messagesByTopics[topic] = JSON.parse( data[data.length - 1] );
                                    }catch(e){
                                        console.log("Unable to parse Message Hub data", e, data[data.length-1]);
                                    }
                                }
                            }else{
                                messagesByTopics[topic] = data;
                            }
                        })
                        .fail(function(error) {
                            inProgress = false;
                            console.log("Unable to consume topic: " + topic, error);
                        });
                }, 4000);
            })
            .fail( function(error){
                console.log("Unable to get consumer instance for topic: " + topic, error);
            })
    };

The data received from the Kafka topics is stored in the messagesByTopics variable, to be read later by the API endpoints when the dashboard sends a new request over the WebSocket channel. The API endpoint implementations are located in sparkTwitterApiClient.js. Each API is declared as a field of the main object and must return a Promise that resolves with the selected topic data from the MessageHubBridge. The runInterval field indicates how often the browser refreshes the data.

    var client = function (mozaik) {
        return {
            runInterval: 2000,
            getTopHashTags: function(params) {
                return new Promise( function( resolve, reject){
                        resolve();
                    })
                    .then(function (res) {
                        console.log("Calling Api with params: ", params);
                        return messageHubBridge.getTopicMessage(params.topic);
                    });
            },
            getToneBreakdown:function(params) {
                return new Promise( function( resolve, reject){
                        resolve();
                    })
                    .then(function (res) {
                        console.log("Calling Api with params: ", params);
                        return messageHubBridge.getTopicMessage(params.topic);
                    });
            }
        };
    };

That’s it! The framework automatically sets up the WebSocket connection and periodically calls the API to get new data. Next, you’ll see how widgets consume the data to refresh the charts.

Visualize the results

As mentioned, each widget is implemented as a ReactJS Component, which provides a set of lifecycle events. Some that we used in our app are:

  • getInitialState initializes the widget state before the component is mounted.
  • getApiRequest specifies the API id configured in the main App.jsx module.
  • onApiData runs when the Mozaik framework gets new data from the WebSocket channel.
  • componentDidMount is invoked immediately after the widget renders. This is where the widget content is dynamically created; for example, this is where the widget creates its C3 chart.
  • componentDidUpdate is invoked when the new data has been received so that the widget can be refreshed accordingly.
  • componentWillUnmount is invoked when the component is about to be destroyed, so the widget can clean up any associated resources.
  • render returns the html fragment that will contain the widget.

You’ve read how the data updates by consuming topics from Message Hub. Once new data comes in, the Mozaik framework automatically calls onApiData. The widget component then updates its state with a call to the setState method, which in turn triggers a refresh of the charts. For example, in sparkTwitterToneBreakdown.jsx:

    ...
    onApiData(metrics) {
        this.setState({"colData":metrics});
    }

    componentDidUpdate() {
        if ( this.state.colData.length > 0 ){
            this.chart.load(this.getChartData());
            this.onResize();
        }
    }
    ...

The C3 framework creates the chart object and provides a high-level programming model that is much easier to use than manipulating the D3 APIs directly.

Running the application from a Scala Notebook

Here’s how to quickly run the app from the Scala notebook you copied in the first section:

  1. Confirm you’ve filled in all credentials. (You shouldn’t see any more XXXX entries.)
  2. In the Scala notebook, run code cells 1-2 to load the jar and connect to services.
  3. If Message Connect isn’t working for you, enter your Twitter credentials in cell 3, and run it. This code will fill in for Message Connect to gather tweets.
  4. Run cell 4 to kick off Spark Streaming.
    You’ll see results streaming into your notebook. Let it run.
    Spark Streaming running
  5. Launch the dashboard app.
    In a separate browser tab or window, go to your Bluemix dashboard and launch your Twitter-Spark-Watson-Dashboard app by opening its URL.

    You see the chart updating every few seconds:

    Running Realtime dashboard

Conclusion

Now you know how to use IBM Analytics for Apache Spark in combination with Message Hub and Message Connect to deliver real-time streaming analytics. You’ve even got a live dashboard web app featuring charts and graphics that update continuously, so your corner-office decision-makers can see trends as they happen.

Feel free to fork and enhance this code. Some suggestions:

  • select hashtags you want to monitor
  • set alerts based on emotion thresholds
  • display select tweets

The possibilities are endless. IBM Bluemix has an unparalleled selection of services (data, cognitive, mobile, etc.) that you can use to build the next killer feature. When you do, don’t forget to let us know about it!
