2PC機制的資料強一緻性處理
概念闡述
兩階段送出協定是常見的一緻性協定,許多分布式關系型資料管理系統采用此協定來完成分布式事務。它是協調所有分布式原子事務參與者,并決定送出或取消(復原)的分布式算法。在兩階段送出協定中,包含了兩種角色:協調者與參與者。參與者就是實際處理事務的節點,而協調者就是其中一台單獨的進行分布式事務管理的節點。
場景描述
分布式強一緻性是為了保證在服務調用完成時,各個參與事務的服務的狀态已經保持一緻。本節将以一個轉賬的例子來說明如何利用CSE的強一緻性開發和設計分布式事務,保證事務參與方的資料強一緻性。
開發示例
本示例以一個銀行A,B之間進行轉賬的情景為例介紹強一緻事務的開發。
- 前提條件:
-
運作服務的節點已經安裝好mysql資料庫,并建立指定的資料庫(本例中為
teststrong)。
- 啟動本地服務中心。
- 啟動事務協調器coordinator,可在demo-narayana子產品中擷取,直接運作RtsServer.java即可。
-
- 事務發起方和參與方的POM中均需引入2PC事務對應的依賴包:
<dependency> groupId>com.huawei.paas.cse</groupId> <artifactId>handler-2pc</artifactId> </dependency>
-
在microservice.yaml中添加進處理鍊:
需在發起方的microservice.yaml檔案中引入twoPC-consumer的handler,用于在調用參與方時傳輸事務ID。
在參與方的microservice.yaml檔案中引入twoPC-provider的handler,用于接收事務ID。cse: handler: chain: Consumer: default: loadbalance,twoPC-consumer,perf-stats
cse: handler: chain: Provider: default: twoPC-provider
-
配置JPA
在resource/META-INF/persistence.xml中配置JPA屬性,連接配接mysql的配置樣例如下:
其中的mysqlDataSource為下一節中配置的DataSource,persistence-unit name則會在entityManagerFactory中引用。<?xml version="1.0" encoding="UTF-8"?> <persistence version="1.0" xmlns="http://java.sun.com/xml/ns/persistence" xmlns:xsi="h ttp://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ ns/persistence http://java.sun.com/xml/ns/persistence/persistence_1_0.xsd"> <persistence-unit name="jta" transaction-type="JTA"> <provider>org.hibernate.ejb.HibernatePersistence</provider> <jta-data-source>java:/mysqlDataSource</jta-data-source> <properties> <property name="hibernate.archive.autodetection" value="class"/> <property name="hibernate.hbm2ddl.auto" value="update"/> <property name="hibernate.current_session_context_class" value="jta"/> <property name="hibernate.dialect" value="org.hibernate.dialect.MySQL5Dialect"/> <property name="hibernate.connection.release_mode" value="after_statement"/> </properties> </persistence-unit> </persistence>
-
配置DataSource,Adaptor 及entitiyManager
在發起方的resource/META-INF/spring/*.bean.xml 檔案中進行配置,樣例如下:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context" xmlns:jpa="http://www.springframework.org/schema/data/jpa" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd http://www.springframework.org/schema/data/jpa http://www.springframework.org/schema/data/jpa/spring-jpa.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd"> <tx:annotation-driven/> <context:component-scan base-package="narayana"/> <jpa:repositories base-package="narayana.jpamodel"/> <bean id="mysqlDataSource" class="org.apache.commons.dbcp2.managed.BasicManagedDataSource" destroy-method="close"> <property name="xaDataSourceInstance"> <bean class="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource"> <property name="url" value="jdbc:mysql://localhost:3306/bank1?user=root&password=root"/> </bean> </property> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="transactionManager" ref="transactionManagerImpl"/> <property name="url" value="jdbc:mysql://localhost:3306/bank1?user=root&password=root"/> </bean> <bean id="jpaVendorAdapter" class="org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter"> <!--<property name="showSql" value="true"/>--> <property name="generateDdl" value="true"/> <property name="database" value="MYSQL"/> </bean> <bean id="entityManagerFactory" class="org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean"> <property name="dataSource" ref="mysqlDataSource"/> <property name="jpaVendorAdapter" ref="jpaVendorAdapter"/> <property name="persistenceUnitName" value="jta"/> <property name="packagesToScan" value="narayana.jpamodel"/> </bean> </beans>
其中的資料庫bank1表示銀行A的賬戶系統。參與方配置類似,隻是連接配接不同的資料庫bank2,表示銀行B的賬戶系統。
示例代碼
-
Model定義。
采用JPA注解方式定義表結構,@Table中的name即為表名,該表建立在datasource中定義的資料庫中。
@ Entity @Table(name="account") public class Account { @Id @GeneratedValue(strategy = IDENTITY) private Long id; private String username; private Long money; public Account() { } public Account(String username, Long money) { this.username = username; this.money = money; } public void setMoney(Long money) { this.money = money; } public Long getId() { return id; } public String getUsername() { return username; } public Long getMoney() { return money; } }
-
Repository及Service定義
AccountService 和AccountRepository 都會根據xml配置自動注入。
public interface AccountRepository extends JpaRepository<Account, Long> { List<Account> findByUsername(String name); } @Service @Transactional public class AccountService { private final AccountRepository accountRepository; @Autowired public AccountService(AccountRepository accountRepository) { this.accountRepository = accountRepository; } public Account save(String username, Long money) { return this.accountRepository.save(new Account(username, money)); } public void update(Account account) { this.accountRepository.save(account); } public void delete(Long id) { this.accountRepository.delete(id); } public List<Account> findByName(String name){ return this.accountRepository.findByUsername(name); } }
-
發起方服務開發
引入 @Transactional注解,配置復原條件和傳播政策,即可自動建立事務并保證分布式一緻性。服務的業務邏輯中的資料操作通過AccountService 完成。
@Autowired private AccountService accountService; @Path("/transfer/{from}") @GET @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED) public String transfer(@PathParam("from") String from, @QueryParam("username") String username, @QueryParam("money") long money) throws Exception { List<Account> accounts = accountService.findByName(from); if(accounts.size() == 0) { accountService.save(from, 20000L); LOGGER.warn("This account is not present!"); return "This account is not present"; } Account account = accounts.get(0); if(account.getMoney() < money) { LOGGER.warn("There is no enough money!"); return "There is no enough money"; } RestTemplate restTemplate = RestTemplateBuilder.create(); String str = restTemplate.getForObject("cse://demo-narayana-jaxrs2/bank2/trans fer/" + username + "?amount=" + money, String.class); account.setMoney(account.getMoney() - money); accountService.update(account); return str; }
-
參與方服務開發
本demo中Model,Service 和Repository均與發起方一緻。參與方的方法上方加@Transactionl注解可以同發起方建立的事務綁定為同一個事務,由架構保證它們的一緻性。
@Autowired private AccountService accountService; @Path("/transfer/{name}") @GET @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED) public String transfer(@PathParam("name") String name, @QueryParam("amount") long amount){ List<Account> accounts = accountService.findByName(name); if(accounts.size() == 0) { accountService.save(name, amount); return name; } Account account = accounts.get(0); account.setMoney(account.getMoney() + amount); accountService.update(account); return name; }