
IBM Developer Blog


Follow along with three examples of using SQL Stream Builder with IBM Cloud offerings


The Apache Flink project provides the ability to perform stateful computations over data streams. The SQL Stream Builder interface is used to create stateful stream processing jobs using SQL. The executed SQL queries run as jobs on Flink. Together, these components make up the Cloudera Streaming Analytics (CSA) package, which is available with Cloudera Data Platform Streaming Edition with IBM.

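In this post, we will cover how to use the streaming services available in Cloudera Data Platform to communicate with IBM services. Specifically, we will cover producing messages with Apache Kafka, reading those messages with SQL Stream Builder, sinking them to PostgreSQL on IBM Cloud and to IBM Cloud Object Storage, and reading them with IBM DataStage.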

To accompany this post, we’ve created a video demonstration that goes through the examples covered here.

Producing messages with Apache Kafka

Before we tackle consuming messages on IBM products, we need to produce messages. Typically, you would receive feeds of streaming data, such as access data used to detect fraud or real-time financial data. Many tutorials available online cover these concepts extensively, like the Apache Kafka Quickstart document. We will be using Kafka brokers that are installed on the Cloudera Data Platform as part of the streaming bundle.

For all of the examples here, we will be relying on messages that were generated using the kafka-python library. The code snippet below is what we used to generate fake data and to send messages:

from kafka import KafkaProducer

import json
import time
import uuid

topic = 'stevemartest'
producer = KafkaProducer(bootstrap_servers='cid-vm-05.cdplab.local:9093',
                        security_protocol='SASL_SSL',
                        sasl_mechanism='GSSAPI',
                        sasl_kerberos_service_name='kafka',
                        value_serializer=lambda v: json.dumps(v).encode('utf-8'))

i = 0
while True:
    i = i + 1
    msg = {'uid': uuid.uuid4().hex, 'sdata': i%2 }
    print(msg)
    meta = producer.send(topic, key=b'message', value=msg)
    producer.flush()
    time.sleep(1)

When run, the code will produce messages similar to those seen below:

{"uid": "ce1178d33abf4e6297b56b362270830e", "sdata": 0}
{"uid": "ed30f9904f8443ddaef189f1ba5bb02c", "sdata": 1}
{"uid": "d90c3e2bd9994e3e8e6e202ead6ea183", "sdata": 0}
{"uid": "8ce9270033454f54b9fc67b7ef43bc83", "sdata": 1}

Run the application to start producing messages:

[root@cid-vm-05 ~]# kinit stevemar
Password for stevemar@CDPLAB.LOCAL:
[root@cid-vm-05 ~]# python3 kafka-tests/producer.py
{'uid': 'af2adbcf77bc45ed97339b669261f10b', 'sdata': 1}
{'uid': '61b777c774c64a788042f7c94e4950de', 'sdata': 0}
{'uid': '90e554b6f51d4952962530deca86f41b', 'sdata': 1}
...
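One quick way to confirm that messages are landing on the topic, before involving any other tool, is a small kafka-python consumer. The sketch below was not part of our original test and assumes the same broker address, topic, and Kerberos settings as the producer; the function names deserialize and consume are illustrative.

```python
import json


def deserialize(raw: bytes) -> dict:
    """Inverse of the producer's value_serializer."""
    return json.loads(raw.decode('utf-8'))


def consume(topic: str = 'stevemartest', limit: int = 5) -> None:
    """Print up to `limit` messages from the topic, then stop."""
    # Imported here so deserialize() can be used without kafka-python installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers='cid-vm-05.cdplab.local:9093',
        security_protocol='SASL_SSL',
        sasl_mechanism='GSSAPI',
        sasl_kerberos_service_name='kafka',
        auto_offset_reset='earliest',  # no committed offsets, so start at the oldest message
        consumer_timeout_ms=10000,     # stop iterating after 10 seconds without messages
        value_deserializer=deserialize,
    )
    try:
        for count, record in enumerate(consumer, start=1):
            print(record.offset, record.value)
            if count >= limit:
                break
    finally:
        consumer.close()
```

Calling consume() in a session that already holds a Kerberos ticket (after kinit) should print the first few payloads along with their offsets.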

Now that we are able to produce messages, we will start consuming them. To begin with, let’s read them using SQL Stream Builder.

Reading Kafka messages with SQL Stream Builder

Installing SQL Stream Builder (SSB) and Flink on a Cloudera cluster is documented in the CSA Quickstart page. Additionally, we found it beneficial to enable Knox for SSB to make authentication easier.

By default, the Kafka instance on the Cloudera Data Platform cluster will be added as a Data Provider. Choose to create a new Apache Kafka table; this will be our data source for all examples going forward. Note that some changes may be required in Ranger for your user or group access policies to be able to view all Kafka topics.

Figure 1. Adding a new Apache Kafka data source

The Data Definition Language (DDL) of an Apache Kafka source should look similar to the configuration below:

CREATE TABLE `default_catalog`.`default_database`.`kafka` (
  `uid` VARCHAR(2147483647),
  `sdata` BIGINT
) COMMENT 'kafka'
WITH (
  'properties.bootstrap.servers' = 'cid-vm-05.cdplab.local:9093',
  'properties.auto.offset.reset' = 'earliest',
  'connector' = 'kafka',
  'properties.ssl.truststore.location' = '/opt/cloudera/security/pki/truststore.jks',
  'properties.request.timeout.ms' = '120000',
  'properties.transaction.timeout.ms' = '900000',
  'format' = 'json',
  'topic' = 'stevemartest',
  'properties.security.protocol' = 'SASL_SSL',
  'scan.startup.mode' = 'earliest-offset',
  'properties.sasl.kerberos.service.name' = 'kafka'
)

Now we can run a simple SQL query to see the data being produced by our Python application that is posting the messages to our Kafka instance:

select * from kafka

The Results tab in the UI shows the JSON payload being produced.
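If rows do not show up as expected, it can help to check locally that a payload fits the two columns declared in the DDL (uid as a string, sdata as a 64-bit integer). The sketch below is a stricter Python-side check written for illustration; it is not how Flink's JSON format parses records, and to_row is a hypothetical helper name.

```python
import json

# Range of a signed 64-bit integer, matching the BIGINT column type.
BIGINT_MIN, BIGINT_MAX = -2**63, 2**63 - 1


def to_row(raw: bytes) -> tuple:
    """Map one JSON payload to a (uid, sdata) row, or raise ValueError."""
    doc = json.loads(raw)
    if not isinstance(doc, dict):
        raise ValueError('payload must be a JSON object')
    uid, sdata = doc.get('uid'), doc.get('sdata')
    if not isinstance(uid, str):
        raise ValueError(f'uid must be a string, got {uid!r}')
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(sdata, bool) or not isinstance(sdata, int):
        raise ValueError(f'sdata must be an integer, got {sdata!r}')
    if not BIGINT_MIN <= sdata <= BIGINT_MAX:
        raise ValueError('sdata does not fit in a BIGINT')
    return uid, sdata
```

For example, to_row(b'{"uid": "ce11", "sdata": 0}') returns ('ce11', 0), while a payload whose sdata is the string "1" raises ValueError.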

Figure 2. SSB executing a SQL query and displaying the data

Now that we’re able to confirm that SSB is working with our Kafka instance, we can go to the next step of pushing the messages to another system. That is done by creating and defining new sinks. There are many predefined sinks available out of the box.

Sinking messages to PostgreSQL on IBM Cloud with SQL Stream Builder

The first sink we will test uses the JDBC option that is supported by Flink. At the time of this writing, there is no support for Db2, so we decided to test with PostgreSQL on IBM Cloud.

Once an instance of PostgreSQL was created, we navigated to the Credentials page to find the relevant information to establish a connection, such as hostname, username, and password. Using the IBM Cloud Shell was convenient since it already included the psql command-line interface.

To authenticate to the PostgreSQL instance, perform a step similar to the one below, replacing the credentials with your own:

cloudshell:~$ export PGSSLROOTCERT=87ca6778-aaaa-1111-bbbb-2222-be1101c
cloudshell:~$ psql "host=aac4-1111-bbbb-2222-570065c.databases.appdomain.cloud port=32662 dbname=ibmclouddb user=ibm_cloud_111222333 password=ibm_pass_aaabbbccc"

Create the target table by running the SQL statement below; this is the table we’ll be sinking messages to:

CREATE TABLE test_table_kafka (
    uid VARCHAR(1000),
    sdata BIGINT
);

Back in SSB, we create a new table, but choose Flink DDL and the JDBC template. Substitute in the connection information for the PostgreSQL instance.

CREATE TABLE test_table_kafka (
  `uid` VARCHAR(1000),
  `sdata` BIGINT
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://aac4-1111-bbbb-2222-570065c.databases.appdomain.cloud:32662/ibmclouddb',
    'table-name' = 'test_table_kafka',
    'username' = 'ibm_cloud_111222333',
    'password' = 'ibm_pass_aaabbbccc'
)
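Since the JDBC URL and the WITH options are assembled from the values on the Credentials page, it can be convenient to generate the DDL rather than edit it by hand. The following is a minimal sketch under that assumption; sql_quote, jdbc_url, and jdbc_sink_ddl are hypothetical helper names, and the column list is hard-coded to match the table above.

```python
def sql_quote(value: str) -> str:
    """Render a SQL string literal, doubling any embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def jdbc_url(host: str, port: int, dbname: str) -> str:
    """Build the PostgreSQL JDBC URL used in the 'url' option."""
    return f'jdbc:postgresql://{host}:{port}/{dbname}'


def jdbc_sink_ddl(table: str, host: str, port: int, dbname: str,
                  username: str, password: str) -> str:
    """Return a Flink DDL statement for a JDBC sink with uid and sdata columns."""
    options = {
        'connector': 'jdbc',
        'url': jdbc_url(host, port, dbname),
        'table-name': table,
        'username': username,
        'password': password,
    }
    with_clause = ',\n'.join(
        f'    {sql_quote(key)} = {sql_quote(value)}'
        for key, value in options.items()
    )
    return (
        f'CREATE TABLE {table} (\n'
        '  `uid` VARCHAR(1000),\n'
        '  `sdata` BIGINT\n'
        ') WITH (\n'
        f'{with_clause}\n'
        ')'
    )
```

Printing the result of jdbc_sink_ddl(...) with your own credentials gives text that can be pasted into the SSB Flink DDL editor. Quoting through sql_quote keeps a password containing a single quote from breaking the statement.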

We will once again run a simple SQL query. But this time we’ll switch the Sink from None to the new PostgreSQL option:

select * from kafka

The SQL statement will be executed, and you’ll be able to see the data being placed into the PostgreSQL database. We’ve added a few psql commands below that can be used to view the data.

To list tables:

ibmclouddb=> \dt
                     List of relations
 Schema |        Name        | Type  |        Owner
--------+--------------------+-------+---------------------
 public | test_table_kafka   | table | ibm-cloud-base-user

To describe a table:

ibmclouddb=> \d test_table_kafka
                  Table "public.test_table_kafka"
 Column |          Type           | Collation | Nullable | Default
--------+-------------------------+-----------+----------+---------
 uid    | character varying(1000) |           |          |
 sdata  | bigint                  |           |          |

To count the number of rows in the table:

ibmclouddb=> select count(*) from test_table_kafka;
 count
-------
  9888

To view the first 10 rows of the table:

ibmclouddb=> select * from test_table_kafka limit 10;
               uid                | sdata
----------------------------------+-------
 0ddc4ff5e95349cba8b7d012bb152c6e |     0
 eef42141040844b8a5bed7fbe6437ec3 |     1
 bb46c46197c9429f96a9139212482a88 |     0
 9543d995131c4921bfe11a754f94b0a6 |     1
 6e8e3180054241a3971d8d39bb5518ac |     0
 d90ff2d560484034a80a3eaf9a6c5abe |     1
 564b1361536e4ad58bc8857a27c9bf58 |     0
 e15011fc9c0748ea96bc4e4c9e824c40 |     1
 37bc09cec2e7415aac4e4f98116921f6 |     0
 3a29f48b3110421d91bdf74ec7f92862 |     1
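Beyond counting rows, two quick sanity checks are that sdata is split roughly evenly between 0 and 1 and that no uid appears twice. The sketch below runs those queries against an in-memory SQLite database as a local stand-in, so it works without an IBM Cloud instance; the same SELECT statements can be run in psql against the real table. The 100 generated rows are made-up test data, not output from our cluster.

```python
import sqlite3
import uuid

# In-memory stand-in for the PostgreSQL table created earlier.
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE test_table_kafka (uid VARCHAR(1000), sdata BIGINT)')

# Generate rows the same way the producer loop does (i starts at 1).
rows = [(uuid.uuid4().hex, i % 2) for i in range(1, 101)]
conn.executemany('INSERT INTO test_table_kafka VALUES (?, ?)', rows)

total = conn.execute('SELECT count(*) FROM test_table_kafka').fetchone()[0]
by_sdata = dict(conn.execute(
    'SELECT sdata, count(*) FROM test_table_kafka GROUP BY sdata'))
distinct_uids = conn.execute(
    'SELECT count(DISTINCT uid) FROM test_table_kafka').fetchone()[0]

print('rows:', total)
print('rows per sdata value:', by_sdata)
print('distinct uids:', distinct_uids)
```

A distinct uid count that is lower than the row count would suggest the same messages were written more than once, for example after restarting a job that reads from the earliest offset.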

We’ll now move on to the next example, using IBM Cloud Object Storage.

Sinking messages to IBM Cloud Object Storage with SQL Stream Builder

The second sink we will test uses the S3 plugin that is supported by Flink. The IBM Cloud Object Storage service offers an S3-compatible API so the plugin can be used without any modification.

Once an instance of Cloud Object Storage is created, navigate to the Credentials page to create a new API key and secret. Be sure to include the option for enabling HMAC credentials. Jot down the API key and secret since we’ll need them later.

Create a bucket in Cloud Object Storage. In the examples below, we called it ssb-sink-test.

Back in SQL Stream Builder, we’ll create a new table, but this time we’ll choose Flink DDL. Substitute in information about your connection below:

CREATE TABLE cos (
  uid VARCHAR(64),
  sdata BIGINT
) PARTITIONED BY (uid) WITH (
  'connector' = 'filesystem',
  'path' = 's3://ssb-sink-test/mydata',
  'format' = 'json',
  'sink.partition-commit.delay'='0s',
  'sink.partition-commit.trigger'='process-time'
)
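Flink's filesystem connector lays out partitioned tables in Hive-style directories of the form column=value. The sketch below shows where objects for a given message would be expected under that convention; partition_prefix is a hypothetical helper, and it assumes the uid contains no characters that need escaping, which holds for the hex strings our producer generates.

```python
def partition_prefix(base_path: str, uid: str) -> str:
    """Return the Hive-style partition directory for one uid value."""
    return f"{base_path.rstrip('/')}/uid={uid}/"


if __name__ == '__main__':
    uids = [
        'ce1178d33abf4e6297b56b362270830e',
        'ed30f9904f8443ddaef189f1ba5bb02c',
    ]
    for uid in uids:
        print(partition_prefix('s3://ssb-sink-test/mydata', uid))
```

Because every message has a unique uid, partitioning on that column produces one directory per message. That is convenient for a demonstration, but a lower-cardinality column would be a more typical partition key.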

Additionally, Flink’s configuration file will need to be updated. The configuration file is located at /etc/flink/conf/flink-conf.yaml. See the example below for which new properties are required:

s3.access-key: 6c5e41-access-key-136f8f
s3.connection.ssl.enabled: false
s3.endpoint: s3.us-east.cloud-object-storage.appdomain.cloud
s3.path.style.access: true
s3.secret-key: 6c5e41-secret-key-136f8f
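A missing or misspelled property here is easy to overlook. The following is a small sketch that checks a flink-conf.yaml file for the five keys listed above; it assumes the file uses flat key: value lines, and the function names are our own for illustration.

```python
REQUIRED_S3_KEYS = {
    's3.access-key',
    's3.connection.ssl.enabled',
    's3.endpoint',
    's3.path.style.access',
    's3.secret-key',
}


def parse_flat_conf(text: str) -> dict:
    """Parse flat 'key: value' lines, skipping blanks and comments."""
    conf = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith('#'):
            continue
        key, sep, value = line.partition(':')
        if sep:
            conf[key.strip()] = value.strip()
    return conf


def missing_s3_keys(text: str) -> list:
    """Return the required S3 keys that are absent from the config text."""
    return sorted(REQUIRED_S3_KEYS - set(parse_flat_conf(text)))


if __name__ == '__main__':
    with open('/etc/flink/conf/flink-conf.yaml') as handle:
        print(missing_s3_keys(handle.read()))
```

An empty list means all five properties are present. The check only looks at key names, so it will not catch a wrong endpoint or an invalid credential.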

We will once again run a simple SQL query, but this time we’ll switch the Sink from None to the new COS option:

select * from kafka

The SQL statement will be executed and you’ll be able to see the data being placed as individual files in the bucket we created.

Figure 3. Data saved in an IBM Cloud Object Storage bucket

Looking at a specific file shows us the payload.

Figure 4. The payloads of Kafka messages are seen within the file

Now that we are able to see messages on two IBM Cloud services, we’ll turn our attention to IBM DataStage, an ETL offering available on both IBM Cloud Pak for Data and IBM Cloud Pak for Data-as-a-Service.

Reading Kafka messages with IBM DataStage

In this example, we are not using SQL Stream Builder but using built-in capabilities of IBM DataStage to read messages from a Kafka broker. It is worth mentioning that our Cloudera cluster had Kerberos enabled, so some configuration was required. The Configuring Hive with Kerberos documentation was helpful and could be adapted for IBM DataStage.

Once the required Kafka configuration files were moved over to the appropriate IBM DataStage container, we could test the connection. A simple job with a single Kafka source and Peek target can test the connection. By default, the connection will read 100 messages at a time.

Figure 5. Data saved in an IBM Cloud Object Storage bucket

Looking at the logs will show the most recent messages from the Kafka broker.

Figure 6. The payloads of Kafka messages are seen within the file

Summary and next steps

We hope you learned more about how to integrate IBM products with Apache Flink and Cloudera SQL Stream Builder. Let’s stay in touch by visiting the IBM Community to post questions and talk to our experts.