Combining Apache Edgent and Streams makes it easy to run more complex analytics on events generated by edge devices such as phones, cars, and sensors. Applications running Edgent can send data to a message hub, such as the Watson IoT Platform, for use by other applications. You can retrieve those events for further analysis in your Streams application using the Streams IoT toolkit. You can also send data to Edgent from a Streams application using the same mechanism. Data sent by an Edgent application is called an event, while data it receives is called a command.

I have been using this toolkit recently to communicate with an Edgent application and I wanted to share a few snippets for common tasks to help you on your way.
We’ll cover the following:

Edgent

Streams

Note that these are just snippets. If you want full step-by-step instructions, the Edgent to IoT and IoT to Streaming Analytics recipes are available.

Prerequisites

This cheat sheet focuses on using the available API to perform common tasks. Make sure you have all the right credentials by completing the setup instructions as documented in this post. You should have the IotPlatformBluemix application running in your Streams instance before you try to retrieve events sent from Edgent.

Edgent examples

Send events from Edgent to Streams

Once your device is registered, and you have the connection information in a file, you can send events from your Edgent application. The org.apache.edgent.connectors.iotp.IotpDevice class represents a registered device. You use instances of this class to communicate with the IoT platform. Data sent from Edgent is called an event. Events are always sent and received as JSON.

Specifically, use the IotDevice.events(TStream, event_id, qos) method to send a data stream to the platform. Event ids are used to group types of events.

device.events(stream, "event identifier", qos);

For example, this snippet will send an event called “heartbeat” to the IoT platform every minute:

 
//Create your topology
DirectProvider tp = new DirectProvider();
DirectTopology topology = tp.newTopology("IotpRangeSensor");

//Create the device using the properties file you created in the previous step
IotDevice device = new IotpDevice(topology, new File("/home/user/device.cfg"));

//source stream, once a minute
TStream<Date> hb = topology.poll(() -> new Date(), 1, TimeUnit.MINUTES);

// Convert to JSON
TStream<JsonObject> hbAsJSON = hb.map(d -> {
  JsonObject j = new JsonObject();
  j.addProperty("when", d.toString());
  j.addProperty("heartbeat", d.getTime());
  return j;
});
hbAsJSON.print();
device.events(hbAsJSON, "heartbeat", QoS.FIRE_AND_FORGET);

//Submit the topology so that it starts running
tp.submit(topology);

Receive data sent from Streams

In an Edgent application, data received from the IoT platform is called a command. Commands are also sent and received as JSON.
Use the IotpDevice.commands() method to receive commands. This method generates a stream of JSON objects, where each tuple on the stream represents a command received from the IoT Platform.
Example:

DirectProvider tp = new DirectProvider();
DirectTopology topology = tp.newTopology("ReceiveCommands");
TStream<JsonObject> statusMsgs = device.commands("command id");

Imagine a Streams application is sending commands called “display”, with the following format:

{"msg": "Alert code: 249", "action": "Reboot"}

Then the following snippet will configure your Edgent application to receive and print those commands:



//Create the device using the properties file you created in the previous step
IotDevice device = new IotpDevice(topology, new File("/home/user/device.cfg"));
TStream<JsonObject> rawCommandsStream = device.commands("display");

TStream<String> alertStream = rawCommandsStream.map(j -> {
  //the raw commands stream has additional information, access the "payload" element to see your data
  JsonObject payload = j.getAsJsonObject("payload");
  String message = "Received alert: ";
  String alert = payload.get("msg").getAsString();
  String action = payload.get("action").getAsString();
  return message + alert + ", action: " + action;
});

alertStream.print();

Running this Edgent topology would give this output whenever a message was received:

Received alert: Alert code: 249, action: Reboot
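
To make the shape of the raw command easier to picture, here is a minimal Python sketch of the same unwrapping step, run against a hand-written sample. Only the "payload" element and its "msg" and "action" fields come from the example above; the other keys in the sample ("command", "format") are assumptions added for illustration, and the function name format_alert is hypothetical.

```python
import json

# Hand-written stand-in for one raw command tuple.
# Only "payload" is taken from the post; "command" and "format" are assumed.
raw_command = json.loads("""
{
  "command": "display",
  "format": "json",
  "payload": {"msg": "Alert code: 249", "action": "Reboot"}
}
""")


def format_alert(command):
    """Mirror the map() step: unwrap "payload" and build the message."""
    payload = command["payload"]
    return "Received alert: " + payload["msg"] + ", action: " + payload["action"]


print(format_alert(raw_command))
```

The point is simply that your own data sits one level down, under "payload", rather than at the top level of the command.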

See the Send data from Streams section of this post to see how to send a command from Streams.

Running your Edgent application

You can follow the instructions here to set up your development environment.

Note regarding events sent from Edgent 1.1.0+

As of this writing, if you are using Edgent 1.1.0 or newer, you must use the -Dcom.ibm.iotf.enableCustomFormat=false system property to enable the IoT toolkit to properly parse events sent by Edgent.
If you are sending events from Edgent and the payload is null, try adding this system property before you launch your application.

Streams Examples

Read data sent from Edgent to Streams

Each of the following snippets shows how to read events named “sensors” from the IoT Platform. The incoming data is always in JSON format. For this example, each event contains data like the following:
{"name":"A","reading":{"N":50,"MIN":-2.149484255294765,"MAX":6.675185463350511,"MEAN":2.8835217763431027,"STDDEV":1.9093200278143514}}
Typically, your Streams application would perform analysis on the events received before deciding which commands to send, rather than repeatedly sending commands or sending a command in response to each received event, as is done in these examples.
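
As a rough illustration of "analyze first, then decide", the following Python sketch parses the sample event shown above and only flags it when a reading crosses a limit. The function name should_alert and the threshold of 5.0 are made up for this example; a real application would pick its own rule.

```python
import json

# The sample "sensors" event payload from this post
SAMPLE_EVENT = (
    '{"name":"A","reading":{"N":50,"MIN":-2.149484255294765,'
    '"MAX":6.675185463350511,"MEAN":2.8835217763431027,'
    '"STDDEV":1.9093200278143514}}'
)


def should_alert(event_json, max_limit=5.0):
    """Return True when the reported MAX exceeds max_limit (an arbitrary threshold)."""
    reading = json.loads(event_json)["reading"]
    return reading["MAX"] > max_limit


if should_alert(SAMPLE_EVENT):
    print("would send a command for sensor", json.loads(SAMPLE_EVENT)["name"])
```

A predicate like this could sit in a filter step ahead of whatever builds the command, so that commands are only produced for the events that need them.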

Read data from Edgent using SPL

Use the EventsSubscribe operator to subscribe to events. You can use the optional eventIds parameter to specify which events you want to subscribe to. For example, this snippet will subscribe to events named “sensors”.


stream<DeviceEvent> SensorEventsJson = EventsSubscribe() {
  param
    eventIds: ["sensors"];
}

However, the stream produced by the EventsSubscribe operator contains the event as a JSON string. Use the DeviceEventExtractData operator to convert the JSON into SPL tuples of a user-specified type. Continuing with the example data above, we could represent this as an SPL type called SensorInfo with the following schema:

type AggInfo = float64 MAX, float64 MIN, float64 MEAN, float64 STDDEV;
type SensorInfo = rstring name, AggInfo reading;

And invoke the DeviceEventExtractData operator as follows:


stream<DeviceEvent> SensorEventsJson = EventsSubscribe() {
  param
   eventIds: ["sensors"];
}


stream<DeviceEventTs, SensorInfo> SensorEvents = DeviceEventExtractData(SensorEventsJson) {
  param
    events : true; // Events are selected by the EventsSubscribe
    dataType: SensorInfo;
}

Read data from Edgent using Java

Follow the instructions in this recipe to add needed jars to your classpath. See the section “Set up your Streams development environment”.

The IotStreams class contains the methods you need. Use the IotStreams.eventsSubscribe() method to generate a TStream of events:

Topology t = new Topology("ReadFromIoT");
TStream<DeviceEvent> events = IotStreams.eventsSubscribe(t, "sensors");
//The events stream contains metadata about the event, including device type and id
// use event.getData() to extract the payload which contains the data we are interested in

TStream<JSONObject> payload = events.transform(event -> ((JSONObject)(event.getData())));
payload.print();

Read data from Edgent using Python

It is also quite simple to read IoT events using the Python application API, although the approach is a bit different.
You use the Topology.subscribe() method to subscribe to the stream of events published under the topic streamsx/iot/device/events.
You also need to specify a structured schema that represents the data coming from the Watson IoT platform.

import json

#Helper function to parse the JSON
def get_event_data(tuple):
    payload_json = tuple["jsonString"]
    payload = json.loads(payload_json)
    return payload["d"]

#Define a schema for the incoming events
events_schema = schema.StreamSchema("tuple<rstring typeId, rstring deviceId, rstring eventId, rstring jsonString>")

topo = Topology('ReadingsFromIot')
events = topo.subscribe("streamsx/iot/device/events", events_schema)

#This will subscribe us to all events from all devices,
#but since we're only interested in the 'sensors' events, we use a filter:

sensor_events = events.filter(lambda tuple: tuple["eventId"] == "sensors")
sensor_events.print()

#The sensor_events stream contains the event as JSON.
#To extract the data, we use the get_event_data() function to parse the JSON for each tuple:

event_data = sensor_events.map(get_event_data)
event_data.print()
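
If you want to check the filter and the parsing helper before submitting anything to Streams, you can run them against a hand-built tuple. This is only a local sketch: the sample values are invented, a plain dict stands in for the tuple, and the attribute names (typeId, deviceId, eventId, jsonString) and the "d" wrapper are the ones used by the snippets in this post.

```python
import json


def get_event_data(event_tuple):
    """Same logic as the helper above: parse jsonString and unwrap "d"."""
    payload = json.loads(event_tuple["jsonString"])
    return payload["d"]


def is_sensor_event(event_tuple):
    """Same test as the lambda passed to filter() above."""
    return event_tuple["eventId"] == "sensors"


# Hand-built stand-in for one tuple on the events stream (values are invented)
sample_tuple = {
    "typeId": "R-PI-02",
    "deviceId": "rpi-2",
    "eventId": "sensors",
    "jsonString": json.dumps({"d": {"name": "A", "reading": {"N": 50, "MEAN": 2.88}}}),
}

if is_sensor_event(sample_tuple):
    print(get_event_data(sample_tuple))
```

Because both functions are ordinary Python callables, the same definitions can be passed to filter() and map() unchanged once they behave as expected locally.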

Send commands to Edgent

When you want to send commands to Edgent, you need:
– A command id to distinguish commands from each other.
– The device type and device id of the device(s) that will be receiving the command. If you have received events from that device, then this information is already available in the data you received. Otherwise you’ll need the information that you entered when you registered the device with the Watson IoT Platform.

Your Streams application will publish a stream of commands that will be received by the IotPlatform application and forwarded to the device(s).

For the following examples, we will be sending a command with id “display” to our Edgent application. The Edgent application is expecting to receive commands with the following JSON format:
{"msg": "Alert! code: 249", "action": "Reboot"}

Send commands to Edgent from an SPL application

Use the CommandPublish operator from the IoT toolkit:

Define a type for your data:

type Alert = rstring msg, rstring action;

First use the CommandTupleToPayload operator to convert an SPL tuple containing the payload to JSON:


//this beacon just generates one command for demo purposes
stream<DeviceCmdId, tuple<Alert alert>> Commands = Beacon() {
  param
     iterations: 1u;
  output
    Commands: typeId = "R-PI-02", deviceId = "rpi-2", cmdId = "display", alert = {msg ="Alert code 249", action="Reboot"};
}
// Convert them into the required schema
stream<DeviceCmd> CommandsJson = CommandTupleToPayload(Commands)
{
   param
     payload: alert; //the alert attribute on the incoming tuple has the command itself
}

//Send the correctly formatted stream using the `CommandPublish` operator:

() as PublishMsg = CommandPublish(CommandsJson) { }

Note: We use a Beacon to generate the command. See the IotpSensors.spl sample for an example of using the data from a received event to send a command.

Send commands to Edgent from a Java application

Again, make sure that your classpath is set up correctly as discussed in this recipe. See the section “Set up your Streams development environment”.

Use the commandPublish() method of the IotStreams class.
Here is how to send commands to a device in response to events we receive:


//subscribe to a stream of events
TStream<DeviceEvent> events = IotStreams.eventsSubscribe(t, "sensors");
//send a response to the device for each event you receive
TStream<DeviceCmd> command = events.transform(event -> {
 //get the device type and id from the incoming event
   String typeId = event.getDevice().getTypeId();
   String deviceId = event.getDevice().getId();
   Device device = new Device(typeId, deviceId);
   JSONObject data = new JSONObject();
   data.put("msg", "Alert! code 249");
   data.put("action","Reboot");
   String commandId = "display";
  
   return new DeviceCmd(device, commandId, null, data); 
});

//publish the stream
IotStreams.commandPublish(command);

Note that this example sends a command for each event received. Typically, you would only send commands as needed.

 

Send commands to Edgent from a Python application

You will need to install (or upgrade to) version 1.7+ of the streamsx Python package.
pip install --upgrade streamsx

To send commands from Python:
– Use the map() function to convert your Stream of commands to an SPL structured schema.
– Then, publish the converted stream of commands to the IotPlatform application using Stream.publish(topic). The IotPlatform application will convert the command to JSON before sending it to your Edgent application.

The topic to publish commands to is streamsx/iot/device/commands/send.

First, define the function that will map an incoming event to a command:


def get_cmd(event):
    #build the message you wish to send as a dictionary
    payload = {}
    
    payload["action"] = "Reboot"
    payload["msg"] = "Alert! code: 249"
    
    command_data =  {} 
    command_data ["d"] = payload

    #convert the whole payload to json
    command_as_json = json.dumps(command_data)

    #build the command 
    device_cmd ={}
    #get the device id and type from the incoming event
    device_cmd["typeId"] = event["typeId"]
    device_cmd["deviceId"] =event["deviceId"]
    device_cmd["cmdId"] = "display"
    device_cmd["jsonString"] = command_as_json
    return device_cmd   

Here is a complete application that subscribes to events and then uses the get_cmd function we just defined to send a command for each event.


topo = Topology('ReadingsFromIot')
#subscribe to events 
events_schema = schema.StreamSchema("tuple<rstring typeId, rstring deviceId, rstring eventId, rstring jsonString>")
events = topo.subscribe("streamsx/iot/device/events", events_schema)
filtered_events = events.filter(lambda tuple: tuple["eventId"] == "sensors")


#define the schema for the commands
cmd_schema = schema.StreamSchema('tuple<rstring typeId, rstring deviceId, rstring cmdId, rstring jsonString>')

#map the events to a new stream of commands

cmd_stream = filtered_events.map(get_cmd)
#convert the stream of commands to SPL schema
commands_to_publish = cmd_stream.map(lambda x : (x["typeId"],x["deviceId"],x["cmdId"],x["jsonString"],), schema = cmd_schema)
#publish it to the IotPlatform app
commands_to_publish.publish('streamsx/iot/device/commands/send', cmd_schema)
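
Before publishing, it can help to confirm locally that a command comes out in the expected shape. The sketch below reuses the get_cmd logic from above in condensed form and applies the same field ordering as the lambda passed to map(). The sample event is hand-built with invented values, and the helper name to_spl_tuple is hypothetical.

```python
import json


def get_cmd(event):
    """Condensed version of get_cmd above: wrap the payload in "d" and add routing fields."""
    payload = {"action": "Reboot", "msg": "Alert! code: 249"}
    return {
        "typeId": event["typeId"],
        "deviceId": event["deviceId"],
        "cmdId": "display",
        "jsonString": json.dumps({"d": payload}),
    }


def to_spl_tuple(cmd):
    """Same ordering as the lambda above: typeId, deviceId, cmdId, jsonString."""
    return (cmd["typeId"], cmd["deviceId"], cmd["cmdId"], cmd["jsonString"])


# Hand-built stand-in for one incoming event (values are invented)
sample_event = {
    "typeId": "R-PI-02",
    "deviceId": "rpi-2",
    "eventId": "sensors",
    "jsonString": "{}",
}

cmd_tuple = to_spl_tuple(get_cmd(sample_event))
print(cmd_tuple)
```

The order of the tuple elements matters here because it has to line up with the attribute order declared in the command schema.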


Complete Sample Applications

To see Edgent and Streams communicating via the Watson IoT Platform, sample applications have been provided. Run the Edgent samples to generate events, and then run the corresponding SPL, Java or Python Streams application to process those events and send commands back to Edgent.

1. Run the sample Edgent application

The IotpSensors application in Edgent can be used to generate sample events and is the basis for the sample applications discussed in this post.

Run it as follows:

  1. Edit the file $EDGENT/java8/scripts/connectors/iotp/runiotpsensors.sh to uncomment the line:
    USE_OLD_EVENT_FORMAT=-Dcom.ibm.iotf.enableCustomFormat=false.
  2. Run the sample script, using the device.cfg file you created earlier:
cd $EDGENT/java8/scripts/connectors/iotp/
./runiotpsensors.sh path_to_device_cfg

You should see output like this:

{"when":"2017-07-11T20:04:31Z","time":1499803471283}
Jul 11, 2017 4:04:31 PM com.ibm.iotf.client.AbstractClient connect
INFO: pool-1-thread-19-IotpSensors: Successfully connected to the IBM Watson IoT Platform
{"name":"B","reading":{"N":3,"MIN":-2.525664438740216,"MAX":-1.7670767522693176,"MEAN":-2.0693650866518003,"STDDEV":0.4020613941892835}}
{"name":"B","reading":{"N":4,"MIN":-2.525664438740216,"MAX":-1.7670767522693176,"MEAN":-2.0834865122722173,"STDDEV":0.3294944133737305}}

2. Run a Streams application to process events from Edgent

Once both the Edgent and Streams applications are running, you will have this output in your Edgent application:

Alert! .....
{"name":"B","reading":{"N":50,"MIN":-3.2237102368661628,"MAX":9.163582832013136,"MEAN":2.097168722140206,"STDDEV":3.0616790167831622}}
{"name":"B","reading":{"N":50,"MIN":-3.2237102368661628,"MAX":9.163582832013136,"MEAN":2.275773732201217,"STDDEV":3.08403923089503}}

The “Alert!” message is sent from the Streams application.

Useful Links

I hope these tips have been helpful. The following information can be consulted for detailed instructions and documentation:
