Managing IBM Event Streams topics using the Kafka Admin API

When developing distributed applications in the cloud, secure and efficient communication between them is vital and challenging. In modern cloud architectures, this is usually achieved by using a messaging infrastructure. IBM Event Streams provides a fully managed enterprise messaging and event streaming service that is powered by the industry standard Apache Kafka.

Before we go any further, let’s first describe what a broker and a topic are. Brokers and topics are core Kafka concepts and are mentioned throughout this tutorial. Kafka runs on a cluster of one or more servers, which are called brokers. A Kafka broker is responsible for receiving messages from producing applications and writing them to disk, as well as fetching those messages and sending them to consuming applications. Streams of messages are stored in categories called topics, and the messages in a single topic can be distributed across the brokers in a cluster. That’s enough for now, but you can learn more about topics by reading the Kafka documentation.

This tutorial introduces the Admin API, a robust administrative interface provided by Kafka. It exposes a group of primitives implemented directly by the brokers and gives developers a simple way to manage topics. (This API was previously called “AdminClient”.)

The Admin API provides many features, and this tutorial explains some of the most common ones. You’ll learn how to:

  • Create a new topic
  • Query configuration settings of a topic
  • Add partitions to an existing topic
  • Alter topic configuration, such as the retention period
  • Delete a topic

Learning objectives

In this tutorial, you’ll learn how to manage topics and their configurations.

Prerequisites

None.

Estimated time

You can complete this tutorial in about 30 minutes.

Steps

1. Set up your development environment

First, you need to follow the steps in the Getting Started with Event Streams tutorial to deploy an Event Streams service and create the service credentials.

You can use any of the available Kafka clients, but for the purposes of this tutorial, I will be using the Java client. Make sure that you have the latest version of the Kafka client. You can also use the client libraries that are downloaded when building our sample application.
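If you manage your Java project with Maven, the Kafka client library can be pulled in with a dependency along these lines (the version shown here is only an example; replace it with the latest release):

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>3.6.1</version> <!-- example only; use the latest kafka-clients release -->
</dependency>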

2. Connect to your IBM Event Streams service instance

First, you need to create the client configuration to connect to your Event Streams service instance.

In the client configuration, you must specify the API key and the Kafka broker addresses, both of which you can find in the service credentials that you created in the IBM Cloud console.

The following Java code is an example of creating a connection to an Event Streams service.


import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;

// The broker addresses and the API key both come from your service credentials
private static final String brokers = "broker0:9093,broker1:9093,broker2:9093";
private static final String apiKey = "myapikey";

Properties configs = new Properties();

// Event Streams requires SASL_SSL with the PLAIN mechanism;
// the SASL username is always "token" and the password is the API key
configs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokers);
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
configs.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
configs.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"token\" password=\"" + apiKey + "\";");
configs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2");
configs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, "TLSv1.2");
configs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS");

Admin admin = AdminClient.create(configs);
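The Admin client holds open connections to the cluster, so close it when you are finished with it. Because Admin extends AutoCloseable, one convenient pattern (not shown in the snippets in this tutorial, which assume a long-lived admin variable) is try-with-resources:

try (Admin admin = AdminClient.create(configs)) {
    // issue admin operations here; the client is closed automatically on exit
}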

3. Create a topic

Two key features of topics that you need to understand are topic partitioning and replication factor.

A topic is split into partitions, which are a way of spreading your data across several brokers inside your Event Streams service instance. The use of partitions is one of the reasons why Kafka can achieve such high-throughput, low-latency performance.

Also, every topic has a replication factor, which is the number of brokers across which each of its partitions is replicated. The replication factor gives you resilience: if one broker becomes unavailable, you can still consume your messages from another broker that holds a replica.

In this example, I will create a topic with 1 partition and a replication factor of 3.


private static final String topicName = "test-topic";

// 1 partition, replicated across 3 brokers
NewTopic newTopic = new NewTopic(topicName, 1, (short) 3);

CreateTopicsResult createTopicsResult = admin.createTopics(Collections.singleton(newTopic));
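createTopics() is asynchronous: errors only surface when you wait on the returned futures, wrapped in a java.util.concurrent.ExecutionException. A hedged sketch of waiting for the creation to complete and tolerating an already-existing topic (using org.apache.kafka.common.errors.TopicExistsException) might look like this:

try {
    createTopicsResult.all().get(60L, TimeUnit.SECONDS);
} catch (ExecutionException e) {
    if (e.getCause() instanceof TopicExistsException) {
        System.out.println(topicName + " already exists"); // safe to continue
    } else {
        throw e;
    }
}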

4. List topics

With the Admin API, you can list topics and get the metadata for a specific topic.

The following example code shows how you can list topics in your Event Streams instance:


ListTopicsResult listTopicResult = admin.listTopics();

// names() returns a KafkaFuture<Set<String>>; block for up to 60 seconds
System.out.println(listTopicResult.names().get(60L, TimeUnit.SECONDS));

Output:

[Output showing the list of topics]
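By default, listTopics() does not return internal topics. If you also want those included, you can pass a ListTopicsOptions object; for example:

ListTopicsOptions options = new ListTopicsOptions().listInternal(true);

// Same listing as above, but internal topics are included
System.out.println(admin.listTopics(options).names().get(60L, TimeUnit.SECONDS));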

5. Review topic metadata

The broker stores metadata for each topic, which contains important information such as the number of partitions the topic has, the replication factor for each of those partitions, and which replicas are in sync.

Let’s now explore the metadata of the topic we just created by calling the describeTopics() function:


DescribeTopicsResult describeTopicResult = admin.describeTopics(Collections.singleton(topicName));

System.out.println(describeTopicResult.all().get(60L, TimeUnit.SECONDS));

Output:

Output that shows the topic metadata

As you can see in this output, it also shows which replica is the leader for a partition. Your application always produces to and consumes from the leader. This example shows that for partition 0 of the topic, the replica on kafka-2 is the leader. If kafka-2 becomes unavailable for some reason, another broker that holds a replica of the partition will be elected as the leader.
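If you want to read the leader out of the metadata programmatically instead of eyeballing the printed description, a minimal sketch (reusing the describeTopicResult from the previous step) can iterate over the partitions:

TopicDescription description = describeTopicResult.all().get(60L, TimeUnit.SECONDS).get(topicName);

// Print the leader node for every partition of the topic
for (TopicPartitionInfo partition : description.partitions()) {
    System.out.println("Partition " + partition.partition() + " leader: " + partition.leader());
}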

6. Add partitions to an existing topic

In some cases, you might want to increase the number of partitions on your existing topic. By adding partitions, you can scale your topic as the amount of data that you are publishing to the topic grows or when you need to achieve more parallelism and better performance. The Admin API makes it very easy to add partitions to an existing topic.

However, you should take care when increasing the number of partitions if messages are produced with keys, because adding partitions changes how keys map to partitions. When producing a keyed message, Kafka chooses the partition based on a hash of the key, which guarantees that messages with the same key always arrive at the same partition. If the number of partitions changes, that guarantee no longer holds.
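To make that concrete, here is a minimal sketch of how Kafka’s default partitioner maps a keyed message to a partition (using the murmur2 helpers from org.apache.kafka.common.utils.Utils); the result of the modulo changes as soon as numPartitions changes:

// The same key maps to a different partition once numPartitions changes
static int partitionForKey(byte[] keyBytes, int numPartitions) {
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}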

Also, note that after you have added partitions, they cannot be removed. To reduce the number of partitions, you have to delete the topic and recreate it with the desired number of partitions.

Now let’s see how we can add more partitions to our topic, and then query the metadata again to see the change.


// Request an increase from 1 to 3 partitions for the topic
Map<String, NewPartitions> newPartitions = new HashMap<>();
newPartitions.put(topicName, NewPartitions.increaseTo(3));

CreatePartitionsResult createPartitionsResult = admin.createPartitions(newPartitions);
createPartitionsResult.all().get(60L, TimeUnit.SECONDS);

// Describe the topic again to confirm the new partition count
DescribeTopicsResult describeTopicsResult = admin.describeTopics(Collections.singleton(topicName));
System.out.println(describeTopicsResult.all().get(60L, TimeUnit.SECONDS));

You will see that there are now 3 partitions in the metadata returned.

[Output showing the three partitions after the increase]

7. Change topic configurations

The Admin API is a great tool to manage your topic configurations. It allows you to view all the configurations set for your topic. You also can change the following configuration settings for topics in IBM Event Streams:

  • cleanup.policy
  • retention.ms
  • segment.bytes
  • segment.ms
  • retention.bytes
  • segment.index.bytes

I’m going to retrieve the configuration for my test topic. Because I’ve just created this topic, it has the default configuration set by IBM Event Streams.


DescribeConfigsResult describeConfigsResult = admin.describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, topicName)));

Map<ConfigResource, Config> topicConfigs = describeConfigsResult.all().get(60L, TimeUnit.SECONDS);

System.out.println(topicConfigs.values());

A snippet of the configuration in the output:

[Output showing the topic configuration]

The retention period is one of the most important configuration settings to consider when creating a new topic because it determines how long the messages in your topic live. By default, topics in Event Streams have a 24-hour (86,400,000 milliseconds) retention period, which means that after 24 hours messages are discarded and the disk space is freed up.

I’m going to use the retention period as an example to demonstrate how you can change a topic configuration with the Admin API.

Let’s increase the retention period for my topic to 72 hours. The retention period is set in milliseconds, so first I convert 72 hours to milliseconds.


// 72 hours expressed in milliseconds
long retentionPeriod = TimeUnit.HOURS.toMillis(72);

ConfigEntry ce = new ConfigEntry("retention.ms", String.valueOf(retentionPeriod));

// SET overwrites the current value of retention.ms
AlterConfigOp aco = new AlterConfigOp(ce, AlterConfigOp.OpType.SET);

ConfigResource cr = new ConfigResource(ConfigResource.Type.TOPIC, topicName);

Map<ConfigResource, Collection<AlterConfigOp>> updatedConfig = new HashMap<>();
updatedConfig.put(cr, Collections.singleton(aco));

AlterConfigsResult acr = admin.incrementalAlterConfigs(updatedConfig);
acr.all().get(60L, TimeUnit.SECONDS);

We can validate that the retention period has been updated by describing the topic configuration again. This time, we will only output the retention period instead of all the configuration settings.



DescribeConfigsResult describeConfigsResult = admin.describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, topicName)));

Map<ConfigResource, Config> topicConfigs = describeConfigsResult.all().get(60L, TimeUnit.SECONDS);

// Pull out just the retention.ms entry and convert it back to hours
ConfigEntry retentionPeriodConfig = topicConfigs.values().iterator().next().get("retention.ms");

long retentionHours = TimeUnit.MILLISECONDS.toHours(Long.parseLong(retentionPeriodConfig.value()));

System.out.println("Retention period for " + topicName + " is " + retentionHours + " hours");

Output:

[Output showing the updated retention period]

8. Create a topic with a non-default configuration

You can also set these configuration settings when calling the createTopics() function instead of changing them after you’ve already created the topics.

Let’s create another topic that has log compaction set as an example. Log compaction is a retention mechanism that keeps the last produced value for each message key in a single topic partition.


// Create a compacted topic: Kafka retains the latest value for each message key
NewTopic anotherTopic = new NewTopic("test-topic-2", 1, (short) 3);
anotherTopic.configs(Collections.singletonMap("cleanup.policy", "compact"));

CreateTopicsResult createTopicResults = admin.createTopics(Collections.singleton(anotherTopic));
createTopicResults.all().get(60L, TimeUnit.SECONDS);
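To confirm that the setting took effect, you can describe the new topic’s configuration in the same way as in step 7; a quick sketch:

DescribeConfigsResult compactedConfigs = admin.describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, "test-topic-2")));

// Expect the value "compact" for the new topic
ConfigEntry cleanupPolicy = compactedConfigs.all().get(60L, TimeUnit.SECONDS).values().iterator().next().get("cleanup.policy");
System.out.println("cleanup.policy for test-topic-2 is " + cleanupPolicy.value());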

9. Delete a topic

In some rare cases, you might want to delete a topic in order to remove all of the messages stored on disk for that topic.



// Deleting a topic removes all of its messages from the brokers' disks
DeleteTopicsResult deleteTopicsResult = admin.deleteTopics(Collections.singleton(topicName));

deleteTopicsResult.all().get(60L, TimeUnit.SECONDS);
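As with topic creation, deletion failures surface when you wait on the future, wrapped in an ExecutionException. For example, deleting a topic that does not exist raises an org.apache.kafka.common.errors.UnknownTopicOrPartitionException as the cause, so you could wrap the wait shown above like this:

try {
    deleteTopicsResult.all().get(60L, TimeUnit.SECONDS);
} catch (ExecutionException e) {
    if (e.getCause() instanceof UnknownTopicOrPartitionException) {
        System.out.println(topicName + " does not exist"); // nothing to delete
    } else {
        throw e;
    }
}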

Summary

This tutorial described how to perform some of the most common topic management tasks using the Kafka Admin API.

For more details about the Java Admin API, see the Javadoc.