天天看點

Apache Camel - 事務

Apache Camel是一個基于Enterprise Integration Pattern(企業整合模式,簡稱EIP)的開源架構。

1. 概述

關于

Apache Camel

的介紹就不說了,作為2007年就釋出了v1.0版的開源項目,至今擁有着非常強大的生命力(Github上幾乎每天都會有更新),相關的介紹在網上随處可見,本文主要關注的是Camel在事務方面的支援。

2. 本地事務和全局事務

2.1 本地事務

本地事務又稱單事務,這也是我們最常接觸到的事務類型。Camel也是直接選擇了複用Spring的事務支援,誠如《人月神話》第四章裡提到的——法國城市南斯的建築曆史上,八代建築者自我限制和犧牲精神,確定了最終建築風格的一緻性。針對事務支援,Camel成功遏制了自創一套實作的沖動,轉而尋求既有穩定的實作方式,這一點上真的非常難能可貴。

以下我們使用Camel-Sql來給出一個例子:

============================ camel-spring.xml 檔案

<!-- 存儲資料庫配置資訊 -->
<context:property-placeholder location="classpath:db.properties" />

<bean id="dataSource"
	class="org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy">
	<constructor-arg>
		<bean class="com.alibaba.druid.pool.DruidDataSource">
			<property name="driverClassName"
				value="${jdbc.driverClassName}" />
			<property name="url" value="${jdbc.url}" />
			<property name="username" value="${jdbc.username}" />
			<property name="password" value="${jdbc.password}" />
		</bean>
	</constructor-arg>
</bean>

<bean id="txManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
	<property name="dataSource" ref="dataSource"/>
</bean>	

<!--  Camel / -->

<!-- Camel TransactionManager 适配Spring事務 -->
<bean id="camelTransaction" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
	<property name="transactionManager" ref="txManager"/>
	<property name="propagationBehaviorName" value="PROPAGATION_REQUIRED"/>
</bean>

<!-- configure the Camel SQL component to use the JDBC data source -->
<bean id="sqlComponent"
	class="org.apache.camel.component.sql.SqlComponent">
	<property name="dataSource" ref="dataSource" />
</bean>

<!-- here is Camel configured with a number of routes -->
<camelContext xmlns="http://camel.apache.org/schema/spring">

	<!-- use Camel property placeholder loaded from the given file -->
	<propertyPlaceholder id="placeholder"
		location="classpath:sql/sql.properties" />

	<!-- route that generate new orders and insert them in the database -->
	<route id="generateOrder-route">
		<!--這一行隻是為了讓路由可控, 可以使用direct:start 代替-->
		<from uri="stream:in?promptMessage=Enter something:" />
		<!-- denote that this route is transacted 聲明本路由使用了事務-->
		<transacted ref="camelTransaction"/>
		<to uri="sqlComponent:{{sql.insertNewTopic}}" />
		<!--模拟異常-->
		<throwException exceptionType="java.lang.RuntimeException" message="模拟異常, 将導緻資料庫事務復原!"/>
	</route>
</camelContext>
           

============================ sql.properties 檔案

# sql.properties 檔案
sql.insertNewTopic=INSERT INTO topic(id, hh) VALUES (1, 'topic')
           

============================ 單元測試代碼

@Test
public void singleTransaction() throws Exception {
	ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
			"camel-spring.xml", this.getClass());
	context.start();
	
	// 通用沒有具體業務意義的代碼,隻是為了保證主線程不退出
	Object obj = new Object();
	synchronized (obj.getClass()) {
		obj.getClass().wait();
	}
	
}
           

2.2 全局事務

全局事務又稱分布式事務,這裡就需要使用諸如Atomikos這樣的第三方元件的支援了,在實際的項目中,使用這樣的方式實作分布式事務對性能影響較大,導緻應用場景有限,是以這裡就不給出相關的示例代碼了。

2.3 提示

Camel提供的資料庫相關元件中,camel-sql, camel-mybatis均支援事務,但camel-jdbc是不支援事務的, 這一點要注意。可見參見連結:關于camel-jdbc支援事務的讨論(2011年)

3. 技巧

3.1 事務失敗的時候傳回自定義值

我們偶爾會遇到如小節标題所示的需求,針對這種情況,Camel也早已給出了相應的通用解決方案 —— 使用

onException

<onException>
	<!-- 當發生此類異常的時候, 執行這裡的邏輯 -->
	<exception>java.lang.RuntimeException</exception>
	<!--将異常從Exchange中移除, 這樣就可以進行自定義資訊的傳回了 -->
	<handled><constant>true</constant></handled>
	<!-- 這裡也可以通過自定義Bean的方式傳回自定義錯誤資訊, 這裡為了簡單  -->
	<transform>constant>fail</constant></transform>
	<!-- <rollback/>必須位于onException的尾部, 這樣才能阻止message繼續Route -->
	<rollback markRollbackOnly="true"/>
</onException>
           

3.2 事務補償

事務一般都是存在于資料庫,MQ之間,諸如檔案操作等其實是不存在所謂事務操作的,針對此類情況Camel也是給出了相應的解決方案:

1.

UnitOfWork

Synchronization

Camel通過引入

UnitOfWork

來模拟事務。

CamelTestUtil.defaultPrepareTest(new RouteBuilder() {
	@Override
	public void configure() throws Exception {
		from("stream:in?promptMessage=Enter something:")//				
				.process(new Processor() {
					public void process(Exchange exchange) throws Exception {
					   // MyRollback實作了Synchronization接口的onFailure()方法, 将在執行失敗時候回調, 這也就是事務補償的時機
						exchange.getUnitOfWork().addSynchronization(new MyRollback());
					}
				})//
				......					
				.throwException(new RuntimeException("XXXXXX"))//				
				.to("stream:out");
	}
});
           

2.

onCompletion

實作事務補償的另外一種方式就是使用Camel提供的

onCompletion

特性,Camel保證其在路由結束時候被調用。你可已認證在其後附加

onFailureOnly()

來選擇隻有在失敗時候才調用。

from("stream:in?promptMessage=Enter something:")//	
	// use a route scoped onCompletion to be executed when the Exchange failed
	.onCompletion().onFailureOnly().bean(MyRollback.class, "onFailure")//
	// must use end() to denote the end of the onCompletion route! 這裡切記, 筆者就栽了跟頭.
	.end() // 
	// here starts the regular route
	.process(ThrowExceptionXProcessor.me)//
	.to("stream:out");
           

3. 以上兩種方法的差別

兩者最大的差別是其線程模型:

  1. Synchronization

    ,所有的操作都在同一線程下
  2. onCompletion

    ,新起單獨的線程專門處理,是以

    onCompletion

    中對于Exchange的修改等等不會影響主線程中的,

    onCompletion

    擷取到的Exchange屬于複制品。

4. 最後

  1. 社群強大真是一件無比幸福的事情。Apache Camel在國内的資料并不多,但依托于強大的生态圈,你依然可以相當輕松地渡過初期階段。
  2. 《Camel In Action》這本書相當不錯,面面俱到,關于使用Camel的問題,你已經遇到的或者之後會遇到的問題,他都給出了相應的讨論。

5. 參考

  1. 《Camel In Action》 P283 - P314
  2. Camel - Transaction