Raspberry PiでPaho Pythonクライアントを活用したMQTTの使用ガイド
MQTTは、パブリッシュ/サブスクライブモデルのIoT向け軽量メッセージングプロトコルで、最小限のコードと帯域幅で信頼性の高いリアルタイム通信を実現します。特に、リソースが限られている機器や帯域幅の小さいネットワークに有効で、IoT、モバイルインターネット、IoV、電力業界などで広く採用されています。
ラズベリー・パイは、イギリスのラズベリー・パイ財団によって開発されたARMベースの小型シングルボードコンピュータである。このボードはUSBインターフェースとイーサネットインターフェースを備えており、キーボード、マウス、ネットワークケーブルを接続することができる。このボードはPCの基本機能を持ち、ラズベリーパイはWi-Fi、Bluetooth、多数のGPIOを統合しており、教育、ファミリー・エンターテインメント、IoTなどで広く使用されている。
このプロジェクトでは、Pythonを使ってRaspberry Pi上にシンプルなMQTTクライアントを書き、このクライアントとMQTTブローカー間の接続、サブスクリプション、メッセージング、その他の機能を実装する。
Raspberry Pi MQTTプロジェクトの準備
オペレーティング・システムのインストール
Raspberry Piは、Raspberry Pi OS、Ubuntu、Kodiなど、さまざまなOSを実行できる万能ミニコンピューターだ。それぞれのOSには、独自の機能、利点、推奨アプリケーションがある。
特にRaspberry Pi OSは、Raspberry Piハードウェアとの互換性が高く、最適化されたソフトウェアやツールがプリインストールされているため、初心者に非常におすすめです。Debian LinuxをベースにRaspberry Pi専用にカスタマイズされており、プログラミング、マルチメディア、電子工作プロジェクトに使いやすいプラットフォームを提供します。
Raspberry Pi OSをインストールするには、公式ドキュメントのインストールガイドに従うことをお勧めします。この記事では、Raspberry Pi OS with desktop (Debian version: 11)をインストールしたRaspberry Pi 4を使用します。
Python3のインストール
Raspberry Pi上のMQTTの開発言語としてPythonを使用します。Pythonは習得しやすい構文で、膨大な種類のライブラリやチュートリアルがオンラインで利用できるため、MQTTを扱うのに最適です。
このプロジェクトはPython 3.6を使って開発されている。通常、Raspberry PiにはPython 3がプリインストールされています。しかし、Python 3がインストールされているかどうかわからない場合は、以下のコマンドで確認することができます:
$ python3 --version
Python 3.6.7
コマンドラインが "Python 3.x.x"("x "はバージョン番号を示す)を返した場合、Python3がRaspberry Piにインストール済みであることを意味する。インストールされていない場合は、"apt "コマンドを使ってインストールするか、Python3のインストールガイドラインに従ってください。
sudo apt install python3
Paho MQTTクライアントをインストールする
これは、Python 2.7および3.xでMQTT v5.0、v3.1.1、およびv3.1をサポートするクライアント・クラスを提供するPaho Pythonクライアント・ライブラリを使用します。さらに、Paho Pythonクライアント・ライブラリには、MQTTサーバーへの単発メッセージの発行を非常に簡単にする便利なヘルパー関数が含まれています。
ソースコードを使ってインストールする
git clone https://github.com/eclipse/paho.mqtt.python
cd paho.mqtt.python
python3 setup.py install
pip3を使ってインストールする
pip3 install paho-mqtt
MQTTブローカーを構築
先に進む前に、通信とテストを行うためのMQTTブローカーがあることを確認してください。MQTTブローカーを入手するには、いくつかのオプションがあります:
プライベート展開
EMQXは、IoT、IIoT、コネクテッドカー向けの最もスケーラブルなオープンソースのMQTTブローカーです。以下のDockerコマンドを実行することでEMQXをインストールすることができます。
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx
フルマネージドクラウドサービス
フルマネージドクラウドサービスは、MQTTサービスを開始するための最も簡単な方法です。EMQX Cloudを利用すれば、わずか数分でサービスを開始でき、AWS、Google Cloud、Microsoft Azureの20以上のリージョンでMQTTサービスを実行し、グローバルな可用性と高速接続を確保することが可能です。
最新版のEMQX Cloud Serverlessは、開発者が数秒で簡単にMQTTの導入を開始できるように、永久無料の1Mセッション分/月の無償提供をしています。
無料公開のMQTTブローカー
無料公開MQTTブローカーは、MQTTプロトコルの学習とテストを希望する人だけが利用できます。セキュリティリスクやダウンタイムの懸念があるため、本番環境での使用は避けることが重要です。
このブログ記事では、 broker.emqx.io の無料公開MQTTブローカーを使用します。
MQTT Broker Info
Server: broker.emqx.io
TCP Port: 1883
WebSocket Port: 8083
SSL/TLS Port: 8883
Secure WebSocket Port: 8084
詳しくは、こちらをご確認ください:無料公開のMQTTブローカー。
MQTTのクイックスタート
MQTTコネクションの作成
コネクトのコード例
# test_connect.py
import paho.mqtt.client as mqtt
# The callback function. It will be triggered when trying to connect to the MQTT broker
# client is the client instance connected this time
# userdata is users' information, usually empty. If it is needed, you can set it through user_data_set function.
# flags save the dictionary of broker response flag.
# rc is the response code.
# Generally, we only need to pay attention to whether the response code is 0.
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected success")
else:
print(f"Connected fail with code {rc}")
client = mqtt.Client()
client.on_connect = on_connect
client.connect("broker.emqx.io", 1883, 60)
client.loop_forever()
上記のコードを "test_connect.py "というファイル名で保存します。ファイルを実行するには、Raspberry Piでターミナルを開き、ファイルのあるディレクトリに移動します。そして以下のコマンドを入力してスクリプトを実行します。これでMQTTクライアントが起動し、MQTTブローカーに接続します。
python3 test_connect.py
on_connect
関数では、MQTTブローカーが返すレスポンスコードをチェックする。レスポンスコードが 0
の場合、"Connected success" と表示し、接続に成功したことを示します。しかし、レスポンス・コードが not 0
の場合は、以下のレスポンス・コード表に基づき、その意味を確認する必要がある。
0: connection succeeded
1: connection failed - incorrect protocol version
2: connection failed - invalid client identifier
3: connection failed - the broker is not available
4: connection failed - wrong username or password
5: connection failed - unauthorized
6-255: undefined
If it is other issues, you can check the network situation, or check whether `paho-mqtt` has been installed.
サブスクライブ
MQTTプロトコルは、トピックに基づいてメッセージをルーティングする。サブスクライバは、ブローカに興味のあるトピックをサブスクライブすることができる。パブリッシャーが特定のトピックにメッセージを送信すると、ブローカーはそのトピックを購読している全ての購読者にそのメッセージを転送します。
テキストエディタを開き、以下のコードを入力してください。ファイルを "subscriber.py "として保存します。
# subscriber.py
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
print(f"Connected with result code {rc}")
# Subscribe, which need to put into on_connect
# If reconnect after losing the connection with the broker, it will continue to subscribe to the raspberry/topic topic
client.subscribe("raspberry/topic")
# The callback function, it will be triggered when receiving messages
def on_message(client, userdata, msg):
print(f"{msg.topic} {msg.payload}")
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
# Set the will message, when the Raspberry Pi is powered off, or the network is interrupted abnormally, it will send the will message to other clients
client.will_set('raspberry/status', b'{"status": "Off"}')
# Create connection, the three parameters are broker address, broker port number, and keep-alive time respectively
client.connect("broker.emqx.io", 1883, 60)
# Set the network loop blocking, it will not actively end the program before calling disconnect() or the program crash
client.loop_forever()
subscribe()
関数を使うと、Raspberry Piが特定のトピックを購読できるようになります。上のコードでは、この関数を使って raspberry/topic
というトピックを購読し、メッセージの着信を監視しています。
さらに、 will_set()
機能を利用して、ウィル・メッセージを設定する。MQTTのこの機能により、デバイスが意図せず電源オフになってしまった場合に、指定したトピックにメッセージを送信することができます。この機能を使用することで、Raspberry Piの電源が切れたか、ネットワーク接続に問題があるかを判断することができます。
メッセージのパブリッシュ
テキストエディタを開き、以下のコードを入力してください。ファイルを "publisher.py "として保存します。
import paho.mqtt.client as mqtt
import time
def on_connect(client, userdata, flags, rc):
print(f"Connected with result code {rc}")
# Send a message to the raspberry/topic every 1 second, 5 times in a row
for i in range(5):
# The four parameters are topic, sending content, QoS and whether retaining the message respectively
client.publish('raspberry/topic', payload=i, qos=0, retain=False)
print(f"send {i} to raspberry/topic")
client = mqtt.Client()
client.on_connect = on_connect
client.connect("broker.emqx.io", 1883, 60)
client.loop_forever()
publish()
関数を使うと、特定のトピックにメッセージを送ることができます。上のコード例では、この関数を使用してトピック raspberry/topic
にメッセージを送信しています。QoS パラメータは、メッセージ配信のサービス品質を定義する MQTT のもう 1 つの機能です。QoSレベルの詳細については、「Introduction to MQTT QoS 0, 1, 2」を参照してください。
Pahoクライアントの使い方については、ブログ「Pahoクライアントを使ってPythonでMQTTを使う方法」をご覧ください。
テスト
以下のテストでは、MQTTX を使用する。MQTTX は、macOS、Linux、Windows で動作するエレガントなクロスプラットフォーム MQTT 5.0 デスクトップ・クライアントである。そのユーザーフレンドリーなチャットスタイルのインターフェースにより、ユーザーは簡単に複数のMQTT/MQTTS接続を作成し、MQTTメッセージをサブスクライブ/パブリッシュすることができる。
サブスクライブ
MQTT subscription スクリプト
subscriber.py
を実行すると、クライアントは正常に接続し、パブリッシャーがメッセージを発行するのを待ち始めます。python3 subscriber.py
MQTTXをパブリッシャーとして "raspberry/topic "にメッセージをパブリッシュする。
MQTTXによって発行されたメッセージが表示されます。
メッセージのパブリッシュ
MQTTXクライアント内で
raspberry/topic
をサブスクライブする。ターミナルで
publish.py
を実行する。MQTTXクライアントでは、Raspberry Piからパブリッシュされたメッセージを見ることができます。
ウィル・メッセージのテスト
次に、ウィル・メッセージが正常にセットされたかどうかをテストする。
MQTTXクライアントで
raspberry/status
をサブスクライブする。プログラムを中断したり、Raspberry Piのネットワークを切断する。
MQTTXクライアントで、
raspberry/status
が受信したメッセージを表示する。
Raspberry Pi MQTTアドバンスド
Raspberry Piのシリアルデータを読む
提供されているコードは、Raspberry Piとのシリアル接続を確立し、シリアルポートからデータを読み取り、MQTTブローカーにパブリッシュします。 serial
ライブラリを使用してシリアルポートの設定とデータの読み込みを行い、 paho.mqtt.client
ライブラリを使用して MQTT ブローカーに接続してデータをパブリッシュします。コードはループ内でシリアルデータを読み取り、指定されたMQTTトピックにパブリッシュし、定義された反復回数だけこのプロセスを繰り返します。最後に、MQTT ブローカーとの接続を切断し、シリアル接続を閉じます。
import time
import paho.mqtt.client as mqtt
import serial
# Establishing the connection with the serial port
ser = serial.Serial(
# Serial Port to read the data from
port='/dev/ttyAMA0', # Use `dmesg | grep tty` to find the port
baudrate=9600,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
timeout=1
)
broker_address = "broker.emqx.io"
broker_port = 1883
topic = "emqx/serial/read"
client = mqtt.Client()
client.connect(broker_address, broker_port)
# Read data from serial port and publish it to MQTT broker
for i in range(10):
data = ser.readline().decode()
client.publish(topic, data)
print("Read data {data} from serial port and publish it to MQTT broker".format(data=data))
time.sleep(1)
client.disconnect()
ser.close()
Raspberry Piのシリアルデータを書き込む
提供されたコードは、Raspberry Piとのシリアル接続を確立し、MQTTメッセージをリッスンする。メッセージを受信すると、シリアル・ポートに書き込まれる。このコードでは、 serial
ライブラリを使用してシリアルポートの設定とデータの書き込みを行い、 paho.mqtt.client
ライブラリを使用して MQTT ブローカーへの接続、MQTT メッセージの受信処理、シリアルポートへのデータのパブリッシュを行っています。MQTT クライアントは、特定の MQTT トピックをサブスクライブする on_connect
や、受信メッセージを処理する on_message
など、必要なコールバックでセットアップされます。接続されると、コードは無限ループに入り、継続的に MQTT メッセージをリッスンし、シリアル・ポートに書き込みます。
import paho.mqtt.client as mqtt
import serial
# Establishing the connection with the serial port
ser = serial.Serial(
# Serial Port to read the data from
port='/dev/ttyAMA0', # Use `dmesg | grep tty` to find the port
baudrate=9600,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
timeout=1
)
broker_address = "broker.emqx.io"
broker_port = 1883
topic = "emqx/serial/write"
def on_connect(client, userdata, flags, rc):
print("Connected to MQTT broker")
client.subscribe(topic)
def on_message(client, userdata, msg):
payload = msg.payload.decode()
print("Received message: {payload} on topic {topic}".format(payload=payload, topic=msg.topic))
# Write data to serial port
ser.write(payload.encode())
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(broker_address, broker_port)
client.loop_forever()
まとめ
このブログでは、Python MQTT クライアント・ライブラリ paho-mqtt
を使って、Raspberry Pi 上でクライアントを書き、テストします。クライアントとMQTTブローカー間の接続、サブスクリプション、メッセージング、その他の機能を実装しました。
ここまでの進歩は素晴らしい!MQTTを使って多くのエキサイティングなアプリケーションを構築するための基本を学ぶことができました。例えば
携帯電話からMQTTメッセージを使ってRaspberry Piを遠隔操作できます。
Raspberry PiからMQTTブローカーに定期的にデバイスデータを送信することで、携帯電話のメッセージを継続的に監視し、受信することができます。
Raspberry PiがMQTTブローカーにアクセスし、様々なセンサーやESPモジュールを使用することで、数多くの興味深いIoTアプリケーションを作成することができる。
次に、MQTTガイドをチェックすることができます:EMQが提供する「Beginner to Advanced」シリーズで、MQTTプロトコルの機能を学び、MQTTのより高度なアプリケーションを探求し、MQTTアプリケーションとサービス開発を始めましょう。
リソース
- ESP32上のMQTT:初心者ガイド
- Raspberry PiベースのMicroPython MQTTチュートリアル
- ESP8266とMQTTでLEDを遠隔制御
- ESP8266がArduinoでMQTTブローカーに接続
- NodeMCU(ESP8266)を介したMQTTクラウドサービスへのセンサーデータのアップロード