【Xi IoT】Elastic Beats で データシッピング その3 ingestノードでデータ整形
ingestノードは、Elasticsearch にデータ格納する前処理してくれる便利なやつです
こんなことをやらせようと思います
(実際にはElasticsearch内の機能のような気もしますが)
Logstash でも似たようなことができるようですが、誰か教えてください
Xi IoT + Elastic シリーズ
・Xi IoT に Elasticsearch と Kibana を導入してみる
・【Xi IoT】Elastic Beats で データシッピング その1 Xi IoT上で動かす
・【Xi IoT】Elastic Beats で データシッピング その2 Kafka連携
・【Xi IoT】Elastic Beats で データシッピング その3 ingestノードでデータ整形(←イマココ)
==各バージョン===
Xi IoT 1.18
Elastic 7.8(Elasticsearch/Kibana/Filebeat)
Python 3.7
=====================
何も考えず filebeat から Elasticsearch へデータを投げ込むと、Kafka から受け取った "メッセージ" がそのまんま入ってきてしまいます
これでは、時系列で温度をプロットするなんてことができません
こんな風に格納されるようにもっていきたいです
"temperature" : 27 "humidity" : 63
ingestノードの設置
filebeats から 受け取っている Kafka の "message" の生データを確認してみる
GET /filebeat-7.8.0/_doc/_search
全部シングルクォーテーション「'」がダブルクォーテーション「"」に変換されているようですね
"message" : """{"temperature": 30, "humidity": 60, "limit_upper": 27, "limit_under": 15, "now_dt": "2020-06-30 23:55:12 UTC+09:00", "hotcold": "\u3042\u3063\u3061\u3043", "limit_t": 27}"""
サンプルとしてこれを使います
テスト用の index にインプットしてみる
PUT /test_index/_doc/1 { "message" : """{"temperature": 30, "humidity": 60, "limit_upper": 27, "limit_under": 15, "now_dt": "2020-06-30 23:55:12 UTC+09:00", "hotcold": "\u3042\u3063\u3061\u3043", "limit_t": 27}""" }
GET /test_index/_doc/1 === { "_index" : "test_index", "_type" : "_doc", "_id" : "1", "_version" : 8, "_seq_no" : 7, "_primary_term" : 5, "found" : true, "_source" : { "message" : """{"temperature": 30, "humidity": 60, "limit_upper": 27, "limit_under": 15, "now_dt": "2020-06-30 23:55:12 UTC+09:00", "hotcold": "\u3042\u3063\u3061\u3043", "limit_t": 27}""" } }
json整形する ingest pipeline を試しに作ってみます
"プロセッサ" と呼ばれる関数みたいなもの呼び出しながら書いていきます
json整形には "json" プロセッサを使います
PUT _ingest/pipeline/temp_pipeline1 #pipeline名 { "description": "Kafka-temperature Pipeline", "processors": [ { "json": { "field": "message", "target_field": "massage_json" } } ] }
先ほどの "message" をこの ingest pipeline を通してみます
PUT /test_index/_doc/2?pipeline=temp_pipeline1 { "message" : """{"temperature": 30, "humidity": 60, "limit_upper": 27, "limit_under": 15, "now_dt": "2020-06-30 23:55:12 UTC+09:00", "hotcold": "\u3042\u3063\u3061\u3043", "limit_t": 27}""" }
良い感じに "massage_json" 配下で、json分割されてデータ格納されました
GET /test_index/_doc/2 === { "_index" : "test_index", "_type" : "_doc", "_id" : "2", "_version" : 3, "_seq_no" : 10, "_primary_term" : 5, "found" : true, "_source" : { "message" : """{"temperature": 30, "humidity": 60, "limit_upper": 27, "limit_under": 15, "now_dt": "2020-06-30 23:55:12 UTC+09:00", "hotcold": "\u3042\u3063\u3061\u3043", "limit_t": 27}""", "massage_json" : { "limit_upper" : 27, "now_dt" : "2020-06-30 23:55:12 UTC+09:00", "hotcold" : "あっちぃ", "temperature" : 30, "limit_under" : 15, "humidity" : 60, "limit_t" : 27 } } }
次に "message" 内で受け取っているタイムスタンプを、Elasticsearch + Kibana で理解できる形式に変換・格納してやりたいです
これにはひと工夫必要で、こちらのサイトで
elasticsearch/grok-patterns at master · elastic/elasticsearch · GitHub
パターンマッチの練習サイト
Grok Constructor
(Kibanaにもぽいのがあるけど、イマイチ使い方わからん)
パターンマッチングを検討しながら、以下のような感じで
1.grok で "now_dt" からパターンマッチさせ抜き出す
2.data で Elasticsearch + Kibana が理解できる形式で格納
3.remove で 不要なのを削除
ingest pipeline を作成
PUT _ingest/pipeline/temp_pipeline1 { "description": "Kafka-temperature Pipeline", "processors": [ { "json": { "field": "message", "target_field": "massage_json" }, "grok": { "field": "massage_json.now_dt", "patterns": [ "%{TIMESTAMP_ISO8601:massage_json.timestamp}" ] }, "date" : { "field" : "massage_json.timestamp", "formats" : ["yyyy-MM-dd HH:mm:ss"], "timezone": "Asia/Tokyo" }, "remove": { "field": "massage_json.timestamp" } } ] }
この ingest pipeline に同じ "message" を通してみます
PUT /test_index/_doc/3?pipeline=temp_pipeline1 { "message" : """{"temperature": 30, "humidity": 60, "limit_upper": 27, "limit_under": 15, "now_dt": "2020-06-30 23:55:12 UTC+09:00", "hotcold": "\u3042\u3063\u3061\u3043", "limit_t": 27}""" }
PUT /test_index/_doc/3?pipeline=temp_pipeline1 { "message" : """{"temperature": 30, "humidity": 60, "limit_upper": 27, "limit_under": 15, "now_dt": "2020-06-30 23:55:12 UTC+09:00", "hotcold": "\u3042\u3063\u3061\u3043", "limit_t": 27}""" }
GET /test_index/_doc/3 === { "_index" : "test_index", "_type" : "_doc", "_id" : "3", "_version" : 1, "_seq_no" : 11, "_primary_term" : 5, "found" : true, "_source" : { "@timestamp" : "2020-06-30T23:55:12.000+09:00", # ここにタイムスタンプ "message" : """{"temperature": 30, "humidity": 60, "limit_upper": 27, "limit_under": 15, "now_dt": "2020-06-30 23:55:12 UTC+09:00", "hotcold": "\u3042\u3063\u3061\u3043", "limit_t": 27}""", "massage_json" : { "limit_upper" : 27, "now_dt" : "2020-06-30 23:55:12 UTC+09:00", "hotcold" : "あっちぃ", "temperature" : 30, "limit_under" : 15, "humidity" : 60, "limit_t" : 27 } } }
ここまで、Kibana の Dev Tool で ingest node pipeline を作ってきましたが、Stack Management の Ingest Node Pipelines でi一覧確認できます
(7.8から見れるようになった?)
ここで一から作ったりテストできるようですが、イマイチ使い方分からなかった。。。
filebeats側の設定
こちらは一行追加するだけ
filebeat.yml に ingest node pipeline 名を追記です
yaml |
output.elasticsearch:
hosts: ['${ELASTICSEARCH_HOST:elasticsearch}:${ELASTICSEARCH_PORT:9200}']
username: ${ELASTICSEARCH_USERNAME}
password: ${ELASTICSEARCH_PASSWORD}
pipeline: "temp_pipeline" # ココ
|
Kibanaで受け入れ手順
ingest node を有効にするには、既に登録されている index patterns と index を消します
(きっと index名を変えたりしてもいけるんだと思いますが )
Templatesまで消さないといけないのかはわからない
この後に作られる index からは、ingest node を通っているので、データが整形されているはずです
新しく index patterns まで作り直し、確認してみましょう
(index名が[filebeat-7.8.0]以外に日付がついていると余計なmappingがついてくるので、また消した方がよい?)
新しく index patterns を作る際には、"Time Filter field" は、ingest pipeline で作った @timestamp を選びましょう
これで、ようやく時系列に沿ったデータの可視化ができます
ここからが Kibana の本番ですね、ここまで結構大変でした
Discovery で 「filebeat-7.8.0」のデータを確認しておきます
ちゃんとそれぞれデータ入ってますね
Kibana でグラフ化
色々ありますが、初心者には Lens が簡単そう
・X-axis に "@timestamp"
・Y-axis に "message_json.temperature", "message_json.himidity"
これでやっとぽいグラフにようやく辿り着きました
ここまで elastic 初心者には分かりにくい英語のドキュメント半分と勘でなんときました
Xi IoTほどではないけれども、elastic の 「ELK stack on k8s, filebeat, ingest node」 あたりの情報があんまりなかったので、結構苦労しました。。。
elastic7.x がまだあんまりポピュラーじゃないだけなのだろうか('Д')