ZooKeeper :Java用戶端執行批量任務和Transaction API介紹
在上一篇部落格中,部落客介紹了
Watcher API
的使用:
- ZooKeeper :Java用戶端Watcher API介紹
這篇部落格,部落客給大家介紹如何執行批量任務和
Transaction API
的使用。為什麼需要執行批量任務的功能?原因也很簡單,因為可以有效的降低處理多任務時的耗時,主要是可以減少用戶端與服務端的連接配接、驗證以及資料傳輸等耗時操作的次數(當然批量任務的數量也不能太多,不然可能會因為網絡問題導緻丢包和重傳,得不償失)。或者基于某些業務,幾個操作必須被原子式地執行。
事務(
Transaction
)對于執行批量任務就顯得尤為重要了,因為可能會出現批量任務不能全部執行成功的情況,比如用戶端建立一個臨時順序節點用于擷取分布式鎖,一般會先建立這個臨時順序節點,再讓該節點監聽上一個臨時順序節點(如删除事件),當上一個臨時順序節點的用戶端處理完業務關閉連接配接或者當機導緻
Session
逾時後,該用戶端的節點會被服務端删除,這樣後一個臨時順序節點就會監聽到該事件,這樣它就可以去擷取分布式鎖了;如果建立臨時順序節點的操作執行成功,而監聽上一個臨時順序節點的操作執行失敗,可能會導緻分布式鎖失效(建立的臨時順序節點一直擷取不到鎖,因為它沒有監聽前一個同類節點,進而造成連鎖反應)。而事務可以将批量任務封裝成原子操作,是以執行該批量任務就像執行原子操作一樣,隻會出現兩種結果:成功與失敗。
但
ZooKeeper
的
Transaction API
并不是意義上的事務,它隻是執行批量任務的一個簡單封裝,并且還是基于特定種類的操作(下面會進行介紹)。
Transaction
類:
/**
* 提供用于進行多次更新操作的 builder style 的接口
* 實際上隻是Zookeeper.multi() 之上的一層
*/
@InterfaceAudience.Public
public class Transaction {
// ZooKeeper執行個體,通過它執行ops(批量任務)
private ZooKeeper zk;
// 批量任務
private List<Op> ops = new ArrayList<Op>();
protected Transaction(ZooKeeper zk) {
this.zk = zk;
}
// 隻能是更新操作,是以隻有create、delete、check、setData這幾個操作
public Transaction create(final String path, byte[] data, List<ACL> acl, CreateMode createMode) {
ops.add(Op.create(path, data, acl, createMode.toFlag()));
return this;
}
public Transaction delete(final String path, int version) {
ops.add(Op.delete(path, version));
return this;
}
public Transaction check(String path, int version) {
ops.add(Op.check(path, version));
return this;
}
public Transaction setData(final String path, byte[] data, int version) {
ops.add(Op.setData(path, data, version));
return this;
}
// 還是通過調用multi()方法來執行批量任務,Transaction類中并沒有實作事務的功能
public List<OpResult> commit() throws InterruptedException, KeeperException {
return zk.multi(ops);
}
// commit的異步版本
public void commit(MultiCallback cb, Object ctx) {
zk.multi(ops, cb, ctx);
}
}
而這些不同種類的操作在
Op
抽象類中進行了定義(删除了大部分代碼):
/**
* 表示多操作事務中的單個操作
* 每個操作都可以是建立、更新、删除、版本檢查或隻是讀取操作(如getChildren、getData)
* Op的子類分别代表每個詳細類型,但通常不應被引用,除非通過提供的工廠方法
*/
public abstract class Op {
// 操作種類,Transaction執行個體隻能添加OpKind.TRANSACTION類的操作
public enum OpKind {
TRANSACTION,
READ
}
private int type;
private String path;
private OpKind opKind;
// 預設是OpKind.TRANSACTION類操作
public static class Create extends Op {
protected byte[] data;
protected List<ACL> acl;
protected int flags;
}
// 預設是OpKind.TRANSACTION類操作
public static class CreateTTL extends Create {}
// 預設是OpKind.TRANSACTION類操作
public static class Delete extends Op {
private int version;
}
// 預設是OpKind.TRANSACTION類操作
public static class SetData extends Op {
private byte[] data;
private int version;
}
// 預設是OpKind.TRANSACTION類操作
public static class Check extends Op {
private int version;
}
// 預設是OpKind.READ類操作
public static class GetChildren extends Op {
GetChildren(String path) {
super(ZooDefs.OpCode.getChildren, path, OpKind.READ);
}
}
// 預設是OpKind.READ類操作
public static class GetData extends Op {
GetData(String path) {
super(ZooDefs.OpCode.getData, path, OpKind.READ);
}
}
}
Create
、
Delete
、
SetData
以及
Check
類都是通過如下圖所示的方式,在執行個體化時調用父類構造器将
OpKind.TRANSACTION
作為它們執行個體的
opKind
。
而
CreateTTL
類則是在執行個體化時調用了父類
Create
的構造器,是以
CreateTTL
類也是将
OpKind.TRANSACTION
作為它們執行個體的
opKind
。
是以,可以知道
Transaction
類并沒有實作事務的相應功能,隻是提供了一個僅用于
OpKind.TRANSACTION
類操作的批量任務處理接口。而實際執行這些任務的是
ZooKeeper
類中的
multi
方法。
/**
* 執行多個ZooKeeper操作
* 成功執行所有操作或不執行任何操作
*/
public List<OpResult> multi(Iterable<Op> ops) throws InterruptedException, KeeperException {
for (Op op : ops) {
op.validate();
}
return multiInternal(generateMultiTransaction(ops));
}
// multi的異步版本
public void multi(Iterable<Op> ops, MultiCallback cb, Object ctx) {
List<OpResult> results = validatePath(ops);
if (results.size() > 0) {
cb.processResult(KeeperException.Code.BADARGUMENTS.intValue(), null, ctx, results);
return;
}
multiInternal(generateMultiTransaction(ops), cb, ctx);
}
注意:
multi
方法隻能執行同一種
OpKind
類型的操作。
測試類:
package com.kaven.zookeeper;
import org.apache.zookeeper.*;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* @Author: ITKaven
* @Date: 2021/11/20 10:30
* @Leetcode: https://leetcode-cn.com/u/kavenit
* @Notes:
*/
public class Application implements Watcher {
private static CountDownLatch latch;
private static final String SERVER_PROXY = "192.168.1.184:9000";
private static final int TIMEOUT = 40000;
private static long time;
private String watcherName;
protected Application(String watcherName) {
this.watcherName = watcherName;
}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException {
Watcher connectWatcher = new Application("connectWatcher");
latch = new CountDownLatch(1);
time = System.currentTimeMillis();
ZooKeeper zk = new ZooKeeper(SERVER_PROXY, TIMEOUT, connectWatcher);
latch.await();
System.out.println(zk.getState());
System.out.println("Connection complete!");
String transactionMessage = "transaction success";
Transaction transaction = zk.transaction()
.create("/transaction", "transaction data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
.setData("/transaction", "new data".getBytes(), -1)
.check("/transaction", 100)
.create("/transaction2", "transaction2 data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
transaction.commit(
(rc, path, ctx, opResults) -> {
System.out.println(KeeperException.Code.get(rc).name());
opResults.forEach(Application::printOpResult);
if(rc == KeeperException.Code.OK.intValue()) {
System.out.println(ctx);
}
},
transactionMessage
);
String multiMessage = "multi success";
List<Op> opList = new ArrayList<>();
opList.add(Op.create("/multi", "multi".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
opList.add(Op.check("/multi", 1));
opList.add(Op.setData("/multi", "new data".getBytes(), -1));
opList.add(Op.create("/multi/son", "multi son".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL));
opList.add(Op.check("/multi/son", 0));
opList.add(Op.delete("/multi/son", -1));
zk.multi(opList,
(rc, path, ctx, opResults) -> {
System.out.println(KeeperException.Code.get(rc).name());
opResults.forEach(Application::printOpResult);
if(rc == KeeperException.Code.OK.intValue()) {
System.out.println(ctx);
}
},
multiMessage
);
Thread.sleep(1000000);
}
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("-----------------WatchedEvent------------------");
System.out.println(this.watcherName);
System.out.println(watchedEvent.getType());
System.out.println(watchedEvent.getState().name());
System.out.println(watchedEvent.getPath());
System.out.println("time use(ms):" + (System.currentTimeMillis() - time));
time = System.currentTimeMillis();
System.out.println("-----------------WatchedEvent------------------");
if(watchedEvent.getState().equals(Event.KeeperState.SyncConnected)) {
latch.countDown();
}
}
public static void printOpResult(OpResult opResult) {
System.out.println("-----------------printOpResult------------------");
if(opResult instanceof OpResult.CreateResult) {
System.out.println("CreateResult");
System.out.println(((OpResult.CreateResult) opResult).getPath());
}
else if(opResult instanceof OpResult.CheckResult) {
System.out.println("CheckResult");
}
else if(opResult instanceof OpResult.GetDataResult) {
System.out.println("GetDataResult");
System.out.println(new String(((OpResult.GetDataResult) opResult).getData()));
}
else if(opResult instanceof OpResult.SetDataResult) {
System.out.println("SetDataResult");
System.out.println(((OpResult.SetDataResult) opResult).getStat().getDataLength());
}
else if(opResult instanceof OpResult.GetChildrenResult) {
System.out.println("GetChildrenResult");
((OpResult.GetChildrenResult) opResult).getChildren().forEach(System.out::println);
}
else if(opResult instanceof OpResult.DeleteResult) {
System.out.println("DeleteResult");
}
else if(opResult instanceof OpResult.ErrorResult) {
System.out.println("ErrorResult");
int errorCode = ((OpResult.ErrorResult) opResult).getErr();
System.out.println(KeeperException.Code.get(errorCode).name());
}
System.out.println("-----------------printOpResult------------------");
}
}
輸出結果:
-----------------WatchedEvent------------------
connectWatcher
None
SyncConnected
null
time use(ms):13703
-----------------WatchedEvent------------------
CONNECTED
Connection complete!
BADVERSION
-----------------printOpResult------------------
ErrorResult
OK
-----------------printOpResult------------------
-----------------printOpResult------------------
ErrorResult
OK
-----------------printOpResult------------------
-----------------printOpResult------------------
ErrorResult
BADVERSION
-----------------printOpResult------------------
-----------------printOpResult------------------
ErrorResult
RUNTIMEINCONSISTENCY
-----------------printOpResult------------------
BADVERSION
-----------------printOpResult------------------
ErrorResult
OK
-----------------printOpResult------------------
-----------------printOpResult------------------
ErrorResult
BADVERSION
-----------------printOpResult------------------
-----------------printOpResult------------------
ErrorResult
RUNTIMEINCONSISTENCY
-----------------printOpResult------------------
-----------------printOpResult------------------
ErrorResult
RUNTIMEINCONSISTENCY
-----------------printOpResult------------------
-----------------printOpResult------------------
ErrorResult
RUNTIMEINCONSISTENCY
-----------------printOpResult------------------
-----------------printOpResult------------------
ErrorResult
RUNTIMEINCONSISTENCY
-----------------printOpResult------------------
使用
commit
方法和
multi
方法來執行批量任務是類似的,畢竟
commit
方法就是調用
multi
方法來執行批量任務的:
public List<OpResult> commit() throws InterruptedException, KeeperException {
return zk.multi(ops);
}
// 異步版
public void commit(MultiCallback cb, Object ctx) {
zk.multi(ops, cb, ctx);
}
調用
multi
方法執行批量任務能保證批量任務的原子性嗎?
Java
用戶端還在運作(會話還沒結束):
而在其他用戶端上并沒有查詢到
/transaction
和
/multi
這兩個節點(但建立
/transaction
和
/multi
這兩個節點的操作的傳回碼都是
OK
,很顯然被復原了):
很明顯兩次批量任務執行失敗都是因為
check
操作執行不成功導緻的:
check("/transaction", 100)
opList.add(Op.check("/multi", 1));
如果删除這兩個操作,兩次批量任務都會執行成功:
-----------------WatchedEvent------------------
connectWatcher
None
SyncConnected
null
time use(ms):13755
-----------------WatchedEvent------------------
CONNECTED
Connection complete!
OK
-----------------printOpResult------------------
CreateResult
/transaction
-----------------printOpResult------------------
-----------------printOpResult------------------
SetDataResult
8
-----------------printOpResult------------------
-----------------printOpResult------------------
CreateResult
/transaction2
-----------------printOpResult------------------
transaction success
OK
-----------------printOpResult------------------
CreateResult
/multi
-----------------printOpResult------------------
-----------------printOpResult------------------
SetDataResult
8
-----------------printOpResult------------------
-----------------printOpResult------------------
CreateResult
/multi/son
-----------------printOpResult------------------
-----------------printOpResult------------------
CheckResult
-----------------printOpResult------------------
-----------------printOpResult------------------
DeleteResult
-----------------printOpResult------------------
multi success
如下所示的輸出,是批量任務中的某個操作執行失敗導緻的:
// 前面的操作執行成功了(後面的操作執行失敗後復原,才能保證事務)
-----------------printOpResult------------------
ErrorResult
OK
-----------------printOpResult------------------
// 執行該操作失敗了,版本不比對(BADVERSION),就是在檢查版本時,輸入的版本與節點的版本不比對
-----------------printOpResult------------------
ErrorResult
BADVERSION
-----------------printOpResult------------------
// 後面的操作不需要再執行,執行錯誤的原因都是:RUNTIMEINCONSISTENCY
-----------------printOpResult------------------
ErrorResult
RUNTIMEINCONSISTENCY
-----------------printOpResult------------------
為什麼說
multi
方法隻能執行同一種
OpKind
類型的操作。如果
multi
方法執行如下操作(同時存在
OpKind.TRANSACTION
類和
OpKind.READ
類操作):
List<Op> opList = new ArrayList<>();
opList.add(Op.create("/multi", "multi".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
opList.add(Op.setData("/multi", "new data".getBytes(), -1));
opList.add(Op.getData("/multi"));
opList.add(Op.create("/multi/son", "multi son".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL));
opList.add(Op.check("/multi/son", 0));
opList.add(Op.delete("/multi/son", -1));
會報錯(
multi
請求中不允許混合讀寫操作):
private void setOrCheckOpKind(Op.OpKind ok) throws IllegalArgumentException {
if (opKind == null) {
opKind = ok;
} else if (ok != opKind) {
throw new IllegalArgumentException("Mixing read and write operations (transactions)"
+ " is not allowed in a multi request.");
}
}