依賴 <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency> 生產者 public class Producer ...
依賴
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency>
生產者
public class Producer { private final static String QUEUE_NAME = "my_queue"; //隊列名稱 private final static String EXCHANGE_NAME = "my_exchange"; //要使用的exchange的名稱 private final static String EXCHANGE_TYPE = "topic"; //要使用的exchange的名稱 private final static String EXCHANGE_ROUTING_KEY = "my_routing_key.#"; //exchange使用的routing-key public static void send() throws IOException, TimeoutException { //創建連接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.1.9"); //設置rabbitmq-server的地址 connectionFactory.setPort(5672); //使用的埠號 connectionFactory.setVirtualHost("/"); //使用的虛擬主機 //由連接工廠創建連接 Connection connection = connectionFactory.newConnection(); //通過連接創建通道 Channel channel = connection.createChannel(); //通過通道聲明一個exchange,若已存在則直接使用,不存在會自動創建 //參數:name、type、是否支持持久化、此交換機沒有綁定一個queue時是否自動刪除、是否只在rabbitmq內部使用此交換機、此交換機的其它參數(map) channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null); //通過通道聲明一個queue,如果此隊列已存在,則直接使用;如果不存在,會自動創建 //參數:name、是否支持持久化、是否是排它的、是否支持自動刪除、其他參數(map) channel.queueDeclare(QUEUE_NAME, true, false, false, null); //將queue綁定至某個exchange。一個exchange可以綁定多個queue channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY); //發送消息 String msg = "hello"; //消息內容 String routing_key = "my_routing_key.key1"; //發送消息使用的routing-key channel.basicPublish(EXCHANGE_NAME,routing_key,null,msg.getBytes()); //消息是byte[],可以傳遞所有類型(轉換為byte[]),不局限於字元串 System.out.println("send message:"+msg); //關閉連接 channel.close(); connection.close(); } }
exchange詳解
rabbitmq自帶了7個交換機,都是以amq開頭。我們可以使用自帶的,也可以自己新建交換機。
交換機的參數
先看最下麵的add a new exchange:
- name 交換機的name
- type 交換機的類型,說白了就是routing-key匹配queue使用哪種匹配規則
- durability 消息是否支持持久化,durable是支持持久化,重啟rabbitmq,rabbitmq上的消息還在、不會丟失,上面features里的D就是durable;transient是不支持持久化,重啟rabbitmq,rabbitmq上的消息丟失。因為消息是保存在內初中的,不持久化到硬碟,關閉rabbitmq消息直接就沒了,持久化後再次啟動時會從硬碟載入消息。transient關鍵字用於阻止序列化。
- auto delete 如果此交換機一個queue都沒有綁定,是否自動刪除此交換機
- internal 此交換機是否只在rabbitmq內部使用,大部分交換機都要暴露出來,給消息生產者用,只有少數(一般是rabbitmq自帶的)是內部使用的。features里的I就是internal,表示只在內部使用,自帶的amq.rabbitmq.trace用來跟蹤rabbitmq內部的消息投遞過程,只在內部使用。
- arguments 給此交換機設置一些其它參數
//通過通道聲明一個exchange,若已存在則直接使用,不存在會自動創建 channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);
聲明一個交換機,參數和上面控制台add a new exchange的參數順序是一致的,arguments是用map表示,一般不必設置其它參數,寫成null即可。
自帶的7個交換機,第一個是(AMQP default),不是說這個交換機的名字是AMQP default。
這個交換機的名字沒有名字(空),如果要使用這個交換機,代碼里交換機的name要寫成空串,括弧是說明這個交換機是rabbitmq預設的交換機。
exchange的4種類型
我們綁定exchange、queue時使用了一個routing-key:
private final static String EXCHANGE_ROUTING_KEY = "my_routing_key.#"; //exchange使用的routing-key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY);
這個exchange使用routing-key來匹配綁定的queue,即要將消息發送到哪些隊列。
在發送消息時又使用了一個routing-key:
String routing_key = "my_routing_key.key1"; //發送消息使用的routing-key channel.basicPublish(EXCHANGE_NAME,routing_key,null,msg.getBytes());
rabbitmq會將這個消息發送到指定的exchange,
如何匹配?是完全匹配還是模糊匹配?這就涉及到exchange的4種type:
(1)direct 完全匹配
發送消息使用的routing-key,要與交換機使用的routing-key完全相同。
比如exchange使用的routing-key是"my_routing_key",那發送消息使用的routing_key也要是"my_routing_key"
(2)topic 模糊匹配,可以使用通配符
*只能匹配一個詞,#可以匹配一個或多個詞。
註意是詞,不是字元。"my_routing_key.key1",my_routing_key是一個詞,key1是一個詞,詞之間用點號分隔。
比如exchange的routing-key是"my_routing_key.#" ,則發送消息使用的routing-key以"my_routing_key."開頭即可
我在示例中用的就是這種。這種方式非常適合把一個消息投遞到多個queue(應用)
(3)fanout 廣播模式
不使用routing-key,一般把routing-key都設置為空串,當然設置為什麼字元串都行,反正都不用。
exchange會把消息投遞(廣播)到此exchange綁定的所有的queue。
這種模式效率很高,因為不進行routing-key的匹配,大大減小了時間開銷。
(4)headers 首部模式(瞭解即可)
不使用routing-key(路由鍵),根據header將消息投遞到匹配的queue。
Map<String, Object> exchange_headers = new Hashtable<String, Object>(); headers.put("x-match", "any"); //指定鍵值對匹配模式any、all headers.put("key1", "value1"); //放入一些鍵值對 headers.put("key2", "value2"); //綁定queue時指定指定map。至於routing-key,設置為什麼串都行,反正不使用 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"", exchange_headers); //發送消息時也要使用map Map<String,Object> headers = new Hashtable<String, Object>(); headers.put("key1", "value1"); //放入一些鍵值對 Builder properties = new BasicProperties.Builder(); properties.headers(headers); String msg="hello"; //指定消息要使用的header。要使用properties的形式,不能直接發送map。會放在http請求頭中 channel.basicPublish(EXCHANGE_NAME, "",properties.build(),msg.getBytes());
x-match指定匹配模式,all:發送消息的header(map)中的鍵值對要和exchange的header中的所有的鍵值對都要相同,exchange的header有2個鍵值對key1、key2,發送消息的header中也要有這2個鍵值對(需要完全相同)。any:發送消息的header中只要有一個鍵值對和exchange中的鍵值對完全相同就行,比如key1、key2都行,只要有一個就行了。
header這種方式不常用,因為有點複雜。都要匹配queue,用routing-key它不簡單,它不香嗎?非要搞得這麼複雜。
不過properties的使用還是需要瞭解一下:
一個消息由properties、body組成,也就是basicPublish()的後2個參數。
properties可以設置此消息的一些參數,比如延時投遞、優先順序,這些參數寫成鍵值對放在map中,將map轉換為properties,再將properties作為basicPublish()的參數。
Queue詳解
type:指定queue類型,預設為classic(主隊列),還可以設置為quorum(從隊列),將主隊列同步到從隊列,主隊列故障時還可以用從隊列。
name:此queue的名稱
durability:queue中的消息是否持久化到硬碟。exchange也有此屬性,消息會先發給exchange保存,exchange再投遞到某些queue,消費者還沒處理此消息時,消息會保存在queue中。
auto delete:如果此queue沒有綁定到任何一個exchange,是否自動刪除此queue
arguments:設置一些其他參數
//聲明一個queue //參數:name、是否支持持久化、是否是排它的、是否支持自動刪除、其他參數(map) channel.queueDeclare(QUEUE_NAME, true, false, false, null);
排它:這個queue只能在當前連接中使用(拒絕在其他連接中使用),由當前連接聲明|創建,斷開連接會自動刪除此queue。
一個exchange可以綁定多gueue,一個queue也可以綁定到多個exchange上。
消費者
public class Consumer { private final static String QUEUE_NAME = "my_queue"; //隊列名稱 public static void receive() throws IOException, TimeoutException { //創建連接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.1.9"); //設置rabbitmq-server的地址 connectionFactory.setPort(5672); //使用的埠號 connectionFactory.setVirtualHost("/"); //使用的虛擬主機 //由連接工廠創建連接 Connection connection = connectionFactory.newConnection(); //通過連接創建通道 Channel channel = connection.createChannel(); //創建消費者,指定要使用的channel。QueueingConsume類已經棄用,使用DefaultConsumer代替 DefaultConsumer consumer = new DefaultConsumer(channel){ //監聽的queue中有消息進來時,會自動調用此方法來處理消息。但此方法預設是空的,需要重寫 @Override public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { java.lang.String msg = new java.lang.String(body); System.out.println("received msg: " + msg); } }; //監聽指定的queue。會一直監聽。 //參數:要監聽的queue、是否自動確認消息、使用的Consumer channel.basicConsume(QUEUE_NAME, true, consumer); } }
這段代碼錶面上沒有問題,監聽queue就完事了。但有一個隱患:
我們是在生產者中聲明|創建的exchange、queue,如果生產者尚未運行,並且rabbitmq上沒有對應的exchange、queue(之前沒有創建),啟動消費者,消費者要監聽指定的queue,根本就連接不上queue,指定的queue創都沒創建,監聽什麼?直接報錯。
更加健壯的寫法是:在消費者中也聲明exchange、queue,有就直接用,沒有會自動創建。
public class Consumer { private final static String QUEUE_NAME = "my_queue"; //隊列名稱 private final static String EXCHANGE_NAME = "my_exchange"; //要使用的exchange的名稱 private final static String EXCHANGE_TYPE = "topic"; //要使用的exchange的名稱 private final static String EXCHANGE_ROUTING_KEY = "my_routing_key.#"; //exchange使用的routing-key public static void receive() throws IOException, TimeoutException { //創建連接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.1.9"); //設置rabbitmq-server的地址 connectionFactory.setPort(5672); //使用的埠號 connectionFactory.setVirtualHost("/"); //使用的虛擬主機 //由連接工廠創建連接 Connection connection = connectionFactory.newConnection(); //通過連接創建通道 Channel channel = connection.createChannel(); //通過通道聲明一個exchange,若已存在則直接使用,不存在會自動創建 //參數:name、type、是否支持持久化、此交換機沒有綁定一個queue時是否自動刪除、是否只在rabbitmq內部使用此交換機、此交換機的其它參數(map) channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null); //通過通道聲明一個queue,如果此隊列已存在,則直接使用;如果不存在,會自動創建 //參數:name、是否支持持久化、是否是排它的、是否支持自動刪除、其他參數(map) channel.queueDeclare(QUEUE_NAME, true, false, false, null); //將queue綁定至某個exchange。一個exchange可以綁定多個queue channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY); //創建消費者,指定要使用的channel。QueueingConsume類已經棄用,使用DefaultConsumer代替 DefaultConsumer consumer = new DefaultConsumer(channel){ //監聽的queue中有消息進來時,會自動調用此方法來處理消息。但此方法預設是空的,需要重寫 @Override public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { java.lang.String msg = new java.lang.String(body); System.out.println("received msg: " + msg); } }; //監聽指定的queue。會一直監聽。 //參數:要監聽的queue、是否自動確認消息、使用的Consumer channel.basicConsume(QUEUE_NAME, true, consumer); } }
rabbitmq控制台的queue:
ready是此queue中待投遞的消息數,unacked是已投遞、但消費者尚未確認的消息數(和快遞已簽收、未收貨差不多),total是消息總數,即前面2個之和。
incoming是一個消息從exchange進入queue花的時間
deliver/get是一個消息從queue投遞到消費者花的時間,
ack是一條消息投遞給消費者後,過了多長時間queue才收到消費者的應答。
/s表示單位是秒
ack 確認、應答。
消費者收到queue投遞的消息,然後處理消息,處理後發送一個數據包給queue作為確認、應答(相當於拿到包裹,試了下沒問題,收貨),
queue將消息投遞給消費者後,queue中仍然保留此消息,要消費者應答後才會刪除此消息。
//參數:要監聽的queue、是否自動確認消息、使用的Consumer channel.basicConsume(QUEUE_NAME, true, consumer);
第二個參數:autoAck,是否自動應答。
true:自動應答,queue把消息投遞給消費者,就認為消費者簽收了,投遞了一個消息就直接刪除該消息。
這並不可靠,如果消費者還沒處理完就出故障了,那這條消息就丟失了、沒被處理到。
fasle:不使用自動應答,需要消費者自己應答。
消費者手動應答:
//創建消費者,指定要使用的channel。QueueingConsume類已經棄用,使用DefaultConsumer代替 DefaultConsumer consumer = new DefaultConsumer(channel){ //監聽的queue中有消息進來時,會自動調用此方法來處理消息。但此方法預設是空的,需要重寫 @Override public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { java.lang.String msg = new java.lang.String(body); System.out.println("received msg: " + msg); channel.basicAck(envelope.getDeliveryTag(), false); //處理完了,應答|簽收 //channel.basicReject(envelope.getDeliveryTag(), true); //拒收 } }; //監聽指定的queue。會一直監聽。 //參數:要監聽的queue、是否自動確認消息、使用的Consumer channel.basicConsume(QUEUE_NAME, false, consumer);
就算消費者處理消息時宕機,只要不應答,queue中的這條消息就一直存在,消費者再次啟動時還會投遞此消息。
basicAck()的第一個參數是DeliveryTag,在一個queue中唯一標識一條消息,相當於一條消息的id。
第二個參數是multiple,多個、批處理,是否將多個消息的應答放在一起、一次性發給queue,設置為true可減少網路流量、防止網路阻塞,但是之前消息的應答有時延。
如果處理消息時發生了異常(代碼執行出了問題),在catch中拒收就是了:
catch (...){
channel.basicReject(envelope.getDeliveryTag(), true); //拒收,重新入隊
//..... //記錄日誌
}
第二個參數是requeue,是否重新入隊,設置為fasle,不再重新入隊,queue會刪除此消息;設置為true,重新入隊,queue會將此消息重新投遞給消費者。
沒必要把消息的整個處理流程都放在try中,只把可能出現異常的代碼塊放在try中即可,在catch中拒收。
這就是rabbitmq提供的可靠投遞機制。再加上消息的持久化,做到了rabbitmq的高可靠性。
但重新入隊有一個問題:如果大量的消息重新入隊,重新投遞這些消息會占用資源,使其它消息的投遞變慢。
開發過程中可能遇到的問題
1、exchange的name唯一標識一個exchange,調試時可能修改了exchange的類型,如果之前存在一個同名的exchange,會報錯。
如果之前的同名的exchange不要了,到rabbitmq控制台刪除同名的exchange即可;如果之前的同名的exchange還要用,就把現在的exchange改下name。
2、消費者要一直監聽queue,所以消費者的channel、connection都沒關閉,再次啟動時可能連接不上,會報錯,因為rabbitmq上還保持著這個連接。
等幾分鐘再運行,等連接超時被刪除即可。
說明
1、可以在rabbitmq控制台設置queue的綁定、發送消息到queue
delivery 投遞、交付
persistent 持續的、持久的、堅持不懈的。表示此消息會持久化到硬碟。
payload即消息的body。
點擊publish message會將此消息投遞到當前queue。
2、消息的有效期
有些消息對即時性要求很高,過了一些時間,如果這條消息還積壓在queue中,這條消息可能就沒有使用價值了,沒必要再投遞,需要刪除這條消息。
可以設置消息的有效期,如果指定的時間內沒有投遞此消息,queue會自動刪除此消息,不再投遞。
具體代碼參考:https://blog.csdn.net/liu0808/article/details/81356552