The new Kafka Toolkit enables IBM Streams applications to integrate with Apache Kafka, allowing them to publish and subscribe to Kafka topics with little effort.

Introduction

Apache Kafka is a distributed streaming platform, similar to a message queue. It enables applications to build real-time streaming data pipelines that move data from one system to another with ease. The main abstraction in Kafka is the topic. A topic is similar to a queue in a messaging system: publisher applications write records (messages) to the topic, and consuming applications read records from it. However, unlike most queues in a messaging system, records written to a Kafka topic are persisted for a configurable retention period. This persistence allows consumer applications to start consuming from an arbitrary point in the topic, as opposed to only being able to consume starting from the latest messages.

This distinction between a topic and a queue allows applications to use Kafka as more than just a simple message delivery system. For example, applications can use a Kafka topic to store rolling log data, which can later be retrieved in its entirety. Applications can also use Kafka topics as a means to store application state information. In the event that the application crashes, this state data can be retrieved and restored.

With that said, it is easy to see how Apache Kafka is a natural fit for Streams applications. Since Kafka is designed to handle high-speed streaming data, application developers can integrate with it without worrying about it becoming the bottleneck in their pipelines. As mentioned previously, there are many scenarios where integrating Kafka with real-time streaming data can be critical. For example, combining IBM Streams with Kafka can be useful when analyzing website activity. As users view pages and perform searches on a site, this data can be logged to a Kafka topic. A Streams application can consume and analyze this information in order to provide real-time statistics on page views and activities. A Streams application could also replay or simulate a user's activity on the site by reading from an earlier point in the Kafka topic.

Why It’s Needed

There may be some confusion as to why this toolkit is necessary, since the Messaging toolkit (com.ibm.streamsx.messaging) already contains Kafka operators. The primary reason for creating a new, Kafka-specific toolkit is that the Messaging toolkit has become too large and difficult to maintain. Currently, the Messaging toolkit contains operators for Kafka, MQTT, RabbitMQ, JMS and XMS. Whenever an application wants to include support for just one of these message brokers, it needs to load the entire toolkit, including the libraries for every broker. This results in application bundles that are larger than necessary. Furthermore, each time an operator needs to be updated to support a new broker version, the entire toolkit must be upgraded, possibly resulting in a new major version. From an application perspective, it can become very confusing as to which version of the toolkit supports which broker versions.

Given the above reasons, the Kafka operators are being split from the Messaging toolkit and packaged into a separate toolkit. This will make it much easier to update and fix the operators without impacting applications not using Kafka. It also means that fixes can be quickly rolled out and easily tracked.

What’s New!

While moving the Kafka operators into a separate toolkit makes them easier for application developers to consume, it also provides an opportunity to introduce some interesting features, including:

  • Kafka configs can be specified via application configurations (no need to use properties files!)
  • JAAS configuration can be specified on a per-operator instance basis using the new sasl.jaas.config Kafka configuration (see the sketch after this list)
    • this means that different Kafka operators can be fused into the same PE and connect to different Kafka clusters
    • no more jaas.conf files
  • Automatic determination of serializer and deserializer for keys and messages
    • no need to specify the *.serializer or *.deserializer Kafka configs
  • Default values set for client.id and group.id Kafka configs (random IDs assigned upon operator initialization)
  • Support for user-defined partitions and serializers/deserializers
  • Support for non-string based keys and messages
  • KafkaConsumer can begin consuming from either the start or end of a topic
  • Support for at-least-once message delivery
  • KafkaProducer can specify a specific partition to write messages to
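
As an example of the per-operator JAAS support, SASL settings can now be supplied through the same Kafka properties used to configure the operator. A minimal sketch of a consumer.properties file for SASL/PLAIN might look like the following (the broker address and credentials are placeholders):

    bootstrap.servers=mybroker.example.com:9093
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="myUser" \
        password="myPassword";

Because these are ordinary Kafka properties, two operators fused into the same PE can each point at a different properties file and therefore at different clusters.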

Migrating to the Kafka Toolkit

Developers currently using the Kafka operators found in the Messaging toolkit are encouraged to migrate their applications to the Kafka toolkit. A Migration Guide is available to help with this process.

Kafka Toolkit for Streams

The Kafka Toolkit enables IBM Streams applications to integrate with Apache Kafka. The initial version of this toolkit consists of two operators that enable Streams to publish and subscribe to Kafka topics:

  • KafkaProducer
  • KafkaConsumer

A description of each operator is provided in the following section.

Operators

KafkaProducer

This operator is capable of publishing (or producing) records to one or more Kafka topics. Users can either provide a static list of topics to publish to or can pass in the topic via an input attribute. The KafkaProducer is also capable of publishing to a specific partition within a topic.
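
As a minimal sketch (the topic name, properties file and generated values are placeholders, and the snippet assumes use com.ibm.streamsx.kafka::*; is in scope), a producer flow might look like this:

    // generate a few sample records
    stream<rstring key, rstring message> OutputStream = Beacon() {
        param
            iterations : 10u;
        output
            OutputStream : key = "key_" + (rstring) IterationCount(),
                           message = "msg_" + (rstring) IterationCount();
    }

    // publish each tuple's key and message attributes to the topic
    () as KafkaSink = KafkaProducer(OutputStream) {
        param
            topic : "test";
            propertiesFile : "etc/producer.properties";
    }

By default the operator reads input attributes named key and message, and the properties file needs to supply at least bootstrap.servers.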

Automatic Serialization

The operator will automatically select the appropriate serializers for the key and message based on their types. The following table outlines which serializer will be used given a particular type:

Serializer                                                    SPL Types
----------------------------------------------------------    -------------
org.apache.kafka.common.serialization.StringSerializer       rstring
org.apache.kafka.common.serialization.IntegerSerializer      int32, uint32
org.apache.kafka.common.serialization.LongSerializer         int64, uint64
org.apache.kafka.common.serialization.DoubleSerializer       float64
org.apache.kafka.common.serialization.ByteArraySerializer    blob

Users can override this behaviour and specify which serializer to use by setting the key.serializer and value.serializer properties.
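
For example, to treat both the key and the message as raw bytes regardless of the attribute types, the producer's properties file could contain the standard Kafka serializer class names:

    key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer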

KafkaConsumer

This operator is capable of consuming records from one or more Kafka topics. The operator can be configured to either subscribe to a topic or it can be assigned to specific topic-partitions.
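
A minimal consumer sketch (the topic and properties file are placeholders, and the default output attribute names key and message are assumed):

    // subscribe to the topic and emit one tuple per consumed record
    stream<rstring key, rstring message> KafkaStream = KafkaConsumer() {
        param
            topic : "test";
            propertiesFile : "etc/consumer.properties";
    }

    // print each consumed record
    () as Printer = Custom(KafkaStream) {
        logic
            onTuple KafkaStream :
                printStringLn("key=" + key + ", message=" + message);
    }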

Automatic Deserialization

The operator will automatically select the appropriate deserializers for the key and message based on their types. The following table outlines which deserializer will be used given a particular type:

Deserializer                                                    SPL Types
------------------------------------------------------------    -------------
org.apache.kafka.common.serialization.StringDeserializer       rstring
org.apache.kafka.common.serialization.IntegerDeserializer      int32, uint32
org.apache.kafka.common.serialization.LongDeserializer         int64, uint64
org.apache.kafka.common.serialization.DoubleDeserializer       float64
org.apache.kafka.common.serialization.ByteArrayDeserializer    blob

Users can override this behaviour and specify which deserializer to use by setting the key.deserializer and value.deserializer properties.

Kafka’s Group Management

The operator is capable of taking advantage of Kafka's group management functionality. In order for the operator to use this functionality, the following requirements must be met:

  • The operator cannot be in a consistent region
  • The startPosition parameter value cannot be Beginning (must be End or not specified)
  • No specific partitions can be assigned for any of the topics specified by the topic parameter

In addition to the above, the application needs to set the group.id Kafka property in order to assign the KafkaConsumer to a specific group.
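
Putting this together, a group-managed consumer might look like the following sketch (the group name, topic and broker address are placeholders). First, group.id is set in etc/consumer.properties:

    bootstrap.servers=mybroker.example.com:9092
    group.id=myConsumerGroup

The operator then simply subscribes to the topic, leaving startPosition unset and assigning no partitions:

    stream<rstring key, rstring message> KafkaStream = KafkaConsumer() {
        param
            topic : "test";
            propertiesFile : "etc/consumer.properties";
    }

Running several such operators with the same group.id lets Kafka balance the topic's partitions across them.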

Kafka Properties via Application Configuration

Users can specify Kafka properties using Streams’ application configurations. Information on configuring application configurations can be found here: Creating application configuration objects to securely store data. Each property set in the application configuration will be loaded as a Kafka property. For example, to specify the bootstrap servers that the operator should connect to, an app config property named bootstrap.servers should be created.
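
For example, a configuration could be created with streamtool and then referenced from the operator (the configuration name and broker address are placeholders, and the sketch assumes the operator's appConfigName parameter):

    streamtool mkappconfig --property bootstrap.servers=mybroker.example.com:9092 myKafkaConfig

    stream<rstring key, rstring message> KafkaStream = KafkaConsumer() {
        param
            appConfigName : "myKafkaConfig";
            topic : "test";
    }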

What’s next…

The recent release of Kafka v0.11 introduces some exciting new features, such as:

  • Exactly-once and transactional message delivery
  • Support for record headers

The next major version of the Kafka toolkit will be based on Kafka v0.11 in order to take advantage of these new features. The next release is tentatively scheduled for July-August 2017. Stay tuned!

Download the toolkit

The toolkit is open source and available on GitHub.

Samples

Samples demonstrating how to use the operators can be found in the samples directory of the streamsx.kafka repository.
