思維導圖

文章已收錄到Github精選,歡迎Star: https://github.com/yehongzhi/learningSummary
前言
在很多時候,我們都可以在各種架構應用中看到ZooKeeper的身影,比如Kafka中間件,Dubbo架構,Hadoop等等。為什麼到處都看到ZooKeeper?
一、什麼是ZooKeeper
ZooKeeper是一個分布式服務協調架構,提供了分布式資料一緻性的解決方案,基于ZooKeeper的資料結構,Watcher,選舉機制等特點,可以實作資料的釋出/訂閱,軟負載均衡,命名服務,統一配置管理,分布式鎖,叢集管理等等。
二、為什麼使用ZooKeeper
ZooKeeper能保證:
- 更新請求順序進行。來自同一個client的更新請求按其發送順序依次執行
- 資料更新原子性。一次資料更新要麼成功,要麼失敗
- 全局唯一資料視圖。client無論連接配接到哪個server,資料視圖都是一緻的
- 實時性。在一定時間範圍内,client讀到的資料是最新的
三、資料結構
ZooKeeper的資料結構和Unix檔案系統很類似,總體上可以看做是一棵樹,每一個節點稱之為一個ZNode,每一個ZNode預設能存儲1M的資料。每一個ZNode可通過唯一的路徑辨別。如下圖所示:
建立ZNode時,可以指定以下四種類型,包括:
- PERSISTENT,持久性ZNode。建立後,即使用戶端與服務端斷開連接配接也不會删除,隻有用戶端主動删除才會消失。
- PERSISTENT_SEQUENTIAL,持久性順序編号ZNode。和持久性節點一樣不會因為斷開連接配接後而删除,并且ZNode的編号會自動增加。
- EPHEMERAL,臨時性ZNode。用戶端與服務端斷開連接配接,該ZNode會被删除。
- EPEMERAL_SEQUENTIAL,臨時性順序編号ZNode。和臨時性節點一樣,斷開連接配接會被删除,并且ZNode的編号會自動增加。
四、監聽通知機制
Watcher是基于觀察者模式實作的一種機制。如果我們需要實作當某個ZNode節點發生變化時收到通知,就可以使用Watcher監聽器。
用戶端通過設定監視點(watcher)向 ZooKeeper 注冊需要接收通知的 znode,在 znode 發生變化時 ZooKeeper 就會向用戶端發送消息。
這種通知機制是一次性的。一旦watcher被觸發,ZooKeeper就會從相應的存儲中删除。如果需要不斷監聽ZNode的變化,可以在收到通知後再設定新的watcher注冊到ZooKeeper。
監視點的類型有很多,如監控ZNode資料變化、監控ZNode子節點變化、監控ZNode 建立或删除。
五、選舉機制
ZooKeeper是一個高可用的應用架構,因為ZooKeeper是支援叢集的。ZooKeeper在叢集狀态下,配置檔案是不會指定Master和Slave,而是在ZooKeeper伺服器初始化時就在内部進行選舉,産生一台做為Leader,多台做為Follower,并且遵守半數可用原則。
由于遵守半數可用原則,是以5台伺服器和6台伺服器,實際上最大允許當機數量都是3台,是以為了節約成本,叢集的伺服器數量一般設定為奇數。
如果在運作時,如果長時間無法和Leader保持連接配接的話,則會再次進行選舉,産生新的Leader,以保證服務的可用。
六、初の體驗
首先在
官網下載下傳ZooKeeper,我這裡用的是3.3.6版本。
然後解壓,複制一下/conf目錄下的zoo_sample.cfg檔案,重命名為zoo.cfg。
修改zoo.cfg中dataDir的值,并建立對應的目錄:
最後到/bin目錄下啟動,我用的是window系統,是以啟動zkServer.cmd,輕按兩下即可:
啟動成功的話就可以看到這個對話框:
可視化界面的話,我推薦使用
ZooInspector
,操作比較簡便:
6.1 使用java連接配接ZooKeeper
首先引入Maven依賴:
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
接着我們寫一個Main方法,進行操作:
//連接配接位址及端口号
private static final String SERVER_HOST = "127.0.0.1:2181";
//會話逾時時間
private static final int SESSION_TIME_OUT = 2000;
public static void main(String[] args) throws Exception {
//參數一:服務端位址及端口号
//參數二:逾時時間
//參數三:監聽器
ZooKeeper zooKeeper = new ZooKeeper(SERVER_HOST, SESSION_TIME_OUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//擷取事件的狀态
Event.KeeperState state = watchedEvent.getState();
//判斷是否是連接配接事件
if (Event.KeeperState.SyncConnected == state) {
Event.EventType type = watchedEvent.getType();
if (Event.EventType.None == type) {
System.out.println("zk用戶端已連接配接...");
}
}
}
});
zooKeeper.create("/java", "Hello World".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("新增ZNode成功");
zooKeeper.close();
}
建立一個持久性ZNode,路徑是/java,值為"Hello World":
七、API概述
7.1 建立
public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode)
參數解釋:
- path ZNode路徑
- data ZNode存儲的資料
- acl ACL權限控制
- createMode ZNode類型
ACL權限控制,有三個是ZooKeeper定義的常用權限,在ZooDefs.Ids類中:
/**
* This is a completely open ACL.
* 完全開放的ACL,任何連接配接的用戶端都可以操作該屬性znode
*/
public final ArrayList<ACL> OPEN_ACL_UNSAFE = new ArrayList<ACL>(Collections.singletonList(new ACL(Perms.ALL, ANYONE_ID_UNSAFE)));
/**
* This ACL gives the creators authentication id's all permissions.
* 隻有建立者才有ACL權限
*/
public final ArrayList<ACL> CREATOR_ALL_ACL = new ArrayList<ACL>(Collections.singletonList(new ACL(Perms.ALL, AUTH_IDS)));
/**
* This ACL gives the world the ability to read.
* 隻能讀取ACL
*/
public final ArrayList<ACL> READ_ACL_UNSAFE = new ArrayList<ACL>(Collections.singletonList(new ACL(Perms.READ, ANYONE_ID_UNSAFE)));
createMode就是前面講過的四種ZNode類型:
public enum CreateMode {
/**
* 持久性ZNode
*/
PERSISTENT (0, false, false),
/**
* 持久性自動增加順序号ZNode
*/
PERSISTENT_SEQUENTIAL (2, false, true),
/**
* 臨時性ZNode
*/
EPHEMERAL (1, true, false),
/**
* 臨時性自動增加順序号ZNode
*/
EPHEMERAL_SEQUENTIAL (3, true, true);
}
7.2 查詢
//同步擷取節點資料
public byte[] getData(String path, boolean watch, Stat stat){
...
}
//異步擷取節點資料
public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx){
...
}
同步getData()方法中的stat參數是用于接收傳回的節點描述資訊:
public byte[] getData(final String path, Watcher watcher, Stat stat){
//省略...
GetDataResponse response = new GetDataResponse();
//發送請求到ZooKeeper伺服器,擷取到response
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (stat != null) {
//把response的Stat指派到傳入的stat中
DataTree.copyStat(response.getStat(), stat);
}
}
使用同步getData()擷取資料:
//資料的描述資訊,包括版本号,ACL權限,子節點資訊等等
Stat stat = new Stat();
//傳回結果是byte[]資料,getData()方法底層會把描述資訊複制到stat對象中
byte[] bytes = zooKeeper.getData("/java", false, stat);
//列印結果
System.out.println("ZNode的資料data:" + new String(bytes));//Hello World
System.out.println("擷取到dataVersion版本号:" + stat.getVersion());//預設資料版本号是0
7.3 更新
public Stat setData(final String path, byte data[], int version){
...
}
值得注意的是第三個參數version,使用CAS機制,這是為了防止多個用戶端同時更新節點資料,是以需要在更新時傳入版本号,每次更新都會使版本号+1,如果服務端接收到版本号,對比發現不一緻的話,則會抛出異常。
是以,在更新前需要先查詢擷取到版本号,否則你不知道目前版本号是多少,就沒法更新:
//擷取節點描述資訊
Stat stat = new Stat();
zooKeeper.getData("/java", false, stat);
System.out.println("更新ZNode資料...");
//更新操作,傳入路徑,更新值,版本号三個參數,傳回結果是新的描述資訊
Stat setData = zooKeeper.setData("/java", "fly!!!".getBytes(), stat.getVersion());
System.out.println("更新後的版本号為:" + setData.getVersion());//更新後的版本号為:1
更新後,版本号增加了:
如果傳入的版本參數是"-1",就是告訴zookeeper伺服器,用戶端需要基于資料的最新版本進行更新操作。但是-1并不是一個合法的版本号,而是一個辨別符。
7.4 删除
public void delete(final String path, int version){
...
}
- path 删除節點的路徑
- version 版本号
這裡也需要傳入版本号,調用getData()方法即可擷取到版本号,很簡單:
Stat stat = new Stat();
zooKeeper.getData("/java", false, stat);
//删除ZNode
zooKeeper.delete("/java", stat.getVersion());
7.5 watcher機制
在上面第三點提到,ZooKeeper是可以使用通知監聽機制,當ZNode發生變化會收到通知消息,進行處理。基于watcher機制,ZooKeeper能玩出很多花樣。怎麼使用?
ZooKeeper的通知監聽機制,總的來說可以分為三個過程:
①用戶端注冊 Watcher
②伺服器處理 Watcher
③用戶端回調 Watcher用戶端。
注冊 watcher 有 4 種方法,new ZooKeeper()、getData()、exists()、getChildren()。下面示範一下使用exists()方法注冊watcher:
首先需要實作Watcher接口,建立一個監聽器:
public class MyWatcher implements Watcher {
@Override
public void process(WatchedEvent event) {
//擷取事件類型
Event.EventType eventType = event.getType();
//通知狀态
Event.KeeperState eventState = event.getState();
//節點路徑
String eventPath = event.getPath();
System.out.println("監聽到的事件類型:" + eventType.name());
System.out.println("監聽到的通知狀态:" + eventState.name());
System.out.println("監聽到的ZNode路徑:" + eventPath);
}
}
然後調用exists()方法,注冊監聽器:
zooKeeper.exists("/java", new MyWatcher());
//對ZNode進行更新資料的操作,觸發監聽器
zooKeeper.setData("/java", "fly".getBytes(), -1);
然後在控制台就可以看到列印的資訊:
這裡我們可以看到Event.EventType對象就是事件類型,我們可以對事件類型進行判斷,再配合Event.KeeperState通知狀态,做相關的業務處理,事件類型有哪些?
打開EventType、KeeperState的源碼檢視:
//事件類型是一個枚舉
public enum EventType {
None (-1),//無
NodeCreated (1),//Watcher監聽的資料節點被建立
NodeDeleted (2),//Watcher監聽的資料節點被删除
NodeDataChanged (3),//Watcher監聽的資料節點内容發生變更
NodeChildrenChanged (4);//Watcher監聽的資料節點的子節點清單發生變更
}
//通知狀态也是一個枚舉
public enum KeeperState {
Unknown (-1),//屬性過期
Disconnected (0),//用戶端與服務端斷開連接配接
NoSyncConnected (1),//屬性過期
SyncConnected (3),//用戶端與服務端正常連接配接
AuthFailed (4),//身份認證失敗
ConnectedReadOnly (5),//傳回這個狀态給用戶端,用戶端隻能處理讀請求
SaslAuthenticated(6),//伺服器采用SASL做校驗時
Expired (-112);//會話session失效
}
7.5.1 watcher的特性
- 一次性。一旦watcher被觸發,ZK都會從相應的存儲中移除。
zooKeeper.exists("/java", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("我是exists()方法的監聽器");
}
});
//對ZNode進行更新資料的操作,觸發監聽器
zooKeeper.setData("/java", "fly".getBytes(), -1);
//企圖第二次觸發監聽器
zooKeeper.setData("/java", "spring".getBytes(), -1);
- 串行執行。用戶端Watcher回調的過程是一個串行同步的過程,這是為了保證順序。
zooKeeper.exists("/java", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("我是exists()方法的監聽器");
}
});
Stat stat = new Stat();
zooKeeper.getData("/java", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("我是getData()方法的監聽器");
}
}, stat);
//對ZNode進行更新資料的操作,觸發監聽器
zooKeeper.setData("/java", "fly".getBytes(), stat.getVersion());
列印結果,說明先調用exists()方法的監聽器,再調用getData()方法的監聽器。因為exists()方法先注冊了。
- 輕量級。WatchedEvent是ZK整個Watcher通知機制的最小通知單元。WatchedEvent包含三部分:通知狀态,事件類型,節點路徑。Watcher通知僅僅告訴用戶端發生了什麼事情,而不會說明事件的具體内容。