項目全部代碼地址:https://github.com/Tom-shushu/work-study.git (mqtt-emqt 項目) 先看我們最後實現的一個效果 1.手機端向主題 topic111 發送消息,並接收。(手機測試工具名稱:MQTT調試器) 2.控制台列印 MQTT基本簡介 MQTT ...
項目全部代碼地址:https://github.com/Tom-shushu/work-study.git (mqtt-emqt 項目)
先看我們最後實現的一個效果
1.手機端向主題 topic111 發送消息,並接收。(手機測試工具名稱:MQTT調試器)
2.控制台列印
MQTT基本簡介
MQTT 是用於物聯網 (IoT) 的 OASIS 標準消息傳遞協議。它被設計為一種極其輕量級的發佈/訂閱消息傳輸,非常適合連接具有小代碼足跡和最小網路帶寬的遠程設備。
MQTT協議簡介
MQTT 是客戶端伺服器發佈/訂閱消息傳輸協議。它重量輕、開放、簡單,並且易於實施。這些特性使其非常適合在許多情況下使用,包括受限制的環境,例如機器對機器 (M2M) 和物聯網 (IoT) 環境中的通信,其中需要小代碼足跡和/或網路帶寬非常寶貴。
該協議通過 TCP/IP 或其他提供有序、無損、雙向連接的網路協議運行。其特點包括:
· 使用發佈/訂閱消息模式,提供一對多的消息分發和應用程式的解耦。
· 與有效負載內容無關的消息傳輸。
· 消息傳遞的三種服務質量:
o “最多一次”,根據操作環境的最大努力傳遞消息。可能會發生消息丟失。例如,此級別可用於環境感測器數據,其中單個讀數是否丟失並不重要,因為下一個讀數將很快發佈。
o “至少一次”,保證消息到達但可能出現重覆。
o “Exactly once”,保證消息只到達一次。例如,此級別可用於重覆或丟失消息可能導致應用不正確費用的計費系統。
· 最小化傳輸開銷和協議交換以減少網路流量。
· 發生異常斷開時通知相關方的機制。
EMQX簡介
通過開放標準物聯網協議 MQTT、CoAP 和 LwM2M 連接任何設備。使用 EMQX Enterprise 集群輕鬆擴展到數千萬併發 MQTT 連接。
並且EMQX還是開源的,又支持集群,所以還是一個比較不錯的選擇
EMQX集群搭建
前期準備:
1.兩台伺服器:我的兩個伺服器一臺是騰訊雲、一臺是阿裡雲的(不要問為什麼,薅羊毛得來的)咱們暫且叫他們 mqtt_service_aliyun和
mqtt_service_txyun 吧。 2.一個功能變數名稱: mqtt.zhouhong.icu安裝開始
1.分別在兩台伺服器上執行以下操作進行安裝(如果是單機:只需要進行下麵1、2操作就安裝完成了)
## 1.下載 wget https://www.emqx.com/zh/downloads/broker/4.4.4/emqx-4.4.4-otp24.1.5-3-el8-amd64.rpm ## 2.安裝 sudo yum install emqx-4.4.4-otp24.1.5-3-el8-amd64.rpm ## 3.修改配置文件 vim /etc/emqx/emqx.conf ## 4.修改以下內容 ## 註意node.name是當前這台伺服器名稱 node.name = [email protected] cluster.static.seeds = [email protected],[email protected] cluster.discovery = static cluster.name = my-mqtt-cluster
2.分別啟動兩台伺服器的EMQX
sudo emqx start
3.到瀏覽器輸入 http://xxx.xx.xxx.xxx:18083/ 查看(隨便一臺都可以,預設賬號admin 密碼public),註意打開18083,1883 安全組
4.nginx負載均衡
nginx搭建很簡單略過,大家只需要修改以下nginx.conf裡面的內容即可
stream { upstream mqtt.zhouhong.icu { zone tcp_servers 64k; hash $remote_addr; server xxx.xx.xxx.xx:1883 weight=1 max_fails=3 fail_timeout=30s; server xxx.xx.xxx.xx:1883 weight=1 max_fails=3 fail_timeout=30s; } server { listen 8883 ssl; status_zone tcp_server; proxy_pass mqtt.zhouhong.icu; proxy_buffer_size 4k; ssl_handshake_timeout 15s; ssl_certificate /etc/nginx/7967358_www.mqtt.zhouhong.icu.pem; ssl_certificate_key /etc/nginx/7967358_www.mqtt.zhouhong.icu.key; } }
與SpringBoot集成並實現伺服器端監控對應topic下的消息
1.項目搭建
-
引入MQTT相關jar包
<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
-
yml配置文件 (如果大家沒搭建好的話,可以直接使用我搭建的這個)
server: port: 8080 mqtt:
## 單機版--只需要把功能變數名稱改為ip既可 hostUrl: tcp://mqtt.zhouhong.icu:1883 username: admin password: public ## 服務端 clientId (發送端自己定義) clientId: service_client_id cleanSession: true reconnect: true timeout: 100 keepAlive: 100 defaultTopic: topic111 qos: 0
-
屬性配置
/** * description: * date: 2022/6/16 15:51 * @author: zhouhong */ @Component @ConfigurationProperties("mqtt") @Data public class MqttProperties { /** * 用戶名 */ private String username; /** * 密碼 */ private String password; /** * 連接地址 */ private String hostUrl; /** * 客戶端Id,同一臺伺服器下,不允許出現重覆的客戶端id */ private String clientId; /** * 預設連接主題 */ private String topic; /** * 超時時間 */ private int timeout; /** * 設置會話心跳時間 單位為秒 伺服器會每隔1.5*20秒的時間向客戶端 * 發送個消息判斷客戶端是否線上,但這個方法並沒有重連的機制 */ private int keepAlive; /** * 設置是否清空session,這裡如果設置為false表示伺服器會保留客戶端的連 * 接記錄,這裡設置為true表示每次連接到伺服器都以新的身份連接 */ private Boolean cleanSession; /** * 是否斷線重連 */ private Boolean reconnect; /** * 連接方式 */ private Integer qos; }
-
發送消息回調
/** * description: 發生消息成功後 的 回調 * date: 2022/6/16 15:55 * * @author: zhouhong */ @Component @Log4j2 public class MqttSendCallBack implements MqttCallbackExtended { /** * 客戶端斷開後觸發 * @param throwable */ @Override public void connectionLost(Throwable throwable) { log.info("發送消息回調: 連接斷開,可以做重連"); } /** * 客戶端收到消息觸發 * * @param topic 主題 * @param mqttMessage 消息 */ @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { log.info("發送消息回調: 接收消息主題 : " + topic); log.info("發送消息回調: 接收消息內容 : " + new String(mqttMessage.getPayload())); } /** * 發佈消息成功 * * @param token token */ @Override public void deliveryComplete(IMqttDeliveryToken token) { String[] topics = token.getTopics(); for (String topic : topics) { log.info("發送消息回調: 向主題:" + topic + "發送消息成功!"); } try { MqttMessage message = token.getMessage(); byte[] payload = message.getPayload(); String s = new String(payload, "UTF-8"); log.info("發送消息回調: 消息的內容是:" + s); } catch (MqttException e) { e.printStackTrace(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } /** * 連接emq伺服器後觸發 * * @param b * @param s */ @Override public void connectComplete(boolean b, String s) { log.info("--------------------ClientId:" + MqttAcceptClient.client.getClientId() + "客戶端連接成功!--------------------"); } }
-
接收消息回調
/** * description: 接收消息後的回調 * date: 2022/6/16 15:52 * * @author: zhouhong */ @Component @Log4j2 public class MqttAcceptCallback implements MqttCallbackExtended { @Resource private MqttAcceptClient mqttAcceptClient; /** * 客戶端斷開後觸發 * * @param throwable */ @Override public void connectionLost(Throwable throwable) { log.info("接收消息回調: 連接斷開,可以做重連"); if (MqttAcceptClient.client == null || !MqttAcceptClient.client.isConnected()) { log.info("接收消息回調: emqx重新連接...................................................."); mqttAcceptClient.reconnection(); } } /** * 客戶端收到消息觸發 * * @param topic 主題 * @param mqttMessage 消息 */ @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { log.info("接收消息回調: 接收消息主題 : " + topic); log.info("接收消息回調: 接收消息內容 : " + new String(mqttMessage.getPayload())); } /** * 發佈消息成功 * * @param token token */ @Override public void deliveryComplete(IMqttDeliveryToken token) { String[] topics = token.getTopics(); for (String topic : topics) { log.info("接收消息回調: 向主題:" + topic + "發送消息成功!"); } try { MqttMessage message = token.getMessage(); byte[] payload = message.getPayload(); String s = new String(payload, "UTF-8"); log.info("接收消息回調: 消息的內容是:" + s); } catch (MqttException e) { e.printStackTrace(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } /** * 連接emq伺服器後觸發 * * @param b * @param s */ @Override public void connectComplete(boolean b, String s) { log.info("--------------------ClientId:" + MqttAcceptClient.client.getClientId() + "客戶端連接成功!--------------------"); // 以/#結尾表示訂閱所有以test開頭的主題 // 訂閱所有機構主題 mqttAcceptClient.subscribe("topic111", 0); } }
-
發消息
/** * description: 發送消息 * date: 2022/6/16 16:01 * * @author: zhouhong */ @Component public class MqttSendClient { @Autowired private MqttSendCallBack mqttSendCallBack; @Autowired private MqttProperties mqttProperties; public MqttClient connect() { MqttClient client = null; try { String uuid = UUID.randomUUID().toString().replaceAll("-",""); client = new MqttClient(mqttProperties.getHostUrl(),uuid , new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(mqttProperties.getUsername()); options.setPassword(mqttProperties.getPassword().toCharArray()); options.setConnectionTimeout(mqttProperties.getTimeout()); options.setKeepAliveInterval(mqttProperties.getKeepAlive()); options.setCleanSession(true); options.setAutomaticReconnect(false); try { // 設置回調 client.setCallback(mqttSendCallBack); client.connect(options); } catch (Exception e) { e.printStackTrace(); } } catch (Exception e) { e.printStackTrace(); } return client; } /** * 發佈消息 * 主題格式: server:report:$orgCode(參數實際使用機構代碼) * * @param retained 是否保留 * @param pushMessage 消息體 */ public void publish(boolean retained, String topic, String pushMessage) { MqttMessage message = new MqttMessage(); message.setQos(mqttProperties.getQos()); message.setRetained(retained); message.setPayload(pushMessage.getBytes()); MqttClient mqttClient = connect(); try { mqttClient.publish(topic, message); } catch (MqttException e) { e.printStackTrace(); } finally { disconnect(mqttClient); close(mqttClient); } } /** * 關閉連接 * * @param mqttClient */ public static void disconnect(MqttClient mqttClient) { try { if (mqttClient != null) { mqttClient.disconnect(); } } catch (MqttException e) { e.printStackTrace(); } } /** * 釋放資源 * * @param mqttClient */ public static void close(MqttClient mqttClient) { try { if (mqttClient != null) { mqttClient.close(); } } catch (MqttException e) { e.printStackTrace(); } } }
-
接收消息
/** * description: 伺服器段端連接訂閱消息、監控topic * date: 2022/6/16 15:52 * * @author: zhouhong */ @Component @Log4j2 public class MqttAcceptClient { @Autowired @Lazy private MqttAcceptCallback mqttAcceptCallback; @Autowired private MqttProperties mqttProperties; public static MqttClient client; private static MqttClient getClient() { return client; } private static void setClient(MqttClient client) { MqttAcceptClient.client = client; } /** * 客戶端連接 */ public void connect() { MqttClient client; try { // clientId 使用伺服器 yml裡面配置的 clientId client = new MqttClient(mqttProperties.getHostUrl(), mqttProperties.getClientId(), new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(mqttProperties.getUsername()); options.setPassword(mqttProperties.getPassword().toCharArray()); options.setConnectionTimeout(mqttProperties.getTimeout()); options.setKeepAliveInterval(mqttProperties.getKeepAlive()); options.setAutomaticReconnect(mqttProperties.getReconnect()); options.setCleanSession(mqttProperties.getCleanSession()); MqttAcceptClient.setClient(client); try { // 設置回調 client.setCallback(mqttAcceptCallback); client.connect(options); } catch (Exception e) { e.printStackTrace(); } } catch (Exception e) { e.printStackTrace(); } } /** * 重新連接 */ public void reconnection() { try { client.connect(); } catch (MqttException e) { e.printStackTrace(); } } /** * 訂閱某個主題 * * @param topic 主題 * @param qos 連接方式 */ public void subscribe(String topic, int qos) { log.info("==============開始訂閱主題==============" + topic); try { client.subscribe(topic, qos); } catch (MqttException e) { e.printStackTrace(); } } /** * 取消訂閱某個主題 * * @param topic */ public void unsubscribe(String topic) { log.info("==============開始取消訂閱主題==============" + topic); try { client.unsubscribe(topic); } catch (MqttException e) { e.printStackTrace(); } } }
-
服務端啟動時連接訂閱主題並監控
/** * description: 啟動後連接 MQTT 伺服器, 監聽 mqtt/my_topic 這個topic發送的消息 * date: 2022/6/16 15:57 * @author: zhouhong */ @Configuration public class MqttConfig { @Resource private MqttAcceptClient mqttAcceptClient; @Bean public MqttAcceptClient getMqttPushClient() { mqttAcceptClient.connect(); return mqttAcceptClient; } }
-
發消息控制類
/** * description: 發消息控制類 * date: 2022/6/16 15:58 * * @author: zhouhong */ @RestController public class SendController { @Resource private MqttSendClient mqttSendClient; @PostMapping("/mqtt/sendmessage") public void sendMessage(@RequestBody SendParam sendParam) { mqttSendClient.publish(false,sendParam.getTopic(),sendParam.getMessageContent()); } }
2.測試
- postman調用發消息介面
- 控制台日誌
- 使用另外一個移動端MQTT調試工具測試
- 手機端向主題 topic111 發送消息,並接收。
2. 控制台列印
本文來自博客園,作者:Tom-shushu,轉載請註明原文鏈接:https://www.cnblogs.com/Tom-shushu/p/16390187.html