If you are using IBM Cloud Private for Data, how can you analyze data sets in a notebook using Streams?
When developing a streaming analytics application, you might need to work with data in a static file for a couple of reasons:
- Using sample data from a file to develop and test the application is often a quick way to get started before connecting the application to a live data source.
- You want to use static resources such as a configuration file in the application.
In this post I’ll cover how you can use data from a file in a Streams Python notebook running in IBM Cloud Private for Data.
Note for other languages: If you are developing with SPL, working with local files and directories is easy to do with operators such as DirectoryScan. Applications running in the IBM Cloud can apply the cloud-readiness techniques discussed on this page. The Streams Java API technique is discussed here.
How to do it
First, add the file to your project in IBM Cloud Private for Data (ICP4D). Then, you can include the file in the Streams application, or Topology, from the notebook.
Step 1: Add the file to the project
When you add a data set to a project in ICP4D, that file is accessible within notebooks in that project. After uploading the file, open the notebook where you will be creating your Streams application.
Within the notebook, the path to each data set or file resolves to os.environ['DSX_PROJECT_DIR'] + '/datasets/' followed by the file name.
However, this path is only valid from the notebook. To open the file from a Streams application at runtime, we must make it available on the host where Streams is running.
Step 2: Include the file in the Streams application
Even though the file is available within the notebook where you define your Streams Topology, your Streams application cannot access it unless the file is made available on the host where the Streams instance is running.
To understand why, look at this graphic:
As you can see, you define the Streams Topology in the notebook, but the code is actually executed on the Streams instance when the Topology is submitted. So if your Topology includes code to read a file, that file must be available on the host where the Topology is running.
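To see why, note that topo.source() takes a callable object that is serialized along with the rest of the Topology and invoked on the Streams instance, not in the notebook. A minimal sketch of that callable pattern in plain Python (no Streams runtime required):

```python
# A Streams source is a callable whose __call__ returns an iterable.
# The object is serialized, shipped to the Streams instance, and invoked
# there -- so any file it opens must exist on that host, not in the notebook.
class Numbers:
    def __call__(self):
        for n in range(3):
            yield n

src = Numbers()
values = list(src())  # the runtime iterates the callable's result like this
print(values)
```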
The key is to include the file in the code bundle that is sent to the Streams instance using Topology.add_file_dependency(). The path to the file at runtime will be retrieved using streamsx.ec.get_application_directory().
Putting it all together
As mentioned, the basic steps are:
1) Find the path to the data set from the notebook.
2) Add the file to your Topology as a dependency.
3) Use streamsx.ec.get_application_directory() to get the path to the file on the Streams instance.
1. Find the path to the data set from the notebook.
As mentioned, each data set is within the datasets folder of your project directory. For example, the path to the data.csv data set is os.environ['DSX_PROJECT_DIR'] + '/datasets/data.csv'.
Note: Since this path is subject to change, click Find data > Insert to code. This will insert a code snippet showing the correct path to the data set.
Even if your data set is not listed under “Find data” because it is not a .csv file, it is still present under the same folder.
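For reference, the path is built from the DSX_PROJECT_DIR environment variable, which ICP4D sets inside project notebooks. A minimal sketch (the '/project' default is a placeholder so the snippet also runs outside ICP4D):

```python
import os

# DSX_PROJECT_DIR is set by ICP4D in project notebooks; the default here
# is a placeholder so this sketch runs anywhere.
project_dir = os.environ.get('DSX_PROJECT_DIR', '/project')
path_to_data_sets = project_dir + '/datasets/'
print(path_to_data_sets + 'data.csv')
```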
2. Add the file to the Topology
After declaring the topology, add the file as a dependency:

topo = Topology("MyApp")
# include the file as a dependency
topo.add_file_dependency(path_to_data_sets + "data.csv", "etc")
At runtime, the file will be included in the “etc” folder of your application directory. The application directory is a special folder containing files specific to your application.
3. Access the full path to the file from your code:
import streamsx.ec
path_at_runtime = streamsx.ec.get_application_directory() + "/etc/" + self.file_name
Here is a complete example. We have data in a CSV file called data.csv that has been added as a data set to our ICP4D project, with the following contents:
time,max,id,min
1551729580087,18,"8756",8
1551729396809,20,"6729",0
1551729422809,25,"6508",5
We want to generate a stream of tuples and filter out tuples whose min value is less than 5.
from streamsx.topology.topology import Topology
from icpd_core import icpd_util
import streamsx.topology.context
import os
import csv
import streamsx.ec

# The CSVFileReader class opens the file at runtime and generates tuples
class CSVFileReader:
    def __init__(self, file):
        self.file_name = file

    def __call__(self):
        # Convert each row in the file to a dict
        header = ["timestamp", "max", "id", "min"]
        with open(streamsx.ec.get_application_directory() + "/etc/" + self.file_name) as handle:
            reader = csv.DictReader(handle, delimiter=',', fieldnames=header)
            # Use this to skip the header line if your file has one
            next(reader)
            for row in reader:
                yield row  # submit each row

file = "data.csv"
topo = Topology("AnalyzeData")

# Add the file as a dependency
topo.add_file_dependency(os.environ['DSX_PROJECT_DIR'] + '/datasets/' + file, "etc")

# Use the CSVFileReader class to open the file on the Streams runtime host
lines_from_file = topo.source(CSVFileReader(file))
lines_from_file.filter(lambda tpl: float(tpl["min"]) >= 5.0).print()
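Because the file-reading logic is plain Python, you can verify it locally before submitting the topology. This sketch runs the same parse-and-filter steps on an in-memory copy of the sample data, with no Streams runtime needed:

```python
import csv
import io

# Same sample data as data.csv, held in memory for a local check
sample = (
    "time,max,id,min\n"
    '1551729580087,18,"8756",8\n'
    '1551729396809,20,"6729",0\n'
    '1551729422809,25,"6508",5\n'
)

# Parse exactly as CSVFileReader does: explicit fieldnames, then skip the header row
header = ["timestamp", "max", "id", "min"]
reader = csv.DictReader(io.StringIO(sample), fieldnames=header)
next(reader)  # discard the header line
rows = list(reader)

# Apply the same filter as the topology: keep rows with min >= 5
kept = [row["id"] for row in rows if float(row["min"]) >= 5.0]
print(kept)  # ids of the rows that pass the filter
```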
# Submit the application
# Change this to your instance name
cfg = icpd_util.get_service_instance_details(name='my_streams_instance')

# Turn off SSL certificate verification
cfg[streamsx.topology.context.ConfigParams.SSL_VERIFY] = False

submission_result = streamsx.topology.context.submit("DISTRIBUTED", topo, cfg)
print(submission_result)