全局有序 在RocketMQ中,如果使消息全局有序,可以為Topic設置一個消息隊列,使用一個生產者單線程發送數據,消費者端也使用單線程進行消費,從而保證消息的全局有序,但是這種方式效率低,一般不使用。 局部有序 假設一個Topic分配了兩個消息隊列,生產者在發送消息的時候,可以對消息設置一個路由I ...
全局有序
在RocketMQ中,如果使消息全局有序,可以為Topic設置一個消息隊列,使用一個生產者單線程發送數據,消費者端也使用單線程進行消費,從而保證消息的全局有序,但是這種方式效率低,一般不使用。
局部有序
假設一個Topic分配了兩個消息隊列,生產者在發送消息的時候,可以對消息設置一個路由ID,比如想保證一個訂單的相關消息有序,那麼就使用訂單ID當做路由ID,在發送消息的時候,通過訂單ID對消息隊列的個數取餘,根據取餘結果選擇消息隊列,這樣同一個訂單的數據就可以保證發送到一個消息隊列中,消費者端使用MessageListenerOrderly
處理有序消息,這就是RocketMQ的局部有序,保證消息在某個消息隊列中有序。
接下來看RoceketMQ源碼中提供的順序消息例子(稍微做了一些修改):
生產者
public class Producer {
public static void main(String[] args) throws UnsupportedEncodingException {
try {
// 創建生產者
DefaultMQProducer producer = new DefaultMQProducer("生產者組");
// 啟動
producer.start();
// 創建TAG
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 100; i++) {
// 生成訂單ID
int orderId = i % 10;
// 創建消息
Message msg =
new Message("TopicTest", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 獲取訂單ID
Integer id = (Integer) arg;
// 對消息隊列個數取餘
int index = id % mqs.size();
// 根據取餘結果選擇消息要發送給哪個消息隊列
return mqs.get(index);
}
}, orderId); // 這裡傳入了訂單ID
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
}
}
}
消費者
public class Consumer {
public static void main(String[] args) throws MQClientException {
// 創建消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("消費者組");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 訂閱主題
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
// 註冊消息監聽器,使用的是MessageListenerOrderly
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
// 列印消息
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
從例子中可以看出生產者在發送消息的時候,通過訂單ID作為路由信息,將同一個訂單ID的消息發送到了同一個消息隊列中,保證同一個訂單ID相關消息有序發送,接下來就看消費者是如何保證消息的順序消費的。
定時任務對消息隊列加鎖
消費者在啟動的時候,會對是否是順序消費進行判斷(監聽器是否是MessageListenerOrderly
類型來判斷),如果是順序消費,會使用ConsumeMessageOrderlyService
,並調用它的start方法進行啟動,在集群模式模式下,start方法中會啟動一個定時加鎖的任務,周期性的對該消費者負責的消息隊列進行加鎖。
為什麼集群模式下需要加鎖?
因為廣播模式下,消息隊列會分配給消費者下的每一個消費者,而在集群模式下,一個消息隊列同一時刻只能被同一個消費組下的某一個消費者進行,所以在廣播模式下不存在競爭關係,也就不需要對消息隊列進行加鎖,而在集群模式下,有可能因為負載均衡等原因將某一個消息隊列分配到了另外一個消費者中,因此在順序消費情況下,集群模式下需要對消息隊列加鎖,當某個消息隊列被鎖定時,其他的消費者不能進行消費。
加鎖的具體邏輯如下,首先獲取當前消費者負責的所有消息隊列MessageQueue
,返回數據是一個MAP,key為broker名稱,value為broker下的消息隊列,接著對MAP進行遍歷,處理每一個broker下的消息隊列:
(1)根據broker名稱查找broker的詳細信息;
(2)創建加鎖請求,在請求中設置要加鎖的消息隊列,將請求發送給broker,表示要對這些消息隊列進行加鎖;
(3)Broker返回請求處理結果,響應結果中包含了加鎖成功的消息隊列,對於加鎖成功的消息隊列將消息隊列MessageQueue
,將其對應的ProcessQueue
中的locked屬性置為true表示該消息隊列已加鎖成功,如果響應中未包含某個消息隊列的信息,表示此消息隊列加鎖失敗,需要將其對應的ProcessQueue
對象中的locked屬性置為false表示加鎖失敗;
順序消息拉取
上面可知,在使用順序消息時,定時任務會周期性的對當前消費者負責的消息隊列進行加鎖,不過由於負載均衡等原因,有可能給當前消費者分配了新的消息隊列,此時還未來得及通過定時任務加鎖,所以消費者在構建消息拉取請求前會再次進行判斷,如果是新分配到當前消費者的消息隊列,同樣會向Broker發送請求,對MessageQueue
進行加鎖,加鎖成功將其對應的ProcessQueue
中的locked屬性置為true才可以拉取消息。
順序消息消費
消息拉取成功之後,會將消息提交到線程池中進行處理,對於順序消費處理邏輯如下:
-
獲取消息隊列
MessageQueue
的對象鎖,每個MessageQueue
對應了一把Object
對象鎖,然後使用synchronized進行加鎖,這裡加鎖的原因是因為順序消費使用的是線程池,由多個線程同時進行消費,所以某個線程在處理某個消息隊列的消息時需要對該消息隊列MessageQueue
加鎖,防止其他線程併發消費該消息隊列的鎖,破壞消息的順序性;public class MessageQueueLock { private ConcurrentMap<MessageQueue, Object> mqLockTable = new ConcurrentHashMap<MessageQueue, Object>(); public Object fetchLockObject(final MessageQueue mq) { // 獲取消息隊列對應的對象鎖,也就是一個Object類型的對象 Object objLock = this.mqLockTable.get(mq); // 如果獲取為空 if (null == objLock) { // 創建對象 objLock = new Object(); // 加入到Map中 Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock); if (prevLock != null) { objLock = prevLock; } } return objLock; } }
-
上一步獲取鎖成功之後,會再次校驗該
MessageQueue
對應的ProcessQueue
中的鎖(locked狀態),看是否過期或者已經失效,過期或者失效稍後會重新進行加鎖; -
獲取
ProcessQueue
的中的consumeLock
消費鎖,獲取成功之後調用消息監聽器的consumeMessage
方法開始消費消費;public class ProcessQueue { // 消息消費鎖 private final Lock consumeLock = new ReentrantLock(); public Lock getConsumeLock() { // 獲取消息消費鎖 return consumeLock; } }
-
消息消費完畢,釋放
ProcessQueue
的consumeLock
消費鎖; -
方法執行完畢,釋放
MessageQueue
對應的Object
對象鎖;
在第1步中就已經獲取了MessageQueue
對應的Object
對象鎖對消息隊列進行加鎖了,那麼為什麼在第3步消費消息之前還要再加一個消費鎖呢?
猜測有可能是在消費者進行負載均衡時,當前消費者負責的消息隊列發生變化,可能移除某個消息隊列,那麼消費者在進行消費的時候就要獲取ProcessQueue
的consumeLock
消費鎖進行加鎖,相當於鎖住ProcessQueue
,防止正在消費的過程中,ProcessQueue
被負載均衡移除。
既然如此,負載均衡的時候為什麼不使用MessageQueue
對應的Object
對象鎖進行加鎖而要使用ProcessQueue
中的consumeLock
消費鎖?
這裡應該是為了減小鎖的粒度,因為消費者在MessageQueue
對應的Object
加鎖後,還進行了一系列的判斷,校驗都成功之後獲取ProcessQueue
中的consumeLock
加鎖,之後開始消費消息,消費完畢釋放所有的鎖,如果負載均衡使用MessageQueue
的Object
對象鎖需要等待整個過程結束,鎖的粒度較粗,這樣顯然會降低性能,而如果使用消息消費鎖,只需要等待第3步和第4步結束就可以獲取鎖,減少等待的時間,而且消費者在進行消息消費前也會判斷ProcessQueue是否被移除,所以只要保證consumeMessage
方法在執行的過程中(消息被消費的過程)ProcessQueue
不被移除即可。
總結
消費者端,是通過加鎖來保證消息的順序消費,一共有三把鎖:
-
向Broker申請的消息隊列鎖
集群模式下一個消息隊列同一時刻只能被同一個消費組下的某一個消費者進行,為了避免負載均衡等原因引起的變動,消費者會向Broker發送請求對消息隊列進行加鎖,如果加鎖成功,記錄到消息隊列對應的ProcessQueue
中的locked
變數中。 -
消息隊列鎖
對應MessageQueue
對應的Object
對象鎖,消費者在處理拉取到的消息時,由於可以開啟多線程進行處理,所以處理消息前需要對MessageQueue
加鎖,鎖住要處理的消息隊列,主要是處理多線程之間的競爭,保證消息的順序性。 -
消息消費鎖
對應ProcessQueue
中的consumeLock
,消費者在調用consumeMessage方法之前會加消費鎖,主要是為了避免在消費消息時,由於負載均衡等原因,ProcessQueue被刪除。
對應的相關源碼可參考: