分散式事務與本地事務一樣,包含原子性(Atomicity)、一致性(Consistency)、隔離性(Isolation)和持久性(Durability)。兩階段提交是保證分散式事務中原子性的重要方法。本文重點介紹了兩階段提交的原理,PostgreSQL中兩階段提交介面,以及Java中兩階段提交介面... ...
原創文章,同步發自作者個人博客 http://www.jasongj.com/big_data/two_phase_commit/
分散式事務
分散式事務簡介
分散式事務是指會涉及到操作多個資料庫(或者提供事務語義的系統,如JMS)的事務。其實就是將對同一資料庫事務的概念擴大到了對多個資料庫的事務。目的是為了保證分散式系統中事務操作的原子性。分散式事務處理的關鍵是必須有一種方法可以知道事務在任何地方所做的所有動作,提交或回滾事務的決定必須產生統一的結果(全部提交或全部回滾)。
分散式事務實現機制
如同作者在《SQL優化(六) MVCC PostgreSQL實現事務和多版本併發控制的精華》一文中所講,事務包含原子性(Atomicity)、一致性(Consistency)、隔離性(Isolation)和持久性(Durability)。
PostgreSQL針對ACID的實現技術如下表所示。
ACID |
---|
原子性(Atomicity) |
一致性(Consistency) |
隔離性 |
持久性 |
分散式事務的實現技術如下表所示。(以PostgreSQL作為事務參與方為例)
分散式ACID |
---|
原子性(Atomicity) |
一致性(Consistency) |
隔離性 |
持久性 |
從上表可以看到,一致性、隔離性和持久性靠的是各分散式事務參與方自己原有的機制,而兩階段提交主要保證了分散式事務的原子性。
兩階段提交
分散式事務如何保證原子性
在分散式系統中,各個節點(或者事務參與方)之間在物理上相互獨立,通過網路進行協調。每個獨立的節點(或組件)由於存在事務機制,可以保證其數據操作的ACID特性。但是,各節點之間由於相互獨立,無法確切地知道其經節點中的事務執行情況,所以多節點之間很難保證ACID,尤其是原子性。
如果要實現分散式系統的原子性,則須保證所有節點的數據寫操作,要不全部都執行(生效),要麼全部都不執行(生效)。但是,一個節點在執行本地事務的時候無法知道其它機器的本地事務的執行結果,所以它就不知道本次事務到底應該commit還是 roolback。常規的解決辦法是引入一個“協調者”的組件來統一調度所有分散式節點的執行。
XA規範
XA是由X/Open組織提出的分散式事務的規範。XA規範主要定義了(全局)事務管理器(Transaction Manager)和(局部)資源管理器(Resource Manager)之間的介面。XA介面是雙向的系統介面,在事務管理器(Transaction Manager)以及一個或多個資源管理器(Resource Manager)之間形成通信橋梁。XA引入的事務管理器充當上文所述全局事務中的“協調者”角色。事務管理器控制著全局事務,管理事務生命周期,並協調資源。資源管理器負責控制和管理實際資源(如資料庫或JMS隊列)。目前,Oracle、Informix、DB2、Sybase和PostgreSQL等各主流資料庫都提供了對XA的支持。
XA規範中,事務管理器主要通過以下的介面對資源管理器進行管理
- xa_open,xa_close:建立和關閉與資源管理器的連接。
- xa_start,xa_end:開始和結束一個本地事務。
- xa_prepare,xa_commit,xa_rollback:預提交、提交和回滾一個本地事務。
- xa_recover:回滾一個已進行預提交的事務。
兩階段提交原理
二階段提交的演算法思路可以概括為:協調者詢問參與者是否準備好了提交,並根據所有參與者的反饋情況決定向所有參與者發送commit或者rollback指令(協調者向所有參與者發送相同的指令)。
所謂的兩個階段是指
準備階段
又稱投票階段。在這一階段,協調者詢問所有參與者是否準備好提交,參與者如果已經準備好提交則回覆Prepared
,否則回覆Non-Prepared
。提交階段
又稱執行階段。協調者如果在上一階段收到所有參與者回覆的Prepared
,則在此階段向所有參與者發送commit
指令,所有參與者立即執行commit
操作;否則協調者向所有參與者發送rollback
指令,參與者立即執行rollback
操作。
兩階段提交中,協調者和參與方的交互過程如下圖所示。
兩階段提交前提條件
- 網路通信是可信的。雖然網路並不可靠,但兩階段提交的主要目標並不是解決諸如拜占庭問題的網路問題。同時兩階段提交的主要網路通信危險期(In-doubt Time)在事務提交階段,而該階段非常短。
- 所有crash的節點最終都會恢復,不會一直處於crash狀態。
- 每個分散式事務參與方都有WAL日誌,並且該日誌存於穩定的存儲上。
- 各節點上的本地事務狀態即使碰到機器crash都可從WAL日誌上恢復。
兩階段提交容錯方式
兩階段提交中的異常主要分為如下三種情況
- 協調者正常,參與方crash
- 協調者crash,參與者正常
- 協調者和參與方都crash
對於第一種情況,若參與方在準備階段crash,則協調者收不到Prepared
回覆,協調方不會發送commit
命令,事務不會真正提交。若參與方在提交階段提交,當它恢復後可以通過從其它參與方或者協調方獲取事務是否應該提交,並作出相應的響應。
第二種情況,可以通過選出新的協調者解決。
第三種情況,是兩階段提交無法完美解決的情況。尤其是當協調者發送出commit
命令後,唯一收到commit
命令的參與者也crash,此時其它參與方不能從協調者和已經crash的參與者那兒瞭解事務提交狀態。但如同上一節兩階段提交前提條件所述,兩階段提交的前提條件之一是所有crash的節點最終都會恢復,所以當收到commit
的參與方恢復後,其它節點可從它那裡獲取事務狀態並作出相應操作。
JTA
JTA介紹
作為java平臺上事務規範JTA(Java Transaction API)也定義了對XA事務的支持,實際上,JTA是基於XA架構上建模的。在JTA 中,事務管理器抽象為javax.transaction.TransactionManager
介面,並通過底層事務服務(即Java Transaction Service)實現。像很多其他的Java規範一樣,JTA僅僅定義了介面,具體的實現則是由供應商(如J2EE廠商)負責提供,目前JTA的實現主要有以下幾種:
- J2EE容器所提供的JTA實現(如JBoss)。
- 獨立的JTA實現:如JOTM(Java Open Transaction Manager),Atomikos。這些實現可以應用在那些不使用J2EE應用伺服器的環境里用以提供分佈事事務保證。
PostgreSQL兩階段提交介面
PREPARE TRANSACTION transaction_id
PREPARE TRANSACTION 為當前事務的兩階段提交做準備。 在命令之後,事務就不再和當前會話關聯了;它的狀態完全保存在磁碟上, 它提交成功有非常高的可能性,即使是在請求提交之前資料庫發生了崩潰也如此。這條命令必須在一個用BEGIN顯式開始的事務塊裡面使用。COMMIT PREPARED transaction_id
提交已進入準備階段的ID為transaction_id
的事務ROLLBACK PREPARED transaction_id
回滾已進入準備階段的ID為transaction_id
的事務
典型的使用方式如下
postgres=> BEGIN;
BEGIN
postgres=> CREATE TABLE demo(a TEXT, b INTEGER);
CREATE TABLE
postgres=> PREPARE TRANSACTION 'the first prepared transaction';
PREPARE TRANSACTION
postgres=> SELECT * FROM pg_prepared_xacts;
transaction | gid | prepared | owner | database
-------------+--------------------------------+-------------------------------+-------+----------
23970 | the first prepared transaction | 2016-08-01 20:44:55.816267+08 | casp | postgres
(1 row)
從上面代碼可看出,使用PREPARE TRANSACTION transaction_id
語句後,PostgreSQL會在pg_catalog.pg_prepared_xact
表中將該事務的transaction_id
記於gid欄位中,並將該事務的本地事務ID,即23970,存於transaction
欄位中,同時會記下該事務的創建時間及創建用戶和資料庫名。
繼續執行如下命令
postgres=> \q
SELECT * FROM pg_prepared_xacts;
transaction | gid | prepared | owner | database
-------------+--------------------------------+-------------------------------+-------+----------
23970 | the first prepared transaction | 2016-08-01 20:44:55.816267+08 | casp | cqdb
(1 row)
cqdb=> ROLLBACK PREPARED 'the first prepared transaction';
ROLLBACK PREPARED
cqdb=> SELECT * FROM pg_prepared_xacts;
transaction | gid | prepared | owner | database
-------------+-----+----------+-------+----------
(0 rows)
即使退出當前session,pg_catalog.pg_prepared_xact
表中關於已經進入準備階段的事務信息依然存在,這與上文所述準備階段後各節點會將事務信息存於磁碟中持久化相符。註:如果不使用PREPARED TRANSACTION 'transaction_id'
,則已BEGIN但還未COMMIT或ROLLBACK的事務會在session退出時自動ROLLBACK。
在ROLLBACK已進入準備階段的事務時,必須指定其transaction_id
。
PostgreSQL兩階段提交註意事項
PREPARE TRANSACTION transaction_id
命令後,事務狀態完全保存在磁碟上。PREPARE TRANSACTION transaction_id
命令後,事務就不再和當前會話關聯,因此當前session可繼續執行其它事務。COMMIT PREPARED
和ROLLBACK PREPARED
可在任何會話中執行,而並不要求在提交準備的會話中執行。- 不允許對那些執行了涉及臨時表或者是創建了帶
WITH HOLD
游標的事務進行PREPARE。 這些特性和當前會話綁定得實在是太緊密了,因此在一個準備好的事務里沒什麼可用的。 - 如果事務用
SET
修改了運行時參數,這些效果在PREPARE TRANSACTION
之後保留,並且不會被任何以後的COMMIT PREPARED
或ROLLBACK PREPARED
所影響,因為SET
的生效範圍是當前session。 - 從性能的角度來看,把一個事務長時間停在準備好的狀態是不明智的,因為它會影響
VACUUM
回收存儲的能力。 - 已準備好的事務會繼續持有它們獲得的鎖,直到該事務被commit或者rollback。所以如果已進入準備階段的事務一直不被處理,其它事務可能會因為獲取不到鎖而被block或者失敗。
- 預設情況下,PostgreSQL並不開啟兩階段提交,可以通過在
postgresql.conf
文件中設置max_prepared_transactions
配置項開啟PostgreSQL的兩階段提交。
JTA實現PostgreSQL兩階段提交
本文使用Atomikos提供的JTA實現,利用PostgreSQL提供的兩階段提交特性,實現了分散式事務。本文中的分散式事務使用了2個不同機器上的PostgreSQL實例。
本例所示代碼可從作者Github獲取。
package com.jasongj.jta.resource;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.sql.DataSource;
import javax.transaction.NotSupportedException;
import javax.transaction.SystemException;
import javax.transaction.UserTransaction;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.WebApplicationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Path("/jta")
public class JTAResource {
private static final Logger LOGGER = LoggerFactory.getLogger(JTAResource.class);
@GET
public String test(@PathParam(value = "commit") boolean isCommit)
throws NamingException, SQLException, NotSupportedException, SystemException {
UserTransaction userTransaction = null;
try {
Context context = new InitialContext();
userTransaction = (UserTransaction) context.lookup("java:comp/UserTransaction");
userTransaction.setTransactionTimeout(600);
userTransaction.begin();
DataSource dataSource1 = (DataSource) context.lookup("java:comp/env/jdbc/1");
Connection xaConnection1 = dataSource1.getConnection();
DataSource dataSource2 = (DataSource) context.lookup("java:comp/env/jdbc/2");
Connection xaConnection2 = dataSource2.getConnection();
LOGGER.info("Connection autocommit : {}", xaConnection1.getAutoCommit());
Statement st1 = xaConnection1.createStatement();
Statement st2 = xaConnection2.createStatement();
LOGGER.info("Connection autocommit after created statement: {}", xaConnection1.getAutoCommit());
st1.execute("update casp.test set qtime=current_timestamp, value = 1");
st2.execute("update casp.test set qtime=current_timestamp, value = 2");
LOGGER.info("Autocommit after execution : ", xaConnection1.getAutoCommit());
userTransaction.commit();
LOGGER.info("Autocommit after commit: ", xaConnection1.getAutoCommit());
return "commit";
} catch (Exception ex) {
if (userTransaction != null) {
userTransaction.rollback();
}
LOGGER.info(ex.toString());
throw new WebApplicationException("failed", ex);
}
}
}
從上示代碼中可以看到,雖然使用了Atomikos的JTA實現,但因為使用了面向介面編程特性,所以只出現了JTA相關的介面,而未顯式使用Atomikos相關類。具體的Atomikos使用是在WebContent/META-INFO/context.xml
中配置。
<Context>
<Transaction factory="com.atomikos.icatch.jta.UserTransactionFactory" />
<Resource name="jdbc/1"
auth="Container"
type="com.atomikos.jdbc.AtomikosDataSourceBean"
factory="com.jasongj.jta.util.EnhancedTomcatAtomikosBeanFactory"
uniqueResourceName="DataSource_Resource1"
minPoolSize="2"
maxPoolSize="8"
testQuery="SELECT 1"
xaDataSourceClassName="org.postgresql.xa.PGXADataSource"
xaProperties.databaseName="postgres"
xaProperties.serverName="192.168.0.1"
xaProperties.portNumber="5432"
xaProperties.user="casp"
xaProperties.password=""/>
<Resource name="jdbc/2"
auth="Container"
type="com.atomikos.jdbc.AtomikosDataSourceBean"
factory="com.jasongj.jta.util.EnhancedTomcatAtomikosBeanFactory"
uniqueResourceName="DataSource_Resource2"
minPoolSize="2"
maxPoolSize="8"
testQuery="SELECT 1"
xaDataSourceClassName="org.postgresql.xa.PGXADataSource"
xaProperties.databaseName="postgres"
xaProperties.serverName="192.168.0.2"
xaProperties.portNumber="5432"
xaProperties.user="casp"
xaProperties.password=""/>
</Context>