消費端如何保證消息隊列MQ的有序消費

来源:https://www.cnblogs.com/yulinfeng/archive/2019/07/30/11254925.html
-Advertisement-
Play Games

消息無序產生的原因 消息隊列,既然是隊列就能保證消息在進入隊列,以及出隊列的時候保證消息的有序性,顯然這是在消息的生產端(Producer),但是往往在生產環境中有多個消息的消費端(Consumer),儘管消費端在拉取消息時是有序的,但各個消息由於網路等方面原因無法保證在各個消費端中處理時有序。 場 ...


消息無序產生的原因

消息隊列,既然是隊列就能保證消息在進入隊列,以及出隊列的時候保證消息的有序性,顯然這是在消息的生產端(Producer),但是往往在生產環境中有多個消息的消費端(Consumer),儘管消費端在拉取消息時是有序的,但各個消息由於網路等方面原因無法保證在各個消費端中處理時有序。

場景分析

先後兩次修改了商品信息,消息A和消息B先後同步寫入MySQL,接著非同步寫入消息隊列中發送消息,此時消息隊列生產端(Producer)按時序先後發出了A和B兩條消息(消息A先發出,消息B後發出)。按業務邏輯,商品信息的最終狀態需要以消息A和消息B綜合為準。

看似一個比較常見的同步寫資料庫,非同步發送消息的場景,但實際上需要保證消息的有序消費。

  • 假設1:消息A只包含修改的商品名稱,消息B只包含修改的商品重量,此時消息隊列的消費端實際上不需要關註消息時序,消息隊列消費端(Consumer)只管消費即可。
  • 假設2:消息A包含修改的商品名稱、重量,消息B包含修改的商品名稱,此時消費端首先接收到消息B,後接收到消息A,那麼消息B的修改就會被覆蓋。此時消息隊列的消費端實際上又需要關註消息時序

可見,你無法保證消息中包含什麼信息,此時必須保證消息的有序消費。

業務角度如何保證消息有序消費

  • 生產端在發送消息時,始終保證消息是全量信息。
  • 消費端在接收消息時,通過緩存時間戳的方式,消費消息時判斷消息產生的時間是否最新,如果不是則丟棄,如果是則執行下一步。

下麵通過偽代碼的方式描述:

生產端偽代碼

insertWare(ware); #插入數據到資料庫,通常在插入資料庫時我們只會update修改的欄位,而不會全量插入

ware = selectWareById(ware.getId); #獲取商品的全量信息(此時是最新的),用於將它放入到消息隊列中

syncMq(ware); #非同步發送mq消息A

消費端偽代碼

ware = fetchWare(); #獲取消息

if (isLasted(ware)) #通過商品的修改時間戳判斷是否是最新的修改

​ TODO #執行下一步業務邏輯

else

​ return #丟棄該消息

重點在於消費端如何判斷該消息是否是最新的修改也就是isLasted方法。

isLasted方法

Long modified = getCacheById(ware.getId); #獲取緩存中該條商品的最新修改時間

If (ware.getModified > modified) { #如果消息中商品修改時間大於緩存中的時間,說明是最新操作

​ setCacheById(ware); #將該條消息的商品修改時間戳寫入到緩存中

​ return true;
} else #如果消息中的商品修改時間小於緩存中的時間,說明該條消息屬於“歷史操作”,不對其更新

​ return false;

以上就是通過偽代碼的方式,描述如何通過業務手段保證消息有序消費,重點在於全量發送信息和緩存時間戳。在其中還有一些技術實現細節。

例如:消費端消費消息B,執行到獲取時間戳緩存之後,併在重新設置新的緩存之前,此時另一個消費端恰好也正在消費B它也正執行到獲取時間戳緩存,由於消息A此時並沒有更新緩存,消息A拿到的緩存仍然是舊的緩存,這時就會存在兩個消費端都認為自己所消費的消息時最新的,造成該丟棄的消息沒丟。

顯然,這是分散式線程安全問題,分散式鎖通常使用Redis或者ZooKeeper,加鎖後的執行時序如下圖所示。

這是從業務角度保證消息在消費端有序消費。通過在消息發送端全量發送消息以及在消息消費端緩存時間戳就可以保證消息的有序消費。

在上述場景中是先同步寫入MySQL,再獲取商品全量數據,接著再非同步發送消息。這一系列的步驟可以通過接MySQL的binlog實現,在同步寫入MySQL後,MySQL發送binlog變更,通過阿裡巴巴Canal中間件接收MySQL的binlog變更再發送消息到消息隊列。

這是一個能給程式員加buff的公眾號 (CoderBuff)
您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • upitem() { console.log("1 :", 1); let user = this.username; let pwd = this.password; if (user == "" || pwd == "") { this.$toast("賬號或密碼不能為空!!"); } else... ...
  • 費話不多說,直接上問題: 1.開始時,頁面只有兩個DIV的嵌套(見圖) 運行結果是: 現在看運行的是正常的,但是當我設置讓 class="box2" 的DIV浮動時 運行結果是這樣的: 圖中可以看出,box1中已經沒有了任何高度,這是由於box2設置了浮動屬性,box2脫離了文檔流(也可以理解為bo ...
  • 這道題,是集數組,for迴圈和if語句一體的一道水題,首先用數組及for迴圈輸入10個蘋果的高度,然後………… ...
  • 架構雜談《八》 Docker 架構 一、Docker 引擎的三大組件 1)Docker 後臺服務(Docker Daemon):是長時間運行在後臺的守護進程,是Docker的核心服務,可以通過命令dockerd與它進行交互通信。 2)REST 介面(REST API):程式可以通過REST的介面來訪 ...
  • 一些 Spring Boot 小技巧、小知識點 初始化數據 我們在做測試的時候經常需要初始化導入一些數據,如何來處理呢?會有兩種選擇,一種是使用 Jpa,另外一種是 Spring JDBC 。兩種方式各有區別下麵來詳細介紹。 使用 Jpa 在使用 spring boot jpa的情況下設置 spri ...
  • 前言 ThreadLocal 是一種 無同步 的線程安全實現 體現了 Thread-Specific Storage 模式:即使只有一個入口,內部也會為每個線程分配特有的存儲空間,線程間 沒有共用資源 本文將總結 ThreadLocal 的用法與實現細節,希望能幫上忙 ThreadLocal 思維導 ...
  • 什麼是 SpringBoot Admin? Spring Boot Admin 是一個管理和監控你的 Spring Boot 應用程式的應用程式。這些應用程式通過 Spring Boot Admin Client(通過 HTTP)註冊或者使用 Spring Cloud(例如 Eureka)發現。UI ...
  • 一、log 1.推薦網站:https://www.cnblogs.com/yyds/p/6901864.html 該網站為日誌處理logging模塊簡介 2.logging模塊提供模塊級別的函數記錄日誌,包含四大組件。 3.日誌的級別 (1)不同的用戶關註不同的程式信息 (2)分級:(級別從小到大, ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...