【Xi IoTで映えるデモがしたい 第22回】LEGOで結果受信してからの処理
LEGOをスマートファクトリー化する道のり
- 画像取得
- MQTT送信(Publish)
- Xi IoT Data Pipelineで MQTT受信(Subscribe)
- Xi IoT Data Pipelineで画像処理して送信(Publish)
- 管理画面を作って確認(Subscribe/Publish)
- RasPi+BrickPiで結果受信(Subscribe)してモーター制御 ←今回はココ
- スイッチ ON/OFF制御を組み込んで完了
出来上がった管理画面に合わせて、RasPi+BrickPi側の Pythonコードも改修していきます
画像の色判定結果を受信
Node-RED上のスイッチノードで、色判定結果を受けてトピック名を分岐してやっています
赤ならfactory/bucket1 青ならfactory/bucket2みたいな感じです
これをRasPi+BrickPiで、この分岐するトピックをうけとれるように PythonでSubする方法としてトピック名を「factory/#」とします
def on_connect(client, userdata, flags, rc): print("Connected on Edge with result code "+str(rc)) # トピック名 factory/xxxを受け取れる client.subscribe("factory/#")
モーターを動かす
まず、トピック名に応じた関数を呼び出し
mqclient.message_callback_add("factory/bucket1", on_select_bucket_1) mqclient.message_callback_add("factory/bucket2", on_select_bucket_2)
受け取ったトピック名に応じた関数に、LEGOモーターをどれくらい動かすかを定義しておきます
Bucket1はポジション:-180、Bucket2はポジション:180
def on_select_bucket_1(client, userdata, msg): print(msg.topic+" motor - "+str(msg.payload)) BP.set_motor_position_kp(MOTOR_BUCKET, kp=25) BP.set_motor_position(MOTOR_BUCKET, -180 ) def on_select_bucket_2(client, userdata, msg): if BUCKET_ORDER == 1: print("move to bucket 2") BP.set_motor_position_kp(MOTOR_BUCKET, kp=25) BP.set_motor_position(MOTOR_BUCKET, 180 )
モーターの状況を送信
現在のモーターの状況を管理画面で見れるようにするために
LEGOのモーターの情報(トレイとバケット)をまとめてPubしておきます
mqclient.publish("info",payload=infoFactory()) def infoFactory(): infoTray = BP.get_motor_status(MOTOR_TRAY) infoBucket = BP.get_motor_status(MOTOR_BUCKET) response = {} response["Tray"] = {} response["Bucket"] = {} response["Tray"]["power"] = infoTray[1] response["Tray"]["position"] = infoTray[2] response["Tray"]["speed"] = infoTray[3] response["Bucket"]["power"] = infoBucket[1] response["Bucket"]["position"] = infoBucket[2] response["Bucket"]["speed"] = infoBucket[3] return json.dumps(response)
管理画面でモニタ
現在のトレイ(ボールを運ぶレール)とバケット(ボールを振り分けて受け取る)の状況の表示をしてみましょう
Node-REDでの受け取りはここの部分です
ちゃんと管理画面にも数値が反映されました
これでほぼほぼ、プログラミング部分は準備完了です