By Ai Lian Mi (miailian@cn.ibm.cn), Ya Peng Dong (dypdong@cn.ibm.com), and Dian Wang (wangdian@cn.ibm.com) – CICS Transaction Server Software Engineers.
Today, Kafka is widely adopted by numerous businesses around the world as one of the most popular distributed streaming platforms. Now you can also send messages from CICS to a Kafka server. In this blog, we’ll introduce how a Liberty Kafka client running in CICS Liberty sends messages to the Kafka server by calling the Kafka client API.
1. Kafka and IBM Event Streams
Apache Kafka is a distributed streaming platform that is used to publish and subscribe to streams of records. It originated at LinkedIn and became an open-sourced Apache project in 2011. Being fast, scalable, durable, and fault-tolerant, Kafka has seen explosive growth over the years. Today, it’s used in production by more than a third of the Fortune 500.
The idea of integrating Kafka with CICS derives from multiple requests from our customers, expressing their desire to have Kafka connection from CICS implemented. After all, their business is inside CICS, and are gradually seeing the need to send their data into a Kafka server.
We’ve also adopted IBM Event Streams in the solution. It is a high-throughput, fault-tolerant, publish-subscribe technology for building event-driven applications, based upon the Apache Kafka project.
Currently three mechanisms are available to transfer data from the mainframe to IBM Event Streams:
- The Liberty Kafka client running in CICS
Our project shows this case, which involves a Liberty JVM server running in CICS. We set up a Liberty Kafka client that is actually a Java application running in CICS Liberty. This client sends the messages to the Kafka server by calling the Kafka client API.
- The MQ connector
If you have an IBM MQ license, you can use our MQ Event Streams connectors to connect to IBM Event streams.IBM has created a pair of connectors, available as source code or as part of IBM Event Streams. An MQ source connector is used to copy data from IBM MQ into Event Streams or Apache Kafka, and an MQ sink connector is used to copy data from Event Streams or Apache Kafka into IBM MQ. The connectors themselves can run either under USS, on zLinux, or distributed OSs.
- Inbound REST APIs
The REST API allows multiple systems, including z/OS Connect EE, to push data into IBM Event Streams. This means we can push events from any mainframe system into IBM Event Streams with the RESTful API. The REST API is one-way only – i.e. from Z to IBM Event Streams.
2. Project background
Some vendors are seeking for new channel application solutions in the banking industry.
A Thai bank, worked with one business vendor to develop a banking channel application based on the Kafka server. Their solution relied on the mainframe data and how it was going to send messages from CICS to the Kafka server. It was agreed that the best solution was to send messages directly from CICS to the Kafka server.
In response to the request, we developed the Liberty Kafka client and delivered a POC (Proof of Concept) at the customer’s site.
3. Project architecture
Our whole solution consists of the following:
- A COBOL program running in CICS region responsible for sending the application data. The COBOL program calls the Java program running in the CICS Liberty JVM server through EXEC CICS LINK. The data is transferred via CICS channels and containers.
- Annotating a Java method and configuring the Liberty JVM server to invoke Java EE applications from CICS programs.
The Java program in Liberty implements the following logic:
- Getting the data from CICS channels and containers using the JCICS API.
- Calling the DFHJSON program to convert the application data into JSON.
- Implementing the Kafka producer client to send the JSON data to the Kafka server by calling the Kafka client API.
4. Link to Liberty
With link to Liberty, you can invoke a Java EE application running in a Liberty JVM server either as the initial program of a CICS transaction or by using the LINK
, START
, or START CHANNEL
commands from any CICS program. Link to Liberty allows COBOL, PL/I, or even assembler applications to easily utilize the Java EE functions in a standard manner.
We use the link to Liberty feature to expose our Java application to a CICS program. The steps below are needed.
- Configuring a Liberty JVM server
To configure your Liberty JVM server to support linking to Java EE applications, add the cicsts:link-1.0
feature to server.xml. Ensure that you add the feature before deploying Java applications.
- Preparing a Java EE application
For a Java EE application to be linked to by a CICS program, it needs to be a plain Java object (POJO) and annotated to enable the required Java method.
public class LinkToLiberty {
@CICSProgram("PUTQ")
public void putQ()
{
try
{
//JCICS example, Get data from Channel/Container.
Channel currentChannel =Task.getTask().getCurrentChannel();
if (currentChannel != null)
{
Container conHead = currentChannel.getContainer("HEAD");
if (conHead != null)
{
byte[] dataArry =conHead.get();
Container conPayload = currentChannel.getContainer("PAYLOAD");
if(conPayload !=null)
{
byte[]payloadData = conPayload.get();
}
// TODO: we will use the data in container here
}
}
}
}
}
In the code above, we annotate the putQ()
method with the @CICSProgram
annotation, specifying its parameter to match the name of the PROGRAM resource to be created, e.g. @CICSProgram("PUTQ")
. When we enable the bundle resource for this Java EE application, CICS will automatically create a PROGRAM
resource, whose name will be the same as specified in the annotation’s parameter. Channels and containers are used to pass data between Java and COBOL programs. In our case, the program obtains the current channel in the beginning. Then, the data is passed in by the COBOL program from a channel using getcontainer()
.
5. Transforming application data to JSON by linking to DFHJSON
The JSON transformer linkable interface DFHJSON
is a CICS-supplied program that can be called to perform transformation between application data and JSON. Your application program can transform application data to JSON by linking to DFHJSON
.
You must have an enabled JSONTRANSFRM
bundle resource that defines the JSON binding and JSON schema. As we start with a JSON schema, we can run a DFHJS2LS
batch job to create the data mapping for the application record.
You’ll complete a procedure similar to what’s listed below:
- The application program must create a channel, for example MyJSONChan, and put the following containers into the channel.
DFHJSON-DATA
DFHJSON-TRANSFRM
- Use the LINK command to transform the data to JSON:
EXEC CICS LINK PROGRAM('DFHJSON') CHANNEL(‘MyJSONChan')
- Get container
DFHJSON-ERROR
and check if any errors occurred during the transformation.
- Get container
DFHJSON-JSON
and make use of the JSON in your application.
Your code snippet will look like this:
// 1. Create a channel MyJSONChan.
jsonChan = Task.getTask().createChannel("MyJSONChan");
// Put the contents of the language structure into the DFHJSON-DATA container.
Container conJsonData = jsonChan.createContainer("DFHJSON-DATA");
conJsonData.put(dataArry);
// Put the name of the BUNDLE resource into the DFHJSON-TRANSFRM container.
Container conTransfrm = jsonChan.createContainer("DFHJSON-TRANSFRM");
conTransfrm.putString("OUTP");
// 2. Issues an EXEC CICS LINK request to the program DFHJSON to do the transformation.
Program p = new Program();
p.setName("DFHJSON");
p.link(jsonChan);
// 3. Check the DFHJSON-ERROR container to see if there is any error.
Container bitconJsonErr = jsonChan.getContainer("DFHJSON-ERROR");
if (bitconJsonErr != null)
{
byte[] ba = bitconJsonErr.get();
ByteBuffer bb = ByteBuffer.wrap(ba);
int cicsrc = bb.getInt();
logger.error("error happens for CICS Json parser! DFHJSON-ERROR="+cicsrc);
//Check the DFHJSON-ERROR container to get more errors messages.
Container conERRORMSG = jsonChan.getContainer("DFHJSON-ERRORMSG");
if (conERRORMSG != null)
{
String ERRORMSG = conERRORMSG.getString();
logger.error("DFHJSON-ERRORMSG="+ERRORMSG);
}
} else {
// 4. Get the JSON message from the DFHJSON-JSON container.
Container conJson = jsonChan.getContainer("DFHJSON-JSON");
if (conJson != null)
{
sJson = conJson.getString();
}
}
6. Performance test
We use pooled objects to improve performance. When they’re not used, multiple objects will be created whenever data is sent to Kafka. The pooled objects have boosted performance significantly by reusing objects that are too expensive to create.
In our case, we call the KafkaProducer API to send the JSON messages to a Kafka topic “ACCOUNT-BALANCES- INBOUND-TOPIC
”. The message is sent asynchronously, and a CallBack class is used to implement the onCompletion()
method:
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER_LIST);
properties.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> data = new ProducerRecord<String, String>(TOPIC, sJson);
producer.send(data, new SimpleCallBack(startTime, 0, sJson));
The creation of a new producer (KafkaProducer) can be costly because every time data is sent to Kafka, the process is New KafkaProducer open-> Send -> Close. Whenever you open, the Kafka client connects to the Kafka server to read meta data, compose messages, and sent again. This can be a cumbersome process so to avoid this kind of problem, we configured the Kafka object pool.
The Object Pool is a design pattern that allows reuse of expensive objects. We use Apache Commons Pool (org.apache.commons.pool2
) to create our own object pool. Basically, what we create is a pool to store KafkaProducer objects (pooled objects). As a result, the Liberty application borrows KafkaProducer by calling pool.borrowObject when sending the data, and then returns KafkaProducer back to the pool by calling pool.returnObject after it finishes sending the data.
pool = new GenericObjectPool<KafkaProducer<String, String>>(new ProducerFactory(), config);
producer = ProducerListener.pool.borrowObject();
producer.send(data, new SimpleCallBack(startTime, 0, sJson));
ProducerListener.pool.returnObject(producer);
The project is deployed to the development system at the customer’s site with a great performance result – 3,000 records are sent in one minute, and the average response time is as short as 0.0045 seconds.
Interested? You can try implementing our solution in your system. Comment below with any questions; we’d be more than happy to help!