IBM Developer Japan Webサイトは2021年3月15日をもって終了となり、日本語コンテンツの一部は、オープンソースとして、提供予定です。 URLはこちら

Apache Kafka を使用して IBM Db2 Event Store 内にデータをストリーミングする

このチュートリアルは Db2 Event Store ラーニング・パスを構成するコンテンツです

IBM Db2 Event Store は、イベント駆動型アプリケーションからストリーミングされたデータを迅速に取り込んで分析するように設計されたインメモリー・データベースです。膨大な数のイベントをリアルタイムで処理する機能を備え、高度なアナリティクス、そしてアクションにつながる洞察を引き出すようにストリーミング・データ・パフォーマンスが最適化された Db2 Event Store は、ファストデータに対応するための基礎構造になります。

このチュートリアルでは、IBM Db2 Event Store をベースに作成されたエンドツーエンドのアプリケーションを実行します。このアプリケーションは、センサー・データを Event Store 内にストリーミングして視覚化する IoT の使用ケースを単純化したものです。

学習の目的

IBM Db2 Event Store の実行に加え、次の目標を達成します。

  • sbt を使用して Scala アプリケーションを実行する
  • Kafka クラスターを作成する
  • IBM Db2 Event Store Scala API を理解する
  • Grafana をインストールして使用する

前提条件

このチュートリアルでは Docker を使用します。Docker は Windows、MacOS、または Linux 上で使用できます。このチュートリアルは、MacOS をベースに作成されていますが、IBM Db2 Event Store Developer Editor でサポートされている他のプラットフォーム (Windows または Linux) を使用する場合でも、簡単にチュートリアルの手順を調整できます。

具体的な前提条件は次のとおりです。

所要時間

このチュートリアルの所要時間は約 1 時間です。

手順

IBM Db2 Event Store Developer Edition をインストールする

最初のステップは、IBM Db2 Event Store をインストールすることです。Event Store はデスクトップ Electron アプリとして Docker 上で実行できます。

  • プラットフォーム固有の最新インストーラーをダウンロードします。
  • 注: MacOS を使用する場合は、ここをクリックしてインストーラー・ファイルを入手してください。
  • MacOS または Windows
  • インストーラー (dmg または exe) を起動して、すべてのデフォルトを受け入れます。
  • この処理は、帯域幅によっては時間がかかる場合があります。

依存関係をインストールする

アプリケーションを実行する前に、Kafka ストリーミング・コネクターと Grafana をインストールしましょう。

IBM Db2 Event Store の Kafka ストリーミング・コネクターを複製する

./bin/sbt sbt-version
  • IBM Db2 Event Store の Kakfa ストリーミング・コネクターを複製します。
git clone git@github.com:IBMProjectEventStore/db2eventstore-kafka.git
cd db2eventstore-kafka

Grafana をインストールする

brew update
brew install grafana
brew services restart grafana
  • Grafana 用の IBM Db2 Event Store データ・ソースをインストールします。
wget https://github.com/IBMProjectEventStore/db2eventstore-grafana/releases/download/ReactiveSummit2018/db2-event-store-grafana-1.1.tar
mkdir -p /usr/local/var/lib/grafana/plugins/db2-event-store
mv db2-event-store-grafana-1.1.tar /usr/local/var/lib/grafana/plugins/db2-event-store
cd /usr/local/var/lib/grafana/plugins/db2-event-store
tar -zxvf db2-event-store-grafana-1.1.tar
brew services restart grafana

IBM Db2 Event Store の Kafka ストリーミング・コネクターをコンパイルする

cd db2eventstore-kafka
sbt clean
sbt compile

IBM Db2 Event Store Developer Edition を初期化する

  • IBM Db2 Event Store Developer Edition を開きます。
    • Scala ノートブック「Introduction to IBM Db2 Event Store Scala API」を開きます。
    • 「Copy to Notebooks (ノートブックにコピー)」アイコンを選択します。
    • 最初の 2 つのセルを実行して TESTDB データベースを作成します。

Jupyter ノートブックのデータベースを作成するセルの実行が完了したら、センサー・データに IBM Db2 Event Store を使用できる状態になります。

図 1.  Db2 Event Store のスクリーン・キャプチャー 図 2.  Db2 Event Store のスクリーン・キャプチャー

  • Grafana ダッシュボードで使用する REST Server の初期化を実行します。

このステップにより、Event Store が Grafana データ・ソースをサポートするように初期化されます。以下のコマンドを実行して、IBM Db2 REST サーバーが外部リクエストを受け入れるようにします。

curl -X POST -H "Content-Type: application/json" -H "authorization: Bearer token" 'http://0.0.0.0:9991/com/ibm/event/api/v1/init/engine?engine=173.19.0.1:1100&rContext=Desktop'

ターミナルに {“code”:”ES100″,”message”:”Engine initialization succeeded”} というメッセージが出力されます。

Grafana ダッシュボード

  • Grafana にログインします。
    • デフォルトの資格情報「admin」と「admin」を使用します。
  • 新しいデータ・ソースを IBM Db2 Event Store に追加します。
  • Db2 Event Store データ・ソースを追加します。
  • ホストのポートを 9991 に変更します。
  • 「Refresh (更新)」、「Save & Test (保存してテスト)」の順に選択します。

新しいデータ・ソースを IBM Db2 Event Store に追加する 図 3.  新しいデータ・ソースを IBM Db2 Event Store に追加する

提供されている JSON ダッシュボード db2eventstorevisualization.json を開き、その内容をクリップボードにコピーします。

JSON を貼り付けてダッシュボードをインポートする

図 4.  JSON を貼り付けてダッシュボードをインポートする 図 5.  JSON を貼り付けてダッシュボードをインポートする

サンプル・コードを実行する

データを Event Store に取り込むために、まず、JSON ペイロードのバッチを Kafka クラスターのトピックにパブリッシュします。ストリーミング・コネクターの Event Store 拡張機能では、独自のローカル Kafka クラスターを作成できます。また、外部クラスターに対応するようにストリーミング・コネクターを構成することもできます。

サンプル・コードを実行するために、2 つのアプリケーションを実行します。

  • dataLoad/run。この JSON ジェネレーターは最小限のセンサー駆動のペイロードをシミュレーションして、各バッチを Kakfa トピックにパブリッシュします。
  • eventStream/run。このストリーミング・コネクターのメイン・コンポーネントは Spark Streaming と Kakfa の統合ライブラリーを使用して、構成可能なバッチ・サイズで Kafka メッセージを Event Store に取り込みます。
  • IBM Db2 Event Store の Kakfa ストリーミング・コネクターを実行します。

    • localBroker [ブール型] – 内部の単一ノードの Kafka ブローカーを使用する場合は true。外部で構成されたブローカーを使用する場合は false。
    • kafkaBroker [文字列型] – Kafka ブローカーの場所。ローカル・セットアップの場合は「localhost:9092」を渡し、外部セットアップの場合は ip:port の形式で場所を渡します。
    • topic [文字列型] – ストリームに使用する Kafka トピック。
    • eventStore [文字列型] – IBM Db2 Event Store の IP 構成。
    • database [文字列型] – 作成または使用する Db2 Event Store データベース。
    • user [文字列型] – 使用する Db2 Event Store ユーザー名。
    • password [文字列型] – 使用する Db2 Event Store パスワード。
    • metadata [文字列型] – この IoT デバイスのメタデータのタイプ。例えば、「sensor」、「appliance」、「weatherstation」などです。
    • streamingInterval [長整数型] – Apache Spark のストリーミング時間枠をミリ秒単位で定義する長整数。
    • batchSize [整数型] – IBM Db2 Event Store に送信するバッチのサイズ。
  • JSON ジェネレーターを実行します。

    • localBroker [ブール型] – 内部の単一ノードの Kafka ブローカーを使用する場合は true。外部で構成されたブローカーを使用する場合は false。
    • kafkaBroker [文字列型] – Kafka ブローカーの場所。ローカル・セットアップの場合は「localhost:9092」を渡し、外部セットアップの場合は ip:port の形式で場所を渡します。
    • tableName [文字列型] – IBM Db2 Event Store 内に作成するテーブルの名前。
    • topic [文字列型] – ストリームに使用する Kafka トピック。
    • group [文字列型] – ストリームに使用する Kafka グループ。
    • metadata [文字列型] – このシミュレーションされる IoT デバイスのメタデータのタイプ。例えば、「sensor」、「appliance」、「car」などです。
    • metadataId [長整数型] – この IoT デバイスを識別できる長整数。「238」、「002」などです。
    • batchSize [整数方] – Kafka キューに送信するバッチのサイズ。最小値は 1 です。
sbt "eventStream/run -localBroker true -kafkaBroker localhost:9092 -topic estopic -eventStore localhost:1100 -database TESTDB -user admin -metadata sensor -password password -metadata ReviewTable -streamingInterval 5000 -batchSize 10"
sbt "dataLoad/run -localBroker true -kafkaBroker localhost:9092 -tableName ReviewTable -topic estopic -group group -metadata sensor -metadataId 238 -batchSize 10"

データを視覚化する

アプリケーションが起動すると、Grafana ダッシュボードで着信センサー・データの視覚化が始まります。

センター・データ 図 6.  センサー・データ

次のステップ: IBM Fast Data Platform について学ぶ

これよりも遥かに完全なチュートリアルが IBM Db2 Event Store イネーブルメント・リポジトリーに用意されています。IBM Fast Data Platform を利用してデプロイできるエンドツーエンドの完全なアプリケーションの作成方法を学ぶには、この上級チュートリアルにアクセスしてください。

IBM Db2 Event Store を停止してデータベースを削除する

IBM Db2 Event Store データベースを削除するには、IBM Db2 Event Store Developer Edition をシャットダウンして、以下のコマンドを実行します。

docker stop $(docker ps -aq)
docker rm $(docker ps -aq)
cd ~/Library/Application\ Support/ibm-es-desktop
rm -rf zookeeper alluxio ibm

まとめ

このチュートリアルでは、最初に IBM Db2 Event Store Developer Edition をインストールしました。次に、センサー・ペイロードを表す JSON ペイロードを生成し、バッチで Apache Kafka クラスターにパブリッシュしました。Kafka 内で使用可能になったペイロードのバッチに Apache Spark Streaming と Kafka の統合を使用してアクセスし、IBM Db2 Event Store にペイロードを取り込みました。最後に Grafana を IBM Db2 Event Store の REST サーバーに接続し、IBM Db2 Event Store 内に取り込まれたペイロードに対して単純な述語を実行して結果を視覚化しました。

参考資料

結果

このチュートリアルでは Apache Kafka を使用してストリーミング・データを Db2 Event Store 内に取り込みました。このチュートリアルはラーニング・パス: Db2 Event Store シリーズの最終回です。ラーニング・パスを完了した今、Db2 Event Store の基礎知識とそのいくつかの高度な機能を習得できたはずですが、さらに詳しく Db2 Event Store を理解するには、こちらにアクセスしてください。