1、RocketMQ消息隊列簡單介紹 這裡簡單介紹一下RocketMQ的消息隊列的模型 一個topic對應多個隊列如下圖: 生產者和消費者分別向隊列中發送和消費消息,生產者和消費者都可以是多個,通過組名進行群組約束。由於負載因素造成生產消息會生產到各個queue中。 消費群組進行queue消費時首先 ...
1、RocketMQ消息隊列簡單介紹
這裡簡單介紹一下RocketMQ的消息隊列的模型
一個topic對應多個隊列如下圖:
生產者和消費者分別向隊列中發送和消費消息,生產者和消費者都可以是多個,通過組名進行群組約束。由於負載因素造成生產消息會生產到各個queue中。
消費群組進行queue消費時首先因為負載因素,queue會分配給各自的消費實例中,如果消費組有變化會重新分配,導致queue分配亂序。
另外一個消費者實例消費對應的queue時,消費者使用線程池進行處理消息。
以上各種操作都會導致消息不一定先處理就會先完成,所以造成消息消費不是嚴格順序處理的。
所以在之前的版本中假如我們要求要嚴格按照順序進行消息處理的話就必須進行單隊列單線程進行消息消費處理
4.0.0之後版本支持順序消費處理,我們看一看他是如何處理的。。。
2、順序消費原來介紹
根據上面的簡單介紹我們知道一個topic對應多個隊列,我們生產消息的時候就可以針對隊列的數量和消息的有效標識進行取模進行隊列選擇發送,如下示例代碼
package org.apache.rocketmq.example.ordermessage; import java.io.UnsupportedEncodingException; import java.util.List; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; public class Producer { public static void main(String[] args) throws UnsupportedEncodingException { try { MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 100; i++) { int orderId = i % 10; Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId); System.out.printf("%s%n", sendResult); } producer.shutdown(); } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { e.printStackTrace(); } } }
比如我們使用訂單號作為和queue的ID取模的關鍵欄位,我們就可以保證這個訂單號的消息都會發送給一個隊列,這樣我們就保證了生產者生產消息在業務上是有序的
接下來我們看看消費端是怎樣實現的。
1、消費者與broker建立連接分配隊列的時候會嘗試給隊列加鎖,如果成功則獲取queue消費權利,否則嘗試下一個queue。
2、如果消費模式為集群每20秒對分配給自己的隊列自動加鎖
3、消息消費時對queue進行加鎖,同一時刻只允許一個線程對一個queue進行消費
4、根據消費時間進行隊列和線程的切換預設60s(這個時間就是鎖住隊列的時間)
5、消息重試次數超過最大次數之後將消息移入死信queue
根據以上幾點可以保證一個隊列中的消息可以按順序進行消費。
3、總結
RocketMQ4.0.0對順序消息做了升級,但是犧牲了部分消費性能。因為要給隊列加鎖,並且只能一個隊列同一時間只能有一個線程處理消息。
至於為什麼是60秒(時間可設置)進行線程切換可能是更好的利用cpu或者不因為某個隊列消息異常拖慢其他隊列消息處理吧,還有待深入研究。
本人理解如有誤請廣大網友指正,謝謝!