The purpose of this blog is to help developers who are getting started with Apache Kafka using a Java client, or anyone who is interested in learning how Kafka works in a simple and practical way. After going through this article you will be able to answer questions like:

  • What is Kafka?
  • Why use the Kafka API?
  • What are producers and consumers?
  • How do I configure Kafka producers to start publishing?
  • How do I configure Kafka consumers to read messages?
  • What architecture does Kafka use?
  • What is the relationship between Kafka and IBM Message Hub?

Let’s start…

What is Kafka?

Apache Kafka is an open-source, distributed, partitioned, and replicated commit log service: publish-subscribe messaging, rethought as a distributed commit log.
The Kafka service is made up of the following components:

  • Producers
  • Consumers
  • Broker
  • Topics
  • Partitions
  • Messages

Why use the Kafka API?

If you are looking for an easy way to integrate your application with existing systems that have Kafka support, for example IBM Streaming Analytics, then use this approach.
The Kafka API offers low latency and high throughput, meaning that Kafka can handle hundreds of megabytes of reads and writes per second from multiple clients. In addition, it offers a rich API.

What is IBM Message Hub?

IBM® Message Hub for IBM Cloud is a scalable, distributed, high-throughput message bus that unites your on-premises and off-premises cloud technologies.
Message Hub is based on Apache Kafka version 0.10.2.1, a fast, scalable, and durable real-time messaging engine developed by the Apache Software Foundation.
If you want to know more about Message Hub, click here.

Now that you know what Kafka is, you might wonder, “what do I need to start working?” So here we go…

What’s required to use the Kafka API with Message Hub?

If you have all the ingredients, you’re almost ready to start cooking. Perhaps you already know the concepts, but here is a brief description:

Glossary

  • Topics: where messages are published by a producer. Partitions are created within topics with the specific purpose of holding sequential messages. Consumers subscribe to topics in order to read data.
  • Producers: in Kafka, producers publish messages to topics, with the option of configuring the partition that will store the data.
  • Consumers: in Kafka, consumers read data from a topic or a topic/partition by subscribing.
  • Messages: the data that users want to publish or consume. Within a partition, each message has a unique sequential id called an offset.
  • Broker: a server where topics are created and stored. A Kafka cluster consists of one or more brokers.

Kafka design

Now, let’s put together all the pieces. You will need producers to publish your messages to a broker, where the data will be stored for 24 hours (as configured), and at the other end you will need consumers to read the data from the broker. For a broader reference, let’s visualize this using the following diagrams.

Kafka architecture design

[Diagram: Kafka architecture design]

Basic producer and consumer flow

[Diagram: basic producer and consumer flow through Message Hub]

At this point you have everything in order to start. So let’s cook…

Producer and consumer – Java implementation

As with every application, the first thing you need to do is present your credentials for authentication, and this is no exception. You must configure the environment, so take a look at the next step:

Connecting and authenticating

To connect to Message Hub, the Kafka API uses the kafka_brokers_sasl credentials, and the user and password, from the VCAP_SERVICES environment variable.

Therefore you need to set the sasl.jaas.config property (0.10.2 clients or higher). For example:

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="USERNAME" \
    password="PASSWORD";

This configuration will be used by the producer and consumer.
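
In Java, you could set this property programmatically on the same Properties object used for the rest of the client configuration. A minimal sketch (USERNAME and PASSWORD are placeholders for the credentials found in your VCAP_SERVICES):

// Sketch: set the SASL properties on the client configuration.
// USERNAME and PASSWORD are placeholders for the VCAP_SERVICES credentials.
Properties props = new Properties();
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config",
    "org.apache.kafka.common.security.plain.PlainLoginModule required "
    + "username=\"USERNAME\" password=\"PASSWORD\";");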

It’s time for real coding, so follow the next sections for producers and consumers.

Writing producers

Producer properties configuration

To start working with producers, you need to create a configuration containing the required properties, which is passed to the KafkaProducer constructor. Adjust these properties based on your needs.

Properties props = new Properties();
// Serializers for the message key and value
props.put("key.serializer","org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.ByteArraySerializer");
// Message Hub broker to bootstrap from
props.put("bootstrap.servers","kafka05-broker.us-south.bluemix.net:9094");
// Wait for acknowledgement from all replicas
props.put("acks","all");
props.put("block.on.buffer.full","true");
props.put("batch.size","1");
// SASL_SSL authentication over TLSv1.2
props.put("security.protocol","SASL_SSL");
props.put("ssl.protocol","TLSv1.2");
props.put("ssl.enabled.protocols","TLSv1.2");
props.put("ssl.truststore.location","/usr/lib/j2re1.7-ibm/jre/lib/security/cacerts");
props.put("ssl.truststore.password","password");
props.put("ssl.truststore.type","JKS");
props.put("ssl.endpoint.identification.algorithm","HTTPS");

For a broader reference of each property, please click here.

Producer Java API

[Diagram: producer class diagram]

It’s important to define whether the producer is going to publish to a topic (regardless of the partition) or to a specific topic/partition. For the first option, see the code implementation below:

Publish to a specific topic

Pass the topic name and the message that will be published as parameters of ProducerRecord:

ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(
                    topic, message.getBytes("UTF-8"));

Then send to the broker:

kafkaProducer.send(producerRecord);

Putting it all together:

private void publishToTopic(){
    String topic = "mytopic13";
    String message = "publish message to a topic";
    try {
      KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<>(props);
      ProducerRecord<byte[], byte[]> producerRecord =
          new ProducerRecord<>(topic, message.getBytes("UTF-8"));
      // send() returns a Future; get() blocks until the broker acknowledges
      RecordMetadata recordMetadata = kafkaProducer.send(producerRecord).get();
      // From the RecordMetadata it is possible to validate topic, partition and offset
      System.out.println("topic where message is published : " + recordMetadata.topic());
      System.out.println("partition where message is published : " + recordMetadata.partition());
      System.out.println("message offset # : " + recordMetadata.offset());
      kafkaProducer.close();
    } catch (UnsupportedEncodingException e) {
      e.printStackTrace();
    } catch (Exception e) {
      e.printStackTrace();
    }
}
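
The example above blocks on send(producerRecord).get() to obtain the RecordMetadata synchronously. As a variation, the producer API also accepts a callback so the send completes asynchronously; here is a minimal sketch (the logging is illustrative only):

// Asynchronous variant: onCompletion is invoked when the broker acknowledges
// the record, or with a non-null exception if the send failed.
kafkaProducer.send(producerRecord, new Callback() {
  @Override
  public void onCompletion(RecordMetadata metadata, Exception exception) {
    if (exception != null) {
      exception.printStackTrace();
    } else {
      System.out.println("published to " + metadata.topic() + "-"
          + metadata.partition() + " @ offset " + metadata.offset());
    }
  }
});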

Publish to a specific topic/partition

Pass the topic name, partition, message key, and message value that will be published as parameters of ProducerRecord:

ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(
                    topic, partition, key.getBytes("UTF-8"), message.getBytes("UTF-8"));

Note that the only difference is the additional arguments passed to ProducerRecord.
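
Putting it all together, a sketch of the partition-specific variant could look like the following (the partition number and key are illustrative values, not from the original):

private void publishToTopicPartition(){
    String topic = "mytopic13";
    int partition = 0;          // illustrative partition number
    String key = "myKey";       // illustrative message key
    String message = "publish message to a topic/partition";
    try {
      KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<>(props);
      ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(
          topic, partition, key.getBytes("UTF-8"), message.getBytes("UTF-8"));
      RecordMetadata recordMetadata = kafkaProducer.send(producerRecord).get();
      System.out.println("message stored in partition : " + recordMetadata.partition());
      kafkaProducer.close();
    } catch (Exception e) {
      e.printStackTrace();
    }
}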

For more information about the producer API, please click here.

Writing consumers

Consumer properties configuration

To start working with consumers, you need to create a configuration containing the required properties, which is passed to the KafkaConsumer constructor. Adjust these properties based on your needs.

Properties props = new Properties();
// Deserializers for the message key and value
props.put("key.deserializer","org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.ByteArrayDeserializer");
// Message Hub broker to bootstrap from
props.put("bootstrap.servers","kafka05-broker.us-south.bluemix.net:9094");
// Consumer group and offset behaviour
props.put("group.id","test");
props.put("enable.auto.commit","false");
props.put("auto.offset.reset","earliest");
props.put("auto.commit.interval.ms","1000");
// SASL_SSL authentication over TLSv1.2
props.put("security.protocol","SASL_SSL");
props.put("ssl.protocol","TLSv1.2");
props.put("ssl.enabled.protocols","TLSv1.2");
props.put("ssl.truststore.location","/usr/lib/j2re1.7-ibm/jre/lib/security/cacerts");
props.put("ssl.truststore.password","password");
props.put("ssl.truststore.type","JKS");
props.put("ssl.endpoint.identification.algorithm","HTTPS");

For a broader reference of each property, please click here.

Consumer Java API

[Diagram: consumer class diagram]

Subscribe or assign

It’s important to define which method will be used for message consumption. Below is a summary of the notable points of each one, and when to use it:

Subscribe

  • Use when you want to consume all messages from the desired topic.
  • Messages are consumed from the topic automatically, no matter which partitions they are in.
  • After subscribing, a re-balance (re-assigning partitions to group members) occurs. The consumer coordinates with the group to assign partitions; this happens automatically when consumption starts (a sketch for observing it appears after the subscribing example below).
  • Define a List of topic name strings, and KafkaConsumer will subscribe to that list.

Assign

  • Use when you want to define the specific topic/partition from which messages will be consumed.
  • This can be considered manual consumption, since the partition is programmatically defined for the desired topic.
  • Define a List of TopicPartition objects, and KafkaConsumer will be assigned to that list.
  • It is not possible to subscribe and assign at the same time with the same consumer instance.

Consume from topic


Subscribing

In this code example kafkaConsumer will subscribe to a topic as shown below:

KafkaConsumer<byte[], byte[]> kafkaConsumer =
    new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
String topic = "mytopic13";
List<String> topicList = new ArrayList<>();
topicList.add(topic);
kafkaConsumer.subscribe(topicList);
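
If you want to observe the re-balance described earlier, subscribe also accepts a ConsumerRebalanceListener. A minimal sketch (the listener bodies are illustrative):

kafkaConsumer.subscribe(topicList, new ConsumerRebalanceListener() {
  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    // Called before a re-balance takes partitions away from this consumer
    System.out.println("partitions revoked : " + partitions);
  }
  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    // Called after partitions have been re-assigned to this consumer
    System.out.println("partitions assigned : " + partitions);
  }
});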

Assigning

KafkaConsumer<byte[], byte[]> kafkaConsumer =
    new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
String topic = "mytopic13";
List<TopicPartition> topicPartitionList = new ArrayList<>();
int partition = 5;
topicPartitionList.add(new TopicPartition(topic, partition));
kafkaConsumer.assign(topicPartitionList);
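
Because assign bypasses group co-ordination, you can also reposition the consumer explicitly before polling. A small sketch (the offset 42 is an illustrative value):

// Optional: re-read the assigned partitions from their first available offset...
kafkaConsumer.seekToBeginning(topicPartitionList);
// ...or jump to a specific offset in one partition
kafkaConsumer.seek(new TopicPartition(topic, partition), 42L);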

Retrieve messages from topic

To retrieve messages, kafkaConsumer uses the “poll” method, which fetches data from the topics or partitions specified using the subscribe/assign APIs. The call returns ConsumerRecords, which you can iterate over. Here is a code snippet:

Iterator<ConsumerRecord<byte[], byte[]>> it = kafkaConsumer.poll(1000).iterator();
while(it.hasNext()){
  ConsumerRecord<byte[], byte[]> record = it.next();
  final String message = new String(record.value(),Charset.forName("UTF-8"));
  if(record.key() != null){
    final String key = new String(record.key(),Charset.forName("UTF-8"));
    System.out.println("messageKey : " + key);
  }
  System.out.println("topic : " + record.topic());
  System.out.println("partition : " + record.partition());
  System.out.println("offset : " + record.offset());                
  System.out.println("value : " + message);
}

As described, it’s possible to get the data (topic, partition, offset, key, value) from each ConsumerRecord.

Committing messages

There are two possible ways to mark all received messages as committed.

  1. Use the kafkaConsumer API: after all messages have been iterated, call “commitSync”. This ensures the offsets are committed even if the consumer crashes after the call. If you choose this option, disable auto-commit by setting enable.auto.commit to false. This option is useful for handling commits manually.
    kafkaConsumer.commitSync();
    
  2. Set “enable.auto.commit=true” and set “auto.commit.interval.ms=1000” to regulate the frequency. Use this when you want the consumer’s offsets to be committed periodically in the background (see the sketch after the note below).

Note: there is a risk with the second option: if the consumer crashes before committing the offsets of messages it has already processed, a new consumer will end up repeating the process, which means those messages will be retrieved twice.
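
A sketch of the second option, reusing the property names shown earlier in the consumer configuration:

// Option 2: let the consumer commit offsets in the background
props.put("enable.auto.commit", "true");       // turn auto-commit on
props.put("auto.commit.interval.ms", "1000");  // commit roughly every second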

Close consumer

When all messages have been processed and you want to end the consumer instance, close it as follows:

kafkaConsumer.close();

Putting it all together:

public void run(){
    kafkaConsumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
    String topic = "mytopic13";
    if(isSubscribe){
      System.out.println("KafkaConsumer will subscribe");
      List<String> topicList = new ArrayList<>();
      topicList.add(topic);
      kafkaConsumer.subscribe(topicList);
    }else{
      System.out.println("KafkaConsumer will assign");
      List<TopicPartition> topicPartitionList = new ArrayList<>();
      int partition = 5;
      topicPartitionList.add(new TopicPartition(topic, partition));
      kafkaConsumer.assign(topicPartitionList);
    }
    while(!closing){
      try{
        Iterator<ConsumerRecord<byte[], byte[]>> it = kafkaConsumer.poll(1000).iterator();
        while(it.hasNext()){
          ConsumerRecord<byte[], byte[]> record = it.next();
          final String message = new String(record.value(), Charset.forName("UTF-8"));
          if(record.key() != null){
            final String key = new String(record.key(), Charset.forName("UTF-8"));
            System.out.println("messageKey : " + key);
          }
          System.out.println("topic : " + record.topic());
          System.out.println("partition : " + record.partition());
          System.out.println("offset : " + record.offset());
          System.out.println("value : " + message);
        }
        kafkaConsumer.commitSync();
      } catch (final Exception e) {
        System.out.println("Consumer loop has been unexpectedly interrupted");
        shutdown();
      }
    }
    kafkaConsumer.close();
}
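
The shutdown() method called in the catch block is not shown in the original. A minimal sketch, assuming closing is a volatile boolean field on the same class, could use KafkaConsumer.wakeup() to interrupt a blocked poll from another thread:

public void shutdown() {
    // Signal the consumer loop to exit; wakeup() makes a blocked poll()
    // throw a WakeupException so the loop terminates promptly
    closing = true;
    kafkaConsumer.wakeup();
}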

At this point you should be able to write your own producers and consumers, and to set the minimum configuration required to start publishing and reading messages from the broker.

Troubleshooting

If you face any issues, have a look at the dedicated Stack Overflow channel to see whether your question has already been answered. If not, post your question and it will be answered either by a member of the Message Hub development team or by the wider community. We encourage you to add at least the tags “message-hub” and “apache-kafka”.

Conclusion

Following this practical guide, you can easily set up Kafka with Message Hub and start publishing and consuming data. As you can see, the Kafka API for producers and consumers can be configured as required, based on your use cases.
