【RocketMQ】順序消息實現原理

来源:https://www.cnblogs.com/shanml/archive/2022/11/21/16909874.html
-Advertisement-
Play Games

全局有序 在RocketMQ中,如果使消息全局有序,可以為Topic設置一個消息隊列,使用一個生產者單線程發送數據,消費者端也使用單線程進行消費,從而保證消息的全局有序,但是這種方式效率低,一般不使用。 局部有序 假設一個Topic分配了兩個消息隊列,生產者在發送消息的時候,可以對消息設置一個路由I ...


全局有序
在RocketMQ中,如果使消息全局有序,可以為Topic設置一個消息隊列,使用一個生產者單線程發送數據,消費者端也使用單線程進行消費,從而保證消息的全局有序,但是這種方式效率低,一般不使用。

局部有序
假設一個Topic分配了兩個消息隊列,生產者在發送消息的時候,可以對消息設置一個路由ID,比如想保證一個訂單的相關消息有序,那麼就使用訂單ID當做路由ID,在發送消息的時候,通過訂單ID對消息隊列的個數取餘,根據取餘結果選擇消息隊列,這樣同一個訂單的數據就可以保證發送到一個消息隊列中,消費者端使用MessageListenerOrderly處理有序消息,這就是RocketMQ的局部有序,保證消息在某個消息隊列中有序。

接下來看RoceketMQ源碼中提供的順序消息例子(稍微做了一些修改):

生產者

public class Producer {
    public static void main(String[] args) throws UnsupportedEncodingException {
        try {
            // 創建生產者
            DefaultMQProducer producer = new DefaultMQProducer("生產者組");
            // 啟動
            producer.start();
            // 創建TAG
            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 100; i++) {
                // 生成訂單ID
                int orderId = i % 10;
                // 創建消息
                Message msg =
                    new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        // 獲取訂單ID
                        Integer id = (Integer) arg;
                        // 對消息隊列個數取餘
                        int index = id % mqs.size();
                        // 根據取餘結果選擇消息要發送給哪個消息隊列
                        return mqs.get(index);
                    }
                }, orderId); // 這裡傳入了訂單ID
                System.out.printf("%s%n", sendResult);
            }

            producer.shutdown();
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

消費者

public class Consumer {

    public static void main(String[] args) throws MQClientException {
        // 創建消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("消費者組");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 訂閱主題
        consumer.subscribe("TopicTest", "TagA || TagC || TagD");
        // 註冊消息監聽器,使用的是MessageListenerOrderly
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                // 列印消息
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

從例子中可以看出生產者在發送消息的時候,通過訂單ID作為路由信息,將同一個訂單ID的消息發送到了同一個消息隊列中,保證同一個訂單ID的消息有序,那麼消費者端是如何保證消息的順序讀取呢?接下來就去看下源碼。

順序消息實現原理

【RocketMQ】消息的拉取一文中講到,消費者在啟動時會調用DefaultMQPushConsumerImpl的start方法:

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
    
    /**
     * 預設的消息推送實現類
     */
    protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
    
    /**
     * 啟動
     */
    @Override
    public void start() throws MQClientException {
        setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
        // 啟動消費者
        this.defaultMQPushConsumerImpl.start();
        // ...
    }
}

DefaultMQPushConsumerImpl的start方法中,對消息監聽器類型進行了判斷,如果類型是MessageListenerOrderly表示要進行順序消費,此時使用ConsumeMessageOrderlyServiceConsumeMessageService進行實例化,然後調用它的start方法進行啟動:

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    // 消息消費service
    private ConsumeMessageService consumeMessageService;
  
    public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                // ...
            
                // 如果是順序消費
                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    // 設置順序消費標記
                    this.consumeOrderly = true;
                    // 創建consumeMessageService,使用的是ConsumeMessageOrderlyService
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    // 併發消費使用ConsumeMessageConcurrentlyService
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }
                // 啟動ConsumeMessageService
                this.consumeMessageService.start();

                // ...
                break;
          // ...
        }
        // ...
    }
}

加鎖定時任務

進入到ConsumeMessageOrderlyService的start方法中,可以看到,如果是集群模式,會啟動一個定時加鎖的任務,周期性的對訂閱的消息隊列進行加鎖,具體是通過調用RebalanceImpl的lockAll方法實現的:

public class ConsumeMessageOrderlyService implements ConsumeMessageService {
    public void start() {
      
        // 如果是集群模式
        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 周期性的執行加鎖方法
                        ConsumeMessageOrderlyService.this.lockMQPeriodically();
                    } catch (Throwable e) {
                        log.error("scheduleAtFixedRate lockMQPeriodically exception", e);
                    }
                }
            }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
        }
    }
  
    public synchronized void lockMQPeriodically() {
        if (!this.stopped) {
            // 進行加鎖
            this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
        }
    }
}

為什麼集群模式下需要加鎖?
因為廣播模式下,消息隊列會分配給消費者下的每一個消費者,而在集群模式下,一個消息隊列同一時刻只能被同一個消費組下的某一個消費者進行,所以在廣播模式下不存在競爭關係,也就不需要對消息隊列進行加鎖,而在集群模式下,有可能因為負載均衡等原因將某一個消息隊列分配到了另外一個消費者中,因此在集群模式下就要加鎖,當某個消息隊列被鎖定時,其他的消費者不能進行消費。

消息隊列加鎖

RebalanceImpllockAll方法中,首先從處理隊列表中獲取當前消費者訂閱的所有消息隊列MessageQueue信息,返回數據是一個MAP,key為broker名稱,value為broker下的消息隊列,接著對MAP進行遍歷,處理每一個broker下的消息隊列:

  1. 獲取broker名稱,根據broker名稱查找broker的相關信息;
  2. 構建加鎖請求,在請求中設置要加鎖的消息隊列,然後將請求發送給broker,表示要對這些消息隊列進行加鎖;
  3. 加鎖請求返回的響應結果中包含了加鎖成功的消息隊列,此時遍歷加鎖成功的消息隊列,將消息隊列對應的ProcessQueue中的locked屬性置為true表示該消息隊列已加鎖成功;
  4. 處理加鎖失敗的消息隊列,如果響應中未包含某個消息隊列的信息,表示此消息隊列加鎖失敗,需要將其對應的ProcessQueue對象中的locked屬性置為false表示加鎖失敗;
public abstract class RebalanceImpl {
    public void lockAll() {
        // 從處理隊列表中獲取broker對應的消息隊列,key為broker名稱,value為broker下的消息隊列
        HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();
        // 遍歷訂閱的消息隊列
        Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, Set<MessageQueue>> entry = it.next();
            // broker名稱
            final String brokerName = entry.getKey();
            // 獲取消息隊列
            final Set<MessageQueue> mqs = entry.getValue();
            if (mqs.isEmpty())
                continue;
            // 根據broker名稱獲取broker信息
            FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
            if (findBrokerResult != null) {
                // 構建加鎖請求
                LockBatchRequestBody requestBody = new LockBatchRequestBody();
                // 設置消費者組
                requestBody.setConsumerGroup(this.consumerGroup);
                // 設置ID
                requestBody.setClientId(this.mQClientFactory.getClientId());
                // 設置要加鎖的消息隊列
                requestBody.setMqSet(mqs);

                try {
                    // 批量進行加鎖,返回加鎖成功的消息隊列
                    Set<MessageQueue> lockOKMQSet =
                        this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
                    // 遍歷加鎖成功的隊列
                    for (MessageQueue mq : lockOKMQSet) {
                        // 從處理隊列表中獲取對應的處理隊列對象
                        ProcessQueue processQueue = this.processQueueTable.get(mq);
                        // 如果不為空,設置locked為true表示加鎖成功
                        if (processQueue != null) {
                            if (!processQueue.isLocked()) {
                                log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);
                            }
                            // 設置加鎖成功標記
                            processQueue.setLocked(true);
                            processQueue.setLastLockTimestamp(System.currentTimeMillis());
                        }
                    }
                    // 處理加鎖失敗的消息隊列
                    for (MessageQueue mq : mqs) {
                        if (!lockOKMQSet.contains(mq)) {
                            ProcessQueue processQueue = this.processQueueTable.get(mq);
                            if (processQueue != null) {
                                // 設置加鎖失敗標記
                                processQueue.setLocked(false);
                                log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup, mq);
                            }
                        }
                    }
                } catch (Exception e) {
                    log.error("lockBatchMQ exception, " + mqs, e);
                }
            }
        }
    }
}

【RocketMQ】消息的拉取一文中講到,消費者需要先向Broker發送拉取消息請求,從Broker中拉取消息,拉取消息請求構建在RebalanceImpl的updateProcessQueueTableInRebalance方法中,拉取消息的響應結果處理在PullCallback的onSuccess方法中,接下來看下順序消費時在這兩個過程中是如何處理的。

拉取消息

上面已經知道,在使用順序消息時,會周期性的對訂閱的消息隊列進行加鎖,不過由於負載均衡等原因,有可能給當前消費者分配新的消息隊列,此時可能還未來得及通過定時任務加鎖,所以消費者在構建消息拉取請求前會再次進行判斷,如果processQueueTable中之前未包含某個消息隊列,會先調用lock方法進行加鎖,lock方法的實現邏輯與lockAll基本一致,如果加鎖成功構建拉取請求進行消息拉取,如果加鎖失敗,則跳過繼續處理下一個消息隊列:

public abstract class RebalanceImpl {
    private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
        final boolean isOrder) {
        // ...
        List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
        // 遍歷隊列集合
        for (MessageQueue mq : mqSet) {
            // 如果processQueueTable之前不包含當前的消息隊列
            if (!this.processQueueTable.containsKey(mq)) {
                // 如果是順序消費,調用lock方法進行加鎖,如果加鎖失敗不往下執行,繼續處理下一個消息隊列
                if (isOrder && !this.lock(mq)) {
                    log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                    continue;
                }
                // ... 
                // 如果偏移量大於等於0
                if (nextOffset >= 0) {
                    // 放入處理隊列表中
                    ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                    if (pre != null) {
                        log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                    } else {
                        // 如果之前不存在,構建PullRequest,之後對請求進行處理,進行消息拉取
                        log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                        PullRequest pullRequest = new PullRequest();
                        pullRequest.setConsumerGroup(consumerGroup);
                        pullRequest.setNextOffset(nextOffset);
                        pullRequest.setMessageQueue(mq);
                        pullRequest.setProcessQueue(pq);
                        pullRequestList.add(pullRequest);
                        changed = true;
                    }
                } else {
                    log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                }
            }
        }
        // 添加消息拉取請求
        this.dispatchPullRequest(pullRequestList);

        return changed;
    }
  
    public boolean lock(final MessageQueue mq) {
        // 獲取broker信息
        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
        if (findBrokerResult != null) {
            // 構建加鎖請求
            LockBatchRequestBody requestBody = new LockBatchRequestBody();
            requestBody.setConsumerGroup(this.consumerGroup);
            requestBody.setClientId(this.mQClientFactory.getClientId());
            // 設置要加鎖的消息隊列
            requestBody.getMqSet().add(mq);

            try {
                // 發送加鎖請求
                Set<MessageQueue> lockedMq =
                    this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
                for (MessageQueue mmqq : lockedMq) {
                    ProcessQueue processQueue = this.processQueueTable.get(mmqq);
                    // 如果加鎖成功設置成功標記
                    if (processQueue != null) {
                        processQueue.setLocked(true);
                        processQueue.setLastLockTimestamp(System.currentTimeMillis());
                    }
                }
                boolean lockOK = lockedMq.contains(mq);
                log.info("the message queue lock {}, {} {}",
                    lockOK ? "OK" : "Failed",
                    this.consumerGroup,
                    mq);
                return lockOK;
            } catch (Exception e) {
                log.error("lockBatchMQ exception, " + mq, e);
            }
        }

        return false;
    }
}

順序消息消費

PullCallbackonSuccess方法中可以看到,如果從Broker拉取到消息,會調用ConsumeMessageService的submitConsumeRequest方法將消息提交到ConsumeMessageService中進行消費:

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    public void pullMessage(final PullRequest pullRequest) {
        // ...
        // 拉取消息回調函數
        PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    // 處理拉取結果
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                            subscriptionData);
                    // 判斷拉取結果
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            // ...
                            // 如果未拉取到消息
                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                // 將拉取請求放入到阻塞隊列中再進行一次拉取
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            } else {
                                // ...
                                // 如果拉取到消息,將消息提交到ConsumeMessageService中進行消費
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                        pullResult.getMsgFoundList(),
                                        processQueue,
                                        pullRequest.getMessageQueue(),
                                        dispatchToConsume);
                                // ...
                            }
                        // ...
                    }
                }
            }
        };
    }
}

前面知道順序消費時使用的是ConsumeMessageOrderlyService,首先在ConsumeMessageOrderlyService的構造函數中可以看到
初始化了一個消息消費線程池,也就是說順序消費時也是開啟多線程進行消費的:

public class ConsumeMessageOrderlyService implements ConsumeMessageService {
    public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
        MessageListenerOrderly messageListener) {
        // ...
        // 設置消息消費線程池
        this.consumeExecutor = new ThreadPoolExecutor(
            this.defaultMQPushConsumer.getConsumeThreadMin(),
            this.defaultMQPushConsumer.getConsumeThreadMax(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.consumeRequestQueue,
            new ThreadFactoryImpl(consumeThreadPrefix));
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
    }
}

接下來看submitConsumeRequest方法,可以看到構建了ConsumeRequest對象,將拉取的消息提交到了消息消費線程池中進行消費:

public class ConsumeMessageOrderlyService implements ConsumeMessageService {
   
    @Override
    public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispathToConsume) {
        if (dispathToConsume) {
            // 構建ConsumeRequest
            ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
            this.consumeExecutor.submit(consumeRequest);
        }
    }
    
}

消費時的消息隊列鎖

ConsumeRequestConsumeMessageOrderlyService的內部類,它有兩個成員變數,分別為MessageQueue消息隊列和它對應的處理隊列ProcessQueue對象。
在run方法中,對消息進行消費,處理邏輯如下:

  1. 判斷ProcessQueue是否被刪除,如果被刪除終止處理;
  2. 調用messageQueueLock的ftchLockObject方法獲取消息隊列的對象鎖,然後使用synchronized進行加鎖,這裡加鎖的原因是因為順序消費使用的是線程池,可以設置多個線程同時進行消費,所以某個線程在進行消息消費的時候要對消息隊列加鎖,防止其他線程併發消費,破壞消息的順序性
  3. 如果是廣播模式、或者當前的消息隊列已經加鎖成功(Locked置為true)並且加鎖時間未過期,開始對拉取的消息進行遍歷:
  • 如果是集群模式並且消息隊列加鎖失敗,調用tryLockLaterAndReconsume稍後重新進行加鎖;
  • 如果是集群模式並且消息隊列加鎖時間已經過期,調用tryLockLaterAndReconsume稍後重新進行加鎖;
  • 如果當前時間距離開始處理的時間超過了最大消費時間,調用submitConsumeRequestLater稍後重新進行處理;
  • 獲取批量消費消息個數,從ProcessQueue獲取消息內容,如果消息獲取不為空,添加消息消費鎖,然後調用messageListener的consumeMessage方法進行消息消費;
public class ConsumeMessageOrderlyService implements ConsumeMessageService {
 
   class ConsumeRequest implements Runnable {
        private final ProcessQueue processQueue; // 消息隊列對應的處理隊列
        private final MessageQueue messageQueue; // 消息隊列

        public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
            this.processQueue = processQueue;
            this.messageQueue = messageQueue;
        }

        @Override
        public void run() {
            // 處理隊列如果已經被置為刪除狀態,跳過不進行處理
            if (this.processQueue.isDropped()) {
                log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                return;
            }
            // 獲取消息隊列的對象鎖
            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
            // 對象消息隊列的對象鎖加鎖
            synchronized (objLock) {
                // 如果是廣播模式、或者當前的消息隊列已經加鎖成功並且加鎖時間未過期
                if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                    final long beginTime = System.currentTimeMillis();
                    for (boolean continueConsume = true; continueConsume; ) {
                        // 判斷processQueue是否刪除
                        if (this.processQueue.isDropped()) {
                            log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                            break;
                        }
                        // 如果是集群模式並且processQueue的加鎖失敗
                        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                            && !this.processQueue.isLocked()) {
                            log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                            // 稍後進行加鎖
                            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                            break;
                        }
                        // 如果是集群模式並且消息隊列加鎖時間已經過期
                        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                            && this.processQueue.isLockExpired()) {
                            log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
                            // 稍後進行加鎖
                            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                            break;
                        }

                        long interval = System.currentTimeMillis() - beginTime;
                        // 如果當前時間距離開始處理的時間超過了最大消費時間
                        if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                            // 稍後重新進行處理
                            ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                            break;
                        }
                        // 批量消費消息個數
                        final int consumeBatchSize =
                            ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
                        // 獲取消息內容
                        List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
                        defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
                        if (!msgs.isEmpty()) {
                            // ...
                            try {
                                // 加消費鎖
                                this.processQueue.getConsumeLock().lock();
                                if (this.processQueue.isDropped()) {
                                    log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                        this.messageQueue);
                                    break;
                                }
                                // 消費消息
                                status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                            } catch (Throwable e) {
                                log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
                                    RemotingHelper.exceptionSimpleDesc(e),
                                    ConsumeMessageOrderlyService.this.consumerGroup,
                                    msgs,
                                    messageQueue), e);
                                hasException = true;
                            } finally {
                                // 釋放消息消費鎖
                                this.processQueue.getConsumeLock().unlock();
                            }
                            // ...
                            ConsumeMessageOrderlyService.this.getConsumerStatsManager()
                                .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

                            continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                        } else {
                            continueConsume = false;
                        }
                    }
                } else {
                    if (this.processQueue.isDropped()) {
                        log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                        return;
                    }

                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
                }
            }
        }

    }
}

MessageQueueLock中使用了ConcurrentHashMap存儲每個消息隊列對應的對象鎖,對象鎖實際上是一個Object類的對象,從Map中獲取消息隊列的對象鎖時,如果對象鎖不存在,則新建一個Object對象,並放入Map集合中:

public class MessageQueueLock {
    private ConcurrentMap<MessageQueue, Object> mqLockTable =
        new ConcurrentHashMap<MessageQueue, Object>();

    public Object fetchLockObject(final MessageQueue mq) {
        // 獲取消息隊列對應的對象鎖,也就是一個Object類型的對象
        Object objLock = this.mqLockTable.get(mq);
        // 如果獲取尾款
        if (null == objLock) {
            // 創建對象
            objLock = new Object();
            // 加入到Map中
            Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
            if (prevLock != null) {
                objLock = prevLock;
            }
        }

        return objLock;
    }
}

消息消費鎖

ProcessQueue中持有一個消息消費鎖,消費者調用consumeMessage進行消息前,會添加消費鎖,上面已經知道在處理拉取到的消息時就已經調用messageQueueLock的fetchLockObject方法獲取消息隊列的對象鎖然後使用syncronized對其加鎖,那麼為什麼在消費之前還要再加一個消費鎖呢?

public class ProcessQueue {
    // 消息消費鎖
    private final Lock consumeLock = new ReentrantLock();

    public Lock getConsumeLock() {
        return consumeLock;
    }
}

這裡講一個小技巧,如果在查看源碼的時候對某個方法有疑問,可以查看一下這個方法在哪裡被調用了,結合調用處的代碼處理邏輯進行猜測。
那麼就來看下getConsumeLock在哪裡被調用了,可以看到除了C的run方法中調用了之外,RebalancePushImpl中的removeUnnecessaryMessageQueue方法也調用了getConsumeLock方法:

removeUnnecessaryMessageQueue方法從名字上可以看出,是移除不需要的消息隊列,RebalancePushImpl是與負載均衡相關的類,所以猜測有可能在負載均衡時,需要移除某個消息隊列,那麼消費者在進行消費的時候就要獲取ProcessQueue的consumeLock進行加鎖,防止正在消費的過程中,消費隊列被移除:

public class RebalancePushImpl extends RebalanceImpl {
   @Override
    public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
        this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
        this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
        // 如果是順序消費並且是集模式
        if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
            && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
            try {
                // 進行加鎖
                if (pq.getConsumeLock().tryLock(1000, TimeUnit.MILLISECONDS)) {
                    try {
                        return this.unlockDelay(mq, pq);
                    } finally {
                        pq.getConsumeLock().unlock();
                    }
                } else {
                    log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}",
                        mq,
                        pq.getTryUnlockTimes());

                    pq.incTryUnlockTimes();
                }
            } catch (Exception e) {
                log.error("removeUnnecessaryMessageQueue Exception", e);
            }

            return false;
        }
        return true;
    }
}

不過在消費者在消費消息前已經對隊列進行了加鎖,負載均衡的時候為什麼不使用隊列鎖而要使用消費鎖?

這裡應該是為了減小鎖的粒度,因為消費者在對消息隊列加鎖後,還進行了一系列的判斷,校驗都成功之後從處理隊列中獲取消息內容,之後才開始消費消息,如果負載均衡使用消息隊列鎖就要等待整個過程完成才有可能加鎖成功,這樣顯然會降低性能,而如果使用消息消費鎖,就可以減少等待的時間,並且消費者在進行消息消費前也會判斷ProcessQueue是否被移除,所以只要保證consumeMessage方法在執行的過程中,ProcessQueue不被移除即可。

總結

目前一共涉及了三把鎖,它們分別對應不同的情況:

向Broker申請的消息隊列鎖

集群模式下一個消息隊列同一時刻只能被同一個消費組下的某一個消費者進行,為了避免負載均衡等原因引起的變動,消費者會向Broker發送請求對消息隊列進行加鎖,如果加鎖成功,記錄到消息隊列對應的ProcessQueue中的locked變數中,它是boolean類型的:

public class ProcessQueue {
    private volatile boolean locked = false;
}

消費者處理拉取消息時的消息隊列鎖

消費者在處理拉取到的消息時,由於可以開啟多線程進行處理,所以處理消息前通過MessageQueueLock中的mqLockTable獲取到了消息隊列對應的鎖,鎖住要處理的消息隊列,這裡加消息隊列鎖主要是處理多線程之間的競爭:

public class MessageQueueLock {
    private ConcurrentMap<MessageQueue, Object> mqLockTable =
        new ConcurrentHashMap<MessageQueue, Object>();

消息消費鎖

消費者在調用consumeMessage方法之前會加消費鎖,主要是為了避免在消費消息時,由於負載均衡等原因,ProcessQueue被刪除:


public class ProcessQueue {
    private final Lock consumeLock = new ReentrantLock();
}

參考
丁威、周繼鋒《RocketMQ技術內幕》

RocketMQ版本:4.9.3


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 前言: 學了三天,我們學習了 TS 的基本類型聲明,TS 的編譯,webpack 打包,其實也就差不多了,剩下的也就一些 類,繼承,構造函數,抽象類,泛型一些的,如果都細緻的講可能寫好久,感興趣的可以自己找資料細緻的去學一下 學習代碼或一個新語法,最好的方法無非就是做項目,從這個過程中學會如何去使用 ...
  • 對 Chrome 擴展功能熟悉的小伙伴,可能都有用過 Chrome 的 3D 展示頁面層級關係這個功能。 可以通過 控制台 --> 右邊的三個小點 --> More Tools --> Layers 打開。即可以看到頁面的一個 3D 層級關係,像是這樣: 這個功能有幾個不錯的作用: 頁面層級概覽 快 ...
  • 學習信息 學習形式:網路教學視頻 學習地址:https://www.bilibili.com/video/BV1Sy4y1C7ha/?spm_id_from=333.337.search-card.all.click 學習開始時間:2022年11月18日 01 初識 JavaScript 瀏覽器執行 ...
  • 先聲明一下:我所在的公司是一個小團隊,做物聯網相關的,前後端、硬體、測試加起來也就五六十個人左右;本人的崗位是Java開發(兼DBA、運維。。。);我進公司時整個項目的部署架構為 簡單jar包部署微服務集群形式;去年公司將部分服務使用docker進行部署;因為現在服務稍微有點多導致容器管理起來也比較 ...
  • # 1.索引(下標) print('1.索引') str_data = 'Python' # [索引(下標)]取索引的格式 # 正負索引 # 獲取單個數據 sub_str = str_data[4] print(sub_str) sub_str = str_data[-2] print(sub_st ...
  • 我們結合運算符重載知識實現string 類 在自己實現的String類中可以參考C++中string的方法 例如構造,加法,大小比較,長度,[] 等操作. 當前的MyString 類中,暫時不加入迭代器,我們將在下一節中加入迭代器的代碼. #include <iostream> using name ...
  • Java異常 1.概念理解 異常(Exepotion)指程式運行過程中不期而至的各種狀況,它阻止了程式按照程式員的預期正常執行,這就是異常(開發過程中的語法錯誤和1.邏輯錯誤不是異常)。如文件找不到,網路連接失敗,非法參數等。 異常發生在程式運行期間,影響程式的正常執行。 2.常見異常分類 **檢查 ...
  • WEB開發會話技術02 6.Cookie的生命周期 預設情況下,Cookie只在瀏覽器的記憶體中存活,也就是說,當你關閉瀏覽器後,Cookie就會消失。但是也可以通過方法設置cookie的生存時間。 cookie的生命周期指的是如何管理cookie,什麼時候cookie被銷毀。 setMaxAge(i ...
一周排行
    -Advertisement-
    Play Games
  • GoF之工廠模式 @目錄GoF之工廠模式每博一文案1. 簡單說明“23種設計模式”1.2 介紹工廠模式的三種形態1.3 簡單工廠模式(靜態工廠模式)1.3.1 簡單工廠模式的優缺點:1.4 工廠方法模式1.4.1 工廠方法模式的優缺點:1.5 抽象工廠模式1.6 抽象工廠模式的優缺點:2. 總結:3 ...
  • 新改進提供的Taurus Rpc 功能,可以簡化微服務間的調用,同時可以不用再手動輸出模塊名稱,或調用路徑,包括負載均衡,這一切,由框架實現並提供了。新的Taurus Rpc 功能,將使得服務間的調用,更加輕鬆、簡約、高效。 ...
  • 本章將和大家分享ES的數據同步方案和ES集群相關知識。廢話不多說,下麵我們直接進入主題。 一、ES數據同步 1、數據同步問題 Elasticsearch中的酒店數據來自於mysql資料庫,因此mysql數據發生改變時,Elasticsearch也必須跟著改變,這個就是Elasticsearch與my ...
  • 引言 在我們之前的文章中介紹過使用Bogus生成模擬測試數據,今天來講解一下功能更加強大自動生成測試數據的工具的庫"AutoFixture"。 什麼是AutoFixture? AutoFixture 是一個針對 .NET 的開源庫,旨在最大程度地減少單元測試中的“安排(Arrange)”階段,以提高 ...
  • 經過前面幾個部分學習,相信學過的同學已經能夠掌握 .NET Emit 這種中間語言,並能使得它來編寫一些應用,以提高程式的性能。隨著 IL 指令篇的結束,本系列也已經接近尾聲,在這接近結束的最後,會提供幾個可供直接使用的示例,以供大伙分析或使用在項目中。 ...
  • 當從不同來源導入Excel數據時,可能存在重覆的記錄。為了確保數據的準確性,通常需要刪除這些重覆的行。手動查找並刪除可能會非常耗費時間,而通過編程腳本則可以實現在短時間內處理大量數據。本文將提供一個使用C# 快速查找並刪除Excel重覆項的免費解決方案。 以下是實現步驟: 1. 首先安裝免費.NET ...
  • C++ 異常處理 C++ 異常處理機制允許程式在運行時處理錯誤或意外情況。它提供了捕獲和處理錯誤的一種結構化方式,使程式更加健壯和可靠。 異常處理的基本概念: 異常: 程式在運行時發生的錯誤或意外情況。 拋出異常: 使用 throw 關鍵字將異常傳遞給調用堆棧。 捕獲異常: 使用 try-catch ...
  • 優秀且經驗豐富的Java開發人員的特征之一是對API的廣泛瞭解,包括JDK和第三方庫。 我花了很多時間來學習API,尤其是在閱讀了Effective Java 3rd Edition之後 ,Joshua Bloch建議在Java 3rd Edition中使用現有的API進行開發,而不是為常見的東西編 ...
  • 框架 · 使用laravel框架,原因:tp的框架路由和orm沒有laravel好用 · 使用強制路由,方便介面多時,分多版本,分文件夾等操作 介面 · 介面開發註意欄位類型,欄位是int,查詢成功失敗都要返回int(對接java等強類型語言方便) · 查詢介面用GET、其他用POST 代碼 · 所 ...
  • 正文 下午找企業的人去鎮上做貸後。 車上聽同事跟那個司機對罵,火星子都快出來了。司機跟那同事更熟一些,連我在內一共就三個人,同事那一手指桑罵槐給我都聽愣了。司機也是老社會人了,馬上聽出來了,為那個無辜的企業經辦人辯護,實際上是為自己辯護。 “這個事情你不能怪企業。”“但他們總不能讓銀行的人全權負責, ...