事務一致性 首先,我們來回顧一下ACID原則: Atomicity:原子性,改變數據狀態要麼是一起完成,要麼一起失敗 Consistency:一致性,數據的狀態是完整一致的 Isolation:隔離線,即使有併發事務,互相之間也不影響 Durability:持久性, 一旦事務提交,不可撤銷 在單體應 ...
事務一致性
首先,我們來回顧一下ACID原則:
- Atomicity:原子性,改變數據狀態要麼是一起完成,要麼一起失敗
- Consistency:一致性,數據的狀態是完整一致的
- Isolation:隔離線,即使有併發事務,互相之間也不影響
- Durability:持久性, 一旦事務提交,不可撤銷
在單體應用中,我們可以利用關係型資料庫的特性去完成事務一致性,但是一旦應用往微服務發展,根據業務拆分成不用的模塊,而且每個模塊的資料庫已經分離開了,這時候,我們要面對的就是分散式事務了,需要自己在代碼裡頭完成ACID了。比較流行的解決方案有:兩階段提交、補償機制、本地消息表(利用本地事務和MQ)、MQ的事務消息(RocketMQ)。
大家可以到此篇文章去瞭解一下:分散式事務的四種解決方案
CAP定理
1998年,加州大學的電腦科學家 Eric Brewer 提出,分散式系統有三個指標。
- Consistency:一致性
- Availability:可用性
- Partition tolerance:分區容錯
Eric Brewer 說,這三個指標不可能同時做到。這個結論就叫做 CAP 定理。
微服務中,不同模塊之間使用的資料庫是不同的,不同模塊之間部署的服務去也有可能是不用的,那麼分區容錯是無法避免的,因為服務之間的調用不能保證百分百的沒問題,所以系統設計必須考慮這種情況。因此,我們可以認為CAP的P總是成立的,剩下的C和A無法同時做到。
實際上根據分散式系統中CAP原則,當P(分區容忍)發生的時候,強行追求C(一致性),會導致(A)可用性、吞吐量下降,此時我們一般用最終一致性來保證我們系統的AP能力。當然不是放棄C,而是放棄強一致性,而且在一般情況下CAP都能保證,只是在發生分區容錯的情況下,我們可以通過最終一致性來保證數據一致。
事件驅動實現最終一致性
事件驅動架構在領域對象之間通過非同步的消息來同步狀態,有些消息也可以同時發佈給多個服務,在消息引起了一個服務的同步後可能會引起另外消息,事件會擴散開。嚴格意義上的事件驅動是沒有同步調用的。
例子:
在電商裡面,用戶下單必鬚根據庫存來確定訂單是否成交。
項目架構:SpringBoot2+Mybatis+tk-Mybatis+ActiveMQ【因為小例子,不做成Spring Cloud架構】
首先,我們來看看正常的服務之間調用:
代碼:
@Override
@Transactional(rollbackFor = Exception.class)
public Result placeOrder(OrderQuery query) {
Result result = new Result();
// 先遠程調用Stock-Service去減少庫存
RestTemplate restTemplate = new RestTemplate();
//請求頭
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
//封裝成一個請求對象
HttpEntity entity = new HttpEntity(query, headers);
// 同步調用庫存服務的介面
Result stockResult = restTemplate.postForObject("http://127.0.0.1:8081/stock/reduceStock",entity,Result.class);
if (stockResult.getCode() == Result.ResultConstants.SUCCESS){
Order order = new Order();
BeanUtils.copyProperties(query,order);
order.setOrderStatus(1);
Integer insertCount = orderMapper.insertSelective(order);
if (insertCount == 1){
result.setMsg("下單成功");
}else {
result.setMsg("下單失敗");
}
}else {
result.setCode(Result.ResultConstants.FAIL);
result.setMsg("下單失敗:"+stockResult.getMsg());
}
return result;
}
我們可以看到,這樣的服務調用的弊端多多:
1、訂單服務需同步等待庫存服務的返回結果,介面結果返回延誤。
2、訂單服務直接依賴於庫存服務,只要庫存服務崩了,訂單服務不能再正常運行。
3、訂單服務需考慮併發問題,庫存最後可能為負。
下麵開始利用事件驅動實現最終一致性
1、在訂單服務新增訂單後,訂單的狀態是“已開啟”,然後發佈一個Order Created事件到消息隊列上
代碼:
@Transactional(rollbackFor = Exception.class)
public Result placeOrderByMQ(OrderQuery query) {
Result result = new Result();
// 先創建訂單,狀態為下單0
Order order = new Order();
BeanUtils.copyProperties(query,order);
order.setOrderStatus(0);
Integer insertCount = orderMapper.insertSelective(order);
if (insertCount == 1){
// 發送 訂單消息
MqOrderMsg mqOrderMsg = new MqOrderMsg();
mqOrderMsg.setId(order.getId());
mqOrderMsg.setGoodCount(query.getGoodCount());
mqOrderMsg.setGoodName(query.getGoodName());
mqOrderMsg.setStockId(query.getStockId());
jmsProducer.sendOrderCreatedMsg(mqOrderMsg);
// 此時的訂單隻是開啟狀態
result.setMsg("下單成功");
}
return result;
}
2、庫存服務在監聽到消息隊列OrderCreated中的消息,將庫存表中商品的庫存減去下單數量,然後再發送一個Stock Locked事件給消息隊列。
代碼:
/**
* 接收下單消息
* @param message 接收到的消息
* @param session 上下文
*/
@JmsListener(destination = ORDER_CREATE,containerFactory = "myListenerContainerFactory")
@Transactional(rollbackFor = Exception.class)
public void receiveOrderCreatedMsg(Message message, Session session){
try {
if (message instanceof ActiveMQObjectMessage){
MqStockMsg result = new MqStockMsg();
ActiveMQObjectMessage objectMessage=(ActiveMQObjectMessage)message;
MqOrderMsg msg = (MqOrderMsg)objectMessage.getObject();
Integer updateCount = stockMapper.updateNumByStockId(msg.getStockId(),msg.getGoodCount());
if (updateCount >= 1){
result.setSuccess(true);
result.setOrderId(msg.getId());
}else {
result.setSuccess(false);
}
// 手動ack,使消息出隊列,不然會不斷消費
message.acknowledge();
// 發送庫存鎖定消息到MQ
jmsProducer.sendStockLockedMsg(result);
}
} catch (JMSException e) {
log.error("接收訂單創建消息報錯:"+e.getMessage());
}
}
仔細的朋友可能會看到:message.acknowledge(),即手動確認消息。因為在保證庫存服務的邏輯能正常執行後再確認消息已消費,可以保證消息的投遞可靠性,萬一在庫存服務執行時報出異常,我們可以做到重新消費該下單消息。
3、訂單服務接收到Stock Locked事件,將訂單的狀態改為“已確認”
代碼:
/**
* 判斷是否還有庫存,有庫存更新訂單狀態為1,無庫存更新訂單狀態為2,並且通知用戶(WebSocket)
* @param message
*/
@JmsListener(destination = STOCK_LOCKED,containerFactory = "myListenerContainerFactory")
@Transactional(rollbackFor = Exception.class)
public void receiveStockLockedMsg(Message message, Session session){
try {
if (message instanceof ActiveMQObjectMessage){
ActiveMQObjectMessage objectMessage=(ActiveMQObjectMessage)message;
MqStockMsg msg = (MqStockMsg)objectMessage.getObject();
if (msg.isSuccess()){
Order updateOrder = new Order();
updateOrder.setId(msg.getOrderId());
updateOrder.setOrderStatus(1);
orderMapper.updateByPrimaryKeySelective(updateOrder);
log.info("訂單【"+msg.getOrderId()+"】下單成功");
}else {
Order updateOrder = new Order();
updateOrder.setId(msg.getOrderId());
updateOrder.setOrderStatus(2);
orderMapper.updateByPrimaryKeySelective(updateOrder);
// 通知用戶庫存不足,訂單被取消
log.error("訂單【"+msg.getOrderId()+"】因庫存不足被取消");
}
// 手動ack,使消息出隊列,不然會不斷消費
message.acknowledge();
}
} catch (JMSException e) {
log.error("接收庫存鎖定消息報錯:"+e.getMessage());
}
}
同樣,這裡我們也是會利用手動確認消息來保證消息的投遞可靠性。
至此,已經全部搞定了。我們看一下和正常的服務調用對比如何:
1、訂單服務不再直接依賴於庫存服務,而是將下單事件發送到MQ中,讓庫存監聽。
2、訂單服務能真正的作為一個模塊獨立運行。
3、解決了併發問題,而且MQ的隊列處理效率非常的高。
但是也存在下麵的問題:
1、用戶體驗改變了:因為使用事件機制,訂單是立即生成的,可是很有可能過一會,系統會提醒你沒貨了。。這就像是排隊搶購一樣,排著排著就被通知沒貨了,不用再排隊了。
2、資料庫可能會存在很對沒有完成下單的訂單。
最後,如果真的要考慮用戶體驗,並且不想資料庫存在很多不必要的數據,該怎麼辦?
那就把訂單服務和庫存服務聚合在一起吧。解決當前的問題應當是首先要考慮的,我們設計微服務的目的是本想是解決業務併發量。而現在面臨的卻是用戶體驗的問題,所以架構設計也是需要妥協的。
最主要是,我們是經過思考和分析的,每個方案能做到哪種程度,能應用到哪種場景。正所謂,技術要和實際場景結合,我們不能為了追求新技術而生搬硬套。