天天看點

java使用STM原理實作轉賬

首先我們先來看看賬戶類

class Account {
	// 餘額
	private TxnRef<Integer> balance;
	// 構造方法
	public Account(int balance) {
		this.balance = new TxnRef<Integer>(balance);
	}
	// 轉賬操作,該操作我們要保證	1.txn這個事務是原子性  2.每個賬戶對象的balance是一緻性的(就是轉賬,兩者總數不變)
	public void transfer(Account target, int amt) {
		STM.atomic((txn) -> {
			Integer from = balance.getValue(txn);
			balance.setValue(from - amt, txn);
			Integer to = target.balance.getValue(txn);
			target.balance.setValue(to + amt, txn);
		});
	}
}
           

下面我們介紹上面代碼中餘額TxnRef這個引用對象。

/*
 * 事務的引用。(不是事務,你可以想象成  資料庫中的一條資料引用)
 * 該方法中存在模仿MVCC的并發版本控制的餘額版本對象VersionedRef 
 * 			以及在目前事務中讀取資料和寫資料兩個方法,當然這兩個方法交給其他人Txn來做
 */
public class TxnRef<T> {
	// 目前資料,帶版本的資料(目前事務中,儲存的最新的資料)
	volatile VersionedRef curRef;
	public TxnRef(T value) {
		this.curRef = new VersionedRef(value, 0L);
	}
	// 擷取目前事務的資料
	public T getValue(Txn txn) {
		// 事務的讀操作交給Txn接口
		return txn.get(this);
	}
	// 在目前事務中設定資料
	public void setValue(T value, Txn txn) {
		// 事務的寫操作交給Txn接口
		txn.set(this, value);
	}
}
           

根據上面TxnRef類先來展示下VersionedRef 真正的帶版本資料,然後我們在來看實作讀寫資料引用TxnRef的Txn類。

/*
 * 該類是帶版本的資料對象
 * 資料和版本是一一對應的,沒有修改資料,版本号不變,修改資料,版本号+1
 */
public final class VersionedRef<T> {
	final T value;
	final long version;
	public VersionedRef(T value, long version) {
		this.value = value;
		this.version = version;
	}
}
           

修改資料,讓版本号+1操作是交給實作了Txn接口的STMTxn類,讓我們來看看這個實作了讀寫資料引用TxnRef的Txn實作類。

該實作類的三個方法:

1. get方法,把要讀的對象都添加到inTxnMap 中。

2. set方法,把要修改的對象,先讀添加到inTxnMap 中,把修改後的對象儲存在writeMap中

3. commit方法,為了簡單實作,使用互斥鎖的方式,事務的送出變成串行了,首先先檢查inTxnMap 中資料是否發生變化,如果沒有,就直接将writeMap中的資料寫入(就是把賬戶對象的餘額修改),如果發生變化,break結束,當然我們不能因為餘額發生變化就不轉賬了,下面就會介紹到STM類,重新建立一個事務STMTxn 再送出,知道送出成功。

//接口
public interface Txn {
	<T> T get(TxnRef<T> ref);
	<T> void set(TxnRef<T> ref, T value);
}
/*實作類
 * 事務中的值得讀寫是交給這個實作類的。
 */
public class STMTxn implements Txn {
	// 事務id生成器(唯一)
	private static AtomicLong txnSeq = new AtomicLong(1);
	// 目前事務所有相關的資料
	private Map<TxnRef, VersionedRef> inTxnMap = new HashMap();
	// 目前事務所有需要修改的資料
	private Map<TxnRef, Object> writeMap = new HashMap();
	// 目前事務id
	private long txnId;
	// 自動生成目前事務id
	public STMTxn() {
		this.txnId = txnSeq.getAndIncrement();
	}
	@Override
	// 擷取目前事務中的資料
	public <T> T get(TxnRef<T> ref) {
		// 将需要讀取的資料加入到inTxnMap中,同時保證一個事務中讀取的事務是同一個版本
		if (!inTxnMap.containsKey(ref)) {
			inTxnMap.put(ref, ref.curRef);
		}
		return (T) inTxnMap.get(ref).value;
	}

	@Override
	public <T> void set(TxnRef<T> ref, T value) {
		// 将需要修改的資料加入到inTxnMap中
		if (!inTxnMap.containsKey(ref)) {
			inTxnMap.put(ref, ref.curRef);
		}
		// 将要修改的Object放入writeMap
		writeMap.put(ref, value);
	}

	// 送出事務
	public boolean commit() {
		synchronized (STM.commitLock) {
			// 是否校驗通過
			boolean isValid = true;
			// 校驗所有讀過的資料是否發生過變化
			for (Map.Entry entry : inTxnMap.entrySet()) {

				VersionedRef curRef = ((TxnRef) entry.getKey()).curRef;
				// 該事務讀資料的時候的值
				VersionedRef readRef = (VersionedRef) entry.getValue();
				// 通過版本号來驗證資料是否發生過變化
				if (curRef.version != readRef.version) {
					isValid = false;
					break;
				}
			}
			// 如果校驗通過,則所有更改生效
			if (isValid) {
				writeMap.forEach((k, v) -> {
				//這裡把事務的原子類id,作為版本
					k.curRef = new VersionedRef(v, txnId);
				});
			}
			return isValid;
		}
	}
}

           

當然我們需要提供一個實作事務操作的入口。就比如上面Account類轉賬一系列操作如何變成事務?

請看代碼

//函數式接口
@FunctionalInterface
public interface TxnRunnable {
	void run(Txn txn);
}
public final class STM {
	// 私有化構造方法
	private STM() {
	}
	// 送出資料需要用到的全局鎖
	static final Object commitLock = new Object();

	// 原子化送出方法,這裡引入函數式接口
	public static void atomic(TxnRunnable action) {
		boolean committed = false;
		// 如果沒有送出成功,則一直重試
		while (!committed) {
			// 建立新的事務
			STMTxn txn = new STMTxn();
			// 執行業務邏輯
			action.run(txn);
			// 送出事務
			committed = txn.commit();
		}
	}
}
           

當Account類執行轉賬操作,調用STM工具類atomic方法,開啟事務,while循環執行事務,直到成功執行,執行事務中,調用函數式接口執行run方法,傳入txn事務,執行事務get,set方法.

事務1:CountA轉賬CountB100元,先執行寫操作不送出。等事務2:CountB轉賬CountC100元,送出事務之後,再送出事務1.

這個過程無法直覺展示,是以你隻能結合下面的代碼+想象喽。

public static void main(String[] args) {
		Account account1 = new Account(500);
		Account account2 = new Account(500);
		Account account3 = new Account(500);
		new Thread(() -> {
			account1.transfer(account2, 100);
		}).start();
		new Thread(() -> {
			account2.transfer(account3, 100);
		}).start();
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		System.out.println(account1.balance.curRef.value);
		System.out.println(account2.balance.curRef.value);
		System.out.println(account3.balance.curRef.value);

	}
}