Apache Kafka is a popular distributed messaging system with many use cases. At a high level, producers write messages to topics on Kafka, and consumers read messages from those topics and process them. A consumer tells Kafka which messages it has successfully processed by committing the offsets of those messages within each topic partition. Each topic is divided into partitions: a producer writes every message to one partition, and consumers read the messages partition by partition.

A question Kafka users often face is message ordering, and one consumer-side scenario is the need to re-order received messages during processing. Messages can arrive out of order for various reasons. First of all, Kafka only guarantees message ordering within a partition, not across partitions. This places a burden on producers and consumers to follow certain Kafka design patterns to ensure ordering, for example partitioning data by key and running one consumer per partition. There are cases where we want more flexibility, or where we cannot follow these design patterns. One solution in these cases is to let the consumer re-order the messages during processing. I will present a caching solution with Redis below.
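To make the "partition data by key" pattern concrete, here is a minimal sketch of why it preserves per-key ordering. It is an illustration only: the `partition_for` helper, the sample messages, and the use of `zlib.crc32` are assumptions made for this example, and crc32 is a stand-in rather than the hash Kafka's partitioner actually uses.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Map a key to a partition index; equal keys always map to the same index."""
    # crc32 is a stand-in hash for illustration, not the hash Kafka uses.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Hypothetical messages as (key, payload) pairs, in the order they are sent.
sent = [("t1", "table"), ("t2", "table"), ("t1", "column a"), ("t1", "column b")]

# Append each message to the partition chosen for its key.
partitions = {}
for key, payload in sent:
    partitions.setdefault(partition_for(key, 3), []).append((key, payload))

# Every "t1" message sits in one partition, still in send order.
t1_partition = partitions[partition_for("t1", 3)]
t1_payloads = [payload for key, payload in t1_partition if key == "t1"]
```

Because all messages with the same key land in one partition, their relative order is kept. Messages with different keys may land in different partitions, where no relative order is guaranteed.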

Redis is a high-performance in-memory cache with persistence options. We will use it as a cache and staging area for the messages. The message processor temporarily puts messages in Redis as needed. Once it has all the related messages it needs, based on business or design logic, it re-assembles them in order, processes them and generates the intended output. Since the messages are stored in Redis only temporarily and are removed from the cache after they are re-ordered, Redis is unlikely to run out of memory, as long as the related messages do eventually arrive.

There can be multiple producers writing to the topic, and multiple consumers and message processors reading from the topic and interacting with the same Redis instance.

Let’s use a simple example to illustrate the approach. Say we want to create a Hive table. The producer sends a series of messages to Kafka. There is one message for the table, whose payload holds all the table properties but not the columns. There is a separate message for each column with the column properties. There is also one message per column that links the column to the table. The consumer may see these messages out of order.

The logic in the message processor goes like this. When we see a column message, we put it in the Redis cache with the column’s guid as the key. (Assume we have a guid for every entity in the messages.) When we see a table message, we check whether all its columns are in the cache. (Assume the table message payload carries the number of columns the table expects.) If they are, we assemble the messages and generate the command to create the table. If not, we put the table message in Redis with the table’s guid as the key. The messages that link the columns to the table let us look up the key-values in the cache. We can save the column-to-table relationship in the Redis cache as well, for example with the table guid as the key and a list of column guids as the value. The exit point is reached when we have all the columns for a table, whichever message happens to complete the set, and we generate the command to create the table. The key-values that are no longer needed are then removed from the cache.
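The logic above can be sketched roughly as follows. This is a minimal sketch under stated assumptions, not a definitive implementation: the message shapes (`type`, `guid`, `num_columns`, `position`, `table_guid`, `column_guid`), the key prefixes, and the function names are all hypothetical. In particular, `position` is an assumed field that gives the column order. `InMemoryStore` is a dict-backed stand-in so the example runs without a server; it mimics only the `set`, `get`, `sadd`, `smembers` and `delete` calls used here.

```python
import json


class InMemoryStore:
    """Dict-backed stand-in exposing the few Redis-style calls used below."""

    def __init__(self):
        self._data = {}

    def set(self, key, value):
        self._data[key] = value

    def get(self, key):
        return self._data.get(key)

    def sadd(self, key, *members):
        self._data.setdefault(key, set()).update(members)

    def smembers(self, key):
        return set(self._data.get(key, set()))

    def delete(self, *keys):
        for key in keys:
            self._data.pop(key, None)


def try_assemble(store, table_guid):
    """Return a CREATE TABLE statement if the table and all its columns are staged."""
    raw_table = store.get("table:" + table_guid)
    if raw_table is None:
        return None  # table message not seen yet
    table = json.loads(raw_table)
    column_guids = store.smembers("links:" + table_guid)
    if len(column_guids) < table["num_columns"]:
        return None  # some link messages are still missing
    raw_columns = [store.get("column:" + guid) for guid in column_guids]
    if any(raw is None for raw in raw_columns):
        return None  # some column messages are still missing
    # Re-order the columns by the assumed "position" field.
    columns = sorted((json.loads(raw) for raw in raw_columns),
                     key=lambda column: column["position"])
    column_sql = ", ".join(c["name"] + " " + c["data_type"] for c in columns)
    # Remove everything staged for this table now that it is assembled.
    store.delete("table:" + table_guid, "links:" + table_guid,
                 *["column:" + guid for guid in column_guids],
                 *["owner:" + guid for guid in column_guids])
    return "CREATE TABLE {} ({})".format(table["name"], column_sql)


def handle_message(store, msg):
    """Stage one message, then try to assemble the table it belongs to."""
    kind = msg["type"]
    if kind == "column":
        store.set("column:" + msg["guid"], json.dumps(msg))
        # The owning table is known only if the link message came first.
        table_guid = store.get("owner:" + msg["guid"])
    elif kind == "link":
        table_guid = msg["table_guid"]
        store.sadd("links:" + table_guid, msg["column_guid"])
        store.set("owner:" + msg["column_guid"], table_guid)
    elif kind == "table":
        table_guid = msg["guid"]
        store.set("table:" + table_guid, json.dumps(msg))
    else:
        raise ValueError("unknown message type: " + str(kind))
    return try_assemble(store, table_guid) if table_guid else None


# Hypothetical messages, deliberately out of order.
messages = [
    {"type": "link", "table_guid": "t1", "column_guid": "c2"},
    {"type": "column", "guid": "c2", "name": "amount", "data_type": "DOUBLE", "position": 1},
    {"type": "table", "guid": "t1", "name": "orders", "num_columns": 2},
    {"type": "column", "guid": "c1", "name": "id", "data_type": "INT", "position": 0},
    {"type": "link", "table_guid": "t1", "column_guid": "c1"},
]
store = InMemoryStore()
results = [handle_message(store, msg) for msg in messages]
```

In this run only the last message completes the set, so only the last entry of `results` holds a statement. A redis-py client created with `decode_responses=True` offers commands with the same names and could probably be dropped in for `InMemoryStore`. With several consumers sharing one Redis, the check-and-delete in `try_assemble` would likely need to be made atomic, which this sketch does not attempt.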

In conclusion, we can use Redis as a staging cache to re-order and re-assemble out-of-order Kafka messages in certain Kafka use cases.
