spring boot / cloud (十九) 併發消費消息,如何保證入庫的數據是最新的? 消息中間件在解決非同步處理,模塊間解耦和,和高流量場景的削峰,等情況下有著很廣泛的應用 . 本文將跟大家一起討論以下其中的異常場景,如題. 場景 在實際工作中,大家可能也都遇到過這樣的需求 : 如 : 系統A ...
spring boot / cloud (十九) 併發消費消息,如何保證入庫的數據是最新的?
消息中間件在解決非同步處理,模塊間解耦和,和高流量場景的削峰,等情況下有著很廣泛的應用 .
本文將跟大家一起討論以下其中的異常場景,如題.
場景
在實際工作中,大家可能也都遇到過這樣的需求 :
如 : 系統A中的某些重要的數據,想在每次數據變更的時候,將當前最新的數據備份下來,當然,這個備份的動作不能影響當前數據變更的進程.
也更不期望因為備份的操作,影響當前進程的性能.
分析
這是一個比較常見的,可以非同步處理的需求,業務數據變更 和 數據備份 之間並沒有強一致性的要求,大致的架構如下:
producer作為消息產生者,會通過指定的交換機(exchange)和路由鍵(routingkey),將消息傳輸到指定的隊列(queue)中,通常producer也會有多個節點
consume作為消息的消費者,會依次從隊列(queue)中拿到消息,進行業務處理,最終將數據寫入資料庫中,並且為了更快速的消費消息,consume通常會部署多個節點,並且每個節點中也會有多個線程同時消費消息
queue作為消息隊列,保證了消息被消費的時序性,以及唯一性(一條消息只能被消費一次)
dlxQueue作為死信隊列,當queue中的的消息無法被正常消費時,當重處理N次後,將會被放入死信隊列,並有專門的consume來消費和處理,比如:通知相關人員進行人工干預,等.
問題
producer會源源不斷的產生消息,有新的數據,也有更新老的數據,
而consume則是拿到消息,做insert或者update的操作.
但是由於consume是多線程併發消費消息的,那麼就會出現當前線程拿到的消息並非最新版本的消息,如果這個時候進行了update操作的話,很有可能會覆蓋掉已經是最新版本的數據了
如 : 當前資料庫里的數據為1,業務操作先是將1改為了2,然後馬上的又將2改為了3,這兩個操作時間非常接近,幾乎是同時,然後產生的消息也幾乎同時的進入了消息中間件,
但是在queue里依然有先後,2在前3在後(queue機制保證),那麼這個時候,consume來消費了,由於consume是多線程的,所以,2和3會被分配到兩條線程中同時被處理
這時,如果2的這條線程先結束,3的這條線程後結束,那麼則數據正常,最終數據被更新成3
但是,如果3的這條線程先結束了,2的這條線程是後結束的,那麼,最新的數據就會被老數據覆蓋掉
這種情況顯然是不滿足需求記錄當前最新的數據的,
並且這種情況很容易發生,雖然queue里保證了消息的先後,以及唯一性,但是消息被consume線上程中消費確實同時處理的
臟讀的問題
通常以上這種情況,網路上的一些解決方案,都是在數據中加入版本(version)的概念來解決,本文也是(上文提及的1,2,3,其實就是版本的概念).
通常網路上的描述是,在update的時候,根據資料庫中的最新版本號,如果當前消息的版本號小於資料庫最新的版本號,則放棄更新,大於,則更新
這一段邏輯很簡單,但是也很容易產生誤解,最大的問題在於獲得最新版本號,在多線程環境下,資料庫的臟讀也是蠻嚴重的,臟讀的存在,導致你查詢出來的數據並非是最新的數據
如 : 上面的一個場景,資料庫中最新的版本號是1,有版本號2和3兩個消息是即將要消費的,按照上面的邏輯,處理程式應該先查資料庫,拿到當前最新的版本.
這個時候,兩條線程查詢到的結果有可能都是1,這時2>1,並且3>1,兩條線程依然都會執行,同樣的 :
如果2的這條線程先結束,3的這條線程後結束,那麼則數據正常,最終數據被更新成3
如果3的這條線程先結束了,2的這條線程是後結束的,那麼,最新的數據就會被老數據覆蓋掉
同樣達到想要的效果
如何保證入庫的數據是最新的?
其實要實現很簡單,首先,要知道,對於同一行數據,sql的執行也是有先後順序的,其實到底更新為2的sql語句先執行,還是更新為3的sql語句先執行,並不重要
重要的是,將判斷版本號的語句放入更新條件中進行判斷.
例子 : 同樣是上面的場景,資料庫中的版本為1,這時2和3同時更新,誰先結束,誰也不知道,也無法控制(其實有辦法,但是損失性能,當前場景需要的是高效)
但是我們可以在條件中加入"version < 2"這樣的條件
SQL語句樣例 :
UPDATE TEST_MSG
SET
VERSION = #{version},
DATA = #{data},
LAST_UPDATE_DATE = #{lastUpdatedDate}
WHERE BUSINESS_KEY = #{businessKey} AND VERSION < #{version}
這樣的話,無論那條線程先結束,都不會影響最終的結果
如果,2先結束,3線程的條件為2 < 3,條件成立,數據將會被更新為3
如果,3先結束,2線程的條件為3 < 2,條件不成立,數據則不會更新(由於是在sql執行過程中判斷,所以這裡不存在臟讀的情況)
這樣就能滿足記錄當前最新的數據的需求了
實現 (springboot使用rabbitmq的例子)
spring.rabbitmq.host=itkk.org
spring.rabbitmq.port=5672
spring.rabbitmq.username=dev_udf-sample
spring.rabbitmq.password=1qazxsw2
spring.rabbitmq.virtual-host=/dev_udf-sample
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
以上配置文件配置了rabbitmq的連接,也指定了消費者監聽器的併發數量(5)和最大併發數量(10),並且開啟了重試,重試失敗的消息會被流轉到死信隊裡裡
@Configuration
public class UdfServiceADemoConfig {
public static final String EXCHANGE_ADEMO_TEST1 = "exchange.ademo.test1";
public static final String QUEUE_ADEMO_TEST1_CONSUME1 = "queue.ademo.test1.consume1";
public static final String ROUTINGKEY_ADEMO_TEST1_TESTMSG = "routingkey.ademo.test1.testmsg";
@Bean
public DirectExchange exchangeAdemoTest1() {
return new DirectExchange(EXCHANGE_ADEMO_TEST1, true, true);
}
@Bean
public Queue queueAdemoTest1Consume1() {
return new Queue(QUEUE_ADEMO_TEST1_CONSUME1, true, false, true);
}
@Bean
public Binding queueAdemoTest1Consume1Binding() {
return new Binding(QUEUE_ADEMO_TEST1_CONSUME1,
Binding.DestinationType.QUEUE, EXCHANGE_ADEMO_TEST1,
ROUTINGKEY_ADEMO_TEST1_TESTMSG, null);
}
}
exchangeAdemoTest1方法定義了一個交換機,並且是自動刪除的
queueAdemoTest1Consume1定義了一個消費者隊列,也是自動刪除的
queueAdemoTest1Consume1Binding將上面定義的交換機和消費者綁定起來,並設定了路由鍵(routingkey)
public class TestMsg implements Serializable {
/**
* 描述 : id
*/
private static final long serialVersionUID = 1L;
/**
* msgId
*/
private String msgId = UUID.randomUUID().toString();
/**
* businessKey
*/
private String businessKey;
/**
* version
*/
private long version;
/**
* data
*/
private String data;
/**
* lastUpdatedDate
*/
private Date lastUpdatedDate;
}
以上定義了消息的格式,主要的欄位就是businessKey和version,分別用來確定唯一的業務數據和版本的判斷
@Autowired
private AmqpTemplate amqpTemplate;
@Scheduled(fixedRate = SCHEDULED_FIXEDRATE)
public void send1() {
this.send();
}
public void send2() {
.....
}
/**
* send
*/
private void send() {
final int numA = 1000;
int a = (int) (Math.random() * numA);
long b = (long) (Math.random() * numA);
TestMsg testMsg = new TestMsg();
testMsg.setBusinessKey(Integer.toString(a));
testMsg.setVersion(b);
testMsg.setData(UUID.randomUUID().toString());
testMsg.setLastUpdatedDate(new Date());
amqpTemplate.convertAndSend(UdfServiceADemoConfig.EXCHANGE_ADEMO_TEST1,
UdfServiceADemoConfig.ROUTINGKEY_ADEMO_TEST1_TESTMSG, testMsg);
}
以上定義了用於做測試的消息發送方,使用計劃任務,定期的向交換機中寫入數據,可以定義多個計劃任務,增加同一時間消息產生的數量
@RabbitListener(queues = UdfServiceADemoConfig.QUEUE_ADEMO_TEST1_CONSUME1)
public void consume1(TestMsg testMsg) {
if (testMsgRespository.count(testMsg.getBusinessKey()) > 0) {
int row = testMsgRespository.update(testMsg);
log.info("update row = {}", row);
} else {
try {
int row = testMsgRespository.insert(testMsg);
log.info("insert row = {}", row);
} catch (Exception e) {
//進行異常判斷,確定是主鍵衝突錯誤
int row = testMsgRespository.update(testMsg);
log.info("update row = {}", row);
}
}
try {
final long time = 5L;
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
以上定義了消息消費的方法,此方法是多線程執行的,大概邏輯是,先count資料庫,判斷數據是否存在,如果存在,則update數據,如果不存在,則insert數據
但是count也有可能存在臟讀的情況,所以insert操作有可能會因為主鍵重覆而失敗,這時,會捕獲到異常,通過異常判斷,確定是主鍵衝突錯誤後(樣例代碼中省略了),在進行update操作
這裡的update操作,則是上文提到的採用"version < #{updateVersion}"的方法進行更新,保證了將最新的數據更新到資料庫中
最後的線程休眠,是為了模擬處理時間,以便造成更多的併發情況
int count(@Param("businessKey") String businessKey);
int insert(TestMsg testMsg);
int update(TestMsg testMsg);
<select id="count" resultType="int">
SELECT COUNT(*)
FROM TEST_MSG
WHERE BUSINESS_KEY = #{businessKey}
</select>
<insert id="insert">
INSERT INTO TEST_MSG
(
BUSINESS_KEY,
VERSION,
DATA,
LAST_UPDATE_DATE
)
VALUES
(
#{businessKey},
#{version},
#{data},
#{lastUpdatedDate}
)
</insert>
<update id="update">
UPDATE TEST_MSG
SET
VERSION = #{version},
DATA = #{data},
LAST_UPDATE_DATE = #{lastUpdatedDate}
WHERE BUSINESS_KEY = #{businessKey} AND VERSION <![CDATA[ < ]]> #{version}
</update>
以上為相關的sqlmap定義以及mapper介面的定義
CREATE TABLE TEST_MSG
(
BUSINESS_KEY VARCHAR(100) NOT NULL
PRIMARY KEY,
VERSION BIGINT NOT NULL,
DATA VARCHAR(100) NOT NULL,
LAST_UPDATE_DATE DATETIME NOT NULL
)
COMMENT 'TEST_MSG';
以上為表結構的定義
結束
這樣,啟動應用,觀察資料庫表更新情況,會發現,數據的版本只會有增長的,不會存在降低的
那麼,我們這實現了本文中開頭提到的需求了.
並且也瞭解了在springboot中如何使用rabbitmq發送和消費消息了.
關於本文內容 , 歡迎大家的意見跟建議
代碼倉庫 (博客配套代碼)
想獲得最快更新,請關註公眾號