新しい IBM Developer JP サイトへようこそ!サイトのデザインが一新され、旧 developerWorks のコンテンツも統合されました。 詳細はこちら

ドア監視システムの開発を通して IoT のスキルを磨く

チュートリアル「IoT ソリューション開発の出発点として単純なホーム・オートメーション・システムを構築する」では、Raspberry Pi に接続された複数のセンサーを使ってドア・モニタリング・デバイスを組み立て、Node-RED を使用してこのシステムの IoT フローを設計し、Apple HomeKit フレームワークと iOS アプリケーションを使ってシステムのモニタリングを行いました。この手法に必要となるコードは皆無かそれに近いほどでした。

このチュートリアルでも同様のシステムであるドア・モニタリング・システムを構築しますが、焦点は異なります。コードの内容を詳しく探り、IoT システムのアーキテクチャーと設計、アナリティクス、セキュリティーなどの側面に目を向けます。ハードウェアとワイヤーをいじくりまわす作業を最小限にするために、このプロジェクトでは複数の個別のハードウェア・センサーを使用するのではなく、センサーの集合をスマートフォン・デバイスで代用します。開発中に使用する私の古い Android スマートフォンには、加速度センサー、近接センサー、光度センサーが内蔵されています。古いものであっても、ほとんどのスマートフォンにはこれらの基本的なセンサーを装備できるはずなので、現物のスマートフォンが実際のセンサーの役割を十分に果たします。

このチュートリアルで構築するドア・モニタリング・システムでは、センサー (スマートフォン) をドアに取り付けて、数種類のセンサー測定値を継続的に記録します。これらの測定値を使用すれば、データを基にさまざまなアクションをトリガーできます。さらに、収集したデータを使って分析を行うこともできます。

前提条件

サンプル IoT システムのアーキテクチャー

単純な IoT アーキテクチャーは、デバイスとクラウドの 2 つの層からなるものとしてみなせます。複数のデバイスがネットワーク経由でクラウドに直接接続されて、クラウドがデバイスのオーケストレーションを行って、デバイス・データを分析、モニタリングなどに使用するという構図です。この仕組みは、極めて単純な使用ケースには有効に機能しますが、システムの規模が大きくなってくると複雑さが増してきます。

標準的な IoT アーキテクチャーには多種多様な IoT デバイスが関わってきます。これらの IoT デバイスに内蔵されたセンサーが異なる頻度で、しかもさまざまな形式で生成するデータを処理した上で分析し、洞察を引き出すことになります。IoT デバイスはネットワークに直接接続される場合も、ゲートウェイ・デバイス経由で接続される場合もあります。ネットワークに接続することで、デバイスは他のデバイスと相互に通信したり、クラウド・サービス、クラウド・アプリケーションと通信したりできるようになります。デバイスを地理的に分散させて、任意の無線プロトコル (Bluetooth、Zigbee、Wi-Fi、セルラー、RFID など) あるいは有線プロトコル (Ethernet など) を使用してデバイスから情報を送信することもできます。

IoT デバイスが使用するプロトコルもデータ形式もさまざまに異なることから、データをクラウドに送信する前に各種のプロトコルとデータ形式を集約できる層をデバイスとクラウドの間に設けると役立ちます。ゲートウェイあるいはエッジ層とも呼べるこの層は、ルーターに統合したり、別の専用デバイス上で統合したりできます。ゲートウェイまたはエッジ層では、集約処理の他に以下の目的を果たすことができます。

  • デバイスのプロビジョニング: デバイスにとってゲートウェイは最初の接点となるため、新しいデバイスをプロビジョニングしてネットワークに登録するために使用できます。
  • エッジ・コンピューティング: デバイスで生成されたデータのすべてが IoT システムに役立つというわけではありません。そこで、ゲートウェイを使用してデータを前処理またはフィルタリングしてからクラウドに送信すれば、クラウドでの不要な負荷が回避されて、帯域幅とリソースの使用効率に優れたシステムにすることができます。
  • デバイスの仮想抽象化: クラウド層はエッジを頼りにしてデバイスと通信できます。デバイスはインターネットに断続的にしか接続できないことがあるため、エッジによって、最後に既知となったデバイスの状態をクラウドに知らせたり、上位層からデバイスに送信されたコマンドをエッジで保持したりできます。デバイスがオンライン状態に戻った時点で、エッジからデバイスにこれらのコマンドを送信できます。

役割と責任を複数のサービスに分割することで、この 3 層からなるアーキテクチャーをさらに拡張できます。例えば、IBM の IoT リファレンス・アーキテクチャーでは、エンタープライズ層とユーザー層をアーキテクチャーに追加してあり、各層のさまざまなコンポーネント (サービス) が表現されています。IoT アーキテクチャーを設計する際は、所定の層またはサービスが関わり合う必要があるのは、その層/サービスが直接接続されている層/サービスとだけである点に注意してください。

このドア・モニタリング・システムの場合、プロジェクトの実装では 1 台のスマートフォン・デバイスを使用することになりますが、複数のデバイスに簡単にスケーリングできるようにシステムを設計して、セキュリティー、デバイス管理、アナリティクスといったさまざまな側面に触れたいと思います。このシステムに、ゲートウェイ層を表す別のデバイスを追加します。それは、ゲートウェイとして使用する、Wi-Fi 対応の予備の Raspberry Pi です。皆さんは、これとは別のマイクロコントローラーを使用するのでもかまいません。あるいは、PC でゲートウェイ関連のコードを実行することもできます。ここでは、セキュリティー、デバイス管理、モニタリング、アナリティクス、視覚化などの既存の機能を一から作成しなくても済むように、IBM Watson IoT Platform とその他のサービスを利用します。

ドア・モニタリング・システムのアーキテクチャーについては、以下の図を参照してください。

ドア・モニタリング・システムの IoT システムのアーキテクチャーを示す図

サンプル IoT アプリケーションで使用する IoT プロトコル

前述のように、IoT システムでは異機種混合のデバイスが使用されます。これらのデバイスはさまざまに異なるプロトコルを使ってゲートウェイと通信することが考えられますが、IoT の分野で最も普及しているプロトコルは MQTT (MQ Telemetry Transport) です。MQTT はマシン間メッセージング・プロトコルとして、一般的な IoT デバイスに伴う電力とリソースの制約を念頭に、軽量かつ高速なプロトコルとなるように設計されています。このプロトコルは TCP/IP で実行され、パブリッシュ/サブスクライブ・モデルに基づいて動作します。MQTT の仕組みについて詳しくは、記事「Getting to know MQTT」を参照してください。

機能的な MQTT システムを構成する主要なエンティティーは次のとおりです。

  • MQTT クライアント
  • メッセージ・ブローカー
  • トピックとメッセージ

クライアントは (互いに間接的に) ブローカーと通信し、階層を形成するメッセージ・トピックにメッセージをパブリッシュすることや、メッセージ・トピックをサブスクライブすることができます。トピックは、データを種類別に分離するために使用されます。

ドア・モニタリング・システムでは、ゲートウェイ (Raspberry Pi) と IBM Cloud 内のメッセージ・ブローカーとの間でデータを送受信するための通信プロトコルとして MQTT を使用します。Watson IoT Platform は MQTT を使用して動作します。

スマートフォンからデータを送信するための通信プロトコルとしては、複数の選択肢があり、Bluetooth、MQTT、WebSocket、または HTTP を使用することができます。どのプロトコルを使用するかに応じて、適切なリスニング・ロジックをゲートウェイに実装する必要があります。通常は、デバイスのセンサー・データやその他の情報にアクセスできるよう、カスタムのモバイル・アプリケーションや Web サイトを構築するという方法をとりますが、この使用ケースはセンサー・データを中継する単純なものなので、デバイスのセンサー測定値にアクセスするには、SensorNode (MQTT を使用)、SensorUDP (WebSocket を使用)、SensorStreamer (HTTP を使用) などの既存の任意のアプリケーションや、Sensor API を活用したカスタム Web ページ (こちらを参照) を利用できます。

ゲートウェイでは MQTT プロトコルを使用してクラウドと通信するため、継続的にテレメトリー・データを取得するには HTTP は不要なオーバーヘッドです。したがって、SensorUDP を使用してセンサー・データをストリーミングします。

手順

システムのアーキテクチャーが決まり、使用するプロトコルを選択したので、早速コーディングを開始しましょう。

すべてのコードは私の GitHub リポジトリーから入手できます。

ステップ 1. IoT Platform サービス・インスタンスを作成する

  1. IBM Cloud アカウントにログインします。

  2. IBM Cloud カタログを開き、サイドバーから「Internet of Things (モノのインターネット)」を選択し、「Internet of Things Platform 」を選択します。

  3. サービス名を入力し、リージョンとプランを選択してから「Create (作成)」をクリックします。

  4. サービスが作成されたら、「Launch (起動)」をクリックします。以下のような Watson IoT Platform ダッシュボードが開きます。

    Watson IoT Platform ダッシュボードのスクリーンショット

ステップ 2. 新しいゲートウェイ・デバイスのタイプを作成して、ゲートウェイ・デバイスを登録する

このチュートリアルでは、デバイスと IoT Platform サービスとの間のゲートウェイとして Raspberry Pi を使用します。新しいゲートウェイ・デバイスのタイプを作成して、前のステップで作成した IoT Platform サービス・インスタンスにそのタイプを登録する必要があります。

Watson IoT Platform マニュアルで説明している手順に従って、デバイスを登録できます。

  1. Watson IoT Platform ダッシュボードのサイドバーから「Devices (デバイス)」を選択し、「Device Types (デバイス・タイプ)」を選択します。「Add Device Type (デバイス・タイプの追加)」をクリックします。

    「Add Device Type (デバイス・タイプの追加)」ウィンドウのスクリーンショット

  2. 選択したデバイス・タイプのデバイスを追加し、ウィザードを使用してゲートウェイ・デバイスを登録します。

    「Add Device (デバイスの追加)」ウィンドウのスクリーンショット

  3. ウィザードの最後の画面で、デバイスの資格情報をコピーします。この資格情報が表示されるのはこれが最初で最後なので、後で使用できるよう、コピーしておく必要があります。

    デバイスの資格情報が表示されたウィンドウのスクリーンショット

ここでは例として、「raspi」というゲートウェイ・タイプを作成して、ID「raspi-1」のゲートウェイ・デバイスを登録しました。デフォルトでは、ゲートウェイ・デバイスは特権ロールで登録されます。この特権ロールには、デバイスを Watson IoT Platform サービスに追加する権限があります (追って詳細を説明します)。

ステップ 3. スマートフォンのデバイス・タイプを作成する

次は、スマートフォンのデバイス・タイプを作成する必要があります (ゲートウェイは、スマートフォンとは別のデバイス・タイプです)。Android で稼働するデバイスを使用するため、ここではデバイス・タイプの名前を「Android」に設定しますが、皆さんはお好きな名前を自由に使ってください。

「Add Device Type (デバイス・タイプの追加)」ウィンドウのスクリーンショット

注: この時点では、(raspi-1 を登録したように) スマートフォン・デバイスを登録する必要はありません。後でわかるように、特権ゲートウェイを使用して、プログラムによってスマートフォンを登録します。

ステップ 4. ゲートウェイを構成してクラウドに接続する

ゲートウェイ・デバイスは、MQTT プロトコルを使用してクラウドと通信することになります。Watson IoT Platform 内で MQTT ブローカーとの接続を構成するには、以下の情報が必要です (この情報は、ゲートウェイ・デバイス raspi-1 を登録したときにコピーしました)。

  • 組織 ID (6 文字の ID)
  • デバイス・タイプ (この例では raspi)
  • 具体的なデバイスのデバイス ID (例として raspi-1 を使用しました)
  • 認証トークン

セキュリティーを確保するために、MQTT ブローカーとの接続には TLS を使用してください。ブローカーとの MQTT 接続を開始するには、ホスト・アドレス、クライアント ID、ユーザー名、パスワードが必要です。接続先のホスト・アドレスの形式は <orgid>.messaging.internetofthings.ibmcloud.com、clientId の形式は "d:<orgid>:<device_type>:<device_id>" となります。認証にトークンを使用する場合、ユーザー名は use‑token‑auth で、パスワードは認証トークン自体の値です。

接続の設定をするには paho-mqtt などのライブラリーを利用できますが、正しく構成するにはある程度の作業が必要になる場合があります。幸い、IBM では開発作業を大幅に楽にする Python SDK を提供しています (この SDK は内部で paho-mqtt を使用します)。

wiotp-sdk を使用する以下のコードによって、ゲートウェイ・クライアントとして Watson IoT Platform 内の MQTT ブローカーに接続できます。 この Python スクリプトは、私の GitHub リポジトリーからダウンロードできます。

まず、SDK をインストールします。

# Installing the pypi package
$ pip install wiotp-sdk

次に、以下の Python スクリプトで接続処理を行います。

# File gateway_client.py

import wiotp.sdk.gateway

def get_gateway_cilent(config_file_path):
    config = wiotp.sdk.gateway.parseConfigFile(config_file_path)
    client = wiotp.sdk.gateway.GatewayClient(config=config, logHandlers=None)
    return client

client = get_gateway_cilent('gateway_config.yml')

get_gateway_cilent は yaml 構成ファイルのパスを取ります。この構成ファイルに、上述の構成のすべてを指定できます。ファイルの内容は以下のようになります。

# File gateway_config.yml
identity:
    orgId: 65cjsf
    typeId: raspi
    deviceId: raspi-2
auth:
    token: your_auth_token_here
options:
    domain: internetofthings.ibmcloud.com
    logLevel: debug
    mqtt:
        port: 8883
        transport: tcp

connect メソッドを呼び出すと、接続を開始できます。

client.connect() # Initiating the connection

構成ファイル内で logLevel が debug に設定されていれば、以下のような出力が表示されます。

# The debug output
2020-03-03 22:38:46,897   wiotp.sdk.gateway.client.GatewayClient  DEBUG   Connecting... (address = 65cjsf.messaging.internetofthings.ibmcloud.com, port = 8883, clientId = g:65cjsf:raspi:raspi-1, username = use-token-auth)
2020-03-03 22:38:46,897   wiotp.sdk.gateway.client.GatewayClient  DEBUG   Connecting with clientId g:65cjsf:raspi:raspi-1 to host 65cjsf.messaging.internetofthings.ibmcloud.com on port 8883 with keepAlive set to 60
2020-03-03 22:38:46,898   wiotp.sdk.gateway.client.GatewayClient  DEBUG   User-Agent: MQTT/3.1.1 (Darwin 19.2.0) Paho/1.5.0 (Python) WIoTP/0.11.0 (Python)
2020-03-03 22:38:47,316   wiotp.sdk.gateway.client.GatewayClient  DEBUG   16 Sending CONNECT (u1, p1, wr0, wq0, wf0, c1, k60) client_id=b'g:65cjsf:raspi:raspi-1'
2020-03-03 22:38:47,474   wiotp.sdk.gateway.client.GatewayClient  DEBUG   16 Received CONNACK (0, 0)
2020-03-03 22:38:47,474   wiotp.sdk.gateway.client.GatewayClient  INFO    Connected successfully: g:65cjsf:raspi:raspi-1

ステップ 5. ゲートウェイのステータス・イベントを送信する

接続を確立した後は、ブローカーにデータをパブリッシュできるかどうかテストできます。テストのために、イベントをクラウドに送信します。MQTT の用語を使用すると、MQTT ブローカー内の特定のトピックにメッセージをパブリッシュするという表現になります。慣例により、MQTT のイベント・トピックは、iot-2/type/<device type>/id/<device id>/evt/<event type>/fmt/ の形式をとります。

トピックの構造にはそれほど気を遣う必要はありません。構造は wiotp-sdk が内部で処理してくれます。以下に、ゲートウェイ・デバイス raspi-1 のステータス・イベントをパブリッシュするコードを記載します。

# File gateway_client.py

import psutil

def send_status_event(client):
    payload = {
        "cpu": psutil.cpu_percent(),
        "memory": psutil.virtual_memory().percent
    }
    return client.publishEvent(eventId="status", msgFormat="json", data=payload, qos=1)

現在の CPU とメモリーの使用状況をステータス・イベントにメッセージ・データとして格納して送信するために、client.publishEvent メソッドを使用しています。QoS level を 1 に設定しているのは、メッセージが少なくとも「1 回」配信されるようにするためです。送信したステータス・イベントは、Platform 内のウィンドウに以下のように反映されます。

デバイスのステータス・イベントを示す画面のスクリーンショット

ステップ 6. デバイスをゲートウェイに接続する

次は、このシステムにスマートフォン・デバイスを追加しましょう。前述のとおり、ゲートウェイには UDP ソケットを介してスマートフォンを接続します。SensorUDP アプリでは加速度センサー、ジャイロスコープ、磁気センサー、光度センサー、近接センサーに対応できますが、すべてのデバイスにこれらすべてのセンサーが備わっているわけではないので (私の古い Android デバイスにはジャイロスコープと磁気センサーはありません)、データの一部は欠落する可能性があります。

スマートフォンから送信されるデータをゲートウェイのソケットで listen する関数を実装する必要があります。そのための関数のコードは以下のとおりです。

# File udp_listener.py

import socket
from struct import unpack_from

UDP_IP = "0.0.0.0"
UDP_PORT = 6000 # The port where you want to receive the packets

# https://play.google.com/store/apps/details?id=com.ubccapstone.sensorUDP&hl=en_IN
properties = ['x_acc', 'y_acc', 'z_acc', 'x_gravity', 'y_gravity', 'z_gravity',  'x_rotation', 'y_rotation', 'z_rotation',
              'x_orientation', 'y_orientation', 'z_orientation', 'deprecated_1', 'deprecated_2', 'ambient_light', 'proximity',
              'keyboard_button_pressed']


def unpack_and_return(data, offset):
    return unpack_from("!f", data, offset)[0]


def process_data(data):
    offset = 0
    result = {}
    for property in properties:
        result[property] = unpack_and_return(data, offset)
        offset += 4
    return result


def listen_sensor_data():
    sock = socket.socket(socket.AF_INET,
                         socket.SOCK_DGRAM) # UDP
    sock.bind((UDP_IP, UDP_PORT))
    # Keep listening for data indefinitely
    while True:
        data, addr = sock.recvfrom(1024) # buffer size is 1024 bytes
        data = process_data(data)
        yield data, addr

for data, addr in listen_sensor_data():
     print(data, addr)

ゲートウェイ・デバイスの IP アドレスを調べるには、ipconfig シェル・コマンドを使用できます。その IP アドレスをポート番号 (この例では 6000) と併せて SensorUDP アプリ内で追加してください。

SensorUDP アプリのスクリーンショット

スマートフォンがデータの送信を開始すると、ゲートウェイ・デバイスのログに以下のような出力が表示されます。

{'x_acc': -0.003208909183740616, 'y_acc': -0.006447315216064453, 'z_acc': 0.01796436309814453, 'x_gravity': 0.0, 'y_gravity': 0.0, 'z_gravity': 0.0, 'x_rotation': 0.0, 'y_rotation': 0.0, 'z_rotation': 0.0, 'x_orientation': 0.0, 'y_orientation': 0.0, 'z_orientation': 0.0, 'deprecated_1': 0.0, 'deprecated_2': 0.0, 'ambient_light': 56.0, 'proximity': 100.0, 'keyboard_button_pressed': 0.0} ('172.20.10.4', 42048)

{'x_acc': -0.01315280981361866, 'y_acc': 0.034984588623046875, 'z_acc': -0.01422119140625, 'x_gravity': 0.0, 'y_gravity': 0.0, 'z_gravity': 0.0, 'x_rotation': 0.0, 'y_rotation': 0.0, 'z_rotation': 0.0, 'x_orientation': 0.0, 'y_orientation': 0.0, 'z_orientation': 0.0, 'deprecated_1': 0.0, 'deprecated_2': 0.0, 'ambient_light': 56.0, 'proximity': 100.0, 'keyboard_button_pressed': 0.0} ('172.20.10.4', 42048)

スマートフォンとゲートウェイ・デバイスを正常に接続させるには、この両方が同じネットワークに接続されるようにしてください。セキュリティーを強化するために、SSL、ファイアウォール、IP 許可リストなどをプロビジョニングすることもできます。

ステップ 7. ゲートウェイ経由でデバイスのイベントを送信する

デバイスから送信されるデータをゲートウェイで受信できるようになったので、残る作業はそのデータをクラウド内の MQTT ブローカーにパブリッシュすることだけです。ゲートウェイは自身に接続されたデバイスの代理としてイベントをパブリッシュできます。特権ゲートウェイがデバイスの代理として初めてイベントをパブリッシュすると、Watson IoT Platform が自動的にそのデバイスを登録します。このメカニズムは、デバイスの自動プロビジョニングと呼ばれています。さらに、REST API を使用して複数のデバイスを一括登録するメカニズムもありますが、このチュートリアルでは 1 台のデバイスしか扱わないので、前者のメカニズムを使用します。イベントを送信するためのコードは以下のとおりです。

# File gateway_client.py
ANDROID_DEVICE_TYPE = "Android"

def send_android_device_event(client, device_id, eventId, data):
    client.publishDeviceEvent(
      ANDROID_DEVICE_TYPE, device_id, eventId, msgFormat="json", data=data, qos=0)

for data, device_addr in listen_sensor_data():
     send_android_device_event(client, device_addr[0], "status", data)

client.publishEvent と同じく、client.publishDeviceEvent はゲートウェイ・クライアント経由でデバイスのイベントをパブリッシュします。上記のスニペットでは、スマートフォンの静的 IP アドレス (現時点で入手しているスマートフォンに固有の情報であるため) をデバイス ID として使用しています。理想的には、特定のデバイスを識別するためにデバイス ID は不変かつ一意でなければなりません。これからデバイス用のカスタム・アプリケーションの作成に取り掛かろうとしている場合は、データに含まれるデバイス ID などのメタ情報を共有するための規定を追加できます。

イベントのパブリッシュを開始すると、以下のように、IoT Platform ダッシュボード内にセンサーの測定値がリアルタイムで反映されるようになります。

デバイスのステータス・イベントを示す画面のスクリーンショット

ステップ 8. エッジでデータをフィルタリングして集約する

現時点では、スマートフォンが速いペースでセンサー・データをストリーミングし、そのデータがそのままでクラウド内の MQTT ブローカーに送信されます。MQTT ブローカーにはペースを緩めてデータが送信されるようにしましょう。

以下のコードでは、そのためのメカニズムを実装するために timeloop ライブラリーを使用しています (このライブラリーは内部で Python の threading モジュールを使用します)。

# File main.py

from datetime import timedelta
import time
from timeloop import Timeloop


client = get_gateway_cilent('gateway_config.yml')
client.connect()
time.sleep(2)
tl = Timeloop()

devices_data = {}


# Job that runs every 5 seconds
@tl.job(interval=timedelta(seconds=5))
def send_gateway_status():
    send_status_event(client)


# Job that runs every 200 milliseconds
@tl.job(interval=timedelta(milliseconds=200))
def send_device_readings():
    for device_addr, data in devices_data.items():
        send_android_device_event(client, device_addr, "status", data)
    devices_data.clear()

tl.start()

for data, device_addr in listen_sensor_data():
    devices_data[device_addr[0]] = data

上記のスニペットの内容は以下のとおりです。

  • ゲートウェイのステータス更新 (CPU とメモリーの使用状況データ) を 5 秒間隔でパブリッシュします。
  • デバイス・データを 200 ミリ秒間隔でパブリッシュします。

デバイスはそのホスト・アドレスに基づいて区別されるため、複数のデバイスを使用するとしても、上記のスニペットは有効です。このロジックをさらに拡張して、前回と比べて値が大幅に変化している場合にだけイベントをパブリッシュすることもできます。このようにすると、クラウドでの計算のオーバーヘッドが軽減されるだけでなく、ネットワーク帯域幅も節約されるため、この部分をエッジ・コンピューティングの一部分にすることができます。

ステップ 9. データを視覚化する

データをさまざまな方法で視覚化すると、発生したイベントを把握しやすくなります。Watson IoT Platform を利用して、クラウドに送信されるデータを視覚化しましょう。

視覚化を作成するために、新しいボードを作成します。

  1. Watson IoT Platform ダッシュボードを表示して、「Create New Board (新しいボードの作成)」を選択します。

    Watson IoT Platform の「Create a new board (新しいボードの作成)」ウィザードのスクリーンショット

  2. Add new card (新しいカードを追加)」をクリックします。「Devices (デバイス)」の下に表示されている使用可能なカードのタイプの中から、「Line chart (折れ線グラフ)」を選択します。

    折れ線グラフ・カードを追加する画面を示す、Watson IoT Platform のスクリーンショット

  3. カード内に表示するデータの送信元デバイスを選択します。カードのタイプによっては、表示するデータ・セットのデータ・ソースとして別のカードを使用できる場合もあります。その場合は、従属カード上に表示されるデータを、もう一方のカード上の値を選択してフィルタリングできます。例えば、視覚化カード内に表示される値を、デバイス・リスト・カードから表示対象のデバイスを選択してフィルタリングできます。

  4. データ・セットを接続します。イベント・タイプ (ステータス) を選択してから、プロパティーを構成するためにデータ・タイプを選択して値の範囲を入力し、「次へ 」をクリックします。

    データ・セットをカードに接続する画面を示す、Watson IoT Platform のスクリーンショット

  5. カードのサイズ、色、タイトルを入力してカードの外観を構成してから、「Submit (送信)」をクリックしてカードをボードに追加します。

いろいろな視覚化 (ゲージ、折れ線グラフ、セマフォーなど) を試して、以下のボードを作成しました。このボードをひと目見れば、システムの概要を把握できます。

Watson IoT Platform の「Visualizations (視覚化)」ダッシュボードのスクリーンショット

ステップ 10. コマンドと情報をデバイスに送信する

これまでの手順でセットアップしたインフラストラクチャーは、データを一方向 (デバイスからゲートウェイ、ゲートウェイからクラウド) でしか送信しません。反対方向でデータを送信する必要がある場合はどうすればよいでしょうか。例えば、デバイスに特定の動作をさせるための命令を送信する場合などです。こうした命令は、コマンドを使用して行うことができます。コマンドとは、クラウドやアプリケーション層から下位の層に送信される MQTT メッセージに他なりません。

wiotp-sdk SDK はアプリケーション・クライアント (この 3 層のアーキテクチャーのうちのアプリケーション層) 向けに、デバイスまたはゲートウェイに送信する関数を抽象化します。以下のコード・スニペットは、このような使用ケースの 1 つの実装を説明しています。

# File application.py

import wiotp.sdk.application

def get_gateway_cilent(config_file_path):
    config = wiotp.sdk.gateway.parseConfigFile(config_file_path)
    client = wiotp.sdk.gateway.ManagedGatewayClient(config=config, logHandlers=None)
    return client

def send_reset_command(client, type, id):
  data = {'reset': True}
  client.publishCommand(type, id, "reset", "json", data)


app_client = get_gateway_cilent("app_config.yml")
app_client.connect()
send_reset_command(app_client, 'raspi', 'raspi-1')

上記のコードで、ゲートウェイとは別の場所からクラウドに直接接続するアプリケーション・クライアントを初期化します。このアプリケーション・クライアントを使用して、ゲートウェイに自身のリセットを命令する単純なコマンドをゲートウェイに送信します。この使用ケースは未熟な実装ですが、これを拡張して、デバイスを再起動させたり、データの送信速度を変更したりするなどの有用な処理を行うことも可能です。

ゲートウェイの構成と同じく、yaml ファイル内でアプリケーションの構成を指定します。

# app_config.yml
identity:
    appId: any_app_id
auth:
    key: your_key
    token: your_token
options:
    domain: internetofthings.ibmcloud.com
    logLevel: debug

Watson IoT Platform ダッシュボードの「Apps (アプリ)」セクション内にある「Generate API Key (API 鍵の生成)」ボタンをクリックすると、(ゲートウェイ・クライアントに使用したものとは異なる) アプリケーションの鍵とトークンが生成されます。

Watson IoT Platform の「Generate API Key (API 鍵の生成)」ウィンドウのスクリーンショット

ロールとして「Backend Trusted application (バックエンドの信頼できるアプリケーション)」を割り当てています。

「Generate API Key (API 鍵の権限)」ウィンドウで割り当てた権限を示す、Watson IoT Platform のスクリーンショット

次の画面に表示される鍵と認証トークンをコピーして、application_config.yml ファイル内に貼り付けます。

アプリケーション側には上記の実装で十分ですが、ゲートウェイについてはどうでしょう。ゲートウェイではコマンドをどのように処理すればよいでしょうか。ゲートウェイには以下のロジックを追加する必要があります。

# File main.py contd..
def gateway_command_callback(cmd):
    print("Command received for {}:{}: {}".format(cmd.typeId, cmd.deviceId, cmd.data))
    if cmd.typeId == 'reset':
        reset_data(devices_data)
    else:
        print("Unknown command type received")


def reset_data():
    devices_data.clear()
    # Add more custom logic here.
    pass


# Subscribing to specific command
# client.subscribeToCommands(self, commandId="reset")
# Subscribing to all commands being sent to the gateway
client.subscribeToCommands(self)
# Registering a callback
client.commandCallback = gateway_command_callback

上記のコードで、ゲートウェイ・クライアントがコマンドを受信するたびに呼び出されるコールバック関数を登録しています。これにより、送信された ID とデータに基づいてコマンドを処理できます。同様に、コマンドをデバイスに送信することもできます。

ステップ 11. デバイスを管理する

複雑な IoT システムには多数のデバイスが含まれるため、アーキテクチャーにデバイス管理機能を組み込んでおくと役立ちます。IoT デバイスはセキュリティー上のリスクにさらされた環境内にデプロイされることがよくあります。こうしたデバイスを積極的にモニタリングし、デバイスで障害が発生した場合に回収したり、リスクにさらされた環境内でも引き続き運用できるよう更新したりしなければなりません。

例えばデバイスを工場出荷時設定にリセットしたり、セキュリティー問題にパッチをあてるため、あるいはバグを修正するために更新を適用したりできるデバイス管理機能があれば、開発者が IoT デバイスを制御することができます。

IBM Watson IoT Platform のデバイス管理プロトコルを使用して IoT デバイスを管理対象デバイスとして登録すると (これまでのところ、非管理対象デバイスを扱ってきました)、リモートからのデバイスの再起動、工場出荷時設定へのリセット、ファームウェアのダウンロードやアップグレードを行えるようになります。管理対象として登録されたデバイスは、MQTT ベースのデバイス管理プロトコルを実装します。wiotp-sdk には便宜のために、管理対象デバイス管理対象ゲートウェイの個別のクライアントも用意されています。この使用ケースでは、wiotp.sdk.gateway.GatewayClient ではなく単に wiotp.sdk.gateway.ManagedGatewayClient を使用できます。このクライアントを使用すれば、Watson IoT Platform ダッシュボードからデバイス管理アクションをトリガーできます。コマンドを処理する場合と同じく、デバイス管理アクションを処理するコールバックをセットアップする必要があります。

管理対象デバイスを登録すると、Watson IoT Platform ダッシュボード内のデバイスの詳細ページ上にある「Device Actions (デバイス・アクション)」セクションが自動的に更新されて、管理対象デバイスに対するアクションをトリガーするためのボタンが表示されます。

Watson IoT Platform の「Device Actions (デバイス・アクション)」ウィンドウのスクリーンショット

ダッシュボード内の「Devices (デバイス)」ページ上にある「Action (アクション)」タブで、これまでに適用されたデバイス・アクションのログを確認できるようにもなります。このページ上にある「Initiate Action (アクションを開始)」ボタンを使用して、管理対象デバイスに対してアクションをトリガーすることもできます。これらのアクションは、特定の 1 台のデバイスまたは指定したタイプのすべてのデバイスに同時に適用できます。

「Actions (アクション)」タブで確認できるログを示す、Watson IoT Platform のスクリーンショット

ステップ 12. データをデータベース内に保管する

後で使用できるようにデータを保持するにはどうすればよいでしょうか。それには、イベントを Cloudant NoSQL データベース内に保管できます。

IoT アプリケーションに Cloudant DB サービスを追加します。

  1. IBM Cloud カタログで、左側のメニューから「Databases (データベース)」を選択し、「Cloudant」をクリックします。

  2. プランとリージョンを選択してから、サービス名を指定します。「Create (作成)」をクリックします。

    IBM Cloud 内の Cloudant DB サービスを示す画面のスクリーンショット

  3. サービス・ダッシュボードを表示して、アプリケーション内で使用するサービス資格情報を生成する必要があります。それには、サービスのページ上にある「Service credentials (サービス資格情報)」をクリックします。

    Cloudant DB サービス資格情報を示す画面のスクリーンショット

  4. 資格情報を入手したら、アプリケーション・クライアントを Cloudant サービスにバインドして、イベントを Cloudant データベースに送信するためのコネクター、宛先、ルールを構成できます。このバインディングを行うためのコードは以下のとおりです。

# File application.py

CLOUDANT_CREDS = {
  "apikey": "your_api_key_here",
  "host": "eeca8e52-1774-4698-93d1-cafa434762ac-bluemix.cloudantnosqldb.appdomain.cloud",
  "password": "your_pwd_here",
  "port": 443,
  "username": "your_username_here"
}

SERVICE_BINDING = {
    "name": "any-binding-name",
    "description": "Test Cloudant Binding",
    "type": "cloudant",
    "credentials": CLOUDANT_CREDS
}

ANDROID_DEVICE_TYPE = "Android"
GATEWAY_DEVICE_TYPE = "raspi"
STATUS_EVENT_TYPE = "status"


def get_application_client(config_file_path):
    config = wiotp.sdk.application.parseConfigFile(config_file_path)
    app_client = wiotp.sdk.application.ApplicationClient(config)
    return app_client


def create_cloudant_connections(client, service_binding):
    # Bind application to the Cloudant DB
    cloudant_service = client.serviceBindings.create(service_binding)

    # Create the connector
    connector = client.dsc.create(
        name="connector_1", type="cloudant", serviceId=cloudant_service.id, timezone="UTC",
        description="Data connector", enabled=True
    )

    # Create a destination under the connector
    destination_1 = connector.destinations.create(name="sensor-data", bucketInterval="DAY")

    # Create a rule under the connector, that routes all Android status events to the destination
    connector.rules.createEventRule(
        name="status_events", destinationName=destination_1.name, typeId=ANDROID_DEVICE_TYPE, eventId=STATUS_EVENT_TYPE,
        description="Send android status events", enabled=True
    )

    # Create another destination under the connector
    destination_2 = connector.destinations.create(name="gateway-data", bucketInterval="DAY")

    # Create a rule under the connector, that routes all raspi status events to the destination
    connector.rules.createEventRule(
        name="status_events", destinationName=destination_2.name, typeId=GATEWAY_DEVICE_TYPE, eventId=STATUS_EVENT_TYPE,
        description="Gateway status events", enabled=True)

create_cloudant_connections(app_client, SERVICE_BINDING)

バインディング、コネクター、宛先、ルールについて詳しくは、こちらをご覧ください。要約すると、上記のコードでは Cloudant サービスを利用して、ある特定の履歴データベースにすべての Android ステータス・イベントが送信され、それとは別の履歴データベースにすべての raspi ステータス・イベントが送信されるようにアプリケーションを構成しています。どちらの履歴データベースでも、1 日ごとにデータがバケットに分類されます。

wiotp-sdk を使用して以上のルールをセットアップした後に Cloudant サービス・ダッシュボードを表示すると、以下のように複数のデータベースが生成されていることを確認できます。

Cloudant DB サービス・ダッシュボードのスクリーンショット

これらのデータベース内に、すべてのイベント・データがドキュメント形式で保管されます。「JSON」ボタンをクリックすると、イベント・データを JSON 形式で表示できます。

Cloudant DB サービスにより JSON 形式で表示されたイベント・データを示す画面のスクリーンショット

ステップ 13. データを高度なアナリティクスに使用する

Cloudant 内に保管したデータを使用して、高度なアナリティクスを行うことができます。Watson Studio 内で利用できる Jupyter Notebook にデータを取り込んで、アナリティクスのカスタム・ロジックを作成しましょう。

  1. IBM Cloud カタログを表示して、「AI 」から「Watson™ Studio」を選択します。
  2. サービスを作成します。リージョンを選択し、料金プランとして「Lite (ライト)」を選択します。「Service name (サービス名)」に名前を入力し、リソース・グループを選択します。
  3. Get Started (開始)」をクリックしてダッシュボードを起動します。次に、「Create project (プロジェクトを作成)」 > 「Create an empty project (空のプロジェクトを作成)」 をクリックし、プロジェクトの名前を入力します。
  4. 新しいクラウド・ストレージ・サービスを作成します (「Lite (ライト)」プラン > 「Create (作成)」をクリックします)。「Refresh(更新)」をクリックして、作成されたサービスを表示します。
  5. Create (作成)」をクリックします。新しいプロジェクトが開きます。
  6. リソースを追加するには、「Add to project (プロジェクトに追加)」ボタンをクリックします。

    Watson Studio プロジェクトを示す画面のスクリーンショット

  7. Notebook を選択しますが、他にもリソースがあるので調べてください。名前と説明を入力し、適切なランタイムを選択します。Watson Studio では Python 環境と R 環境の両方をサポートしています。また、Spark と Scala のサポートも備わっています。ここでは、Python で動かす Spark 環境を選択しましょう。

  8. Cloudant 内に保管されているデータを使用するには、プロジェクトに接続リソースを追加する必要があります。それには、Watson Studio プロジェクトのページを表示して再度「Add to project (プロジェクトに追加)」をクリックした後、今回は「Connection (接続) 」アセットを選択してから Cloudant サービスを選択し、「Create (作成)」をクリックします。

    Cloudant サービスの接続リソースを示す、Watson Studio のスクリーンショット

  9. Notebook 内で Cloudant サービスを表示し、資格情報をクリックしてセルに挿入できます。

    Cloudant 資格情報を追加する画面を示す、Watson Studio のスクリーンショット

  10. Notebook 内で以下のコードを実行します。

# The Watson studio notebook cell
!pip install --upgrade pixiedust

import pixiedust

username = credentials_1["username"]
password = credentials_1["password"]
host = username + '.cloudantnosqldb.appdomain.cloud'
dbName = 'iotp_65cjsf_sensor-data_2020-03-03' # DB name, can be found on the cloudant service

pixiedust.installPackage("cloudant-labs:spark-cloudant:2.0.0-s_2.11")


cloudantdata = sqlContext.read.format("com.cloudant.spark").\
option("cloudant.host", host).\
option("cloudant.username", username).\
option("cloudant.password", password).\
load(dbName)

cloudantdata.show()

これで、すべての Android イベント・データを PySpark データフレーム・オブジェクトとして利用できるようになったので、PySpark と Python での分析と視覚化に使用することができます。

PySpark データフレーム・オブジェクトを示す、Watson Studio のスクリーンショット

このドア・モニタリング・システムのコンテキストでは、以下のようなカスタム・アナリティクスが考えられます。

  • 加速度センサーと近接センサーのデータを分析し、特定の 1 日にドアが開けられる回数を推定する。
  • 特定の週で最も頻繁にドアが開けられた時間帯を特定する。
  • 各種の統計値と指標を計算し、それらの指標に基づいて他のサード・パーティーのサービスやアプリケーション (IFTTT など) を呼び出す。
  • データを処理して、Watson IoT Platform サービス内の視覚化カードでは生成できないような複合グラフを描画する。

まとめ

このチュートリアルでは、標準的な IoT システムを構成する各種のコンポーネントについて説明しました。IoT は幅の広いトピックであり、その使用ケースは複数の分野にまたがります。けれども、このチュートリアルで説明したさまざまなコンセプトの知識があれば、ほとんどの IoT 使用ケースに対処できるはずです。

このチュートリアルでセットアップしたインフラストラクチャーでは、以下のアイデアを簡単に試すことができます。

  • 近接センサーを使用してイベント (音であいさつする、ゲートウェイに接続された他のデバイスの電源を入れるなど) をトリガーする。
  • 加速度センサーの測定値をモニタリングし、何らかの異常検出手法を使用して、1 日を通してドアが開けられた回数やその他の統計値を算出する。
  • カスタム・アプリケーションを作成して、ノイズ・レベルなどの他のデータを記録する。または、個別のハードウェア・センサー (温度センサー、湿度センサー、光度センサーなど) を使用することも考えられます。アプリケーション内のボタンやハードウェアのブザーを追加してドア・ベルとして機能させることも可能です。
  • 光度がしきい値を下回った時点で (ネットワークに接続された) ライトを点灯させるなど、光度に基づいてアクションを実行する。