天天看點

ipguard用戶端如何解除安裝_如何在 Python 中使用 MQTT項目初始化選擇 MQTT 用戶端庫Pip 安裝 Paho MQTT 用戶端Python MQTT 使用連接配接 MQTT 伺服器導入 Paho MQTT用戶端設定 MQTT Broker 連接配接參數編寫 MQTT 連接配接函數釋出消息訂閱消息完整代碼消息釋出代碼消息訂閱代碼測試消息釋出消息訂閱總結

Python 是一種廣泛使用的解釋型、進階程式設計、通用型程式設計語言。Python 的設計哲學強調代碼的可讀性和簡潔的文法(尤其是使用空格縮進劃分代碼塊,而非使用大括号或者關鍵詞)。Python 讓開發者能夠用更少的代碼表達想法,不管是小型還是大型程式,該語言都試圖讓程式的結構清晰明了。

MQTT 是一種基于釋出/訂閱模式的 輕量級物聯網消息傳輸協定 ,可以用極少的代碼和帶寬為聯網裝置提供實時可靠的消息服務,它廣泛應用于物聯網、移動網際網路、智能硬體、車聯網、電力能源等行業。

本文主要介紹如何在 Python 項目中使用 paho-mqtt 用戶端庫 ,實作用戶端與 MQTT 伺服器的連接配接、訂閱、取消訂閱、收發消息等功能。

項目初始化

本項目使用 Python 3.6 進行開發測試,讀者可用如下指令确認 Python 的版本。

➜  ~ python3 --version             Python 3.6.7           

選擇 MQTT 用戶端庫

paho-mqtt 是目前 Python 中使用較多的 MQTT 用戶端庫,它在 Python 2.7 或 3.x 上為用戶端類提供了對 MQTT v3.1 和 v3.1.1 的支援。它還提供了一些幫助程式功能,使将消息釋出到 MQTT 伺服器變得非常簡單。

Pip 安裝 Paho MQTT 用戶端

Pip 是 Python 包管理工具,該工具提供了對 Python 包的查找、下載下傳、安裝、解除安裝的功能。

pip3 install -i https://pypi.doubanio.com/simple paho-mqtt           

Python MQTT 使用

連接配接 MQTT 伺服器

本文将使用 EMQ X 提供的 免費公共 MQTT 伺服器 ,該服務基于 EMQ X 的 MQTT 物聯網雲平台 建立。伺服器接入資訊如下:

  • Broker: broker.emqx.io
  • TCP Port: 1883
  • Websocket Port: 8083

導入 Paho MQTT用戶端

from paho.mqtt import client as mqtt_client           

設定 MQTT Broker 連接配接參數

設定 MQTT Broker 連接配接位址,端口以及 topic,同時我們調用 Python random.randint 函數随機生成 MQTT 用戶端 id。

broker = 'broker.emqx.io'port = 1883topic = "/python/mqtt"client_id = f'python-mqtt-{random.randint(0, 1000)}'           

編寫 MQTT 連接配接函數

編寫連接配接回調函數 on_connect ,該函數将在用戶端連接配接後被調用,在該函數中可以依據 rc 來判斷用戶端是否連接配接成功。通常同時我們将建立一個 MQTT 用戶端,該用戶端将連接配接到 broker.emqx.io 。

def connect_mqtt():    def on_connect(client, userdata, flags, rc):        if rc == 0:            print("Connected to MQTT Broker!")        else:            print("Failed to connect, return code %d", rc)    # Set Connecting Client ID    client = mqtt_client.Client(client_id)    client.on_connect = on_connect    client.connect(broker, port)    return client           

釋出消息

首先定義一個 while 循環語句,在循環中我們将設定每秒調用 MQTT 用戶端 publish 函數向 /python/mqtt 主題發送消息。

def publish(client):     msg_count = 0     while True:         time.sleep(1)         msg = f"messages: {msg_count}"         result = client.publish(topic, msg)         # result: [0, 1]         status = result[0]         if status == 0:             print(f"Send `{msg}` to topic `{topic}`")         else:             print(f"Failed to send message to topic {topic}")         msg_count += 1           

訂閱消息

編寫消息回調函數 on_message ,該函數将在用戶端從 MQTT Broker 收到消息後被調用,在該函數中我們将列印出訂閱的 topic 名稱以及接收到的消息内容。

def subscribe(client: mqtt_client):    def on_message(client, userdata, msg):        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")    client.subscribe(topic)    client.on_message = on_message           

完整代碼

消息釋出代碼

# python 3.6import randomimport timefrom paho.mqtt import client as mqtt_clientbroker = 'broker.emqx.io'port = 1883topic = "/python/mqtt"# generate client ID with pub prefix randomlyclient_id = f'python-mqtt-{random.randint(0, 1000)}'def connect_mqtt():    def on_connect(client, userdata, flags, rc):        if rc == 0:            print("Connected to MQTT Broker!")        else:            print("Failed to connect, return code %d", rc)    client = mqtt_client.Client(client_id)    client.on_connect = on_connect    client.connect(broker, port)    return clientdef publish(client):    msg_count = 0    while True:        time.sleep(1)        msg = f"messages: {msg_count}"        result = client.publish(topic, msg)        # result: [0, 1]        status = result[0]        if status == 0:            print(f"Send `{msg}` to topic `{topic}`")        else:            print(f"Failed to send message to topic {topic}")        msg_count += 1def run():    client = connect_mqtt()    client.loop_start()    publish(client)if __name__ == '__main__':    run()           

消息訂閱代碼

# python3.6import randomfrom paho.mqtt import client as mqtt_clientbroker = 'broker.emqx.io'port = 1883topic = "/python/mqtt"# generate client ID with pub prefix randomlyclient_id = f'python-mqtt-{random.randint(0, 100)}'def connect_mqtt() -> mqtt_client:    def on_connect(client, userdata, flags, rc):        if rc == 0:            print("Connected to MQTT Broker!")        else:            print("Failed to connect, return code %d", rc)    client = mqtt_client.Client(client_id)    client.on_connect = on_connect    client.connect(broker, port)    return clientdef subscribe(client: mqtt_client):    def on_message(client, userdata, msg):        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")    client.subscribe(topic)    client.on_message = on_messagedef run():    client = connect_mqtt()    subscribe(client)    client.loop_forever()if __name__ == '__main__':    run()           

測試

消息釋出

運作 MQTT 消息釋出代碼,我們将看到用戶端連接配接成功,并且成功将消息釋出。

python3 pub.py           
ipguard用戶端如何解除安裝_如何在 Python 中使用 MQTT項目初始化選擇 MQTT 用戶端庫Pip 安裝 Paho MQTT 用戶端Python MQTT 使用連接配接 MQTT 伺服器導入 Paho MQTT用戶端設定 MQTT Broker 連接配接參數編寫 MQTT 連接配接函數釋出消息訂閱消息完整代碼消息釋出代碼消息訂閱代碼測試消息釋出消息訂閱總結

消息訂閱

運作 MQTT 消息訂閱代碼,我們将看到用戶端連接配接成功,并且成功接收到釋出的消息。

python3 sub.py           
ipguard用戶端如何解除安裝_如何在 Python 中使用 MQTT項目初始化選擇 MQTT 用戶端庫Pip 安裝 Paho MQTT 用戶端Python MQTT 使用連接配接 MQTT 伺服器導入 Paho MQTT用戶端設定 MQTT Broker 連接配接參數編寫 MQTT 連接配接函數釋出消息訂閱消息完整代碼消息釋出代碼消息訂閱代碼測試消息釋出消息訂閱總結

總結

至此,我們完成了使用 paho-mqtt 用戶端連接配接到 公共 MQTT 伺服器 ,并實作了測試用戶端與 MQTT 伺服器的連接配接、消息釋出和訂閱。

與 C ++ 或 Java 之類的進階語言不同,Python 比較适合裝置側的業務邏輯實作,使用 Python 您可以減少代碼上的邏輯複雜度,降低與裝置的互動成本。我們相信在物聯網領域 Python 将會有更廣泛的應用。

原文作者:EMQ

原文連結:https://www.emqx.io/cn/blog/how-to-use-mqtt-in-python

繼續閱讀