為了方便大家理解我把之前方案的圖片複製過來了,如下: 上圖的方案存在一個問題,就是我們今天文章要聊的內容。 這個問題就是當 MQ Consumer 收到消息後,就直接發佈 Event 了,如果是同步的,沒有問題。如果某個 EventListener 中處理失敗了,那麼這條消息將不會 ACK。 如果是 ...
上圖的方案存在一個問題,就是我們今天文章要聊的內容。
這個問題就是當 MQ Consumer 收到消息後,就直接發佈 Event 了,如果是同步的,沒有問題。如果某個 EventListener 中處理失敗了,那麼這條消息將不會 ACK。
如果是非同步發佈 Event 的場景,發佈完消息馬上就 ACK 了。就算某個 EventListener 中處理失敗了,MQ 也感知不到,不會進行消息的重新投遞,這就是存在的問題。
解決方案
方案一
既然消息已經 ACK 了,那就不利用 MQ 的重試功能了,使用方自己重試是不是也可以呢?
可肯定是可以的,內部處理是否成功肯定是可以知道的,如果處理失敗了可以預設重試,或者有一定策略的重試。實在不行還可以落庫,保存記錄。
這樣的問題在於太煩了呀,每個使用的地方都要去做這件事情,而且對於未來接手你代碼的程式小哥哥來說,這很有可能讓小哥哥頭髮慢慢脫落啊。。。。
脫落不要緊,關鍵他還不知道要做這個處理,說不定哪天就背鍋了,慘兮兮。。。。
方案二
要保證消息和業務處理的一致性,就不能立馬進行 ACK 操作。而是要等業務處理完成後再決定是否要 ACK。
如果有處理失敗的就不應該 ACK,這樣就能復用 MQ 的重試機制了。
分析下來,這就是一個典型的非同步轉同步的場景。像 Dubbo 中也有這個場景,所以我們可以借鑒 Dubbo 中的實現思路。
創建一個 DefaultFuture 用於同步等待獲取任務執行結果。然後在 MQ 消費的地方使用 DefaultFuture。
@Service @RocketMQMessageListener(topic = "${rocketmq.topic.data_change}", consumerGroup = "${rocketmq.group.data_change_consumer}") public class DataChangeConsume implements RocketMQListener<DataChangeRequest> { @Autowired private ApplicationContext applicationContext; @Autowired private CustomApplicationContextAware customApplicationContextAware; @Override public void onMessage(DataChangeRequest dataChangeRequest) { log.info("received message {} , Thread {}", dataChangeRequest, Thread.currentThread().getName()); DataChangeEvent event = new DataChangeEvent(this); event.setChangeType(dataChangeRequest.getChangeType()); event.setTable(dataChangeRequest.getTable()); event.setMessageId(dataChangeRequest.getMessageId()); DefaultFuture defaultFuture = DefaultFuture.newFuture(dataChangeRequest, customApplicationContextAware.getTaskCount(), 6000 * 10); applicationContext.publishEvent(event); Boolean result = defaultFuture.get(); log.info("MessageId {} 處理結果 {}", dataChangeRequest.getMessageId(), result); if (!result) { throw new RuntimeException("處理失敗,不進行消息ACK,等待下次重試"); } } }
newFuture() 會傳入事件參數,超時時間,任務數量幾個參數。任務數量是用於判斷所有 EventListener 是否全部執行完成。
defaultFuture.get(); 這不就會阻塞,等待所有任務執行完成才會返回結果,如果所有業務都處理成功了,那麼會返回 true,流程結束,消息自動 ACK。
如果返回了 false 證明有處理失敗的或者超時的,就不需要 ACK 了,拋出異常等待重試。
public Boolean get() { if (isDone()) { return true; } long start = System.currentTimeMillis(); lock.lock(); try { while (!isDone()) { done.await(timeout, TimeUnit.MILLISECONDS); // 有失敗的任務反饋 if (!isSuccessDone()) { return false; } // 全部執行成功 if (isDone()) { return true; } // 超時 if (System.currentTimeMillis() - start > timeout) { return false; } } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } return true; }
isDone() 會判斷反饋結果了的任務數量跟總數量是否一致,如果一直就說明全部執行完成了。
public boolean isDone() { return feedbackResultCount.get() == taskCount; }
那麼任務執行完了怎麼反饋呢? 不可能讓每個使用的方法去關心,所以我們定義了一個切麵來做這件事情。
@Aspect @Component public class EventListenerAspect { @Around(value = "@annotation(eventListener)") public Object aroundAdvice(ProceedingJoinPoint joinpoint, EventListener eventListener) throws Throwable { DataChangeEvent event = null; boolean executeResult = true; try { event = (DataChangeEvent)joinpoint.getArgs()[0]; Object result = joinpoint.proceed(); return result; } catch (Exception e) { executeResult = false; throw e; } finally { DefaultFuture.received(event.getMessageId(), executeResult); } } }
通過 DefaultFuture.received() 反饋執行結果。
public static void received(String id, boolean result) { DefaultFuture future = FUTURES.get(id); if (future != null) { // 累加失敗任務數量 if (!result) { future.feedbackFailResultCount.incrementAndGet(); } // 累加執行完成任務數量 future.feedbackResultCount.incrementAndGet(); if (future.isDone()) { FUTURES.remove(id); future.doReceived(); } } } private void doReceived() { lock.lock(); try { if (done != null) { // 喚醒阻塞的線程 done.signal(); } } finally { lock.unlock(); } }
下麵我們來總結整個流程:
-
收到 MQ 消息,組裝成 DefaultFuture,通過 get 方法獲取執行結果,未執行完的時候此方法阻塞。
-
通過切麵切入加了 EventListener 的方法,判斷是否有異常來判斷任務的執行結果。
-
通過 DefaultFuture.received() 反饋結果。
-
反饋時計算是否全部完成,全部完成則喚醒阻塞的線程。DefaultFuture.get()就能獲取到結果。
-
是否要進行 ACK 操作。
需要註意的是每個 EventListener 內部消費的邏輯都要做冪等控制。