新しい IBM Developer JP サイトへようこそ!サイトのデザインが一新され、旧 developerWorks のコンテンツも統合されました。 詳細はこちら

Apache Kafka を使用してストリーム・プロセッサーを開発する

開発者はビジネス・エンティティーの状態を維持するために、イベント・ソーシングを使用して状態の変化をイベントとして記録するようになっています。イベント・ソーシングでは、さまざまなイベント・ソースに対応して、例えば、デバイスの状態変化を示す IoT デバイスやセンサーからのデータ、Web またはモバイル・アプリケーションからのクリック・ストリーム・データ、地理空間データ、ソーシャル・メディアのフィードなどを処理できます。

Apache Kafka は、非常によく使われているストリーミング・プラットフォームです。Kafka に備わっている 4 つのコア API のうちの 1 つに、Streams API があります。Streams API を使用すると、入力トピックから継続的にデータ・ストリームを取り、そのデータに対して何らかの処理を行ってから出力トピックに継続的にデータ・ストリームを送信する、ストリーミング・プロセッサーというアプリケーションを簡単に開発できます。

このチュートリアルでは、Kafka Streams を導入する方法と、ストリーム・プロセッサーを開発する際に陥りがちな落とし穴を回避する方法を説明します。

前提条件

所要時間

このチュートリアルの所要時間は約 1 時間です。

手順

ステップ 1. 開発環境を準備する

このチュートリアルでは、プログラミング言語として Java を使用し、開発環境として Eclipse IDE for Java Developers を使用します。最初のステップでは、必要なツールをインストールして構成し、streams-quickstart-java Maven アーキタイプを使用して Maven プロジェクトを作成します。

Kafka と Eclipse IDE をインストールする

  1. Apache Kafka をインストールします。https://kafka.apache.org/downloads から、最新の安定版 Apache Kafka バイナリー・アーカイブ・ファイルをダウンロードしてください。ダウンロードしたファイルを、コンピューター上の適切なディレクトリーに解凍します。

    注: Kafka サーバーと Kafka クライアント間で使用する通信プロトコルには後方互換性が備わっています。したがって、古いバージョンの Kafka サーバーを使用するとしても、最新の Apache Kafka を使って Kafka クライアント・プログラムを開発できます。

  2. Eclipse IDE for Java Developers をインストールします。https://eclipse.bluemix.net/ からインストール・ファイルをダウンロードしてください。Linux を使用している場合は、ダウンロードしたファイルをコンピューター上の適切なディレクトリーに解凍します。MacOS を使用している場合は、ダウンロードしたファイルをダブルクリックし、Eclipse.app ファイルを Applications フォルダーにドラッグします。Maven は Eclipse IDE 内に組み込まれているので、Maven を別途インストールする必要はありません。

  3. Eclipse IDE プログラムを起動します。Eclipse IDE ランチャーのダイアログで、ワークスペース・ディレクトリーを指定してから「Launch (起動)」をクリックします。

  4. ワークベンチを開くには、右上隅のワークスペース・アイコンをクリックします。

  5. Maven を使用するには、JDK を使用する必要があります。JDK がまだインストールされていない場合は、https://adoptopenjdk.net/ から OpenJDK 8 や OpenJ9 などの JDK 8 以降のバイナリー・アーカイブ・ファイルをダウンロードして、コンピューター上の適切なディレクトリーに解凍します。

  6. Java ランタイム (JRE) のインストール先を確認します。メニューから「Window (ウィンドウ)」 > 「Preferences (環境設定)」 (Linux の場合) または 「Eclipse」 > 「Preferences (環境設定)」 (MacOS の場合) の順に選択します。「Preferences (環境設定)」ウィンドウで、「Java」を展開して「Installed JREs (インストール済み JRE)」を選択します。チェックマーク付きの JRE が JDK のインストール先に含まれていない場合は、JDK の場所を確認します。JDK がリストアップされていなければ、JDK を追加して、JDK の場所にチェックマークを付けます。

Maven プロジェクトを作成する

Kafka Streams のチュートリアルでは、Streams プロジェクト構造を作成する際は Kafka Streams Maven アーキタイプを使用して mvn コマンドを実行するように推奨しています。以下の手順では、Eclipse IDE を使用してこの推奨に従います。

  1. メニューから 「File (ファイル)」 > 「New (新規)」 > 「Project (プロジェクト)」 の順に選択します。
  2. 「New Project (新規プロジェクト)」ダイアログで、「Maven」を展開し、「Maven Project (Maven プロジェクト)」を選択してから「Next (次へ)」をクリックします。
  3. 「New Maven Project (新規 Maven プロジェクト)」ウィザードで、「Next (次へ)」をクリックします。
  4. 「Select an Archetype (アーキタイプの選択)」ダイアログで、「Filter (フィルター)」ボックスに「org.apache.kafka」と入力し、「streams-quickstart-java」を選択してから「Next (次へ)」をクリックします。
  5. 以下の値を入力します。

    • Group ID (グループ ID): streams.examples
    • Artifact ID (アーティファクト ID): streams.examples
    • Version (バージョン): 0.1
    • Package (パッケージ): myapps
  6. Finish (完了)」をクリックします。

    パッケージ・エクスプローラー内でアイテムを展開して、生成されたファイルを探索してください。Maven には多数の機能がありますが、このチュートリアルではライブラリーの依存関係を解決する目的でのみ Maven を使用します。このステップでは次に、maven-archetype-quickstart Maven アーキタイプを使用して別の Maven プロジェクトを作成します。streams-quickstart-javamaven-archetype-quickstart との大きな違いは、プロジェクトの特性です。前者には Maven の特性しか備わっていませんが、後者には Maven と Java の両方の特性が備わっています。Eclipse のプロジェクト・ネーチャーは、そのプロジェクトが固有のタイプであることを指定します。このチュートリアルでは、Java タイプの機能を使用します。

  7. 以下の Maven アーキタイプを使用して別の Maven プロジェクトを作成します。

    • Group ID (グループ ID): org.apache.maven.archetype
    • Artifact ID (アーティファクト ID): maven-archetype-quickstart

      「Specify Archetype Parameters (アーキタイプのパラメーターを指定)」ダイアログでは、以下の値を指定します。

    • Group ID (グループ ID): com.ibm.code

    • Artifact ID (アーティファクト ID): streams.tutorial

      Finish (完了)」をクリックします。

Maven プロジェクトを構成する

パッケージ・エクスプローラー内に streams.tutorial プロジェクトが表示されます。この Maven アーキタイプ・プロジェクトを構成する必要があります。

  1. プロジェクトを展開します。
  2. Java 8 を使用するようにプロジェクトのプロパティーを更新します。「JRE System Library (JRE システム・ライブラリー)」を右クリックして、ポップアップ・メニューから「Properties (プロパティー)」を選択します。「Properties for JRE Library (JRE ライブラリーのプロパティー)」ダイアログで、「Execution environment (実行環境)」の値を「Java-1.8」に変更し、「Apply and Close (適用して閉じる)」をクリックします。
  3. Kafka Streams API を使用するために、pom.xml ファイルをダブルクリックして編集用に開きます。
  4. 「Maven POM Editor (Maven POM エディター)」ダイアログで、「Dependencies (依存関係)」タブをクリックします。「Add (追加)」ボタンをクリックし、「Select Dependency (依存関係の選択)」ダイアログで以下の値を入力してから「OK」をクリックします。

    • Group Id (グループ ID): org.apache.kafka
    • Artifact Id (アーティファクト ID): kafka-streams
    • Version (バージョン): 2.4.0
    • Scope (スコープ): compile

      依存関係を確認するために、ツールバー上の「Save (保存)」アイコンをクリックし、「Dependency Hierarchy (依存関係階層)」タブをクリックします。

  5. 依存関係を見るとわかるように、Apache Kafka は Simple Logging Facade for Java (SLF4J、http://www.slf4j.org/) を使用しています。したがって、ロギングを出力するには、SLF4J でサポートされているロギング・フレームワークのいずれかを使用する必要があります。このチュートリアルでは Apache log4j バージョン 1.2 を使用します。log4j バージョン 1.2 にバインドするために、「Dependency (依存関係)」タブをクリックし、「Add (追加)」ボタンをクリックします。以下の値を入力してから「OK」をクリックします。

    • Group Id (グループ ID): org.slf4j
    • Artifact Id (アーティファクト ID): slf4j-log4j12
    • Version (バージョン): 1.7.28
    • Scope (スコープ): runtime

      依存関係階層を確認するために、ツールバー上の「Save (保存)」アイコンをクリックし、「Dependency Hierarchy (依存関係階層)」タブをクリックします。

  6. log4j を使用するには、log4j.properties ファイルを準備する必要があります。このファイルは前に作成した streams.examples プロジェクトに含まれているので、それをコピーしましょう。streams.examples プロジェクト内の resources フォルダーを選択してから、「Edit (編集)」 > 「Copy (コピー)」 の順に選択します。次に、streams.tutorial プロジェクト内の src/main フォルダーを選択し、「Edit (編集)」 > 「Paste (貼り付け)」 の順に選択します。

  7. src/main/java フォルダー、com.ibm.code.streams.tutorial の順に展開します。そこに、「New Maven Project (新規 Maven プロジェクト)」ウィザードによって生成された App.java ファイルがあるはずです。このファイルは使用しないので、削除します。App.java を削除するには、App.java を選択してから、メニューで 「Edit (編集)」 > 「Delete (削除)」 の順に選択します。「Delete (削除)」ダイアログで「OK」ボタンをクリックします。

ステップ 2. ストリーム・プロセッサーを開発する

これから開発するストリーム・プロセッサーでは、1 つのソース・トピック (「integer」) と 2 つのターゲット・トピック (「even」と「odd」) を使用します。いずれのトピックも、コンピューター上で稼働する Apache Kafka サーバーで管理します。すべてのトピックには整数キーと文字列値があります。ストリーム・プロセッサーは「integer」トピックからイベントを読み取り、偶数キーを持つイベントを「even」トピックに配信し、奇数キーを持つイベントを「odd」トピックに配信します。イベント内の値を変更して、機密データを抽出したり、情報を拡充したりすることもできますが、このチュートリアルでは単に「even」トピックのイベントに含まれる文字列を大文字に変換し、「odd」トピックのイベントに含まれる文字列を小文字に変換します。

  1. 新しい Java クラスを作成するために、com.ibm.code.streams.tutorial パッケージを選択してから、メニューで 「File (ファイル)」 > 「New (新規)」 > 「Class (クラス)」 の順に選択します。「New Java Class (新規 Java クラス)」ダイアログで、「Name (名前)」フィールドに「EvenOddBranchApp」と入力し、「Finish (完了)」をクリックします。

  2. Java エディターで、作成したクラスの本体に以下のコードを入力して、EvenOddBranchApp.java ファイル内にトピック名を定義します。

     public static final String INTEGER_TOPIC_NAME = "integer";
     public static final String EVEN_TOPIC_NAME = "even";
     public static final String ODD_TOPIC_NAME = "odd";
    
  3. 作成したクラスの本体に以下のコードを入力して、createProperties メソッドを定義します。

     public static Properties createProperties() {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "even-odd-branch");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         return props;
     }
    

    このメソッドは、ストリームの実行を構成するための java.util.Properties を返します。StreamsConfig.APPLICATION_ID_CONFIG は、ストリーム・プロセッサーの ID です。StreamsConfig.BOOTSTRAP_SERVERS_CONFIG は、Kafka クラスターへの初期接続を確立するために使用するホスト/ポート・ペアのリストです。StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG は、キーのデフォルト・シリアライザー/デシリアライザー・クラスです。StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG は、値のデフォルト・シリアライザー/デシリアライザー・クラスです。Kafka Streams 構成について詳しくは、Kafka のドキュメントをご覧ください。

  4. 作成したクラスの本体に以下のコードを入力して、createTopology メソッドを定義します。

     public static Topology createTopology() {
         final StreamsBuilder builder = new StreamsBuilder();
         KStream<Integer, String> stream = builder.stream(INTEGER_TOPIC_NAME);
         KStream<Integer, String>[] branches = stream
                 .branch(
                         (key, value) -> key % 2 == 0,
                         (key, value) -> true
                 );
         branches[0]
                 .peek((key, value) -> System.out.printf("even: %s, %s%n", key, value))
                 .mapValues(v -> v.toUpperCase())
                 .to(EVEN_TOPIC_NAME);
         branches[1]
                 .peek((key, value) -> System.out.printf("odd: %s, %s%n", key, value))
                 .mapValues(v -> v.toLowerCase())
                 .to(ODD_TOPIC_NAME);
    
         return builder.build();
     }
    

    このメソッドは、このストリーム・プロセッサーのトポロジーを返します。ここでの トポロジー は、ソース、プロセッサー、シンクからなる有向非巡回グラフです。トポロジーには 1 つのソース (「integer」) と 2 つのシンク (「even」と「odd」) があります。偶数キーを持つイベントは最初のブランチに割り当てられ、他のイベントは 2 つ目のブランチに割り当てられます。最初のブランチ内のイベントは、値が大文字に変更された上で「even」トピックに配信されます。2 つ目のブランチ内のイベントは、値が小文字に変更された上で「odd」トピックに配信されます。ブランチ内では peek メソッドを使用してキーと値を出力します。

  5. main メソッドを定義するために、作成したクラスの本体に以下のコードを入力します。

     public static void main(String[] args) {
         Properties props = createProperties();
    
         final Topology topology = createTopology();
         final KafkaStreams streams = new KafkaStreams(topology, props);
         final CountDownLatch latch = new CountDownLatch(1);
    
         Runtime.getRuntime().addShutdownHook(new Thread("kafka-streams-shutdown-hook") {
             @Override
             public void run() {
                 streams.close();
                 latch.countDown();
             }
         });
    
         try {
             streams.start();
             latch.await();
         } catch (Throwable e) {
             System.exit(1);
         }
         System.exit(0);
     }
    

    トポロジーとプロパティーを基に org.apache.kafka.streams.KafkaStreams のインスタンスを作成できます。イベントの処理は、start メソッドを呼び出して開始し、close メソッドを呼び出して停止できます。close メソッドはシャットダウン・フック内で呼び出すことができます。

  6. Java エディターに多数のエラーが示されるのは、パッケージがまだインポートされていないためです。パッケージをインポートするには、メニューから 「Source (ソース)」 > 「Organize imports (インポートを編成)」 の順に選択します。

  7. EvenOddBranchApp.java ファイルを保存します。それには、「File (ファイル)」 > 「Save (保存)」 の順に選択します。

  8. 次は、「input」トピックに配信するイベントを生成するシンプルなプロデューサーを作成しましょう。com.ibm.code.streams.tutorial パッケージ内に IntegerProducer という新しいクラスを作成します。

  9. IntegerProducer クラスの本体に以下のコードを入力します。

     public static void main(String[] args) {
         Properties props = new Properties();
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(ProducerConfig.ACKS_CONFIG, "all");
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
         Producer<Integer, String> producer = new KafkaProducer<>(props);
         for (int i = 0; i < 10; i++) {
             producer.send(new ProducerRecord<Integer, String>(EvenOddBranchApp.INTEGER_TOPIC_NAME,
                     new Integer(i), "Value - " + i));
         }
         producer.close();
     }
    

    このプロデューサーは Kafka Producer API を使用して、「integer」トピックに配信する 10 個のイベントを生成します。

  10. Kafka パッケージをインポートします。メニューから 「Source (ソース)」 >「Organize imports (インポートを編成)」 の順に選択し、Kafka パッケージを選択してから「Finish (完了)」をクリックします。

  11. IntegerProducer.java ファイルを保存します。「File (ファイル)」 > 「Save (保存)」 の順に選択します。

ステップ 3. ストリーム・プロセッサーをテストする

以下のいずれかの方法でストリーム・プロセッサーをテストできます。

  • コンピューター上の Apache Kafka サーバーを使用する
  • Eclipse IDE 内の JUnit 5 を使用する

コンピューター上の Apache Kafka サーバーを使用してテストする

まず、コンピューター上の Apache Kafka サーバーを使用してストリーム・プロセッサーをテストする方法を説明します。Kafka Streams をデバッグする方法については、Kafka ドキュメントで「Testing Kafka Streams」のトピックをご覧ください。

  1. Kafka は Apache ZooKeeper を使用して命名と構成データを管理するので、まずは ZooKeeper サーバーを起動します。ターミナル・ウィンドウを開いて、カレント・ディレクトリーを、前に Kafka バイナリー・アーカイブ・ファイル解凍した Kafka ディレクトリーに変更してから、以下のコマンドを実行します。

     bin/zookeeper-server-start.sh config/zookeeper.properties
    
  2. Kafka サーバーを起動します。ターミナル・ウィンドウ内で新しいタブを開き、以下のコマンドを実行します。

     bin/kafka-server-start.sh config/server.properties
     `
    
  3. 3 つのトピックを作成します。ターミナル・ウィンドウ内で新しいタブを開き、以下のコマンドを実行します。

     bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic integer
    
     bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic even
    
     bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic odd
    
  4. 作成したトピックをリストアップします。それには以下のコマンドを実行します。

     bin/kafka-topics.sh --list --bootstrap-server localhost:9092
    

    ターミナル・ウィンドウ内に evenintegerodd の 3 つのトピックが表示されるはずです。

  5. EvenOddBranchApp Java アプリケーションを起動します。パッケージ・エクスプローラー内で EvenOddBranchApp.java を選択し、メニューから 「Run (実行)」 > 「Run Configurations (実行構成)」 の順に選択します。「Run Configurations (実行構成)」ダイアログで、「Java Application (Java アプリケーション)」をダブルクリックします。

  6. Show Command Line (コマンド・ラインを表示)」ボタンをクリックします。「Command Line (コマンド・ライン)」ダイアログで、「Copy & Close (コピーして閉じる)」 ボタンの順にクリックし、「Run Configuration (実行構成)」ダイアログで「Close (閉じる)」ボタンをクリックします。コピーしたコマンド・ラインをターミナル・ウィンドウに貼り付けて、このコマンドを実行します。Java プログラムは Eclipse から実行できますが、その場合はシャットダウン・フックを実行できません。そのため、EvenOddBranchApp main メソッド内に、KafkaStreams close メソッドを呼び出すシャットダウン・フックを設定する必要があります。

  7. 「even」トピックを表示します。ターミナル・ウィンドウ内で新しいタブを開き、以下のコマンドを実行します。

     bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
     --topic even \
     --from-beginning \
     --formatter kafka.tools.DefaultMessageFormatter \
     --property print.key=true \
     --property print.value=true \
     --property key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer \
     --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    
  8. 「odd」トピックを表示します。ターミナル・ウィンドウ内で新しいタブを開き、以下のコマンドを実行します。

     bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
     --topic odd \
     --from-beginning \
     --formatter kafka.tools.DefaultMessageFormatter \
     --property print.key=true \
     --property print.value=true \
     --property key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer \
     --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    
  9. 次は、IntegerProducer Java アプリケーションを実行しましょう。パッケージ・エクスプローラー内で IntegerProducer.java を選択し、「Run (実行)」 > 「Run As (実行)」 > 「Java Application (Java アプリケーション)」 の順に選択します。このシンプルなやり方にはシャットダウン・フックが含まれていないので、Eclipse から実行できます。

  10. EvenOddBranchApp コマンドを実行中のターミナル・ウィンドウ・タブを選択します。以下のログが表示されるはずです。

    even: 0, Value - 0
    odd: 1, Value - 1
    even: 2, Value - 2
    odd: 3, Value - 3
    even: 4, Value - 4
    odd: 5, Value - 5
    even: 6, Value - 6
    odd: 7, Value - 7
    even: 8, Value - 8
    odd: 9, Value - 9
    
  11. 「even」トピックを取り込み中のターミナル・ウィンドウ・タブを選択します。以下のログが表示されるはずです。

    0    VALUE - 0
    2    VALUE - 2
    4    VALUE - 4
    6    VALUE - 6
    8    VALUE - 8
    
  12. 「odd」トピックを取り込み中のターミナル・ウィンドウ・タブを選択します。以下のログが表示されるはずです。

    1    value - 1
    3    value - 3
    5    value - 5
    7    value - 7
    9    value - 9
    
  13. EvenOddBranchApp コマンドを強制終了します。EvenOdd コマンドを実行中のターミナル・ウィンドウ・タブを選択し、Ctrl + C キーを押します。ログの最後に、ストリーム・クライアントが完全に停止したことを通知する「Streams client stopped completely」というメッセージが出力されるはずです。

Eclipse IDE 内の JUnit 5 を使用してテストする

次は、kafka-streams-test-utilsJUnit 5 を使用してストリーム・プロセッサーをテストします。Eclipse デバッグ・ツールを使用してストリーム・プロセッサーのデバッグも行います。

  1. kafka-streams-test-utils をプロジェクトの依存関係に追加します。pom.xml を開きます。「Dependencies (依存関係)」タブをクリックし、「Add (追加)」ボタンをクリックします。以下の値を入力してから「OK」をクリックします。

    • Group ID (グループ ID): org.apache.kafka
    • Artifact ID (アーティファクト ID): kafka-streams-test-utils
    • Version (バージョン): 2.4.0
    • Scope (スコープ): test
  2. JUnit 4 依存関係を削除します。「Dependencies (依存関係)」リスト内の junit: 4.11 [test] を選択してから「Remove (削除)」ボタンをクリックします。

  3. JUnit 5 をプロジェクトの依存関係に追加します。「Add (追加)」ボタンをクリックします。以下の値を入力してから「OK」をクリックします。

    • Group ID (グループ ID): org.junit.jupiter
    • Artifact ID (アーティファクト ID): junit-jupiter
    • Version (バージョン): 5.5.2
    • Scope (スコープ): test
  4. 依存関係階層を確認します。ツールバー上の「Save (保存)」アイコンをクリックし、「Dependency hierarchy (依存関係階層)」タブをクリックします。

  5. src/main/java フォルダー、com.ibm.code.streams.tutorial の順に展開します。そこに、「New Maven Project (新規 Maven プロジェクト)」ウィザードによって生成された App.java ファイルがあるはずです。このファイルは使用しないので、削除します。App.java を削除するには、App.java を選択してから、メニューで 「Edit (編集)」 > 「Delete (削除)」 の順に選択します。「Delete (削除)」ダイアログで「OK」ボタンをクリックします。

  6. JUnit テスト・ケース・クラスを作成します。src/test/java 内にある com.ibm.code.streams.tutorial パッケージを選択します。「File (ファイル)」 > 「New (新規)」 > 「JUnit Test Case (JUnit テスト・ケース)」 の順に選択します。表示されるウィザードで、「New JUnit Jupiter test (新規 JUnit Jupiter テスト)」を選択し、名前として「EvenOddBranchAppTest」と入力してから「Finish (完了)」ボタンをクリックします。

  7. Java エディターで、EvenOddBranchAppTest.java クラス・ファイル内に生成された test メソッドを削除します。その上で、クラスの本体に以下のコードを入力します。

     private TopologyTestDriver testDriver = null;
     private TestInputTopic<Integer, String> integerInputTopic = null;
     private TestOutputTopic<Integer, String> evenOutputTopic = null;
     private TestOutputTopic<Integer, String> oddOutputTopic = null;
     private Serde<Integer> integerSerde = new Serdes.IntegerSerde();
     private Serde<String> stringSerde = new Serdes.StringSerde();
    
     @BeforeEach
     void setUp() throws Exception {
       Properties props = EvenOddBranchApp.createProperties();
       Topology topology = EvenOddBranchApp.createTopology();
       testDriver = new TopologyTestDriver(topology, props);
       integerInputTopic = testDriver.createInputTopic(EvenOddBranchApp.INTEGER_TOPIC_NAME,
           integerSerde.serializer(), stringSerde.serializer());
       evenOutputTopic = testDriver.createOutputTopic(EvenOddBranchApp.EVEN_TOPIC_NAME,
           integerSerde.deserializer(), stringSerde.deserializer());
       oddOutputTopic = testDriver.createOutputTopic(EvenOddBranchApp.ODD_TOPIC_NAME,
           integerSerde.deserializer(), stringSerde.deserializer());
     }
    
     @AfterEach
     void tearDown() throws Exception {
       testDriver.close();
     }
    
     @Test
     void testEven() {
       int key = 0;
       String value = "Value - 0";
       integerInputTopic.pipeInput(key, value);
       KeyValue<Integer, String> keyValue = evenOutputTopic.readKeyValue();
       assertEquals(keyValue, new KeyValue<>(key, value.toUpperCase()));
       assertTrue(evenOutputTopic.isEmpty());
       assertTrue(oddOutputTopic.isEmpty());
     }
    
     @Test
     void testOdd() {
       int key = 1;
       String value = "Value - 1";
       integerInputTopic.pipeInput(key, value);
       KeyValue<Integer, String> keyValule = oddOutputTopic.readKeyValue();
       assertEquals(keyValule, new KeyValue<>(key, value.toLowerCase()));
       assertTrue(oddOutputTopic.isEmpty());
       assertTrue(evenOutputTopic.isEmpty());
     }
    

    各テストの実行前に setUp メソッドが呼び出されます。テストの実行が完了するたびに、tearDown メソッドが呼び出されます。testEven メソッドは、偶数キーを持つ 1 つのイベントを「integer」トピックに入力してテストします。testOdd メソッドは、奇数キーを持つ 1 つのイベントを「integer」トピックに入力してテストします。

  8. Kafka パッケージをインポートします。メニューから 「Source (ソース)」 > 「Organize imports (インポートを編成)」 の順に選択し、Kafka パッケージを選択してから「Finish (完了)」ボタンをクリックします。静的インポートは追加されません。assertEqualsassertTrue の静的インポートを追加するには、各行の左側をクリックし、「Add static import for (静的インポートを追加)」をダブルクリックします。

  9. クラス・ファイルを保存します。「File (ファイル)」 > 「Save (保存)」 の順に選択します。

  10. JUnit テスト・ケースを実行します。パッケージ・エクスプローラーで、EvenOddBranchAppTest.java を選択し、「Run (実行)」 > 「Run As (実行)」 > 「JUnit Test (JUnit テスト)」 の順に選択します。「JUnit」ビューで、EvenOddBranchAppTest を展開します。テスト結果が表示されます。

  11. ブレークポイントを設定します。Java エディターの左側をダブルクリックします。青色の円のマークが表示されます。

  12. JUnit テスト・ケースをデバッグします。パッケージ・エクスプローラーで、EvenOddBranchAppTest.java を選択し、「Run (実行)」 > 「Debug As (デバッグ)」 > 「JUnit Test (JUnit テスト)」 の順に選択します。「Confirm Perspective Switch (パースペクティブ切り換えを確認)」ダイアログで、「Switch (切り換え)」ボタンをクリックします。

  13. 「Debug (デバッグ)」パースペクティブ内の「Variables (変数)」ビューで、変数名を展開して変数の値を確認します。ツールバー・アイコンを使用して、さまざまなデバッグ操作を試してください。

まとめ

このチュートリアルでは、Apache Kafka でストリーム・プロセッサーを開発するための環境を準備し、シンプルなストリーム・プロセッサーを開発しました。そして開発したストリーム・プロセッサーを Apache Kafka サーバーを使用してテストする手順と、Apache Kafka サーバーを使用しないでテストする手順を確認しました。