konchangakita

KPSを一番楽しんでいたブログ 会社の看板を背負いません 転載はご自由にどうぞ

【Xi IoT】Elastic Beats で データシッピング その2 Kafka連携

f:id:konchangakita:20200624172205p:plain

Filebeat を Xi IoT の Kubernetes Apps として、動かすことができたので Kafka 周りを調べつつ構成を想像してみます(Elastic だけじゃなくて Kafka もはじめて)
f:id:konchangakita:20200624205723p:plain


Xi IoT + Elastic シリーズ
 ・Xi IoT に Elasticsearch と Kibana を導入してみる
 ・【Xi IoT】Elastic Beats で データシッピング その1 Xi IoT上で動かす
 ・【Xi IoT】Elastic Beats で データシッピング その2 Kafka連携(←イマココ)
 ・【Xi IoT】Elastic Beats で データシッピング その3 ingestノードでデータ整形

==各バージョン===
Xi IoT 1.18
Elastic 7.8(Elasticsearch/Kibana/Filebeat)
Python 3.7
=====================

Xi IoT 上の Kafka 設定

まず Xi IoT の Data Pipeline 設定を行います

扱うセンサーデータ

扱うセンサーデータは、過去に作ったラズパイ温度センサーを利用します
【Xi IoT実装】温度計アプリを実装してみる - konchangakita
【Xi IoT実装】温度計アプリを実装してみる Slack連携 その1 - konchangakita
【Xi IoT実装】温度計アプリを実装してみる Slack連携 その2 しきい値の設定 - konchangakita
この Data Pipeline では Function の中で Slack に直接メッセージをとばしています。Slack API の仕様なんかもあったようなので、Kafka でとばすメッセージ内容も考慮して、Function を改修

<Slack部分の変更点>
レガシートークンから Incoming Webhooks へ変更
https://api.slack.com/apps/A016BGU0V2M/incoming-webhooks?
f:id:konchangakita:20200627111833p:plain

<Kafka追加部分>
Data Pipeline から Kafka へ送る”メッセージ”に現在時刻も一緒にくっつけて送る

    now1 = datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=9))) # Japan time
    rmsg['now_dt'] = "{0:%Y-%m-%d %H:%M:%S }".format(now1) + str(now1.tzinfo)

改修後の Function はこんな感じ

Data Pipeline から Kafka へ送信されるデータ内容(メッセージ)のサンプル

{'temperature': 29, 'humidity': 57, 'limit_upper': 27, 'limit_under': 15, 'now_dt': '2020-06-28 09:20:10 UTC+09:00', 'hotcold': 'あっちぃ', 'limit_t': 27}

Kafka を使う Data Pipeline

Xi IoT の Data Pipeline で
1.Transportation に 新しい Function を設置
2. Output の Endpoint Type には Kafka を選択
3.Endpoint Name(Topic名)をユニークな名前に設定
f:id:konchangakita:20200621222147p:plain

ここまでの設定が終わると、Xi IoT ポータルの Kafka の Project に登録されます
(ver1.18 くらいにこのメニュー増えました)
f:id:konchangakita:20200628114057p:plain
これで Kafka のブローカーが作られました

Filebeat の Kafka 受信設定

Xi IoT で設定した Kafkaブローカーから、”メッセージ” を受け取るためにFilebeat の設定ファイルの filebeat.yaml の inputs に以下のように記載します

    filebeat.inputs:
    - type: kafka  
      hosts: ["{{.Services.Kafka.Endpoint}}"]  # Xi IoTのKubernetes Apps 専用の記載
      topics: ["temperature"]  # Data Pipeline で設定したトピック名
      group_id: "xiiot-kafka"  # なんでもよい


全体像はこんな感じになります

これで、Xi IoT Kafkaブローカーから "メッセージ" を受け取り、Elasticsearch へ送ることができたはずです


Kibana で受信データ確認

Kafka から受け取ったデータを確認してみます

Elasticsearch の index を確認

Kibana で index の状態を確認してみて、「filebeat-7.8.0」というのができていればOK
(filebeat-7.8.0-2020xxx とか余計に名前が長くなっているとダメっぽい)
Elasticsearchにうまくデータが入ってきていれば、Docs Count が Kafka の受信ととも増えていくはずです
f:id:konchangakita:20200625102914p:plain
f:id:konchangakita:20200625102953p:plain

index patterns設定

Elasticsearch 内の index の内容を可視化するための設定をします
f:id:konchangakita:20200625110535p:plain

index名のパターンを設定
f:id:konchangakita:20200625111105p:plain

とりあえず無しで
f:id:konchangakita:20200625111412p:plain

データ内容を確認

Discovery を確認する
f:id:konchangakita:20200625111545p:plain

indexパターンが「filebeat-*」になっていることを確認して
内容がはいってればOK
f:id:konchangakita:20200625112658p:plain

"message" で Kafka から情報がとれました
f:id:konchangakita:20200625113041p:plain

ただ、このままだと "message" 内のデータが高まって入ってしまっているので、ただただデータ受け取り続けているだけなので、時系列に合わせたデータ内容ので可視化がうまいことできません
受け取った "メッセージ" の整形は、ingest ノードの設定に続きます