Overview

Skill Level: Beginner

This recipe - co-authored with Rene Meyer - shows how to process event data from the IBM Watson IoT Platform using IBM Message Hub as the message broker and Node.js consumers that store the event data in a Db2 Warehouse on Cloud data warehouse.

Ingredients

Software Requirements


Recipe Requirement
Get the setup ready by covering the following two IoT recipes:

The following recipe is optional:

Step-by-step

  1. Introduction

    The recipe ‘Simulate an Elevator – All using Node-RED nodes’ showed you how to simulate a real-world IoT scenario using Node-RED as a digital twin for a collection of elevators. A second recipe titled ‘Monitor Elevator events in real time to take corrective actions using Watson IoT Platform’ demonstrated how to perform real-time analytics on the device events and how to implement a closed control loop putting the elevators in maintenance mode should the motor temperature go beyond 200 degrees Fahrenheit. A third (and optional) recipe titled ‘Process Elevator device data using the Watson IoT Platform and Node.js Consumers’ showed how to implement, test and deploy a Node.js consumer application that reads event data from the Watson IoT Platform and stores it in a DB2 Warehouse on Cloud database table containing the latest event data for each device.

    In this recipe we shall show you how to implement, test and deploy a Node.js consumer application that reads IoT event data from the IBM Message Hub service rather than directly from the Watson IoT Platform. The IBM Watson IoT Platform will serve as a producer of IoT event data by using the IBM Watson IoT Bridge, which can be configured to send data from the IBM Watson IoT Platform to IBM Message Hub. The advantage of this architecture is that the Message Hub service can act as a uniform message broker for internal system events as well as external events such as those received from the IBM Watson IoT Platform. IBM Message Hub is in turn built on Apache Kafka and is available through IBM Cloud as a fully managed service, and the communication between the Message Hub and the Node.js consumers will therefore take place using standard Apache Kafka APIs.

    This recipe will take you through the following basic steps to implement, test and deploy the consumer application:

    1. Get started by creating the Cloud Foundry application for the Node.js consumer in IBM Cloud and installing the required development tools on your local desktop.
    2. Configure the Watson IoT Bridge to IBM Message Hub.
    3. Create and configure the Node.js consumer application on your local desktop.
    4. Inspect and understand the code for the Node.js consumer.
    5. Create the target database table using the DB2 Warehouse on Cloud console.
    6. Modify the Node-RED Elevator Simulator to send additional properties that are needed for storing event data in the database.
    7. Test the Node.js consumer application running on your local desktop.
    8. Deploy the consumer application to IBM Cloud as a Cloud Foundry application using the command line interface.

     

    If you have already gone through the recipe titled ‘Process Elevator device data using the Watson IoT Platform and Node.js Consumers’, then you will probably be familiar with many steps in this recipe since they are similar, and you can therefore go through steps 1, 3 and 7 in fast-forward mode and focus on the things that are new or significantly different (steps 2 and 4). The step concerned with modifying the Node-RED flow (step 6) and the step configuring the target database (step 5) can be skipped entirely.

  2. Architecture

    Before you start it is important to provide you with some details on the data flow and the architecture of the solution. Elevator event data from the Watson IoT Platform will be sent to the Message Hub service. This is done by the Message Hub Bridge, which is part of the Watson IoT service. From the Message Hub, the device data will be streamed to a number of Node.js consumers which receive the event data and store it in a dedicated database table (ELEVATOR_STATUS) managed by the Db2 Warehouse on Cloud service. The status information can then be used by, for example, IBM Cognos Analytics to monitor whether the elevators are up and running or not:

    02.1-Deployment-Architecture

    In an initial step you will clone a repository on GitHub to your local machine and then modify the code of the Node.js consumer to use the credentials for your Message Hub and DB2 services in IBM Cloud. Once this is done the consumer application can be run and tested locally (in principle). This configuration is suitable during the development phase. Once finalized, the application will be pushed to IBM Cloud using the Command Line Interface (CLI). From then on it will run as a Cloud Foundry application in IBM Cloud.

    The IBM Message Hub service is based on Apache Kafka. Apache Kafka in turn is a high-throughput, reliable message processing system originating from LinkedIn and open source since 2011. Using Kafka we can achieve a decoupling of message producers and message consumers, which results in greater flexibility compared to a direct one-to-one communication scheme. At the same time Kafka offers essential features such as:

    • Persistence of messages so that no messages are lost.
    • Distributed and horizontal scaling allowing for multiple servers and consumers.
    • Automatic recovery from broker failure.

     

    Kafka runs as a cluster of one or more (distributed) servers called brokers that store streams of records in categories called topics. You could be tempted to think of topics as queues, but actually they are not the same. A topic is a category or feed name to which records are published by producers and read by consumers. The Kafka cluster is a collection of brokers distributed over one or more machines so that the load can be processed simultaneously. ZooKeeper monitors the health of the brokers and maintains the metadata information about them. It is the central service for maintaining configuration information, monitoring the overall health of the cluster and balancing broker assignments and re-assignments. For example, should a broker go down, ZooKeeper will need to distribute its load to the remaining brokers.

    02.2-Apache-Kafka-Architecture

    Each record consists of a key, a value, and a timestamp. Topics in Kafka are always multi-subscriber; that is, a topic can have zero, one, or many consumers that subscribe to the data written to it. Topics are broken up into ordered commit logs called partitions.  Each partition is an ordered, immutable sequence of records that is continually appended to. The records in the partitions are each assigned a sequential id number called the offset that uniquely identifies each record within the partition. 

    02.3-Topics-and-Partitions

    The producers are in principle responsible for choosing the partition within the topic when sending a message. This can be done in a round-robin fashion simply to balance load or it can be done according to some semantic partition function that can be defined by the producer. Messages sent by a producer to a particular topic partition will be appended in the order they are sent, i.e. messages from the producers are appended to the end (right) of the partition.

    Consumers can decide whether they want to read from the beginning of the topic, or whether they want to read from the next record. Consumers can furthermore be grouped into consumer groups that jointly read messages from one and the same topic to achieve load balancing and higher throughput. Each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Basically this is achieved by letting all consumers in a group share the notion of “next record to read” for each partition. The notions of topics and consumer groups allow Kafka applications to adopt a queue as well as a broadcasting protocol for communicating messages between producers and consumers. If all the consumer instances have the same consumer group, then the records will effectively be load balanced over the consumer instances. If all the consumer instances have different consumer groups, then each record will be broadcast to all the consumer processes.

    Consumers in a group will furthermore be associated with specific partitions. This assignment will automatically be changed should one of the consumers abort, which contributes to the fault tolerance of Apache Kafka. Kafka provides fault tolerance and high throughput by distributing the partitions over the servers in the Kafka cluster and by replicating the partitions across a number of servers. Each partition has one server which acts as the “leader” and zero or more servers which act as “followers”. The leader handles all read and write requests for the partition while the followers passively replicate the leader. If the leader fails, one of the followers will automatically become the new leader. Each server acts as a leader for some of its partitions and a follower for others to balance load across the cluster.

    Kafka messages are kept in a persistent store for a period of time. The Kafka cluster retains all published records, whether or not they have been consumed, using a configurable retention period. For example, if the retention policy is set to two days, then for the two days after a record is published it is available for consumption, after which it will be discarded to free up space. Kafka’s performance is effectively constant with respect to data size, so storing data for a long time is not a problem.

    Apache Kafka offers several kinds of APIs that can be used by Apache Kafka clients – be it Java, Scala, Node.js or Python clients (to mention a few):

    • A Producer API that allows an application to publish records to one or more Kafka topics.
    • A Consumer API that allows an application to subscribe to one or more topics and process the records of the topics.
    • A Streams API that allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams into output streams.

     

    In this recipe we shall use the Streams API that comes with the node-rdkafka library contributed by Blizzard Entertainment. The Streams API is the easiest way to consume messages. The standard API is more performant, particularly when handling high volumes of messages, but it requires more manual setup.

    A minimal consumer client for IBM Message Hub using the Streams API is outlined below, in the form of a client that simply reads event data from the topic named “elevator-events” and prints the received event data to standard output:

     02.4-Consumer-Application

    The client imports the ‘node-rdkafka’ library first. It then defines the configuration parameters required to create a new Kafka consumer (lines 6-18), before it creates the consumer and the read stream (line 21). During this call it also subscribes the client to the topic named “elevator-events” (lines 4, 21). The event handler for receiving messages is simply defined to print the received event data to standard output (lines 25-28). Beyond that, the client also creates an event handler for handling Kafka errors (lines 31-34).
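
    To give an impression of what such a client looks like in code, here is a minimal sketch using the node-rdkafka Streams API. The broker list, user name, password and consumer group in this sketch are hypothetical placeholders that would have to be replaced with the credentials of your own Message Hub service:

      // Minimal sketch of a Streams API consumer (placeholder credentials, for illustration only)
      var Kafka = require('node-rdkafka');

      var topicName = 'elevator-events';

      // Connection settings for Message Hub - the values below are hypothetical placeholders
      var consumerConfig = {
        'metadata.broker.list': 'kafka01.example.net:9093,kafka02.example.net:9093',
        'security.protocol': 'sasl_ssl',
        'sasl.mechanisms': 'PLAIN',
        'sasl.username': '<your Message Hub user>',
        'sasl.password': '<your Message Hub password>',
        'group.id': 'elevator-consumer-group'
      };

      // Create the consumer and its read stream, subscribing it to the topic
      var stream = Kafka.KafkaConsumer.createReadStream(consumerConfig, {}, {
        topics: [topicName]
      });

      // Print every received message value to standard output
      stream.on('data', function(message) {
        console.log('Received message: ' + message.value.toString());
      });

      // Handle Kafka errors
      stream.consumer.on('event.error', function(err) {
        console.error('Kafka error: ' + err);
      });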

    There are of course more aspects to a real client application, and these will be covered in detail in section 6. The way the credentials are handled must be changed from being hardwired into the code to being retrieved from the environment. How this is done depends on where the application is supposed to be deployed. If deployed to Cloud Foundry, the credentials can be taken from the VCAP_SERVICES environment variable. If deployed as a Docker image, e.g. to Kubernetes, the credentials are often read by the client using environment variables (although using files is an option for Kubernetes as well). Deployment to a local desktop during development and testing can use both approaches. The real client will therefore be defined to support all possible approaches to allow for flexible deployment to various Cloud environments.

    Moreover, the real client will need to store the data in a specific database table in the data warehouse. This will require handling of additional credentials and parameters, as well as use of the Node.js ibm_db library for interfacing to the Db2 database. Using this library usually involves opening a connection to the database, submitting an SQL query to the database and finally closing the connection to the database. However, for the current application the situation is a bit more complex than that because we can’t assume that the messages are handled in the order that they are produced. Node.js is inherently asynchronous (non-blocking), and if deployed into a production setting we must also expect that there are several concurrent consumer applications running in parallel to handle the load. The SQL statement will need to take this situation into account, which we will discuss in more detail in section 6.

  3. Getting Started

    Before you get started you will need to go through these preparation steps:

    1. Create a Cloud Foundry Node.js application to serve as target for the deployment to IBM Cloud.
    2. Download and install the required development tools on your local Mac or Linux environment.
    3. Create the Message Hub service if not already done and associate it with the application.
    4. Create the DB2 Warehouse on Cloud service if not already done and associate it with the application.
    5. Export the credentials of the services to serve as input for a local VCAP file during development.

     

    Create Node.js Application

    The first step is to create a Node.js starter application for the consumer. Later you will bind the Message Hub and the DB2 Warehouse on Cloud services to it manually using the web browser, but for now simply do the following to create the application:

    1. Select the IBM Cloud Catalog in the upper right corner of the browser window.
    2. Select Cloud Foundry Apps in the Categories section in the left part of the window.
      03.1-SDK-for-Nodejs
    3. You will now see the various applications offered. Select SDK for Node.js.
    4. In the next window, provide a unique name for the application (e.g. ‘elevator-mh-consumer-<your initials>’) and click Create.
      03.2-CF-Apps-Create

    This will create the application for you and take you to a tutorial with commands that you will use in the next subsection to install the development tools to your local desktop.

    Download and Install Development Tools

    To prepare the development environment on your local machine, you will need to download and install the following tools:

    • IBM Cloud Command Line Interface
    • Git
    • Node.js
    • Atom (or Visual Studio Code)

     

    The tutorial also provides the links to the IBM Cloud Foundry CLI, Git and Node.js that you need for this recipe:

    03.3-Getting-Started-Tutorial

    We installed the tools on an Ubuntu Linux 64-bit virtual machine. You can get instructions on how to download and install the ATOM editor here: https://atom.io/.

     

    Create Message Hub Service

    Next create the Message Hub service and bind it to the application:

    1. Select the IBM Cloud Catalog again.
    2. Enter “hub” as search string.
      03.4-MH-Catalog
    3. Select the ‘Message Hub’ service.
      03.5-Create-MessageHub-Service
    4. Click Create to create the service.

     

    Next, bind the new Message Hub service to the consumer application:

    1. Go back to the application that you just created.
    2. Select the Connections tab to the left.
      03.6-Create-MH-Connection
    3. Click Create Connection.
    4. Select the Message Hub Service that you just created.
      03.7-Select-MH-Service
    5. Click Connect.
    6. When asked if you want to restage the application, click Cancel.

     

    Create Database Service

    If you have already gone through the recipe ‘Create DashDB data warehouse for reporting on elevator event data‘ then you can (and should) reuse the DB2 Warehouse on Cloud service from that recipe. Add this service in a similar way as above, but rather than cancelling the restage, accept a restage of the application.

    Otherwise, if you need to create a new DB2 Warehouse service go through the following steps:

    1. Select the IBM Cloud Catalog.
    2. Enter “db2” as search string.
      03.8-Db2-Service-Instantiation
    3. Select the ‘Db2 Warehouse on Cloud’ service.
    4. Click Create to create the service.

     

    To connect the Db2 service to the application, do the following:

    1. Go back to the application that you just created.
    2. Select the Connections tab again.
    3. Click Create Connection.
    4. Select the service named DB2 Warehouse on Cloud.
    5. Accept the defaults and click Connect.

     

    Notice that for the current exercise you may as well use the Db2 on Cloud service, which provides a regular transactional database. Db2 Warehouse on Cloud is better suited for analytics purposes but comes at a higher cost than Db2 on Cloud.

     

    Export Environment Variables

    You have now created a new application with two services. Next get the credentials for the Message Hub and Db2 services by going through the following steps:

    1. Select the Runtime tab.
      03.9-Environment-Variables
    2. Select Environment Variables at the top of the screen.
    3. Click Export to export the environment variables and save them to a local file on your file system.

     

    You have now exported the VCAP_SERVICES environment variable to a file on your local file system so that it can be used for running and testing the consumer application locally on your desktop. If you need to manage the connections later on, you can simply go back to the application, select the Connections tab and then invoke the relevant command, e.g. to retrieve the credentials, using the menus:

    03.10-Manage-Credentials

  4. Configure the Message Hub Bridge

    Before you can deploy and test the application you will need to configure the Message Hub Bridge for the IoT Service so that device data will be forwarded to the Kafka cluster. The bridge provides a unidirectional link into Message Hub that allows you to store historical data. By connecting Message Hub to Watson IoT Platform, you can use Message Hub as an event pipeline to consume your device events from the Watson IoT Platform and make the events available in real time to the rest of the platform.

    To achieve this, simply go through the following steps (the steps are also described in general in the online documentation titled ‘Connecting and configuring a historian service by using Message Hub’):

    1. Locate the IoT service of the Elevator simulator in the IBM Cloud dashboard.
    2. Select the service and Launch the console.
    3. Select the Extensions button in the toolbar to the left of the IBM Watson IoT Platform dashboard.
    4. In the Historical Data Storage tile, click Setup:
      04.1-Create-Extension
    5. Select the Message Hub service that you created in the previous section.
      04.2-Select-Message-Hub
    6. Select a time zone. The timestamps on the device data that is sent to the Message Hub are converted to the selected time zone.
    7. Specify a rule for forwarding device events as shown in the screenshot. Only events that match the custom forwarding rules are now forwarded:
      04.3-Configure-MH-Bridge
    8. To create the topics with more than the default two partitions, simply specify the topic configuration as shown in the screenshot.
    9. Click Done.
    10. Click Authorize.
    11. In the Authorization dialog box, click Confirm.

     

    You could also have selected a default topic, and device events would then be sent to this default topic. However, we need a more granular topic assignment, which is achieved by leaving the default topic blank and adding custom forwarding rules. This way, we can add additional topics in the future, e.g. to handle elevator alarms.

    You can now check that the bridge has been configured correctly and is working as intended by going through the following steps:

    1. Select the Message Hub service in the IBM Cloud dashboard.
    2. Observe that the topic has been created:
      04.4-Message-Hub-Topic
    3. For your information: by clicking the + button you can create new topics if needed:
      04.5-Create-Topic
    4. Select Grafana to monitor the traffic:
      04.6-Cloud-Monitoring
    5. Observe that messages are indeed arriving.

     

    The Message Hub Bridge has now been configured, and since we are seeing messages coming in we can be sure that it is working as intended.

  5. Create and Configure Message Consumer

    To create a Node.js application that reads event data from the Message Hub and saves it into a DB2 Warehouse on Cloud database you will need to go through the following steps:

    1. Clone a GitHub repository to your local desktop.
    2. Start the ATOM code editor and get an overview of the files of the consumer application.
    3. Add the credentials for the IBM Message Hub and DB2 Warehouse on Cloud services to a local credentials file.
    4. Install the required node packages using the node package manager.

     

    To get the source code from Git you will need to clone a GitHub repository to your local machine. Do the following in a command line Terminal:

    1. Create a new folder.
      mkdir consumer
    2. Change the current directory to the folder.
      cd consumer
    3. Invoke the command:
      git clone https://github.com/EinarKarlsen/mh-nodejs-consumer
    4. Change the directory to the folder ‘mh-nodejs-consumer’ by invoking
      cd mh-nodejs-consumer

     You have now downloaded the files of the consumer to your desktop.

    1. Start the ATOM editor by invoking the command
      atom .
      05.2-ATOM-Editor

    The main files of the application are:

    • mhreceiver.js which is the main application that initializes the Message Hub consumer and then reads the event data and stores it in the appropriate database table.
    • ibmdb2interface.js which contains functions for constructing the required SQL statements for updating the database as well as a function for submitting the SQL query to the database.
    • package.json which is the core Node.js file defining the applications, modules, packages and more needed to build and run the application.
    • manifest.yml which is the Cloud Foundry manifest file defining the application name, its resource allocations, routes (host), services and more. See e.g. https://docs.cloudfoundry.org/devguide/deploy-apps/manifest.html#minimal-manifest for an overview.
    • vcap-local.example.json which is a template local VCAP_SERVICES file that shall be modified with the service credentials that were saved in the previous section.
    • .env.example is a template ‘.env’ file containing the names of the environment variables of relevance to the application. A few of the variables are mandatory; the rest just represent an alternative to the vcap-local file for configuring a local consumer application and its services.

     

    The ‘vcap-local.example.json’ file is used as a basis for retrieving service credentials when the consumer application is running on a local machine outside Cloud Foundry. The file mirrors the VCAP_SERVICES environment variable used when running inside Cloud Foundry, which brings two advantages. First, the same piece of code can be used to access the service credentials such as the URL, the user name and the password independently of where the application is running. Second, it is possible to run and test the application locally and then be sure that the code that accesses the service credentials will not break once the application is pushed to Cloud Foundry. This is basically achieved by using the ‘cfenv’ node package to access the service credentials. In addition, the ‘.env’ file can also be used to define the service credentials in case the consumer is targeted to be deployed outside Cloud Foundry (e.g. in a Docker container). The ‘.env’ file furthermore provides additional environment variables such as the name of the target database table (for a good overview of the use of environment variables in Node.js see the blog ‘Working with Environment Variables in Node.js’).

    To update the credentials you will just need to replace the sections containing placeholders in the file ‘vcap-local.example.json’ with the corresponding information from the exported VCAP_SERVICES file. The easiest way to do this is to copy the entire JSON object for the service, so do the following:

    1. Open the file ‘vcap-local.example.json’ (as shown above).
    2. Open the VCAP_SERVICES file that you exported from IBM Cloud in the previous section.
    3. Copy the JSON object named “dashDB” from the exported VCAP_SERVICES file and paste it into the ‘vcap-local.example.json’ file.
    4. Do the same for the Message Hub service.
    5. Invoke File > Save As and save the file as ‘vcap-local.json’.

     

    As a final step you will need to install the packages needed by the application. The packages are listed in the ‘package.json’ file and it is worth taking a peek into this file before the application is installed.

    1. Open the file ‘package.json’.
      05.3-Package-Json
    2. The file defines the name of the application, the start command and the dependencies on node packages.

     

    The application uses the package ‘cfenv’ as mentioned previously to access the service credentials from a VCAP_SERVICES file when running locally or from the VCAP_SERVICES environment variable when running in Cloud Foundry. The package dotenv is furthermore used to access environment variables. The other key packages are ibm_db and node-rdkafka. The package node-rdkafka is used for accessing Apache Kafka, whereas ibm_db is used for interfacing to Db2 and requires an ODBC driver to be installed. The installation of that driver is discussed in the blog “How to install IBM DB2 Database Driver on Ubuntu for Node.js”. The node package manager can download and install the required packages and drivers. To achieve this, do the following:

    1. Invoke the following command from the shell:
      npm install
    2. The command may fail. If this is the case you will need to install g++:
      sudo apt-get install g++
    3. Once this has completed, retry the installation, either by installing the failed package:
      npm install ibm_db
    4. or by installing all packages again:
      npm install

     

    It is possible to perform an initial test of the consumer application at this point. In the command shell enter the following command:

    node mhreceiver.js     // alternatively
    npm start

    If you have started the Elevator Simulator application, then you should see that the application connects to the Message Hub instance and receives the first message coming in – after which it will abort with an error (unless you have already run through the recipe ‘Process Elevator device data using the Watson IoT Platform and Node.js Consumers’). There are several reasons why it may fail:

    1. The target database table has not yet been created in the DB2 Warehouse on Cloud database, and the application has not yet been configured to use that database table.
    2. The payload received is missing important information such as the deviceId.

    Without the deviceId the information becomes meaningless since we can’t associate the telemetry with a device. Before fixing these “technicalities” we will however walk through the application code and explain the most important aspects so that you understand what is going on “behind the scenes”.

     

     

  6. Message Consumer Code Inspection

    Now that the application has been configured and the node packages have been installed, the time has come to get into the inner workings of the application and have a look at the code. The following aspects are important in understanding how the application works:

    1. General handling of local and remote VCAP properties that holds the service credentials.
    2. Reading of the service credentials to construct an internal service configuration object.
    3. Connecting to the Message Hub service and processing of event data that arrives.
    4. Constructing the SQL statement that updates the target database table with the incoming event data.
    5. Connecting to the DB2 Warehouse on Cloud service and submitting the SQL statement to update the target database table.

     

    The logic is contained in two files: ‘mhreceiver.js’ and ‘ibmdb2interface.js’. Open the file ‘mhreceiver.js’ first. The first part of the file contains declarations that require the needed packages. These are for example dotenv for reading environment variables, ibm_db for interfacing to the database and cfenv for reading the VCAP properties.
    06.1-Load-VCAP-File

    Lines 4-17 set the variable appEnv either to the content of the ‘vcap-local.json’ file (if present) or to the Cloud Foundry VCAP_SERVICES environment variable (if the application is running as a Cloud Foundry application). From then on the same piece of code can be used to access the service credentials independently of where the application is running.
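
    A sketch of this initialization logic is shown below; it is not the verbatim source code, but it illustrates the pattern of preferring a local ‘vcap-local.json’ file and falling back to the Cloud Foundry environment:

      // Sketch: initialize appEnv from vcap-local.json (when running locally)
      // or from the VCAP_SERVICES environment variable (when running in Cloud Foundry)
      require('dotenv').config();
      var cfenv = require('cfenv');
      var fs = require('fs');

      var appEnv;
      if (fs.existsSync('./vcap-local.json')) {
        var vcapLocal = JSON.parse(fs.readFileSync('./vcap-local.json', 'utf8'));
        appEnv = cfenv.getAppEnv({ vcap: vcapLocal });
      } else {
        appEnv = cfenv.getAppEnv();
      }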

    The next section of code defines how the credentials are read and the configuration parameters for the Message Hub service are constructed:
    06.2-Configure-MH-Service

    The first branch of the if-statement determines the configuration using the VCAP file. The second branch allows for configuring the service using environment variables stored in an ‘.env’ file (or passed to the process as parameters during startup).
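
    As an illustration, the two branches could look roughly as follows. The credential field names (kafka_brokers_sasl, user, password) follow the usual layout of Message Hub credentials, while the lookup pattern and the environment variable names are assumptions made for this sketch:

      // Sketch: derive the Kafka connection settings from the VCAP credentials
      // or, failing that, from environment variables (e.g. defined in '.env').
      var cfenv = require('cfenv');
      var appEnv = cfenv.getAppEnv();   // in the real application appEnv is initialized as shown above

      var mhCredentials = appEnv.getServiceCreds(/message ?hub/i);
      var brokers, mhUser, mhPassword;

      if (mhCredentials) {
        // First branch: configuration taken from the (local or remote) VCAP properties
        brokers = mhCredentials.kafka_brokers_sasl.join(',');
        mhUser = mhCredentials.user;
        mhPassword = mhCredentials.password;
      } else {
        // Second branch: configuration taken from environment variables
        brokers = process.env.KAFKA_BROKERS_SASL;
        mhUser = process.env.KAFKA_USER;
        mhPassword = process.env.KAFKA_PASSWORD;
      }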

    The next section of code defines the configuration of the DB2 Warehouse service. At the end a connection string is computed that is used later on to connect to the DB service:
    06.3-Configure-DB-Service

    Notice that the name of the target database table is defined by an environment variable named DBTABLE.
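
    A sketch of how such a connection string could be assembled for the ibm_db driver is shown below. The credential field names (db, hostname, port, username, password) are assumptions modelled on typical Db2 Warehouse on Cloud credentials:

      // Sketch: build an ibm_db connection string from the Db2 service credentials
      var cfenv = require('cfenv');
      var appEnv = cfenv.getAppEnv();   // initialized as shown earlier in the real application

      var dbCredentials = appEnv.getServiceCreds(/dash|db2/i);

      var connString = 'DATABASE=' + dbCredentials.db +
                       ';HOSTNAME=' + dbCredentials.hostname +
                       ';PORT='     + dbCredentials.port +
                       ';PROTOCOL=TCPIP' +
                       ';UID='      + dbCredentials.username +
                       ';PWD='      + dbCredentials.password + ';';

      // The name of the target database table is taken from the DBTABLE environment variable
      var dbTable = process.env.DBTABLE;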

    The interesting piece of the code comes at the end of the file:

    06.4-Connect-to-Kafka-1

    The consumer application connects to the Message Hub service with the configuration parameters that were defined previously. It then starts listening to device events for the device type “Elevator” (line 92). When an event comes in, it constructs an SQL statement on the basis of the event data by calling the function createSQLMergeStatement. Finally, this SQL statement is executed by calling the function executeSQLStatement. The logic requires that the deviceId and deviceType (as well as the timestamp) are available as properties of the message value.
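
    To illustrate the flow, a simplified sketch of the event handler is shown below. The exact parameter lists of createSQLMergeStatement and executeSQLStatement in the real source may differ; the sketch merely reflects the logic described above:

      // Sketch of the message handler ('stream' is the read stream created when
      // connecting to Message Hub; createSQLMergeStatement and executeSQLStatement
      // are provided by ibmdb2interface.js).
      stream.on('data', function(message) {
        var event = JSON.parse(message.value.toString());

        // deviceId, deviceType and timestamp must be present in the payload
        if (event.deviceId && event.deviceType && event.timestamp) {
          var sql = createSQLMergeStatement(dbTable, event);
          executeSQLStatement(connString, sql);
        } else {
          console.warn('Skipping event without deviceId, deviceType or timestamp');
        }
      });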

    If you want to get insight into the structure of the messages received from IBM Message Hub, you can simply uncomment lines 93 and 94 in the code, save the file and start the consumer application. You will then be able to see the metadata such as the partition and index offset as well as the message buffer that contains the value. The second line of output shows the value as a JSON formatted payload:

    06.5-Message-Payload

    The definitions of the two database related functions can be found in the file ‘ibmdb2interface.js’. The function createSQLMergeStatement is declared first. The construction of the SQL statement starts by defining a JSON object called mapping that contains all device data as well as parameters such as the name of the target database table (dbtable), the column names of relevance to that table (columns) and the actual values (values).
    05.5-SQL-Statement-Mapping

    The SQL statement is itself defined as a template using the properties of the mapping object as template variables. Calls to the format function (lines 68-69) will replace the template variables in curly braces with the values of the properties in the mapping variable:
    05.6-SQL-Statrement-Raw

    By defining the SQL statement this way it becomes possible to separate the overall logic of the SQL statement from the actual parameters, making things easier to read and maintain. As can be seen, it is a MERGE statement that merges the actual values (table t2) into the target database table (table t1). The ibm_db library is used in this case by calling asynchronous functions that utilize callbacks. This means that the database insertions may not happen in the same order as the event data comes in. However, the MERGE statement will handle this case. First it tests if there is already a record for the device associated with the device data (line 57). If this is not the case (lines 64-65), a new record is simply inserted into the target table. If there is already an existing record, then the update of the target database will only take place if the timestamp for the device data is larger than the timestamp already recorded in the table for the device (lines 58-59) – in other words, an update will only take place if the information received is more current than the information already held by the database for the device. This scheme lends itself very well to processing event data using a set of concurrent consumers to handle large loads. The last thing that happens in this function is that the placeholder values are replaced with the actual data contained in the mapping variable. This is done using the format function, which has been required from the node package named string-template.
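
    To illustrate the idea, a sketch of such a template-based MERGE statement is given below. The column list is shortened, the comparison assumes that the numeric DATEVALUE column is used to decide whether the incoming data is newer, and the exact template text of the real application may differ:

      // Sketch of a template-based MERGE statement using the string-template package
      var format = require('string-template');

      var sqlTemplate =
        'MERGE INTO {dbtable} AS t1 ' +
        'USING (VALUES ({values})) AS t2 ({columns}) ' +
        'ON t1.DEVICEID = t2.DEVICEID ' +
        'WHEN MATCHED AND t2.DATEVALUE > t1.DATEVALUE THEN ' +
        '  UPDATE SET MOTORTEMP = t2.MOTORTEMP, TIMESTAMP = t2.TIMESTAMP, DATEVALUE = t2.DATEVALUE ' +
        'WHEN NOT MATCHED THEN ' +
        '  INSERT ({columns}) VALUES ({values})';

      // Example mapping object with illustrative values
      var mapping = {
        dbtable: 'ELEVATOR_STATUS',
        columns: 'DEVICEID, DEVICETYPE, MOTORTEMP, TIMESTAMP, DATEVALUE',
        values: "'Elevator1', 'Elevator', 160, '2018-01-01T12:00:00Z', 1514808000000"
      };

      // Replace the {placeholders} in the template with the values of the mapping object
      var sql = format(sqlTemplate, mapping);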

    The final piece of code is the function executeSQLStatement that can be used to execute an SQL statement that updates the database.

    05.7-SQL-Statement

    The function first opens a connection to the DB2 database server using the provided connection string. If the connection is successful, a callback function will eventually be called that submits the SQL query provided as a parameter to the database server. Finally, the connection to the database is closed.
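
    A simplified sketch of such a function based on the ibm_db package is shown below; the actual implementation in ‘ibmdb2interface.js’ may handle errors and logging differently:

      // Sketch: open a connection, execute the statement and close the connection again
      var ibmdb = require('ibm_db');

      function executeSQLStatement(connString, sql) {
        ibmdb.open(connString, function(err, conn) {
          if (err) {
            console.error('Could not connect to the database: ' + err);
            return;
          }
          conn.query(sql, function(err, result) {
            if (err) {
              console.error('SQL statement failed: ' + err);
            } else {
              console.log('Updated the target database table');
            }
            // Close the connection once the statement has completed
            conn.close(function() {});
          });
        });
      }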

  7. Create Target Database Table

    In this section you will create the target database table for storing the current status of each elevator and then configure the application to use this table.

    The database table shall be created using the following SQL statement as a basis:

    CREATE TABLE DASH6769.ELEVATOR_STATUS (
    DEVICEID VARCHAR(64) NOT NULL,
    DEVICETYPE VARCHAR(64),
    MOTORTEMP BIGINT,
    CABINSPEED BIGINT,
    CABINTEMP BIGINT,
    CURRENTFLOOR BIGINT,
    DIRECTION BIGINT,
    DOOROPEN INTEGER,
    LOAD BIGINT,
    MAINTENANCEREASON VARCHAR(128),
    MAINTENANCESTOP INTEGER,
    NUMBEROFFLOORS BIGINT,
    STATE VARCHAR(32),
    TIMESTAMP VARCHAR(128),
    DATEVALUE BIGINT
    );

    Rather than using DASH6769 as schema name, you will need to use the username as provided in the credentials for the DB2 service. To create the database table do the following:

    1. In the IBM Cloud user interface, select the application that you created in section 3.
    2. Select the Connections tab.
    3. Select the DB2 Warehouse on Cloud service.
    4. Click OPEN to open the console.
      06.1-DB2-Service
    5. Click the Explore command at the top. Note the schema with the same name as your DB2 user name (it has DASH as a prefix).
    6. Select the Run SQL command at the top of the window.
    7. Enter the SQL statement above and click Run All.
      06.2-Create-Table

     

    To test whether the database was created successfully, go through these steps:

    • Select the Explore command again.
      06.3-Database-table
    • Select the schema with the prefix DASH.
    • Select the table named ELEVATOR_STATUS and observe the column names.

     

    Keep the browser window open as is. You will need it later to view how data is added and updated when the consumer application starts running.

    As a last step you will need to inform the consumer application about the name of the target table. This information should be kept in the ‘.env’ file:

    1. In the ATOM editor, open the file ‘.env.example’.
    2. Enter the name of the database table that you created.
      06.4-Env-file
    3. Invoke File > Save as … and save the file as ‘.env’.

     

    Notice that there are other environment variables defined. They can be used as an alternative to a local VCAP properties file when running the consumer application on your own machine rather than as a Cloud Foundry application on IBM Cloud.
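
    At a minimum the ‘.env’ file must therefore contain the name of the target table, e.g. (assuming the table created above):

      DBTABLE=ELEVATOR_STATUS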

  8. Update the Payload of the Elevator Simulator

    In section 5 it was briefly mentioned that the consumer application would fail because the payload received is missing important information such as the timestamp and the deviceId. Both are in fact passed as metadata if the messages are read directly from the Watson IoT Platform. However, when using the Message Hub as a message broker, the bridge from the Watson IoT Platform will just pass on the payload as the message value and the timestamp as metadata. Essential data relevant for generating reports regarding the current status of the elevators is therefore missing in the payload itself. However, we can modify the Elevator Simulator so that the payload gets augmented with the needed deviceId and timestamp. At the same time we will also pass on the deviceType (just in case). Basically this requires two types of changes:

    1. Modify the Node-RED Elevator simulator so that the payload gets augmented with the additional properties.
    2. Change the Elevator Type schema in the Watson IoT Platform so that the properties are declared as part of the schema type.

    To modify the Node-RED flow do the following:

    1. Open the application for the elevator simulator in the IBM Cloud dashboard.
    2. Select the IoT service.
    3. Open the Node-RED editor for the elevator simulator.
    4. This will take you to the main flow of the elevator simulator. Scroll down until you see the flows relevant for sending messages to the IoT Platform:
      08.1-As-Is-Situation
    5. You will need to insert a function node as shown below:
      08.2-After-Node-Insertion
    6. Insert a function node, name it “Augment Payload” and link it to the IBM IoT service node and the output node as shown above.
    7. Click the “Send Status Event” node to augment the Elevator Schema with additional properties.
      08.3-Open-Send-Status-Event
    8. Select the Pencil to edit the Schema.
      08.4-Add-Property-Timestamp
    9. Click the small +add button to add a new property.
    10. Name the property “timestamp” and give it the definition as shown above.
    11. Click the +add button again to add a deviceId property of type String.
    12. Repeat the same step to create a third property named “deviceType”, also of type String.
    13. Click Update.
    14. Click Done.
    15. Double click the function node named “Augment Payload” that you have just created.
    16. Add the piece of code as shown below (a sketch of the function body is also given after this list):
      08.5-Function-Node
    17. Click Done.
    18. Deploy the changes by clicking the Deploy button in the top right corner of the Node-RED editor.
    19. Select the debug tab and observe that the payload has been augmented with the new properties.
      08.6-Resulting-Payload
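
    For reference, the body of the “Augment Payload” function node could look roughly like the sketch below. The property names follow the schema properties defined above; how the device identity is obtained depends on how your simulator flow is built, so the values shown are illustrative assumptions:

      // Sketch of the "Augment Payload" function node body (Node-RED JavaScript).
      // How the device identity is determined is an assumption made for this sketch.
      msg.payload.deviceId = msg.deviceId || "Elevator1";
      msg.payload.deviceType = "Elevator";
      msg.payload.timestamp = new Date().toISOString();
      return msg;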

    Next, you will need to change the Elevator schema in the Watson IoT Platform so that the properties are included in the schema:

    1. Launch the console of the Watson IoT Service for the elevator simulator.
    2. Select the Device tab.
      08.7-Select-Elevator-Schema
    3. Select Manage Schemas.
    4. Select the Elevator Device type.
    5. Select Properties.
    6. Select the Pencil to the right to start editing the properties.
      08.8-Properties
    7. Select Add Properties.
    8. Enable the properties that you added (timestamp, deviceId and deviceType):
      08.9-Add-Properties
    9. Click OK.
    10. Scroll down and check that the properties have been added to the schema:
      08.10-Modified-Schema
    11. Click Finish to complete the process.

  9. Test the Node.js Consumer

    Having configured the application and the target database, the time has come to test the Node.js consumer application. This is quite simply done by following these steps:

    1. Start the elevator simulator if not done already.
    2. Start the consumer application by entering the following on the command line in the terminal window:
      node mhreceiver.js
    3. Observe that the application connects to the Message Hub service.
      09.1-Kafka-Connection
    4. Observe that the application eventually prints out log messages saying that it has inserted records into the database.
      09.2-Consumer-output

     

    To test that the application works as intended, go back to the DB2 Warehouse on Cloud console that you opened in the previous section.

    1. Select the ELEVATOR_STATUS table if not already done by now.
    2. Click the command View Data in the bottom right corner.
      07.3-Elevator-Status-Table
    3. Observe that data has been inserted.
    4. Scroll to the right until you see the TIMESTAMP column.
    5. Let the application run for a short moment.
    6. Click Back in the upper left corner of the DB2 console.
    7. Click View Data again and scroll to the right until you see the TIMESTAMP column.
      07.4-Timestamp-values
    8. Observe that the values of the TIMESTAMP column have been updated.
    9. Stop the consumer application by typing Ctrl+C in the command shell.

     

  10. Deploy Consumer to Cloud Foundry on IBM Cloud

    To deploy the application to Cloud Foundry a number of steps are required:

    1. Modify the Cloud Foundry manifest file.
    2. Log into IBM Cloud using the Command Line Interface and define the target organization and space.
    3. Push the application to IBM Cloud using the Command Line Interface.
    4. View the logs of the running application to test that it works as intended.

     

    Before the application can be deployed it is necessary to modify the manifest file by giving the application a unique name and route that does not conflict with other Cloud Foundry applications:

    1. Open the file manifest.yml.
    2. Change the name of the application, e.g. by replacing ‘ewk’ with your initials.
    3. Add a services section and enter the names of the Message Hub service and the DB2 Warehouse on Cloud service that you associated with the application earlier, as shown below.
      10.1-Manifest
    4. Save the changes.
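
    For reference, a minimal manifest for this application could look roughly as follows; the application name, host and service instance names are hypothetical and must match the names used in your own account:

      applications:
      - name: elevator-mh-consumer-xyz
        memory: 256M
        host: elevator-mh-consumer-xyz
        services:
        - Message Hub-abc
        - Db2 Warehouse on Cloud-abc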

     

    The manifest file ensures that the application can be deployed automatically without manual intervention. So if the application does not yet exist in IBM Cloud in your organization it will be created when the application is pushed to IBM Cloud.

    To deploy the application, do the following in the terminal window:

    1. Enter the following command to log in using your credentials:
      bx login -a https://api.eu-gb.bluemix.net -u <your-ibm-id> -p <your password>
    2. Should the command ask you to select an account, select your own.
    3. If you need to change your region to something different than UK, see the instructions below.
    4. Enter the following command to set your organization and development space using your organization name and space: 
      bx target -o <your-organization-name> -s dev
    5. Enter the following command to get a listing of the apps deployed in your space:
      bx cf apps
    6. Enter the following command to push the application to IBM Cloud:
      bx cf push
    7. Once deployed successfully, enter the following command to view the logs (using your application name as defined in the manifest file of course):
      bx cf logs <application-name>
    8. Press Ctrl+C to interrupt the log command.
    9. You can start and stop the applications using the following commands:
      bx cf stop <application-name>
      bx cf start <application-name>

     

    You can run the following command to set your API endpoint, replacing the API-endpoint value with the API endpoint for your region:

    bx cf api <API-endpoint>

    The following regions are currently available:

     

    You can find additional information on how to work with Cloud Foundry applications using the command line in the “Getting started tutorial“.

     

  11. Conclusion

    The recipe has demonstrated how to use IBM Message Hub on IBM Cloud together with the Message Hub Bridge of the IBM Watson IoT Platform and a Node.js consumer application to read and save IoT device data in a Db2 Warehouse on Cloud database. This established the initial steps in using the IBM Message Hub service (based on Apache Kafka) as a central message broker for external device events as well as internal system events. In upcoming recipes we will show how to use the Message Hub together with Functions as a Service to process alarms and with Apache Spark Streaming to provide real-time analytics.

    In this recipe we have simply worked locally on the desktop and pushed the application to IBM Cloud using the CLI. The preceding recipe titled ‘Process Elevator device data using the Watson IoT Platform and Node.js Consumers’ also shows how to work locally on the desktop (or using the Eclipse Orion Web IDE) and then deliver the changes to the Git repository of the Continuous Delivery Toolchain associated with the application. We refer to that recipe for additional details.

  12. Acknowledgement

    This recipe represents joint work between Rene Meyer and Einar Karlsen.
