Skill Level: Beginner


This recipe connects Apache Edgent running on your Raspberry Pi to the Streaming Analytics service.



  1. Introduction

    This recipe picks up where our Apache Edgent on Pi to Watson IoT Platform recipe left off. Your Raspberry Pi is sending useful information to Watson IoT, so now we want to do some back-end processing using the Streaming Analytics Bluemix service.

    Apache Edgent (incubating*) is made for running analytics at the edge, but there are a lot of cases where you need a centralized streaming analytics hub. That’s where the Streaming Analytics service comes in. Using the IBM Streams-powered Streaming Analytics service opens you up to dozens of toolkits and several hundred operators that are already made to connect to the messaging servers and databases you care about, run the analytics you want, and much more.

    In this recipe, we will use IBM Streams to subscribe to the Watson IoT range sensor data stream that we created in the first recipe. That data stream consists of 10-tuple aggregates with reported minimum, maximum, mean, and standard deviation. Using just a few lines of code, we will compute a rolling daily average of the reported minimums from our devices, then we will send device commands back to the Raspberry Pis that we receive data from. Finally, we will subscribe to those commands on our Raspberry Pi and flash an LED each time we receive a command. Check out this quick demo to see it in action.

  2. Generate Watson IoT API Keys

    First we need to generate Watson IoT API keys so that the Streaming Analytics service can connect to our “sensors” data stream.

    1. Log in to Bluemix and from the Dashboard find your Internet of Things Platform service.
    2. Click on Launch dashboard.
    3. Click on the Access tab.
    4. Click on API Keys.

    5. Click on Generate API Key in the top right.
    6. Take note of your API Key and Authentication Token (you will not be able to view it again). You can also add a comment for identifying what it is used for.
    7. Click Finish.
  3. Launch IoT Platform IBM Streams App

    The best way to connect to Watson IoT Platform from IBM Streams is to use our IoT toolkit. This toolkit uses the streaming publish-subscribe model within IBM Streams applications to separate connectivity to the specific IoT service from analytical applications. An independent application connects to the message hub and publishes streams for device events, device commands, and device status (if supported by the message hub). Analytics applications then subscribe to the device events and/or commands of interest.

    We will launch a pre-built IotPlatform application that will connect to your Watson IoT Platform service and act as the “independent connection” application. The analytics application we write won’t need any IoT Platform connection information, other than the stream we want to subscribe to, because all communication with Watson IoT Platform will be handled by this application.
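
    The decoupling described above can be pictured with a small plain-Java sketch. It only illustrates the publish-subscribe idea and is not how the toolkit is implemented: the TopicBus class and its methods are hypothetical stand-ins, and no IBM Streams classes are involved. The point is that the subscriber only needs a topic name, never the connection details.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical in-memory stand-in for publish-subscribe between two applications. */
public class TopicBus {
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    /** An analytics application registers interest in a topic by name only. */
    public void subscribe(String topic, Consumer<String> handler) {
        subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(handler);
    }

    /**
     * The connection application publishes without knowing who is listening.
     * Returns the number of handlers that received the message.
     */
    public int publish(String topic, String message) {
        List<Consumer<String>> handlers =
                subscribers.getOrDefault(topic, Collections.emptyList());
        handlers.forEach(h -> h.accept(message));
        return handlers.size();
    }

    public static void main(String[] args) {
        TopicBus bus = new TopicBus();
        List<String> received = new ArrayList<>();

        // "Analytics" side: subscribe to the sensors topic
        bus.subscribe("sensors", received::add);

        // "Connection" side: publish device data; the status message has no subscriber
        bus.publish("sensors", "{\"MIN\":20.0}");
        bus.publish("status", "online");

        System.out.println(received);
    }
}
```

    In the recipe, the pre-built IotPlatform application plays the publishing role and the application we write plays the subscribing role.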

    To submit this application, you will need the following details from Watson IoT:

    • org – Organization identifier (top-right of Watson IoT dashboard)
    • authKey – API authorization key (retrieved in the last step)
    • authToken – API authorization token (retrieved in the last step)

    To launch the IoT Platform application:

    1. Download the IoT Platform application bundle here. Alternatively:
       $ wget https://github.com/IBMStreams/streamsx.iot/releases/download/v0.7.0/com.ibm.streamsx.iot.watson.apps.IotPlatform.sab 
    2. If you haven’t already, create a Streaming Analytics Bluemix service instance.
    3. Make sure your instance is started.
    4. Click Launch to launch the console.
    5. Click on the play button next to Jobs to submit your Streams application bundle.
    6. Click on Browse and find your downloaded com.ibm.streamsx.iot.watson.apps.IotPlatform.sab.
    7. Click Submit.
    8. Fill in the three required submission-time parameters (leave the optional encrypted field blank) and click OK.

    You will see your Streams graph in the top right. All operators should be healthy. You can expand the graph for a better view and more details. We are now connected to our instance of the Watson IoT Platform service, so the next step is to build an application that connects to and processes a specific data stream.

  4. Subscribe to Watson IoT Events from IBM Streams

    We will use the Java Application API to write our IBM Streams application and submit it to the Bluemix Streaming Analytics Service. The Java Application API works very similarly to Edgent, so your approaches on each platform can be similar.
    Before we jump into the code, you will need access to an installation of IBM Streams. You can download the IBM Streams Quick Start for free. Once you have installed IBM Streams, you will set up your environment for Watson IoT integration.

    Setting up your Streams Environment

    1. We will download four open source IBM Streams toolkits to help us. Download and unpack each release tarball in a toolkits directory. I recommend $HOME/toolkits. The following links are to releases that are compatible with each other and have been tested to work with this tutorial.
    2. Export the location of the toolkits:

      $ export STREAMS_TOOLKITS=$HOME/toolkits/


    3. You can read about setting up the Streams Studio Eclipse environment here.


    Subscribing to Watson IoT Events from Streams

    1. For this tutorial, we will build our application in a class called IotSensors. We will build the Streams topology inside the main method (as we did with Edgent in the previous recipe).
       public class IotSensors {

           private IotSensors() { }

           public static void main(String[] args) throws Exception {
               // The Streams topology from the following steps goes here
           }
       }

    2. Just as in Edgent, we start off by creating a topology in the main method.
       Topology topology = new Topology();
    3. Since we already have the IotPlatform application running in our Streaming Analytics service, gathering the events is trivial. The static IotStreams class handles the inter-application communication between your application and the IotPlatform application that you launched in the previous step.
       TStream<DeviceEvent> events = IotStreams.eventsSubscribe(topology, "sensors");
    4. To see the events as they’re coming in, we print the event stream to the console by calling print() on it: events.print();
  5. Run Back End IBM Streams Analytics

    One of the main reasons to use IBM Streams as a backend is to run analytics that aren’t practical for the edge. Using IBM Streams opens you up to dozens of toolkits and several hundred operators that are already made to connect to the messaging servers and databases you care about, run the analytics you want, and much more. You can see the list of toolkits available in the product and check out our open source toolkits. If you don’t like Java, we’ve released an alpha for Python. Or you can start learning SPL and using the IDE for graphical editing. You can watch a video showing off the graphical editor here.

    One case where you may need backend analytics is if you want to use large windows to aggregate data over many devices. Windows of a few minutes are practical at the edge, but when you begin to have hours of data from many devices, you need to be on a more powerful system. In this example, we extract the minimum value from each sensor event and place it in a 24-hour window. We then calculate a rolling average for the last 24 hours, something that is trivial to do with IBM Streams.

    1. Extract just the reading from the device events stream.
       // We just want the payload reading, which is of the form
       // "reading":{"N":10,"MIN":20.02074668027724,"MAX":38.910292772562215,"MEAN":27.591084814851353,"STDDEV":6.66598501958506}
       TStream<JSONObject> sensorReading = events.transform(v -> (JSONObject) v.getPayload().get("reading"));
    2. Extract the minimum from the reading to create a Double stream of the minimum sensor readings.
       TStream<Double> sensorMin = sensorReading.transform(v -> (Double) v.get("MIN"));
    3. Create a sliding window that maintains a List object with the last 24 hours of sensor minimum readings (the minimum readings come from 10-tuple aggregates on the devices).
       TStream<Double> rollingAverageStream = sensorMin.last(24, TimeUnit.HOURS).aggregate(
           new Function<List<Double>, Double>() {
               // Last computed average, returned again if the window is empty
               double rollingAvg = 0;

               public Double apply(List<Double> mins) {
                   // Calculate average based on window values
                   OptionalDouble optRollingAvg = mins.stream().mapToDouble(d -> d).average();

                   if (optRollingAvg.isPresent())
                       rollingAvg = optRollingAvg.getAsDouble();

                   return rollingAvg;
               }
           });

  6. Send Device Commands through Watson IoT

    Sending device commands is also very easy with the micro-service architecture enabled by the IotPlatform application.

    All we need is the device type and id, which we can retrieve from the events stream.

    1. Create the DeviceCmd stream by transforming the events stream:
       TStream<DeviceCmd> command = events.transform(event -> {

           // Define the device based on the device we received the event from.
           String typeId = event.getDevice().getTypeId();
           String deviceId = event.getDevice().getId();
           Device device = new Device(typeId, deviceId);

           // Create the command payload
           JSONObject payload = new JSONObject();
           payload.put("msg", "Event Received @ " + new Date().toString() +
                   " Event: " + event.getPayload().toString());

           // Create a device command to be sent to the "display" commands stream
           return new DeviceCmd(device, "display", payload);
       });
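    2. Send the messages through the IotPlatform application by publishing the command stream with the IotStreams class. The toolkit provides a commandPublish method for this, called as IotStreams.commandPublish(command); check the Javadoc of your toolkit release for the exact signature.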
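
    The mapping in step 1 can also be tried out on its own. The sketch below is plain Java with hypothetical stand-in types (SketchDevice, SketchEvent, SketchCmd) in place of the toolkit's Device, DeviceEvent, and DeviceCmd classes, and a String in place of the JSON payload. Only the shape of the transformation is meant to carry over: each command is addressed to the device that produced the event.

```java
import java.util.Date;

/** Plain-Java sketch of turning a device event into a command for the same device. */
public class CommandSketch {

    /** Hypothetical stand-in for the toolkit's Device class. */
    public static final class SketchDevice {
        public final String typeId;
        public final String id;

        public SketchDevice(String typeId, String id) {
            this.typeId = typeId;
            this.id = id;
        }
    }

    /** Hypothetical stand-in for DeviceEvent; the payload is simplified to a String. */
    public static final class SketchEvent {
        public final SketchDevice device;
        public final String payload;

        public SketchEvent(SketchDevice device, String payload) {
            this.device = device;
            this.payload = payload;
        }
    }

    /** Hypothetical stand-in for DeviceCmd. */
    public static final class SketchCmd {
        public final SketchDevice device;
        public final String cmdId;
        public final String msg;

        public SketchCmd(SketchDevice device, String cmdId, String msg) {
            this.device = device;
            this.cmdId = cmdId;
            this.msg = msg;
        }
    }

    /** Builds a "display" command addressed to the device that sent the event. */
    public static SketchCmd toCommand(SketchEvent event) {
        SketchDevice target = new SketchDevice(event.device.typeId, event.device.id);
        String msg = "Event Received @ " + new Date() + " Event: " + event.payload;
        return new SketchCmd(target, "display", msg);
    }

    public static void main(String[] args) {
        SketchEvent event = new SketchEvent(new SketchDevice("pi", "pi-01"), "{\"MIN\":20.0}");
        SketchCmd cmd = toCommand(event);
        System.out.println(cmd.device.typeId + "/" + cmd.device.id + " <- " + cmd.cmdId + ": " + cmd.msg);
    }
}
```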


  7. Submit Streams Application to Bluemix

    You will need to get your Streaming Analytics service credentials to submit your application directly to Bluemix.

    Getting Your Streaming Analytics Service Credentials

    1. From the Streaming Analytics Service home page click on Service Credentials.
    2. Create a file called credentials.cfg in the following format. The service name is typically something like Streaming Analytics-jb.
      {
        "streaming-analytics": [
          {
            "name": "name-streams",
            "credentials": {
              "password": "XXXXXXXXXXXXXXXXXXXXX",
              "rest_port": "443",
              "bundles_path": "/jax-rs/bundles/service_instances/...",
              "statistics_path": "/jax-rs/streams/statistics/service_instances/...",
              "resources_path": "/jax-rs/resources/service_instances/...",
              "stop_path": "/jax-rs/streams/stop/service_instances/...",
              "rest_host": "streams-app-service.ng.bluemix.net",
              "jobs_path": "/jax-rs/jobs/service_instances/...",
              "start_path": "/jax-rs/streams/start/service_instances/...",
              "rest_url": "https://streams-app-service.ng.bluemix.net",
              "userid": "xxxxxxxxxxxxxxxxxxxxxxx",
              "status_path": "/jax-rs/streams/status/service_instances/..."
            }
          }
        ]
      }
    3. Add code to your IotSensors class to compile and submit your IBM Streams application bundle with the credential file and the service name.

       // Credential file path and service name are passed on the command line
       String credentialFile = args[0];
       String serviceName = args[1];

       // Add credential file and Streaming Analytics service name to config
       Map<String, Object> config = new HashMap<>();
       config.put(AnalyticsServiceProperties.VCAP_SERVICES, new File(credentialFile));
       config.put(AnalyticsServiceProperties.SERVICE_NAME, serviceName);

       // Compile and submit application bundle to Bluemix
       StreamsContextFactory.getStreamsContext(Type.ANALYTICS_SERVICE).submit(topology, config);

    Compile and Submit Your IBM Streams Application

    1. To compile, your command should look approximately like this:
      $ javac -cp $STREAMS_TOOLKITS/com.ibm.streamsx.iot/lib/com.ibm.streamsx.iot.jar:$STREAMS_TOOLKITS/com.ibm.streamsx.topology/lib/com.ibm.streamsx.topology.jar:$STREAMS_INSTALL/lib/com.ibm.streams.operator.samples.jar -d bin src/com/ibm/streamsx/iot/sample/IotSensors.java
    2. For submission, I pass in the credentials.cfg file and the Streaming Analytics service name rather than hard-code it into my application:
      $ java -cp $STREAMS_TOOLKITS/com.ibm.streamsx.iot/lib/com.ibm.streamsx.iot.jar:$STREAMS_TOOLKITS/com.ibm.streamsx.topology/lib/com.ibm.streamsx.topology.jar:$STREAMS_INSTALL/lib/com.ibm.streams.operator.samples.jar:bin/ com.ibm.streamsx.iot.sample.IotSensors credentials.cfg "Streaming Analytics-jb"
    3. If your compile and submission succeed, your output should look like this (it may take a couple of minutes to upload the application bundle to your Bluemix service):
      INFO: Streaming Analytics Service instance status response:{"state":"STARTED","plan":"Standard","enabled":true,"status":"running"}
      Apr 29, 2016 7:50:31 PM com.ibm.streamsx.topology.internal.context.AnalyticsServiceStreamsContext submitJobToService
      INFO: Streaming Analytics Service: Submitting bundle : com.ibm.streamsx.iot.sample.IotSensors.sab to Streaming Analytics-jb
      Apr 29, 2016 7:50:31 PM com.ibm.streamsx.topology.internal.context.AnalyticsServiceStreamsContext getBluemixSubmitConfig
      INFO: Streaming Analytics Service submit job request:{}
      Apr 29, 2016 7:50:41 PM com.ibm.streamsx.topology.internal.context.AnalyticsServiceStreamsContext postJob
      INFO: Streaming Analytics Service submit job response:{"jobId":"1","application":"de5466f4-3613-48a1-b474-f8bf6843d640","state":"STARTED","plan":"Standard","enabled":true,"status":"running"


      Your two applications should look like they have merged into one because of the publish/subscribe stream connection. You can see this in the resulting Streams graph (viewable in your Streaming Analytics console as before).


  8. Subscribe to Device Commands from Edgent on Pi

    We already saw how to send our data streams to Watson IoT in the last recipe. Now we want to subscribe to device commands. Recall that we already have an IotDevice object in our Edgent IoTRangeSensor.java code:

     IotDevice device = new IotpDevice(topology, new File(deviceCfg)); 

    Subscribing to device commands with the id “display” requires one line of code:

     TStream<JsonObject> statusMsgs = device.commands("display");

    Really complicated, right? Now we have a JsonObject stream containing the device commands. To print the command stream, we call print() on it: statusMsgs.print();



  9. Flash an LED on Receiving Alerts

    Now we get to the fun part! Every time we get a device command, we want to flash an LED. You will need to have an LED hooked up to pin 12 (see these instructions to set up an LED, but use pin 12 as your control pin).

    Again, we are using the Pi4J libraries that we used for the range sensor. You can see a full LED class that is part of the Edgent sample here.

    The most important parts are shown below:

     public class LED {
         private final GpioController gpio = GpioFactory.getInstance();
         private GpioPinDigitalOutput pin;

         public LED(Pin pin) {
             this.pin = gpio.provisionDigitalOutputPin(pin, "LED", PinState.HIGH);
             this.pin.setShutdownOptions(true, PinState.LOW);
         }

         public void flash(long ms) {
             // Body not shown in this excerpt; see the full LED class in the Edgent sample
         }
     }

    To flash the LED on every command received using Edgent, we will sink the statusMsgs stream using the flash function from our LED class.

     Pin ledPin = RaspiPin.GPIO_01; // Pi4J custom numbering (pin 12 on RPi2)
     LED led = new LED(ledPin); // Create LED object

     // Sink statusMsgs stream with a 1 second flash
     statusMsgs.sink(j -> led.flash(1000));


  10. Next Steps

    You now have a range sensor whose readings are intelligently handled by Edgent and sent to the Streaming Analytics service, where you run backend analytics and send device commands back to your Raspberry Pi to be handled by Edgent.

    Happy Streaming!

    *Apache Edgent is currently undergoing Incubation at the Apache Software Foundation.

9 Comments on "Connect Apache Edgent on Pi to the Streaming Analytics Service"

  1. Aarushi_Mittal May 11, 2017

    @Alex Cook where do we have to write this code ?
    On Eclipse ?
    Also , how does streaming analytics service communicate with eclipse made code ??
    Really confused.
    Spent a month on this

    • Alex Cook May 11, 2017

      @Aarushi_Mittal, it shouldn’t matter where you write your code. The build/run commands provided should be sufficient. The job gets submitted using a REST API and the actual job runs on Bluemix. Feel free to post errors in your comments to provide more context.

      • Aarushi_Mittal May 11, 2017

        @Alex Cook thanks for the quick response
        for this command : $ javac -cp $STREAMS_TOOLKITS/com.ibm.streamsx.iot/lib/com.ibm.streamsx.iot.jar:$STREAMS_TOOLKITS/com.ibm.streamsx.topology/lib/com.ibm.streamsx.topology.jar:$STREAMS_INSTALL/lib/com.ibm.streams.operator.samples.jar -d bin src/com/ibm/streamsx/iot/sample/IotSensors.java

        i need streams installed ..
        where do i install it ? on my windows machine ?
        Also how to proceed further?
        i coded on eclipse, but i dont have streams so it wont compile correctly. also the code does not receive any readings that pi is sending to watson iot? I did not understand that piece of architecture.

        • Alex Cook May 11, 2017

          In step 4:
          > Before we jump into the code, you will need access to an installation of IBM Streams. You can download the IBM Streams Quick Start for free.

          The link for the quickstart is here: http://www.ibm.com/analytics/us/en/technology/stream-computing/
          You will need to use the VM image to run on a Windows machine.

          • Aarushi_Mittal May 11, 2017

            yes i downloaded the vm image
            Now whats next
            My vm image is working and i am able to see the sysadmin desktop

          • Alex Cook May 11, 2017

            Are you running/building your code in the VM so that it has access to the Streams install?

          • Aarushi_Mittal May 11, 2017

            I copied the jar samples.operator to my local windows machine from the VM
            then added it as an external jar to my eclipse project.
            Now i am trying to run it from the command line

          • Aarushi_Mittal May 11, 2017

            Also how are Streaming analytics service , my eclipse project and the Part 1 setup( code running on raspberry pi that i sending data to IoT Watson Platform) communicating ?
            What is the architecture there ?

          • Aarushi_Mittal May 11, 2017

            Exception in thread “main” java.lang.Error: Unresolved compilation problems:
            JSONObject cannot be resolved to a type
            The method transform(Function) from the type TStream refers to the missing type JSONObject
            This lambda expression refers to the missing type JSONObject
            JSONObject cannot be resolved to a type
            The method getPayload() from the type DeviceEvent refers to the missing type JSONObject
            The method aggregate(Function<List,A>) in the type TWindow is not applicable for the arguments (new Function<List,Double>(){})
            JSONObject cannot be resolved to a type
            JSONObject cannot be resolved to a type
            The method getPayload() from the type DeviceEvent refers to the missing type JSONObject
            Cannot infer type arguments for HashMap
            The method put(String, File) is undefined for the type Map
            The method put(String, String) is undefined for the type Map
            ANALYTICS_SERVICE cannot be resolved or is not a field

            at IotSensors.main(IotSensors.java:63)

            I am getting these errors on compilation
