天天看點

JTA深度剖析J2EE事務處理JTA 實作原理

在 J2EE 應用中,事務是一個不可或缺的元件模型,它保證了使用者操作的 ACID(即原子、一緻、隔離、持久)屬性。

對于隻操作單一資料源的應用,可以通過本地資源接口實作事務管理;

對于跨資料源(例如多個資料庫,或者資料庫與 JMS)的大型應用,則必須使用全局事務 JTA (Java Transaction API)。

JTA 為 J2EE 平台提供了分布式事務服務,它隔離了事務與底層的資源,實作了透明的事務管理方式。本文将深入探尋 JTA 的體系架構,并通過詳細的代碼介紹其實作機制。

J2EE事務處理

什麼是事務處理?

事務是計算機應用中不可或缺的元件模型,它保證了使用者操作的原子性 ( Atomicity )、一緻性 ( Consistency )、隔離性 ( Isolation ) 和持久性 ( Durabilily )。

       關于事務最經典的示例莫過于信用卡轉賬:将使用者A賬戶中的 500 元人民币轉移到使用者B的賬戶中,其操作流程如下

           1. 将 A 賬戶中的金額減少 500              

           2. 将 B 賬戶中的金額增加 500      

     這兩個操作必須保正 ACID 的事務屬性:即要麼全部成功,要麼全部失敗;

假若沒有事務保障,使用者的賬号金額将可能發生問題: 假如第一步操作成功而第二步失敗,那麼使用者 A 賬戶中的金額将就減少 500 元而使用者 B 的賬号卻沒有任何增加(不翼而飛);同樣如果第一步出錯 而第二步成功,那麼使用者 A 的賬戶金額不變而使用者 B 的賬号将增加 500 元(憑空而生)。

上述任何一種錯誤都會産生嚴重的資料不一緻問題,事務的缺失對于一個穩定的生産系統是不可接受的。

J2EE 事務處理方式

1. 本地事務:

緊密依賴于底層資料總管(例如資料庫連接配接 ),事務處理局限在目前事務資源内。

此種事務處理方式不存在對應用伺服器的依賴,因而部署靈活卻無法支援多資料源的分布式事務。在資料庫連接配接中使用本地事務(JDBC)示例如下:

public void transferAccount() { 
	Connection conn = null; 
	Statement stmt = null; 
	try{ 
	    conn = getDataSource().getConnection(); 
	    // 将自動送出設定為 false,
	    //若設定為 true 則資料庫将會把每一次資料更新認定為一個事務并自動送出
	    conn.setAutoCommit(false);
			
	    stmt = conn.createStatement(); 
	    // 将 A 賬戶中的金額減少 500 
	    stmt.execute("
                update t_account set amount = amount - 500 where account_id = 'A'");
	    // 将 B 賬戶中的金額增加 500 
	    stmt.execute("
                update t_account set amount = amount + 500 where account_id = 'B'");
			
	    // 送出事務
	    conn.commit();
	    // 事務送出:轉賬的兩步操作同時成功
	} catch(SQLException sqle){ 			
	    try{ 
		// 發生異常,復原在本事務中的操做
                conn.rollback();
		// 事務復原:轉賬的兩步操作完全撤銷
                stmt.close(); 
                conn.close(); 
	    }catch(Exception ignore){ 
				
	    } 
	    sqle.printStackTrace(); 
	} 
    }
           

2. 分布式事務處理 :

Java 事務程式設計接口(JTA:Java Transaction API)和 Java 事務服務 (JTS;Java Transaction Service) 為 J2EE 平台提供了分布式事務服務。JTA 是Java規範,是XA協定在Java上的實作.

分布式事務(Distributed Transaction)包括事務管理器(Transaction Manager)和一個或多個支援 XA 協定的資料總管 ( Resource Manager )。

我們可以将資料總管看做任意類型的持久化資料存儲;事務管理器承擔着所有事務參與單元的協調與控制。JTA 事務有效的屏蔽了底層事務資源,使應用可以以透明的方式參入到事務進行中;但是與本地事務相比,XA 協定的系統開銷大,在系統開發過程中應慎重考慮是否确實需要分布式事務。若确實需要分布式事務以協調多個事務資源,則應實作和配置所支援 XA 協定的事務資源,如 JMS、JDBC 資料庫連接配接池等。使用 JTA 處理事務的示例如下(注意:connA 和 connB 是來自不同資料庫的連接配接)

public void transferAccount() { 
		
    UserTransaction userTx = null; 
    Connection connA = null; 
    Statement stmtA = null; 
				
    Connection connB = null; 
    Statement stmtB = null; 
    
    try{ 
        // 獲得 Transaction 管理對象
        userTx = (UserTransaction)getContext().lookup("
            java:comp/UserTransaction"); 
	// 從資料庫 A 中取得資料庫連接配接
	connA = getDataSourceA().getConnection(); 
			
	// 從資料庫 B 中取得資料庫連接配接
	connB = getDataSourceB().getConnection(); 
      
        // 啟動事務
	userTx.begin();
			
	// 将 A 賬戶中的金額減少 500 
	stmtA = connA.createStatement(); 
	stmtA.execute("
            update t_account set amount = amount - 500 where account_id = 'A'");
			
	// 将 B 賬戶中的金額增加 500 
	stmtB = connB.createStatement(); 
	stmtB.execute("\
             update t_account set amount = amount + 500 where account_id = 'B'");
			
	// 送出事務
	userTx.commit();
	// 事務送出:轉賬的兩步操作同時成功(資料庫 A 和資料庫 B 中的資料被同時更新)
    } catch(SQLException sqle){ 
			
	try{ 
	    // 發生異常,復原在本事務中的操縱
            userTx.rollback();
	    // 事務復原:轉賬的兩步操作完全撤銷 
	    //( 資料庫 A 和資料庫 B 中的資料更新被同時撤銷)
				
	    stmt.close(); 
            conn.close(); 
				  
	}catch(Exception ignore){ 
				
	} 
	sqle.printStackTrace(); 
			
    } catch(Exception ne){ 
	 e.printStackTrace(); 
    } 
}
           

JTA 實作原理

很多開發人員都會對 JTA 的内部工作機制感興趣:我編寫的代碼沒有任何與事務資源(如資料庫連接配接)互動的代碼,但是我的操作(資料庫更新)卻實實在在的被包含在了事務中,那 JTA 究竟是通過何種方式來實作這種透明性的呢?

要了解 JTA 的實作原理首先需要了解其架構:它包括事務管理器(Transaction Manager)和一個或多個支援 XA 協定的資料總管 ( Resource Manager ) 兩部分, 我們可以将資料總管看做任意類型的持久化資料存儲;事務管理器則承擔着所有事務參與單元的協調與控制。

根據所面向對象的不同,我們可以将 JTA 的事務管理器和資料總管了解為兩個方面:面向開發人員的使用接口(事務管理器)和面向服務提供商的實作接口(資料總管)。其中開發接口的主要部分即為上述示例中引用的 UserTransaction 對象,開發人員通過此接口在資訊系統中實作分布式事務;而實作接口則用來規範提供商(如資料庫連接配接提供商)所提供的事務服務,它約定了事務的資源管理功能,使得 JTA 可以在異構事務資源之間執行協同溝通。以資料庫為例,IBM 公司提供了實作分布式事務的資料庫驅動程式,Oracle 也提供了實作分布式事務的資料庫驅動程式, 在同時使用 DB2 和 Oracle 兩種資料庫連接配接時, JTA 即可以根據約定的接口協調者兩種事務資源進而實作分布式事務。正是基于統一規範的不同實作使得 JTA 可以協調與控制不同資料庫或者 JMS 廠商的事務資源,其架構如下圖所示:

JTA深度剖析J2EE事務處理JTA 實作原理

開發人員使用開發人員接口,實作應用程式對全局事務的支援;各提供商(資料庫,JMS 等)依據提供商接口的規範提供事務資源管理功能;事務管理器( TransactionManager )将應用對分布式事務的使用映射到實際的事務資源并在事務資源間進行協調與控制。 下面,本文将對包括 UserTransaction、Transaction 和 TransactionManager 在内的三個主要接口以及其定義的方法進行介紹。

面向開發人員的接口

 UserTransaction :開發人員通常隻使用此接口實作 JTA 事務管理,其定義了如下的方法:

  • begin()- 開始一個分布式事務,(在背景 TransactionManager 會建立一個 Transaction 事務對象并把此對象通過 ThreadLocale 關聯到目前線程上 )
  • commit()- 送出事務(在背景 TransactionManager 會從目前線程下取出事務對象并把此對象所代表的事務送出)
  • rollback()- 復原事務(在背景 TransactionManager 會從目前線程下取出事務對象并把此對象所代表的事務復原)
  • getStatus()- 傳回關聯到目前線程的分布式事務的狀态 (Status 對象裡邊定義了所有的事務狀态,感興趣的讀者可以參考 API 文檔 )
  • setRollbackOnly()- 辨別關聯到目前線程的分布式事務将被復原

面向提供商

實作接口主要涉及到 TransactionManager 和 Transaction 兩個對象

Transaction 代表了一個實體意義上的事務,在開發人員調用 UserTransaction.begin() 方法時 TransactionManager 會建立一個 Transaction 事務對象(标志着事務的開始)并把此對象通過 ThreadLocale 關聯到目前線程。UserTransaction 接口中的 commit()、rollback(),getStatus() 等方法都将最終委托給 Transaction 類的對應方法執行。Transaction 接口定義了如下的方法:

  • commit()- 協調不同的事務資源共同完成事務的送出
  • rollback()- 協調不同的事務資源共同完成事務的復原
  • setRollbackOnly()- 辨別關聯到目前線程的分布式事務将被復原
  • getStatus()- 傳回關聯到目前線程的分布式事務的狀态
  • enListResource(XAResource xaRes, int flag)- 将事務資源加入到目前的事務中(在上述示例中,在對資料庫 A 操作時 其所代表的事務資源将被關聯到目前事務中,同樣,在對資料庫 B 操作時其所代表的事務資源也将被關聯到目前事務中)
  • delistResourc(XAResource xaRes, int flag)- 将事務資源從目前事務中删除
  • registerSynchronization(Synchronization sync)- 回調接口,Hibernate 等 ORM 工具都有自己的事務控制機制來保證事務, 但同時它們還需要一種回調機制以便在事務完成時得到通知進而觸發一些處理工作,如清除緩存等。這就涉及到了 Transaction 的回調接口 registerSynchronization。工具可以通過此接口将回調程式注入到事務中,當事務成功送出後,回調程式将被激活。

TransactionManager 本身并不承擔實際的事務處理功能,它更多的是充當使用者接口和實作接口之間的橋梁。下面列出了 TransactionManager 中定義的方法,可以看到此接口中的大部分事務方法與 UserTransaction 和 Transaction 相同。 在開發人員調用 UserTransaction.begin() 方法時 TransactionManager 會建立一個 Transaction 事務對象(标志着事務的開始)并把此對象通過 ThreadLocale 關聯到目前線程上;同樣 UserTransaction.commit() 會調用 TransactionManager.commit(), 方法将從目前線程下取出事務對象 Transaction 并把此對象所代表的事務送出, 即調用 Transaction.commit()

  • begin()- 開始事務
  • commit()- 送出事務
  • rollback()- 復原事務
  • getStatus()- 傳回目前事務狀态
  • setRollbackOnly()
  • getTransaction()- 傳回關聯到目前線程的事務
  • setTransactionTimeout(int seconds)- 設定事務逾時時間
  • resume(Transaction tobj)- 繼續目前線程關聯的事務
  • suspend()- 挂起目前線程關聯的事務

在系統開發過程中會遇到需要将事務資源暫時排除的操作,此時就需要調用 suspend() 方法将目前的事務挂起:在此方法後面所做的任何操作将不會被包括在事務中,在非事務性操作完成後調用 resume()以繼續事務(注: 要進行此操作需要獲得 TransactionManager 對象, 其獲得方式在不同的 J2EE 應用伺服器上是不一樣的) 下面将通過具體的代碼向讀者介紹 JTA 實作原理。下圖列出了示例實作中涉及到的 Java 類,其中 UserTransactionImpl 實作了 UserTransaction 接口,TransactionManagerImpl 實作了 TransactionManager 接口,TransactionImpl 實作了 Transaction 接口

JTA深度剖析J2EE事務處理JTA 實作原理

開始事務 - UserTransactionImpl implenments UserTransaction

public void begin() throws NotSupportedException, SystemException { 
   // 将開始事務的操作委托給 TransactionManagerImpl 
   TransactionManagerImpl.singleton().begin(); 
}
           

開始事務 - TransactionManagerImpl implements TransactionManager

// 此處 transactionHolder 用于将 Transaction 所代表的事務對象關聯到線程上
private static ThreadLocal<TransactionImpl> transactionHolder 
    = new ThreadLocal<TransactionImpl>(); 
	
//TransacationMananger 必須維護一個全局對象,是以使用單執行個體模式實作
private static TransactionManagerImpl singleton = new TransactionManagerImpl(); 
	
private TransactionManagerImpl(){ 
		
} 
	
public static TransactionManagerImpl singleton(){ 
		 return singleton; 
} 

public void begin() throws NotSupportedException, SystemException { 
	//XidImpl 實作了 Xid 接口,其作用是唯一辨別一個事務
	XidImpl xid = new XidImpl(); 
	// 建立事務對象,并将對象關聯到線程
	TransactionImpl tx = new TransactionImpl(xid); 
		
	transactionHolder.set(tx); 
}
           

現在我們就可以了解 Transaction 接口上沒有定義 begin 方法的原因了:Transaction 對象本身就代表了一個事務,在它被建立的時候就表明事務已經開始,是以也就不需要額外定義 begin() 方法了。

送出事務 - UserTransactionImpl implenments UserTransaction

public void commit() throws RollbackException, HeuristicMixedException, 
			 HeuristicRollbackException, SecurityException, 
			 IllegalStateException, SystemException { 
			
	// 檢查是否是 Roll back only 事務,如果是復原事務
	if(rollBackOnly){ 
	    rollback(); 
			
	    return; 
	} else { 
	    // 将送出事務的操作委托給 TransactionManagerImpl 
	    TransactionManagerImpl.singleton().commit(); 
	} 
}
           

送出事務 - TransactionManagerImpl implenments TransactionManager

public void commit() throws RollbackException, HeuristicMixedException, 
    HeuristicRollbackException, SecurityException, 
    IllegalStateException, SystemException { 
				
     // 取得目前事務所關聯的事務并通過其 commit 方法送出
     TransactionImpl tx = transactionHolder.get(); 
     tx.commit(); 
}
           

同理, rollback、getStatus、setRollbackOnly 等方法也采用了與 commit() 相同的方式實作。 UserTransaction 對象不會對事務進行任何控制, 所有的事務方法都是通過 TransactionManager 傳遞到實際的事務資源即 Transaction 對象上。

XAResource

上述示例示範了 JTA 事務的處理過程,下面将為您展示事務資源(資料庫連接配接,JMS)是如何以透明的方式加入到 JTA 事務中的。首先需要明确的一點是,在 JTA 事務 代碼中獲得的資料庫源 ( DataSource ) 必須是支援分布式事務的。在如下的代碼示例中,盡管所有的資料庫操作都被包含在了 JTA 事務中,但是因為 MySql 的資料庫連接配接是通過本地方式獲得的,對 MySql 的任何更新将不會被自動包含在全局事務中。

JTA 事務處理

public void transferAccount() { 
		
    UserTransaction userTx = null; 
    Connection mySqlConnection = null; 
    Statement mySqlStat = null; 
				
    Connection connB = null; 
    Statement stmtB = null; 
    
    try{ 
        // 獲得 Transaction 管理對象
	userTx = (UserTransaction)getContext().lookup("java:comp/UserTransaction");
	// 以本地方式獲得 mySql 資料庫連接配接
	mySqlConnection = DriverManager.getConnection("localhost:1111"); 
			
	// 從資料庫 B 中取得資料庫連接配接, getDataSourceB 傳回應用伺服器的資料源
	connB = getDataSourceB().getConnection(); 
      
        // 啟動事務
	userTx.begin();
			
	// 将 A 賬戶中的金額減少 500 
	//mySqlConnection 是從本地獲得的資料庫連接配接,不會被包含在全局事務中
	mySqlStat = mySqlConnection.createStatement(); 
	mySqlStat.execute("
             update t_account set amount = amount - 500 where account_id = 'A'");
			
	//connB 是從應用伺服器得的資料庫連接配接,會被包含在全局事務中
	stmtB = connB.createStatement(); 
	stmtB.execute("
             update t_account set amount = amount + 500 where account_id = 'B'");
			
	// 事務送出:connB 的操作被送出,mySqlConnection 的操作不會被送出
	userTx.commit();

    } catch(SQLException sqle){ 
	// 處理異常代碼
    } catch(Exception ne){ 
	e.printStackTrace(); 
    } 
}
           

為什麼必須從支援事務的資料源中獲得的資料庫連接配接才支援分布式事務呢?其實支援事務的資料源與普通的資料源是不同的,它實作了額外的 XADataSource 接口。我們可以簡單的将 XADataSource 了解為普通的資料源(繼承了 java.sql.PooledConnection),隻是它為支援分布式事務而增加了 getXAResource 方法。另外,由 XADataSource 傳回的資料庫連接配接與普通連接配接也是不同的,此連接配接除了實作 java.sql.Connection 定義的所有功能之外還實作了 XAConnection 接口。我們可以把 XAConnection 了解為普通的資料庫連接配接,它支援所有 JDBC 規範的資料庫操作,不同之處在于 XAConnection 增加了對分布式事務的支援。通過下面的類圖讀者可以對這幾個接口的關系有所了解:

JTA深度剖析J2EE事務處理JTA 實作原理

應用程式從支援分布式事務的資料源獲得的資料庫連接配接是 XAConnection 接口的實作,而由此資料庫連接配接建立的會話(Statement)也為了支援分布式事務而增加了功能,如下代碼所示:

public void transferAccount() { 
		
    UserTransaction userTx = null; 
				
    Connection conn = null; 
    Statement stmt = null; 
    
    try{ 
        // 獲得 Transaction 管理對象
        userTx = (UserTransaction)getContext().lookup("java:comp/UserTransaction"); 

	// 從資料庫中取得資料庫連接配接, getDataSourceB 傳回支援分布式事務的資料源
	conn = getDataSourceB().getConnection(); 
        // 會話 stmt 已經為支援分布式事務進行了功能增強
	stmt = conn.createStatement(); 
			
        // 啟動事務
	userTx.begin();
        stmt.execute("update t_account ... where account_id = 'A'"); 
	userTx.commit();

    } catch(SQLException sqle){ 
	// 處理異常代碼
    } catch(Exception ne){ 
        e.printStackTrace(); 
    } 
}
           

我們來看一下由 XAConnection 資料庫連接配接建立的會話(Statement)部分的代碼實作(不同的 JTA 提供商會有不同的實作方式,此處代碼示例隻是向您示範事務資源是如何被自動加入到事務中)。 我們以會話對象的 execute 方法為例,通過在方法開始部分增加對 associateWithTransactionIfNecessary 方法的調用,即可以保證在 JTA 事務期間,對任何資料庫連接配接的操作都會被透明的加入到事務中。

将事務資源自動關聯到事務對象 - XAStatement implements Statement

public void execute(String sql) { 
    // 對于每次資料庫操作都檢查此會話所在的資料庫連接配接是否已經被加入到事務中
    associateWithTransactionIfNecessary(); 

    try{ 
        // 處理資料庫操作的代碼
	.... 

    } catch(SQLException sqle){ 
	// 處理異常代碼
    } catch(Exception ne){ 
	e.printStackTrace(); 
    } 
} 

public void associateWithTransactionIfNecessary(){ 
	     
    // 獲得 TransactionManager 
    TransactionManager tm = getTransactionManager(); 

    Transaction tx = tm.getTransaction();
    // 檢查目前線程是否有分布式事務
    if(tx != null){ 
        // 在分布式事務内,通過 tx 對象判斷目前資料連接配接是否已經被包含在事務中,
        //如果不是那麼将此連接配接加入到事務中
        Connection conn = this.getConnection(); 
        //tx.hasCurrentResource, xaConn.getDataSource() 不是标準的 JTA 
        // 接口方法,是為了實作分布式事務而增加的自定義方法
        if(!tx.hasCurrentResource(conn)){ 
	    XAConnection xaConn = (XAConnection)conn; 
	    XADataSource xaSource = xaConn.getDataSource(); 
					
	    // 調用 Transaction 的接口方法,将資料庫事務資源加入到目前事務中
	    tx.enListResource(xaSource.getXAResource(), 1);
        } 
    } 
}
           

XAResource 與 Xid 

XAResource 是 Distributed Transaction Processing: The XA Specification 标準的 Java 實作,它是對底層事務資源的抽象,定義了分布式事務處理過程中事務管理器和資料總管之間的協定,各事務資源提供商(如 JDBC 驅動,JMS)将提供此接口的實作。使用此接口,開發人員可以通過自己的程式設計實作分布式事務處理,但這些通常都是由應用伺服器實作的(伺服器自帶實作更加高效,穩定) 為了說明,我們将舉例說明他的使用方式。 在使用分布式事務之前,為了區分事務使之不發生混淆,必須實作一個 Xid 類用來辨別事務,可以把 Xid 想象成事務的一個标志符,每次在新事務建立是都會為事務配置設定一個 Xid,Xid 包含三個元素:formatID、gtrid(全局事務辨別符)和 bqual(分支修飾詞辨別符)。 formatID 通常是零,這意味着你将使用 OSI CCR(Open Systems Interconnection Commitment, Concurrency 和 Recovery 标準)來命名;如果你要使用另外一種格式,那麼 formatID 應該大于零,-1 值意味着 Xid 為無效。

gtrid 和 bqual 分别包含 64 個位元組二進制碼來分别辨別全局事務和分支事務, 唯一的要求是 gtrid 和 bqual 必須是全局唯一的。 XAResource 接口中主要定義了如下方法:

  • commit()- 送出事務
  • isSameRM(XAResource xares)- 檢查目前的 XAResource 與參數是否同一事務資源
  • prepare()- 通知資料總管準備事務的送出工作
  • rollback()- 通知資料總管復原事務

在事務被送出時,Transaction 對象會收集所有被目前事務包含的 XAResource 資源,然後調用資源的送出方法,如下代碼所示:

送出事務 - TransactionImpl implements Transaction

public void commit() throws RollbackException, HeuristicMixedException, 
			 HeuristicRollbackException, SecurityException, 
			 IllegalStateException, SystemException { 
			
    // 得到目前事務中的所有事務資源
    List<XAResource> list = getAllEnlistedResouces(); 
			
    // 通知所有的事務資料總管,準備送出事務
    // 對于生産級别的實作,此處需要進行額外處理以處理某些資源準備過程中出現的異常
    for(XAResource xa : list){ 
	xa.prepare(); 
    } 
			
    // 所有事務性資源,送出事務
    for(XAResource xa : list){ 
	xa.commit(); 
    } 
}
           

結束語

通過如上介紹相信讀者對 JTA 的原理已經有所了解,本文中的示例代碼都是理想情況下的假設實作。一款完善成熟的 JTA 事務實作需要考慮與處理的細節非常多,如性能(送出事務的時候使用多線程方式并發送出事務)、容錯(網絡,系統異常)等, 其成熟也需要經過較長時間的積累。感興趣的讀者可以閱讀一些開源 JTA 實作以進一步深入學習。