
Developing a stream processor with Apache Kafka

Developers use event sourcing as an approach for maintaining the state of business entities by recording each change of state as an event. We can handle many kinds of event sources, such as IoT devices or sensors that show device status changes, click stream data from web or mobile applications, geospatial data, and social media feeds.
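To make the idea concrete, here is a minimal, hypothetical sketch of event sourcing in plain Java: instead of storing a device's current status, we record each status change as an event and rebuild the state by replaying the event log. The class and the `"deviceId:status"` event encoding are illustrative assumptions, not part of any Kafka API.

```java
import java.util.Arrays;
import java.util.List;

public class EventSourcingSketch {
    // Rebuild the current status of a device by replaying its events in order.
    // Each event is a hypothetical "deviceId:status" string, e.g. "sensor-1:ONLINE".
    static String currentStatus(String deviceId, List<String> eventLog) {
        String status = "UNKNOWN";
        for (String event : eventLog) {
            String[] parts = event.split(":");
            if (parts[0].equals(deviceId)) {
                status = parts[1]; // later events overwrite earlier state
            }
        }
        return status;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList(
                "sensor-1:ONLINE", "sensor-1:DEGRADED", "sensor-2:ONLINE");
        System.out.println(currentStatus("sensor-1", log)); // prints DEGRADED
    }
}
```

The state is never updated in place; it is always derived from the ordered sequence of events, which is exactly the shape of data that Kafka topics hold.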

Apache Kafka is a very popular streaming platform. One of Kafka’s four core APIs is the Streams API. By using the Streams API, we can easily develop a stream processor: an application that takes continual streams of data from input topics, performs some processing on this data, and produces continual streams of data to output topics.

This tutorial will demonstrate how to get started with Kafka Streams and how to avoid the most common pitfalls in developing a stream processor.

Prerequisites

To follow the steps in this tutorial, you need:

  • A computer running Linux or MacOS
  • Basic knowledge of the Java programming language
  • An internet connection to download Apache Kafka, the Eclipse IDE, and a JDK (installation is covered in Step 1)

Estimated time

Completing this tutorial should take about 1 hour.

Steps

Step 1. Prepare your development environment

In this tutorial, I selected Java as the programming language, and the Eclipse IDE for Java Developers as the development environment. In this step, you will install the required tools, configure them, and create a Maven project using the streams-quickstart-java Maven archetype.

Install Kafka and the Eclipse IDE

  1. Install Apache Kafka. Download the latest stable Apache Kafka binary archive file from https://kafka.apache.org/downloads. Unpack it to an appropriate directory on your computer.

    Note: The protocol used for communication between Kafka servers and Kafka clients is backward compatible. You can use the latest Apache Kafka distribution to develop Kafka client programs even if you plan to run them against an older version of the Kafka server.

  2. Install Eclipse IDE for Java Developers. Download an installation file from https://eclipse.bluemix.net/. If you are using Linux, unpack it to an appropriate directory on your computer. If you are using MacOS, double-click the file, and drag Eclipse.app to the Applications folder. Maven is included in the Eclipse IDE, so you do not have to install it separately.

  3. Start the Eclipse IDE program. In the Eclipse IDE Launcher dialog, specify your workspace directory, and click Launch.

  4. To open the workbench, click the Workbench icon in the top right corner.

  5. To use Maven, you have to use a JDK. If you need to install one, download a binary archive for JDK 8 or later (for example, OpenJDK 8 with the HotSpot or OpenJ9 JVM) from https://adoptopenjdk.net/ and unpack it to an appropriate directory on your computer.

  6. Check the location of your Java Runtime Environment (JRE). From the menu, select Window > Preferences (on Linux) or Eclipse > Preferences (on MacOS). In the Preferences window, expand Java and select Installed JREs. If the selected JRE does not point to your JDK installation, select the JDK entry instead. If the JDK is not listed, add it and then select it.

Create your Maven projects

The Kafka Streams tutorial suggests using a Kafka Streams Maven Archetype to create a Streams project structure by using the mvn command. Follow these steps to do this by using the Eclipse IDE:

  1. From the menu, Select File > New > Project.
  2. In the New Project dialog, expand Maven, select Maven Project, and click Next.
  3. In the New Maven Project wizard, click Next.
  4. In the Select an Archetype dialog, enter org.apache.kafka in the Filter box, select streams-quickstart-java, and click Next.
  5. Enter the following values:

    • Group ID: streams.examples
    • Artifact ID: streams.examples
    • Version: 0.1
    • Package: myapps
  6. Click Finish.

    Explore the generated files by expanding items in the Package Explorer. Maven has many functions, but this tutorial uses Maven only to resolve library dependencies. You will create another Maven project using the maven-archetype-quickstart Maven archetype in the next part. One big difference between streams-quickstart-java and maven-archetype-quickstart is the project natures. The former has only the Maven nature, while the latter has both the Maven nature and the Java nature. In Eclipse, a project nature specifies that a project is a specific kind of project. This tutorial uses the Java nature’s capabilities.

  7. Create another Maven project using the following Maven archetype:

    • Group ID: org.apache.maven.archetype
    • Artifact ID: maven-archetype-quickstart

      In the Specify Archetype Parameters dialog, specify the following values:

    • Group ID: com.ibm.code

    • Artifact ID: streams.tutorial

      Click Finish.

Configure your Maven project

The streams.tutorial project is displayed in the Package Explorer. Next, configure this Maven project.

  1. Expand the project.
  2. Update the project’s property to use Java 8. Right-click JRE System Library, and select Properties from the pop-up menu. In the Properties for JRE Library dialog, change the Execution environment to Java-1.8 and click Apply and Close.
  3. To use the Kafka Streams API, double-click the pom.xml file to edit it.
  4. In the Maven POM Editor dialog, click the Dependencies tab. Click the Add button, and in the Select Dependency dialog, enter the following values and click OK.

    • Group Id: org.apache.kafka
    • Artifact Id: kafka-streams
    • Version: 2.4.0
    • Scope: compile

      To check the dependency, click the Save icon on the toolbar, and then click the Dependency Hierarchy tab.

  5. As you can see in the dependency hierarchy, Apache Kafka uses the Simple Logging Facade for Java (SLF4J, http://www.slf4j.org/). To produce log output, you have to use one of the logging frameworks supported by SLF4J. Let’s use Apache log4j version 1.2. To add the binding for log4j version 1.2, click the Dependencies tab, and click the Add button. Enter the following values, and click OK.

    • Group Id: org.slf4j
    • Artifact Id: slf4j-log4j12
    • Version: 1.7.28
    • Scope: runtime

      To check the dependency hierarchy, click the Save icon on the toolbar, and click the Dependency Hierarchy tab.

  6. To use log4j, you have to prepare the log4j.properties file. The streams.examples project that you created previously contains this file, so let’s copy the file. Select the resources folder in the streams.examples project, and select Edit > Copy. Then, select the src/main folder in the streams.tutorial project, and select Edit > Paste.

  7. Expand the src/main/java folder and the com.ibm.code.streams.tutorial package. You can see the App.java file, which was generated by the New Maven Project wizard. You will not use this file, so delete it: select App.java, and select Edit > Delete from the menu. In the Delete dialog, click the OK button.

Step 2. Develop your stream processor

You will develop a stream processor that uses one source topic (“integer”) and two target topics (“even” and “odd”), all of which will be managed by an Apache Kafka server that is running on your computer. All topics have integer keys and string values. The stream processor reads events from the “integer” topic. Events that have an even key are delivered to the “even” topic, and events that have an odd key are delivered to the “odd” topic. You can also change the values in events, such as removing sensitive information or enriching the information. In this tutorial, you will simply convert the string value to uppercase in “even” topic events, and to lowercase in “odd” topic events.
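Before wiring this logic into Kafka Streams, the routing rule and the value transformations can be sketched in plain Java. The class and method names below are illustrative, not part of the tutorial's final code:

```java
public class EvenOddRoutingSketch {
    // Decide which target topic an event belongs to, based on its key.
    static String targetTopic(int key) {
        return key % 2 == 0 ? "even" : "odd";
    }

    // Transform the value according to the target topic:
    // uppercase for "even" events, lowercase for "odd" events.
    static String transformValue(int key, String value) {
        return key % 2 == 0 ? value.toUpperCase() : value.toLowerCase();
    }

    public static void main(String[] args) {
        for (int key = 0; key < 4; key++) {
            System.out.printf("%s: %d, %s%n",
                    targetTopic(key), key, transformValue(key, "Value - " + key));
        }
    }
}
```

The Kafka Streams topology you build in the steps below applies exactly this per-event logic, with Kafka handling the continuous delivery of events.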

  1. To create a new Java class, select com.ibm.code.streams.tutorial package and select File > New > Class from the menu. In the New Java Class dialog, enter EvenOddBranchApp in the Name field, and click Finish.

  2. In the Java editor, in the EvenOddBranchApp.java file, define the topic names by entering the following code in the body of the class:

     public static final String INTEGER_TOPIC_NAME = "integer";
     public static final String EVEN_TOPIC_NAME = "even";
     public static final String ODD_TOPIC_NAME = "odd";
    
  3. To define the createProperties method, enter the following code into the body of the created class:

     public static Properties createProperties() {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "even-odd-branch");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         return props;
     }
    

    This method returns an instance of java.util.Properties to configure streams execution. StreamsConfig.APPLICATION_ID_CONFIG is an identifier for the stream processor. StreamsConfig.BOOTSTRAP_SERVERS_CONFIG is a list of host/port pairs to use for establishing the initial connection to the Kafka cluster. StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG is a default serializer/deserializer class for key. StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG is a default serializer/deserializer class for value. You can find detailed information regarding Kafka Streams Config in the Kafka documentation.

  4. To define the createTopology method, enter the following code into the body of the created class:

     public static Topology createTopology() {
         final StreamsBuilder builder = new StreamsBuilder();
         KStream<Integer, String> stream = builder.stream(INTEGER_TOPIC_NAME);
         KStream<Integer, String>[] branches = stream
                 .branch(
                         (key, value) -> key % 2 == 0,
                         (key, value) -> true
                 );
         branches[0]
                 .peek((key, value) -> System.out.printf("even: %s, %s%n", key, value))
                 .mapValues(v -> v.toUpperCase())
                 .to(EVEN_TOPIC_NAME);
         branches[1]
                 .peek((key, value) -> System.out.printf("odd: %s, %s%n", key, value))
                 .mapValues(v -> v.toLowerCase())
                 .to(ODD_TOPIC_NAME);
    
         return builder.build();
     }
    

    This method returns the topology for this stream processor. A topology is an acyclic graph of sources, processors, and sinks. This topology has one source (“integer”) and two sinks (“even” and “odd”). Events that have an even key are assigned to the first branch, and all other events are assigned to the second branch. Events in the first branch are delivered to the “even” topic with their values converted to uppercase. Events in the second branch are delivered to the “odd” topic with their values converted to lowercase. The peek methods are used to print each key and value.
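The branch predicates are evaluated in order, and each event is assigned to the first branch whose predicate matches, which is why `(key, value) -> true` works as a catch-all in the second position. A plain-Java sketch of that first-match behavior (the class and method names are illustrative):

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.IntPredicate;

public class FirstMatchBranchSketch {
    // Return the index of the first predicate that matches the key,
    // mirroring how branch assigns each event to exactly one branch.
    static int firstMatch(List<IntPredicate> predicates, int key) {
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(key)) {
                return i;
            }
        }
        return -1; // no predicate matched; such events are dropped
    }

    public static void main(String[] args) {
        List<IntPredicate> predicates = Arrays.asList(
                key -> key % 2 == 0, // branch 0: even keys
                key -> true);        // branch 1: everything else
        System.out.println(firstMatch(predicates, 4)); // prints 0
        System.out.println(firstMatch(predicates, 7)); // prints 1
    }
}
```

Because the catch-all predicate comes last, every event lands in some branch; if you omitted it, events with odd keys would be silently dropped.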

  5. To define the main method, enter the following code into the body of the created class:

     public static void main(String[] args) {
         Properties props = createProperties();
    
         final Topology topology = createTopology();
         final KafkaStreams streams = new KafkaStreams(topology, props);
         final CountDownLatch latch = new CountDownLatch(1);
    
         Runtime.getRuntime().addShutdownHook(new Thread("kafka-streams-shutdown-hook") {
             @Override
             public void run() {
                 streams.close();
                 latch.countDown();
             }
         });
    
         try {
             streams.start();
             latch.await();
         } catch (Throwable e) {
             System.exit(1);
         }
         System.exit(0);
     }
    

    You create an instance of org.apache.kafka.streams.KafkaStreams from the topology and the properties. You start event processing by calling the start method, and stop it by calling the close method. Here, the close method is called from a shutdown hook so that the streams shut down cleanly when the process is terminated.
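The CountDownLatch is what keeps the main thread alive until the shutdown hook runs. That interplay can be exercised without Kafka; in this minimal sketch, an ordinary thread stands in for the shutdown hook that the JVM runs on Ctrl + C:

```java
import java.util.concurrent.CountDownLatch;

public class LatchShutdownSketch {
    public static void main(String[] args) throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(1);

        // Stands in for the shutdown hook: in the tutorial, the JVM runs
        // the hook on Ctrl + C, which closes the streams and releases the latch.
        Thread shutdownHook = new Thread(() -> {
            System.out.println("closing resources...");
            latch.countDown(); // release the waiting main thread
        });
        shutdownHook.start();

        latch.await(); // main thread blocks here until countDown() is called
        System.out.println("exiting cleanly");
    }
}
```

Without the latch, the main method would return immediately after streams.start() and the JVM would exit before processing any events.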

  6. The Java editor shows many errors because no packages have been imported. To import packages, select Source > Organize imports from the menu.

  7. Save the EvenOddBranchApp.java file. Select File > Save.

  8. Now, let’s create a simple producer for generating events to the “integer” topic. Create a new class IntegerProducer under the com.ibm.code.streams.tutorial package.

  9. Enter the following code into the body of the IntegerProducer class:

     public static void main(String[] args) {
         Properties props = new Properties();
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(ProducerConfig.ACKS_CONFIG, "all");
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
         Producer<Integer, String> producer = new KafkaProducer<>(props);
         for (int i = 0; i < 10; i++) {
             producer.send(new ProducerRecord<Integer, String>(EvenOddBranchApp.INTEGER_TOPIC_NAME,
                     i, "Value - " + i));
         }
         producer.close();
     }
    

    This producer generates ten events to the “integer” topic using the Kafka Producer API.

  10. Import the Kafka packages. Select Source > Organize imports from the menu, select the Kafka packages, and click Finish.

  11. Save the IntegerProducer.java file. Select File > Save.

Step 3. Test your stream processor

You can test your stream processor in one of these ways:

  • Using an Apache Kafka server on your computer
  • Using JUnit 5 in your Eclipse IDE

Test using an Apache Kafka server on your computer

In this part, you learn how to test your stream processor using an Apache Kafka server on your computer. To learn how to debug Kafka Streams, see the “Testing Kafka Streams” topic in the Kafka documentation.

  1. Kafka uses Apache ZooKeeper to maintain naming and configuration data. Start a ZooKeeper server. Open a terminal window, change the current directory to the Kafka directory that you unpacked previously, and execute the following command:

     bin/zookeeper-server-start.sh config/zookeeper.properties
    
  2. Start your Kafka server. Open a new tab in the terminal window, and execute the following command:

     bin/kafka-server-start.sh config/server.properties
    
  3. Create three topics. Open a new tab in the terminal window, and execute the following commands:

     bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic integer
    
     bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic even
    
     bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic odd
    
  4. List the created topics. Execute the following command:

     bin/kafka-topics.sh --list --bootstrap-server localhost:9092
    

    You can see three topics in the terminal window: even, integer, and odd.

  5. Start the EvenOddBranchApp Java application. Select EvenOddBranchApp.java in the Package Explorer, and select Run > Run Configurations from the menu. In the Run Configurations dialog, double-click Java Application.

  6. Click the Show Command Line button. In the Command Line dialog, click the Copy & Close button, and then click the Close button in the Run Configurations dialog. Paste the copied command line into the terminal window, and execute the command. You could run the Java program directly from Eclipse, but terminating it from Eclipse does not run its shutdown hook. In the EvenOddBranchApp main method, the shutdown hook calls the KafkaStreams close method.

  7. Show the “even” topic. Open a new tab in the terminal window, and execute the following command:

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic even \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    
  8. Show the “odd” topic. Open a new tab in the terminal window, and execute the following command:

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic odd \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    
  9. Now, let’s run the IntegerProducer Java application. Select IntegerProducer.java in the Package Explorer, and select Run > Run As > Java Application. You can execute this simple procedure from Eclipse because it does not contain a shutdown hook.

  10. Select the terminal window tab that is executing the EvenOddBranchApp command. You can see the following log:

    even: 0, Value - 0
    odd: 1, Value - 1
    even: 2, Value - 2
    odd: 3, Value - 3
    even: 4, Value - 4
    odd: 5, Value - 5
    even: 6, Value - 6
    odd: 7, Value - 7
    even: 8, Value - 8
    odd: 9, Value - 9
    
  11. Select the terminal window tab that is consuming the “even” topic. You can see the following log:

    0    VALUE - 0
    2    VALUE - 2
    4    VALUE - 4
    6    VALUE - 6
    8    VALUE - 8
    
  12. Select the terminal window tab that is consuming the “odd” topic. You can see the following log:

    1    value - 1
    3    value - 3
    5    value - 5
    7    value - 7
    9    value - 9
    
  13. Terminate the EvenOddBranchApp command. Select the terminal window tab that is executing the EvenOddBranchApp command, and type Ctrl + C. You can see the following at the end of the log: Streams client stopped completely.

Test using JUnit 5 in your Eclipse IDE

In this part, you will test the stream processor using kafka-streams-test-utils and JUnit 5. You will also debug your stream processor using the Eclipse debug tool.

  1. Add kafka-streams-test-utils to your project dependencies. Open pom.xml. Click the Dependencies tab, and then click the Add button. Enter the following values, and click OK.

    • Group ID: org.apache.kafka
    • Artifact ID: kafka-streams-test-utils
    • Version: 2.4.0
    • Scope: test
  2. Remove the JUnit 4 dependency. In the Dependencies list, select junit: 4.11 [test] and then click the Remove button.

  3. Add JUnit 5 to your project dependencies. Click the Add button. Enter the following values, and click OK.

    • Group ID: org.junit.jupiter
    • Artifact ID: junit-jupiter
    • Version: 5.5.2
    • Scope: test
  4. Review the dependency hierarchy. Click the Save icon on the toolbar, and then click the Dependency Hierarchy tab.

  5. If you did not delete the App.java file in Step 1, do it now. Expand the src/main/java folder and the com.ibm.code.streams.tutorial package, select App.java, and select Edit > Delete from the menu. In the Delete dialog, click the OK button.

  6. Create a JUnit test case class. Select the com.ibm.code.streams.tutorial package under src/test/java. Select File > New > JUnit Test Case. In the wizard, select New JUnit Jupiter test, enter EvenOddBranchAppTest as the name, and click the Finish button.

  7. In the Java Editor, delete the test method that was generated in the EvenOddBranchAppTest.java class file. Then, enter the following code into the body of the class:

     private TopologyTestDriver testDriver = null;
     private TestInputTopic<Integer, String> integerInputTopic = null;
     private TestOutputTopic<Integer, String> evenOutputTopic = null;
     private TestOutputTopic<Integer, String> oddOutputTopic = null;
     private Serde<Integer> integerSerde = new Serdes.IntegerSerde();
     private Serde<String> stringSerde = new Serdes.StringSerde();
    
     @BeforeEach
     void setUp() throws Exception {
       Properties props = EvenOddBranchApp.createProperties();
       Topology topology = EvenOddBranchApp.createTopology();
       testDriver = new TopologyTestDriver(topology, props);
       integerInputTopic = testDriver.createInputTopic(EvenOddBranchApp.INTEGER_TOPIC_NAME,
           integerSerde.serializer(), stringSerde.serializer());
       evenOutputTopic = testDriver.createOutputTopic(EvenOddBranchApp.EVEN_TOPIC_NAME,
           integerSerde.deserializer(), stringSerde.deserializer());
       oddOutputTopic = testDriver.createOutputTopic(EvenOddBranchApp.ODD_TOPIC_NAME,
           integerSerde.deserializer(), stringSerde.deserializer());
     }
    
     @AfterEach
     void tearDown() throws Exception {
       testDriver.close();
     }
    
     @Test
     void testEven() {
       int key = 0;
       String value = "Value - 0";
       integerInputTopic.pipeInput(key, value);
       KeyValue<Integer, String> keyValue = evenOutputTopic.readKeyValue();
       assertEquals(new KeyValue<>(key, value.toUpperCase()), keyValue);
       assertTrue(evenOutputTopic.isEmpty());
       assertTrue(oddOutputTopic.isEmpty());
     }
    
     @Test
     void testOdd() {
       int key = 1;
       String value = "Value - 1";
       integerInputTopic.pipeInput(key, value);
       KeyValue<Integer, String> keyValue = oddOutputTopic.readKeyValue();
       assertEquals(new KeyValue<>(key, value.toLowerCase()), keyValue);
       assertTrue(oddOutputTopic.isEmpty());
       assertTrue(evenOutputTopic.isEmpty());
     }
    

    The setUp method is called before each test, and the tearDown method is called after each test. The testEven method tests by inputting one event with an even key to the “integer” topic. The testOdd method tests by inputting one event with an odd key to the “integer” topic.

  8. Import the Kafka packages. Select Source > Organize imports from the menu, select the Kafka packages, and click the Finish button. Static imports are not added. To add static imports for assertEquals and assertTrue, click to the left of each line, and double-click Add static import for.

  9. Save the class file. Select File > Save.

  10. Execute the JUnit test case. In the Package Explorer, select EvenOddBranchAppTest.java and then select Run > Run As > JUnit Test. In the JUnit view, expand EvenOddBranchAppTest. You can see the test results.

  11. Set breakpoints. Double-click in the left margin of the Java editor. Blue circle markers appear.

  12. Debug the JUnit test case. In the Package Explorer, select EvenOddBranchAppTest.java and then select Run > Debug As > JUnit Test. In the Confirm Perspective Switch dialog, click the Switch button.

  13. In the Debug perspective, in the Variables view, expand the variable names to check the values of the variables. Try out the different debug operations using the toolbar icons.

Summary

In this tutorial, you prepared an environment for developing a stream processor with Apache Kafka, developed a simple stream processor, and tested the stream processor with and without an Apache Kafka server.