Introduction

With IBM® Streaming Analytics for Bluemix™, you can perform real-time analysis on data in motion as part of your Bluemix application. The Streaming Analytics service is powered by IBM InfoSphere® Streams, an advanced analytic platform that custom applications use to quickly ingest, analyze, and correlate information as it is produced by real-time data sources. InfoSphere Streams can handle very high data rates and perform its analysis with predictable low latency, so your application can operate at the speed of data.

This tutorial explains how to obtain, run, and extend a starter application that is written in Liberty for Java™ and that uses the Streaming Analytics service.

What you’ll need to build your application

  • A Bluemix account and a DevOps Services account, both linked to your IBM ID.
  • The Cloud Foundry cf command line tool.

Overview of the New York City Traffic Streaming Analytics starter application

It helps to understand some background about the application used in this tutorial. This demo is a Bluemix Liberty application that uses the Streaming Analytics service. The Liberty application calls the Streaming Analytics service REST API to interact with the service instance and to control an InfoSphere Streams application. The InfoSphere Streams application reads data from a public website on the Internet, performs analytics on the data, and uses HTTP to post the results back to the Liberty application. The Liberty application also provides the user interface that displays the calculated results.

The starter application retrieves public data from New York City traffic sensors, calculates aggregate statistics, and displays summary lists.

Real-time traffic speed data

The NYC Traffic Sample demo data comes from a public website created by the New York City Department of Transportation (DOT). The Transportation Management Center maintains a map of traffic speed detectors throughout the city. The speed detectors themselves belong to various city and state agencies.

Tutorial steps

The steps in this tutorial are:

  1. Create a Bluemix app and bind the Streaming Analytics service to it.
  2. Obtain your own copy of the source code.
  3. Deploy your app.
  4. View the running app.
  5. Review the Liberty application source code.
  6. Review the InfoSphere Streams application source code.
  7. Stop the Bluemix and Streams applications.
  8. Customize or extend the app.

Step 1 – Create a Bluemix app and bind the Streaming Analytics service

  1. Log in to Bluemix. In the dashboard, click the CREATE AN APP button. In the catalog of available runtimes, find and click Liberty for Java. Enter a name and host of your choice and click CREATE.
  2. On the page that is displayed after you create the app, click the ADD A SERVICE button. Locate the Streaming Analytics service in the catalog (under Big Data) and click it. Then click CREATE to bind it to your application.

Step 2 – Obtain your own copy of the source code

The source code for the starter application is stored in an IBM DevOps Services project, so there are multiple ways to obtain your own copy of the code, including forking the project, cloning the git repository, or simply downloading the contents as a zip file.

Use this link to download the zip file: NYC Traffic Sample Application.

  1. After the download is completed, extract the .zip file.
  2. Rename the directory NYCTrafficSample in the extracted files to match the name of the Bluemix Liberty application that you created in Step 1 of this tutorial, for example, myapp.

Step 3 – Deploy your app

Deploy the starter application as-is, without any code changes. Later in the tutorial, you will have a chance to customize the application and deploy again.

  1. In a shell window, cd to the directory containing your extracted application.
    cd myapp
  2. Connect to Bluemix using your API URL. Typically, this is https://api. followed by your Bluemix domain, for example, https://api.ng.bluemix.net.
    cf api api-url
  3. Log in to Bluemix:
    cf login
  4. Deploy your app:
    cf push myapp

Step 4 – View the running app

In a web browser, open the route for your application in Bluemix. A basic web page (shown below) is displayed with the title “Streaming Analytics Traffic Sample”. The page refreshes its data every minute. Use the different parts of this web page to see the results and status of the app.

  • The Current Application Status line shows information about the InfoSphere Streams application.
  • The New York City Traffic Information section displays three tables of the top ten statistics calculated by the InfoSphere Streams application.
  • The Full Application Status section shows the complete history of the Liberty application’s interactions with the Streaming Analytics service.

[Image: NYCApp, the Streaming Analytics Traffic Sample web page]

Application Overview

The starter application is a complete, yet simple, application; no customization is required to run it. The overall application logic flow is:

  1. Check instance status and start the Streaming Analytics instance if necessary.
  2. Submit the InfoSphere Streams application.
  3. Start a background timer to cancel the job and stop the instance after a period of time.
  4. Gather output from the SPL (Streams Processing Language) program. (The SPL program uses an HTTPPost operator to send data to a POST API in the Liberty application.)
  5. Return application status information via the RefreshStatus REST API that interacts with the Streaming Analytics service.
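
Step 3 of this flow, the background timer that cancels the job and stops the instance, can be sketched with the JDK's scheduled executor. This is a minimal sketch under stated assumptions, not the starter application's code: the StreamsControl interface and its cancelJob and stopInstance methods are hypothetical stand-ins for the REST calls that TrafficResource.java makes, and the demo uses a short delay in place of the 120-minute value described in Step 8.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Sketch of a delayed shutdown: cancel the Streams job, then stop the instance. */
final class ShutdownTimer {

    /** Hypothetical stand-in for the Streaming Analytics REST calls. */
    interface StreamsControl {
        void cancelJob();
        void stopInstance();
    }

    /** Schedules one shutdown after delayMillis; the job is cancelled before the instance stops. */
    static ScheduledFuture<?> schedule(ScheduledExecutorService scheduler,
                                       StreamsControl control, long delayMillis) {
        return scheduler.schedule(() -> {
            control.cancelJob();
            control.stopInstance();
        }, delayMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        StreamsControl control = new StreamsControl() {
            public void cancelJob()    { System.out.println("cancel job"); }
            public void stopInstance() { System.out.println("stop instance"); }
        };
        // The starter app waits 120 minutes; 100 ms keeps this demo short
        ScheduledFuture<?> pending = schedule(scheduler, control, 100);
        pending.get();          // block until the shutdown task has run
        scheduler.shutdown();
    }
}
```

Running the cancel and stop calls in one task keeps their order fixed, so the instance is never stopped while the job is still being cancelled.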

A block diagram of the components involved in the starter application is provided below.
[Image: demoappBlock, block diagram of the starter application components]

Step 5 – Review the Liberty application source code

To understand the application, we start by examining the Liberty application source code.

  1. Open the index.html file to view the basic user interface. Notice that it contains several div elements and a reference to the index.js file. The JavaScript in index.js populates these div elements dynamically.
    <div id="FullAppStat"></div>
    <div id="id00"></div>
    <script type="text/javascript" src="index.js"></script>
  2. Open the index.js file. This JavaScript program makes REST calls to the TrafficResource Java class, which implements the JAX-RS interfaces for this application’s data. Notice the immediate call to refreshStatus and the interval that calls it again every 60 seconds.
    refreshStatus(); // run once immediately
    var refreshInterval1 = window.setInterval(refreshStatus, 60000); // Refresh status every 60 seconds
    
  3. Open the TrafficResource.java file to view the application logic. This class provides several functions:
    1. Interact with the Streaming Analytics service instance. The synchronized private static JSONObject getCredentials() method initializes and retrieves the VCAP information that is used on all REST calls to the service. As with other Bluemix services and add-ons, you must extract the credentials and service URLs for the Streaming Analytics service from the VCAP_SERVICES runtime environment variable; the starter application performs this step. Below is an example of the environment information returned for the Streaming Analytics service. It includes a subsection labeled credentials, which contains all the information required to use the REST API of the service: the host and port for the API, a path to each of the operations in the API, and a userid and password that must be passed on each call to the API.
      "streaming-analytics": [
        {
          "name": " Streaming Analytics-dev-aj",
          "label": "streaming-analytics",
          "plan": "Beta",
          "credentials": {
            "bundles_path": "/jax-rs/bundles/service_instances/xxx/service_bindings/xxx",
            "start_path": "/jax-rs/streams/start/service_instances/xxx/service_bindings/xxx",
            "statistics_path": "/jax-rs/streams/statitics/service_instances/xxx/service_bindings/xxx",
            "status_path": "/jax-rs/streams/status/service_instances/xxx/service_bindings/xxx",
            "rest_port": "443",
            "resources_path": "/jax-rs/resources/service_instances/xxx/service_bindings/xxx",
            "jobs_path": "/jax-rs/jobs/service_instances/xxx/service_bindings/xxx",
            "rest_url": "https://streams-app-service-dev.stage1.mybluemix.net",
            "userid": "xxx",
            "stop_path": "/jax-rs/streams/stop/service_instances/xxx/service_bindings/xxx",
            "rest_host": "streams-app-service-dev.stage1.mybluemix.net",
            "password": "xxx"
          }
        }
      ]
    2. Make specific calls, such as get instance status, get job status, start instance, stop instance, submit job, and cancel job. Here is one of the more complex calls, submit job:
      1. In the firstTimeInit code, note how it obtains the URL at which this Liberty application is running and passes it as a submission-time parameter to the InfoSphere Streams application. This is the URI of the REST API to which the SPL program sends data.
        JSONObject submitAPIResult = getsubmit(uriBase + "jax-rs/addLine");
      2. The getsubmit method builds the JSON input required by the submit REST API:
         
        final String fileName = "NYCTraffic.sab";
        JSONObject submitJSON = new JSONObject();
        JSONObject parms = new JSONObject();
        parms.put("Top10.host", uri);
        submitJSON.put("submissionParameters", parms);
        JSONObject config = new JSONObject();
        submitJSON.put("configurationSettings", config);
        submitJSON.put("jobName", jobName);
        LOGGER.info("Here's the submit json: " + submitJSON);
        return doSubmit(fileName, submitJSON);
        
      3. The doSubmit method builds the multipart MIME data required by the submit REST API and calls the API:
        private JSONObject doSubmit(String fileName, JSONObject submitJSON) {
          LOGGER.info("doSubmit for file: " + fileName);
          LOGGER.info("doSubmit for json: " + submitJSON.toString());

          // Result object returned to the caller and shown in the status history
          final JSONObject result = new JSONObject();
          result.put("APICall", "submit");
          result.put("APIstatus", "starting");

          try {
            // The .sab bundle is packaged in the public folder of the deployed WAR
            final String fullFileName = "./apps/myapp.war/public/" + fileName;
            File file = new File(fullFileName);
            LOGGER.info("File to upload is: " + file.getAbsolutePath());
            if (!file.exists()) {
              LOGGER.severe("Response EXCEPTION: Application Not Found: file: " + file.getAbsolutePath());
              throw new WebApplicationException(createJSONResponse(Status.NOT_FOUND, "Application not found"));
            }

            final String url = instanceURL + jobsPath + "?bundle_id=" + fileName;
            result.put("submit", url);

            // Multipart body: the application bundle ("bin") plus the submission JSON ("json")
            FileBody bin = new FileBody(file);
            StringBody submitJson = new StringBody(submitJSON.toString(), ContentType.APPLICATION_JSON);
            HttpEntity reqEntity = MultipartEntityBuilder.create()
                    .addPart("bin", bin)
                    .addPart("json", submitJson)
                    .build();

            HttpPost httpPost = new HttpPost(url);
            httpPost.addHeader("accept", "application/json");
            httpPost.addHeader("Authorization", getAPIKey());
            httpPost.setEntity(reqEntity);

            // Using a custom client so we can set it to use TLSv1.2;
            // try-with-resources closes both the client and the response
            try (CloseableHttpClient httpclient = HttpClients.custom()
                         .setSSLSocketFactory(sslSocketFactory).build();
                 CloseableHttpResponse response1 = httpclient.execute(httpPost)) {
              LOGGER.info(response1.getStatusLine().toString());
              HttpEntity entity1 = response1.getEntity();

              InputStream is = entity1.getContent();
              JSONObject submitResult = JSONObject.parse(is);
              result.put("submit", submitResult);
              if (response1.getStatusLine().getStatusCode() != 200) {
                result.put("APIstatus", "failed");
              } else {
                result.put("APIstatus", "complete");
              }
              EntityUtils.consume(entity1); // ensure the entity is fully consumed
            }
          } catch (final Exception exception) {
            LOGGER.severe("Submit EXCEPTION: " + exception.getMessage());
            exception.printStackTrace();
            result.put("APIstatus", "exception");
            result.put("exception", exception.toString());
          }
          LOGGER.info("Submit response body: " + result);

          return result;
        }
        
    3. Interact with the InfoSphere Streams application that is producing the analyzed data. Examine the source code in the addLine method.
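      /**
       * POST
       * REST API to receive a JSON string and add it to the top-N lists
       */
      @POST
      @Path("addLine")
      @Consumes(MediaType.APPLICATION_JSON)
      @Produces(MediaType.TEXT_PLAIN)
      public String addLine(final JSONObject jsonObject) {

        LOGGER.info("POST addLine: add a json line: " + jsonObject.toString());
        final JSONObject parsedObj;
        try {
          // The posted JSON carries the list as JSON text in its "jsonString" attribute
          parsedObj = JSONObject.parse((String) jsonObject.get("jsonString"));
        } catch (IOException e) {
          // Reject the request instead of continuing with a null object
          LOGGER.severe("addLine could not parse jsonString: " + e.getMessage());
          throw new WebApplicationException(e, Status.BAD_REQUEST);
        }
        // Keyed by attribute name, so a new list replaces the previous one for that attribute
        topNs.put(parsedObj.get("attributeName").toString(), parsedObj);
        return "Line added";
      }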
      

      It implements a REST API that receives JSON data from the InfoSphere Streams application and saves it in the topNs map, declared as private static HashMap<String, JSONObject> topNs = new HashMap<String, JSONObject>();. The topNs map is then used when the JavaScript retrieves the top ten lists to display by calling the topTens API.

      /**
       * GET
       * topTens: retrieve the top ten lists
       */
      @GET
      @Path("topTens")
      @Produces(MediaType.APPLICATION_JSON)
      public JSONObject getTopTens() {
        JSONObject topTensObj = new JSONObject();
        JSONArray tensArray = new JSONArray();
        // Collect the most recent list received for each attribute
        for (JSONObject value : topNs.values()) {
          tensArray.add(value);
        }
        topTensObj.put("TopTens", tensArray);
        return topTensObj;
      }
      
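The credentials subsection of VCAP_SERVICES shown earlier in this step is all a client needs to address the REST API. The following sketch assembles the submit URL the same way doSubmit does (base URL plus jobs_path plus a bundle_id query parameter) and builds an Authorization header from userid and password. It assumes that instanceURL corresponds to rest_url and that the service accepts HTTP Basic authentication; the tutorial does not show how getAPIKey() formats the header. The ServiceCredentials class is hypothetical and uses only the JDK (Java 10 or later, for URLEncoder.encode with a Charset).

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical holder for fields read from the VCAP_SERVICES "credentials" object. */
final class ServiceCredentials {
    private final String restUrl;   // "rest_url"
    private final String jobsPath;  // "jobs_path"
    private final String userid;    // "userid"
    private final String password;  // "password"

    ServiceCredentials(String restUrl, String jobsPath, String userid, String password) {
        this.restUrl = restUrl;
        this.jobsPath = jobsPath;
        this.userid = userid;
        this.password = password;
    }

    /** Assumption: HTTP Basic authentication with userid:password. */
    String authorizationHeader() {
        String pair = userid + ":" + password;
        return "Basic " + Base64.getEncoder().encodeToString(pair.getBytes(StandardCharsets.UTF_8));
    }

    /** Same shape as doSubmit's URL: rest_url + jobs_path + ?bundle_id=... */
    String submitUrl(String bundleId) {
        return restUrl + jobsPath + "?bundle_id=" + URLEncoder.encode(bundleId, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ServiceCredentials creds = new ServiceCredentials(
                "https://streams.example.net",
                "/jax-rs/jobs/service_instances/xxx/service_bindings/xxx",
                "user", "pass");
        System.out.println(creds.submitUrl("NYCTraffic.sab"));
        System.out.println(creds.authorizationHeader());
    }
}
```

The original concatenates the bundle name without encoding; encoding it is a defensive choice in this sketch and makes no difference for NYCTraffic.sab.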

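The HTTPPost operators in the three Top10 composites can post to addLine concurrently, and getTopTens may iterate over topNs at the same time. java.util.HashMap is documented as unsynchronized, so one possible hardening is to keep the latest list per attribute name in a ConcurrentHashMap, whose iterators are weakly consistent and do not throw ConcurrentModificationException. The TopNStore class below is a hypothetical sketch, not part of the starter application; it stores plain strings instead of JSONObject so that it needs only the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical thread-safe replacement for the static topNs HashMap. */
final class TopNStore {
    // Key: attributeName (for example "minSpeed"); value: the latest list received for it
    private final Map<String, String> latestByAttribute = new ConcurrentHashMap<>();

    /** Called from addLine: a newer list for the same attribute replaces the older one. */
    void put(String attributeName, String jsonList) {
        latestByAttribute.put(attributeName, jsonList);
    }

    /** Called from getTopTens: a copy that is safe to build while puts continue. */
    List<String> snapshot() {
        return new ArrayList<>(latestByAttribute.values());
    }

    int size() {
        return latestByAttribute.size();
    }

    public static void main(String[] args) throws InterruptedException {
        TopNStore store = new TopNStore();
        // Simulate three HTTPPost senders posting at the same time
        List<Thread> senders = new ArrayList<>();
        for (String attr : new String[] {"currentSpeed", "maxSpeed", "minSpeed"}) {
            Thread t = new Thread(() -> {
                for (int i = 0; i < 1000; i++) {
                    store.put(attr, "{\"attributeName\":\"" + attr + "\",\"seq\":" + i + "}");
                }
            });
            senders.add(t);
            t.start();
        }
        for (Thread t : senders) {
            t.join();
        }
        System.out.println(store.size() + " lists: " + store.snapshot());
    }
}
```

Declaring topNs as a ConcurrentHashMap<String, JSONObject> would be the equivalent one-line change in TrafficResource.java, since both addLine and getTopTens only use put and values.
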
Step 6 – Review the InfoSphere Streams application source code

The InfoSphere Streams application SPL program was developed in the InfoSphere Streams Quick Start Edition.

[Image: NYCTraffic_spl2, the NYCTraffic SPL application]

The main operators in its flow are:

  1. stream <rstring rawObservation> inputStream = InetSource()
    Periodically reads information from http://207.251.86.229/nyc-links-cams/LinkSpeedQuery.txt and returns it as rstring lines.
  2. stream<NYCTrafficSchema> Tokenized = Custom(inputStream)
    Parses the raw input lines and turns the data into a sequence of InfoSphere Streams tuples.
  3. stream<NYCTrafficSchema> deduped = DeDuplicate(Tokenized)
    Discards any tuple that has been seen before. This is needed because the source does not always update every sensor each minute, so consecutive reads can return the same reading.
  4. Aggregate:
    stream<OutputSchema> Agg1 = Aggregate(deduped) {
        window
            deduped : sliding, time(300), time(60);
        param
            groupBy : Id;
            aggregateIncompleteWindows : true;
        output Agg1 :
            currentSpeed = Last(Speed),
            time = Last(DataAsOf),
            maxSpeed = Max(Speed),
            minSpeed = Min(Speed),
            avgSpeed = Average(Speed),
            linkName = Last(LinkName);
    }
    

    The Aggregate operator computes statistics over a sliding 5-minute (300-second) window and, every minute (60 seconds), outputs one set of statistics for each link identifier (Id).

  5. The output of the aggregation is the input to three composite operators, for example, stream<OutputList> minSpeed = Top10(Agg1). Each composite operator will:
    1. Sort by the attribute of interest.
    2. Use a custom operator to clip the number of tuples from each aggregate output to the top 10.
    3. Convert the tuple to a JSON representation.
    4. Use the HTTPPost operator to send the data to our Liberty application’s REST API.
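
To make the window semantics concrete, the following plain-Java sketch approximates what the Aggregate operator computes: for each link Id it keeps observations from the last 300 seconds and, on each trigger, reports the last, maximum, minimum, and average speed. This is an illustration under simplifying assumptions, not how InfoSphere Streams implements windows: Streams time-based windows work from when tuples arrive, whereas the sketch takes explicit timestamps (in seconds) to stay deterministic, and the caller is assumed to call emit once per 60 seconds to model time(60). The class, field, and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

/** Sketch of per-link statistics over a sliding time window (not the Streams runtime's code). */
final class SlidingSpeedAggregate {

    /** One deduplicated observation for a link. */
    private static final class Obs {
        final long timestampSeconds;
        final double speed;
        Obs(long timestampSeconds, double speed) {
            this.timestampSeconds = timestampSeconds;
            this.speed = speed;
        }
    }

    /** Hypothetical summary mirroring the currentSpeed/maxSpeed/minSpeed/avgSpeed outputs. */
    static final class Stats {
        final double currentSpeed, maxSpeed, minSpeed, avgSpeed;
        Stats(double currentSpeed, double maxSpeed, double minSpeed, double avgSpeed) {
            this.currentSpeed = currentSpeed;
            this.maxSpeed = maxSpeed;
            this.minSpeed = minSpeed;
            this.avgSpeed = avgSpeed;
        }
        @Override
        public String toString() {
            return String.format(Locale.ROOT, "current=%.1f max=%.1f min=%.1f avg=%.1f",
                    currentSpeed, maxSpeed, minSpeed, avgSpeed);
        }
    }

    private final long windowSeconds;                             // role of time(300)
    private final Map<String, Deque<Obs>> byId = new HashMap<>(); // role of groupBy : Id

    SlidingSpeedAggregate(long windowSeconds) {
        this.windowSeconds = windowSeconds;
    }

    /** Adds an observation; assumes observations for an Id arrive in timestamp order. */
    void add(String id, long timestampSeconds, double speed) {
        byId.computeIfAbsent(id, k -> new ArrayDeque<>()).addLast(new Obs(timestampSeconds, speed));
    }

    /** One trigger: evict observations outside the window, then summarize each Id. */
    Map<String, Stats> emit(long nowSeconds) {
        Map<String, Stats> out = new TreeMap<>();
        for (Map.Entry<String, Deque<Obs>> entry : byId.entrySet()) {
            Deque<Obs> window = entry.getValue();
            while (!window.isEmpty() && window.peekFirst().timestampSeconds <= nowSeconds - windowSeconds) {
                window.removeFirst();
            }
            if (window.isEmpty()) {
                continue;
            }
            // Partially filled windows are still reported, roughly like aggregateIncompleteWindows : true
            double max = Double.NEGATIVE_INFINITY, min = Double.POSITIVE_INFINITY, sum = 0;
            for (Obs o : window) {
                max = Math.max(max, o.speed);
                min = Math.min(min, o.speed);
                sum += o.speed;
            }
            out.put(entry.getKey(), new Stats(window.peekLast().speed, max, min, sum / window.size()));
        }
        return out;
    }

    public static void main(String[] args) {
        SlidingSpeedAggregate agg = new SlidingSpeedAggregate(300);
        agg.add("link-1", 0, 30.0);
        agg.add("link-1", 120, 50.0);
        agg.add("link-1", 290, 40.0);
        agg.add("link-2", 200, 10.0);
        // At t=300 the t=0 reading for link-1 has slid out of the 300-second window
        System.out.println(agg.emit(300));
    }
}
```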

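The sort-and-clip part of the Top10 composite can likewise be sketched in plain Java. The sketch orders per-link statistics by one attribute and keeps the first n entries, where n plays the role of the 10 in the composite's if (count++ < 10) test (see Step 8). The LinkStat class and topN method are hypothetical, and the descending order is an assumption; this tutorial does not show the sort direction the SPL composite uses for each list.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.ToDoubleFunction;

/** Sketch of "sort by an attribute, then keep the top n", as in the Top10 composite. */
final class TopN {

    /** Hypothetical per-link summary, like one Agg1 output tuple. */
    static final class LinkStat {
        final String id;
        final double currentSpeed, maxSpeed, minSpeed;
        LinkStat(String id, double currentSpeed, double maxSpeed, double minSpeed) {
            this.id = id;
            this.currentSpeed = currentSpeed;
            this.maxSpeed = maxSpeed;
            this.minSpeed = minSpeed;
        }
    }

    /** Returns up to n links ordered by the chosen attribute, highest first (assumed order). */
    static List<LinkStat> topN(List<LinkStat> stats, ToDoubleFunction<LinkStat> attribute, int n) {
        List<LinkStat> sorted = new ArrayList<>(stats);
        sorted.sort(Comparator.comparingDouble(attribute).reversed());
        // Clip to n entries, like the composite's count++ < 10 check
        return new ArrayList<>(sorted.subList(0, Math.min(n, sorted.size())));
    }

    public static void main(String[] args) {
        List<LinkStat> stats = List.of(
                new LinkStat("a", 20, 45, 5),
                new LinkStat("b", 55, 60, 30),
                new LinkStat("c", 35, 50, 10));
        for (LinkStat s : topN(stats, l -> l.maxSpeed, 2)) {
            System.out.println(s.id + " maxSpeed=" + s.maxSpeed);
        }
    }
}
```

Passing the attribute as a function mirrors how one composite is reused three times with a different attribute of interest.
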
Step 7 – Stop the Bluemix and Streams applications

The starter application consists of two parts: the Bluemix Liberty application running in the Liberty runtime and the Streams application running in the Streaming Analytics service instance.

The Liberty application has a built-in timer: if the application is left running, the timer cancels the Streams application job and stops the Streaming Analytics instance after 2 hours, freeing the instance’s resources.

You can stop or restart the Bluemix application from the Bluemix console.

[Image: StopBluemixApp, stopping the application from the Bluemix console]

Note: Stopping the Bluemix application does not cancel the Streams application job. To cancel the job, go to the Streams Console, locate the job, and cancel it. The job name is “NYCTraffic_<your bluemix route>”, for example, “NYCTraffic_mpktest3.stage1.mybluemix.net”. There are several ways to get to the cancel command in the console. The screen below shows locating the job in the Streams Graph, hovering over the information icon, and clicking “Cancel Job”. Restarting the Bluemix application reuses the job if it exists or starts a new job if it does not.

[Image: NYCTrafficCancelJobNew, cancelling the job from the Streams Console]

Step 8 – Customize or extend the application

Now that you’re familiar with the starter app, you may want to modify the source code for the application in order to customize it or extend it.

Modify the Bluemix Liberty application

To modify the Liberty application, follow the steps below.

  1. Plan your customizations. There are a number of interesting ways that you could extend the starter application.
    • Modify the JavaScript tablebuilder function in index.js to display different or additional columns.
    • If you would like the starter application to run longer, increase the value of the delay variable in the firstTimeInit method of TrafficResource.java.
      It is currently set to long delay = 120*60*1000; // delay in milliseconds (120 minutes).
  2. Modify the source code for the app to reflect your desired customizations.
  3. Build the app using the Ant command.
    The directory you deployed from in Step 3 contains an Ant build.xml file. To build the application, change to that directory and run the ant command.
    Example:

    cd myapp 
    ant
  4. Deploy the modified application using cf push appname.
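
When you raise the delay, note that 120*60*1000 is evaluated in int arithmetic before it is widened to long. That is harmless at 120 minutes (7,200,000 ms), but from 35,792 minutes (just under 25 days) upward the int product exceeds Integer.MAX_VALUE and wraps to a wrong value. The sketch below contrasts the two forms using only the JDK; the DelayExample class and its method names are illustrative, not part of the starter application.

```java
import java.util.concurrent.TimeUnit;

/** Illustrative helpers for choosing the shutdown delay in milliseconds. */
final class DelayExample {

    /** Same shape as the original expression: computed in int, then widened to long. */
    static long intMathMillis(int minutes) {
        return minutes * 60 * 1000;            // overflows for minutes >= 35,792
    }

    /** Overflow-safe: TimeUnit converts using long arithmetic. */
    static long safeMillis(long minutes) {
        return TimeUnit.MINUTES.toMillis(minutes);
    }

    public static void main(String[] args) {
        long delay = 240L * 60 * 1000;         // long literal first, so the whole product is long
        System.out.println("240 minutes = " + delay + " ms = " + safeMillis(240) + " ms");
        System.out.println("600 hours, int math: " + intMathMillis(36_000));
        System.out.println("600 hours, safe:     " + safeMillis(36_000));
    }
}
```

Writing the value as TimeUnit.MINUTES.toMillis(240) also documents the unit at the point of use.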

Modify the InfoSphere Streams application

To modify the InfoSphere Streams application, follow the steps below.

Note: You need access to an InfoSphere Streams environment for development and testing, where InfoSphere Streams is installed and you can modify the application and recompile it to produce a new .sab (Streams application bundle) file.

If you don’t already have an InfoSphere Streams environment, you can download the InfoSphere Streams Quick Start Edition free of charge from: http://www-01.ibm.com/software/data/infosphere/streams/quick-start/.

  1. Plan your customizations. There are a number of interesting ways that you could extend the starter application.
    • Adjust the Aggregate operator to keep statistics across a longer period, such as 15 minutes, by changing the value from 300 to 900 on this line:
      deduped : sliding, time(300), time(60);
    • Modify the composite to keep only the top 5 instead of the top 10 records by changing the value from 10 to 5 on this line:
      if (count++ < 10) {
  2. Copy the SPL source code file NYCTrafficSample\WebContent\public\NYCTraffic.spl to an environment where you can edit and compile InfoSphere Streams applications, for example, the Streams Quick Start Edition. Modify the file, then compile it to produce a new .sab file.
  3. Replace NYCTrafficSample\WebContent\public\NYCTraffic.sab in the Liberty project with the new version.
  4. Build the Bluemix Liberty application using the Ant command.
  5. Deploy the modified application using cf push.

Conclusion

Streaming Analytics for Bluemix allows you to quickly and easily run analytic applications that produce real-time results. The starter application used in this tutorial demonstrates how you can use an InfoSphere Streams application as part of a larger Bluemix application. With the Streaming Analytics service, creating and running real-time applications has never been easier.


9 comments on"Bluemix Streaming Analytics Starter Application"

  1. Rahul_Narain July 21, 2015

    Way to go.
    This is excellent.

  2. mikeinminIBM May 13, 2016

    I updated the code in github and the sample code in the article to use TLS v 1.2 security on the https calls to the Streaming Analytics service REST API’s.

  3. mikeinminIBM July 13, 2016

    I noticed the public feed that produces the xml that this application depends on is currently not producing data. You can see this by going to the public url:
    http://207.251.86.229/nyc-links-cams/TrafficSpeed.php

    It is currently returning an empty xml document.

    The Bluemix and Streams apps will successfully start and refresh the status on the UI but the tables will be empty.

    Top Ten currentSpeedId currentSpeed Link Description
    Top Ten maxSpeedId maxSpeed Link Description
    Top Ten minSpeedId minSpeed Link Description

  4. mikeinminIBM July 22, 2016

    Since the public XML feed stopped working, I updated the code in the git project to use a different txt based feed. I updated the article as well to reflect the changes.

  5. Avinashbgm March 27, 2017

    Hello,

    Can I have the updated git project details or else please provide the link

    Thank you.

  6. Avinashbgm March 27, 2017

    Hello,

    Can we do same by taking atlanta city traffic speed data uri

    Thank you.

    • Natasha DSilva March 27, 2017

      Hi,
      You could write a similar application but that is if you have traffic data from Atlanta City. That data is something that would be provided by the City itself so you would have to get in touch with them to see if it exists and what format the data is in.
      Hope this helps.
