
Get hands-on experience with an IBM Event Streams Java sample application

To get some hands-on experience with a sample application, we are going to review the Java Console sample in the event-streams-samples repository.

In this tutorial, we focus specifically on the Local Development sample.

We will look at how to write client code so that you learn how to produce and consume messages from Apache Kafka.

Prerequisites

Overview of the Java sample

We are not going to walk through every line of code in this tutorial, but it is worth explaining how the sample is structured.

The code can be found in the kafka-java-console-sample folder in the event-streams-samples GitHub repo.

Main method: EventStreamsConsoleSample.java

The EventStreamsConsoleSample.java file contains the main method. It does the following:

  • Parses and validates the command line arguments.
  • Checks that the topic the sample works against exists, and creates it if it does not.
  • Starts the clients on separate threads. The sample can run a producer, a consumer, or both at once, and the clients run until the user cancels them (using Ctrl-C).
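
For example, the "create the topic if it does not exist" step can be done with Kafka's AdminClient. The following is a minimal sketch of that idea rather than the sample's exact code; the bootstrap address, topic name, and sizing are placeholders, and it assumes it runs in a method that declares throws Exception:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

// Minimal sketch: create the topic if it does not already exist
Properties adminProps = new Properties();
adminProps.put("bootstrap.servers", "localhost:9092"); // placeholder address

try (AdminClient admin = AdminClient.create(adminProps)) {
    if (!admin.listTopics().names().get().contains("kafka-java-console-sample-topic")) {
        // 1 partition and replication factor 1 are enough for local development
        admin.createTopics(Collections.singleton(
                new NewTopic("kafka-java-console-sample-topic", 1, (short) 1))).all().get();
    }
}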

Producer

A producer is an application that publishes streams of messages to Kafka topics. You learned all about producers in our “Apache Kafka fundamentals” article.

Producer configuration properties

For this sample, the producer configuration is built in EventStreamsConsoleSample.java in the getProducerConfigs() method. This method builds on common configuration that is used across all clients and sets a small number of producer-specific configuration properties.
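
In outline, the pattern looks something like this sketch, where commonConfigs stands in for the shared client configuration and is not necessarily the sample's actual variable name:

// Sketch: copy the configuration shared by all clients (bootstrap
// servers, security settings, and so on), then layer producer-specific
// settings on top (the key ones are described below)
Map<String, Object> configs = new HashMap<>(commonConfigs);
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "kafka-java-console-sample-producer"); // illustrative client id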

These configuration properties are worth noting:

  • Serializers (and deserializers)

      configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // key.serializer
    
      configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // value.serializer
    

    These are the serializers used for the key and value of each message that is produced. In the sample, a simple string is used for both, so the Kafka-provided StringSerializer gives us what we need. Note that the consumer must use matching deserializers (see the snippet after this list).

  • Acknowledgements (acks)

      configs.put(ProducerConfig.ACKS_CONFIG, "all"); // acks
    

    With acks set to all, the producer requires all in-sync replicas to have received the message. The leader sends an acknowledgement only when all in-sync replicas have confirmed that the message has been safely written. This is the most durable option, but it comes at the cost of increased latency.
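
For reference, the matching consumer-side settings use the Kafka-provided StringDeserializer; a sketch:

// The consumer must use the counterparts of the producer's serializers
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // key.deserializer
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // value.deserializer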

Read more about important producer configuration settings in the Event Streams on IBM Cloud documentation. For the complete documentation of all producer configuration, see the Apache Kafka documentation. But be warned: there are loads of configuration options that you might be tempted to change. We suggest that you stick to a few until you are comfortable with the behavior of the application.

ProducerRunnable.java

ProducerRunnable.java implements Runnable and is therefore run in its own thread.

The constructor creates a new instance of KafkaProducer based on the provided configuration.

// Create a Kafka producer with the provided client configuration
kafkaProducer = new KafkaProducer<>(producerConfigs);

The run() function is where the actual work is done. You will notice that the thread runs in a while loop, checking the closing variable to see whether the application is shutting down.
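
In outline, the pattern is roughly this sketch (not the sample's exact code), assuming closing is a volatile flag that is set from another thread when the application shuts down:

// Sketch only: the closing flag and shutdown() method are illustrative
private volatile boolean closing = false;

public void run() {
    while (!closing) {
        // build and send the next ProducerRecord (shown below),
        // then pause briefly before producing another message
    }
    kafkaProducer.close(); // flush and release the client's resources
}

public void shutdown() {
    closing = true; // ask the loop to exit on its next iteration
}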

A ProducerRecord is constructed to represent the message to be produced. As the code comment notes, the sample application uses the default partitioner to choose a partition.

// If a partition is not specified, the client will use the default partitioner to choose one.
ProducerRecord<String, String> record = new ProducerRecord<>(topic,key,message);

In other cases, you might want to control which partition each message goes to yourself; the Kafka javadoc shows an example.
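
For instance, ProducerRecord has a constructor that accepts a partition number, so you could pin every message to a specific partition (partition 0 here is purely illustrative):

// Explicitly target partition 0 instead of letting the default partitioner choose
ProducerRecord<String, String> record = new ProducerRecord<>(topic, 0, key, message);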

In the sample, the ProducerRecord is sent asynchronously, and the code then immediately blocks waiting for the acknowledgement. This is sufficient for demonstration purposes, but it is unlikely to be the required behavior in a real-world application because of the performance implications. When looking at the requirements for your application, you should consider how you want your producer to behave when sending messages and processing acknowledgements.

// Send record asynchronously
Future<RecordMetadata> future = kafkaProducer.send(record);

// Synchronously wait for a response from Event Streams / Kafka on every message produced.
// For high throughput the future should be handled asynchronously.
RecordMetadata recordMetadata = future.get(5000, TimeUnit.MILLISECONDS);
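
One common asynchronous approach is to pass a Callback to send(); Kafka invokes it once the acknowledgement arrives, so the producing thread never blocks. A sketch:

// Handle the acknowledgement in a callback instead of blocking on the future
kafkaProducer.send(record, (metadata, exception) -> {
    if (exception != null) {
        logger.error("Failed to produce message", exception);
    } else {
        logger.info("Message produced, offset: {}", metadata.offset());
    }
});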

Consumer

A consumer reads messages from one or more topics and processes them. You learned all about consumers in our “Apache Kafka fundamentals” article.

Consumer configuration properties

For this sample, the consumer configuration is built in EventStreamsConsoleSample.java in the getConsumerConfigs() method. This method builds on the same common configuration that is used across all clients, and some of its configuration mirrors the producer's, such as the deserializers for the message key and value. However, the method also sets a small number of consumer-specific configuration properties, such as:

  • group.id

      configs.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-java-console-sample-group"); // group.id
    

    The group.id property controls the consumer group that this consumer is part of. The consumer either joins an existing group or creates a new one, as required.

  • auto.offset.reset

      configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // auto.offset.reset
    

    The auto.offset.reset property determines what to do when there is no initial offset for this consumer, or when its current offset is no longer present on the server. With latest, the offset is automatically set to the latest offset on the partition; that is, the consumer consumes only the newest records.
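
    The other commonly used value is earliest, which makes a consumer that has no committed offset start from the oldest available records instead:

      configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // consume from the oldest available records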

Read more about important consumer configuration settings in the Event Streams on IBM Cloud documentation. For full documentation of all consumer configuration, see the Apache Kafka documentation. But be warned: there are loads of configuration options that you might be tempted to change. We suggest that you stick to a few until you are comfortable with the behavior of the application.

ConsumerRunnable.java

Like the producer, ConsumerRunnable.java implements Runnable and is therefore run in its own thread.

The constructor creates a new instance of KafkaConsumer based on the provided configuration.

// Create a Kafka consumer with the provided client configuration
kafkaConsumer = new KafkaConsumer<>(consumerConfigs);

Again, like the producer, most of the work is done inside the run() function, along with logic to detect whether the application is being shut down.
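
A rough sketch of that shape, assuming the same kind of volatile closing flag as the producer (the sample's actual code handles subscription and shutdown in more detail):

// Sketch only: needs java.util.Collections and org.apache.kafka.common.errors.WakeupException
public void run() {
    kafkaConsumer.subscribe(Collections.singletonList(topic)); // subscribe before polling

    try {
        while (!closing) {
            // poll for records and process them (shown below)
        }
    } catch (WakeupException e) {
        // expected during shutdown: wakeup() interrupts a blocked poll()
    } finally {
        kafkaConsumer.close();
    }
}

public void shutdown() {
    closing = true;
    kafkaConsumer.wakeup(); // unblock a waiting poll() so the loop can exit promptly
}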

The consumer polls to see whether any ConsumerRecords are available. The result is a collection containing all of the messages that were fetched. If nothing is received within 3 seconds, poll() returns an empty collection and the consumer logs that no messages were consumed.

// Poll on the Kafka consumer, waiting up to 3 secs if there's nothing to consume.
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(3000L));

if (records.isEmpty()) {
    logger.info("No messages consumed");
}

If the consumer did receive some messages, the sample application simply loops over them and prints the contents of each one.

for (ConsumerRecord<String, String> record : records) {
    logger.info("Message consumed: {}", record);
}
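
Each ConsumerRecord also exposes its fields individually, which becomes useful once you move beyond printing whole records. For example:

// Access the individual fields rather than printing the whole record
logger.info("partition={}, offset={}, key={}, value={}",
        record.partition(), record.offset(), record.key(), record.value());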

Time to get creative?!

Now that you understand more about the Java sample and how the clients work, it is time to play around with the code a bit. Download the code, navigate into the kafka-java-console-sample folder, and explore the docs.

Start by experimenting with starting and stopping the clients independently. What happens if you stop consuming messages for a period and then start again?

How about trying to modify the client code? What would happen if the consumer did some lengthy processing of each message it reads? You could recreate this by adding a sleep and seeing what happens.
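
For example, a simple (if crude) way to simulate slow processing is to sleep inside the consume loop; the five seconds here is arbitrary:

// Simulate lengthy per-message processing with an arbitrary sleep
for (ConsumerRecord<String, String> record : records) {
    logger.info("Message consumed: {}", record);
    try {
        Thread.sleep(5000); // pretend each message takes 5 seconds to process
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}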

Finally, review the configuration of the clients, and experiment with settings such as acks and auto.offset.reset to see what effect each one has on the clients.

Summary and next steps

In this tutorial, we walked through the Java sample application and learned what it does. Hopefully, you also played around with the sample code and gained more understanding about how producer and consumer code works.

You’re now ready to take on the IBM Event Streams coding challenge and write a consumer app.

Or, perhaps you’re ready to learn how to debug your application.