全局有序 在RocketMQ中,如果使消息全局有序,可以為Topic設置一個消息隊列,使用一個生產者單線程發送數據,消費者端也使用單線程進行消費,從而保證消息的全局有序,但是這種方式效率低,一般不使用。 局部有序 假設一個Topic分配了兩個消息隊列,生產者在發送消息的時候,可以對消息設置一個路由I ...
全局有序
在RocketMQ中,如果使消息全局有序,可以為Topic設置一個消息隊列,使用一個生產者單線程發送數據,消費者端也使用單線程進行消費,從而保證消息的全局有序,但是這種方式效率低,一般不使用。
局部有序
假設一個Topic分配了兩個消息隊列,生產者在發送消息的時候,可以對消息設置一個路由ID,比如想保證一個訂單的相關消息有序,那麼就使用訂單ID當做路由ID,在發送消息的時候,通過訂單ID對消息隊列的個數取餘,根據取餘結果選擇消息隊列,這樣同一個訂單的數據就可以保證發送到一個消息隊列中,消費者端使用MessageListenerOrderly
處理有序消息,這就是RocketMQ的局部有序,保證消息在某個消息隊列中有序。
接下來看RoceketMQ源碼中提供的順序消息例子(稍微做了一些修改):
生產者
public class Producer {
public static void main(String[] args) throws UnsupportedEncodingException {
try {
// 創建生產者
DefaultMQProducer producer = new DefaultMQProducer("生產者組");
// 啟動
producer.start();
// 創建TAG
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 100; i++) {
// 生成訂單ID
int orderId = i % 10;
// 創建消息
Message msg =
new Message("TopicTest", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 獲取訂單ID
Integer id = (Integer) arg;
// 對消息隊列個數取餘
int index = id % mqs.size();
// 根據取餘結果選擇消息要發送給哪個消息隊列
return mqs.get(index);
}
}, orderId); // 這裡傳入了訂單ID
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
}
}
}
消費者
public class Consumer {
public static void main(String[] args) throws MQClientException {
// 創建消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("消費者組");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 訂閱主題
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
// 註冊消息監聽器,使用的是MessageListenerOrderly
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
// 列印消息
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
從例子中可以看出生產者在發送消息的時候,通過訂單ID作為路由信息,將同一個訂單ID的消息發送到了同一個消息隊列中,保證同一個訂單ID的消息有序,那麼消費者端是如何保證消息的順序讀取呢?接下來就去看下源碼。
順序消息實現原理
在【RocketMQ】消息的拉取一文中講到,消費者在啟動時會調用DefaultMQPushConsumerImpl
的start方法:
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
/**
* 預設的消息推送實現類
*/
protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
/**
* 啟動
*/
@Override
public void start() throws MQClientException {
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
// 啟動消費者
this.defaultMQPushConsumerImpl.start();
// ...
}
}
在DefaultMQPushConsumerImpl
的start方法中,對消息監聽器類型進行了判斷,如果類型是MessageListenerOrderly
表示要進行順序消費,此時使用ConsumeMessageOrderlyService
對ConsumeMessageService
進行實例化,然後調用它的start方法進行啟動:
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
// 消息消費service
private ConsumeMessageService consumeMessageService;
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
// ...
// 如果是順序消費
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
// 設置順序消費標記
this.consumeOrderly = true;
// 創建consumeMessageService,使用的是ConsumeMessageOrderlyService
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
// 併發消費使用ConsumeMessageConcurrentlyService
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
// 啟動ConsumeMessageService
this.consumeMessageService.start();
// ...
break;
// ...
}
// ...
}
}
加鎖定時任務
進入到ConsumeMessageOrderlyService
的start方法中,可以看到,如果是集群模式,會啟動一個定時加鎖的任務,周期性的對訂閱的消息隊列進行加鎖,具體是通過調用RebalanceImpl
的lockAll方法實現的:
public class ConsumeMessageOrderlyService implements ConsumeMessageService {
public void start() {
// 如果是集群模式
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// 周期性的執行加鎖方法
ConsumeMessageOrderlyService.this.lockMQPeriodically();
} catch (Throwable e) {
log.error("scheduleAtFixedRate lockMQPeriodically exception", e);
}
}
}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}
}
public synchronized void lockMQPeriodically() {
if (!this.stopped) {
// 進行加鎖
this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
}
}
}
為什麼集群模式下需要加鎖?
因為廣播模式下,消息隊列會分配給消費者下的每一個消費者,而在集群模式下,一個消息隊列同一時刻只能被同一個消費組下的某一個消費者進行,所以在廣播模式下不存在競爭關係,也就不需要對消息隊列進行加鎖,而在集群模式下,有可能因為負載均衡等原因將某一個消息隊列分配到了另外一個消費者中,因此在集群模式下就要加鎖,當某個消息隊列被鎖定時,其他的消費者不能進行消費。
消息隊列加鎖
在RebalanceImpl
的lockAll
方法中,首先從處理隊列表中獲取當前消費者訂閱的所有消息隊列MessageQueue信息,返回數據是一個MAP,key為broker名稱,value為broker下的消息隊列,接著對MAP進行遍歷,處理每一個broker下的消息隊列:
- 獲取broker名稱,根據broker名稱查找broker的相關信息;
- 構建加鎖請求,在請求中設置要加鎖的消息隊列,然後將請求發送給broker,表示要對這些消息隊列進行加鎖;
- 加鎖請求返回的響應結果中包含了加鎖成功的消息隊列,此時遍歷加鎖成功的消息隊列,將消息隊列對應的
ProcessQueue
中的locked屬性置為true表示該消息隊列已加鎖成功; - 處理加鎖失敗的消息隊列,如果響應中未包含某個消息隊列的信息,表示此消息隊列加鎖失敗,需要將其對應的
ProcessQueue
對象中的locked屬性置為false表示加鎖失敗;
public abstract class RebalanceImpl {
public void lockAll() {
// 從處理隊列表中獲取broker對應的消息隊列,key為broker名稱,value為broker下的消息隊列
HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();
// 遍歷訂閱的消息隊列
Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator();
while (it.hasNext()) {
Entry<String, Set<MessageQueue>> entry = it.next();
// broker名稱
final String brokerName = entry.getKey();
// 獲取消息隊列
final Set<MessageQueue> mqs = entry.getValue();
if (mqs.isEmpty())
continue;
// 根據broker名稱獲取broker信息
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
if (findBrokerResult != null) {
// 構建加鎖請求
LockBatchRequestBody requestBody = new LockBatchRequestBody();
// 設置消費者組
requestBody.setConsumerGroup(this.consumerGroup);
// 設置ID
requestBody.setClientId(this.mQClientFactory.getClientId());
// 設置要加鎖的消息隊列
requestBody.setMqSet(mqs);
try {
// 批量進行加鎖,返回加鎖成功的消息隊列
Set<MessageQueue> lockOKMQSet =
this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
// 遍歷加鎖成功的隊列
for (MessageQueue mq : lockOKMQSet) {
// 從處理隊列表中獲取對應的處理隊列對象
ProcessQueue processQueue = this.processQueueTable.get(mq);
// 如果不為空,設置locked為true表示加鎖成功
if (processQueue != null) {
if (!processQueue.isLocked()) {
log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);
}
// 設置加鎖成功標記
processQueue.setLocked(true);
processQueue.setLastLockTimestamp(System.currentTimeMillis());
}
}
// 處理加鎖失敗的消息隊列
for (MessageQueue mq : mqs) {
if (!lockOKMQSet.contains(mq)) {
ProcessQueue processQueue = this.processQueueTable.get(mq);
if (processQueue != null) {
// 設置加鎖失敗標記
processQueue.setLocked(false);
log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup, mq);
}
}
}
} catch (Exception e) {
log.error("lockBatchMQ exception, " + mqs, e);
}
}
}
}
}
在【RocketMQ】消息的拉取一文中講到,消費者需要先向Broker發送拉取消息請求,從Broker中拉取消息,拉取消息請求構建在RebalanceImpl的updateProcessQueueTableInRebalance
方法中,拉取消息的響應結果處理在PullCallback
的onSuccess方法中,接下來看下順序消費時在這兩個過程中是如何處理的。
拉取消息
上面已經知道,在使用順序消息時,會周期性的對訂閱的消息隊列進行加鎖,不過由於負載均衡等原因,有可能給當前消費者分配新的消息隊列,此時可能還未來得及通過定時任務加鎖,所以消費者在構建消息拉取請求前會再次進行判斷,如果processQueueTable中之前未包含某個消息隊列,會先調用lock方法進行加鎖,lock方法的實現邏輯與lockAll基本一致,如果加鎖成功構建拉取請求進行消息拉取,如果加鎖失敗,則跳過繼續處理下一個消息隊列:
public abstract class RebalanceImpl {
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
// ...
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
// 遍歷隊列集合
for (MessageQueue mq : mqSet) {
// 如果processQueueTable之前不包含當前的消息隊列
if (!this.processQueueTable.containsKey(mq)) {
// 如果是順序消費,調用lock方法進行加鎖,如果加鎖失敗不往下執行,繼續處理下一個消息隊列
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
// ...
// 如果偏移量大於等於0
if (nextOffset >= 0) {
// 放入處理隊列表中
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
// 如果之前不存在,構建PullRequest,之後對請求進行處理,進行消息拉取
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
// 添加消息拉取請求
this.dispatchPullRequest(pullRequestList);
return changed;
}
public boolean lock(final MessageQueue mq) {
// 獲取broker信息
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
if (findBrokerResult != null) {
// 構建加鎖請求
LockBatchRequestBody requestBody = new LockBatchRequestBody();
requestBody.setConsumerGroup(this.consumerGroup);
requestBody.setClientId(this.mQClientFactory.getClientId());
// 設置要加鎖的消息隊列
requestBody.getMqSet().add(mq);
try {
// 發送加鎖請求
Set<MessageQueue> lockedMq =
this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
for (MessageQueue mmqq : lockedMq) {
ProcessQueue processQueue = this.processQueueTable.get(mmqq);
// 如果加鎖成功設置成功標記
if (processQueue != null) {
processQueue.setLocked(true);
processQueue.setLastLockTimestamp(System.currentTimeMillis());
}
}
boolean lockOK = lockedMq.contains(mq);
log.info("the message queue lock {}, {} {}",
lockOK ? "OK" : "Failed",
this.consumerGroup,
mq);
return lockOK;
} catch (Exception e) {
log.error("lockBatchMQ exception, " + mq, e);
}
}
return false;
}
}
順序消息消費
在PullCallback
的onSuccess
方法中可以看到,如果從Broker拉取到消息,會調用ConsumeMessageService的submitConsumeRequest方法將消息提交到ConsumeMessageService中進行消費:
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void pullMessage(final PullRequest pullRequest) {
// ...
// 拉取消息回調函數
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
// 處理拉取結果
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
// 判斷拉取結果
switch (pullResult.getPullStatus()) {
case FOUND:
// ...
// 如果未拉取到消息
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
// 將拉取請求放入到阻塞隊列中再進行一次拉取
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
// ...
// 如果拉取到消息,將消息提交到ConsumeMessageService中進行消費
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
// ...
}
// ...
}
}
}
};
}
}
前面知道順序消費時使用的是ConsumeMessageOrderlyService
,首先在ConsumeMessageOrderlyService
的構造函數中可以看到
初始化了一個消息消費線程池,也就是說順序消費時也是開啟多線程進行消費的:
public class ConsumeMessageOrderlyService implements ConsumeMessageService {
public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
MessageListenerOrderly messageListener) {
// ...
// 設置消息消費線程池
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl(consumeThreadPrefix));
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
}
}
接下來看submitConsumeRequest方法,可以看到構建了ConsumeRequest
對象,將拉取的消息提交到了消息消費線程池中進行消費:
public class ConsumeMessageOrderlyService implements ConsumeMessageService {
@Override
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume) {
if (dispathToConsume) {
// 構建ConsumeRequest
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
}
}
}
消費時的消息隊列鎖
ConsumeRequest
是ConsumeMessageOrderlyService
的內部類,它有兩個成員變數,分別為MessageQueue
消息隊列和它對應的處理隊列ProcessQueue
對象。
在run方法中,對消息進行消費,處理邏輯如下:
- 判斷
ProcessQueue
是否被刪除,如果被刪除終止處理; - 調用messageQueueLock的ftchLockObject方法獲取消息隊列的對象鎖,然後使用synchronized進行加鎖,這裡加鎖的原因是因為順序消費使用的是線程池,可以設置多個線程同時進行消費,所以某個線程在進行消息消費的時候要對消息隊列加鎖,防止其他線程併發消費,破壞消息的順序性;
- 如果是廣播模式、或者當前的消息隊列已經加鎖成功(Locked置為true)並且加鎖時間未過期,開始對拉取的消息進行遍歷:
- 如果是集群模式並且消息隊列加鎖失敗,調用tryLockLaterAndReconsume稍後重新進行加鎖;
- 如果是集群模式並且消息隊列加鎖時間已經過期,調用tryLockLaterAndReconsume稍後重新進行加鎖;
- 如果當前時間距離開始處理的時間超過了最大消費時間,調用submitConsumeRequestLater稍後重新進行處理;
- 獲取批量消費消息個數,從ProcessQueue獲取消息內容,如果消息獲取不為空,添加消息消費鎖,然後調用messageListener的consumeMessage方法進行消息消費;
public class ConsumeMessageOrderlyService implements ConsumeMessageService {
class ConsumeRequest implements Runnable {
private final ProcessQueue processQueue; // 消息隊列對應的處理隊列
private final MessageQueue messageQueue; // 消息隊列
public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
this.processQueue = processQueue;
this.messageQueue = messageQueue;
}
@Override
public void run() {
// 處理隊列如果已經被置為刪除狀態,跳過不進行處理
if (this.processQueue.isDropped()) {
log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
// 獲取消息隊列的對象鎖
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
// 對象消息隊列的對象鎖加鎖
synchronized (objLock) {
// 如果是廣播模式、或者當前的消息隊列已經加鎖成功並且加鎖時間未過期
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
final long beginTime = System.currentTimeMillis();
for (boolean continueConsume = true; continueConsume; ) {
// 判斷processQueue是否刪除
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
break;
}
// 如果是集群模式並且processQueue的加鎖失敗
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& !this.processQueue.isLocked()) {
log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
// 稍後進行加鎖
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
// 如果是集群模式並且消息隊列加鎖時間已經過期
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& this.processQueue.isLockExpired()) {
log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
// 稍後進行加鎖
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
long interval = System.currentTimeMillis() - beginTime;
// 如果當前時間距離開始處理的時間超過了最大消費時間
if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
// 稍後重新進行處理
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
break;
}
// 批量消費消息個數
final int consumeBatchSize =
ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
// 獲取消息內容
List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
if (!msgs.isEmpty()) {
// ...
try {
// 加消費鎖
this.processQueue.getConsumeLock().lock();
if (this.processQueue.isDropped()) {
log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
this.messageQueue);
break;
}
// 消費消息
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue), e);
hasException = true;
} finally {
// 釋放消息消費鎖
this.processQueue.getConsumeLock().unlock();
}
// ...
ConsumeMessageOrderlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
continueConsume = false;
}
}
} else {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}
}
}
}
}
MessageQueueLock
中使用了ConcurrentHashMap
存儲每個消息隊列對應的對象鎖,對象鎖實際上是一個Object類的對象,從Map中獲取消息隊列的對象鎖時,如果對象鎖不存在,則新建一個Object對象,並放入Map集合中:
public class MessageQueueLock {
private ConcurrentMap<MessageQueue, Object> mqLockTable =
new ConcurrentHashMap<MessageQueue, Object>();
public Object fetchLockObject(final MessageQueue mq) {
// 獲取消息隊列對應的對象鎖,也就是一個Object類型的對象
Object objLock = this.mqLockTable.get(mq);
// 如果獲取尾款
if (null == objLock) {
// 創建對象
objLock = new Object();
// 加入到Map中
Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
if (prevLock != null) {
objLock = prevLock;
}
}
return objLock;
}
}
消息消費鎖
ProcessQueue
中持有一個消息消費鎖,消費者調用consumeMessage進行消息前,會添加消費鎖,上面已經知道在處理拉取到的消息時就已經調用messageQueueLock的fetchLockObject方法獲取消息隊列的對象鎖然後使用syncronized對其加鎖,那麼為什麼在消費之前還要再加一個消費鎖呢?
public class ProcessQueue {
// 消息消費鎖
private final Lock consumeLock = new ReentrantLock();
public Lock getConsumeLock() {
return consumeLock;
}
}
這裡講一個小技巧,如果在查看源碼的時候對某個方法有疑問,可以查看一下這個方法在哪裡被調用了,結合調用處的代碼處理邏輯進行猜測。
那麼就來看下getConsumeLock在哪裡被調用了,可以看到除了C的run方法中調用了之外,RebalancePushImpl
中的removeUnnecessaryMessageQueue方法也調用了getConsumeLock方法:
removeUnnecessaryMessageQueue方法從名字上可以看出,是移除不需要的消息隊列,RebalancePushImpl
是與負載均衡相關的類,所以猜測有可能在負載均衡時,需要移除某個消息隊列,那麼消費者在進行消費的時候就要獲取ProcessQueue
的consumeLock進行加鎖,防止正在消費的過程中,消費隊列被移除:
public class RebalancePushImpl extends RebalanceImpl {
@Override
public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
// 如果是順序消費並且是集模式
if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
&& MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
try {
// 進行加鎖
if (pq.getConsumeLock().tryLock(1000, TimeUnit.MILLISECONDS)) {
try {
return this.unlockDelay(mq, pq);
} finally {
pq.getConsumeLock().unlock();
}
} else {
log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}",
mq,
pq.getTryUnlockTimes());
pq.incTryUnlockTimes();
}
} catch (Exception e) {
log.error("removeUnnecessaryMessageQueue Exception", e);
}
return false;
}
return true;
}
}
不過在消費者在消費消息前已經對隊列進行了加鎖,負載均衡的時候為什麼不使用隊列鎖而要使用消費鎖?
這裡應該是為了減小鎖的粒度,因為消費者在對消息隊列加鎖後,還進行了一系列的判斷,校驗都成功之後從處理隊列中獲取消息內容,之後才開始消費消息,如果負載均衡使用消息隊列鎖就要等待整個過程完成才有可能加鎖成功,這樣顯然會降低性能,而如果使用消息消費鎖,就可以減少等待的時間,並且消費者在進行消息消費前也會判斷ProcessQueue
是否被移除,所以只要保證consumeMessage方法在執行的過程中,ProcessQueue
不被移除即可。
總結
目前一共涉及了三把鎖,它們分別對應不同的情況:
向Broker申請的消息隊列鎖
集群模式下一個消息隊列同一時刻只能被同一個消費組下的某一個消費者進行,為了避免負載均衡等原因引起的變動,消費者會向Broker發送請求對消息隊列進行加鎖,如果加鎖成功,記錄到消息隊列對應的ProcessQueue
中的locked變數中,它是boolean類型的:
public class ProcessQueue {
private volatile boolean locked = false;
}
消費者處理拉取消息時的消息隊列鎖
消費者在處理拉取到的消息時,由於可以開啟多線程進行處理,所以處理消息前通過MessageQueueLock
中的mqLockTable獲取到了消息隊列對應的鎖,鎖住要處理的消息隊列,這裡加消息隊列鎖主要是處理多線程之間的競爭:
public class MessageQueueLock {
private ConcurrentMap<MessageQueue, Object> mqLockTable =
new ConcurrentHashMap<MessageQueue, Object>();
消息消費鎖
消費者在調用consumeMessage方法之前會加消費鎖,主要是為了避免在消費消息時,由於負載均衡等原因,ProcessQueue
被刪除:
public class ProcessQueue {
private final Lock consumeLock = new ReentrantLock();
}
參考
丁威、周繼鋒《RocketMQ技術內幕》
RocketMQ版本:4.9.3