Overview

Skill Level: Intermediate

Basic knowledge of SQL and Jupyter notebooks is assumed.

We developed the seamless storage driver, an Apache Spark data source that bridges the gap between real-time and historical data in the Watson IoT Platform, enabling transparent analytics over all data using native Spark SQL.

Ingredients

·         An IBM Bluemix account

·         Bluemix Watson IoT Platform Starter Service

·         A Bluemix Spark Service Instance

·         DSX Jupyter notebook

·         The Seamless Storage Driver

Step-by-step

  1. Introduction

The Watson IoT Platform lets you use the Cloudant NoSQL database to store historical device data, as described here. Device data is stored in daily, weekly, or monthly databases depending on a selected “bucket interval”. While this bucketing of data into separate databases can have management and performance advantages, it makes querying and visualizing the data unwieldy. Data scientists and application developers need to be aware of the continually changing, and possibly large, set of databases in order to write their queries, and visualizing data from a large dataset typically requires manually accessing these databases. To hide this complexity from users, the seamless storage driver analyzes queries and maps them to exactly those databases that contain the relevant data.

    In the diagram below, we see data ingested from the Watson IoT Platform to Cloudant with a daily bucketing interval. Using the seamless storage driver, a user can write queries directly to the dataset without needing to refer to the various underlying databases. Moreover, the driver analyzes the queries and accesses only the relevant databases – in the example below only 2 Cloudant databases are relevant.

     

    [Diagram: device data ingested from the Watson IoT Platform into daily Cloudant buckets; the seamless storage driver routes a query to only the two relevant databases]
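    For example, with a daily bucket interval the device events land in one Cloudant database per day. The names below are illustrative only (the exact naming convention is described in step 6):

        iotp_myorg_default_2017-02-26
        iotp_myorg_default_2017-02-27
        iotp_myorg_default_2017-02-28

    A query restricted to the last two days would then be routed to only the last two databases.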

     

    The Main Components in a Nutshell

    ·         IBM Cloudant: A NoSQL database as a service; records are stored as JSON documents in collections called “databases”.

    ·          Watson IoT Platform Historical Storage: A configurable extension to the Watson IoT Platform that buckets IoT records
               in Cloudant databases at daily, weekly, or monthly intervals.
               This blog explains how to query these databases individually without using Spark.

    ·          Apache Spark: A distributed analytics framework. A simple interactive interface using a Jupyter notebook is available
               in the DSX (Data Science Experience). We will use such a notebook to demonstrate the driver.

    ·          Seamless Storage Driver: Implemented as a Spark SQL data source. It analyzes incoming queries, then routes them to
               the required Cloudant buckets. The driver relies on the Spark Cloudant Connector.

  2. Register for Data Science Experience

    Go to DSX and sign up. As part of this you will get:

    ·         An instance of Spark as a service

    ·         5 GB of object storage (we will expand on this in part II of the blog)

  3. Create an instance of the Watson IoT Platform Starter app

    Follow the Getting Started with Watson IoT Platform Starter guide. Using it, you will be able to simulate a device, record its activity and save the data to your Cloudant account using the Watson IoT Platform’s Historical Data Storage extension.  
    The WIOTS starter application deploys and connects:

    ·         Watson IoT Platform

    ·         IBM® Cloudant® NoSQL DB for Bluemix®

    ·         IBM® SDK for Node.js for Bluemix®

    After installing WIOTS, you need to connect it to the required services. Go to the left-side menu, choose Apps and then Dashboard. Under All Apps, click the WIOTS app.

     

    [Screenshot: the Apps dashboard with the WIOTS app listed]

     

    On the left menu, go to the Connections tab.

     

    [Screenshot: the Connections tab]

    Connect the Spark instance and the object storage services to the WIOTS application.

  4. Configure Watson IoT to use Cloudant as Historical Data Storage

    After connecting WIOTS to your services, you need to configure Cloudant as Historical Data Storage, which will be used to save the simulated sensor’s data.
    Go to the left main menu and choose Services and then Dashboard:

     

    [Screenshot: the Services dashboard]

    Click on the Internet of Things Platform service. You will be taken to the Manage tab of WIOTS. Press the Launch button:

     

    [Screenshot: the Launch button on the Manage tab]

     

    After launching the dashboard, follow this guide to configure the Historical Data Storage.

     

  5. Create a simulated IoT device

    If you wish to use your own devices or data, you can skip this part.

    You can define a device type and then create instances of this type as explained here. (Note that you can create many instances of a device type). After you start simulating a device, experiment with it by changing its temperature and go to the Watson IoT Platform Dashboard to monitor the data using the supplied cards. (You can also create new cards – see the Watson IoT platform Starter documentation).
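    For reference, each simulated event ends up as one JSON document in the relevant Cloudant bucket. The sketch below is a hypothetical Python rendering of such a document; the field names (deviceId, deviceType, eventType, timestamp, data) are an assumption based on the simulator, so check your own database for the exact schema:

        # Hypothetical shape of one stored device event (all values are placeholders).
        sample_event = {
            "deviceId": "myFirstDevice",      # the simulated device's ID
            "deviceType": "simulatedDevice",  # the device type you defined
            "eventType": "status",            # the event name sent by the device
            "timestamp": "2017-02-27T10:15:00.000Z",  # referenced by TimestampField in step 6
            "data": {"temperature": 21.5}     # the sensor payload
        }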

  6. See the Seamless Storage Driver in action

    First, load the notebook we use in this tutorial as described below. It is a Python notebook, but the driver works with any notebook or programming language.
    Go to the DSX website, log in, and follow these steps:

    1. On the left menu, choose Default project (or any other project that suits you), and press the add notebook button in the top right corner:

    [Screenshot: the add notebook button]

    2. Choose the From URL tab, pick a name for the notebook, enter the notebook’s URL in the Notebook URL box: https://github.com/guyGerson/sparkCloudantObjstorDriver/blob/master/blog-seamless-driver-demo.ipynb, and select your Spark service instance:

    [Screenshot: the From URL tab]

    3. Click the Create Notebook button in the bottom right corner to complete the process.

    4. Enter the notebook and choose the edit button on the top right menu to start running the notebook.

    When you open the notebook, notice that it includes four main cells:

     

    a. Setup

    [Screenshot: the setup cell]
    Executing this command downloads the Seamless Storage Driver jar file and adds it to your Spark instance’s class path. When using the DSX notebook, you only need to run it the first time you use the notebook. Link to the driver jar file: https://github.com/guyGerson/sparkCloudantObjstorDriver/raw/master/sparkCloudantObjstorDriver-assembly-1.0.jar
    Important: you must restart the Spark kernel to apply this change.
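    The cell itself is shown in the screenshot above; as a rough sketch, and assuming the standard DSX convention of placing user jars under ~/data/libs (where they are added to the class path on kernel restart), it boils down to something like:

        # Download the driver jar into the directory DSX scans for user jars.
        # (~/data/libs is the conventional location; adjust if your setup differs.)
        !wget https://github.com/guyGerson/sparkCloudantObjstorDriver/raw/master/sparkCloudantObjstorDriver-assembly-1.0.jar -P ~/data/libs/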

    b. Configuration

    [Screenshot: the configuration cell]

    This sets the credentials the driver uses when accessing your Cloudant data. You can find them in the Credentials tab of the Cloudant service in Bluemix.
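    As a sketch, the cell simply captures the credential values for use in the table definition below (the variable names here are our own, not necessarily those in the original cell; copy the actual values from the Credentials tab):

        # Cloudant credentials from the Bluemix Credentials tab (placeholders).
        cloudant_host = 'ACCOUNT.cloudant.com'
        cloudant_username = 'USERNAME'
        cloudant_password = 'PASSWORD'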


    [Screenshot: the table creation cell]

    Here we create a logical table (or DataFrame) named “cloudantData” that uses the driver to execute queries. A sketch of the cell follows the parameter list below.
    Parameters:

                         i. cloudantData = an arbitrary name you can choose for the logical table.

                         ii. DbChoiceName = the custom name you chose when creating the WIoTP Historical Storage Extension;
                             if not supplied, it defaults to “default”. The convention for a Cloudant database name is:
                             “iotp_<orgid>_<DbChoiceName>_<bucket>”.

                         iii. TimestampField = the name of the field that represents time in your records.
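    Putting this together, a sketch of the cell might look as follows. The data source name cloudantObjectstore is taken from the driver’s package; the option keys mirror the parameters above and the Spark Cloudant connector’s credential properties, and their exact spelling may differ from the original cell:

        # Register a logical table backed by the Seamless Storage Driver,
        # reusing the credential variables from the configuration sketch above.
        sqlContext.sql("""
            CREATE TEMPORARY TABLE cloudantData
            USING cloudantObjectstore
            OPTIONS (
                cloudant.host '{0}',
                cloudant.username '{1}',
                cloudant.password '{2}',
                dbChoiceName 'default',
                timestampField 'timestamp'
            )
        """.format(cloudant_host, cloudant_username, cloudant_password))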

    c. Executing Queries

    [Screenshot: the query cell]

    The above is a regular Spark SQL query. Since it uses the “cloudantData” table defined above, Spark will use the Seamless Storage Driver to retrieve the data. Note that we access the data transparently, without paying attention to any bucketing details.
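    For example, a query along these lines (column names are illustrative, matching the simulated temperature events) collects the results into the variable ‘d’ used by the plotting cell:

        # A plain Spark SQL query; the driver routes it only to the relevant buckets.
        d = sqlContext.sql(
            "SELECT timestamp, temperature FROM cloudantData "
            "WHERE timestamp >= '2017-02-27' ORDER BY timestamp"
        ).collect()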

    d. Chart Plotting Logic

    The final step is visualizing the simulated IoT data. To do this, we create a simple chart using the Matplotlib library available in the notebook. This cell takes the output of the previous cell (variable ‘d’) and visualizes it as shown below.
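    A minimal sketch of such a cell, assuming ‘d’ holds rows with timestamp and temperature columns as in the query sketch above:

        import matplotlib.pyplot as plt
        %matplotlib inline

        # Unpack the collected rows and draw a simple time series.
        temperatures = [row['temperature'] for row in d]
        plt.plot(range(len(temperatures)), temperatures)
        plt.xlabel('sample')
        plt.ylabel('temperature')
        plt.show()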

    [Screenshot: the resulting chart]

     

    And that’s it!

  7. Conclusions and next steps

    By using the seamless storage driver, the user can write simple and intuitive queries that access the entire dataset. Without the driver, the user would need to understand which underlying databases are relevant to the query and generate complex queries that merge data from numerous different databases.

    You can now connect your own devices to the Watson IoT Platform and add more complex logic to process, analyze, and visualize your real-time and historical data in a unified way. For example, the following graphs visualize an elevator monitoring app:

    [Screenshot: graphs from an elevator monitoring app]

    Our next blog, “Unify Your Watson IoT Platform Data With Seamless Analytics (Part II)” (coming soon), will extend the Seamless Storage Driver to Object Storage. The blog will demonstrate connecting the Watson IoT Platform to both Object Storage for long-term storage and Cloudant for recent data. It will then show how to query all the data as one unified dataset using the Seamless Storage Driver.

    Come and see a demo of our Seamless Storage driver in action at InterConnect 2017.

    This is joint work with Tal Ariel and Paula Ta-Shma.

     

     

3 comments on "Unify Your Watson IoT Platform Data With Seamless Analytics (Part I)"

  1. mchirukuri April 17, 2017

    Hello Guy Gerson,

    I followed your recipe and I’m facing an issue while executing the second cell, i.e., where Cloudant is configured in the Python notebook. The error is as follows:

    “Py4JJavaError: An error occurred while calling o130.sql.
    : java.util.NoSuchElementException: next on empty iterator”

    I have run this code in both the “Python 2 with Spark 2.0” and “Python 2 with Spark 1.6” kernels, but the same error remained. Could you please help me out with this?

    • Guy Gerson April 18, 2017

      Hi,

      Can you send me more information so I can reproduce the issue? E.g., is the data you are using generated by the simulator described in the tutorial, or a different dataset? What are the database names/buckets (in Cloudant)?
      Please send the information to guyger@il.ibm.com

      Thanks,
      Guy

  2. dileep.skiva August 24, 2017

    Hi,

    I followed your steps and I’m getting the error below when changing DbChoiceName. If I don’t change it, it says ‘no cloudant databases found under current configuration’ but I don’t get the error below. I tried it with Python 2 and Spark 1.6, 2.0, and 2.1.

    Py4JJavaError: An error occurred while calling o224.sql.
    : java.lang.NoSuchMethodError: org/apache/spark/sql/SQLContext.sql(Ljava/lang/String;)Lorg/apache/spark/sql/DataFrame;
    at cloudantObjectstore.providers.CloudantProvider$$anonfun$2.apply(CloudantProvider.scala:22)
    at cloudantObjectstore.providers.CloudantProvider$$anonfun$2.apply(CloudantProvider.scala:21)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:285)
    at cloudantObjectstore.providers.CloudantProvider.getDataFrame(CloudantProvider.scala:21)
    at cloudantObjectstore.providers.CloudantProvider.getSchema(CloudantProvider.scala:51)
    at cloudantObjectstore.CloudantObjectstoreScan.schema$lzycompute(CloudantObjectstoreScan.scala:23)
    at cloudantObjectstore.CloudantObjectstoreScan.schema(CloudantObjectstoreScan.scala:20)
    at org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:40)
    at org.apache.spark.sql.execution.datasources.CreateTempViewUsing.run(ddl.scala:82)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:186)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:65)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:95)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
    at java.lang.reflect.Method.invoke(Method.java:507)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:785)
