Zookeeper叢集中server數量總是确定的,是以叢集中的server互動采用比較可靠的bio長連接配接模型;不同于叢集中sever間互動zookeeper用戶端其實數量是未知的,為了提高zookeeper并發性能,zookeeper用戶端與伺服器端互動采用nio模型。下面我們主要來講講zookeeper的伺服器端與用戶端的互動。讀者對nio不了解的話不妨抽點時間去了解下,對于一些nio架構如netty,mina再如一些web容器如tomcat,jetty底層都實作一套nio架構,對于實作nio架構模型大家不妨去谷歌百度搜一下Doug Lea的scalable io in Java 這個ppt。
用戶端
ClientCnxnSocketNIO是zookeeper的nio通訊層的用戶端部分,下面僞代碼示例其核心代碼:
ClientCnxnSocketNIO{
doTransport() {
if (如果之前連接配接沒有立馬連上,則在這裡處理OP_CONNECT事件) {
sendThread.primeConnection();
} else {
doIO
}
//隊列中有發送的消息, 開啟寫
}
doIO() {
if (sockKey.isReadable()) {
sendThread.readResponse(incomingBuffer);
updateLastHeard();
}
if(sockKey.isWritable()) {
Packetp = outgoingQueue.getFirst() //從發送隊列取
updateLastSend
p.requestHeader.setXid(cnxn.getXid());//設定用戶端的xid
序列化
發送
從發送隊列删除
加入到pendingQueue隊列
}
ClientCnxn 是用戶端操作ClientCnxnSocketNIO的工具,維護了發送任務線程SendThread,事件任務線程EventThead, 發送隊列OutgoingQueue以及請求消息的等待隊列PendingQueue。下面以僞代碼來示例其核心代碼
ClientCnxn {
outgoingQueue//待向伺服器端發送的隊列, 用戶端送出請求放入這個隊列
pendingQueue //發送以後等待響應的隊列,
submitRequest(){
//client端一個封裝成一個packet
outgoingQueue.add(packet);
selector.wakeup();
packet.wait(); //如果是同步調用wait,應該回報後會
}
SendThread {
run() {
1.設定clientCnxnSocket 最後發送時間,最後的心跳時間
2. if(!clientCnxnSocket.isConnected()) {
startConnect() //主要工作clientCnxnSocket做
} else {
計算下次ping的時間, 發送心跳
委托給 clientCnxnSocket.doTranspor進行底層的nio傳輸
}
}
primeConnection(){
//建構ConnectRequest
//組合成通訊層的Packet對象,添加到發送隊列,對于ConnectRequest其requestHeader為null
outgoingQueue.addFirst
clientCnxnSocket.enableReadWriteOnly();//確定讀寫事件都監聽
readResponse(){
1.先讀響應頭,先對特殊的xid進行處理
2. packet = pendingQueue.remove() //由于client和server都是單線程處理,多隊列處理,是以認為全局有序
3. 反序列化響應體response, 并設定到packet上
4.finishPacket 1)同步notifyAll,結束 2)異步加入到event線程的隊列
EventThread{ //主要支援異步的回調
大家觀察用戶端操作類Zookeeper裡面的操作類主要分為兩個參數不帶callback的同步方法和參數帶callback的異步方法。
1. 同步調用方法實作類似Future同步轉異步模式實作
1) Client送出請求對象封裝成packet對象放入OutgoingQueue隊列,并調用packet.wait()阻塞目前線程。
2) 每個Client都隻有一個SendThread線程是線性處理OutgoingQueue中的請求消息的,SendThread線程通過ClientCnxnSocketNIO工具順序從OutgoingQueue隊列中取請求消息發送到伺服器端,同時将請求packet加入到PendingQueue中
3) ClientCnxnSocketNIO工具接收處理伺服器端響應
4) 從PendingQueue隊列取出對應的packet,并調packet.notifyAll()喚醒阻塞的線程完成同步調用
2. 異步調用的總體流程跟同步類似關鍵差別在于
1) 向OutgoingQueue隊列送出請求後,不會調用packet.wait()阻塞目前線程,主流程繼續執行
2) 同同步調用
3) 同同步調用
4) 從PendingQueue隊列取出對應的packet,并調packet.callback方法完成回調處理
Zookeeper伺服器端
NIOServerCnxnFactory工廠類,zookeeperserver用來啟動監聽用戶端連接配接,每當有用戶端請求連接配接進來,NIOServerCnxnFactory都會為這個連結建構NIOServerCnxn 執行個體來單獨處理與這個用戶端的互動
NIOServerCnxn封裝了處理讀取用戶端請求資料與及向用戶端響應資料
下面通過僞代碼來執行個體:
NIOServerCnxnFactory {
configure {
綁定端口
作為server監聽
注冊selectkey 的連接配接時間
run { //起到accept的作用
1. OP_ACCEPT, 将NIOServerCnxn(handler) attach到selectkey以便被讀寫事件使用
2. OP_READ 和 OP_WRITE取出handler NIOServerCnxn,并調doIo
NIOServerCnxn {
構造器 {
//設定selectkey對read感興趣
doIo {
if(k.isReadable()) {
1. 讀前四個位元組, 代表請求内容長度,不包括自己的4位元組
2. 讀取到位元組數組中
3. zkServer.processPacket()或者zkServer.processConnectRequest()
}
if(k.isWritable()) {
1.從outgoingBuffers取ByteBuffer
2.發送bytes
熬夜不易,點選請老王喝杯烈酒!!!!!!!