If you are using IBM Cloud Private for Data, how can you analyze data sets in a notebook using Streams?

When developing a streaming analytics application, you might need to work with data in a static file for a couple of reasons:

  • Using sample data from a file to develop and test the application is often a quick way to get started before connecting the application to a live data source.
  • You want to use static resources such as a configuration file in the application.

In this post I’ll cover how you can use data from a file in a Streams Python notebook running in IBM Cloud Private for Data.

Note for other languages: If you are developing with SPL, working with local files and directories is easy to do with the FileSource and DirectoryScan operators. Applications running in the IBM Cloud can apply the cloud-readiness techniques discussed on this page. The Streams Java API technique is discussed here.

How to do it

First, add the file to your project in IBM Cloud Private for Data (ICP4D). Then, you can include the file in the Streams application, or Topology, from the notebook.

Step 1: Add the file to the project

If you haven’t already, add the file to your project. From the project page, click Assets > Data Assets > Add data asset.

After uploading the file, open the notebook where you will be creating your Streams application.

When you add a data set to a project in ICP4D, that file is accessible within notebooks in that project.

Each data set or file is located under the directory given by this Python snippet:

os.environ['DSX_PROJECT_DIR']+'/datasets/'
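
To verify the upload, you can list that directory from the notebook. A minimal sketch, assuming the DSX_PROJECT_DIR environment variable is set (as it is in ICP4D notebooks):

import os

# Lists every data asset in the project, CSV or otherwise
print(os.listdir(os.environ['DSX_PROJECT_DIR'] + '/datasets/'))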

However, this path is only valid from within the notebook. To open the file from a Streams application at runtime, we must make it available on the host where Streams is running.

Step 2: Include the file in the Streams application

Even though the file is available within the notebook where you define your Streams Topology, your Streams application cannot access it unless the file is made available on the host where the Streams instance is running.

To understand why, consider where the code runs: you define the Streams Topology in the notebook, but the code is actually executed on the Streams instance when the Topology is submitted. So if your Topology includes code to read a file, that file must be available on the host where the Topology is running.

The key is to include the file in the code bundle that is sent to the Streams instance using Topology.add_file_dependency(). The path to the file at runtime will be retrieved using streamsx.ec.get_application_directory().
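
Here is the pattern in miniature; my_data.csv is a hypothetical file name, and the real calls appear in the walkthrough below:

import os
from streamsx.topology.topology import Topology

topo = Topology("Sketch")
# Bundle the file with the application; at runtime it lands under
# <application directory>/etc
topo.add_file_dependency(os.environ['DSX_PROJECT_DIR'] + '/datasets/my_data.csv', "etc")

# In code that executes on the Streams instance (e.g. a source callable):
# import streamsx.ec
# path = streamsx.ec.get_application_directory() + "/etc/my_data.csv"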

Putting it all together

As mentioned, the basic steps are:

1) Find the path to the data set from the notebook.

2) Add the file to your Topology as a dependency.

3) Use streamsx.ec.get_application_directory() to get the path to the file on the Streams instance.

1. Find the path to the data set from the notebook.
As mentioned, each data set is within

os.environ['DSX_PROJECT_DIR']+'/datasets/'

For example, the path to the data.csv data set is

os.environ['DSX_PROJECT_DIR']+'/datasets/data.csv'

Note: Since this path is subject to change, click Find data > Insert to code. This inserts a code snippet showing the correct path to the datasets directory.

Even if your data set is not listed under “Find Data” because it is not a .csv file, it is still present under the same folder.
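
For instance, you can confirm that a non-CSV asset was uploaded; my_model.bin is a hypothetical asset name:

import os

asset = os.environ['DSX_PROJECT_DIR'] + '/datasets/my_model.bin'
print(os.path.exists(asset))  # True if the asset was added to this project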

2. Add the file to the Topology

After declaring the topology, include the file:

from streamsx.topology.topology import Topology

topo = Topology("MyApp")

# Include the file as a dependency. path_to_data_sets is the datasets
# directory from step 1: os.environ['DSX_PROJECT_DIR'] + '/datasets/'
topo.add_file_dependency(path_to_data_sets + "data.csv", "etc")

At runtime, the file will be included in the “etc” folder of your application directory. The application directory is a special folder that includes files specific to your application.

3. Access the full path to the file from your code:

import streamsx.ec

# Only valid in code executing on the Streams instance; self.file_name is
# the file's base name (e.g. "data.csv") in a callable class like the one
# in the complete example below
path_at_runtime = streamsx.ec.get_application_directory() + "/etc/" + self.file_name

Complete example

Here is a complete example. We have data in a CSV file called data.csv that has been added as a data set to our ICP4D project, with the following contents:

time,max,id,min
1551729580087,18,"8756",8
1551729396809,20,"6729",0
1551729422809,25,"6508",5

We want to generate a stream of tuples and filter out rows whose min value is less than 5.

from streamsx.topology.topology import Topology
from icpd_core import icpd_util
import streamsx.topology.context
import os
# The CSVFileReader class opens the file at runtime and generates tuples:

import csv
import streamsx.ec
class CSVFileReader:
    def __init__(self, file):
        self.file_name = file
    def __call__(self):
        # Convert each row in the file to a dict
        header = ["time", "max", "id", "min"]
        with open(streamsx.ec.get_application_directory()
               + "/etc/" + self.file_name) as handle:
            reader = csv.DictReader(handle, delimiter=',',
                                    fieldnames=header)
            # Skip the header line; remove this call if your file has no header
            next(reader)
            for row in reader:
                yield row  # submit each row  


file = "data.csv"
topo = Topology("AnalyzeData")
# add the file as a dependency
topo.add_file_dependency(os.environ['DSX_PROJECT_DIR'] + '/datasets/' + file, "etc")

# Use the CSVFileReader class to open the file on the Streams runtime host
lines_from_file = topo.source(CSVFileReader(file))
lines_from_file.filter(lambda tpl: float(tpl["min"]) >= 5.0).print()

# Submit the application
# Change this to your instance name
cfg = icpd_util.get_service_instance_details(name='my_streams_instance')
# Disable SSL certificate verification
cfg[streamsx.topology.context.ConfigParams.SSL_VERIFY] = False
submission_result = streamsx.topology.context.submit("DISTRIBUTED", topo, cfg)
print(submission_result)
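
The filtered tuples are written by print() to the application's console log on the Streams instance, not back to the notebook, so look for them in the job's logs. As a quick sanity check in the notebook, you can inspect the submission result; this assumes a successful DISTRIBUTED submit, where the result exposes the running job:

# Optional check: a successful distributed submit exposes the job
print("Submitted job:", submission_result.job.id)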

Learn more about the Streams Python API in the Python development guide and the full API reference.

Topology.add_file_dependency() reference.

Happy Streaming!
