Redis實現延遲隊列方法介紹

来源:https://www.cnblogs.com/a609251438/archive/2020/07/13/13295253.html
-Advertisement-
Play Games

延遲隊列,顧名思義它是一種帶有延遲功能的消息隊列。那麼,是在什麼場景下我才需要這樣的隊列呢? 1. 背景 我們先看看以下業務場景: 當訂單一直處於未支付狀態時,如何及時的關閉訂單 如何定期檢查處於退款狀態的訂單是否已經退款成功 在訂單長時間沒有收到下游系統的狀態通知的時候,如何實現階梯式的同步訂單狀 ...


延遲隊列,顧名思義它是一種帶有延遲功能的消息隊列。那麼,是在什麼場景下我才需要這樣的隊列呢?

1. 背景

我們先看看以下業務場景:

  • 當訂單一直處於未支付狀態時,如何及時的關閉訂單
  • 如何定期檢查處於退款狀態的訂單是否已經退款成功
  • 在訂單長時間沒有收到下游系統的狀態通知的時候,如何實現階梯式的同步訂單狀態的策略
  • 在系統通知上游系統支付成功終態時,上游系統返回通知失敗,如何進行非同步通知實行分頻率發送:15s 3m 10m 30m 30m 1h 2h 6h 15h

1.1 解決方案

  • 最簡單的方式,定時掃表。例如對於訂單支付失效要求比較高的,每2S掃表一次檢查過期的訂單進行主動關單操作。優點是簡單,缺點是每分鐘全局掃表,浪費資源,如果遇到表數據訂單量即將過期的訂單量很大,會造成關單延遲。
  • 使用RabbitMq或者其他MQ改造實現延遲隊列,優點是,開源,現成的穩定的實現方案,缺點是:MQ是一個消息中間件,如果團隊技術棧本來就有MQ,那還好,如果不是,那為了延遲隊列而去部署一套MQ成本有點大
  • 使用Redis的zset、list的特性,我們可以利用redis來實現一個延遲隊列RedisDelayQueue

2. 設計目標

  • 實時性:允許存在一定時間的秒級誤差
  • 高可用性:支持單機、支持集群
  • 支持消息刪除:業務會隨時刪除指定消息
  • 消息可靠性:保證至少被消費一次
  • 消息持久化:基於Redis自身的持久化特性,如果Redis數據丟失,意味著延遲消息的丟失,不過可以做主備和集群保證。這個可以考慮後續優化將消息持久化到MangoDB中

3. 設計方案

設計主要包含以下幾點:

  • 將整個Redis當做消息池,以KV形式存儲消息
  • 使用ZSET做優先隊列,按照Score維持優先順序
  • 使用LIST結構,以先進先出的方式消費
  • ZSET和LIST存儲消息地址(對應消息池的每個KEY)
  • 自定義路由對象,存儲ZSET和LIST名稱,以點對點的方式將消息從ZSET路由到正確的LIST
  • 使用定時器維護路由
  • 根據TTL規則實現消息延遲

3.1 設計圖

還是基於有贊的延遲隊列設計,進行優化改造及代碼實現。有贊設計

3.2 數據結構

  • ZING:DELAY_QUEUE:JOB_POOL 是一個Hash_Table結構,裡面存儲了所有延遲隊列的信息。KV結構:K=prefix+projectName field = topic+jobId V=CONENT;V由客戶端傳入的數據,消費的時候回傳
  • ZING:DELAY_QUEUE:BUCKET 延遲隊列的有序集合ZSET,存放K=ID和需要的執行時間戳,根據時間戳排序
  • ZING:DELAY_QUEUE:QUEUE LIST結構,每個Topic一個LIST,list存放的都是當前需要被消費的JOB

圖片僅供參考,基本可以描述整個流程的執行過程

3.3 任務的生命周期

  1. 新增一個JOB,會在ZING:DELAY_QUEUE:JOB_POOL中插入一條數據,記錄了業務方消費方。ZING:DELAY_QUEUE:BUCKET也會插入一條記錄,記錄執行的時間戳
  2. 搬運線程會去ZING:DELAY_QUEUE:BUCKET中查找哪些執行時間戳的RunTimeMillis比現在的時間小,將這些記錄全部刪除;同時會解析出每個任務的Topic是什麼,然後將這些任務PUSH到TOPIC對應的列表ZING:DELAY_QUEUE:QUEUE
  3. 每個TOPIC的LIST都會有一個監聽線程去批量獲取LIST中的待消費數據,獲取到的數據全部扔給這個TOPIC的消費線程池
  4. 消費線程池執行會去ZING:DELAY_QUEUE:JOB_POOL查找數據結構,返回給回調結構,執行回調方法。

3.4 設計要點

3.4.1 基本概念

  • JOB:需要非同步處理的任務,是延遲隊列里的基本單元
  • Topic:一組相同類型Job的集合(隊列)。供消費者來訂閱

3.4.2 消息結構

每個JOB必須包含以下幾個屬性

  • jobId:Job的唯一標識。用來檢索和刪除指定的Job信息
  • topic:Job類型。可以理解成具體的業務名稱
  • delay:Job需要延遲的時間。單位:秒。(服務端會將其轉換為絕對時間)
  • body:Job的內容,供消費者做具體的業務處理,以json格式存儲
  • retry:失敗重試次數
  • url:通知URL

3.5 設計細節

3.5.1 如何快速消費ZING:DELAY_QUEUE:QUEUE

最簡單的實現方式就是使用定時器進行秒級掃描,為了保證消息執行的時效性,可以設置每1S請求Redis一次,判斷隊列中是否有待消費的JOB。但是這樣會存在一個問題,如果queue中一直沒有可消費的JOB,那頻繁的掃描就失去了意義,也浪費了資源,幸好LIST中有一個BLPOP阻塞原語,如果list中有數據就會立馬返回,如果沒有數據就會一直阻塞在那裡,直到有數據返回,可以設置阻塞的超時時間,超時會返回NULL;具體的實現方式及策略會在代碼中進行具體的實現介紹

3.5.2 避免定時導致的消息重覆搬運及消費

  • 使用Redis的分散式鎖來控制消息的搬運,從而避免消息被重覆搬運導致的問題
  • 使用分散式鎖來保證定時器的執行頻率

4. 核心代碼實現

4.1 技術說明

技術棧:SpringBoot,Redisson,Redis,分散式鎖,定時器

註意:本項目沒有實現設計方案中的多Queue消費,只開啟了一個QUEUE,這個待以後優化

4.2 核心實體

4.2.1 Job新增對象

/**
 * 消息結構
 *
 * @author 睜眼看世界
 * @date 2020年1月15日
 */
@Data
public class Job implements Serializable {
 
 private static final long serialVersionUID = 1L;
 
 /**
 * Job的唯一標識。用來檢索和刪除指定的Job信息
 */
 @NotBlank
 private String jobId;
 
 
 /**
 * Job類型。可以理解成具體的業務名稱
 */
 @NotBlank
 private String topic;
 
 /**
 * Job需要延遲的時間。單位:秒。(服務端會將其轉換為絕對時間)
 */
 private Long delay;
 
 /**
 * Job的內容,供消費者做具體的業務處理,以json格式存儲
 */
 @NotBlank
 private String body;
 
 /**
 * 失敗重試次數
 */
 private int retry = 0;
 
 /**
 * 通知URL
 */
 @NotBlank
 private String url;
}
4.2.2 Job刪除對象
/**
 * 消息結構
 *
 * @author 睜眼看世界
 * @date 2020年1月15日
 */
@Data
public class JobDie implements Serializable {
 
 private static final long serialVersionUID = 1L;
 
 /**
 * Job的唯一標識。用來檢索和刪除指定的Job信息
 */
 @NotBlank
 private String jobId;
 
 
 /**
 * Job類型。可以理解成具體的業務名稱
 */
 @NotBlank
 private String topic;
}

4.3 搬運線程

/**
 * 搬運線程
 *
 * @author 睜眼看世界
 * @date 2020年1月17日
 */
@Slf4j
@Component
public class CarryJobScheduled {
 
 @Autowired
 private RedissonClient redissonClient;
 
 /**
 * 啟動定時開啟搬運JOB信息
 */
 @Scheduled(cron = "*/1 * * * * *")
 public void carryJobToQueue() {
 System.out.println("carryJobToQueue --->");
 RLock lock = redissonClient.getLock(RedisQueueKey.CARRY_THREAD_LOCK);
 try {
 boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
 if (!lockFlag) {
 throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
 }
 RScoredSortedSet<Object> bucketSet = redissonClient.getScoredSortedSet(RD_ZSET_BUCKET_PRE);
 long now = System.currentTimeMillis();
 Collection<Object> jobCollection = bucketSet.valueRange(0, false, now, true);
 List<String> jobList = jobCollection.stream().map(String::valueOf).collect(Collectors.toList());
 RList<String> readyQueue = redissonClient.getList(RD_LIST_TOPIC_PRE);
 readyQueue.addAll(jobList);
 bucketSet.removeAllAsync(jobList);
 } catch (InterruptedException e) {
 log.error("carryJobToQueue error", e);
 } finally {
 if (lock != null) {
 lock.unlock();
 }
 }
 }
}

4.4 消費線程

@Slf4j
@Component
public class ReadyQueueContext {
 
 @Autowired
 private RedissonClient redissonClient;
 
 @Autowired
 private ConsumerService consumerService;
 
 /**
 * TOPIC消費線程
 */
 @PostConstruct
 public void startTopicConsumer() {
 TaskManager.doTask(this::runTopicThreads, "開啟TOPIC消費線程");
 }
 
 /**
 * 開啟TOPIC消費線程
 * 將所有可能出現的異常全部catch住,確保While(true)能夠不中斷
 */
 @SuppressWarnings("InfiniteLoopStatement")
 private void runTopicThreads() {
 while (true) {
 RLock lock = null;
 try {
 lock = redissonClient.getLock(CONSUMER_TOPIC_LOCK);
 } catch (Exception e) {
 log.error("runTopicThreads getLock error", e);
 }
 try {
 if (lock == null) {
 continue;
 }
 // 分散式鎖時間比Blpop阻塞時間多1S,避免出現釋放鎖的時候,鎖已經超時釋放,unlock報錯
 boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
 if (!lockFlag) {
 continue;
 }
 
 // 1. 獲取ReadyQueue中待消費的數據
 RBlockingQueue<String> queue = redissonClient.getBlockingQueue(RD_LIST_TOPIC_PRE);
 String topicId = queue.poll(60, TimeUnit.SECONDS);
 if (StringUtils.isEmpty(topicId)) {
 continue;
 }
 
 // 2. 獲取job元信息內容
 RMap<String, Job> jobPoolMap = redissonClient.getMap(JOB_POOL_KEY);
 Job job = jobPoolMap.get(topicId);
 
 // 3. 消費
 FutureTask<Boolean> taskResult = TaskManager.doFutureTask(() -> consumerService.consumerMessage(job.getUrl(), job.getBody()), job.getTopic() + "-->消費JobId-->" + job.getJobId());
 if (taskResult.get()) {
 // 3.1 消費成功,刪除JobPool和DelayBucket的job信息
 jobPoolMap.remove(topicId);
 } else {
 int retrySum = job.getRetry() + 1;
 // 3.2 消費失敗,則根據策略重新加入Bucket
 
 // 如果重試次數大於5,則將jobPool中的數據刪除,持久化到DB
 if (retrySum > RetryStrategyEnum.RETRY_FIVE.getRetry()) {
 jobPoolMap.remove(topicId);
 continue;
 }
 job.setRetry(retrySum);
 long nextTime = job.getDelay() + RetryStrategyEnum.getDelayTime(job.getRetry()) * 1000;
 log.info("next retryTime is [{}]", DateUtil.long2Str(nextTime));
 RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
 delayBucket.add(nextTime, topicId);
 // 3.3 更新元信息失敗次數
 jobPoolMap.put(topicId, job);
 }
 } catch (Exception e) {
 log.error("runTopicThreads error", e);
 } finally {
 if (lock != null) {
 try {
 lock.unlock();
 } catch (Exception e) {
 log.error("runTopicThreads unlock error", e);
 }
 }
 }
 }
 }
}

4.5 添加及刪除JOB

/**
 * 提供給外部服務的操作介面
 *
 * @author why
 * @date 2020年1月15日
 */
@Slf4j
@Service
public class RedisDelayQueueServiceImpl implements RedisDelayQueueService {
 
 @Autowired
 private RedissonClient redissonClient;
 
 
 /**
 * 添加job元信息
 *
 * @param job 元信息
 */
 @Override
 public void addJob(Job job) {
 
 RLock lock = redissonClient.getLock(ADD_JOB_LOCK + job.getJobId());
 try {
 boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
 if (!lockFlag) {
 throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
 }
 String topicId = RedisQueueKey.getTopicId(job.getTopic(), job.getJobId());
 
 // 1. 將job添加到 JobPool中
 RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
 if (jobPool.get(topicId) != null) {
 throw new BusinessException(ErrorMessageEnum.JOB_ALREADY_EXIST);
 }
 
 jobPool.put(topicId, job);
 
 // 2. 將job添加到 DelayBucket中
 RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
 delayBucket.add(job.getDelay(), topicId);
 } catch (InterruptedException e) {
 log.error("addJob error", e);
 } finally {
 if (lock != null) {
 lock.unlock();
 }
 }
 }
 
 
 /**
 * 刪除job信息
 *
 * @param job 元信息
 */
 @Override
 public void deleteJob(JobDie jobDie) {
 
 RLock lock = redissonClient.getLock(DELETE_JOB_LOCK + jobDie.getJobId());
 try {
 boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
 if (!lockFlag) {
 throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
 }
 String topicId = RedisQueueKey.getTopicId(jobDie.getTopic(), jobDie.getJobId());
 
 RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
 jobPool.remove(topicId);
 
 RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
 delayBucket.remove(topicId);
 } catch (InterruptedException e) {
 log.error("addJob error", e);
 } finally {
 if (lock != null) {
 lock.unlock();
 }
 }
 }
}

5. 待優化的內容

  1. 目前只有一個Queue隊列存放消息,當需要消費的消息大量堆積後,會影響消息通知的時效。改進的辦法是,開啟多個Queue,進行消息路由,再開啟多個消費線程進行消費,提供吞吐量
  2. 消息沒有進行持久化,存在風險,後續會將消息持久化到MangoDB中

6. 源碼

更多詳細源碼請在下麵地址中獲取

7. 參考

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一、 包裝類的使用 java提供了8種基本數據類型對應的包裝類,使得基本數據類型的變數具有類的特征 需要掌握的:基本數據類型、包裝類、String三者之間的相互轉換 基本數據類型 《 》包裝類:自動裝箱,自動拆箱 基本數據類型、包裝類 >String類型:調用String重載的valueOf(Xxx ...
  • Django Template層之Template概述 by:授客 QQ:1033553122 實踐環境 Python版本:python-3.4.0.amd64 下載地址:https://www.python.org/downloads/release/python-340/ Win7 64位 Dj ...
  • 一、java.lang.Object類 1.Object類是所有Java類的根父類 2.如果在類的聲明中未使用extends關鍵字指明其父類,則預設父類為java.lang.Object類 3.Object類中的功能(屬性、方法)就具有通用性。 屬性:無 方法:equals() / toString ...
  • 一、Django自帶的用戶認證-auth模塊 1.auth模塊簡介 網站開發過程中,我們需要設計實現網站的用戶系統。此時我們需要實現包括用戶註冊、用戶登錄、用戶認證、註銷、修改密碼等功能。Flask框架中我們需要手動的創建User模型,然後逐步實現驗證方法,但Django框架內置了強大的用戶認證系統 ...
  • 前言 本文的文字及圖片來源於網路,僅供學習、交流使用,不具有任何商業用途,版權歸原作者所有,如有問題請及時聯繫我們以作處理。 from math import pi import matplotlib.pyplot as plt cat = ['Speed', 'Reliability', 'Com ...
  • 最簡單的方法是用vc6新建一個Win32 Application空工程,然後添加一個cpp文件,輸入 (註意添加對話框資源,並且在對話框上添加一個文本框) #include #include "resource.h" // DialogProc, 枚舉視窗對話框過程. int CALLBACK Di ...
  • #JDK 配置環境無效的兩種情況 第 ① 種:輸入java -version,顯示:**'java' 不是內部或外部命令,也不是可運行的程式或批處理文件。**這個問題一般出現在電腦第一次配置環境的時候。 第 ② 種:輸入java -version,命令可以正常使用,但是顯示的版本與Path中配置的版 ...
  • from typing import List# 八皇後問題,用遞歸的方法來寫。class Solution: def solveNQueens(self, n: int) -> List[List[str]]: # 如果n < 1直接返回空列表 if n < 1:return [] # 定義變數用 ...
一周排行
    -Advertisement-
    Play Games
  • 概述:在C#中,++i和i++都是自增運算符,其中++i先增加值再返回,而i++先返回值再增加。應用場景根據需求選擇,首碼適合先增後用,尾碼適合先用後增。詳細示例提供清晰的代碼演示這兩者的操作時機和實際應用。 在C#中,++i 和 i++ 都是自增運算符,但它們在操作上有細微的差異,主要體現在操作的 ...
  • 上次發佈了:Taurus.MVC 性能壓力測試(ap 壓測 和 linux 下wrk 壓測):.NET Core 版本,今天計劃準備壓測一下 .NET 版本,來測試並記錄一下 Taurus.MVC 框架在 .NET 版本的性能,以便後續持續優化改進。 為了方便對比,本文章的電腦環境和測試思路,儘量和... ...
  • .NET WebAPI作為一種構建RESTful服務的強大工具,為開發者提供了便捷的方式來定義、處理HTTP請求並返迴響應。在設計API介面時,正確地接收和解析客戶端發送的數據至關重要。.NET WebAPI提供了一系列特性,如[FromRoute]、[FromQuery]和[FromBody],用 ...
  • 原因:我之所以想做這個項目,是因為在之前查找關於C#/WPF相關資料時,我發現講解圖像濾鏡的資源非常稀缺。此外,我註意到許多現有的開源庫主要基於CPU進行圖像渲染。這種方式在處理大量圖像時,會導致CPU的渲染負擔過重。因此,我將在下文中介紹如何通過GPU渲染來有效實現圖像的各種濾鏡效果。 生成的效果 ...
  • 引言 上一章我們介紹了在xUnit單元測試中用xUnit.DependencyInject來使用依賴註入,上一章我們的Sample.Repository倉儲層有一個批量註入的介面沒有做單元測試,今天用這個示例來演示一下如何用Bogus創建模擬數據 ,和 EFCore 的種子數據生成 Bogus 的優 ...
  • 一、前言 在自己的項目中,涉及到實時心率曲線的繪製,項目上的曲線繪製,一般很難找到能直接用的第三方庫,而且有些還是定製化的功能,所以還是自己繪製比較方便。很多人一聽到自己畫就害怕,感覺很難,今天就分享一個完整的實時心率數據繪製心率曲線圖的例子;之前的博客也分享給DrawingVisual繪製曲線的方 ...
  • 如果你在自定義的 Main 方法中直接使用 App 類並啟動應用程式,但發現 App.xaml 中定義的資源沒有被正確載入,那麼問題可能在於如何正確配置 App.xaml 與你的 App 類的交互。 確保 App.xaml 文件中的 x:Class 屬性正確指向你的 App 類。這樣,當你創建 Ap ...
  • 一:背景 1. 講故事 上個月有個朋友在微信上找到我,說他們的軟體在客戶那邊隔幾天就要崩潰一次,一直都沒有找到原因,讓我幫忙看下怎麼回事,確實工控類的軟體環境複雜難搞,朋友手上有一個崩潰的dump,剛好丟給我來分析一下。 二:WinDbg分析 1. 程式為什麼會崩潰 windbg 有一個厲害之處在於 ...
  • 前言 .NET生態中有許多依賴註入容器。在大多數情況下,微軟提供的內置容器在易用性和性能方面都非常優秀。外加ASP.NET Core預設使用內置容器,使用很方便。 但是筆者在使用中一直有一個頭疼的問題:服務工廠無法提供請求的服務類型相關的信息。這在一般情況下並沒有影響,但是內置容器支持註冊開放泛型服 ...
  • 一、前言 在項目開發過程中,DataGrid是經常使用到的一個數據展示控制項,而通常表格的最後一列是作為操作列存在,比如會有編輯、刪除等功能按鈕。但WPF的原始DataGrid中,預設只支持固定左側列,這跟大家習慣性操作列放最後不符,今天就來介紹一種簡單的方式實現固定右側列。(這裡的實現方式參考的大佬 ...