【Xi IoT】Elastic Beats で データシッピング その2 Kafka連携
Filebeat を Xi IoT の Kubernetes Apps として、動かすことができたので Kafka 周りを調べつつ構成を想像してみます(Elastic だけじゃなくて Kafka もはじめて)
Xi IoT + Elastic シリーズ
・Xi IoT に Elasticsearch と Kibana を導入してみる
・【Xi IoT】Elastic Beats で データシッピング その1 Xi IoT上で動かす
・【Xi IoT】Elastic Beats で データシッピング その2 Kafka連携(←イマココ)
・【Xi IoT】Elastic Beats で データシッピング その3 ingestノードでデータ整形
==各バージョン===
Xi IoT 1.18
Elastic 7.8(Elasticsearch/Kibana/Filebeat)
Python 3.7
=====================
Xi IoT 上の Kafka 設定
まず Xi IoT の Data Pipeline 設定を行います
扱うセンサーデータ
扱うセンサーデータは、過去に作ったラズパイ温度センサーを利用します
【Xi IoT実装】温度計アプリを実装してみる - konchangakita
【Xi IoT実装】温度計アプリを実装してみる Slack連携 その1 - konchangakita
【Xi IoT実装】温度計アプリを実装してみる Slack連携 その2 しきい値の設定 - konchangakita
この Data Pipeline では Function の中で Slack に直接メッセージをとばしています。Slack API の仕様なんかもあったようなので、Kafka でとばすメッセージ内容も考慮して、Function を改修
<Slack部分の変更点>
レガシートークンから Incoming Webhooks へ変更
https://api.slack.com/apps/A016BGU0V2M/incoming-webhooks?
<Kafka追加部分>
Data Pipeline から Kafka へ送る”メッセージ”に現在時刻も一緒にくっつけて送る
now1 = datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=9))) # Japan time rmsg['now_dt'] = "{0:%Y-%m-%d %H:%M:%S }".format(now1) + str(now1.tzinfo)
改修後の Function はこんな感じ
Data Pipeline から Kafka へ送信されるデータ内容(メッセージ)のサンプル
{'temperature': 29, 'humidity': 57, 'limit_upper': 27, 'limit_under': 15, 'now_dt': '2020-06-28 09:20:10 UTC+09:00', 'hotcold': 'あっちぃ', 'limit_t': 27}
Kafka を使う Data Pipeline
Xi IoT の Data Pipeline で
1.Transportation に 新しい Function を設置
2. Output の Endpoint Type には Kafka を選択
3.Endpoint Name(Topic名)をユニークな名前に設定
ここまでの設定が終わると、Xi IoT ポータルの Kafka の Project に登録されます
(ver1.18 くらいにこのメニュー増えました)
これで Kafka のブローカーが作られました
Filebeat の Kafka 受信設定
Xi IoT で設定した Kafkaブローカーから、”メッセージ” を受け取るためにFilebeat の設定ファイルの filebeat.yaml の inputs に以下のように記載します
filebeat.inputs: - type: kafka hosts: ["{{.Services.Kafka.Endpoint}}"] # Xi IoTのKubernetes Apps 専用の記載 topics: ["temperature"] # Data Pipeline で設定したトピック名 group_id: "xiiot-kafka" # なんでもよい
全体像はこんな感じになります
これで、Xi IoT Kafkaブローカーから "メッセージ" を受け取り、Elasticsearch へ送ることができたはずです
Kibana で受信データ確認
Kafka から受け取ったデータを確認してみます
Elasticsearch の index を確認
Kibana で index の状態を確認してみて、「filebeat-7.8.0」というのができていればOK
(filebeat-7.8.0-2020xxx とか余計に名前が長くなっているとダメっぽい)
Elasticsearchにうまくデータが入ってきていれば、Docs Count が Kafka の受信ととも増えていくはずです
index patterns設定
Elasticsearch 内の index の内容を可視化するための設定をします
index名のパターンを設定
とりあえず無しで
データ内容を確認
Discovery を確認する
indexパターンが「filebeat-*」になっていることを確認して
内容がはいってればOK
"message" で Kafka から情報がとれました
ただ、このままだと "message" 内のデータが高まって入ってしまっているので、ただただデータ受け取り続けているだけなので、時系列に合わせたデータ内容ので可視化がうまいことできません
受け取った "メッセージ" の整形は、ingest ノードの設定に続きます