spring boot / cloud (十九) 併發消費消息,如何保證入庫的數據是最新的?

来源:http://www.cnblogs.com/itkk/archive/2017/10/18/7684951.html
-Advertisement-
Play Games

spring boot / cloud (十九) 併發消費消息,如何保證入庫的數據是最新的? 消息中間件在解決非同步處理,模塊間解耦和,和高流量場景的削峰,等情況下有著很廣泛的應用 . 本文將跟大家一起討論以下其中的異常場景,如題. 場景 在實際工作中,大家可能也都遇到過這樣的需求 : 如 : 系統A ...


spring boot / cloud (十九) 併發消費消息,如何保證入庫的數據是最新的?

消息中間件在解決非同步處理,模塊間解耦和,和高流量場景的削峰,等情況下有著很廣泛的應用 .

本文將跟大家一起討論以下其中的異常場景,如題.

場景

在實際工作中,大家可能也都遇到過這樣的需求 :

如 : 系統A中的某些重要的數據,想在每次數據變更的時候,將當前最新的數據備份下來,當然,這個備份的動作不能影響當前數據變更的進程.

也更不期望因為備份的操作,影響當前進程的性能.

分析

這是一個比較常見的,可以非同步處理的需求,業務數據變更 和 數據備份 之間並沒有強一致性的要求,大致的架構如下:

併發消費消息

producer作為消息產生者,會通過指定的交換機(exchange)和路由鍵(routingkey),將消息傳輸到指定的隊列(queue)中,通常producer也會有多個節點

consume作為消息的消費者,會依次從隊列(queue)中拿到消息,進行業務處理,最終將數據寫入資料庫中,並且為了更快速的消費消息,consume通常會部署多個節點,並且每個節點中也會有多個線程同時消費消息

queue作為消息隊列,保證了消息被消費的時序性,以及唯一性(一條消息只能被消費一次)

dlxQueue作為死信隊列,當queue中的的消息無法被正常消費時,當重處理N次後,將會被放入死信隊列,並有專門的consume來消費和處理,比如:通知相關人員進行人工干預,等.

問題

producer會源源不斷的產生消息,有新的數據,也有更新老的數據,

而consume則是拿到消息,做insert或者update的操作.

但是由於consume是多線程併發消費消息的,那麼就會出現當前線程拿到的消息並非最新版本的消息,如果這個時候進行了update操作的話,很有可能會覆蓋掉已經是最新版本的數據了

如 : 當前資料庫里的數據為1,業務操作先是將1改為了2,然後馬上的又將2改為了3,這兩個操作時間非常接近,幾乎是同時,然後產生的消息也幾乎同時的進入了消息中間件,

但是在queue里依然有先後,2在前3在後(queue機制保證),那麼這個時候,consume來消費了,由於consume是多線程的,所以,2和3會被分配到兩條線程中同時被處理

這時,如果2的這條線程先結束,3的這條線程後結束,那麼則數據正常,最終數據被更新成3

但是,如果3的這條線程先結束了,2的這條線程是後結束的,那麼,最新的數據就會被老數據覆蓋掉

這種情況顯然是不滿足需求記錄當前最新的數據的,

並且這種情況很容易發生,雖然queue里保證了消息的先後,以及唯一性,但是消息被consume線上程中消費確實同時處理的

臟讀的問題

通常以上這種情況,網路上的一些解決方案,都是在數據中加入版本(version)的概念來解決,本文也是(上文提及的1,2,3,其實就是版本的概念).

通常網路上的描述是,在update的時候,根據資料庫中的最新版本號,如果當前消息的版本號小於資料庫最新的版本號,則放棄更新,大於,則更新

這一段邏輯很簡單,但是也很容易產生誤解,最大的問題在於獲得最新版本號,在多線程環境下,資料庫的臟讀也是蠻嚴重的,臟讀的存在,導致你查詢出來的數據並非是最新的數據

如 : 上面的一個場景,資料庫中最新的版本號是1,有版本號2和3兩個消息是即將要消費的,按照上面的邏輯,處理程式應該先查資料庫,拿到當前最新的版本.

這個時候,兩條線程查詢到的結果有可能都是1,這時2>1,並且3>1,兩條線程依然都會執行,同樣的 :

如果2的這條線程先結束,3的這條線程後結束,那麼則數據正常,最終數據被更新成3

如果3的這條線程先結束了,2的這條線程是後結束的,那麼,最新的數據就會被老數據覆蓋掉

同樣達到想要的效果

如何保證入庫的數據是最新的?

其實要實現很簡單,首先,要知道,對於同一行數據,sql的執行也是有先後順序的,其實到底更新為2的sql語句先執行,還是更新為3的sql語句先執行,並不重要

重要的是,將判斷版本號的語句放入更新條件中進行判斷.

例子 : 同樣是上面的場景,資料庫中的版本為1,這時2和3同時更新,誰先結束,誰也不知道,也無法控制(其實有辦法,但是損失性能,當前場景需要的是高效)

但是我們可以在條件中加入"version < 2"這樣的條件

SQL語句樣例 :

UPDATE TEST_MSG
SET
    VERSION          = #{version},
    DATA             = #{data},
    LAST_UPDATE_DATE = #{lastUpdatedDate}
WHERE BUSINESS_KEY = #{businessKey} AND VERSION  <  #{version}

這樣的話,無論那條線程先結束,都不會影響最終的結果

如果,2先結束,3線程的條件為2 < 3,條件成立,數據將會被更新為3

如果,3先結束,2線程的條件為3 < 2,條件不成立,數據則不會更新(由於是在sql執行過程中判斷,所以這裡不存在臟讀的情況)

這樣就能滿足記錄當前最新的數據的需求了

實現 (springboot使用rabbitmq的例子)

spring.rabbitmq.host=itkk.org
spring.rabbitmq.port=5672
spring.rabbitmq.username=dev_udf-sample
spring.rabbitmq.password=1qazxsw2
spring.rabbitmq.virtual-host=/dev_udf-sample
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10

以上配置文件配置了rabbitmq的連接,也指定了消費者監聽器的併發數量(5)和最大併發數量(10),並且開啟了重試,重試失敗的消息會被流轉到死信隊裡裡

@Configuration
public class UdfServiceADemoConfig {

    public static final String EXCHANGE_ADEMO_TEST1 = "exchange.ademo.test1";

    public static final String QUEUE_ADEMO_TEST1_CONSUME1 = "queue.ademo.test1.consume1";

    public static final String ROUTINGKEY_ADEMO_TEST1_TESTMSG = "routingkey.ademo.test1.testmsg";

    @Bean
    public DirectExchange exchangeAdemoTest1() {
        return new DirectExchange(EXCHANGE_ADEMO_TEST1, true, true);
    }

    @Bean
    public Queue queueAdemoTest1Consume1() {
        return new Queue(QUEUE_ADEMO_TEST1_CONSUME1, true, false, true);
    }

    @Bean
    public Binding queueAdemoTest1Consume1Binding() {
        return new Binding(QUEUE_ADEMO_TEST1_CONSUME1, 
        Binding.DestinationType.QUEUE, EXCHANGE_ADEMO_TEST1, 
        ROUTINGKEY_ADEMO_TEST1_TESTMSG, null);
    }
}  

exchangeAdemoTest1方法定義了一個交換機,並且是自動刪除的

queueAdemoTest1Consume1定義了一個消費者隊列,也是自動刪除的

queueAdemoTest1Consume1Binding將上面定義的交換機和消費者綁定起來,並設定了路由鍵(routingkey)

public class TestMsg implements Serializable {
    /**
     * 描述 : id
     */
    private static final long serialVersionUID = 1L;

    /**
     * msgId
     */
    private String msgId = UUID.randomUUID().toString();

    /**
     * businessKey
     */
    private String businessKey;

    /**
     * version
     */
    private long version;

    /**
     * data
     */
    private String data;

    /**
     * lastUpdatedDate
     */
    private Date lastUpdatedDate;
}

以上定義了消息的格式,主要的欄位就是businessKey和version,分別用來確定唯一的業務數據和版本的判斷


    @Autowired
    private AmqpTemplate amqpTemplate;

    @Scheduled(fixedRate = SCHEDULED_FIXEDRATE)
    public void send1() {
        this.send();
    }
    
    public void send2() {
        .....
    }

    /**
     * send
     */
    private void send() {
        final int numA = 1000;
        int a = (int) (Math.random() * numA);
        long b = (long) (Math.random() * numA);
        TestMsg testMsg = new TestMsg();
        testMsg.setBusinessKey(Integer.toString(a));
        testMsg.setVersion(b);
        testMsg.setData(UUID.randomUUID().toString());
        testMsg.setLastUpdatedDate(new Date());
        amqpTemplate.convertAndSend(UdfServiceADemoConfig.EXCHANGE_ADEMO_TEST1, 
        UdfServiceADemoConfig.ROUTINGKEY_ADEMO_TEST1_TESTMSG, testMsg);
    }

以上定義了用於做測試的消息發送方,使用計劃任務,定期的向交換機中寫入數據,可以定義多個計劃任務,增加同一時間消息產生的數量


    @RabbitListener(queues = UdfServiceADemoConfig.QUEUE_ADEMO_TEST1_CONSUME1)
    public void consume1(TestMsg testMsg) {
        if (testMsgRespository.count(testMsg.getBusinessKey()) > 0) {
            int row = testMsgRespository.update(testMsg);
            log.info("update row = {}", row);
        } else {
            try {
                int row = testMsgRespository.insert(testMsg);
                log.info("insert row = {}", row);
            } catch (Exception e) {
                //進行異常判斷,確定是主鍵衝突錯誤
                int row = testMsgRespository.update(testMsg);
                log.info("update row = {}", row);
            }
        }
        try {
            final long time = 5L;
            Thread.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    

以上定義了消息消費的方法,此方法是多線程執行的,大概邏輯是,先count資料庫,判斷數據是否存在,如果存在,則update數據,如果不存在,則insert數據

但是count也有可能存在臟讀的情況,所以insert操作有可能會因為主鍵重覆而失敗,這時,會捕獲到異常,通過異常判斷,確定是主鍵衝突錯誤後(樣例代碼中省略了),在進行update操作

這裡的update操作,則是上文提到的採用"version < #{updateVersion}"的方法進行更新,保證了將最新的數據更新到資料庫中

最後的線程休眠,是為了模擬處理時間,以便造成更多的併發情況


    int count(@Param("businessKey") String businessKey);


    int insert(TestMsg testMsg);


    int update(TestMsg testMsg);

    <select id="count" resultType="int">
        SELECT COUNT(*)
        FROM TEST_MSG
        WHERE BUSINESS_KEY = #{businessKey}
    </select>

    <insert id="insert">
        INSERT INTO TEST_MSG
        (
            BUSINESS_KEY,
            VERSION,
            DATA,
            LAST_UPDATE_DATE
        )
        VALUES
            (
                #{businessKey},
                #{version},
                #{data},
                #{lastUpdatedDate}
            )
    </insert>

    <update id="update">
        UPDATE TEST_MSG
        SET
            VERSION          = #{version},
            DATA             = #{data},
            LAST_UPDATE_DATE = #{lastUpdatedDate}
        WHERE BUSINESS_KEY = #{businessKey} AND VERSION <![CDATA[ < ]]> #{version}
    </update>
    

以上為相關的sqlmap定義以及mapper介面的定義


CREATE TABLE TEST_MSG
(
  BUSINESS_KEY     VARCHAR(100) NOT NULL
    PRIMARY KEY,
  VERSION          BIGINT       NOT NULL,
  DATA             VARCHAR(100) NOT NULL,
  LAST_UPDATE_DATE DATETIME     NOT NULL
)
  COMMENT 'TEST_MSG';
    

以上為表結構的定義

結束

這樣,啟動應用,觀察資料庫表更新情況,會發現,數據的版本只會有增長的,不會存在降低的

那麼,我們這實現了本文中開頭提到的需求了.

並且也瞭解了在springboot中如何使用rabbitmq發送和消費消息了.

關於本文內容 , 歡迎大家的意見跟建議

代碼倉庫 (博客配套代碼)


想獲得最快更新,請關註公眾號

想獲得最快更新,請關註公眾號


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 之前的文章中介紹了My Blog文章維護功能的開發,開發過程中使用Area的方法建立了用於維護文章的Controller、View和Model。但是無論代碼怎麼變對於瀏覽器來說都是通過一個url地址去訪問,現在My Blog可用的url有以下幾個: http://localhost:52356/ - ...
  • 本文以近乎v5.2產品為例截圖介紹,近乎產品是基於asp.net mvc 5.0框架。大家可以下一個近乎源碼版,來進一步熟悉和瞭解學習。 近乎下載地址:http://www.jinhusns.com/ MVC項目一啟動會首先進入到Global執行Application_Start()這個方法註冊 區 ...
  • 昨日去筆試了,遇到了一道編程題,因為經常坐在電腦前面,使用開發工具已經習慣了“alt+/”,導致了平時有些方法不是很註意看,於是在整理了思路之後,寫方法的時候,完蛋了,卡殼了....導致那道題做得不是盡如人意....現在筆者將之整理出來,希望加深自己的印象的同時也能夠對讀者有所幫助。 題目:從鍵盤中 ...
  • int #數值(整數) str #字元串(文字) float #浮點(小數點) list #列表 print() #列印\輸出 len() #長度 max() #最大值 min() #最小值 del() #刪除元素 list.append(obj) #此語法中list代表列表,obj代表需要添加到l ...
  • PS:再次說明一下,原本不想寫的太啰嗦的,可之前那個系列發佈後發現,好多朋友都想馬上拿到代碼立即能上手開發自己的項目,對代碼結構、基礎常識、分類目錄與文件功能結構、常用函數......等等什麼都不懂,然後就想使用,我真的很無語,還有一些朋友有十幾年開發經驗也會問一些很基礎的問題,我都不知道怎麼回答了 ...
  • 一、簡介 阿裡巴巴於10月14日在杭州雲棲大會上,正式發佈了《阿裡巴巴Java開發規約》掃描插件!該插件基於《阿裡巴巴Java開發規約》手冊內容,在掃描代碼後,將不符合規約的代碼按Blocker/Critical/Major三個等級顯示在下方,甚至在IDEA上,還基於Inspection機制提供了實 ...
  • 簡述 寫這個工具主要目的在於減少工作量,bear在寫gitbook的時候,發現對應目錄一個一個寫進去,非常繁瑣,而且最近在學習python,所以,手癢之下寫了一個目錄生成的小工具。 工具使用 本身工具並不複雜,主要實現功能接受一個github中的raw版本的url,然後列印自動生成對應文件的mark ...
  • 本節內容 - 使用nm查看符號 - 使用readelf -s輸出符號信息 - 刪除符號表對反彙編的影響 - 使用strip刪除符號和調試信息 - 使用UPX壓縮並保護可執行文件 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...