天天看點

Zookeeper用戶端學習

package com.test.com.test.zookeeper;

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.List;

/**
 * @author Jane       E-mail:[email protected]
 * @version Zookeeper用戶端示例及源碼分析       建立時間:2018/10/3
 */

/**
 * 
 * 本内容完全是關于zk的用戶端,其中例子的運作需要在本地啟動一個zkServer叢集。
 * 
 * zk 建議全部通過異步機制進行回調處理來提高性能,進而無需捕獲異常(隻需根據狀态碼進行相應處理)。
 * zk通過單線程(eventThread)處理所有的回調通知(異步回調,監聽回調),是以在回調處理過多的情況下,可能造成較高的延遲。
 *
 * Zookeeper類提供了一系列與zkServer互動的方法,并關聯了ClientCnxn用戶端,此用戶端打開一個與zkServer的會話,并關聯sendThread和eventThread用于對發送請求和回調事件進行處理,
 * ClientCnxnSocketNIO通過NIO的方式對socket進行處理。
 *
 * eventThread負責從其維護的阻塞隊列中擷取event,如果是DeathEvent則關閉此線程,否則根據事件類型進行解析,調用回調結構的不同重載(事件監聽回調,異步通知回調)。各種回調都被封裝成實作Event接口的對象
 * sendThread負責與zkServer的連接配接,并負責心跳和重連。然後調用doTransport()方法。異步的方式,用戶端将回調處理器包裝成一個Packet對象,并将該對象加入outgoing隊列中。
 * doTransport()方法通過NIO select擷取一次觸發的所有事件。
 * 如果channel可讀,則從讀buffer中讀取内容,然後調用sendThread的readResponse方法,然後根據擷取消息的響應頭執行不同處理
 * Xid為-2:心跳響應
 * Xid為-4:認證授權
 * Xid為-1:監聽事件通知,讀取事件的response,并将其送出到eventThread的隊列中(将回調相應Watcher接口的實作處理器)。
 * 為sasl:擷取token然後發送給服務端
 * 為回調通知:用戶端認為除以上所有情況外,都屬于回調的通知。從pending隊列中取出packet進行以下處理
 * 1. 如果取出的Xid與服務端響應的Xid不一緻,則認為發送連接配接丢失。
 * 2. 将服務端傳回的消息,根據協定填回packet字段中
 * 3. 如果沒有回調接口則notifyAll,如果有回調則将packet加入eventThread的阻塞隊列中。
 *
 * 如果channel可寫,則從outgoing隊列中取出一個Packet對象,将buffer内容寫入socket中,然後将此packet對象交與pending隊列中。
 *
 * <p>
 * zk用戶端提供了重連機制,如果心跳檢測網絡斷開,會一直嘗試重新連接配接(連接配接原伺服器,或者叢集中的其他伺服器),直到用戶端程序被殺死。
 * <p>
 * zk用戶端重新連接配接,但是服務端已經将此會話置為無效,則必須重新new Zookeeper句柄,與服務端開啟一個新的會話。
 * 優雅的關閉與服務端的會話,需要調用 Zookeeper.close();
 * <p>
 * 對于外部資源的通路,需要使用隔離符(czxid)使用者接受最新的會話的請求,如果Czxid落後則不允許通路。
 */
public class Master implements Watcher { // Watcher接口實作一個監聽,在用戶端啟動後回調
    private ZooKeeper zk;
    private String hostPort;

    Master(String hostPort) {
        this.hostPort = hostPort;
    }

    void startZK() throws IOException {
        zk = new ZooKeeper(hostPort, 15000, this);
    }


    @Override
    public void process(WatchedEvent watchedEvent) {
        System.out.println("收到監聽通知:" + watchedEvent);
    }

    /**
     * 同步調用建立znode
     * 必須要自己解決受檢異常
     * @param path
     * @param value
     */
    public void createZnode(String path, String value) {
        try {
            String str = this.zk.create(path, value.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println(str);
        } catch (KeeperException e) {
            System.out.println("check一下,看那個程序建立了子節點");
            System.out.println(String.valueOf(getData(path)));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 異步的方式建立Znode.
     * CreateMode.EPHEMERAL隻會建立臨時節點 用戶端會話關閉自動删除
     * CreateMode.PERSISTENT會建立持久化節點
     * 在回調進行中,根據異常傳回碼進行相應處理
     * @param path
     * @param value
     */
    public void createZnodeAsync(String path, String value) {
        this.zk.create(path, value.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new AsyncCallback.StringCallback() {
            @Override
            public void processResult(int i, String s, Object o, String s1) {
                System.out.println(i + " " + s + " " + o + " " + s1);
            }
        }, new Object());
    }

    /**
     * 擷取某個節點的資料,即建立時設定的值
     *
     * @param path
     *
     * @return
     */
    public String getData(String path) {
        byte[] result = null;
        try {
            result = zk.getData(path, false, new Stat());
        } catch (KeeperException e) {
            System.out.println("繼續check或者報警");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return String.valueOf(result);
    }

    /**
     * 查詢某個節點是否存在,如果存在則設定删除節點監聽。
     * 帶Watcher監聽器 和 異步回調處理器
     * @param path
     */
    public void exist(String path) {

        Stat s = new Stat();
        zk.exists(path, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("設定監視節點删除事件回調");
            }
        }, new AsyncCallback.StatCallback() {
            @Override
            public void processResult(int i, String s, Object o, Stat stat) {
                System.out.println("exist回調處理器");
                // 根據不同的傳回異常碼,做相應的處理,對于連接配接丢失事件需要重試。
                switch (KeeperException.Code.get(i)) {
                    case OK:
                        System.out.println("處理成功");
                        break;
                    case NOTEMPTY:
                        System.out.println("已經存在");
                        break;
                    case CONNECTIONLOSS:
                        System.out.println("連接配接丢失,可能發送時丢失,可能接收時丢失,建議重試");
                        break;
                }
            }
        }, s);
    }

    /**
     * 擷取某個路徑下的子節點,并設定監聽,如果子節點變化,則回調處理器
     *
     * @param path
     */
    public void getChild(String path) {
        zk.getChildren(path, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("getChild設定監聽,如果子節點變動,則回調此處理器");
            }
        }, new AsyncCallback.ChildrenCallback() {
            @Override
            public void processResult(int i, String s, Object o, List<String> list) {
                System.out.println("getChild傳回結果回調");
                switch (KeeperException.Code.get(i)) {
                    case OK:
                        System.out.println("處理成功");
                        for (String str : list) {
                            System.out.println(str);
                        }
                }
            }
        }, new Object());
    }

    public static void main(String args[]) throws Exception {
        Master m = new Master("127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184");
        // 建立句柄,并與zookeeper Server建立會話連接配接
        // 這個建立會話也是異步執行的
        m.startZK();
        //m.createZnode("/master", "hello world");
        //m.createZnodeAsync("/slave","hello world");

        // 如果存在則在回調中處理,并設定監聽,當伺服器删除節點時候收到回調通知。
        //m.exist("/slave");

        System.out.println(m.getData("/slave"));
        // 擷取某個路徑下所有子節點,并設定監聽,如果子節點更改,則回調監聽事件。
        m.getChild("/slave");
        Thread.sleep(60000);
        m.closeZK();
    }

    public void closeZK() {
        try {
            this.zk.close();
        } catch (InterruptedException e) {
            System.out.println("關閉zk失敗");
        }
    }
}
           

繼續閱讀