RabbitMQ基礎教程之使用進階篇 相關博文,推薦查看: I. 背景 前一篇基本使用篇的博文中,介紹了rabbitmq的三種使用姿勢,可以知道如何向RabbitMQ發送消息以及如何消費,但遺留下幾個疑問,本篇則主要希望弄清楚這幾點 Exchange聲明的問題(是否必須聲明,如果不聲明會怎樣) Ex ...
RabbitMQ基礎教程之使用進階篇
相關博文,推薦查看:
I. 背景
前一篇基本使用篇的博文中,介紹了rabbitmq的三種使用姿勢,可以知道如何向RabbitMQ發送消息以及如何消費,但遺留下幾個疑問,本篇則主要希望弄清楚這幾點
- Exchange聲明的問題(是否必須聲明,如果不聲明會怎樣)
- Exchange聲明的幾個參數(durable, autoDelete)有啥區別
- 當沒有隊列和Exchange綁定時,直接往隊列中塞數據,好像不會有數據增加(即先塞數據,然後創建queue,建立綁定,從控制臺上看這個queue裡面也不會有數據)
- 消息消費的兩種姿勢(一個主動去拿數據,一個是rabbit推數據)對比
II. 基本進階篇
1. Exchange預設場景
將前面的消息發送代碼撈出來,幹掉Exchange的聲明,如下
public class DefaultProducer {
public static void publishMsg(String queue, String message) throws IOException, TimeoutException {
ConnectionFactory factory = RabbitUtil.getConnectionFactory();
//創建連接
Connection connection = factory.newConnection();
//創建消息通道
Channel channel = connection.createChannel();
channel.queueDeclare(queue, true, false, true, null);
// 發佈消息
channel.basicPublish("", queue, null, message.getBytes());
channel.close();
connection.close();
}
public static void main(String[] args) throws IOException, TimeoutException {
for (int i = 0; i < 20; i++) {
publishMsg("hello", "msg" + i);
}
}
}
在發佈消息時,傳入的Exchange名為“”,再到控制台查看,發現數據被投遞到了(AMQP default)這個交換器,對應的截圖如下
image看一下上面的綁定描述內容,重點如下
- 預設交換器選擇Direct策略
- 將rountingKey綁定到同名的queue上
- 不支持顯示的綁定和解綁
上面的代碼為了演示數據的流向,在發佈消息的同時也定義了一個同名的Queue,因此可以在控制臺上看到同名的 "hello" queue,且內部有20條數據
當我們去掉queue的聲明時,會發現另一個問題,投入的數據好像並沒有存下來(因為沒有queue來接收這些數據,而之後再聲明queue時,之前的數據也不會分配過來)
2. 綁定之後才有數據
首先是將控制臺中的hello這個queue刪掉,然後再次執行下麵的代碼(相對於前面的就是註釋了queue的聲明)
public class DefaultProducer {
public static void publishMsg(String queue, String message) throws IOException, TimeoutException {
ConnectionFactory factory = RabbitUtil.getConnectionFactory();
//創建連接
Connection connection = factory.newConnection();
//創建消息通道
Channel channel = connection.createChannel();
// channel.queueDeclare(queue, true, false, true, null);
// 發佈消息
channel.basicPublish("", queue, null, message.getBytes());
channel.close();
connection.close();
}
public static void main(String[] args) throws IOException, TimeoutException {
for (int i = 0; i < 20; i++) {
publishMsg("hello", "msg" + i);
}
}
}
然後從控制臺上看,可以看到有數據寫入Exchange,但是沒有queue來接收這些數據
IMAGE然後開啟消費進程,然後再次執行上面的塞入數據,新後面重新塞入的數據可以被消費;但是之前塞入的數據則沒有,消費消息的代碼如下:
public class MyDefaultConsumer {
public void consumerMsg(String queue) throws IOException, TimeoutException {
ConnectionFactory factory = RabbitUtil.getConnectionFactory();
//創建連接
Connection connection = factory.newConnection();
//創建消息通道
Channel channel = connection.createChannel();
channel.queueDeclare(queue, true, false, true, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
try {
System.out.println(" [ " + queue + " ] Received '" + message);
} finally {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 取消自動ack
channel.basicConsume(queue, false, consumer);
}
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
MyDefaultConsumer consumer = new MyDefaultConsumer();
consumer.consumerMsg("hello");
Thread.sleep(1000 * 60 * 10);
}
}
小結:
- 通過上面的演示得知一點
- 當沒有Queue綁定到Exchange時,往Exchange中寫入的消息也不會重新分發到之後綁定的queue上
3. Durable, autoDeleted參數
在定義Queue時,可以指定這兩個參數,這兩個參數的區別是什麼呢?
a. durable
持久化,保證RabbitMQ在退出或者crash等異常情況下數據沒有丟失,需要將queue,exchange和Message都持久化。
若是將queue的持久化標識durable設置為true,則代表是一個持久的隊列,那麼在服務重啟之後,也會存在,因為服務會把持久化的queue存放在硬碟上,當服務重啟的時候,會重新什麼之前被持久化的queue。隊列是可以被持久化,但是裡面的消息是否為持久化那還要看消息的持久化設置。也就是說,重啟之前那個queue裡面還沒有發出去的消息的話,重啟之後那隊列裡面是不是還存在原來的消息,這個就要取決於發生著在發送消息時對消息的設置
b. autoDeleted
自動刪除,如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列
這個比較容易演示了,當一個Queue被設置為自動刪除時,當消費者斷掉之後,queue會被刪除,這個主要針對的是一些不是特別重要的數據,不希望出現消息積累的情況
// 倒數第二個參數,true表示開啟自動刪除
// 正數第二個參數,true表示持久化
channel.queueDeclare(queue, true, false, true, null);
c. 小結
- 當一個Queue已經聲明好了之後,不能更新durable或者autoDelted值;當需要修改時,需要先刪除再重新聲明
- 消費的Queue聲明應該和投遞的Queue聲明的 durable,autoDelted屬性一致,否則會報錯
- 對於重要的數據,一般設置
durable=true, autoDeleted=false
- 對於設置
autoDeleted=true
的隊列,當沒有消費者之後,隊列會自動被刪除
4. ACK
執行一個任務可能需要花費幾秒鐘,你可能會擔心如果一個消費者在執行任務過程中掛掉了。一旦RabbitMQ將消息分發給了消費者,就會從記憶體中刪除。在這種情況下,如果正在執行任務的消費者宕機,會丟失正在處理的消息和分發給這個消費者但尚未處理的消息。
但是,我們不想丟失任何任務,如果有一個消費者掛掉了,那麼我們應該將分發給它的任務交付給另一個消費者去處理。
為了確保消息不會丟失,RabbitMQ支持消息應答。消費者發送一個消息應答,告訴RabbitMQ這個消息已經接收並且處理完畢了。RabbitMQ就可以刪除它了。
因此手動ACK的常見手段
// 接收消息之後,主動ack/nak
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
try {
System.out.println(" [ " + queue + " ] Received '" + message);
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(envelope.getDeliveryTag(), false, true);
}
}
};
// 取消自動ack
channel.basicConsume(queue, false, consumer);
手動ack時,有個multiple
,其含義表示:
可以理解為每個Channel維護一個unconfirm的消息序號集合,每publish一條數據,集合中元素加1,每回調一次handleAck方法,unconfirm集合刪掉相應的一條(multiple=false)或多條(multiple=true)記錄
III. 其他
1. 參考
2. 一灰灰Blog: https://liuyueyi.github.io/hexblog
一灰灰的個人博客,記錄所有學習和工作中的博文,歡迎大家前去逛逛
3. 聲明
盡信書則不如,已上內容,純屬一家之言,因個人能力有限,難免有疏漏和錯誤之處,如發現bug或者有更好的建議,歡迎批評指正,不吝感激
- 微博地址: 小灰灰Blog
- QQ: 一灰灰/3302797840