天天看點

華為雲微服務引擎CSE事務方案二:2PC機制的資料強一緻性處理2PC機制的資料強一緻性處理

2PC機制的資料強一緻性處理

概念闡述

兩階段送出協定是常見的一緻性協定,許多分布式關系型資料管理系統采用此協定來完成分布式事務。它是協調所有分布式原子事務參與者,并決定送出或取消(復原)的分布式算法。在兩階段送出協定中,包含了兩種角色:協調者與參與者。參與者就是實際處理事務的節點,而協調者就是其中一台單獨的進行分布式事務管理的節點。

場景描述

分布式強一緻性是為了保證在服務調用完成時,各個參與事務的服務的狀态已經保持一緻。本節将以一個轉賬的例子來說明如何利用CSE的強一緻性開發和設計分布式事務,保證事務參與方的資料強一緻性。

開發示例

本示例以一個銀行A,B之間進行轉賬的情景為例介紹強一緻事務的開發。

  • 前提條件:
    1. 運作服務的節點已經安裝好mysql資料庫,并建立指定的資料庫(本例中為

      teststrong)。

    2. 啟動本地服務中心。
    3. 啟動事務協調器coordinator,可在demo-narayana子產品中擷取,直接運作RtsServer.java即可。
  • 事務發起方和參與方的POM中均需引入2PC事務對應的依賴包:
    <dependency>
        groupId>com.huawei.paas.cse</groupId>
        <artifactId>handler-2pc</artifactId>
    </dependency>
               
  • 在microservice.yaml中添加進處理鍊:

    需在發起方的microservice.yaml檔案中引入twoPC-consumer的handler,用于在調用參與方時傳輸事務ID。

    cse:
      handler:
        chain:
          Consumer:
            default: loadbalance,twoPC-consumer,perf-stats
               
    在參與方的microservice.yaml檔案中引入twoPC-provider的handler,用于接收事務ID。
    cse:
        handler:
            chain:
                Provider:
                    default: twoPC-provider
               
  • 配置JPA

    在resource/META-INF/persistence.xml中配置JPA屬性,連接配接mysql的配置樣例如下:

    <?xml version="1.0" encoding="UTF-8"?>
    <persistence version="1.0" xmlns="http://java.sun.com/xml/ns/persistence" xmlns:xsi="h
    ttp://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/
    ns/persistence http://java.sun.com/xml/ns/persistence/persistence_1_0.xsd">
        <persistence-unit name="jta" transaction-type="JTA">
            <provider>org.hibernate.ejb.HibernatePersistence</provider>
            <jta-data-source>java:/mysqlDataSource</jta-data-source>
            <properties>
                <property name="hibernate.archive.autodetection" value="class"/>
                <property name="hibernate.hbm2ddl.auto" value="update"/>
                <property name="hibernate.current_session_context_class" value="jta"/>
                <property name="hibernate.dialect" value="org.hibernate.dialect.MySQL5Dialect"/>
                <property name="hibernate.connection.release_mode" value="after_statement"/>
            </properties>
        </persistence-unit>
    </persistence>
               
    其中的mysqlDataSource為下一節中配置的DataSource,persistence-unit name則會在entityManagerFactory中引用。
  • 配置DataSource,Adaptor 及entitiyManager

    在發起方的resource/META-INF/spring/*.bean.xml 檔案中進行配置,樣例如下:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:tx="http://www.springframework.org/schema/tx"
        xmlns:context="http://www.springframework.org/schema/context"
        xmlns:jpa="http://www.springframework.org/schema/data/jpa"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd http://www.springframework.org/schema/data/jpa http://www.springframework.org/schema/data/jpa/spring-jpa.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.0.xsd">
      <tx:annotation-driven/>
      <context:component-scan base-package="narayana"/>
      <jpa:repositories base-package="narayana.jpamodel"/>
      <bean id="mysqlDataSource" class="org.apache.commons.dbcp2.managed.BasicManagedDataSource"
      destroy-method="close">
        <property name="xaDataSourceInstance">
        <bean class="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource">
        <property name="url" value="jdbc:mysql://localhost:3306/bank1?user=root&amp;password=root"/>
        </bean>
        </property>
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
        <property name="transactionManager" ref="transactionManagerImpl"/>
        <property name="url" value="jdbc:mysql://localhost:3306/bank1?user=root&amp;password=root"/>
      </bean>
      <bean id="jpaVendorAdapter" class="org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter">
        <!--<property name="showSql" value="true"/>-->
        <property name="generateDdl" value="true"/>
        <property name="database" value="MYSQL"/>
      </bean>
      <bean id="entityManagerFactory" class="org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean">
        <property name="dataSource" ref="mysqlDataSource"/>
        <property name="jpaVendorAdapter" ref="jpaVendorAdapter"/>
        <property name="persistenceUnitName" value="jta"/>
        <property name="packagesToScan" value="narayana.jpamodel"/>
      </bean>
    </beans>
               

其中的資料庫bank1表示銀行A的賬戶系統。參與方配置類似,隻是連接配接不同的資料庫bank2,表示銀行B的賬戶系統。

示例代碼

  • Model定義。

    采用JPA注解方式定義表結構,@Table中的name即為表名,該表建立在datasource中定義的資料庫中。

    @
    Entity
    @Table(name="account")
    public class Account {
        @Id
        @GeneratedValue(strategy = IDENTITY)
        private Long id;
    
        private String username;
    
        private Long money;
    
        public Account() {
        }
    
        public Account(String username, Long money) {
            this.username = username;
            this.money = money;
        }
    
        public void setMoney(Long money) {
            this.money = money;
        }
    
        public Long getId() {
            return id;
        }
    
        public String getUsername() {
            return username;
        }
    
        public Long getMoney() {
            return money;
        }
    }
               
  • Repository及Service定義

    AccountService 和AccountRepository 都會根據xml配置自動注入。

    public interface AccountRepository extends JpaRepository<Account, Long> {
        List<Account> findByUsername(String name);
    }
    @Service
    @Transactional
    public class AccountService {
        private final AccountRepository accountRepository;
        @Autowired
        public AccountService(AccountRepository accountRepository) {
            this.accountRepository = accountRepository;
        }
        public Account save(String username, Long money) {
            return this.accountRepository.save(new Account(username, money));
        }
        public void update(Account account) {
            this.accountRepository.save(account);
        }
        public void delete(Long id) {
            this.accountRepository.delete(id);
        }
        public List<Account> findByName(String name){
            return this.accountRepository.findByUsername(name);
        }
    }
               
  • 發起方服務開發

    引入 @Transactional注解,配置復原條件和傳播政策,即可自動建立事務并保證分布式一緻性。服務的業務邏輯中的資料操作通過AccountService 完成。

    @Autowired
    private AccountService accountService;
    @Path("/transfer/{from}")
    @GET
    @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
    public String transfer(@PathParam("from") String from, @QueryParam("username") String username,
    @QueryParam("money") long money) throws Exception {
        List<Account> accounts = accountService.findByName(from);
        if(accounts.size() == 0) {
            accountService.save(from, 20000L);
            LOGGER.warn("This account is not present!");
            return "This account is not present";
        }
        Account account = accounts.get(0);
        if(account.getMoney() < money) {
            LOGGER.warn("There is no enough money!");
            return "There is no enough money";
        }
        RestTemplate restTemplate = RestTemplateBuilder.create();
        String str = restTemplate.getForObject("cse://demo-narayana-jaxrs2/bank2/trans
        fer/" + username + "?amount=" + money, String.class);
        account.setMoney(account.getMoney() - money);
        accountService.update(account);
        return str;
    }
               
  • 參與方服務開發

    本demo中Model,Service 和Repository均與發起方一緻。參與方的方法上方加@Transactionl注解可以同發起方建立的事務綁定為同一個事務,由架構保證它們的一緻性。

    @Autowired
    private AccountService accountService;
    @Path("/transfer/{name}")
    @GET
    @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
    public String transfer(@PathParam("name") String name, @QueryParam("amount") long
    amount){
        List<Account> accounts = accountService.findByName(name);
        if(accounts.size() == 0) {
            accountService.save(name, amount);
            return name;
        }
        Account account = accounts.get(0);
        account.setMoney(account.getMoney() + amount);
        accountService.update(account);
        return name;
    }
               

繼續閱讀