konchangakita

KPSを一番楽しんでいたブログ 会社の看板を背負いません 転載はご自由にどうぞ

【Xi IoT】Elastic Beats で データシッピング その3 ingestノードでデータ整形

ingestノードは、Elasticsearch にデータ格納する前処理してくれる便利なやつです

こんなことをやらせようと思います
f:id:konchangakita:20200630230215p:plain
(実際にはElasticsearch内の機能のような気もしますが)

Logstash でも似たようなことができるようですが、誰か教えてください


Xi IoT + Elastic シリーズ
 ・Xi IoT に Elasticsearch と Kibana を導入してみる
 ・【Xi IoT】Elastic Beats で データシッピング その1 Xi IoT上で動かす
 ・【Xi IoT】Elastic Beats で データシッピング その2 Kafka連携
 ・【Xi IoT】Elastic Beats で データシッピング その3 ingestノードでデータ整形(←イマココ)
==各バージョン===
Xi IoT 1.18
Elastic 7.8(Elasticsearch/Kibana/Filebeat)
Python 3.7
=====================

何も考えず filebeat から Elasticsearch へデータを投げ込むと、Kafka から受け取った "メッセージ" がそのまんま入ってきてしまいます
f:id:konchangakita:20200630004413p:plain
f:id:konchangakita:20200630230618p:plain

これでは、時系列で温度をプロットするなんてことができません
こんな風に格納されるようにもっていきたいです

"temperature" : 27
"humidity" : 63

ingestノードの設置

まずは Dev Toolを使って練習してみます

filebeats から 受け取っている Kafka の "message" の生データを確認してみる

GET /filebeat-7.8.0/_doc/_search

f:id:konchangakita:20200701013543p:plain
全部シングルクォーテーション「'」がダブルクォーテーション「"」に変換されているようですね

"message" : """{"temperature": 30, "humidity": 60, "limit_upper": 27, "limit_under": 15, "now_dt": "2020-06-30 23:55:12 UTC+09:00", "hotcold": "\u3042\u3063\u3061\u3043", "limit_t": 27}"""


サンプルとしてこれを使います
テスト用の index にインプットしてみる

PUT /test_index/_doc/1
{
  "message" : """{"temperature": 30, "humidity": 60, "limit_upper": 27, "limit_under": 15, "now_dt": "2020-06-30 23:55:12 UTC+09:00", "hotcold": "\u3042\u3063\u3061\u3043", "limit_t": 27}"""
}
GET /test_index/_doc/1
===
{
  "_index" : "test_index",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 8,
  "_seq_no" : 7,
  "_primary_term" : 5,
  "found" : true,
  "_source" : {
    "message" : """{"temperature": 30, "humidity": 60, "limit_upper": 27, "limit_under": 15, "now_dt": "2020-06-30 23:55:12 UTC+09:00", "hotcold": "\u3042\u3063\u3061\u3043", "limit_t": 27}"""
  }
}


json整形する ingest pipeline を試しに作ってみます
"プロセッサ" と呼ばれる関数みたいなもの呼び出しながら書いていきます
json整形には "json" プロセッサを使います

PUT _ingest/pipeline/temp_pipeline1  #pipeline名
{
  "description": "Kafka-temperature Pipeline",
  "processors": [
    {
      "json": {
        "field": "message",
        "target_field": "massage_json"
      }
    }
  ]
}


先ほどの "message" をこの ingest pipeline を通してみます

PUT /test_index/_doc/2?pipeline=temp_pipeline1
{
  "message" : """{"temperature": 30, "humidity": 60, "limit_upper": 27, "limit_under": 15, "now_dt": "2020-06-30 23:55:12 UTC+09:00", "hotcold": "\u3042\u3063\u3061\u3043", "limit_t": 27}"""
}

良い感じに "massage_json" 配下で、json分割されてデータ格納されました

GET /test_index/_doc/2
===
{
  "_index" : "test_index",
  "_type" : "_doc",
  "_id" : "2",
  "_version" : 3,
  "_seq_no" : 10,
  "_primary_term" : 5,
  "found" : true,
  "_source" : {
    "message" : """{"temperature": 30, "humidity": 60, "limit_upper": 27, "limit_under": 15, "now_dt": "2020-06-30 23:55:12 UTC+09:00", "hotcold": "\u3042\u3063\u3061\u3043", "limit_t": 27}""",
    "massage_json" : {
      "limit_upper" : 27,
      "now_dt" : "2020-06-30 23:55:12 UTC+09:00",
      "hotcold" : "あっちぃ",
      "temperature" : 30,
      "limit_under" : 15,
      "humidity" : 60,
      "limit_t" : 27
    }
  }
}


次に "message" 内で受け取っているタイムスタンプを、Elasticsearch + Kibana で理解できる形式に変換・格納してやりたいです
これにはひと工夫必要で、こちらのサイトで
elasticsearch/grok-patterns at master · elastic/elasticsearch · GitHub
パターンマッチの練習サイト
Grok Constructor
(Kibanaにもぽいのがあるけど、イマイチ使い方わからん)

パターンマッチングを検討しながら、以下のような感じで
 1.grok で "now_dt" からパターンマッチさせ抜き出す
 2.data で Elasticsearch + Kibana が理解できる形式で格納
 3.remove で 不要なのを削除
ingest pipeline を作成

PUT _ingest/pipeline/temp_pipeline1
{
  "description": "Kafka-temperature Pipeline",
  "processors": [
    {
      "json": {
        "field": "message",
        "target_field": "massage_json"
      },
      "grok": {
        "field": "massage_json.now_dt",
        "patterns": [ "%{TIMESTAMP_ISO8601:massage_json.timestamp}" ]
      },
      "date" : {
        "field" : "massage_json.timestamp",
        "formats" : ["yyyy-MM-dd HH:mm:ss"],
        "timezone": "Asia/Tokyo"
      },
      "remove": {
        "field": "massage_json.timestamp"
      }
    }
  ]
}


この ingest pipeline に同じ "message" を通してみます

PUT /test_index/_doc/3?pipeline=temp_pipeline1
{
  "message" : """{"temperature": 30, "humidity": 60, "limit_upper": 27, "limit_under": 15, "now_dt": "2020-06-30 23:55:12 UTC+09:00", "hotcold": "\u3042\u3063\u3061\u3043", "limit_t": 27}"""
}
PUT /test_index/_doc/3?pipeline=temp_pipeline1
{
  "message" : """{"temperature": 30, "humidity": 60, "limit_upper": 27, "limit_under": 15, "now_dt": "2020-06-30 23:55:12 UTC+09:00", "hotcold": "\u3042\u3063\u3061\u3043", "limit_t": 27}"""
}
GET /test_index/_doc/3
===
{
  "_index" : "test_index",
  "_type" : "_doc",
  "_id" : "3",
  "_version" : 1,
  "_seq_no" : 11,
  "_primary_term" : 5,
  "found" : true,
  "_source" : {
    "@timestamp" : "2020-06-30T23:55:12.000+09:00",  # ここにタイムスタンプ
    "message" : """{"temperature": 30, "humidity": 60, "limit_upper": 27, "limit_under": 15, "now_dt": "2020-06-30 23:55:12 UTC+09:00", "hotcold": "\u3042\u3063\u3061\u3043", "limit_t": 27}""",
    "massage_json" : {
      "limit_upper" : 27,
      "now_dt" : "2020-06-30 23:55:12 UTC+09:00",
      "hotcold" : "あっちぃ",
      "temperature" : 30,
      "limit_under" : 15,
      "humidity" : 60,
      "limit_t" : 27
    }
  }
}

ここまで、Kibana の Dev Tool で ingest node pipeline を作ってきましたが、Stack Management の Ingest Node Pipelines でi一覧確認できます
(7.8から見れるようになった?)
f:id:konchangakita:20200701133701p:plain

ここで一から作ったりテストできるようですが、イマイチ使い方分からなかった。。。
f:id:konchangakita:20200701134707p:plain


filebeats側の設定

こちらは一行追加するだけ
filebeat.yml に ingest node pipeline 名を追記です

yaml

output.elasticsearch:
hosts: ['${ELASTICSEARCH_HOST:elasticsearch}:${ELASTICSEARCH_PORT:9200}']
username: ${ELASTICSEARCH_USERNAME}
password: ${ELASTICSEARCH_PASSWORD}
pipeline: "temp_pipeline" # ココ
|


Kibanaで受け入れ手順

ingest node を有効にするには、既に登録されている index patterns と index を消します
(きっと index名を変えたりしてもいけるんだと思いますが )
f:id:konchangakita:20200625114204p:plain
f:id:konchangakita:20200625114338p:plain
Templatesまで消さないといけないのかはわからない
f:id:konchangakita:20200625114441p:plain

この後に作られる index からは、ingest node を通っているので、データが整形されているはずです
新しく index patterns まで作り直し、確認してみましょう
(index名が[filebeat-7.8.0]以外に日付がついていると余計なmappingがついてくるので、また消した方がよい?)

新しく index patterns を作る際には、"Time Filter field" は、ingest pipeline で作った @timestamp を選びましょう
f:id:konchangakita:20200701170800p:plain

これで、ようやく時系列に沿ったデータの可視化ができます
ここからが Kibana の本番ですね、ここまで結構大変でした

Discovery で 「filebeat-7.8.0」のデータを確認しておきます
f:id:konchangakita:20200701171902p:plain
f:id:konchangakita:20200701171519p:plain

ちゃんとそれぞれデータ入ってますね

Kibana でグラフ化

グラフ化もしてみます

色々ありますが、初心者には Lens が簡単そう
f:id:konchangakita:20200701175446p:plain


・X-axis に "@timestamp"
・Y-axis に "message_json.temperature", "message_json.himidity"
これでやっとぽいグラフにようやく辿り着きましたf:id:konchangakita:20200701181041p:plain

ここまで elastic 初心者には分かりにくい英語のドキュメント半分と勘でなんときました
Xi IoTほどではないけれども、elastic の 「ELK stack on k8s, filebeat, ingest node」 あたりの情報があんまりなかったので、結構苦労しました。。。
elastic7.x がまだあんまりポピュラーじゃないだけなのだろうか('Д')