Introduction

In this article, I will describe the new KafkaProducer and KafkaConsumer nodes that have been provided in IBM Integration Bus 10.0.0.7. I will show how you can publish messages to a topic on IBM Message Hub and consume messages from that topic.

Overview of Apache Kafka

Apache Kafka is a distributed streaming platform which allows applications to publish and subscribe to streams of records. Kafka is architected as a cluster of one or more servers. A stream of records is called a ‘topic’. A ‘Producer’ can publish messages to a topic. A ‘Consumer’ can subscribe to a topic and consume the messages that were published.

[Diagram: messages being appended to the tail of a Kafka topic log, each identified by an offset]

The above diagram shows that when messages are published on a Kafka topic, they are added to the tail of the log. Each message published on the topic is identified by its ‘offset’: its position in the log. All consumers retrieve messages from the same log; as messages are consumed they are not destroyed, but remain in the log for a pre-defined retention period. Consumers use the offset to identify their position when receiving messages. The retention period can be defined by time or by log size.
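
For example, on an Apache Kafka broker the retention period is set in server.properties; a minimal sketch with illustrative values (the size limit applies per partition):

# Keep messages for 7 days, or until a partition's log exceeds 1 GB,
# whichever limit is reached first (illustrative values).
log.retention.hours=168
log.retention.bytes=1073741824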

When a consumer starts, it must tell Kafka where in the commit log it wishes to start receiving messages: either from the earliest available message or from the latest. If a consumer disconnects and later reconnects, and it wishes to see all messages published while it was disconnected, it must remember the offset of the last message it consumed.
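
In the raw Apache Kafka Java client, this starting position is controlled by the auto.offset.reset property and by committed offsets. A minimal sketch, assuming a broker at localhost:9092 and a topic named demo-topic (both illustrative values):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class OffsetDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative broker
        props.put("group.id", "demo-group");
        // "earliest" = start from the oldest retained message when this group
        // has no committed offset; "latest" = only receive new messages.
        props.put("auto.offset.reset", "earliest");
        // Committed offsets let the consumer resume where it left off after a
        // disconnect, instead of having to remember the offset itself.
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("demo-topic"));
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
        }
        consumer.close();
    }
}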

[Diagram: consumers using offsets to track their position in the commit log]

To balance load and achieve scalability, Kafka allows topics to be divided into ‘partitions’. Each partition is an ordered, immutable sequence of records that is continually appended to: a structured commit log.

By defining a topic to have multiple partitions, messages published on the topic are distributed across the partitions, and the consumers in a consumer group are likewise spread across the partitions. Kafka manages both the distribution of messages and the assignment of consumers to partitions.
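
As an illustration of how the standard Java client maps records to partitions: records with the same key always land in the same partition, while records with no key are spread across partitions by the default partitioner. A minimal sketch, with illustrative broker, topic, and key names:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // Records sharing the key "order-42" hash to the same partition,
        // so their relative order is preserved for consumers.
        producer.send(new ProducerRecord<>("demo-topic", "order-42", "created"));
        producer.send(new ProducerRecord<>("demo-topic", "order-42", "shipped"));
        // A record with a null key is distributed across partitions.
        producer.send(new ProducerRecord<>("demo-topic", null, "audit event"));
        producer.close();
    }
}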

For more information, see the Apache Kafka documentation.

IBM Message Hub

IBM Message Hub is a scalable, distributed, high-throughput messaging service built on Apache Kafka. It can be used to stream data to analytics engines to gain insights. IBM Message Hub provides a set of credentials that Producer and Consumer applications must use to publish messages to, or consume messages from, a topic. For more information, see IBM Message Hub.

New Kafka Nodes

In IBM Integration Bus 10.0.0.7, we have provided two new Kafka nodes that can be used in integration solutions requiring interaction with topics on a Kafka cluster. The nodes are in a new Kafka drawer in the Toolkit palette.

The KafkaProducer node allows you to publish messages to a topic on a Kafka server.

The KafkaConsumer node allows you to consume messages from a topic on a Kafka server.

KafkaProducer Node

The KafkaProducer node can be used to publish messages from a message flow to a Kafka topic. The node publishes messages non-transactionally: if an error downstream in your message flow causes the message to be rolled back, the publication of the message to the Kafka topic is not rolled back. You can use the node's acks property to specify whether confirmation is required from the Kafka server for each published message. If you wish, you can set acks to 0, which means that the KafkaProducer node does not wait for any acknowledgement.
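
The node's acks property mirrors the standard Kafka producer acks setting. A minimal sketch of the equivalent configuration in the raw Java client, with illustrative broker and topic names:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AcksDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative broker
        // "0" = fire-and-forget (no acknowledgement), "1" = the partition
        // leader has written the record, "all" = all in-sync replicas have.
        props.put("acks", "0");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("demo-topic", "hello from the flow"));
        producer.close();
    }
}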

KafkaConsumer Node

The KafkaConsumer node can be used to subscribe to a Kafka topic so that messages can be consumed and processed by the message flow. A KafkaConsumer node subscribes to a single Kafka topic; if the topic has multiple partitions, the node can receive messages from any of those partitions. The KafkaConsumer node can also be defined as part of a consumer group, which allows you to increase concurrency: if you deploy multiple KafkaConsumer nodes that share the same Group ID, Kafka shares the messages published on the topic across the members of the group.
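
The Group ID corresponds to the standard Kafka group.id consumer property. A minimal sketch in the raw Java client, with illustrative names; running two copies of this program against a two-partition topic has Kafka assign one partition to each:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative broker
        // Every instance started with the same group.id joins one consumer
        // group; Kafka assigns each topic partition to exactly one member.
        props.put("group.id", "file-writer-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("demo-topic"));
        while (true) {
            // Each instance sees only the messages from its assigned partitions.
            consumer.poll(1000).forEach(r ->
                System.out.printf("partition=%d offset=%d%n", r.partition(), r.offset()));
        }
    }
}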

Security

All Kafka nodes that are deployed to the same integration server must use the same set of credentials to authenticate to the Kafka cluster. To save the credentials that the Kafka nodes will use to connect to the Kafka cluster, you use the mqsisetdbparms command to configure the resource name in the form kafka::KAFKA::integrationServerName.
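
For example, for an integration server named Server1 on an integration node named IIB_NODE (illustrative names, matching the scenario below):

mqsisetdbparms IIB_NODE -n kafka::KAFKA::Server1 -u myUser -p myPassword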

Example using Kafka nodes with IBM Message Hub

The diagram below shows the interaction between two IIB message flows and IBM Message Hub using the KafkaProducer and KafkaConsumer nodes.

[Diagram: two IIB message flows publishing to and consuming from a topic on IBM Message Hub]

The first message flow receives HTTP messages and publishes them to a topic on Message Hub using the KafkaProducer node. The second message flow subscribes to the same topic on Message Hub, consumes messages from it, and writes them out to a file on the file system.

The following steps will take you through how to construct the scenario.

Flow Development Steps

Step 1: Add the Message Hub service in Bluemix

[Screenshot: adding the Message Hub service in Bluemix]

Step 2: Create a topic in Message Hub

  • Open the Message Hub service by clicking on it in your list of services.
  • Create a topic in Message Hub by clicking the button shown below:

[Screenshot: the create topic button in Message Hub]

[Screenshot: creating the topic in Message Hub]

Step 3: Make a note of your Message Hub connection and credentials

  • Click on ‘Service Credentials’ and then click on ‘View Credentials’ under the ACTIONS column.

[Screenshot: the Service Credentials view in Bluemix]

  • Make a note of the server, user, and password values.

[Screenshot: the Message Hub service credentials showing the server, user, and password values]

Step 4: Store the Message Hub credentials in IBM Integration Bus

  • Open an IIB Command Console.
  • Create two Integration Servers called Server1 and Server2.
  • Store the Message Hub credentials for both Integration Servers using the mqsisetdbparms command as follows:
mqsisetdbparms IIB_NODE -n kafka::KAFKA::Server1 -u username -p password

mqsisetdbparms IIB_NODE -n kafka::KAFKA::Server2 -u username -p password

where username and password are the values that you copied from the Message Hub service credentials.
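
You can verify what has been stored by using the mqsireportdbparms command; each stored resource is reported in a BIP8180I message showing the resource name and user ID:

mqsireportdbparms IIB_NODE -n *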

  • Restart your Integration Node.

Step 5: Create the message flows

  • Create two separate applications called KafkaProducerApp and KafkaConsumerApp.

[Screenshot: the KafkaProducerApp and KafkaConsumerApp applications in the Toolkit]

  • In the KafkaProducerApp, add a message flow called ProducerFlow and add these nodes:

[Screenshot: the ProducerFlow message flow]

  • Configure the KafkaProducer node as follows:
    • On the Basic Tab, specify the topic name and bootstrap server, using the value of the server that you copied from the Message Hub service credentials.

[Screenshot: the KafkaProducer node Basic tab]

    • On the Security Tab, specify ‘SASL_SSL’ as the Security protocol and leave the default value of ‘TLSv1.2’ for the SSL protocol.

[Screenshot: the KafkaProducer node Security tab]

  • Specify a Path suffix for the URL for the HTTP Input node.

[Screenshot: the HTTP Input node Path suffix for URL property]

  • In the KafkaConsumerApp, add a message flow called ConsumerFlow and add these nodes:

[Screenshot: the ConsumerFlow message flow]

  • Configure the KafkaConsumer node as follows:
    • On the Basic Tab, specify the topic name, bootstrap server, and a value for the Consumer group ID. Specify ‘latest’ as the Default message offset.

[Screenshot: the KafkaConsumer node Basic tab]

    • On the Security Tab, specify ‘SASL_SSL’ as the Security protocol and leave the default value of ‘TLSv1.2’ for the SSL protocol.

[Screenshot: the KafkaConsumer node Security tab]

  • Specify a directory and file name on the FileOutput node.

[Screenshot: the FileOutput node directory and file name properties]

Step 6: Deploy the applications

  • Deploy the ProducerFlow to Server1 and the ConsumerFlow to Server2 using the Flow Exerciser.
  • Send in a message.
  • If you click on the point after the KafkaProducer node, you will see details of where the message was published in the WrittenDestination section of the LocalEnvironment.

[Screenshot: the WrittenDestination section of the LocalEnvironment]

  • If you click on the point after the KafkaConsumer node, you will see details of where the message was consumed from in the Kafka.Input section of the LocalEnvironment.

[Screenshot: the Kafka.Input section of the LocalEnvironment]

  • Notice that the offset is 0, indicating that this is the first message on the partition.
  • You will see that the file that you specified in the FileOutput node has been created with the text that you sent to the Producer flow.

Step 7: Check Activity Log in WebUI

  • Open the IIB Web User Interface.
  • Select the ProducerFlow and click on the Activity Log tab.
  • You will see an entry for the message that was published.

[Screenshot: the Activity Log for the ProducerFlow]

  • Now select the ConsumerFlow and click on the Activity Log tab.
  • You will see an entry for the message that was consumed.

[Screenshot: the Activity Log for the ConsumerFlow]

Summary

In this article, I have given an overview of Apache Kafka and shown how to use the new KafkaProducer and KafkaConsumer nodes to publish messages to, and consume messages from, a topic on IBM Message Hub. The Kafka nodes can also be used with any other Apache Kafka server implementation. All Kafka nodes that are deployed to the same integration server must use the same set of credentials to authenticate to the Kafka cluster; IBM Message Hub uses SASL_SSL as the security protocol. For more information, see the IBM Integration Bus v10 Knowledge Center.

Ben Thompson’s recent article Explore the new features of IBM Integration Bus 10.0.0.7 provides information about further enhancements that have been provided.

You may wish to watch my video, ‘Introducing the new KafkaConsumer and KafkaProducer nodes’, which demonstrates a scenario showing how the nodes can be used with IBM Integration Bus.
