首先我們先來看看賬戶類
class Account {
// 餘額
private TxnRef<Integer> balance;
// 構造方法
public Account(int balance) {
this.balance = new TxnRef<Integer>(balance);
}
// 轉賬操作,該操作我們要保證 1.txn這個事務是原子性 2.每個賬戶對象的balance是一緻性的(就是轉賬,兩者總數不變)
public void transfer(Account target, int amt) {
STM.atomic((txn) -> {
Integer from = balance.getValue(txn);
balance.setValue(from - amt, txn);
Integer to = target.balance.getValue(txn);
target.balance.setValue(to + amt, txn);
});
}
}
下面我們介紹上面代碼中餘額TxnRef這個引用對象。
/*
* 事務的引用。(不是事務,你可以想象成 資料庫中的一條資料引用)
* 該方法中存在模仿MVCC的并發版本控制的餘額版本對象VersionedRef
* 以及在目前事務中讀取資料和寫資料兩個方法,當然這兩個方法交給其他人Txn來做
*/
public class TxnRef<T> {
// 目前資料,帶版本的資料(目前事務中,儲存的最新的資料)
volatile VersionedRef curRef;
public TxnRef(T value) {
this.curRef = new VersionedRef(value, 0L);
}
// 擷取目前事務的資料
public T getValue(Txn txn) {
// 事務的讀操作交給Txn接口
return txn.get(this);
}
// 在目前事務中設定資料
public void setValue(T value, Txn txn) {
// 事務的寫操作交給Txn接口
txn.set(this, value);
}
}
根據上面TxnRef類先來展示下VersionedRef 真正的帶版本資料,然後我們在來看實作讀寫資料引用TxnRef的Txn類。
/*
* 該類是帶版本的資料對象
* 資料和版本是一一對應的,沒有修改資料,版本号不變,修改資料,版本号+1
*/
public final class VersionedRef<T> {
final T value;
final long version;
public VersionedRef(T value, long version) {
this.value = value;
this.version = version;
}
}
修改資料,讓版本号+1操作是交給實作了Txn接口的STMTxn類,讓我們來看看這個實作了讀寫資料引用TxnRef的Txn實作類。
該實作類的三個方法:
1. get方法,把要讀的對象都添加到inTxnMap 中。
2. set方法,把要修改的對象,先讀添加到inTxnMap 中,把修改後的對象儲存在writeMap中
3. commit方法,為了簡單實作,使用互斥鎖的方式,事務的送出變成串行了,首先先檢查inTxnMap 中資料是否發生變化,如果沒有,就直接将writeMap中的資料寫入(就是把賬戶對象的餘額修改),如果發生變化,break結束,當然我們不能因為餘額發生變化就不轉賬了,下面就會介紹到STM類,重新建立一個事務STMTxn 再送出,知道送出成功。
//接口
public interface Txn {
<T> T get(TxnRef<T> ref);
<T> void set(TxnRef<T> ref, T value);
}
/*實作類
* 事務中的值得讀寫是交給這個實作類的。
*/
public class STMTxn implements Txn {
// 事務id生成器(唯一)
private static AtomicLong txnSeq = new AtomicLong(1);
// 目前事務所有相關的資料
private Map<TxnRef, VersionedRef> inTxnMap = new HashMap();
// 目前事務所有需要修改的資料
private Map<TxnRef, Object> writeMap = new HashMap();
// 目前事務id
private long txnId;
// 自動生成目前事務id
public STMTxn() {
this.txnId = txnSeq.getAndIncrement();
}
@Override
// 擷取目前事務中的資料
public <T> T get(TxnRef<T> ref) {
// 将需要讀取的資料加入到inTxnMap中,同時保證一個事務中讀取的事務是同一個版本
if (!inTxnMap.containsKey(ref)) {
inTxnMap.put(ref, ref.curRef);
}
return (T) inTxnMap.get(ref).value;
}
@Override
public <T> void set(TxnRef<T> ref, T value) {
// 将需要修改的資料加入到inTxnMap中
if (!inTxnMap.containsKey(ref)) {
inTxnMap.put(ref, ref.curRef);
}
// 将要修改的Object放入writeMap
writeMap.put(ref, value);
}
// 送出事務
public boolean commit() {
synchronized (STM.commitLock) {
// 是否校驗通過
boolean isValid = true;
// 校驗所有讀過的資料是否發生過變化
for (Map.Entry entry : inTxnMap.entrySet()) {
VersionedRef curRef = ((TxnRef) entry.getKey()).curRef;
// 該事務讀資料的時候的值
VersionedRef readRef = (VersionedRef) entry.getValue();
// 通過版本号來驗證資料是否發生過變化
if (curRef.version != readRef.version) {
isValid = false;
break;
}
}
// 如果校驗通過,則所有更改生效
if (isValid) {
writeMap.forEach((k, v) -> {
//這裡把事務的原子類id,作為版本
k.curRef = new VersionedRef(v, txnId);
});
}
return isValid;
}
}
}
當然我們需要提供一個實作事務操作的入口。就比如上面Account類轉賬一系列操作如何變成事務?
請看代碼
//函數式接口
@FunctionalInterface
public interface TxnRunnable {
void run(Txn txn);
}
public final class STM {
// 私有化構造方法
private STM() {
}
// 送出資料需要用到的全局鎖
static final Object commitLock = new Object();
// 原子化送出方法,這裡引入函數式接口
public static void atomic(TxnRunnable action) {
boolean committed = false;
// 如果沒有送出成功,則一直重試
while (!committed) {
// 建立新的事務
STMTxn txn = new STMTxn();
// 執行業務邏輯
action.run(txn);
// 送出事務
committed = txn.commit();
}
}
}
當Account類執行轉賬操作,調用STM工具類atomic方法,開啟事務,while循環執行事務,直到成功執行,執行事務中,調用函數式接口執行run方法,傳入txn事務,執行事務get,set方法.
事務1:CountA轉賬CountB100元,先執行寫操作不送出。等事務2:CountB轉賬CountC100元,送出事務之後,再送出事務1.
這個過程無法直覺展示,是以你隻能結合下面的代碼+想象喽。
public static void main(String[] args) {
Account account1 = new Account(500);
Account account2 = new Account(500);
Account account3 = new Account(500);
new Thread(() -> {
account1.transfer(account2, 100);
}).start();
new Thread(() -> {
account2.transfer(account3, 100);
}).start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(account1.balance.curRef.value);
System.out.println(account2.balance.curRef.value);
System.out.println(account3.balance.curRef.value);
}
}