IBM Support

Getting Started with Apache Kafka - Hadoop Dev

Technical Blog Post


Abstract

Getting Started with Apache Kafka - Hadoop Dev

Body

This blog shows you how to get started with Apache Kafka version 0.9.0.1 on IOP 4.2 using console commands and Java programs to run the Kafka producer and consumer.

Pre-requisites

  • You must have IOP 4.2 installed on your cluster. For more information on how to set up IOP 4.2, see the IOP Installation Instructions.
  • Using the Ambari web user interface, verify that the Kafka and Zookeeper services are running. 

The Kafka home directory in IOP is located at /usr/iop/current/kafka-broker.

Kafka Console Tools

Kafka console tools are used to create topics, alter topic configurations, delete topics, and run the Kafka producer and consumer. The Kafka console scripts referenced in this article are located under /usr/iop/current/kafka-broker/bin.

Creation of Topic

Note: The replication factor must be less than or equal to the number of available Kafka brokers.

[root@HOSTNAME kafka]# bin/kafka-topics.sh --create --zookeeper hostname.ibm.com:2181 --replication-factor 1 --partitions 1 --topic simple-topic --config max.message.bytes=10000
Created topic "simple-topic".

Viewing list of created topics

[root@HOSTNAME kafka]# bin/kafka-topics.sh --list --zookeeper hostname.ibm.com:2181
ambari_kafka_service_check
simple-topic

Getting information about a topic

[root@HOSTNAME kafka]# bin/kafka-topics.sh --describe --zookeeper hostname.ibm.com:2181
Topic:simple-topic	PartitionCount:1	ReplicationFactor:1	Configs:max.message.bytes=10000
	Topic: simple-topic	Partition: 0	Leader: 1001	Replicas: 1001	Isr: 1001

Altering a topic
Kafka allows you to alter the following topic properties by passing "--alter" to kafka-topics.sh.

    1. Partition count
    2. Replication factor
    3. Topic configuration

Understanding Partitions
A Kafka topic can have multiple partitions. Each partition is identified by a unique id, and the messages within it are identified by their offsets. Kafka topic messages are stored in logs under log.dir, which is configurable from Ambari. Having partitions provides the following benefits.

    1. Multiple partitions allow a topic's log to scale beyond a size that fits on a single server. Each individual partition must fit on a single server, but a topic with multiple partitions can spread a large amount of data over multiple servers.
    2. A partition is the unit of parallelism in Kafka. Writes to different partitions can be done in parallel.
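As a rough illustration of how keyed messages map to partitions, the sketch below mimics the idea with a simple hash-mod scheme. Kafka's actual default partitioner hashes the serialized key bytes (with murmur2), so this is conceptual only; the class and method names here are invented for this example.

```java
// Conceptual sketch of key-based partition assignment. Kafka's real default
// partitioner hashes the serialized key bytes; this hash-mod version only
// illustrates that the same key always lands on the same partition.
public class PartitionSketch {

    // Pick a partition for a key: hash it, then wrap into [0, numPartitions).
    public static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        // The same key always maps to the same partition, preserving per-key ordering.
        System.out.println("\"Message 0\" -> partition " + partitionFor("Message 0", 2));
        System.out.println("\"Message 1\" -> partition " + partitionFor("Message 1", 2));
    }
}
```

This also suggests why increasing the partition count of a keyed topic affects ordering: when the partition count changes, existing keys can map to different partitions.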

More information on Kafka partitions can be found in the Kafka 0.9 documentation.
Changing the partition count for a topic

[root@HOSTNAME kafka]# bin/kafka-topics.sh --alter --topic simple-topic --partitions 2 --zookeeper hostname.ibm.com:2181
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

Changing the topic configuration

[root@HOSTNAME kafka]# bin/kafka-topics.sh --alter --topic simple-topic --config max.message.bytes=100000 --zookeeper hostname.ibm.com:2181
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
         Going forward, please use kafka-configs.sh for this functionality
Updated config for topic "simple-topic".
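To see why max.message.bytes matters to producers, the hypothetical helper below approximates the payload-size check that the limit implies. Real Kafka also accounts for record framing overhead when enforcing max.message.bytes, so treat this as a sketch; MessageSizeCheck is a made-up name for illustration.

```java
import java.nio.charset.StandardCharsets;

// Rough sketch of the limit configured via max.message.bytes. Kafka also
// counts record framing overhead, so this payload-only check is only an
// approximation; MessageSizeCheck is an invented helper for illustration.
public class MessageSizeCheck {

    // Returns true if the key + value payload fits under the configured limit.
    public static boolean fits(String key, String value, int maxMessageBytes) {
        int payloadBytes = key.getBytes(StandardCharsets.UTF_8).length
                + value.getBytes(StandardCharsets.UTF_8).length;
        return payloadBytes <= maxMessageBytes;
    }

    public static void main(String[] args) {
        // A small message fits comfortably under the 10000-byte topic config above.
        System.out.println(fits("Message 0", "Message 0", 10000));
    }
}
```

A message larger than the configured limit is rejected by the broker, which is why the topic in this article was created with an explicit max.message.bytes setting.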

Delete a Kafka topic
In IOP 4.2, the property delete.topic.enable is set to false by default. This setting prevents topic deletion. To allow the deletion of topics, navigate to Kafka -> Configs -> Advanced kafka-broker in Ambari, set the property delete.topic.enable to true, save the configuration, and restart Kafka.

[root@HOSTNAME kafka]# bin/kafka-topics.sh --delete --topic simple-topic --zookeeper hostname.ibm.com:2181
Topic simple-topic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

Kafka Console Producer/Consumer

The following examples show how to run the Kafka Producer and Consumer on the console.

Kafka Console Producer

Run the following command to produce messages to “simple-topic”.

[root@HOSTNAME kafka]# bin/kafka-console-producer.sh --broker-list hostname.svl.ibm.com:6667 --topic simple-topic --producer.config producer.properties
Message 1
Message 2
Message 3
Message 4
^C

After sending messages, stop the console producer by pressing Ctrl+C.
Note: To override the default producer configurations, specify producer.config as part of the console producer command.

[root@HOSTNAME kafka]# cat producer.properties
acks=all
buffer.memory=10000
retries=0

More information about producer configurations can be found in the Producer Configs section of the Kafka documentation.

Kafka Console Consumer

Run the following command to consume messages from “simple-topic”.

[root@HOSTNAME kafka]# bin/kafka-console-consumer.sh --new-consumer --topic simple-topic --from-beginning --bootstrap-server hostname.svl.ibm.com:6667 --consumer.config consumer.properties
Message 2
Message 4
Message 1
Message 3
^CProcessed a total of 4 messages

Note: To override the default consumer configurations, specify the consumer.config as part of the console consumer command.

[root@HOSTNAME kafka]# cat consumer.properties
group.id=group-1
consumer.id=consumer-1
auto.commit.enable=true

Description of options used:

    1. --new-consumer – Kafka version 0.9 introduced a new consumer API, but the old consumer API is used by default. Specify this option to use the new consumer API.
    2. --from-beginning – Start consuming messages from the beginning of the topic.
    3. --bootstrap-server – List of Kafka brokers to connect to.

More information about consumer configurations can be found in the Consumer Configs section of the Kafka documentation.

Getting Started with Kafka Java Producer and Consumer

The following examples show how to run Kafka producer and consumer using the Java APIs.

Pre-requisites to run the Java Program

    1. JDK version 1.7
    2. Maven version 3.0 or later

Kafka Producer Code Snippet

// Create a producer object
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties);

for (int i = 0; i < 1000; i++) {
    // Key and value for the message
    String key = "Message " + Integer.toString(i);
    String value = "Message " + Integer.toString(i);

    // Create the producer record with the topic it belongs to, along with the message key and value
    ProducerRecord<String, String> record = new ProducerRecord<>(this.topic, key, value);

    // Publish the record to the topic
    producer.send(record);
}

Kafka Consumer Code Snippet:

// Create a consumer object
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);

// Subscribe the consumer to the topic
consumer.subscribe(Arrays.asList(topic));

while (true) {
    // Poll the consumer for new messages; the consumer waits up to 100 milliseconds for new data
    ConsumerRecords<String, String> records = consumer.poll(100);

    // Check whether the poll returned any records
    if (records.isEmpty()) {
        timeoutCount++;
    } else {
        // Iterate over the consumer records to get the data from the topic
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf(" key = %s, value = %s \n", record.key(), record.value());
            messageCount++;
        }
        timeoutCount = 0;
    }

    // If the consumer has not received data for 5 seconds (50 * 100 ms), close the consumer
    if (timeoutCount == 50) {
        System.out.println("Total No of Messages Consumed from the topic " + topic + " is " + messageCount);
        System.out.println("Kafka Consumer Timeout, because no data is received from Kafka Topic");
        break;
    }
}
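The timeout bookkeeping in the loop above can be exercised without a broker. The sketch below replays a fixed sequence of poll results (record counts per poll) and applies the same consecutive-empty-poll rule; the class and method names are invented for this illustration.

```java
import java.util.Iterator;

// Broker-free sketch of the consumer loop's timeout logic: each element of the
// iterator is the number of records one poll() returned; once the source is
// exhausted, every further poll is empty. The loop stops after maxEmptyPolls
// consecutive empty polls, mirroring the 50 * 100 ms rule above.
public class TimeoutSketch {

    public static int consumeUntilTimeout(Iterator<Integer> polls, int maxEmptyPolls) {
        int timeoutCount = 0;
        int messageCount = 0;
        while (true) {
            int records = polls.hasNext() ? polls.next() : 0; // empty poll when no data is left
            if (records == 0) {
                timeoutCount++;
            } else {
                messageCount += records;
                timeoutCount = 0; // any data resets the timeout window
            }
            if (timeoutCount == maxEmptyPolls) {
                return messageCount;
            }
        }
    }

    public static void main(String[] args) {
        // Three polls return 3, 0, and 2 records; then the consumer times out.
        int total = consumeUntilTimeout(java.util.Arrays.asList(3, 0, 2).iterator(), 50);
        System.out.println("Consumed " + total + " messages before timing out");
    }
}
```

Note that a non-empty poll resets the counter, so the consumer only shuts down after an unbroken run of empty polls, not after 5 seconds of total idle time spread across the session.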

Kafka Producer Configuration

Properties producerProperties = new Properties();
producerProperties.put("bootstrap.servers", "hostname.ibm.com:6667");
producerProperties.put("acks", "all");
producerProperties.put("retries", 0);
producerProperties.put("batch.size", 10000);
producerProperties.put("linger.ms", 1000);
producerProperties.put("buffer.memory", 5242880);
producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Kafka Consumer Configuration

Properties consumerProperties = new Properties();
consumerProperties.put("bootstrap.servers", "hostname.ibm.com:6667");
consumerProperties.put("group.id", "consumer-group-" + groupId);
consumerProperties.put("enable.auto.commit", "true");
consumerProperties.put("auto.commit.interval.ms", "1000");
consumerProperties.put("session.timeout.ms", "20000");
consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProperties.put("auto.offset.reset", "earliest");

Running Kafka Producer/Consumer

1. Download the zip file from kafka-simple-producer-consumer.
2. Install kafka-clients-0.9.0.1-IBM-3.jar into your local Maven repository. This jar is available on the Kafka cluster in the directory /usr/iop/4.2.0.0/kafka/libs.

$ mvn install:install-file -Dfile=/Users/bharatviswanadham/Kafka-Samples/kafka-simple-producer-consumer/lib/kafka-clients-0.9.0.1-IBM-3.jar -DgroupId=com.ibm.kafka -DartifactId=kafka-client -Dversion=0.9.0.1-IBM-3 -Dpackaging=jar

For -Dfile, give the path to the kafka-clients-0.9.0.1-IBM-3.jar location. In this example, the path is /Users/bharatviswanadham/Kafka-Samples/kafka-simple-producer-consumer/lib/kafka-clients-0.9.0.1-IBM-3.jar.
Note: Before running, update the bootstrap.servers property in KafkaMain.java to reflect your Kafka cluster server details.

Build the kafka-simple-producer-consumer Program

$ mvn package
[INFO] Scanning for projects...
[WARNING] 'build.plugins.plugin.version' for org.apache.maven.plugins:maven-compiler-plugin is missing. @ line 29, column 21
[WARNING]
...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building kafka-examples 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
...
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 1.401s
[INFO] Finished at: Tue Jun 16 12:22:10 PDT 2016
[INFO] Final Memory: 13M/309M
[INFO] ------------------------------------------------------------------------

Example 1

In this example, the Kafka producer produces 1000 messages to a topic and the Kafka consumer consumes the messages from the same topic. Run the following command to start the program.

$ target/kafka-simple-producer-consumer simple
#######################
Running Kafka Producer
Published Message to simple-topic
Published Message to simple-topic
Published Message to simple-topic
...
Kafka Producer Closed
#######################
#######################
Running Kafka Consumer
 key = Message 0, value = Message 0
 key = Message 1, value = Message 1
 key = Message 2, value = Message 2
 key = Message 3, value = Message 3
 key = Message 4, value = Message 4
 key = Message 5, value = Message 5
 key = Message 6, value = Message 6
 key = Message 7, value = Message 7
 key = Message 8, value = Message 8
 key = Message 9, value = Message 9
...
Total No of Messages Consumed from the topic simple-topic is 1000
Kafka Consumer Timeout, because no data is received from Kafka Topic
Consumer Closed

Example 2

In this example, the Kafka producer takes input from the user and writes the data to the topic. When you enter "close", the Kafka producer shuts down and the Kafka consumer starts consuming the data from the same topic. Run the following command to start the program.

$ target/kafka-simple-producer-consumer input
#######################
Running Kafka Producer, to publish user input to topic
Enter a Message to publish to kafka topic kafka-input-topic; to stop, enter message 'close'
Message from Keyboard
Published Message to kafka-input-topic
Enter a Message to publish to kafka topic kafka-input-topic; to stop, enter message 'close'
Message 1
Published Message to kafka-input-topic
Enter a Message to publish to kafka topic kafka-input-topic; to stop, enter message 'close'
Message 2
Published Message to kafka-input-topic
Enter a Message to publish to kafka topic kafka-input-topic; to stop, enter message 'close'
close
Kafka Producer is Closed
#######################
#######################
Running Kafka Consumer, to read messages from topic
 key = kafka-input-topic, value = Message from Keyboard
 key = kafka-input-topic, value = Message 1
 key = kafka-input-topic, value = Message 2
Total No of Messages Consumed from the topic kafka-input-topic is 3
Kafka Consumer Timeout, because no data is received from Kafka Topic
Consumer Closed

For more information about Kafka, see the Apache Kafka documentation.


UID

ibm16260133