物聯網微消息隊列MQTT介紹-EMQX集群搭建以及與SpringBoot整合

来源:https://www.cnblogs.com/Tom-shushu/archive/2022/06/19/16390187.html
-Advertisement-
Play Games

項目全部代碼地址:https://github.com/Tom-shushu/work-study.git (mqtt-emqt 項目) 先看我們最後實現的一個效果 1.手機端向主題 topic111 發送消息,並接收。(手機測試工具名稱:MQTT調試器) 2.控制台列印 MQTT基本簡介 MQTT ...


項目全部代碼地址:https://github.com/Tom-shushu/work-study.gitmqtt-emqt 項目)

先看我們最後實現的一個效果

1.手機端向主題 topic111 發送消息,並接收。(手機測試工具名稱:MQTT調試器)

 2.控制台列印

MQTT基本簡介

MQTT 是用於物聯網 (IoT) 的 OASIS 標準消息傳遞協議。它被設計為一種極其輕量級的發佈/訂閱消息傳輸,非常適合連接具有小代碼足跡和最小網路帶寬的遠程設備。

MQTT協議簡介

MQTT 是客戶端伺服器發佈/訂閱消息傳輸協議。它重量輕、開放、簡單,並且易於實施。這些特性使其非常適合在許多情況下使用,包括受限制的環境,例如機器對機器 (M2M) 和物聯網 (IoT) 環境中的通信,其中需要小代碼足跡和/或網路帶寬非常寶貴。

該協議通過 TCP/IP 或其他提供有序、無損、雙向連接的網路協議運行。其特點包括:

·         使用發佈/訂閱消息模式,提供一對多的消息分發和應用程式的解耦。

·         與有效負載內容無關的消息傳輸。

·         消息傳遞的三種服務質量:

o    “最多一次”,根據操作環境的最大努力傳遞消息。可能會發生消息丟失。例如,此級別可用於環境感測器數據,其中單個讀數是否丟失並不重要,因為下一個讀數將很快發佈。

o    “至少一次”,保證消息到達但可能出現重覆。

o    “Exactly once”,保證消息只到達一次。例如,此級別可用於重覆或丟失消息可能導致應用不正確費用的計費系統。

·         最小化傳輸開銷和協議交換以減少網路流量。

·         發生異常斷開時通知相關方的機制。

EMQX簡介

通過開放標準物聯網協議 MQTT、CoAP 和 LwM2M 連接任何設備。使用 EMQX Enterprise 集群輕鬆擴展到數千萬併發 MQTT 連接。

並且EMQX還是開源的,又支持集群,所以還是一個比較不錯的選擇

EMQX集群搭建

前期準備:

1.兩台伺服器:我的兩個伺服器一臺是騰訊雲、一臺是阿裡雲的(不要問為什麼,薅羊毛得來的)咱們暫且叫他們 mqtt_service_aliyun和

 mqtt_service_txyun 吧。 2.一個功能變數名稱: mqtt.zhouhong.icu

安裝開始

1.分別在兩台伺服器上執行以下操作進行安裝(如果是單機:只需要進行下麵1、2操作就安裝完成了)
## 1.下載
wget https://www.emqx.com/zh/downloads/broker/4.4.4/emqx-4.4.4-otp24.1.5-3-el8-amd64.rpm
## 2.安裝
sudo yum install emqx-4.4.4-otp24.1.5-3-el8-amd64.rpm
## 3.修改配置文件
vim /etc/emqx/emqx.conf
## 4.修改以下內容
## 註意node.name是當前這台伺服器名稱
node.name = [email protected]
cluster.static.seeds = [email protected],[email protected]
cluster.discovery = static
cluster.name = my-mqtt-cluster
2.分別啟動兩台伺服器的EMQX
sudo emqx start
3.到瀏覽器輸入 http://xxx.xx.xxx.xxx:18083/ 查看(隨便一臺都可以,預設賬號admin 密碼public),註意打開18083,1883 安全組

4.nginx負載均衡

nginx搭建很簡單略過,大家只需要修改以下nginx.conf裡面的內容即可

stream {
  upstream mqtt.zhouhong.icu {
      zone tcp_servers 64k;
      hash $remote_addr;
      server xxx.xx.xxx.xx:1883 weight=1 max_fails=3 fail_timeout=30s;
      server xxx.xx.xxx.xx:1883 weight=1 max_fails=3 fail_timeout=30s;

  }

  server {
      listen 8883 ssl;
      status_zone tcp_server;
      proxy_pass mqtt.zhouhong.icu;
      proxy_buffer_size 4k;
      ssl_handshake_timeout 15s;
      ssl_certificate     /etc/nginx/7967358_www.mqtt.zhouhong.icu.pem;
      ssl_certificate_key /etc/nginx/7967358_www.mqtt.zhouhong.icu.key;
  }
}

與SpringBoot集成並實現伺服器端監控對應topic下的消息

1.項目搭建

  • 引入MQTT相關jar包
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
  • yml配置文件 (如果大家沒搭建好的話,可以直接使用我搭建的這個)
server:
  port: 8080

mqtt:
 ## 單機版--只需要把功能變數名稱改為ip既可  hostUrl: tcp:
//mqtt.zhouhong.icu:1883 username: admin password: public ## 服務端 clientId (發送端自己定義) clientId: service_client_id cleanSession: true reconnect: true timeout: 100 keepAlive: 100 defaultTopic: topic111 qos: 0
  • 屬性配置
/**
 * description:
 * date: 2022/6/16 15:51
 * @author: zhouhong
 */
@Component
@ConfigurationProperties("mqtt")
@Data
public class MqttProperties {

    /**
     * 用戶名
     */
    private String username;

    /**
     * 密碼
     */
    private String password;

    /**
     * 連接地址
     */
    private String hostUrl;

    /**
     * 客戶端Id,同一臺伺服器下,不允許出現重覆的客戶端id
     */
    private String clientId;

    /**
     * 預設連接主題
     */
    private String topic;

    /**
     * 超時時間
     */
    private int timeout;

    /**
     * 設置會話心跳時間 單位為秒 伺服器會每隔1.5*20秒的時間向客戶端
     * 發送個消息判斷客戶端是否線上,但這個方法並沒有重連的機制
     */
    private int keepAlive;

    /**
     * 設置是否清空session,這裡如果設置為false表示伺服器會保留客戶端的連
     * 接記錄,這裡設置為true表示每次連接到伺服器都以新的身份連接
     */
    private Boolean cleanSession;

    /**
     * 是否斷線重連
     */
    private Boolean reconnect;

    /**
     * 連接方式
     */
    private Integer qos;
}
  • 發送消息回調
/**
 * description: 發生消息成功後 的 回調
 * date: 2022/6/16 15:55
 *
 * @author: zhouhong
 */
@Component
@Log4j2
public class MqttSendCallBack implements MqttCallbackExtended {

    /**
     * 客戶端斷開後觸發
     * @param throwable
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.info("發送消息回調: 連接斷開,可以做重連");
    }

    /**
     * 客戶端收到消息觸發
     *
     * @param topic       主題
     * @param mqttMessage 消息
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        log.info("發送消息回調:  接收消息主題 : " + topic);
        log.info("發送消息回調:  接收消息內容 : " + new String(mqttMessage.getPayload()));
    }

    /**
     * 發佈消息成功
     *
     * @param token token
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        String[] topics = token.getTopics();
        for (String topic : topics) {
            log.info("發送消息回調:  向主題:" + topic + "發送消息成功!");
        }
        try {
            MqttMessage message = token.getMessage();
            byte[] payload = message.getPayload();
            String s = new String(payload, "UTF-8");
            log.info("發送消息回調:  消息的內容是:" + s);
        } catch (MqttException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    /**
     * 連接emq伺服器後觸發
     *
     * @param b
     * @param s
     */
    @Override
    public void connectComplete(boolean b, String s) {
        log.info("--------------------ClientId:"
                + MqttAcceptClient.client.getClientId() + "客戶端連接成功!--------------------");
    }
}
  • 接收消息回調
/**
 * description: 接收消息後的回調
 * date: 2022/6/16 15:52
 *
 * @author: zhouhong
 */
@Component
@Log4j2
public class MqttAcceptCallback implements MqttCallbackExtended {

    @Resource
    private MqttAcceptClient mqttAcceptClient;

    /**
     * 客戶端斷開後觸發
     *
     * @param throwable
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.info("接收消息回調:  連接斷開,可以做重連");
        if (MqttAcceptClient.client == null || !MqttAcceptClient.client.isConnected()) {
            log.info("接收消息回調:  emqx重新連接....................................................");
            mqttAcceptClient.reconnection();
        }
    }

    /**
     * 客戶端收到消息觸發
     *
     * @param topic       主題
     * @param mqttMessage 消息
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        log.info("接收消息回調:  接收消息主題 : " + topic);
        log.info("接收消息回調:  接收消息內容 : " + new String(mqttMessage.getPayload()));
    }

    /**
     * 發佈消息成功
     *
     * @param token token
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        String[] topics = token.getTopics();
        for (String topic : topics) {
            log.info("接收消息回調:  向主題:" + topic + "發送消息成功!");
        }
        try {
            MqttMessage message = token.getMessage();
            byte[] payload = message.getPayload();
            String s = new String(payload, "UTF-8");
            log.info("接收消息回調:  消息的內容是:" + s);
        } catch (MqttException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    /**
     * 連接emq伺服器後觸發
     *
     * @param b
     * @param s
     */
    @Override
    public void connectComplete(boolean b, String s) {
        log.info("--------------------ClientId:"
                + MqttAcceptClient.client.getClientId() + "客戶端連接成功!--------------------");
        // 以/#結尾表示訂閱所有以test開頭的主題
        // 訂閱所有機構主題
        mqttAcceptClient.subscribe("topic111", 0);
    }
}
  • 發消息
/**
 * description: 發送消息
 * date: 2022/6/16 16:01
 *
 * @author: zhouhong
 */
@Component
public class MqttSendClient {

    @Autowired
    private MqttSendCallBack mqttSendCallBack;

    @Autowired
    private MqttProperties mqttProperties;

    public MqttClient connect() {
        MqttClient client = null;
        try {
            String uuid = UUID.randomUUID().toString().replaceAll("-","");
            client = new MqttClient(mqttProperties.getHostUrl(),uuid , new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(mqttProperties.getUsername());
            options.setPassword(mqttProperties.getPassword().toCharArray());
            options.setConnectionTimeout(mqttProperties.getTimeout());
            options.setKeepAliveInterval(mqttProperties.getKeepAlive());
            options.setCleanSession(true);
            options.setAutomaticReconnect(false);
            try {
                // 設置回調
                client.setCallback(mqttSendCallBack);
                client.connect(options);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return client;
    }

    /**
     * 發佈消息
     * 主題格式: server:report:$orgCode(參數實際使用機構代碼)
     *
     * @param retained    是否保留
     * @param pushMessage 消息體
     */
    public void publish(boolean retained, String topic, String pushMessage) {
        MqttMessage message = new MqttMessage();
        message.setQos(mqttProperties.getQos());
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes());
        MqttClient mqttClient = connect();
        try {
            mqttClient.publish(topic, message);
        } catch (MqttException e) {
            e.printStackTrace();
        } finally {
            disconnect(mqttClient);
            close(mqttClient);
        }
    }

    /**
     * 關閉連接
     *
     * @param mqttClient
     */
    public static void disconnect(MqttClient mqttClient) {
        try {
            if (mqttClient != null) {
                mqttClient.disconnect();
            }
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 釋放資源
     *
     * @param mqttClient
     */
    public static void close(MqttClient mqttClient) {
        try {
            if (mqttClient != null) {
                mqttClient.close();
            }
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
  • 接收消息
/**
 * description: 伺服器段端連接訂閱消息、監控topic
 * date: 2022/6/16 15:52
 *
 * @author: zhouhong
 */
@Component
@Log4j2
public class MqttAcceptClient {

    @Autowired
    @Lazy
    private MqttAcceptCallback mqttAcceptCallback;

    @Autowired
    private MqttProperties mqttProperties;

    public static MqttClient client;

    private static MqttClient getClient() {
        return client;
    }

    private static void setClient(MqttClient client) {
        MqttAcceptClient.client = client;
    }

    /**
     * 客戶端連接
     */
    public void connect() {
        MqttClient client;
        try {
            // clientId 使用伺服器 yml裡面配置的 clientId
            client = new MqttClient(mqttProperties.getHostUrl(), mqttProperties.getClientId(), new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(mqttProperties.getUsername());
            options.setPassword(mqttProperties.getPassword().toCharArray());
            options.setConnectionTimeout(mqttProperties.getTimeout());
            options.setKeepAliveInterval(mqttProperties.getKeepAlive());
            options.setAutomaticReconnect(mqttProperties.getReconnect());
            options.setCleanSession(mqttProperties.getCleanSession());
            MqttAcceptClient.setClient(client);
            try {
                // 設置回調
                client.setCallback(mqttAcceptCallback);
                client.connect(options);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 重新連接
     */
    public void reconnection() {
        try {
            client.connect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 訂閱某個主題
     *
     * @param topic 主題
     * @param qos   連接方式
     */
    public void subscribe(String topic, int qos) {
        log.info("==============開始訂閱主題==============" + topic);
        try {
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 取消訂閱某個主題
     *
     * @param topic
     */
    public void unsubscribe(String topic) {
        log.info("==============開始取消訂閱主題==============" + topic);
        try {
            client.unsubscribe(topic);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
  • 服務端啟動時連接訂閱主題並監控
/**
 * description: 啟動後連接 MQTT 伺服器, 監聽 mqtt/my_topic 這個topic發送的消息
 * date: 2022/6/16 15:57
 * @author: zhouhong
 */
@Configuration
public class MqttConfig {

    @Resource
    private MqttAcceptClient mqttAcceptClient;

    @Bean
    public MqttAcceptClient getMqttPushClient() {
        mqttAcceptClient.connect();
        return mqttAcceptClient;
    }
}
  • 發消息控制類
/**
 * description: 發消息控制類
 * date: 2022/6/16 15:58
 *
 * @author: zhouhong
 */
@RestController
public class SendController {

    @Resource
    private MqttSendClient mqttSendClient;

    @PostMapping("/mqtt/sendmessage")
    public void sendMessage(@RequestBody SendParam sendParam) {
        mqttSendClient.publish(false,sendParam.getTopic(),sendParam.getMessageContent());
    }
}

2.測試

  • postman調用發消息介面

  •  控制台日誌

  •  使用另外一個移動端MQTT調試工具測試
  1. 手機端向主題 topic111 發送消息,並接收。

 

 

  2. 控制台列印

 

本文來自博客園,作者:Tom-shushu,轉載請註明原文鏈接:https://www.cnblogs.com/Tom-shushu/p/16390187.html


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • C++ 標準庫提供了原子操作。(我已經懶得寫序言了) 先來說原子操作的概念: 原子操作是多線程當中對資源進行保護的一種手段,主要作用是和互斥量(Mutex)一樣,避免對資源的併發訪問、修改。 互斥量的粒度衡量是作用域(哪怕作用域內只有一個變數),而原子的粒度衡量則是以一個變數或對象為單位。因此,原子 ...
  • 介紹 這是很久之前的一個項目了,最近剛好有些時間,就來總結一下吧! 推薦初步熟悉項目後閱讀本文: https://gitee.com/smalldyy/easy-msg-cpp 從何而來 這要從我從事Qt開發的那些日子說起了,項目說大不大,說小也不小,人倒是一茬又一茬,需求也換了又換,後來的事情大家 ...
  • 引言:沒想到2022年還有很多工業軟體公司依然使用MFC,微軟也一直在更新MFC的庫,這次使用MFC封裝的CFileDialog類,寫一個獲得選定文件路徑,名稱,擴展名的程式。 個人技術博客(文章整理+源碼): https://zobolblog.github.io/LearnWinAPI/ 最終效 ...
  • 一個挺著啤酒肚,身穿格子衫,髮際線嚴重後移的中年男子,手拿著保溫杯,胳膊夾著MacBook向你走來,看樣子是架構師級別。 面試開始, 直入正題。 面試官: 你有沒有參與過秒殺系統的設計? 我: 沒有,我平時都是開發後臺管理系統、OA辦公系統、內部管理系統,從來沒有開發過秒殺系統。 面試官: 嗯... ...
  • 大佬理解->Java集合之ArrayList 1、ArrayList的特點 存放的元素有序 元素不唯一(可以重覆) 隨機訪問快 插入刪除元素慢 非線程安全 2、底層實現 底層初始化,使用一個Object類型的空對象數組,初始長度為0; 源碼 //Object類型對象數組引用 transient Ob ...
  • 大佬的理解->《Java IO(五) -- 字元流進階及BufferedWriter,BufferedReader》 1、BufferedReader BufferedReader高效字元流讀取文件基本用法,自帶緩衝區,讀取文件效率高,支持逐行讀取; 1.1 初始化 BufferedReader(R ...
  • 游戲的世界精彩紛呈,有動作類、策略類、角色扮演類等諸多類型,還有很多難以分類的小游戲,讓人玩起來往往愛不釋手 ...
  • 大佬的理解->《Java IO(四) -- 字元流》 FileReader字元流讀取文件,更適合用於讀取文件,可以讀取中文 1、FileReader 1.1 初始化 FileReader(File file) FileReader(String fileName) 1.2 讀取文件內容 read() ...
一周排行
    -Advertisement-
    Play Games
  • 基於.NET Framework 4.8 開發的深度學習模型部署測試平臺,提供了YOLO框架的主流系列模型,包括YOLOv8~v9,以及其系列下的Det、Seg、Pose、Obb、Cls等應用場景,同時支持圖像與視頻檢測。模型部署引擎使用的是OpenVINO™、TensorRT、ONNX runti... ...
  • 十年沉澱,重啟開發之路 十年前,我沉浸在開發的海洋中,每日與代碼為伍,與演算法共舞。那時的我,滿懷激情,對技術的追求近乎狂熱。然而,隨著歲月的流逝,生活的忙碌逐漸占據了我的大部分時間,讓我無暇顧及技術的沉澱與積累。 十年間,我經歷了職業生涯的起伏和變遷。從初出茅廬的菜鳥到逐漸嶄露頭角的開發者,我見證了 ...
  • C# 是一種簡單、現代、面向對象和類型安全的編程語言。.NET 是由 Microsoft 創建的開發平臺,平臺包含了語言規範、工具、運行,支持開發各種應用,如Web、移動、桌面等。.NET框架有多個實現,如.NET Framework、.NET Core(及後續的.NET 5+版本),以及社區版本M... ...
  • 前言 本文介紹瞭如何使用三菱提供的MX Component插件實現對三菱PLC軟元件數據的讀寫,記錄了使用電腦模擬,模擬PLC,直至完成測試的詳細流程,並重點介紹了在這個過程中的易錯點,供參考。 用到的軟體: 1. PLC開發編程環境GX Works2,GX Works2下載鏈接 https:// ...
  • 前言 整理這個官方翻譯的系列,原因是網上大部分的 tomcat 版本比較舊,此版本為 v11 最新的版本。 開源項目 從零手寫實現 tomcat minicat 別稱【嗅虎】心有猛虎,輕嗅薔薇。 系列文章 web server apache tomcat11-01-官方文檔入門介紹 web serv ...
  • 1、jQuery介紹 jQuery是什麼 jQuery是一個快速、簡潔的JavaScript框架,是繼Prototype之後又一個優秀的JavaScript代碼庫(或JavaScript框架)。jQuery設計的宗旨是“write Less,Do More”,即倡導寫更少的代碼,做更多的事情。它封裝 ...
  • 前言 之前的文章把js引擎(aardio封裝庫) 微軟開源的js引擎(ChakraCore))寫好了,這篇文章整點js代碼來測一下bug。測試網站:https://fanyi.youdao.com/index.html#/ 逆向思路 逆向思路可以看有道翻譯js逆向(MD5加密,AES加密)附完整源碼 ...
  • 引言 現代的操作系統(Windows,Linux,Mac OS)等都可以同時打開多個軟體(任務),這些軟體在我們的感知上是同時運行的,例如我們可以一邊瀏覽網頁,一邊聽音樂。而CPU執行代碼同一時間只能執行一條,但即使我們的電腦是單核CPU也可以同時運行多個任務,如下圖所示,這是因為我們的 CPU 的 ...
  • 掌握使用Python進行文本英文統計的基本方法,並瞭解如何進一步優化和擴展這些方法,以應對更複雜的文本分析任務。 ...
  • 背景 Redis多數據源常見的場景: 分區數據處理:當數據量增長時,單個Redis實例可能無法處理所有的數據。通過使用多個Redis數據源,可以將數據分區存儲在不同的實例中,使得數據處理更加高效。 多租戶應用程式:對於多租戶應用程式,每個租戶可以擁有自己的Redis數據源,以確保數據隔離和安全性。 ...