Seata 包括 Server端和Client端。Seata中有三種角色:TC、TM、RM,其中,Server端就是TC,TM和RM屬Client端。Client端的源碼學習上一篇已講過,詳見 《Seata 1.5.2源碼學習》,今天來學習Server端的源碼。 源碼下載地址:https://git ...
Seata 包括 Server端和Client端。Seata中有三種角色:TC、TM、RM,其中,Server端就是TC,TM和RM屬Client端。Client端的源碼學習上一篇已講過,詳見 《Seata 1.5.2源碼學習》,今天來學習Server端的源碼。
源碼下載地址:https://github.com/seata/seata
啟動類 ServerApplication 沒什麼好說的,重點是ServerRunner
ServerRunner 是一個 CommandLineRunner 實例,因此在Spring Boot啟動完成後會回調其run()方法。而在ServerRunner的run()方法中調用了Server.start()方法。
在Server#start()方法中,初始化了包括id生成器在內的很多組件,我們先不管這些,重點關註以下幾行代碼:
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);
coordinator.init();
nettyRemotingServer.setHandler(coordinator);
DefaultCoordinator是一個單例Bean,在整個應用中只有一個DefaultCoordinator實例
DefaultCoordinator 實現了 TransactionMessageHandler
NettyRemotingServer#setHandler()設置的正是TransactionMessageHandler
DefaultCoordinator#onRequest()
重點是這三行:
AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;
transactionRequest.setTCInboundHandler(this);
transactionRequest.handle(context);
DefaultCoordinator實現了TCInboundHandler介面,所以它不僅是一個TransactionMessageHandler,還是一個TCInboundHandler
這裡transactionRequest.setTCInboundHandler(this),就是指定AbstractTransactionRequestToTC中的TCInboundHandler設置為DefaultCoordinator
AbstractTransactionRequest#handle()
不同的請求分發給對應的處理器去處理
現在請求和對應的處理器都有了,下麵具體看一下每種請求都是如何被處理的
1. 開啟全局事務
開啟事務直接調用子類DefaultCoordinator#doGlobalBegin(),同時放在一個處理模板中執行
在doGlobalBean()中調用DefaultCore#begin()並返回全局事務ID(xid)
new GlobalSession()
添加一個SessionManager作為Session的監聽器
Core
總結一下,開啟事務:
- 創建一個GlobalSession
- 給GlobalSession添加一個監聽器SessionManager
- session.begin()
開啟事務,創建一個全局事務,如果是seata.store.mode=db的話,向global_table表插入一條記錄
2. 分支事務註冊
DefaultCore#branchRegister()
如果是AT模式,這裡調用的就是ATCore#branchSessionLock()
ATCore#branchSessionLock()檢查是否拿到鎖了
具體每種鎖的實現就不往下看了,挑其中一個看下,就RedisLocker吧
總之,分支註冊的時候需要檢查鎖,拿到本次事務中所涉及的所有需要加鎖的行的鎖才能註冊成功
所有行都加鎖成功,分支註冊才算成功,才會返回true
再回到AbstractCore#branchRegister(),整個方法是放在SessionHolder#lockAndExecute()中執行的
總結一下,分支註冊:
- 創建一個BranchSession
- 加鎖,獲取所有行的鎖
- 將BranchSession加到GlobalSession中
- 返回branchId
分支註冊,創建BranchSession,獲取全局鎖成功後將branchSession加入globalSession
3. 提交全局事務
首先判斷全局事務狀態是否為begin,如果不是則不應該提交。如果是,則將事務active置為false,釋放全局鎖,判斷是否可以非同步提交。分支類型是AT的都可以非同步提交,因此AT模式,預設是非同步提交。如果不能非同步提交,則採取同步提交。
3.1. 同步提交
遍歷所有已註冊的分支事務,向分支發送同步請求,告訴它全局事務開始提交了,不出意外的情況下返回分支狀態是二階段提交成功。當所有分支都提交成功,則返回true,於是全局事務提交成功,返回全局事務狀態為已提交。如果有分支提交失敗,則返回false,全局事務提交失敗,返回全局狀態為提交失敗。如果拋異常了,則會有定時任務稍後重試提交。
3.2. 非同步提交
非同步提交只是將全局狀態置為非同步提交中,剩下的事情交給定時任務去執行
啟動的時候調用了DefaultCoordinator#init()方法,啟動定時任務
每次,定時任務執行前,要先獲取一把分散式鎖,這個鎖是io.seata.core.store.DistributedLocker,不是分支註冊時的那把鎖io.seata.core.lock.Locker
非同步提交首先將全局狀態設置為AsyncCommitting,返回返回全局狀態Committed。後臺有定時任務掃描,找到所有狀態為AsyncCommitting的全局事務,迴圈遍歷。對於每個全局事務提交,調用DefaultCore#doGlobalCommit(),遍歷所有已註冊的分支事務,向分支事務發請求,通知其提交事務,分支事務返回二階段提交成功,表示該分支事務提交成功,當所有分支事務都二階段提交成功,則全局事務提交成功。
4. 回滾全局事務
回滾,首先檢查全局事務狀態是否為Begin,不是的話直接結束。遍歷當前全局事務中已註冊的分支事務,依次給每個分支發請求,告訴分支事務需要回滾。如果所有分支返回回滾成功,則全局回滾成功。如果有分支回滾失敗且不重試,則直接直接結束。如果失敗且可重試,或者執行過程中拋異常,則稍後會有定時任務重試回滾操作。
5. 分支上報
RM向TC報告分支事務狀態
只是更新一下分支狀態及相關數據
6. 查詢全局事務狀態
7. 查詢全局鎖
挑RedisLocker看一下吧
關於Seata Server 的源碼學習就先到這裡,歡迎交流,多謝點贊 (^_^)