天天看點

學習筆記之ZooKeeper概述zookeeper概述

zookeeper概述

(個人學習筆記,如有錯誤歡迎指正!!!)

經典的CAP/BASE理論

CAP

  • 一緻性

    Consistency

    : 對于任何從用戶端發送到分布式系統的讀資料請求,要麼傳回正确的資料,要麼傳回錯誤。這一特性保證的是資料的正确性。
  • 可用性

    Availability

    :對于任何從用戶端發送到分布式系統的讀請求,用戶端都一定會收到資料,但不保證資料的一定是罪行的資料。這一特性保證系統不會傳回錯誤。
  • 分區容錯

    Partition-tolerance

    :分布式系統再各個節點進行資料同步的時候可能存在延遲,但是系統必須保持運作狀态。這一特性要求系統一緻處于工作狀态。

CAP 理論是不能同時實作這三個特性的。在實際應用過程中,一定會出網絡延遲導緻資料丢失等問題,是以分區容錯的特性是一定要實作的。

BASE

  • 基本可用

    basically available

    :分布式系統出現故障,允許失去部分的可用性,但是系統還是可以運作的。例如:将資料存放到不同的執行個體上,即使部分資料損壞,還能保證其他部分的資料時可用的。
  • 軟狀态

    soft-state

    :允許保持中間狀态。在基于 C/S模式系統中,server端是否有狀态,決定了系統是否具有良好的水準擴充、負載均衡、故障恢複等特點。Server端承諾會維護client端狀态資料,這個狀态僅僅位置一小段時間,這段時間以後,server端會對丢棄這個狀态,恢複正常狀态。
  • 最終一緻性

    eventually consistent

    :資料最終一緻性

ZooKeeper介紹

雅虎建立,基于 Google chubby

是一個開源的分布式協調架構

并不是用來存儲資料的,通過監控資料狀态的變化,達到基于資料的叢集管理

zookeeper序列化使用的是Jute

使用場景

訂閱釋出:watcher機制,統一配置管理(disconf)

分布式鎖:redis,zookeeper,資料庫

負載均衡,ID生成器,分布式隊列,統一命名服務,master選舉

原則

單一視圖

無論用戶端連接配接到哪一個伺服器,所看到的模型都是一樣的

原子性

所有的事務請求的處理結果在整個叢集中的所有機器上的應用情況是一緻的。(所有機器上的分布式事務要麼同時成功,要麼同時失敗)

可靠性

一旦伺服器成功應用了某一個事務的資料,并且對用戶端做了成功的 響應,那麼這個資料在整個叢集中一定是同步并且保留下來

實時性

一旦一個事務被成功應用,用戶端就能夠立即從伺服器端讀取到事務變更後的最新資料;(zookeeper僅僅保證在一定時間内實時,近實時)

安裝(linux系統)

單機環境安裝

  • 下載下傳zookeeper安裝包:http://apache.fayea.com/zookeeper/stable/
  • 解壓安裝包:

    tar -zxvf zookeeper-3.4.12.tar.gz

  • cd 到

    zookeeper/conf

    ,(複制一份zoo.cfg):

    cp zoo_sample.cfg zoo.cfg

    ,zookeeper加載的配置檔案為

    zoo.cfg

  • sh zkServer.sh xxx

    :

    start|start-foreground|stop|restart|status|upgrade|print-cmd

    執行伺服器指令
  • sh zkCli.sh -server ip:port

    :與zookeeper伺服器建立連接配接,如果添加

    -server ip:port

    内容,預設連接配接到本機的zookeeper節點

叢集環境

zookeeper叢集,包括三種角色:leader/follower/observer

observer 是一種特殊的zookeeper節點,可以幫助解決zookeeper的擴充性(如果大量用戶端通路zookeeper叢集,需要增加叢集機器數量,進而增加zookeeper伺服器性能,導緻zookeeper寫性能下降,因為,zookeeper的資料變更需要半數以上伺服器投票通過,造成網絡消耗增加投票成本)

  • observer不參與投票,隻接受投票結果。
  • 不屬于zookeeper的關鍵部位。

配置observer:在zoo.cfg 裡面增加

​ peerType=observer

​ server.id=host:port:port:observer

安裝:

  • 修改配置檔案

    zoo.cfg

    (配置所有機器)

    server.id=host:port:port

    id的範圍為1~255,用id表示該機器在叢集中的機器序号

    第一個port是follow而與leader交換資訊的端口号,第二個port是leader節點挂掉了,需要通過這個端口号進行leader選舉。

    2181是zookeeper與用戶端交換資訊的端口号 ,是以前面兩個端口号不能使用2181

  • 建立myid(dataDir定義在

    zoo.cfg

    檔案中)

    在每一個伺服器的dataDir目錄下建立一個myid檔案,資料内容是每台伺服器對應的Server.id中id的值,該值配置在

    zoo.cfg

    檔案中
  • 啟動zookeeper,通過調用指令

    sh zkServer.sh start

    啟動伺服器(zookeeper叢集隻有當兩台及以上伺服器啟動後才會工作,這時候使用指令

    sh zkServer.sh status

    可以檢視目前伺服器在zookeeper叢集中的角色)

zoo.cfg配置檔案分析

  • tickTime=2000

    :zookeeper中最小的事件機關,預設是2000毫秒。
  • initLimit=10

    :follower啟動之後,與leader伺服器完成資料同步的時間。
  • syncLimit=5

    :leader節點與follower節點進行心跳檢測的最大延遲時間。
  • dataDir=/tmp/zookeeper

    :表示zookeeper伺服器存儲快照檔案的目錄。
  • dataLogDir

    :表示配置zookeeper事務日志的存儲路徑,預設指定在dataDir路徑下。
  • clientPort=2181

    :表示用戶端與伺服器端建立連接配接的端口号:2181。

相關概念

資料模型

zookeeper的資料模型和檔案系統類似,每一個節點稱為 znode,是zookeeper中最小的資料單元。每個znode上都可以儲存資料和挂載子節點

​ 持久化節點(

CreateMode.PERSISTENT

) :節點建立後會一直存在zookeeper伺服器上,直到主動删除

​ 持久化有序節點(

CreateMode.PERSISTENT_SEQUENTIAL

):節點會為有序子節點維護一個順序

​ 臨時節點(

CreateMode.EPHEMERAL

):臨時節點的生命周期和用戶端的會話保持一緻,當用戶端的會話失效,該節點自動清理 (臨時節點不能擁有子節點)

​ 臨時有序節點(

CreateMode.EPHEMERAL_SEQUENTIAL

):在臨時節點上多一個順序性特性(臨時節點不能擁有子節點)

資料存儲

zookeeper有三種日志:

zookeeper.out 運作日志

快照,存儲某一時刻的全量資料

事務日志,事務操作的日志記錄

watcher

zookeeper提供了分布式資料釋出/訂閱,zookeeper允許用戶端向伺服器端注冊一個watcher監聽。當伺服器端的節點觸發指定事件的時候會觸發watcher。服務端會向用戶端發送一個事件通知

watcher的通知是一次性的,一旦觸發一次通知後,該watcher就失效

注意在注冊watcher之後,如果還需要繼續監聽,應該重新注冊watcher,因為watcher是一次性有效,zookeeper中可以用來注冊的方法有

getData()

,

exists()

,

setData()

,

getChildren()

方法,

操作與産生的事件類型的對應關系
"/parent"的事件 "/parent/child"的事件
create("/parent") NodeCreated
delete("/parent") NodeDeleted
setData("/parent") NodeDataChanged
create("/parent/child") NodeChildrenChanged NodeCreated
delete("/parent/child") NodeChildrenChanged NodeDeleted
setData("/parent/child") —— NodeDataChanged
事件類型與設定watcher的方法的對應關系

(watcher并不是監視所有類型的事件,其中default watcher是指建立ZooKeeper時注冊的watcher)!

Default Watcher exists("/path") getData("/path") getChildren("/path")
None Y Y Y Y
NodeCreated Y Y
NodeDeleted Y Y
NodeDataChanged Y Y
NodeChildrenChanged Y

ACL

zookeeper提供了控制節點通路權限的功能,用于有效的保證zookeeper中的資料安全性,避免誤操作而導緻系統出現重大事故

CREATE/READ/WRITE/DELETE/ADMIN

  • create:表示建立權限
  • read:表示讀權限
  • write:表示寫權限
  • delete:表示删除權限
  • admin:表示管理者權限

權限控制模式

schema:授權對象

ip:192.168.1.1

Digest:username:password

world:開放式的權限控制模式,資料節點的通路權限對所有使用者開放。world:anyone

super:超級使用者,可以對zookeeper上的資料節點進行操作

Server指令操作

  • create [-s] [-e] path data acl

    :-s 表示節點是否有序,-e表示是否為臨時節點,acl表示權限
  • get path [watch]

    :獲得指定path資料(資訊)
  • set path data [version]

    :修改節點對應的data

    資料庫中有一個version字段去控制資料行的版本号,在執行該指令時,如果使用該字段,則version必須與目前節點的

    dataversion

    相同,如果不同,則設定資料失敗。-1表示預設不使用version字段
  • delete path [version]

    :删除指定path的節點,注意必須按照建立順序的相反順序删除,即先删除子節點,再删除父節點,其中version字段與

    set

    指令的作用相同

節點state資訊(get指令)

  • cversion = 0

    :子節點的版本号
  • dataVersion = 0

    :表示目前節點資料版本号
  • aclVersion = 0

    :表示acl的版本号,修改節點權限
  • cZxid = 0x10000000a

    :節點被建立時的事務ID
  • mZxid = 0x10000000d

    :節點最後一次被更新的事務ID
  • pZxid = 0x10000000a

    :目前節點下的子節點最後一次被修改時的事務ID
  • ctime = Fri Dec 21 23:57:12 PST 2018

    :建立時間
  • mtime = Sat Dec 22 00:00:38 PST 2018

    :修改時間
  • ephemeralOwner = 0x0

    :建立臨時節點的時候,會有一個sessionId,該值就是存儲的sessionId
  • dataLength = 3

    :資料長度
  • numChildren = 0

    :子節點數

zookeeper叢集中的角色

leader

​ leader是zookeeper叢集的核心

​ 1.事務請求的唯一排程和處理者,保證叢集事務處理的順序性

​ 2.叢集内部各個伺服器的排程者

follower

​ 1.處理用戶端非事務請求,以及轉發事務請求給leader伺服器

​ 2.參與事務請求提議(proposal)的投票(用戶端的一個事務請求,需要半數伺服器投票通過以後才能通知leader commit;leader會發起一個提案,要follower投票)

​ 3.參與leader選舉的投票

observer

​ 1.觀察zookeeper叢集中最新狀态的變化并将這些狀态同步到observer伺服器上

​ 2.增加observer不影響叢集的事務處理能力,同時還能增加叢集的非事務處理能力

ZAB協定

zab協定為分布式協調服務zookeeper專門設計的一種支援崩潰回複和原子廣播的協定(Zookeeper atomic broadcast)

消息廣播

zookeeper使用主備模式的系統架構來保持資料在個副本之間的一緻性。

當用戶端向zookeeper叢集寫資料時,資料均是寫入到leader伺服器,然後由leader将資料複制到follower服務中,其中,在複制的過程中,隻需要follower伺服器中由半數以上的傳回ack資訊,zab協定就可以送出。

步驟:

  • leader接受用戶端的事務請求
  • leader将資料複制到follower
  • follower傳回ack
  • 如果數量超過一半,leader則執行commit(确認修改),同時送出自己

注意:當follower接收到用戶端的寫請求時,會将該請求轉發到leader伺服器

崩潰回複

當leader崩潰後,zookeeper叢集會選擇所有伺服器中擁有最大事務id(zxid)的伺服器作為新的leader,這樣可以確定所有已經送出的資料能夠同步到叢集中所有的伺服器中。

java API的使用

原生API

首先添加 zookeeper 的相關依賴:

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.13</version>
    <type>pom</type>
</dependency>
           

首先需要定義連接配接zookeeper叢集中所有伺服器的ip位址以及對應的端口号,zookeeper伺服器預設語用戶端連接配接的端口号為2181,這裡定義了三台伺服器,如

CONNECTSTRING

CountDownLatch

是一個計數器,當watcher監聽到連接配接狀态變化為已連接配接之後會執行減一操作,這時

main()

方法便可以繼續執行,主要原因是防止zookeeper在連接配接的過程中,

main()

方法繼續執行導緻執行結果與預期不一緻的問題。

public class Demo{
    private final static String CONNECTSTRING="192.168.149.128:2181,192.168.149.130:2181,192.168.149.129:2181";
	private static CountDownLatch countDownLatch = new CountDownLatch(1);
	private static ZooKeeper zooKeeper;

	public static void main(String[] args) throws Exception{
    	zooKeeper = new ZooKeeper(CONNECTSTRING, 5000, new MyWatcher());
        countDownLatch.await();

        String result = zooKeeper.create("/clq","123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        zooKeeper.setData("/clq","234".getBytes(),-1);

        zooKeeper.delete("/clq",-1);

        zooKeeper.getChildren(path,true);
	}
    
    class MyWatcher implements Watcher{

        @Override
        public void process(WatchedEvent watchedEvent) {
            if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                countDownLatch.countDown();
                System.out.println(watchedEvent.getState() + "-->" + watchedEvent.getType());
            }
            if (watchedEvent.getType() == Event.EventType.NodeDataChanged) {
                ......
            } else if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
                ......
            } else if (watchedEvent.getType() == Event.EventType.NodeCreated) {
                ......
            } else if (watchedEvent.getType() == Event.EventType.NodeDeleted) {
                ......
            }
        }
    }
}
           

zkClient

需要添加依賴:

<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.10</version>
</dependency>
           

首先同樣需要定義所有zookeeper伺服器的IP位址和連接配接端口,如

CONNECTSTRING

與原生API不同的是,zkClient在建立節點的時候,能夠實作疊代建立節點,不需要建立父節點之後,在建立子節點這樣繁瑣的過程,删除節點的時候同樣也支援疊代删除節點。

注冊watcher的過程也與原生API不同,調用

zkClient.subscribeDataChanges()

方法,并且傳入

IZkDataListener

執行個體,并且實作其相應的處理方法,同時實作了持續監聽,不需要每次響應之後重新注冊監聽。

public class ZkClientApiOperatorDemo {

    private final static String CONNECTSTRING="192.168.149.128:2181,192.168.149.130:2181,192.168.149.129:2181";

    private static ZkClient getInstance(){
        return new ZkClient(CONNECTSTRING,4000);
    }

    public static void main(String[] args) throws Exception{
        ZkClient zkClient = ZkClientApiOperatorDemo.getInstance();
        zkClient.createPersistent("/clq/clq1/clq1-1/clq1-1-1",true);

        List<String> list = zkClient.getChildren("/clq");
        System.out.println(list);

        zkClient.subscribeDataChanges("/clq", new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {
                System.out.println("節點名稱:"+s+"-> 節點的值:"+o);
            }

            @Override
            public void handleDataDeleted(String s) throws Exception {

            }
        });
        zkClient.writeData("/clq","clq");
        TimeUnit.SECONDS.sleep(2);
        System.out.println("success");


    }
}
           

curator

需要添加依賴:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.11.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.11.0</version>
</dependency>
           

本身是Netfix公司開源的zookeeper用戶端;curator提供了各種應用場景的封裝

curator-framework:提供了fluent風格的api

curator-recipes:提供了實作封裝

首先同樣需要定義所有zookeeper伺服器的IP位址和連接配接端口,如

CONNECTSTRING

建立zookeeper連接配接有兩種方式,一種是傳統的建立方式,另一種是fluent風格的建立方式。

public class CuratorOperatorDemo {

    private final static String CONNECTSTRING="192.168.149.128:2181,192.168.149.130:2181,192.168.149.129:2181";

    public static CuratorFramework getInstance(){
        //傳統建立方式
        CuratorFramework curatorFramework = CuratorFrameworkFactory
                .newClient(CONNECTSTRING,5000,5000
                        ,new ExponentialBackoffRetry(1000,3));
        curatorFramework.start();
        //fluent風格的建立方式
        CuratorFramework curatorFramework1 = CuratorFrameworkFactory
                .newClient(CONNECTSTRING,5000,5000
                        ,new ExponentialBackoffRetry(1000,3));
        curatorFramework1.start();
        return curatorFramework1;
    }

    public static void main(String[] args) {
        CuratorFramework curatorFramework = getInstance();
        System.out.println("conncet success ......");
        try {
            //建立節點,支援疊代建立,fluent風格
            curatorFramework.create().creatingParentContainersIfNeeded()
              .withMode(CreateMode.PERSISTENT).forPath("/curator/curator1/curator1-1","123".getBytes());
            //删除節點
         	curatorFramework.delete().deletingChildrenIfNeeded()
             .forPath("/clq");
            //stat會儲存操作後的節點資訊
	     	Stat stat = new Stat();
            //擷取節點的資訊,資訊會儲存到stat中
            byte[] bytes = curatorFramework.getData().storingStatIn(stat)
                .forPath("/clq");
            System.out.println(new String(bytes)+ "-> stat"+stat);
            //設定節點資料
            Stat stat1 = curatorFramework.setData()
                .forPath("/clq","123".getBytes());
            System.out.println(stat1);
            //建立節點cache,使用該cache為節點注冊監聽
            NodeCache cache = new NodeCache(curatorFramework,"/curator",false);
        	//使該cache立刻加載該節點的資料
            cache.start(true);
			//為節點注冊監聽,curator已經在内部實作了持續監聽,這裡使用的lambda表達式,可以根			//據不同的需要進行響應的實作
        	cache.getListenable().addListener(()-> System.out.println("節點資料發生變化,變化後的結果:"
                + new String(cache.getCurrentData().getData())));
            //這裡可以定義連續操作,最後傳回的結構儲存到Collection中,可以周遊該Collection來				//擷取所有的執行結果
            Collection<CuratorTransactionResult> resultCollection = curatorFramework.inTransaction().create()
                    .forPath("/ccc","3456".getBytes())
                  .and().setData().
                forPath("/ccc","111".getBytes()).and().commit();

            for(CuratorTransactionResult result : resultCollection){
                System.out.println(result.getForPath()+"->"+result.getType());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}
           

繼續閱讀