實踐環境 Python 3.9.13 paho-mqtt 2.1.0 簡介 Eclipse Paho MQTT Python客戶端類庫實現了MQTT 協議版本 5.0, 3.1.1, 和3.1。 該類庫提供一個客戶端類,允許應用連接到MQTT代理併發布消息,訂閱主題並檢索發佈的消息。同時還提供了一個 ...
實踐環境
Python 3.9.13
paho-mqtt 2.1.0
簡介
Eclipse Paho MQTT Python客戶端類庫實現了MQTT 協議版本 5.0, 3.1.1, 和3.1。
該類庫提供一個客戶端類,允許應用連接到MQTT代理併發布消息,訂閱主題並檢索發佈的消息。同時還提供了一個寫其它輔助函數,使向MQTT伺服器發佈一次性消息變得非常簡單。
支持 Python 3.7+。
MQTT協議是一種機器對機器(M2M)/“物聯網”連接協議。它被設計為一種極其輕量級的發佈/訂閱消息傳輸,對於需要小代碼占用和/或網路帶寬非常昂貴的遠程連接非常有用。
安裝
pip install paho-mqtt
已知限制
以下是已知的未實現的MQTT功能。
當clean_session
為False
時,會話僅存儲在記憶體中,不會持久化。這意味著當客戶端重新啟動時(不僅僅是重新連接,通常是因為程式重新啟動而重新創建對象),會話就會丟失。這可能會導致消息丟失。
客戶端會話的以下部分丟失:
-
已從伺服器接收到但尚未完全確認的 QoS 2 消息。
由於客戶端會盲目確認任何PUBCOMP(QoS 2 事務的最後一條消息),因此它不會掛起,但會丟失此 QoS 2 消息。
-
已發送到伺服器但尚未完全確認的 QoS 1 和 QoS 2 消息。
這意味著傳遞給
publish()
的消息可能會丟失。這可以通過讓傳遞給publish()
的所有消息都有相應的on_publish()
調用或使用wait_for_publish
來緩解。這也意味著代理在會話中可能有 QoS2 消息。由於客戶端從一個空會話開始,它不知道它,並將重用mid。這還沒有解決。
此外,當clean_session
為True
時,此類庫將在網路重新連接時重新發佈 QoS > 0消息。這意味著 QoS > 0消息不會丟失。但標準規定,我們應該丟棄發送發佈包的任何消息。設置為True
意味著不符合標準,QoS 2 可能會被接收兩次。
如果只需要一次交付的 QoS 2 保證,則應設置clean_session=False
。
用法與API
API詳細線上文檔:https://eclipse.dev/paho/files/paho.mqtt.python/html/client.html
示例:https://github.com/eclipse/paho.mqtt.python/tree/master/examples
開始
下麵是一個非常簡單的示例,它訂閱代理$SYS
主題樹並列印出結果消息:
# -*- coding:utf-8 -*-
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, reason_code, properties):
'''客戶端從伺服器接收到 CONNACK 響應時的回調'''
print(f"Connected with result code {reason_code}") # 成功連接時 reason_code 值為 Success
# 在on_connect()中執行訂閱操作,意味著如果應用失去連接並且重新連接後,訂閱將被續訂。
if reason_code == 'Success':
client.subscribe('$SYS/#')
def on_disconnect(client, userdata, flags, reason_code, properties):
print(f'Disconnected with result code {reason_code}')
def on_message(client, userdata, msg):
'''從伺服器收到 PUBLISH 消息時的回調。'''
print(msg.topic + ' ' + str(msg.payload)) # 輸出值形如 $SYS/broker/version b'mosquitto version 2.0.18'
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqttc.on_connect = on_connect
mqttc.on_disconnect = on_disconnect
mqttc.on_message = on_message
# client.username_pw_set('testacc', 'test1234') # 設置訪問賬號和密碼
mqttc.connect("mqtt.eclipseprojects.io", 1883, 60)
# 阻塞調用,處理網路流量、分派回調和處理重新連接
# 有其它提供線程介面和手動介面的loop*()函數可用
mqttc.loop_forever()
說明:
-
Client.username_pw_set(username: str | None, password: str | None = None*) → None
為代理身份驗證設置用戶名和密碼(可選)。
必須在
connect()
之前調用才能生效。需要支持MQTT v3.1或更高版本的代理。- 參數:
username
– 要進行身份驗證的用戶名。需要與客戶端id沒有關係。必須是字元串[MQTT-3.1.3-11]。設置為“None”可將客戶端重置為不使用用戶名/密碼進行代理身份驗證。password
– 用於身份驗證的密碼。可選,如果不需要,則設置為None
。如果為字元串r,那麼它將被編碼為UTF-8。
- 參數:
-
Client.connect(host: str, port: int = 1883, keepalive: int = 60, bind_address: str = '', bind_port: int = 0, clean_start: bool | Literal[3] = 3, properties: Properties|None = None) → MQTTErrorCode
連接到遠程代理。這是一個阻塞調用,用於建立底層連接並傳輸
CONNECT
數據包。請註意,在收到並處理CONNACK
之前,連接狀態不會更新(這需要一個正在運行的網路迴圈,請參閱loop_start
,loop_forever
,loop
…)).
參數:-
host
– 遠程代理的主機名或IP地址。 -
port
– 要連接的伺服器主機的網路埠。預設為1883。請註意,SSL/TLS上MQTT的預設埠是8883,因此如果使用TLS_set()
可能需要提供埠。 -
keepalive
- 設置心跳的時間,單位是秒。這個值告訴MQTT客戶端,在沒有接收到任何通信的情況下,多久應該發送一個PING請求給伺服器,以保持連接,預設60秒。 -
clean_start
-(僅限MQTT v5.0)True
、False
或MQTT_CLEAN_START_FIRST_ONLY
。總是設置MQTT v5.0clean_start
標誌、從不或僅在第一次成功連接時。設置clean_start標誌後,MQTT會話數據(如未完成的消息和訂閱)在成功連接時被清除。對於MQTT v3.1.1,Client
的clean_session
參數應用於類似的結果。 -
properties
(Properties) –( 僅僅限MQTT v5.0)需要在MQTT連接包發送的的MQTT v5.0 屬性。
-
客戶端(Client)
Client類一般使用流程如下:
- 創建客戶端實例
- 使用
connect*()
函數之一連接到代理 - 調用其中一個
loop*()
函數來維護代理的網路流量 - 使用
subscribe()
訂閱主題並接收消息 - 使用
publish()
將消息發佈到代理 - 使用
disconnect()
斷開與代理的連接
將調用回調以允許應用程式根據需要處理事件。這些回調如下所述。
網路迴圈
這些功能是Client背後的驅動力。如果它們沒有被調用,傳入的網路數據將不會被處理,傳出的網路數據也不會被髮送。管理網路環路有四種選擇。這裡描述了三個,第四個在下麵的“外部事件迴圈支持”中描述。不要混合使用不同的loop
函數。
loop_start()
/ loop_stop()
mqttc.loop_start()
while True:
temperature = sensor.blocking_read()
mqttc.publish("paho/temperature", temperature)
mqttc.loop_stop()
這些函數實現了網路迴圈的線程介面。在connect*()
之前或之後調用loop_start()
一次,會在後臺運行一個線程來自動調用loop()
。這釋放了主線程,用於可能阻塞的其他工作。此調用還處理與代理的重新連接。調用loop_stop()
以停止後臺線程。如果調用disconnect()
,迴圈也會停止。
loop_forever()
mqttc.loop_forever(retry_first_connection=False)
這是網路迴圈的阻塞形式,在客戶端調用disconnect()
之前不會返回(即調用mqttc.disconnect()
後會停止阻塞,繼續運行其後的代碼)。它會自動處理重新連接。
除了使用connect_async
時的第一次連接嘗試外,使用retry_first_connection=True
使其重試第一次連接。
警告:這可能會導致客戶端保持連接到不存在的主機而不會出現失敗。
loop()
run = True
while run:
rc = mqttc.loop(timeout=1.0)
if rc != 0:
# need to handle error, possible reconnecting or stopping the application
定期調用以處理網路事件。此調用觸發select()
等待,直到網路套接字可用於讀取或寫入,如果套接字可用,則處理流入/流出的數據。此函數最多阻塞timeout
秒。timeout
不能超過客戶端的keepalive
值,否則代理會定期斷開客戶端的連接。
使用這種迴圈,需要自己處理重新連接策略。
回調
與paho-mqtt交互的介面包括各種回調,當發生某些事件時,類庫會調用這些回調。
回調是在代碼中定義的函數,用於實現對這些事件要求的操作。這可能只是列印收到的消息,也可能是更複雜的行為。
回調API是有版本的,所選版本是我們提供給客戶端構造函數的CallbackAPIVersion
。目前支持兩個版本:
CallbackAPIVersion.VERSION1
:這是paho-mqtt 2.0版本之前使用的歷史版本。它是在引入CallbackAPIVersion
之前使用的API。此版本已棄用,將在paho-mqtt 3.0
版本中刪除。CallbackAPIVersion.VERSION2
:此版本在協議MQTT 3.x和MQTT 5.x之間更為一致。它也更適用於MQTT 5.x,因為reason_code
和屬性始終在可獲取時提供。建議所有用戶升級到此版本。強烈建議MQTT 5.x用戶使用。
存在以下回調:
on_connec()
:當收到代理返回CONNACK
時被調用。調用可能是針對被拒絕的連接,請檢查reason_code
以查看連接是成功還是被拒絕。on_connect_fail()
:當TCP連接建立失敗時,由loop_forever()
和loop_start()
調用。當直接使用connect()
或reconnect()
時,不會調用此回調。它僅由loop_start()
和loop_forever()
製造的自動(重新)連接後被調用on_disconnect()
:當連接關閉時被調用。on_message()
:收到代理返回的MQTT消息時被調用。on_publish()
:當MQTT消息發送到代理時被調用。取決於QoS
級別,回調在不同時刻被調用:- 對於
QoS==0
,一旦消息通過網路發送,就會調用它。這可能是在相應的publish()
返回之前。 - 對於
QoS==1
,當收到代理返回的對應消息的PUBACK
時調用它 - 對於
QoS==2
,當收到代理返回的對應消息的PUBCOMP
時,會調用它
- 對於
on_subscribe()
:當收到代理返回的SUBACK
時被調用on_unsubscribe
:當收到代理返回的UNSUBACK
時被調用on_log()
:當類庫記錄一條消息時被調用onSocket_open
、onSocket_close
、onSocket_register_write
、onSocket_unregister_write
:用於外部迴圈支持(External event loop support
)的回調。詳見下文。
參閱線上文檔查看有關每個回調的特征。
訂閱示例
# -*- coding:utf-8 -*-
import paho.mqtt.client as mqtt
def on_subscribe(client, userdata, mid, reason_code_list, properties):
# 由於我們只訂閱了一個通道,reason_code_list只包含一個條目
# print(reason_code_list) #輸出: [ReasonCode(Suback, 'Granted QoS 0')]
if reason_code_list[0].is_failure:
print(f"Broker rejected you subscription: {reason_code_list[0]}")
else:
print(f"Broker granted the following QoS: {reason_code_list[0].value}")
def on_unsubscribe(client, userdata, mid, reason_code_list, properties):
#註意,reason_code_list僅存在於MQTTv5中,在MQTTv3中,它將始終為空
if len(reason_code_list) == 0 or not reason_code_list[0].is_failure:
print("unsubscribe succeeded (if SUBACK is received in MQTTv3 it success)")
else:
print(f"Broker replied with failure: {reason_code_list[0]}")
client.disconnect()
def on_message(client, userdata, message):
# userdata是我們選擇提供的數據結構,這裡為一個列表(通過下方的 mqttc.user_data_set([])設置,該函數參數即為userdata參數值
userdata.append(message.payload)
# 假設只想處理10條消息
if len(userdata) >= 10:
client.unsubscribe("$SYS/#")
def on_connect(client, userdata, flags, reason_code, properties):
if reason_code.is_failure:
print(f"Failed to connect: {reason_code}. loop_forever() will retry connection")
else:
# 應該始終在 on_connect 回調中訂閱以確保在重新連接時訂閱依舊存在。
client.subscribe("$SYS/#")
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqttc.on_connect = on_connect
mqttc.on_message = on_message
mqttc.on_subscribe = on_subscribe
mqttc.on_unsubscribe = on_unsubscribe
mqttc.user_data_set([]) # 設置 userdata
mqttc.connect("mqtt.eclipseprojects.io")
mqttc.loop_forever() # 當調用client.disconnect()後繼續執行以下代碼
print(f"Received the following message: {mqttc.user_data_get()}")
發佈示例
# -*- coding:utf-8 -*-
import time
import paho.mqtt.client as mqtt
def on_publish(client, userdata, mid, reason_code, properties):
'''reason_code和properties將僅出現在MQTTv5中。在MQTTv3中始終未設置
使用不存在`uncaked_publish`中的`mid`調用`on_publish()`。這是由於不可避免的競爭情形:
* publish() 返回已發送消息的mid。
* 主線程將publish()返回的mid添加到uncaked_publish中
* loop_start線程調用on_publish()
雖然不太可能(因為on_publish()將在網路往返後調用),但是這是一種可能發生的競爭情形
避免競爭情形的最佳解決方案是使用publish()中的msg_info。還可以嘗試使用已確認的mid列表,而不是從待處理列表中刪除
但是請記住,mid可以重覆使用!
reason_code和properties將僅出現在MQTTv5中。在MQTTv3中始終未設置
'''
try:
userdata.remove(mid)
except KeyError:
print("on_publish() is called with a mid not present in unacked_publish")
print("This is due to an unavoidable race-condition:")
print("* publish() return the mid of the message sent.")
print("* mid from publish() is added to unacked_publish by the main thread")
print("* on_publish() is called by the loop_start thread")
print("While unlikely (because on_publish() will be called after a network round-trip),")
print(" this is a race-condition that COULD happen")
print("")
print("The best solution to avoid race-condition is using the msg_info from publish()")
print("We could also try using a list of acknowledged mid rather than removing from pending list,")
print("but remember that mid could be re-used !")
unacked_publish = set()
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqttc.on_publish = on_publish
mqttc.user_data_set(unacked_publish)
mqttc.connect("mqtt.eclipseprojects.io")
mqttc.loop_start()
# 應用生產一些消息
msg_info = mqttc.publish("paho/test/topic", "my message", qos=1)
unacked_publish.add(msg_info.mid)
msg_info2 = mqttc.publish("paho/test/topic", "my message2", qos=1)
unacked_publish.add(msg_info2.mid)
# 等待所有消息被髮布
while len(unacked_publish):
time.sleep(0.1)
# 由於上述描述的競爭狀態, 以下等待所有消息發佈完成的方式更安全
msg_info.wait_for_publish()
msg_info2.wait_for_publish()
mqttc.disconnect()
mqttc.loop_stop()
說明:
-
Client.max_inflight_messages_set(inflight: int) → None
設置一次可以通過其網路流的QoS>0的消息的最大數量(可以簡單理解為允許多大數量的QoS>0的消息被同時進行傳輸處理)。預設值為20。
-
Client.max_queued_messages_set(queue_size:int)→ Client
設置傳出消息隊列中的最大消息數量。0表示無限制。 -
MQTTMessageInfo.wait_for_publish(timeout: float | None = None) → None
阻塞,直到與此對象關聯的消息被髮布,或者直到超時發生。如果
timeout
為None
,則永遠不會超時。將超時設置為正數秒,例如1,2,以啟用超時。
拋出:ValueError
–如果消息因傳出隊列已滿而未排隊。RuntimeError
-如果消息因其他原因未發佈。
-
實踐過程中發現,採用多線程併發發佈消息時,如果伺服器因為限流的原因不返回消息確認,那麼運行一小段時間後,出現消息無法發佈成功的情況(不報錯,但是消息無法抵達broker),通過合理的參數調用以上三個函數,可以緩解這個問題。
Logger
客戶端會發出一些日誌消息,這些消息在故障排除過程中可能很有用。啟用日誌最簡單的方法是調用enable_logger()
。可以提供自定義記錄器或使用預設記錄器
示例:
import logging
import paho.mqtt.client as mqtt
logging.basicConfig(level=logging.DEBUG)
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqttc.enable_logger()
mqttc.connect("mqtt.eclipseprojects.io", 1883, 60)
mqttc.loop_start()
# Do additional action needed, publish, subscribe, ...
[...]
還可以定義一個on_log
回調,它將接收所有日誌消息的副本。例子:
import paho.mqtt.client as mqtt
def on_log(client, userdata, paho_log_level, messages):
if paho_log_level == mqtt.LogLevel.MQTT_LOG_ERR:
print(message)
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqttc.on_log = on_log
mqttc.connect("mqtt.eclipseprojects.io", 1883, 60)
mqttc.loop_start()
# Do additional action needed, publish, subscribe, ...
[...]
Paho日誌級別和標準日誌級別的對應關係如下:
Paho | logging |
---|---|
MQTT_LOG_ERR |
logging.ERROR |
MQTT_LOG_WARNING |
logging.WARNING |
MQTT_LOG_NOTICE |
logging.INFO (no direct equivalent) |
MQTT_LOG_INFO |
logging.INFO |
MQTT_LOG_DEBUG |
logging.DEBUG |
外部事件迴圈支持
為了支持其他網路迴圈,如asyncio(參見示例),類庫公開了一些方法和回調來支持這些用例。
存在以下迴圈方法:
loop_read
:應該在套接字可讀取時調用。loop_write
:應該在套接字可寫並且類庫需要寫入數據時調用。loop_misc
:應每隔幾秒鐘調用一次,以處理消息重試和ping。
用偽代碼表示如下:
while run:
if need_read:
mqttc.loop_read()
if need_write:
mqttc.loop_write()
mqttc.loop_misc()
if not need_read and not need_write:
# But don't wait more than few seconds, loop_misc() need to be called regularly
wait_for_change_in_need_read_or_write()
updated_need_read_and_write()
棘手的部分是實現updated_need_read_and_write
並等待條件變更。為了支持這一點,存在以下方法:
-
socket
:當TCP連接打開時返回socket
對象。此調用對於基於select迴圈特別有用。請參閱examples/loop_select.py
。 -
want_write()
:如果有數據等待寫入,則返回True
。這接近於上述偽代碼的need_writew
,但還是應該檢查套接字是否可寫。 -
回調函數
on_socket_*
:on_socket_open
:在套接字打開時調用。on_socket_open
:在套接字打開時調用。on_socket_close
:當套接字即將關閉時調用。on_socket_register_write
:當客戶端想要在套接字上寫入數據時調用on_socket_unregister_write
:當套接字上沒有更多數據要寫入時調用。
回調對於事件迴圈特別有用,在事件迴圈中,可以註冊或註銷用於讀寫的套接字。請參閱
examples/loop_asyncio.py
獲取示例。
回調總是按以下順序調用:
-
on_socket_open
-
0或者更多次:
on_socket_register_write
on_socket_unregister_write
-
on_socket_close
全局輔助函數
客戶端模塊還提供了一些全局輔助函數。
topic_matches_sub(sub, topic)
可用於檢查主題(topic
)是否與訂閱(subscription
)匹配。
例如:
主題
foo/bar
將與訂閱foo/#
或+/bar
匹配
主題non/matching
將不匹配訂閱non/+/+
發佈
此模塊提供了兩個輔助函數single()
和multiple()
,允許以一次性方式直接發佈消息。換句話說,它們對於有一個/多個消息要發佈到代理,然後斷開連接而不需要其他任何東西的情況非常有用。
提供的兩個函數是single()
和multiple()
。
這兩個函數都支持MQTT v5.0,但目前不允許在連接或發送消息時設置任何屬性。
Single
發佈一條消息到代理,然後徹底斷開連接。
例子:
import paho.mqtt.publish as publish
publish.single("paho/test/topic", "payload", hostname="mqtt.eclipseprojects.io")
Multiple
發佈多條消息到代理,然後徹底斷開連接。
例子:
from paho.mqtt.enums import MQTTProtocolVersion
import paho.mqtt.publish as publish
msgs = [{'topic':"paho/test/topic", 'payload':"multiple 1"},
("paho/test/topic", "multiple 2", 0, False)]
publish.multiple(msgs, hostname="mqtt.eclipseprojects.io", protocol=MQTTProtocolVersion.MQTTv5)
訂閱
此模塊提供了兩個輔助函數simple()
和callback()
,以允許直接訂閱和處理消息。
這兩個函數都支持MQTT v5.0,但目前不允許在連接或發送消息時設置任何屬性。
Simple
訂閱一組主題並返回收到的消息。這是一個阻塞函數。
例子:
import paho.mqtt.subscribe as subscribe
msg = subscribe.simple("paho/test/topic", hostname="mqtt.eclipseprojects.io")
print("%s %s" % (msg.topic, msg.payload))
使用回調(Callback)
訂閱一組主題,並使用用戶提供的回調處理收到的消息。
例子:
import paho.mqtt.subscribe as subscribe
def on_message_print(client, userdata, message):
print("%s %s" % (message.topic, message.payload))
userdata["message_count"] += 1
if userdata["message_count"] >= 5:
# it's possible to stop the program by disconnecting
client.disconnect()
subscribe.callback(on_message_print, "paho/test/topic", hostname="mqtt.eclipseprojects.io", userdata={"message_count": 0})
參考連接
https://github.com/eclipse/paho.mqtt.python
https://eclipse.dev/paho/files/paho.mqtt.python/html/client.html
作者:授客
微信/QQ:1033553122
全國軟體測試QQ交流群:7156436
Git地址:https://gitee.com/ishouke
友情提示:限於時間倉促,文中可能存在錯誤,歡迎指正、評論!
作者五行缺錢,如果覺得文章對您有幫助,請掃描下邊的二維碼打賞作者,金額隨意,您的支持將是我繼續創作的源動力,打賞後如有任何疑問,請聯繫我!!!
微信打賞
支付寶打賞 全國軟體測試交流QQ群