天天看點

zookeeper原了解析-用戶端與伺服器端互動

Zookeeper叢集中server數量總是确定的,是以叢集中的server互動采用比較可靠的bio長連接配接模型;不同于叢集中sever間互動zookeeper用戶端其實數量是未知的,為了提高zookeeper并發性能,zookeeper用戶端與伺服器端互動采用nio模型。下面我們主要來講講zookeeper的伺服器端與用戶端的互動。讀者對nio不了解的話不妨抽點時間去了解下,對于一些nio架構如netty,mina再如一些web容器如tomcat,jetty底層都實作一套nio架構,對于實作nio架構模型大家不妨去谷歌百度搜一下Doug Lea的scalable io in Java 這個ppt。

用戶端

ClientCnxnSocketNIO是zookeeper的nio通訊層的用戶端部分,下面僞代碼示例其核心代碼:

ClientCnxnSocketNIO{

      doTransport() {

               if (如果之前連接配接沒有立馬連上,則在這裡處理OP_CONNECT事件) {

                   sendThread.primeConnection();

               } else {

                    doIO

                }

             //隊列中有發送的消息, 開啟寫

        }

       doIO() {

              if (sockKey.isReadable()) {

                    sendThread.readResponse(incomingBuffer);

                    updateLastHeard();

               } 

               if(sockKey.isWritable()) {

                      Packetp = outgoingQueue.getFirst() //從發送隊列取

                      updateLastSend

                      p.requestHeader.setXid(cnxn.getXid());//設定用戶端的xid

                     序列化

                     發送

                     從發送隊列删除

                     加入到pendingQueue隊列

}

ClientCnxn 是用戶端操作ClientCnxnSocketNIO的工具,維護了發送任務線程SendThread,事件任務線程EventThead, 發送隊列OutgoingQueue以及請求消息的等待隊列PendingQueue。下面以僞代碼來示例其核心代碼

ClientCnxn {

    outgoingQueue//待向伺服器端發送的隊列, 用戶端送出請求放入這個隊列

    pendingQueue //發送以後等待響應的隊列,

    submitRequest(){

       //client端一個封裝成一個packet

        outgoingQueue.add(packet);

        selector.wakeup();

        packet.wait(); //如果是同步調用wait,應該回報後會

    }

   SendThread {

       run() {

           1.設定clientCnxnSocket 最後發送時間,最後的心跳時間

           2. if(!clientCnxnSocket.isConnected()) {

                    startConnect()  //主要工作clientCnxnSocket做

              } else {

                    計算下次ping的時間, 發送心跳

                  委托給 clientCnxnSocket.doTranspor進行底層的nio傳輸

               } 

         }

        primeConnection(){

           //建構ConnectRequest

           //組合成通訊層的Packet對象,添加到發送隊列,對于ConnectRequest其requestHeader為null

            outgoingQueue.addFirst

            clientCnxnSocket.enableReadWriteOnly();//確定讀寫事件都監聽 

        readResponse(){

           1.先讀響應頭,先對特殊的xid進行處理

           2. packet = pendingQueue.remove() //由于client和server都是單線程處理,多隊列處理,是以認為全局有序

           3. 反序列化響應體response, 并設定到packet上

           4.finishPacket 1)同步notifyAll,結束 2)異步加入到event線程的隊列

    EventThread{ //主要支援異步的回調

大家觀察用戶端操作類Zookeeper裡面的操作類主要分為兩個參數不帶callback的同步方法和參數帶callback的異步方法。

1.      同步調用方法實作類似Future同步轉異步模式實作

1)  Client送出請求對象封裝成packet對象放入OutgoingQueue隊列,并調用packet.wait()阻塞目前線程。

2)  每個Client都隻有一個SendThread線程是線性處理OutgoingQueue中的請求消息的,SendThread線程通過ClientCnxnSocketNIO工具順序從OutgoingQueue隊列中取請求消息發送到伺服器端,同時将請求packet加入到PendingQueue中

3)  ClientCnxnSocketNIO工具接收處理伺服器端響應

4)  從PendingQueue隊列取出對應的packet,并調packet.notifyAll()喚醒阻塞的線程完成同步調用

2.      異步調用的總體流程跟同步類似關鍵差別在于

1)  向OutgoingQueue隊列送出請求後,不會調用packet.wait()阻塞目前線程,主流程繼續執行

2)  同同步調用

3)  同同步調用

4)  從PendingQueue隊列取出對應的packet,并調packet.callback方法完成回調處理

Zookeeper伺服器端

NIOServerCnxnFactory工廠類,zookeeperserver用來啟動監聽用戶端連接配接,每當有用戶端請求連接配接進來,NIOServerCnxnFactory都會為這個連結建構NIOServerCnxn 執行個體來單獨處理與這個用戶端的互動

NIOServerCnxn封裝了處理讀取用戶端請求資料與及向用戶端響應資料

下面通過僞代碼來執行個體:

NIOServerCnxnFactory  {

   configure {

        綁定端口

        作為server監聽

        注冊selectkey 的連接配接時間

    run {  //起到accept的作用

       1. OP_ACCEPT, 将NIOServerCnxn(handler) attach到selectkey以便被讀寫事件使用

       2. OP_READ 和 OP_WRITE取出handler NIOServerCnxn,并調doIo

NIOServerCnxn {

    構造器 {

        //設定selectkey對read感興趣

    doIo {

        if(k.isReadable()) {

             1. 讀前四個位元組, 代表請求内容長度,不包括自己的4位元組

             2. 讀取到位元組數組中

             3. zkServer.processPacket()或者zkServer.processConnectRequest()

        }

        if(k.isWritable()) {

              1.從outgoingBuffers取ByteBuffer

              2.發送bytes

熬夜不易,點選請老王喝杯烈酒!!!!!!!