物聯網微消息隊列MQTT介紹-EMQX集群搭建以及與SpringBoot整合

来源:https://www.cnblogs.com/Tom-shushu/archive/2022/06/19/16390187.html
-Advertisement-
Play Games

項目全部代碼地址:https://github.com/Tom-shushu/work-study.git (mqtt-emqt 項目) 先看我們最後實現的一個效果 1.手機端向主題 topic111 發送消息,並接收。(手機測試工具名稱:MQTT調試器) 2.控制台列印 MQTT基本簡介 MQTT ...


項目全部代碼地址:https://github.com/Tom-shushu/work-study.gitmqtt-emqt 項目)

先看我們最後實現的一個效果

1.手機端向主題 topic111 發送消息,並接收。(手機測試工具名稱:MQTT調試器)

 2.控制台列印

MQTT基本簡介

MQTT 是用於物聯網 (IoT) 的 OASIS 標準消息傳遞協議。它被設計為一種極其輕量級的發佈/訂閱消息傳輸,非常適合連接具有小代碼足跡和最小網路帶寬的遠程設備。

MQTT協議簡介

MQTT 是客戶端伺服器發佈/訂閱消息傳輸協議。它重量輕、開放、簡單,並且易於實施。這些特性使其非常適合在許多情況下使用,包括受限制的環境,例如機器對機器 (M2M) 和物聯網 (IoT) 環境中的通信,其中需要小代碼足跡和/或網路帶寬非常寶貴。

該協議通過 TCP/IP 或其他提供有序、無損、雙向連接的網路協議運行。其特點包括:

·         使用發佈/訂閱消息模式,提供一對多的消息分發和應用程式的解耦。

·         與有效負載內容無關的消息傳輸。

·         消息傳遞的三種服務質量:

o    “最多一次”,根據操作環境的最大努力傳遞消息。可能會發生消息丟失。例如,此級別可用於環境感測器數據,其中單個讀數是否丟失並不重要,因為下一個讀數將很快發佈。

o    “至少一次”,保證消息到達但可能出現重覆。

o    “Exactly once”,保證消息只到達一次。例如,此級別可用於重覆或丟失消息可能導致應用不正確費用的計費系統。

·         最小化傳輸開銷和協議交換以減少網路流量。

·         發生異常斷開時通知相關方的機制。

EMQX簡介

通過開放標準物聯網協議 MQTT、CoAP 和 LwM2M 連接任何設備。使用 EMQX Enterprise 集群輕鬆擴展到數千萬併發 MQTT 連接。

並且EMQX還是開源的,又支持集群,所以還是一個比較不錯的選擇

EMQX集群搭建

前期準備:

1.兩台伺服器:我的兩個伺服器一臺是騰訊雲、一臺是阿裡雲的(不要問為什麼,薅羊毛得來的)咱們暫且叫他們 mqtt_service_aliyun和

 mqtt_service_txyun 吧。 2.一個功能變數名稱: mqtt.zhouhong.icu

安裝開始

1.分別在兩台伺服器上執行以下操作進行安裝(如果是單機:只需要進行下麵1、2操作就安裝完成了)
## 1.下載
wget https://www.emqx.com/zh/downloads/broker/4.4.4/emqx-4.4.4-otp24.1.5-3-el8-amd64.rpm
## 2.安裝
sudo yum install emqx-4.4.4-otp24.1.5-3-el8-amd64.rpm
## 3.修改配置文件
vim /etc/emqx/emqx.conf
## 4.修改以下內容
## 註意node.name是當前這台伺服器名稱
node.name = [email protected]
cluster.static.seeds = [email protected],[email protected]
cluster.discovery = static
cluster.name = my-mqtt-cluster
2.分別啟動兩台伺服器的EMQX
sudo emqx start
3.到瀏覽器輸入 http://xxx.xx.xxx.xxx:18083/ 查看(隨便一臺都可以,預設賬號admin 密碼public),註意打開18083,1883 安全組

4.nginx負載均衡

nginx搭建很簡單略過,大家只需要修改以下nginx.conf裡面的內容即可

stream {
  upstream mqtt.zhouhong.icu {
      zone tcp_servers 64k;
      hash $remote_addr;
      server xxx.xx.xxx.xx:1883 weight=1 max_fails=3 fail_timeout=30s;
      server xxx.xx.xxx.xx:1883 weight=1 max_fails=3 fail_timeout=30s;

  }

  server {
      listen 8883 ssl;
      status_zone tcp_server;
      proxy_pass mqtt.zhouhong.icu;
      proxy_buffer_size 4k;
      ssl_handshake_timeout 15s;
      ssl_certificate     /etc/nginx/7967358_www.mqtt.zhouhong.icu.pem;
      ssl_certificate_key /etc/nginx/7967358_www.mqtt.zhouhong.icu.key;
  }
}

與SpringBoot集成並實現伺服器端監控對應topic下的消息

1.項目搭建

  • 引入MQTT相關jar包
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
  • yml配置文件 (如果大家沒搭建好的話,可以直接使用我搭建的這個)
server:
  port: 8080

mqtt:
 ## 單機版--只需要把功能變數名稱改為ip既可  hostUrl: tcp:
//mqtt.zhouhong.icu:1883 username: admin password: public ## 服務端 clientId (發送端自己定義) clientId: service_client_id cleanSession: true reconnect: true timeout: 100 keepAlive: 100 defaultTopic: topic111 qos: 0
  • 屬性配置
/**
 * description:
 * date: 2022/6/16 15:51
 * @author: zhouhong
 */
@Component
@ConfigurationProperties("mqtt")
@Data
public class MqttProperties {

    /**
     * 用戶名
     */
    private String username;

    /**
     * 密碼
     */
    private String password;

    /**
     * 連接地址
     */
    private String hostUrl;

    /**
     * 客戶端Id,同一臺伺服器下,不允許出現重覆的客戶端id
     */
    private String clientId;

    /**
     * 預設連接主題
     */
    private String topic;

    /**
     * 超時時間
     */
    private int timeout;

    /**
     * 設置會話心跳時間 單位為秒 伺服器會每隔1.5*20秒的時間向客戶端
     * 發送個消息判斷客戶端是否線上,但這個方法並沒有重連的機制
     */
    private int keepAlive;

    /**
     * 設置是否清空session,這裡如果設置為false表示伺服器會保留客戶端的連
     * 接記錄,這裡設置為true表示每次連接到伺服器都以新的身份連接
     */
    private Boolean cleanSession;

    /**
     * 是否斷線重連
     */
    private Boolean reconnect;

    /**
     * 連接方式
     */
    private Integer qos;
}
  • 發送消息回調
/**
 * description: 發生消息成功後 的 回調
 * date: 2022/6/16 15:55
 *
 * @author: zhouhong
 */
@Component
@Log4j2
public class MqttSendCallBack implements MqttCallbackExtended {

    /**
     * 客戶端斷開後觸發
     * @param throwable
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.info("發送消息回調: 連接斷開,可以做重連");
    }

    /**
     * 客戶端收到消息觸發
     *
     * @param topic       主題
     * @param mqttMessage 消息
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        log.info("發送消息回調:  接收消息主題 : " + topic);
        log.info("發送消息回調:  接收消息內容 : " + new String(mqttMessage.getPayload()));
    }

    /**
     * 發佈消息成功
     *
     * @param token token
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        String[] topics = token.getTopics();
        for (String topic : topics) {
            log.info("發送消息回調:  向主題:" + topic + "發送消息成功!");
        }
        try {
            MqttMessage message = token.getMessage();
            byte[] payload = message.getPayload();
            String s = new String(payload, "UTF-8");
            log.info("發送消息回調:  消息的內容是:" + s);
        } catch (MqttException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    /**
     * 連接emq伺服器後觸發
     *
     * @param b
     * @param s
     */
    @Override
    public void connectComplete(boolean b, String s) {
        log.info("--------------------ClientId:"
                + MqttAcceptClient.client.getClientId() + "客戶端連接成功!--------------------");
    }
}
  • 接收消息回調
/**
 * description: 接收消息後的回調
 * date: 2022/6/16 15:52
 *
 * @author: zhouhong
 */
@Component
@Log4j2
public class MqttAcceptCallback implements MqttCallbackExtended {

    @Resource
    private MqttAcceptClient mqttAcceptClient;

    /**
     * 客戶端斷開後觸發
     *
     * @param throwable
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.info("接收消息回調:  連接斷開,可以做重連");
        if (MqttAcceptClient.client == null || !MqttAcceptClient.client.isConnected()) {
            log.info("接收消息回調:  emqx重新連接....................................................");
            mqttAcceptClient.reconnection();
        }
    }

    /**
     * 客戶端收到消息觸發
     *
     * @param topic       主題
     * @param mqttMessage 消息
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        log.info("接收消息回調:  接收消息主題 : " + topic);
        log.info("接收消息回調:  接收消息內容 : " + new String(mqttMessage.getPayload()));
    }

    /**
     * 發佈消息成功
     *
     * @param token token
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        String[] topics = token.getTopics();
        for (String topic : topics) {
            log.info("接收消息回調:  向主題:" + topic + "發送消息成功!");
        }
        try {
            MqttMessage message = token.getMessage();
            byte[] payload = message.getPayload();
            String s = new String(payload, "UTF-8");
            log.info("接收消息回調:  消息的內容是:" + s);
        } catch (MqttException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    /**
     * 連接emq伺服器後觸發
     *
     * @param b
     * @param s
     */
    @Override
    public void connectComplete(boolean b, String s) {
        log.info("--------------------ClientId:"
                + MqttAcceptClient.client.getClientId() + "客戶端連接成功!--------------------");
        // 以/#結尾表示訂閱所有以test開頭的主題
        // 訂閱所有機構主題
        mqttAcceptClient.subscribe("topic111", 0);
    }
}
  • 發消息
/**
 * description: 發送消息
 * date: 2022/6/16 16:01
 *
 * @author: zhouhong
 */
@Component
public class MqttSendClient {

    @Autowired
    private MqttSendCallBack mqttSendCallBack;

    @Autowired
    private MqttProperties mqttProperties;

    public MqttClient connect() {
        MqttClient client = null;
        try {
            String uuid = UUID.randomUUID().toString().replaceAll("-","");
            client = new MqttClient(mqttProperties.getHostUrl(),uuid , new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(mqttProperties.getUsername());
            options.setPassword(mqttProperties.getPassword().toCharArray());
            options.setConnectionTimeout(mqttProperties.getTimeout());
            options.setKeepAliveInterval(mqttProperties.getKeepAlive());
            options.setCleanSession(true);
            options.setAutomaticReconnect(false);
            try {
                // 設置回調
                client.setCallback(mqttSendCallBack);
                client.connect(options);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return client;
    }

    /**
     * 發佈消息
     * 主題格式: server:report:$orgCode(參數實際使用機構代碼)
     *
     * @param retained    是否保留
     * @param pushMessage 消息體
     */
    public void publish(boolean retained, String topic, String pushMessage) {
        MqttMessage message = new MqttMessage();
        message.setQos(mqttProperties.getQos());
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes());
        MqttClient mqttClient = connect();
        try {
            mqttClient.publish(topic, message);
        } catch (MqttException e) {
            e.printStackTrace();
        } finally {
            disconnect(mqttClient);
            close(mqttClient);
        }
    }

    /**
     * 關閉連接
     *
     * @param mqttClient
     */
    public static void disconnect(MqttClient mqttClient) {
        try {
            if (mqttClient != null) {
                mqttClient.disconnect();
            }
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 釋放資源
     *
     * @param mqttClient
     */
    public static void close(MqttClient mqttClient) {
        try {
            if (mqttClient != null) {
                mqttClient.close();
            }
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
  • 接收消息
/**
 * description: 伺服器段端連接訂閱消息、監控topic
 * date: 2022/6/16 15:52
 *
 * @author: zhouhong
 */
@Component
@Log4j2
public class MqttAcceptClient {

    @Autowired
    @Lazy
    private MqttAcceptCallback mqttAcceptCallback;

    @Autowired
    private MqttProperties mqttProperties;

    public static MqttClient client;

    private static MqttClient getClient() {
        return client;
    }

    private static void setClient(MqttClient client) {
        MqttAcceptClient.client = client;
    }

    /**
     * 客戶端連接
     */
    public void connect() {
        MqttClient client;
        try {
            // clientId 使用伺服器 yml裡面配置的 clientId
            client = new MqttClient(mqttProperties.getHostUrl(), mqttProperties.getClientId(), new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(mqttProperties.getUsername());
            options.setPassword(mqttProperties.getPassword().toCharArray());
            options.setConnectionTimeout(mqttProperties.getTimeout());
            options.setKeepAliveInterval(mqttProperties.getKeepAlive());
            options.setAutomaticReconnect(mqttProperties.getReconnect());
            options.setCleanSession(mqttProperties.getCleanSession());
            MqttAcceptClient.setClient(client);
            try {
                // 設置回調
                client.setCallback(mqttAcceptCallback);
                client.connect(options);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 重新連接
     */
    public void reconnection() {
        try {
            client.connect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 訂閱某個主題
     *
     * @param topic 主題
     * @param qos   連接方式
     */
    public void subscribe(String topic, int qos) {
        log.info("==============開始訂閱主題==============" + topic);
        try {
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 取消訂閱某個主題
     *
     * @param topic
     */
    public void unsubscribe(String topic) {
        log.info("==============開始取消訂閱主題==============" + topic);
        try {
            client.unsubscribe(topic);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
  • 服務端啟動時連接訂閱主題並監控
/**
 * description: 啟動後連接 MQTT 伺服器, 監聽 mqtt/my_topic 這個topic發送的消息
 * date: 2022/6/16 15:57
 * @author: zhouhong
 */
@Configuration
public class MqttConfig {

    @Resource
    private MqttAcceptClient mqttAcceptClient;

    @Bean
    public MqttAcceptClient getMqttPushClient() {
        mqttAcceptClient.connect();
        return mqttAcceptClient;
    }
}
  • 發消息控制類
/**
 * description: 發消息控制類
 * date: 2022/6/16 15:58
 *
 * @author: zhouhong
 */
@RestController
public class SendController {

    @Resource
    private MqttSendClient mqttSendClient;

    @PostMapping("/mqtt/sendmessage")
    public void sendMessage(@RequestBody SendParam sendParam) {
        mqttSendClient.publish(false,sendParam.getTopic(),sendParam.getMessageContent());
    }
}

2.測試

  • postman調用發消息介面

  •  控制台日誌

  •  使用另外一個移動端MQTT調試工具測試
  1. 手機端向主題 topic111 發送消息,並接收。

 

 

  2. 控制台列印

 

本文來自博客園,作者:Tom-shushu,轉載請註明原文鏈接:https://www.cnblogs.com/Tom-shushu/p/16390187.html


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • C++ 標準庫提供了原子操作。(我已經懶得寫序言了) 先來說原子操作的概念: 原子操作是多線程當中對資源進行保護的一種手段,主要作用是和互斥量(Mutex)一樣,避免對資源的併發訪問、修改。 互斥量的粒度衡量是作用域(哪怕作用域內只有一個變數),而原子的粒度衡量則是以一個變數或對象為單位。因此,原子 ...
  • 介紹 這是很久之前的一個項目了,最近剛好有些時間,就來總結一下吧! 推薦初步熟悉項目後閱讀本文: https://gitee.com/smalldyy/easy-msg-cpp 從何而來 這要從我從事Qt開發的那些日子說起了,項目說大不大,說小也不小,人倒是一茬又一茬,需求也換了又換,後來的事情大家 ...
  • 引言:沒想到2022年還有很多工業軟體公司依然使用MFC,微軟也一直在更新MFC的庫,這次使用MFC封裝的CFileDialog類,寫一個獲得選定文件路徑,名稱,擴展名的程式。 個人技術博客(文章整理+源碼): https://zobolblog.github.io/LearnWinAPI/ 最終效 ...
  • 一個挺著啤酒肚,身穿格子衫,髮際線嚴重後移的中年男子,手拿著保溫杯,胳膊夾著MacBook向你走來,看樣子是架構師級別。 面試開始, 直入正題。 面試官: 你有沒有參與過秒殺系統的設計? 我: 沒有,我平時都是開發後臺管理系統、OA辦公系統、內部管理系統,從來沒有開發過秒殺系統。 面試官: 嗯... ...
  • 大佬理解->Java集合之ArrayList 1、ArrayList的特點 存放的元素有序 元素不唯一(可以重覆) 隨機訪問快 插入刪除元素慢 非線程安全 2、底層實現 底層初始化,使用一個Object類型的空對象數組,初始長度為0; 源碼 //Object類型對象數組引用 transient Ob ...
  • 大佬的理解->《Java IO(五) -- 字元流進階及BufferedWriter,BufferedReader》 1、BufferedReader BufferedReader高效字元流讀取文件基本用法,自帶緩衝區,讀取文件效率高,支持逐行讀取; 1.1 初始化 BufferedReader(R ...
  • 游戲的世界精彩紛呈,有動作類、策略類、角色扮演類等諸多類型,還有很多難以分類的小游戲,讓人玩起來往往愛不釋手 ...
  • 大佬的理解->《Java IO(四) -- 字元流》 FileReader字元流讀取文件,更適合用於讀取文件,可以讀取中文 1、FileReader 1.1 初始化 FileReader(File file) FileReader(String fileName) 1.2 讀取文件內容 read() ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...