文章目錄
- Curator用戶端
-
- 建立會話
- 建立節點
- 擷取節點和資料
- 更新資料
- 删除節點
- 事務
- 節點存在
- 事件監聽
- 其他工具類
- 開發測試
Curator用戶端
-
包含了幾個包:Curator
-
:對zookeeper的底層api的一些封裝curator-framework
-
:提供一些用戶端的操作,例如重試政策等curator-client
-
:封裝了一些進階特性,如:Cache事件監聽、選舉、分布式鎖、分布式計數器、分布式Barrier等curator-recipes
-
- 版本
- The are currently two released versions of Curator, 2.x.x and 3.x.x:
- Curator 2.x.x - compatible with both ZooKeeper 3.4.x and ZooKeeper 3.5.x
- Curator 3.x.x - compatible only with ZooKeeper 3.5.x and includes support for new features such as dynamic reconfiguration, etc.
-
主要解決了以下問題Curator
- 封裝
與ZooKeeper client
之間的連接配接處理ZooKeeper server
- 提供了一套
風格的操作APIFluent
- 提供
各種應用場景(recipe, 比如共享鎖服務, 叢集上司選舉機制)的抽象封裝ZooKeeper
- 封裝
- 這裡使用的版本
- 服務端
版本:Zookeeper
3.4.6
-
版本:Curator
compile "org.apache.curator:curator-recipes:2.11.0"
- 服務端
建立會話
- 建立會話
@Test public void testCreateClient() throws Exception{ String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183"; int sessionTimeoutMs = 25000; int connectionTimeoutMs = 5000; /** * 失去連接配接重試政策 * baseSleepTimeMs 初始sleep時間 * maxRetries 最大重試次數 * maxSleepMs 最大sleep時間 * 目前sleep時間=baseSleepTimeMs*Math.mac(1,random.next(1)) << (retryCount+1) */ int baseSleepTimeMs; int maxRetries; int maxSleepMs; RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); //第一種方式,使用Builder fluent風格的API方式建立 CuratorFramework curatorFramework1 = CuratorFrameworkFactory.builder() .connectString(connectString) .sessionTimeoutMs(sessionTimeoutMs) .connectionTimeoutMs(connectionTimeoutMs) .retryPolicy(retryPolicy) .build(); CuratorFrameworkState state1 = curatorFramework1.getState(); //zk-curator - LATENT logger.info(state1.toString()); //當連接配接狀态變化時觸發 curatorFramework1.getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) { //連接配接狀态事件:true logger.info("連接配接狀态事件:{}",connectionState.isConnected()); } }); //完成會話的建立 curatorFramework1.start(); //第二種方式,使用靜态工廠方式建立,此方式本質上還是通過第一種方式建立 CuratorFramework curatorFramework2 = CuratorFrameworkFactory.newClient( connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy); CuratorFrameworkState state2 = curatorFramework2.getState(); //zk-curator - LATENT logger.info(state2.toString()); /** * 臨時用戶端: * 一定時間不活動後連接配接會被關閉,使用buildTemp()建立 */ CuratorTempFramework curatorFramewor3 = CuratorFrameworkFactory.builder() .connectString(connectString) .sessionTimeoutMs(sessionTimeoutMs) .connectionTimeoutMs(connectionTimeoutMs) .retryPolicy(retryPolicy) .buildTemp(2000,TimeUnit.SECONDS); //.buildTemp(); TimeUnit.SECONDS.sleep(20); }
建立節點
- 建立節點
@Test public void testCreateZnode() throws Exception { String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183"; int sessionTimeoutMs = 25000; int connectionTimeoutMs = 5000; String rootPath = "jannal-create"; RetryPolicy retryPolicy = new ExponentialBackoffRetry(4000, 3); CuratorFramework curatorFramework = CuratorFrameworkFactory.builder() .connectString(connectString) .sessionTimeoutMs(sessionTimeoutMs) .connectionTimeoutMs(connectionTimeoutMs) .retryPolicy(retryPolicy) .namespace(rootPath) //用戶端對zk上的資料節點的操作都是基本此根節點進行的 .build(); //完成會話建立 curatorFramework.start(); //建立一個初始内容為空的znode,預設建立的是持久節點 curatorFramework.create().forPath("/userEmpty"); //建立一個附帶内容的znode curatorFramework.create().forPath("/userData", "我是jannal".getBytes(Charsets.UTF_8)); //建立一個臨時節點 curatorFramework.create(). withMode(CreateMode.EPHEMERAL). forPath("/userTemp", "jannal".getBytes(Charsets.UTF_8)); //自動遞歸建立父節點creatingParentsIfNeeded() curatorFramework.create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .forPath("/user/password", "123456".getBytes(Charsets.UTF_8)); curatorFramework.getCuratorListenable().addListener(new CuratorListener() { @Override public void eventReceived(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception { } }); ExecutorService executorService = Executors.newFixedThreadPool(2); CountDownLatch countDownLatch = new CountDownLatch(1); /** * 在原生的zookeeper用戶端中,所有異步通知事件處理都是通過EventThread這個 * 線程處理的(串行處理所有事件的通知)。一旦發生複雜的處理單元就會消耗很長事件 * 進而影響其他事件的處理,Curator可以使用線程池來處理,如果不指定線程池 * 則預設使用EventThread來處理 */ curatorFramework.create() .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL) .inBackground(new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { logger.info(event.toString()); countDownLatch.countDown(); } },executorService) .forPath("/user2","jannal2".getBytes(Charsets.UTF_8)); countDownLatch.await(); executorService.shutdown(); }
擷取節點和資料
- 擷取節點和資料
@Test public void testGetZnode() throws Exception { String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183"; int sessionTimeoutMs = 25000; int connectionTimeoutMs = 5000; String rootPath = "jannal-get"; RetryPolicy retryPolicy = new ExponentialBackoffRetry(4000, 3); CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(connectString) .sessionTimeoutMs(sessionTimeoutMs) .connectionTimeoutMs(connectionTimeoutMs) .retryPolicy(retryPolicy) .namespace(rootPath) .build(); client.start(); client.create() .creatingParentsIfNeeded() .forPath("/user/username/jannal", "jannal123".getBytes(Charsets.UTF_8)); //擷取資料 byte[] bytes = client.getData().forPath("/user/username/jannal"); Assert.assertEquals("jannal123", new String(bytes, Charsets.UTF_8)); //讀取一個節點的資料内容,同時擷取到該節點的stat Stat stat = new Stat(); bytes = client.getData().storingStatIn(stat).forPath("/user/username/jannal"); logger.info(ToStringBuilder.reflectionToString(stat)); client.create().creatingParentsIfNeeded().forPath("/user/username1", "abc".getBytes(Charsets.UTF_8)); client.create().creatingParentsIfNeeded().forPath("/user/username2", "def".getBytes(Charsets.UTF_8)); //擷取節點的子節點路徑 List<String> strings = client.getChildren().forPath("/user"); //[username2, username1, username] logger.info(strings.toString()); }
更新資料
- 更新資料
@Test public void testSetZnode() throws Exception { String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183"; int sessionTimeoutMs = 25000; int connectionTimeoutMs = 5000; String rootPath = "jannal-update"; RetryPolicy retryPolicy = new ExponentialBackoffRetry(4000, 3); CuratorFramework curatorFramework = CuratorFrameworkFactory.builder() .connectString(connectString) .sessionTimeoutMs(sessionTimeoutMs) .connectionTimeoutMs(connectionTimeoutMs) .retryPolicy(retryPolicy) .namespace(rootPath) //用戶端對zk上的資料節點的操作都是基本此根節點進行的 .build(); curatorFramework.start(); String path = "/user/password"; curatorFramework.create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .forPath("/user/password", "123456".getBytes(Charsets.UTF_8)); byte[] bytes = curatorFramework.getData().forPath(path); Assert.assertEquals("123456", new String(bytes, Charsets.UTF_8)); Stat stat = curatorFramework.setData().forPath(path, "abcd".getBytes(Charsets.UTF_8)); bytes = curatorFramework.getData().forPath(path); Assert.assertEquals("abcd", new String(bytes, Charsets.UTF_8)); //更新一個節點的資料内容,強制指定版本進行更新 curatorFramework.setData().withVersion(stat.getVersion()).forPath(path, "abcde".getBytes()); }
删除節點
- 在删除一個節點時,由于網絡原因,導緻删除操作失敗。這個異常在有些場景下是緻命的,比如
選舉(先建立後删除),針對這個問題Master
引入了一種重試機制,即如果我們調用了Curator
,那麼隻要用戶端有效,就會在背景反複重試,直到節點删除成功。guaranteed()
- 删除節點示例
@Test public void testDeleteZnode() throws Exception { String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183"; int sessionTimeoutMs = 25000; int connectionTimeoutMs = 5000; String rootPath = "jannal-delete"; RetryPolicy retryPolicy = new ExponentialBackoffRetry(4000, 3); CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(connectString) .sessionTimeoutMs(sessionTimeoutMs) .connectionTimeoutMs(connectionTimeoutMs) .retryPolicy(retryPolicy) .namespace(rootPath) .build(); client.start(); client.create() .creatingParentsIfNeeded() .forPath("/user/username/jannal", "jannal123".getBytes(Charsets.UTF_8)); client.create() .creatingParentsIfNeeded() .forPath("/user2/username/jannal", "jannal123".getBytes(Charsets.UTF_8)); client.create() .creatingParentsIfNeeded() .forPath("/user3/username/jannal", "jannal123".getBytes(Charsets.UTF_8)); //删除節點且遞歸删除其所有的子節點 client.delete().deletingChildrenIfNeeded().forPath("/user"); //強制指定版本進行删除 client.delete().deletingChildrenIfNeeded().withVersion(0).forPath("/user2"); //強制保證删除,隻要用戶端有效,Curator會在背景持續進行删除操作,直到節點删除成功 client.delete().guaranteed().forPath("/user3/username/jannal"); }
事務
- 事務
/** * inTransaction()開啟事務,可以複合create, setData, check, and/or delete * 等操作然後調用commit()作為一個原子操作送出 * * @throws Exception */ @Test public void testTransaction() throws Exception { String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183"; int sessionTimeoutMs = 25000; int connectionTimeoutMs = 5000; String rootPath = "jannal-transaction"; RetryPolicy retryPolicy = new ExponentialBackoffRetry(4000, 3); CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(connectString) .sessionTimeoutMs(sessionTimeoutMs) .connectionTimeoutMs(connectionTimeoutMs) .retryPolicy(retryPolicy) .namespace(rootPath) .build(); client.start(); client.inTransaction() .create().withMode(CreateMode.EPHEMERAL).forPath("/transaction", "事務".getBytes(Charsets.UTF_8)) .and() .delete().forPath("/transaction") .and() .commit(); }
節點存在
- 判斷節點是否存在
@Test public void testExist() throws Exception { String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183"; int sessionTimeoutMs = 25000; int connectionTimeoutMs = 5000; String rootPath = "jannal-Exist"; RetryPolicy retryPolicy = new ExponentialBackoffRetry(4000, 3); CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(connectString) .sessionTimeoutMs(sessionTimeoutMs) .connectionTimeoutMs(connectionTimeoutMs) .retryPolicy(retryPolicy) .namespace(rootPath) .build(); client.start(); //節點不存在傳回null Stat stat = client.checkExists().forPath("/jannal"); Assert.assertEquals(null, stat); logger.info(ToStringBuilder.reflectionToString(stat)); }
事件監聽
- 原生用戶端支援通過注冊Watch來進行事件監聽,但是收到事件通知後每次都需要手動再次注冊Watch。
引入了Cache來實作對Curator
服務端事件的監聽,并且自動處理反複注冊監聽,簡化了原生API的繁瑣。Zookeeper
-
分為兩類監聽Cache
- 節點監聽
- 子節點監聽
-
用于監聽指定NodeCache
資料節點本身的變化,資料内容發生變化時,就會回調此方法。Zookeeper
@Test public void testNodeCacheEventListener() throws Exception { String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183"; int sessionTimeoutMs = 25000; int connectionTimeoutMs = 5000; String rootPath = "jannal-listener"; RetryPolicy retryPolicy = new ExponentialBackoffRetry(4000, 3); CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(connectString) .sessionTimeoutMs(sessionTimeoutMs) .connectionTimeoutMs(connectionTimeoutMs) .retryPolicy(retryPolicy) .namespace(rootPath) .build(); client.start(); String path = "/user/username/jannal"; client.create() .creatingParentsIfNeeded() .forPath(path, "jannal123".getBytes(Charsets.UTF_8)); //是否進行資料壓縮 boolean dataIsCompressed = false; final NodeCache cache = new NodeCache(client, path, dataIsCompressed); cache.start(true); CountDownLatch countDownLatch = new CountDownLatch(2); cache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { ChildData currentData = cache.getCurrentData(); if (currentData != null) { logger.info("資料發生變更,新資料:{}", new String(currentData.getData(), StandardCharsets.UTF_8)); }else{ logger.info("資料發生變更,新資料:{},節點可能被删除", currentData, StandardCharsets.UTF_8); } countDownLatch.countDown(); } }); client.setData().forPath(path, "jannal".getBytes(Charsets.UTF_8)); client.delete().deletingChildrenIfNeeded().forPath(path); countDownLatch.await(); }
-
用于監聽指定PathChildrenCache
資料節點的子節點變化情況。當指定節點的子節點發生變化時,就會回調Listener。Zookeeper
無法對二級子節點觸發變更事件。比如對Curator
子節點進行監聽,當/zk-jannal
節點被建立或者删除的時候,無法觸發子節點變更事件/zk-jannal/user/password
@Test public void testPathChildrenCacheEventListener() throws Exception { String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183"; int sessionTimeoutMs = 25000; int connectionTimeoutMs = 5000; String rootPath = "jannal-parent-listener"; RetryPolicy retryPolicy = new ExponentialBackoffRetry(4000, 3); CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(connectString) .sessionTimeoutMs(sessionTimeoutMs) .connectionTimeoutMs(connectionTimeoutMs) .retryPolicy(retryPolicy) .namespace(rootPath) .build(); client.start(); String path = "/user"; client.create() .creatingParentsIfNeeded() .forPath(path, "jannal123".getBytes(Charsets.UTF_8)); //是否進行資料壓縮 boolean dataIsCompressed = false; /** * 是否把節點内容緩存起來,如果配置為true,則client在接收 * 到節點清單變更的同時,也能夠擷取到節點的資料内容,如果為false * 則無法擷取節點的資料内容 */ boolean cacheData = true; ExecutorService executorService = Executors.newFixedThreadPool(2); CountDownLatch countDownLatch = new CountDownLatch(3); final PathChildrenCache cache = new PathChildrenCache(client, path, cacheData, dataIsCompressed, executorService); /** * BUILD_INITIAL_CACHE 同步初始化用戶端的cache,及建立cache後,就從伺服器端拉入對應的資料(這個是NodeCache使用的方式) * NORMAL 異步初始化cache * POST_INITIALIZED_EVENT 異步初始化,初始化完成觸發事件PathChildrenCacheEvent.Type.INITIALIZED */ cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); cache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception { ChildData currentData = pathChildrenCacheEvent.getData(); //事件類型,可通過判斷事件類型來做相應的處理 PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType(); switch (type) { //初始化會觸發這個事件 case INITIALIZED: logger.info("子類緩存系統初始化完成"); break; case CHILD_ADDED: //新增子節點/user/username logger.info("新增子節點{}", currentData.getPath()); break; case CHILD_UPDATED: // 子節點資料變更[106, 97, 110, 110, 97, 108] logger.info("子節點資料變更{}", new String(currentData.getData(),Charsets.UTF_8)); break; case CHILD_REMOVED: //子節點資料删除/user/username logger.info("子節點資料删除{}", currentData.getPath()); break; } countDownLatch.countDown(); } }); client.create() .creatingParentsIfNeeded() .forPath(path+"/username", "jannal123".getBytes(Charsets.UTF_8)); TimeUnit.SECONDS.sleep(1); client.setData().forPath(path+"/username", "jannal".getBytes(Charsets.UTF_8)); TimeUnit.SECONDS.sleep(1); client.delete().deletingChildrenIfNeeded().forPath(path); countDownLatch.await(); }
-
可以監聽整個樹上的所有節點。即可以監控節點的狀态,還監控節點的子節點的狀态TreeCache
@Test public void testTreeCache() throws Exception { String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183"; int sessionTimeoutMs = 25000; int connectionTimeoutMs = 5000; String rootPath = "jannal-tree-listener"; RetryPolicy retryPolicy = new ExponentialBackoffRetry(4000, 3); CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(connectString) .sessionTimeoutMs(sessionTimeoutMs) .connectionTimeoutMs(connectionTimeoutMs) .retryPolicy(retryPolicy) .namespace(rootPath) .build(); client.start(); String path = "/user"; client.create() .creatingParentsIfNeeded() .forPath(path, "jannal123".getBytes(Charsets.UTF_8)); CountDownLatch countDownLatch = new CountDownLatch(1); TreeCache cache = new TreeCache(client, path); cache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception { TreeCacheEvent.Type type = treeCacheEvent.getType(); ChildData childData = treeCacheEvent.getData(); switch (type) { //子類緩存系統初始化完成 case INITIALIZED: logger.info("子類緩存系統初始化完成"); break; case NODE_ADDED: //新增子節點/user/username logger.info("新增子節點{}", childData != null ? childData.getPath() : null); break; case NODE_UPDATED: // 子節點資料變更jannal // 子節點資料變更jannal2 logger.info("子節點資料變更{}", childData != null ? new String(childData.getData(), Charsets.UTF_8) : null); break; case NODE_REMOVED: //子節點資料删除/user logger.info("子節點資料删除{}", childData != null ? childData.getPath() : null); break; } countDownLatch.countDown(); } }); cache.start(); client.setData().forPath(path, "jannal".getBytes()); Thread.sleep(1000); client.setData().forPath(path, "jannal2".getBytes()); Thread.sleep(1000); client.delete().deletingChildrenIfNeeded().forPath(path); Thread.sleep(1000 * 2); countDownLatch.await(); }
其他工具類
-
提供了一些工具類。比如Curator
和ZKPaths
EnsurePath
-
提供更簡單的API來建構ZKPaths
路徑、遞歸建立和删除節點。Znode
-
提供了一種確定資料節點存在的機制。在分布式環境下,A和B機器都同時建立節點,由于并發操作的存在,可能抛出EnsurePath
。節點已經存在的異常
内部實作就是試圖建立指定節點,如果節點存在,那麼就不進行任何操作,也不抛出異常,否則正常建立節點。從Curator 2.9.0版本開始此類已經過時,首選EnsurePath
orCuratorFramework.create().creatingParentContainersIfNeeded()
CuratorFramework.exists().creatingParentContainersIfNeeded()
- 代碼示例
@Test public void testOther() throws Exception { String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183"; int sessionTimeoutMs = 25000; int connectionTimeoutMs = 5000; String rootPath = "zk-other"; RetryPolicy retryPolicy = new ExponentialBackoffRetry(4000, 3); CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(connectString) .sessionTimeoutMs(sessionTimeoutMs) .connectionTimeoutMs(connectionTimeoutMs) .retryPolicy(retryPolicy) .namespace(rootPath) .build(); client.start(); String path = ZKPaths.fixForNamespace(rootPath, "/user"); // 輸出:/zk-other/user logger.info(path); ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode("/zk-other/user/username"); // path:/zk-other/user,node:username logger.info("path:{},node:{}", pathAndNode.getPath(), pathAndNode.getNode()); CuratorZookeeperClient zookeeperClient = client.getZookeeperClient(); //建立節點 ZKPaths.mkdirs(zookeeperClient.getZooKeeper(), "/zk-other/user/password"); //擷取子節點 List<String> sortedChildren = ZKPaths.getSortedChildren(zookeeperClient.getZooKeeper(), "/zk-other"); logger.info(sortedChildren.toString()); }
開發測試
- 為了便于開發人員進行zk的開發與測試,可以使用
子產品。依賴curator-test
compile "org.apache.curator:curator-test:2.11.0"
- 單元測試
@Test public void testSingleServer() throws Exception { File dataDir = new File("."); TestingServer server = new TestingServer(2000, dataDir); server.start(); CuratorFramework curatorFramework = CuratorFrameworkFactory. builder(). connectString(server.getConnectString()). sessionTimeoutMs(1000). retryPolicy(new RetryNTimes(3, 1000)). build(); curatorFramework.start(); System.out.println(curatorFramework.getChildren().forPath("/")); curatorFramework.close(); server.stop(); }
- 叢集測試
@Test public void testClusterServer() throws Exception { TestingCluster server = new TestingCluster(3); server.start(); Thread.sleep(2000); TestingZooKeeperServer leader = null; for (TestingZooKeeperServer zs : server.getServers()) { logger.info(zs.getInstanceSpec().getServerId() + "-"); logger.info(zs.getQuorumPeer().getServerState() + "-"); logger.info(zs.getInstanceSpec().getDataDirectory().getAbsolutePath()); if (zs.getQuorumPeer().getServerState().equals("leading")) { leader = zs; } } //模拟leader挂掉 leader.kill(); logger.info("leader 挂掉之後"); for (TestingZooKeeperServer zs : server.getServers()) { logger.info(zs.getInstanceSpec().getServerId() + "-"); logger.info(zs.getQuorumPeer().getServerState() + "-"); logger.info(zs.getInstanceSpec().getDataDirectory().getAbsolutePath()); } server.stop(); }