java事務(三)——自己實現分散式事務

来源:http://www.cnblogs.com/nobounds/archive/2016/04/23/5423598.html
-Advertisement-
Play Games

在上一篇《java事務(二)——本地事務》中已經提到了事務的類型,並對本地事務做了說明。而分散式事務是跨越多個數據源來對數據來進行訪問和更新,在JAVA中是使用JTA(Java Transaction API)來實現分散式的事務管理的。但是在本篇中並不會說明如何使用JTA,而是在不依賴其他框架以及j ...


    在上一篇《java事務(二)——本地事務》中已經提到了事務的類型,並對本地事務做了說明。而分散式事務是跨越多個數據源來對數據來進行訪問和更新,在JAVA中是使用JTA(Java Transaction API)來實現分散式的事務管理的。但是在本篇中並不會說明如何使用JTA,而是在不依賴其他框架以及jar包的情況下自己來實現分散式事務,作為對分散式事務的一個理解。

 

    假設現在有兩個資料庫,可以是在一臺機器上也可以是在不同機器上,現在要向其中一個資料庫更新用戶賬戶信息,另外一個資料庫新增用戶的消費信息。首先說明一下,分散式事務也是事務,在事務特性的那篇博客中就已經說明瞭事務的四個特性:原子性、一致性、隔離性和持久性,那麼分散式事務也必然是符合這四個特性的,這就要求同時對兩個資料庫進行數據訪問和更新的時候是作為一個單獨的工作單元來進行處理,並且同時成功或者失敗後進行回滾。但是在說明本地事務的時候已經提到了,本地事務是基於連接的,現在有兩個資料庫,分別保存數據,那麼為了實現這個事務,必然會有兩個資料庫連接,這似乎是與事務基於連接的說法相悖。現在舉個例子:之前回老家去了一趟醫院,後來在辦理出院手續的時候是這樣的,辦理出院時需要護士站的主任醫生填寫出院單,然後攜帶結賬單到收費處繳納費用並去藥房取藥,然後回護士站蓋章,出院手續辦理完畢。如果把不同地點的視窗看成是不同的連接,那麼實現辦理出院手續這個事務就必須保證在每個業務視窗上的事務都是成功的,最後出院手續才算真正完成。在最終蓋章的時候,需要查看每個視窗給出的單子是否是已辦理的,只有綜合起來所有的單子才能判定出院手續是否成功。這主要就是為了說明分散式事務實現的關鍵其實是管理每個連接上的事務,用一個東西來判定每個連接上的事務執行情況,綜合起來作為分散式事務執行成功與否的依據。這大概就是事務管理器要做的事情。雖然這個例子並不太恰當,很有挑毛病的地方,但是在不太鑽牛角尖的情況下,還是可以用來說明要表達的東西的。

實現例子

   我打開了兩台虛擬機,分別命令為node1、node2,每台虛擬機上都安裝了MySQL資料庫,在向node1上的資料庫更新用戶賬戶信息,向node2上的資料庫新增用戶消費信息。

 在node1上創建賬戶表,建表語句如下:

CREATE TABLE ACCOUNTS
(
    ID INT NOT NULL AUTO_INCREMENT COMMENT '自增主鍵',
    CUSTOMER_NO    VARCHAR(25) NOT NULL COMMENT '客戶號',
    CUSTOMER_NAME  VARCHAR(25) NOT NULL COMMENT '客戶名稱',
    CARD_ID         VARCHAR(18) NOT NULL COMMENT '身份證號',
    BANK_ID         VARCHAR(25) NOT NULL COMMENT '開戶行ID',
    BALANCE      DECIMAL  NOT NULL COMMENT '賬戶餘額',
    CURRENCY     VARCHAR(10) NOT NULL COMMENT '幣種',
    PRIMARY KEY (ID)
)
COMMENT  = '賬戶表' ;

  然後向表中插入一條記錄,如下圖:

 在node2上創建用戶消費歷史表,建表語句如下:

CREATE TABLE USER_PURCHASE_HIS
(
    ID INT NOT NULL AUTO_INCREMENT COMMENT '自增主鍵',
    CUSTOMER_NO VARCHAR(25) NOT NULL COMMENT '客戶號',
    SERIAL_NO   VARCHAR(32) NOT NULL COMMENT '交易流水號',
    AMOUNT         DECIMAL     NOT NULL COMMENT '交易金額',
    CURRENCY    VARCHAR(10) NOT NULL COMMENT '幣種',
    REMARK         VARCHAR(100) NOT NULL COMMENT '備註',
    PRIMARY KEY (ID)
)
COMMENT = '用戶消費歷史表';

 下麵實現一個簡陋的例子,代碼如下:

 1、創建DBUtil類,用來獲取和關閉連接

package person.lb.example1;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class DBUtil {

    static {
        try {
            //載入驅動類
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } 
    }
    
    //獲取node1上的資料庫連接
    public static Connection getNode1Connection() {
        Connection conn = null;
        try {
            conn = (Connection) DriverManager.getConnection(
                    "jdbc:mysql://192.168.0.108:3306/TEST", 
                    "root", 
                    "root");
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return conn;
    }
    
    //獲取node2上的資料庫連接
    public static Connection getNode2Connection() {
        Connection conn = null;
        try {
            conn = (Connection) DriverManager.getConnection(
                    "jdbc:mysql://192.168.0.109:3306/TEST", 
                    "root", 
                    "root");
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return conn;
    }
    
    //關閉連接
    public static void close(ResultSet rs, Statement st, Connection conn) {
        try {
            if(rs != null) {
                rs.close();
            }
            if(st != null) {
                st.close();
            }
            if(conn != null) {
                conn.close();
            }
        } catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

2、創建XADemo類,用來測試事務

package person.lb.example1;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

public class XADemo {

    public static void main(String[] args) {

        //獲取連接
        Connection node1Conn = DBUtil.getNode1Connection();
        Connection node2Conn = DBUtil.getNode2Connection();
        try {
            //設置連接為非自動提交
            node1Conn.setAutoCommit(false);
            node2Conn.setAutoCommit(false);
            //更新賬戶信息
            updateAccountInfo(node1Conn);
            //增加用戶消費信息
            addUserPurchaseInfo(node2Conn);
            //提交
            node1Conn.commit();
            node2Conn.commit();
        } catch (SQLException e) {
            e.printStackTrace();
            //回滾
            try {
                node1Conn.rollback();
                node2Conn.rollback();
            } catch (SQLException e1) {
                e1.printStackTrace();
            }
        } finally {
            //關閉連接
            DBUtil.close(null, null, node1Conn);
            DBUtil.close(null, null, node2Conn);
        }
    }
    
    /**
     * 更新賬戶信息
     * @param conn
     * @throws SQLException 
     */
    private static void updateAccountInfo(Connection conn) throws SQLException {
        Statement st = conn.createStatement();
        st.execute("UPDATE ACCOUNTS SET BALANCE = CAST('9900.00' AS DECIMAL) WHERE CUSTOMER_NO = '88888888' ");
    }
    
    /**
     * 增加用戶消費信息
     * @param conn
     * @throws SQLException 
     */
    private static void addUserPurchaseInfo(Connection conn) throws SQLException {
        Statement st = conn.createStatement();
        st.execute("INSERT INTO USER_PURCHASE_HIS(CUSTOMER_NO, SERIAL_NO, AMOUNT, CURRENCY, REMARK) "
                + " VALUES ('88888888', 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx', 100, 'CNY', '買衣服')");
    }

}

  這是一個沒有發生任何異常的例子,執行結果是nod1上ACCOUNTS 表中的BALANCE欄位的值成功更新為9900,而node2上USER_PURCHASE_HIS表中新增了一條記錄,兩個連接上的事務都成功完成,事務目標實現。如果反向測試一下,更改Insert語句,把其中某一個要插入的值改為NULL,由於欄位都是非空限制,所以會發生異常,這個連接上的事務會失敗,那麼跟它關聯的node1上的事務也必須回滾,不對資料庫進行任何更改。經測試,結果與預期目標一致。說明這個例子是符合事務特性的。

 

  但是這個例子不管是從代碼的可讀性和可維護性上來說都是比較差的。在使用spring開發項目的時候,配置了事務管理器以後,在我們的業務邏輯中幾乎是察覺不到事務控制的,而且也看不到事務控制的代碼。那麼究竟spring中是怎麼實現的事務控制呢,這篇博客中不會詳細說明,但是要提到兩個東西,事務管理器和資源管理器,現在自己來實現一個簡單的事務管理器和資源管理器來對事務進行控制。

代碼示例如下:

1、創建AbstractDataSource 類

package person.lb.datasource;

import java.sql.Connection;
import java.sql.SQLException;

public abstract class AbstractDataSource {

    //獲取連接
    public abstract Connection getConnection() throws SQLException ;
    //關閉連接
    public abstract void close() throws SQLException;
        
}

2、創建Node1DataSource 類,用來連接node1上的資料庫

package person.lb.datasource;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class Node1DataSource extends AbstractDataSource {

    //使用ThreadLocal類保存當前線程使用的Connection
    protected static final ThreadLocal<Connection> threadSession = new ThreadLocal<Connection>();
    
    static {
        try {
            //載入驅動類
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } 
    }
    
    private final static Node1DataSource node1DataSource = new Node1DataSource();
    
    private Node1DataSource() {}
    
    public static Node1DataSource getInstance() {
        return node1DataSource;
    }
    
    /**
     * 獲取連接
     */
    @Override
    public Connection getConnection() throws SQLException {
        Connection conn = null;
        if(threadSession.get() == null) {
            conn = (Connection) DriverManager.getConnection(
                        "jdbc:mysql://192.168.0.108:3306/TEST", 
                        "root", 
                        "root");
            threadSession.set(conn);
        } else {
            conn = threadSession.get();
        }
        return conn;
    }

    /**
     * 關閉並移除連接
     */
    @Override
    public void close() throws SQLException {
        Connection conn = threadSession.get();
        if(conn != null) {
            conn.close();
            threadSession.remove();
        }
    }

}

3、創建Node2DataSource類,用來連接node2機器上的資料庫

package person.lb.datasource;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;


public class Node2DataSource extends AbstractDataSource {

    //使用ThreadLocal類保存當前線程使用的Connection
    protected static final ThreadLocal<Connection> threadSession = new ThreadLocal<Connection>();
        
    static {
        try {
            //載入驅動類
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } 
    }
    
    private static final Node2DataSource node2DataSource  = new Node2DataSource();
    
    private Node2DataSource() {};
    
    public static Node2DataSource getInstance() {
        return node2DataSource;
    }
    
    /**
     * 獲取連接
     */
    @Override
    public Connection getConnection() throws SQLException {
        Connection conn = null;
        if(threadSession.get() == null) {
            conn = (Connection) DriverManager.getConnection(
                        "jdbc:mysql://192.168.0.109:3306/TEST", 
                        "root", 
                        "root");
            threadSession.set(conn);
        } else {
            conn = threadSession.get();
        }
        return conn;
    }

    /**
     * 關閉並移除連接
     */
    @Override
    public void close() throws SQLException {
        Connection conn = threadSession.get();
        if(conn != null) {
            conn.close();
            threadSession.remove();
        }
    }
}

4、創建Node1Dao類,在node1的資料庫中更新賬戶信息

package person.lb.dao;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

import person.lb.datasource.Node1DataSource;

public class Node1Dao {

    private Node1DataSource dataSource = Node1DataSource.getInstance();
    
    /**
     * 更新賬戶信息
     * @throws SQLException
     */
    public void updateAccountInfo() throws SQLException {
        Connection conn = dataSource.getConnection();
        Statement st = conn.createStatement();
        st.execute("UPDATE ACCOUNTS SET BALANCE = CAST('9900.00' AS DECIMAL) WHERE CUSTOMER_NO = '88888888' ");
    }
}

5、創建Node2Dao,在node2機器上增加用戶消費信息

package person.lb.dao;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

import person.lb.datasource.Node2DataSource;

public class Node2Dao {

    private Node2DataSource dataSource = Node2DataSource.getInstance();
    
    /**
     * 增加用戶消費信息
     * @throws SQLException
     */
    public void addUserPurchaseInfo() throws SQLException {
        Connection conn = dataSource.getConnection();
        Statement st = conn.createStatement();
        st.execute("INSERT INTO USER_PURCHASE_HIS(CUSTOMER_NO, SERIAL_NO, AMOUNT, CURRENCY, REMARK) "
                + " VALUES ('88888888', 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx', null, 'CNY', '買衣服')");
    }
}

6、創建NodeService類,把兩個操作作為一個事務來執行

package person.lb.service;

import java.sql.SQLException;

import person.lb.dao.Node1Dao;
import person.lb.dao.Node2Dao;
import person.lb.transaction.TransactionManager;

public class NodeService {

    
    public void execute() {
        //啟動事務
        TransactionManager.begin();
        
        Node1Dao node1Dao = new Node1Dao();
        Node2Dao node2Dao = new Node2Dao();
        try {
            node1Dao.updateAccountInfo();
            node2Dao.addUserPurchaseInfo();
            //提交事務
            TransactionManager.commit();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

7、最後是測試類TestTx

package person.lb.test;

import person.lb.service.NodeService;

public class TestTx {

    public static void main(String[] args) {
        NodeService nodeService = new NodeService();
        nodeService.execute();
    }
}

經測試,與第一個例子效果一致,但是從代碼上來說要比第一個例子的可讀性和可維護性高。不過這個例子並不能說明分散式事務中的事務管理器和資源管理器的真正原理,也不是一個可使用的代碼,畢竟存在缺陷,而且dao層需要拋出異常才能實現事務的回滾。我想,作為一個理解分散式事務的作用的例子是夠了。

 

最後是這篇博客中的源碼:TransactionDemo.rar

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 在開發一個串口通信程式時,用到需要將10進位轉換為16進位的情況,參照瞭如下類容: From:http://www.cnblogs.com/xdotnet/archive/2009/01/17/tostring_format.html 在很多對象顯示為字元串的時候都會使用到ToString中的格式化 ...
  • 註:以下文章原文來自於Dr Charles Severance 的 《Python for Informatics》 12.3 用HTTP協議獲取一張圖片 在上一節的例子中,我們獲取的是一個有換行符的文本文件,並簡單的把它顯示在屏幕上。同樣我們可以用一個小程式通過HTTP協議獲取圖片。下麵這個程式運 ...
  • 這篇文章是我在慕課網上學習Hibernate註解的時候進行手機以及整理的筆記。 今天把它分享給大家,希望對大家有用。可以進行收藏,然後需要的時候進行對照一下即可。這樣能起到一個查閱的作用。 本文主要講解的內容簡介 : 第1章 類級別註解 1-1 本章簡介 1-2 準備工作 1-3 @Entity註解 ...
  • #include<stdio.h>int main(){ int i,j; int word=0,num=0;//新單詞標記,單詞下標 char str[100],s[50][20]={0},c; gets(str);//輸入字元串(多個單詞) for(i=0;(c=str[i])!='\0';i+ ...
  • 問題描述: Given a positive integer n, break it into the sum of at least two positive integers and maximize the product of those integers. Return the maxim ...
  • 一丶可命名元組(nametuple) ...
  • 1.include語句 使用include語句可以告訴PHP提取特定的文件,並載入它的全部內容 1 <?php 2 inlude "fileinfo.php"; 3 4 //此處添加其他代碼 5 ?> 使用include語句可以告訴PHP提取特定的文件,並載入它的全部內容 1 <?php 2 inl ...
  • 命名空間其實只是一個形式,最終目的是重構代碼,但這個過程想要一蹴而就是不可能的。 一開始給了一個偽命題:基於ThinkPHP的重構(不要為什麼)。經過一段的實踐,發現這是一個大錯特錯的思維方式,其中遇到的坑在此略過不表。 首先,不要想著全盤基於命名空間重寫,而應該是基於局部的。 最終思考後的結果,是 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...