Overview

Skill Level: Beginner

In this recipe, you will create a Streaming Analytics application that processes events sent to the Watson IoT platform from a device running Apache Edgent. The application will also send commands back to the Edgent application.

Ingredients

Step-by-step

  1. Introduction

    This recipe picks up where the Apache Edgent on Pi to Watson IoT Platform recipe left off. In that recipe, you created an Edgent application running on your Raspberry Pi that is sending events to the Watson Internet of Things (IoT) platform. Now, we want to connect to Watson IoT platform from the Streaming Analytics service so that we can do some back end processing. 

    Why use Streaming Analytics to process events from Edgent?

    Apache Edgent is made for running analytics at the edge, but there are a lot of cases where you need a centralized streaming analytics hub–that’s where the Streaming Analytics service comes in. Using the IBM Streams-powered Streaming Analytics service, you can take advantage of all that Streams has to offer. This includes operators that perform advanced analytics such as timeseries, geofencing, and that connect you to messaging servers and databases like Kafka, RabbitMQ, Redis, DB2, dashDB, and more.

    Using the Watson IoT platform as a bridge between your edge device and the Streaming analytics service, this recipe will show you how to receive events sent from Edgent and send commands back to Edgent from a Streaming Analytics application.

    This is a common pattern – develop your edge analytics application, send events to the Watson IoT platform, and then retrieve those events from the Streaming analtyics service.

    How does it work?

    The recommended way to connect to Watson IoT Platfrom from IBM Streams is to use the IoT toolkit. This toolkit uses the streaming publish-subscribe model within IBM Streams applications to separate connecting to a specific IoT service from writing analytics applications. The toolkit provides an application called IotPlatformBluemix that connects to the IoT Platform and publishes streams for device events, device commands and device status (if supported by the message hub). Analytics applications then subscribe to device events and/or commands of interest.

    The IotPlatformBluemix application handles all communication with Watson IoT Platform, so you don’t have to configure a connection to the platform for every Streams application you create. The following diagram illustrates the flow of data from your edge device to your Streams application:

    So, to get data from the Watson IoT platform into your Streams application, you follow these steps:

    1. Make sure your edge device is generating data and the IoT platform is receiving it. Steps to do so are described in the previous recipe. If you did not complete the previous recipe, instructions are below to quickly send simulated data to the IoT platform.
    2. Generate and save credentials needed to allow the IotPlatformBluemix application to connect to the Watson IoT platform.
    3. Submit the IotPlatformBluemix application to the Streaming analytics service, or a local Streams instance.
    4. Develop your Streams application that subscribes to data streams published by the IotPlatformBluemix app.

     

    This recipe will show you how to complete these steps.

  2. Setup instructions

    To complete this recipe, you need to set up and connect instances of the Streaming Analytics and Watson IoT Platform services. There are two ways to meet these requirements. The recommended way is using the Streams IoT Starter Kit, which automatically carries out the necessary tasks. You could also manually complete those steps.

    Automatic setup

    Deploy the Streams IoT Starter kit. 
    Once deployment is finished, visit the landing page and perform the following tasks:

    • Under “View all Credentials” > “Edgent credentials”, download your device credentials by clicking “Download device.cfg”
    • In the same page, download credentials for your Streaming analtyics service by clicking “Streams Credentials” > “Download credentials as JSON”.
    • Submit the IoTPlatform application: Click “Tools” and “Submit job” if it is not running in your instance.

     

    Manual setup
    This post on Streamsdev has all the information you need. Follow the instructions therein.

     

    Regardless of the method you choose, you should have the following before moving on to the next section:

    1. Registered device with the Watson IoT platform and its credentials in a device.cfg
    2. The IoTPlatform or IoTPlatformBluemix application running in your instance
    3. Credentials for your Streaming analtyics service in a file called credentials.cfg
  3. Make sure your Edgent application is generating data

    In the first recipe, we created an Edgent application running on a Raspberry Pi that was sending events to the IoT platform.  That application should be running now so that our Streams application can process those events and send commands in response. If you skipped that recipe, you can run a sample application that will generate data.

     

    Skipped the previous recipe?

    If you do not already have a running application, you can download Apache Edgent and run the IoT sensors sample application:

    Run the Edgent sample:The IotpSensors application in Edgent generates the data that will be processed in this recipe.

    To run the application and start generating data:

    1. Download and unpack Edgent
    2. For Edgent 1.1.0 and greater, edit the file $EDGENT/java8/scripts/connectors/iotp/runiotpsensors.sh to uncomment the line:
      USE_OLD_EVENT_FORMAT=-Dcom.ibm.iotf.enableCustomFormat=false.
    3. Run the sample script, using the device.cfg file you created earlier

      cd $EDGENT/java8/scripts/connectors/iotp/
      ./runiotpsensors.sh path_to_device_cfg:

    You should see output like this:

    "when":"2017-07-11T20:04:31Z","time":1499803471283}
    Jul 11, 2017 4:04:31 PM com.ibm.iotf.client.AbstractClient connect
    INFO: pool-1-thread-19-IotpSensors: Successfully connected to the IBM Watson IoT Platform
    {"name":"B","reading":{"N":4,"MIN":-2.525664438740216,"MAX":-1.7670767522693176,"MEAN":-2.0834865122722173,"STDDEV":0.3294944133737305}}

    Now you are ready to process those events in Streams.

  4. Set up your Streams development environment

    At this point, your Edgent application is sending events to the Watson IoT platform, and the IotPlatformBluemix application is publishing a stream of events from your Edgent application. You can now create a Streams application to analyze those events. You can write the application in SPL, Java, or Python.  In this recipe, we will create a Java application to do so using the Streams Java Application API.

    Sample applications for SPL and Python

    This recipe will create a Java application, but you can also create Streams applications to consume events from Edgent using SPL or Python. See the samples repository on Github for Python and SPL examples.

    If you do not already have one, you will need access to an installation of IBM Streams. You can download the IBM Streams Quick Start for free. Once you have installed IBM Streams, you will set up your environment for Watson IoT integration.

    If you are using Streams 4.1

    Streams 4.1 users need to follow the instructions in this paragraph. Download these open source IBM Streams toolkits. Download and unpack each release tarball in a toolkits directory. I recommend $HOME/toolkits. The following links are to releases that are compatible with each other and have been tested to work with this tutorial.

    Export the location of the toolkits by setting the STREAMS_SPLPATH environment variable to the path of the folder where you unpacked the toolkits.

    export STREAMS_IOT_TOOLKITS=$HOME/toolkits/
    export STREAMS_SPLPATH=$STREAMS_IOT_TOOLKITS:$STREAMS_SPLPATH

    Setting up your Streams Environment

    We will use Streams Studio for development. Set up your workspace for development by following these instructions to integrate the Java API with Streams Studio. You can skip the sections on running and importing the samples if you like.

    Before starting Streams Studio, the STREAMS_SPLPATH environment variable must be set and contain the path to the required toolkits.   If you are using the toolkits included in Streams 4.2, set it to $STREAMS_INSTALL/toolkits. If you downloaded the toolkits, set it to the folder containing the toolkits you downloaded.

    • Download the sample project
    • Import the sample project into Streams Studio: File > Import >General/Existing Projects into Workspace. Select ‘archive file’ and browse to the project you just downloaded, click Finish.
    • Add dependencies to your classpath:  Right click on the project, select Build Path>Configure Build Path…>Click the Libraries tab.
    • For each jar file listed below,  Select “Add External Jars” > and browse to the location of the jar file and add it to the path.
      • Browse to $STREAMS_INSTALL/lib/com.ibm.streams.operator.samples.jar to the build path
      • If you are using Streams 4.2. or later, add
        • $STREAMS_INSTALL/toolkits/com.ibm.streamsx.iot/lib/com.ibm.streamsx.iot.jar
        • $STREAMS_INSTALL/toolkits/com.ibm.streamsx.topology/lib/com.ibm.streamsx.topology.jar
        • $STREAMS_INSTALL/toolkits/com.ibm.streamsx.datetime/impl/lib/com.ibm.streamsx.datetime.jar
      • If you are using Streams 4.1, Add:
        •  $HOME/toolkits/com.ibm.streamsx.iot/lib/com.ibm.streamsx.iot.jar
        • $HOME/toolkits/com.ibm.streamsx.datetime/lib/com.ibm.streamsx.datetime.jar
        • $HOME/toolkits/com.ibm.streamsx.topology/lib/com.ibm.streamsx.topology.jar
  5. Process Watson IoT Events in your Streams application

    Now that your development environment is set up for development, create a Java application to process events from Edgent. The ReadFromWatsonIoT class in the sample has the Streams topology in the main method:

    import java.io.File;
    import java.util.HashMap;
    import java.util.Map;
    import com.ibm.streamsx.iot.DeviceEvent;
    import com.ibm.streamsx.iot.IotStreams;
    import com.ibm.streamsx.topology.TStream;
    import com.ibm.streamsx.topology.Topology;
    import com.ibm.streamsx.topology.context.AnalyticsServiceProperties;
    import com.ibm.streamsx.topology.context.StreamsContext.Type;
    import com.ibm.streamsx.topology.context.StreamsContextFactory;
    public class ReadFromWatsonIoT {

    public static void main(String[] args) throws Exception {
    // create a Streams Topology
    Topology topology = new Topology("ReadFromIot");
    //subscribe to the "sensors" events stream
    TStream<DeviceEvent> events = IotStreams.eventsSubscribe(topology, "sensors");
    events.print();
    //Code to submit the application goes here
    //
    }

    The above code is all that is needed to ingest data from the IoT platform in a Streams Java application. The key portion of this code is the use of the IotStreams.eventsSubscribe() method to subscribe to the data stream published by the IotPlatformBluemix application.  Remember that you launched it in the previous step and it is consuming events sent from your Edgent application. The last line just prints the stream to stdout so we can see the events as they come in.

     

    It is a good idea to run the simple application above to verify that you can successfully read the data from your Edgent application. All that is missing is to add code to submit the application to the Streaming Analytics service.

    Edit lines 18 and 19 of SubmitToService class to specify  the path to the credentials file and the name of your service instance. Recall that you copied and saved this information in Step 3 above.

     // This code submits the application to the Streaming Analytics service
    Map<String, Object> config = new HashMap<>();
    String credentialFile = "/path/to/credentials/cfg"; //Change this to the credentials file
    config.put(AnalyticsServiceProperties.VCAP_SERVICES, new File(credentialFile));
    String serviceName = "service name"; //Insert your Streaming Analytics service name here.
    config.put(AnalyticsServiceProperties.SERVICE_NAME, serviceName);
    // Submit application bundle to Bluemix
    StreamsContextFactory.getStreamsContext(Type.ANALYTICS_SERVICE).submit(topology, config);

    The full application is in ReadFromWatsonIoT.java in the sample Java project.

  6. Submit the Streams application to Bluemix

    Compile and Submit Your Streams Application

    If you are using Streams 4.1, make sure you update STREAMS_SPLPATH to include the path of the downloaded toolkits:

    export STREAMS_IOT_TOOLKITS=$HOME/toolkits
    export STREAMS_SPLPATH=$STREAMS_IOT_TOOLKITS:$STREAMS_SPLPATH

    If you are using Streams Studio, and have followed the earlier instructions to set your project’s build path, your code should automatically be compiled. So to run the application, right click anywhere in the editor and select Run As > Java Application to run the application.

    If you are building from the command line, follow these steps:

    1. To compile, your command should look approximately like this:
      javac -cp $STREAMS_IOT_TOOLKITS/com.ibm.streamsx.iot/lib/com.ibm.streamsx.iot.jar:$STREAMS_IOT_TOOLKITS/com.ibm.streamsx.topology/lib/com.ibm.streamsx.topology.jar:$STREAMS_INSTALL/lib/com.ibm.streams.operator.samples.jar -d bin com/ibm/streamsx/iot/sample/*.java
    2. Run the application:
      java -cp "$STREAMS_IOT_TOOLKITS/com.ibm.streamsx.iot/lib/com.ibm.streamsx.iot.jar:$STREAMS_IOT_TOOLKITS/com.ibm.streamsx.topology/lib/com.ibm.streamsx.topology.jar:$STREAMS_INSTALL/lib/com.ibm.streams.operator.samples.jar:bin/:$STREAMS_INSTALL/ext/lib/*:$STREAMS_INSTALL/lib/*" com.ibm.streamsx.iot.sample.ReadFromWatsonIoT

    If your compilation and submission succeed, your output should look like this (it may take a couple of minutes to upload the application bundle to your Bluemix service):
    INFO: Streaming Analytics Service instance status response:{"state":"STARTED","plan":"Standard","enabled":true,"status":"running"}June 29, 2017 7:50:31 PM com.ibm.streamsx.topology.internal.context.AnalyticsServiceStreamsContext submitJobToService
    INFO: Streaming Analytics Service: Submitting bundle : com.ibm.streamsx.iot.sample.IotSensors.sab to Streaming Analytics-jbJune 29, 2017 7:50:31 PM com.ibm.streamsx.topology.internal.context.AnalyticsServiceStreamsContext getBluemixSubmitConfig
    INFO: Streaming Analytics Service submit job request:{}June 29, 2017 7:50:41 PM com.ibm.streamsx.topology.internal.context.AnalyticsServiceStreamsContext postJob
    INFO: Streaming Analytics Service submit job response:{"jobId":"1","application":"de5466f4-3613-48a1-b474-f8bf6843d640","state":"STARTED","plan":"Standard","enabled":true,"status":"running"

    Your two applications should look like they have merged into one because of the publish/subscribe stream connection. You can see this in the resulting Streams graph (viewable in your Streaming Analytics console as before).

    From the log viewer, you should be able to see the output of your application.

    Select “Log Viewer” from the top left corner of the console.

    Select your application from the list of running jobs. It should have the same name you gave the Topology object you created. Expand it and expand the PEs till you find the one that has Print, click “Console Log” and “Load console messages”. You should see this:

     

     

    If you do not see any output, verify that your Edgent application is running and generating events. Also verify that the application configuration object has the correct values (no extra spaces, e.t.c), and you specified the correct name for the application configuration object when you submitted the IotPlatformBluemix app.

  7. Send Device Commands through Watson IoT

    So far, we have seen how to read events sent by an Edgent application by subscribing to the stream of events using the IotStreams.eventsSubscribe() method.  The micro-service architecture enabled by the IotPlatformBluemix application makes this very easy to send commands to the device using a similar mechanism. To send commands, we use the IotStreams.commandPublish() method.

    Continuing our example, we receive events from the Edgent application when an object is nearer than 30 cm away.  Now, for every event received, we want to send this response: “Enable LED for event received at [time]“.

    We are going to create a stream of commands to the device. All we need is the device type and id, as well as the message we want to send. We can get the device type and id from the events stream. The expected format is the DeviceCmd type in the IoT toolkit. (The DeviceCmd type also has a SPL counterpart.)

    The ReadAndSendCommands.java sample class creates a stream of commands by transforming the events stream:

     TStream<DeviceCmd> command = events.transform(event -> {
    // Get the information of the device we received the event from.
    String typeId = event.getDevice().getTypeId();
    String deviceId = event.getDevice().getId();
    //Create a new device
    Device device = new Device(typeId, deviceId);
    // Create the command payload
    JSONObject payload = new JSONObject();
    String response = "Enable LED for event received @ " + new Date();
    payload.put("msg", response);
    // Create a device command to be sent to the "display" commands stream
    return new DeviceCmd(device, "display", payload);
    });

    Publish the stream of commands:

    IotStreams.commandPublish(command);

    The full application is in ReadAndSendCommands.java in the sample project.

  8. Receive Device Commands in your Edgent application

    You can skip the next two steps if you did not complete the previous recipe.

    Now, we’re going to return to our Edgent application from the last recipe.  Recall that we sent events to the Watson IoT platform. Now we want our Edgent application to recieve commands from the Streams application.

    Recall that we used the IotDevice.events() method send events from Edgent .  Now in to the IoTRangeSensor.java class, subscribing to device commands with the id of “display” requires one line of code:

    TStream<JsonObject> statusMsgs = device.commands("display");

    statusMsgs is a JsonObject stream containing the device commands. To print the command stream, we use:

    statusMsgs.print();

    Now our Edgent application is ready to receive commands. Relaunch the application after adding the new code. If you recall, the IotpRangeSensors sample application on our Pi, is launched as follows:

    $ java -cp $CP $MAIN <device cfg file> <simulatedSensor?> <simulatedLED?>
    So far we have been using a simulated LED. When the device receives the command from Streams, you should see this on the output:

    Enable LED for event received at  Thu Jun 22 18:48:54 EDT 2017
    *******Simulated LED Flash!*******
  9. Optional: Flash an LED on Receiving Alerts

    If you do not have an LED, you can skip this part. Now we are going to flash an LED every time we get a device command. You will need to have an LED hooked up to pin 12 (See these instructions to set up an LED, however use pin 12 as your control pin). Again, we are using the Pi4J libraries that we used for the range sensor. You can see a full LED class that is part of the Edgent sample here. The most important parts are shown below:

    public class LED {
       private final GpioController gpio = GpioFactory.getInstance();
       private GpioPinDigitalOutput pin;
       public LED(Pin pin) {
        this.pin = gpio.provisionDigitalOutputPin(pin, "LED", PinState.HIGH);
        this.pin.setShutdownOptions(true, PinState.LOW);
        this.pin.low();
       }
       public void flash(long ms) {
        this.pin.pulse(ms);
       }
    }

    To flash the LED on every command received using Edgent, we will sink the statusMsgs stream using the flash function from our LED class.

    Pin ledPin = RaspiPin.GPIO_01;
    // PI4J custom numbering (pin 12 on RPi2)
    LED led = new LED(ledPin);
    // Create LED object
    // Sink statusMsgs stream with a 1 second flash
    statusMsgs.sink(j -> led.flash(1000));

    The complete IotRangeSensors application is on GitHub and also inclued in Edgent in the samples folder.

  10. Run more advanced analytics using Streams

    The example we demonstrated here was very simple, the Streams application sent a command to Edgent for every event received. The main reason to use Streams as a backend is to run analytics that aren’t practical for the edge.  One case where you may need backend analytics is if you want to use large windows to aggregate data over many devices. Windows of a few minutes are practical at the edge, but when you begin to have hours of data from many devices, you need to be on a more powerful system. In this example, we extract the minimum value from each sensor event and place it in a 24 hour window. We calculate a rolling average for the last 24 hours, something that is trivial to do with Streams.

    1. Extract just the reading from the device events stream.
       
      // We just want the payload reading which is of form
      // "reading":{"N":10,"MIN":20.02074668027724,"MAX":38.910292772562215,"MEAN":27.591084814851353,"STDDEV":6.66598501958506}
      TStream <JSONObject>reading = events.transform(event -> ((JSONObject)(event.getData().get("reading"))));
    2. Extract the max from the reading to create a Double stream of the minimum sensor readings.
      TStream<Double> sensorMin = reading.transform(v -> (Double) v.get("MIN"));
    3. Create a sliding window that maintains a List object with the last 24 hours of sensor minimum readings (the minimum readings come from 10-tuple aggregates on the devices).
      Function<List<Double>, Double> aggFn = new Function<List<Double>, Double>() {
      double rollingAvg = 0;
      @Override
      public Double apply(List<Double> mins) {
        // Calculate average based on window values
        OptionalDouble optRollingAvg = mins.stream().mapToDouble( d -> d).average();
        if (optRollingAvg.isPresent()){
            rollingAvg = optRollingAvg.getAsDouble();
        }
        return rollingAvg;
       }
      };
      TStream<Double> rollingAverageStream = sensorMin.last(24, TimeUnit.HOURS).aggregate(aggFn);
      rollingAverageStream.print();

    This application is in the ComputeRollingAverage class in the sample project.

  11. Next Steps

    You now have a range sensor whose readings are intelligently handled by Edgent and sent to the Streaming Analytics service.  You also have a Streaming Analytics application that can run backend analytics and send device commands back to your Raspberry Pi to be handled by Edgent.

    Happy Streaming!

     

     

Join The Discussion