在Java中使用RabbitMQ

来源:https://www.cnblogs.com/chy18883701161/archive/2020/03/17/12501428.html
-Advertisement-
Play Games

依賴 <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency> 生產者 public class Producer ...


 

依賴

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency>

 

 

 

 生產者

public class Producer {
    private final static String QUEUE_NAME = "my_queue"; //隊列名稱
    private final static String EXCHANGE_NAME = "my_exchange"; //要使用的exchange的名稱
    private final static String EXCHANGE_TYPE = "topic"; //要使用的exchange的名稱
    private final static String EXCHANGE_ROUTING_KEY = "my_routing_key.#"; //exchange使用的routing-key

    public static void send() throws IOException, TimeoutException {
        //創建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.1.9"); //設置rabbitmq-server的地址
        connectionFactory.setPort(5672);  //使用的埠號
        connectionFactory.setVirtualHost("/");  //使用的虛擬主機

        //由連接工廠創建連接
        Connection connection = connectionFactory.newConnection();

        //通過連接創建通道
        Channel channel = connection.createChannel();

        //通過通道聲明一個exchange,若已存在則直接使用,不存在會自動創建
        //參數:name、type、是否支持持久化、此交換機沒有綁定一個queue時是否自動刪除、是否只在rabbitmq內部使用此交換機、此交換機的其它參數(map)
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);

        //通過通道聲明一個queue,如果此隊列已存在,則直接使用;如果不存在,會自動創建
        //參數:name、是否支持持久化、是否是排它的、是否支持自動刪除、其他參數(map)
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        //將queue綁定至某個exchange。一個exchange可以綁定多個queue
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY);

        //發送消息
        String msg = "hello";  //消息內容
        String routing_key = "my_routing_key.key1";  //發送消息使用的routing-key
        channel.basicPublish(EXCHANGE_NAME,routing_key,null,msg.getBytes()); //消息是byte[],可以傳遞所有類型(轉換為byte[]),不局限於字元串
        System.out.println("send message:"+msg);

        //關閉連接
        channel.close();
        connection.close();
    }

}

 

 

 

 

exchange詳解

rabbitmq自帶了7個交換機,都是以amq開頭。我們可以使用自帶的,也可以自己新建交換機。

 

 

交換機的參數

先看最下麵的add a new exchange:

  • name   交換機的name
  • type  交換機的類型,說白了就是routing-key匹配queue使用哪種匹配規則
  • durability  消息是否支持持久化,durable是支持持久化,重啟rabbitmq,rabbitmq上的消息還在、不會丟失,上面features里的D就是durable;transient是不支持持久化,重啟rabbitmq,rabbitmq上的消息丟失。因為消息是保存在內初中的,不持久化到硬碟,關閉rabbitmq消息直接就沒了,持久化後再次啟動時會從硬碟載入消息。transient關鍵字用於阻止序列化。
  • auto delete   如果此交換機一個queue都沒有綁定,是否自動刪除此交換機
  • internal  此交換機是否只在rabbitmq內部使用,大部分交換機都要暴露出來,給消息生產者用,只有少數(一般是rabbitmq自帶的)是內部使用的。features里的I就是internal,表示只在內部使用,自帶的amq.rabbitmq.trace用來跟蹤rabbitmq內部的消息投遞過程,只在內部使用。
  • arguments  給此交換機設置一些其它參數

 

//通過通道聲明一個exchange,若已存在則直接使用,不存在會自動創建
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);

聲明一個交換機,參數和上面控制台add a new exchange的參數順序是一致的,arguments是用map表示,一般不必設置其它參數,寫成null即可。

 

自帶的7個交換機,第一個是(AMQP default),不是說這個交換機的名字是AMQP default。

這個交換機的名字沒有名字(空),如果要使用這個交換機,代碼里交換機的name要寫成空串,括弧是說明這個交換機是rabbitmq預設的交換機。

 

 

 

 

exchange的4種類型

我們綁定exchange、queue時使用了一個routing-key:

private final static String EXCHANGE_ROUTING_KEY = "my_routing_key.#"; //exchange使用的routing-key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY);

這個exchange使用routing-key來匹配綁定的queue,即要將消息發送到哪些隊列。

 

 

 在發送消息時又使用了一個routing-key:

 String routing_key = "my_routing_key.key1"; //發送消息使用的routing-key

 channel.basicPublish(EXCHANGE_NAME,routing_key,null,msg.getBytes());

rabbitmq會將這個消息發送到指定的exchange,



如何匹配?是完全匹配還是模糊匹配?這就涉及到exchange的4種type:

 

(1)direct  完全匹配

發送消息使用的routing-key,要與交換機使用的routing-key完全相同。

比如exchange使用的routing-key是"my_routing_key",那發送消息使用的routing_key也要是"my_routing_key"

 

 

(2)topic  模糊匹配,可以使用通配符

*只能匹配一個詞,#可以匹配一個或多個詞。

註意是詞,不是字元。"my_routing_key.key1",my_routing_key是一個詞,key1是一個詞,詞之間用點號分隔。

比如exchange的routing-key是"my_routing_key.#" ,則發送消息使用的routing-key以"my_routing_key."開頭即可

我在示例中用的就是這種。這種方式非常適合把一個消息投遞到多個queue(應用)

 

 

(3)fanout  廣播模式

不使用routing-key,一般把routing-key都設置為空串,當然設置為什麼字元串都行,反正都不用。

exchange會把消息投遞(廣播)到此exchange綁定的所有的queue。

這種模式效率很高,因為不進行routing-key的匹配,大大減小了時間開銷。

 

 

(4)headers  首部模式(瞭解即可)

不使用routing-key(路由鍵),根據header將消息投遞到匹配的queue。

Map<String, Object> exchange_headers = new Hashtable<String, Object>();
headers.put("x-match", "any"); //指定鍵值對匹配模式any、all
headers.put("key1", "value1");  //放入一些鍵值對
headers.put("key2", "value2");

//綁定queue時指定指定map。至於routing-key,設置為什麼串都行,反正不使用
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"", exchange_headers);


//發送消息時也要使用map
Map<String,Object> headers = new Hashtable<String, Object>();
headers.put("key1", "value1");  //放入一些鍵值對


Builder properties = new BasicProperties.Builder();
properties.headers(headers);


String msg="hello";
//指定消息要使用的header。要使用properties的形式,不能直接發送map。會放在http請求頭中
channel.basicPublish(EXCHANGE_NAME, "",properties.build(),msg.getBytes());

x-match指定匹配模式,all:發送消息的header(map)中的鍵值對要和exchange的header中的所有的鍵值對都要相同,exchange的header有2個鍵值對key1、key2,發送消息的header中也要有這2個鍵值對(需要完全相同)。any:發送消息的header中只要有一個鍵值對和exchange中的鍵值對完全相同就行,比如key1、key2都行,只要有一個就行了。

header這種方式不常用,因為有點複雜。都要匹配queue,用routing-key它不簡單,它不香嗎?非要搞得這麼複雜。

 

不過properties的使用還是需要瞭解一下:

一個消息由properties、body組成,也就是basicPublish()的後2個參數。

properties可以設置此消息的一些參數,比如延時投遞、優先順序,這些參數寫成鍵值對放在map中,將map轉換為properties,再將properties作為basicPublish()的參數。

 

 

 

 

Queue詳解

 

type:指定queue類型,預設為classic(主隊列),還可以設置為quorum(從隊列),將主隊列同步到從隊列,主隊列故障時還可以用從隊列。

name:此queue的名稱

durability:queue中的消息是否持久化到硬碟。exchange也有此屬性,消息會先發給exchange保存,exchange再投遞到某些queue,消費者還沒處理此消息時,消息會保存在queue中。

auto delete:如果此queue沒有綁定到任何一個exchange,是否自動刪除此queue

arguments:設置一些其他參數

 

//聲明一個queue
//參數:name、是否支持持久化、是否是排它的、是否支持自動刪除、其他參數(map)
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

排它:這個queue只能在當前連接中使用(拒絕在其他連接中使用),由當前連接聲明|創建,斷開連接會自動刪除此queue。

 

一個exchange可以綁定多gueue,一個queue也可以綁定到多個exchange上。





 

消費者

public class Consumer {
    private final static String QUEUE_NAME = "my_queue"; //隊列名稱

    public static void receive() throws IOException, TimeoutException {
        //創建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.1.9"); //設置rabbitmq-server的地址
        connectionFactory.setPort(5672);  //使用的埠號
        connectionFactory.setVirtualHost("/");  //使用的虛擬主機

        //由連接工廠創建連接
        Connection connection = connectionFactory.newConnection();

        //通過連接創建通道
        Channel channel = connection.createChannel();

        //創建消費者,指定要使用的channel。QueueingConsume類已經棄用,使用DefaultConsumer代替
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //監聽的queue中有消息進來時,會自動調用此方法來處理消息。但此方法預設是空的,需要重寫
            @Override
            public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                java.lang.String msg = new java.lang.String(body);
                System.out.println("received msg: " + msg);
            }
        };

        //監聽指定的queue。會一直監聽。
        //參數:要監聽的queue、是否自動確認消息、使用的Consumer
        channel.basicConsume(QUEUE_NAME, true, consumer);

    }
}

這段代碼錶面上沒有問題,監聽queue就完事了。但有一個隱患:

我們是在生產者中聲明|創建的exchange、queue,如果生產者尚未運行,並且rabbitmq上沒有對應的exchange、queue(之前沒有創建),啟動消費者,消費者要監聽指定的queue,根本就連接不上queue,指定的queue創都沒創建,監聽什麼?直接報錯。

 

 

更加健壯的寫法是:在消費者中也聲明exchange、queue,有就直接用,沒有會自動創建。

public class Consumer {
    private final static String QUEUE_NAME = "my_queue"; //隊列名稱
    private final static String EXCHANGE_NAME = "my_exchange"; //要使用的exchange的名稱
    private final static String EXCHANGE_TYPE = "topic"; //要使用的exchange的名稱
    private final static String EXCHANGE_ROUTING_KEY = "my_routing_key.#"; //exchange使用的routing-key

    public static void receive() throws IOException, TimeoutException {
        //創建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.1.9"); //設置rabbitmq-server的地址
        connectionFactory.setPort(5672);  //使用的埠號
        connectionFactory.setVirtualHost("/");  //使用的虛擬主機

        //由連接工廠創建連接
        Connection connection = connectionFactory.newConnection();

        //通過連接創建通道
        Channel channel = connection.createChannel();

        //通過通道聲明一個exchange,若已存在則直接使用,不存在會自動創建
        //參數:name、type、是否支持持久化、此交換機沒有綁定一個queue時是否自動刪除、是否只在rabbitmq內部使用此交換機、此交換機的其它參數(map)
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);

        //通過通道聲明一個queue,如果此隊列已存在,則直接使用;如果不存在,會自動創建
        //參數:name、是否支持持久化、是否是排它的、是否支持自動刪除、其他參數(map)
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        //將queue綁定至某個exchange。一個exchange可以綁定多個queue
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY);

        //創建消費者,指定要使用的channel。QueueingConsume類已經棄用,使用DefaultConsumer代替
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //監聽的queue中有消息進來時,會自動調用此方法來處理消息。但此方法預設是空的,需要重寫
            @Override
            public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                java.lang.String msg = new java.lang.String(body);
                System.out.println("received msg: " + msg);
            }
        };

        //監聽指定的queue。會一直監聽。
        //參數:要監聽的queue、是否自動確認消息、使用的Consumer
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

}

 

 

 

 

rabbitmq控制台的queue:

ready是此queue中待投遞的消息數,unacked是已投遞、但消費者尚未確認的消息數(和快遞已簽收、未收貨差不多),total是消息總數,即前面2個之和。

incoming是一個消息從exchange進入queue花的時間

deliver/get是一個消息從queue投遞到消費者花的時間,

ack是一條消息投遞給消費者後,過了多長時間queue才收到消費者的應答。

/s表示單位是秒

 

 

ack  確認、應答。

消費者收到queue投遞的消息,然後處理消息,處理後發送一個數據包給queue作為確認、應答(相當於拿到包裹,試了下沒問題,收貨),

queue將消息投遞給消費者後,queue中仍然保留此消息,要消費者應答後才會刪除此消息。

 

 

//參數:要監聽的queue、是否自動確認消息、使用的Consumer
channel.basicConsume(QUEUE_NAME, true, consumer);

第二個參數:autoAck,是否自動應答。

true:自動應答,queue把消息投遞給消費者,就認為消費者簽收了,投遞了一個消息就直接刪除該消息。

這並不可靠,如果消費者還沒處理完就出故障了,那這條消息就丟失了、沒被處理到。

fasle:不使用自動應答,需要消費者自己應答。

 

 

消費者手動應答:

     //創建消費者,指定要使用的channel。QueueingConsume類已經棄用,使用DefaultConsumer代替
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //監聽的queue中有消息進來時,會自動調用此方法來處理消息。但此方法預設是空的,需要重寫
            @Override
            public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                java.lang.String msg = new java.lang.String(body);
                System.out.println("received msg: " + msg);
                channel.basicAck(envelope.getDeliveryTag(), false);  //處理完了,應答|簽收
                //channel.basicReject(envelope.getDeliveryTag(), true); //拒收
            }
        };

        //監聽指定的queue。會一直監聽。
        //參數:要監聽的queue、是否自動確認消息、使用的Consumer
        channel.basicConsume(QUEUE_NAME, false, consumer);

就算消費者處理消息時宕機,只要不應答,queue中的這條消息就一直存在,消費者再次啟動時還會投遞此消息。

basicAck()的第一個參數是DeliveryTag,在一個queue中唯一標識一條消息,相當於一條消息的id。

第二個參數是multiple,多個、批處理,是否將多個消息的應答放在一起、一次性發給queue,設置為true可減少網路流量、防止網路阻塞,但是之前消息的應答有時延。

 

 

如果處理消息時發生了異常(代碼執行出了問題),在catch中拒收就是了:

catch (...){ 
  channel.basicReject(envelope.getDeliveryTag(), true); //拒收,重新入隊
  //..... //記錄日誌
}

第二個參數是requeue,是否重新入隊,設置為fasle,不再重新入隊,queue會刪除此消息;設置為true,重新入隊,queue會將此消息重新投遞給消費者。

沒必要把消息的整個處理流程都放在try中,只把可能出現異常的代碼塊放在try中即可,在catch中拒收。

 

這就是rabbitmq提供的可靠投遞機制。再加上消息的持久化,做到了rabbitmq的高可靠性。

 

但重新入隊有一個問題:如果大量的消息重新入隊,重新投遞這些消息會占用資源,使其它消息的投遞變慢。

 

 

 

 

開發過程中可能遇到的問題

1、exchange的name唯一標識一個exchange,調試時可能修改了exchange的類型,如果之前存在一個同名的exchange,會報錯。

 如果之前的同名的exchange不要了,到rabbitmq控制台刪除同名的exchange即可;如果之前的同名的exchange還要用,就把現在的exchange改下name。

 

2、消費者要一直監聽queue,所以消費者的channel、connection都沒關閉,再次啟動時可能連接不上,會報錯,因為rabbitmq上還保持著這個連接。

 等幾分鐘再運行,等連接超時被刪除即可。

 

 

 

說明

1、可以在rabbitmq控制台設置queue的綁定、發送消息到queue

delivery  投遞、交付

persistent  持續的、持久的、堅持不懈的。表示此消息會持久化到硬碟。

payload即消息的body。

點擊publish message會將此消息投遞到當前queue。

 

 

2、消息的有效期

有些消息對即時性要求很高,過了一些時間,如果這條消息還積壓在queue中,這條消息可能就沒有使用價值了,沒必要再投遞,需要刪除這條消息。

可以設置消息的有效期,如果指定的時間內沒有投遞此消息,queue會自動刪除此消息,不再投遞。

具體代碼參考:https://blog.csdn.net/liu0808/article/details/81356552

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 基於S2SH開發線上學堂系統(課程學習網站 前臺+後臺) 開發環境: Windows操作系統開發工具:Eclipse+Jdk+Tomcat+MYSQL資料庫運行效果圖 源碼及原文鏈接:https://javadao.xyz/forum.php?mod=viewthread&tid=97 ...
  • Maven項目打包時,如果遇到需要添加本地jar包依賴的時候,可以選擇兩種方法: 1. 安裝到本地倉庫 第一種方法比較常規,適用於需要添加的jar包也是由maven項目導出,含有pom文件的時候。只需要將jar包安裝到本地maven倉庫下,然後添加依賴即可。 (1)安裝到本地倉庫,執行以下命令(其中 ...
  • Spring Boot為廣大開發人員提供了便利。 本文將介紹如何編寫Starter,以便開發人員復用自己或項目組的代碼。 代碼下載地址:https://gitee.com/jxd134/Spring Boot Greeter Starter.git 1 新建項目 項目基於Maven構建,包含以下三個 ...
  • 題目: 代碼及思路: #include <stdio.h> #include <string.h> int main() { //以字元串的形式接受用戶輸入的數字 char str[1000]; //定義一個統計數組 int nums1[10] = {0}; scanf("%s", str); // ...
  • 什麼是微服務?什麼是SpringCloud? 微服務是一種架構的模式,它提倡將一個應用程式劃分成很多個微小的服務,服務與服務之間相互協調、相互配合。每個服務運行都是一個獨立的進程,服務與服務之間採用輕量級的通訊機制相互溝通。簡單的來說就是將一個龐大的複雜的單體應用進行劃分成n多個微小的服務(一個服務 ...
  • 一、Servlet簡介 1.1、Servlet是sun公司提供的一門用於開發動態web資源的技術,Servlet屬於動態資源。 1.2、Servlet(Server Applet)是Java Servlet的簡稱,Servlet就是一個運行在伺服器端的Java類。 1.3、Servlet就是Java ...
  • 經過前期大量的學習與準備,我們重要要開始寫第一個真正意義上的爬蟲了。本次我們要爬取的網站是:百度貼吧,一個非常適合新人練手的地方,那麼讓我們開始吧。 本次要爬的貼吧是<< 西部世界 >>,西部世界是我一直很喜歡的一部美劇,平時有空也會去看看吧友們都在聊些什麼。所以這次選取這個吧來作為實驗材料。註意: ...
  • 題目:點此 描述 小Hi和小Ho準備國慶期間去A國旅游。A國的城際交通比較有特色:它共有n座城市(編號1-n);城市之間恰好有n-1條公路相連,形成一個樹形公路網。小Hi計劃從A國首都(1號城市)出發,自駕遍歷所有城市,並且經過每一條公路恰好兩次——來回各一次——這樣公路兩旁的景色都不會錯過。 令小 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...