In this post I will cover how you can connect to IBM Db2 Event Store 2.0 from Streams application running in IBM Cloud Pak for Data.

Db2 Event Store is an in-memory database designed for massive structured data volumes. Streams now includes the com.ibm.streamsx.eventstore toolkit to allow you to write data  from Streams applications to DB2 Event Store.

If you need to connect to an older version of Db2 Event Store (for example 1.1.3), then you find information about the required toolkit and/or Python packages on streamsx.eventstore .

Connect to Db2 Event Store from a Streams Python application

  • To insert streaming data into a table, use the streamsx.eventstore.insert() function within your Python topology. This function¬†connects to the database, creates the table and inserts the rows with the content of incoming tuples.
  • All other SQL statements such as querying the database can be executed using the¬†streamsx.eventstore.run_statement()¬†function

Example: Insert data into a table

In this video, I walk through  how to use the Streams Python notebook Streams-EventStoreSample in Cloud Pak for Data for data ingestion into a database table.  This notebook is also available as a template in IBM Cloud Pak for Data.

The Streams Python notebook covers the following steps:

  • Connects to Streams instance
  • Configures the application configuration with fields required to connect to the Event Store instance like URL and credentials.
  • Retrieves required certificate and password for SSL authentication, because the IBM Db2 Event Store 2.0 uses SSL authentication by default. Find a description how to get client keystore and public SSL certifcate using REST API here.
  • Creates an application topology, starting with a data generator that simulates the data source
  • The¬†streamsx.eventstore.insert() function provides the features:
    • connect to the database
    • table creation
    • insert rows with the content of incoming tuples
  • Build and launch the application
  • Verify the insert operation with collecting the job metrics

Running SQL statements

You can add the following sample code in cells at the end of Streams Python notebook Streams-EventStoreSample to perform a SQL query, for instance, to count the number of rows of a table.

For this scenario the streamsx.eventstore.run_statement()function is used.

Step 1: Download the version 1.7.0 of the JDBC toolkit from GitHub (this step can be skipped in future when JDBC toolkit with version 1.7.0 or higher is installed on the Streams build-service).

import streamsx.eventstore as es
# download jdbc toolkit from GitHub
jdbc_toolkit = es.download_toolkit(url='https://github.com/IBMStreams/streamsx.jdbc/releases/download/v1.7.0/streamsx.jdbc.toolkits-1.7.0-20190614-0746.tgz', name='com.ibm.streamsx.jdbc')

Step 2: Define and submit the application

from streamsx.topology.topology import *
from streamsx.topology.schema import CommonSchema, StreamSchema
from streamsx.topology.context import submit
import streamsx.eventstore as es

topo = Topology('EventStoreJDBCSampleSelectCount')

streamsx.spl.toolkit.add_toolkit(topo, jdbc_toolkit) # comment this line if you have skipped step 1

# Note: Define table name, table schema name and column names in upper case
sample_schema = StreamSchema('tuple<int32 TOTAL, rstring string>') # TOTAL is the result and string is the SQL query from the input

sql_query = 'SELECT COUNT(*) AS TOTAL FROM USER.STREAMSEVENTSTORESAMPLETABLE'
s = topo.source([sql_query]).as_string()

r = es.run_statement(s, 
                     credentials=app_cfg, # name of the application configuration
                     schema=sample_schema,
                     keystore=es_keystore, # location of clientkeystore file
                     truststore=es_keystore) # location of clientkeystore file

r.print()

cfg[streamsx.topology.context.ConfigParams.SSL_VERIFY] = False
job_config = streamsx.topology.context.JobConfig(tracing='info')
job_config.add(cfg)

# this submits the topology to the IBM Streams service where it will run until cancelled
submission_result = streamsx.topology.context.submit("DISTRIBUTED", topo, config=cfg)

 

As a result you can find a console output of the application like below:

{'TOTAL': 28500, 'string': 'SELECT COUNT(*) AS TOTAL FROM USER.STREAMSEVENTSTORESAMPLETABLE'}

Connect to Db2 Event Store from a SPL application

You can also develop your application in SPL. For this you can find the following SPL samples for your reference:

Packages and toolkits

  • The streamsx.eventstore.insert() function adds the EventStoreSink operator of the com.ibm.streamsx.eventstore to the topology. This operator¬† connects to the database, creates the table and inserts the rows with the content of incoming tuples. For insert operation the com.ibm.streamsx.eventstore toolkit shall be used.
  • The streamsx.eventstore.run_statement() covers the JDBCRun operator of the com.ibm.streamsx.jdbc toolkit and shall be used for other kind of SQL statements, for example, in order to run queries.

Reference links

Join The Discussion