天天看點

Unsafe

Unsafe接口實際上是Channel接口的輔助接口,它不應該被使用者代碼直接調用。實際的I/O讀寫操作都是由Unsafe接口負責完成的。

Unsafe

1.register方法

register方法主要用于将目前Unsafe對應的Channel注冊到EventLoop的多路複用器上,然後調用DefaultChannelPipeline的fireChannelRegistered方法。如果Channel被激活,則調用DefaultChannelPipeline的fireChannelActive方法。

首先判斷目前所在的線程是否是Channel對應的NioEventLoop線程,如果是同一個線程則不存在多線程并發操作問題,直接調用register0進行注冊;如果是由使用者線程或者其他線程發起的注冊操作,則将注冊操作封裝成Runnable,放到NioEventLoop任務隊列中執行。注意:如果直接執行register0方法,會存在多線程并發操作Channel的問題。

看register0方法的實作,首先調用ensureOpen方法判斷目前Channel是否打開,如果沒有打開則無法注冊,直接傳回。校驗通過後調用doRegister方法,它由AbstractNioUnsafe對應的AbstractNioChannel實作,該方法在前面的AbstractNioChannel源碼分析中已經介紹過。如果doRegister方法沒有抛出異常,則說明Channel注冊成功。将ChannelPromise的結果設定為成功,調用ChannelPipeline的fireChannelRegistered方法,判斷目前的Channel是否已經被激活,如果已經被激活,則調用ChannelPipeline的fireChannelActive方法。

如果注冊過程中發生了異常,則強制關閉連接配接,将異常堆棧資訊設定到ChannelPromise中。

2.bind方法

bind方法主要用于綁定指定的端口,對于服務端,用于綁定監聽端口,可以設定backlog參數;對于用戶端,主要用于指定用戶端Channel的本地綁定Socket位址。

調用doBind方法,對于NioSocketChannel和NioServerSocketChannel有不同的實作,

如果綁定本地端口發生異常,則将異常設定到ChannelPromise中用于通知ChannelFuture,随後調用closeIfClosed方法來關閉Channel。

3.disconnect方法

disconnect用于用戶端或者服務端主動關閉連接配接,

4.close方法

在鍊路關閉之前需要首先判斷是否處于重新整理狀态,如果處于重新整理狀态說明還有消息尚未發送出去,需要等到所有消息發送完成再關閉鍊路,是以,将關閉操作封裝成Runnable稍後再執行。

如果鍊路沒有處于重新整理狀态,需要從closeFuture中判斷關閉操作是否完成,如果已經完成,不需要重複關閉鍊路,設定ChannelPromise的操作結果為成功并傳回。

執行關閉操作,将消息發送緩沖數組設定為空,通知JVM進行記憶體回收。調用抽象方法doClose關閉鍊路。

如果關閉操作成功,設定ChannelPromise結果為成功。如果操作失敗,則設定異常對象到ChannelPromise中。

調用ChannelOutboundBuffer的close方法釋放緩沖區的消息,随後構造鍊路關閉通知Runnable放到NioEventLoop中執行。

最後,調用deregister方法,将Channel從多路複用器上取消注冊。

NioEventLoop的cancel方法實際将selectionKey對應的Channel從多路複用器上去注冊。

5.write方法

write方法實際上将消息添加到環形發送數組中,并不是真正的寫Channel,

如果Channel沒有處于激活狀态,說明TCP鍊路還沒有真正建立成功,目前Channel存在以下兩種狀态。

(1)Channel打開,但是TCP鍊路尚未建立成功:NOT_YET_CONNECTED_EXCEPTION;

(2)Channel已經關閉:CLOSED_CHANNEL_EXCEPTION。

對鍊路狀态進行判斷,給ChannelPromise設定對應的異常,然後調用ReferenceCountUtil的release方法釋放發送的msg對象。

如果鍊路狀态正常,則将需要發送的msg和promise放入發送緩沖區中(環形數組)。

6.flush方法

flush方法負責将發送緩沖區中待發送的消息全部寫入到Channel中,并發送給通信對方。

首先将發送環形數組的unflushed指針修改為tail,辨別本次要發送消息的緩沖區範圍。然後調用flush0進行發送。

重點分析 doWrite方法,首先計算需要發送的消息個數(unflushed - flush),如果隻有 1 個消息需要發送,則調用父類的寫操作,我們分析AbstractNioByteChannel的doWrite()方法,因為隻有一條消息需要發送,是以直接從ChannelOutboundBuffer中擷取目前需要發送的消息,首先,擷取需要發送的消息,如果消息為ByteBuf且它配置設定的是JDK的非堆記憶體,則直接傳回。對傳回的消息進行判斷,如果為空,說明該消息已經發送完成并被回收,然後執行清空OP_WRITE操作位的clearOpWrite方法,如果需要發送的ByteBuf已經沒有可寫的位元組了,則說明已經發送完成,将該消息從環形隊列中删除,然後繼續循環,

分析下ChannelOutboundBuffer的remove方法,首先判斷環形隊列中是否還有需要發送的消息,如果沒有,則直接傳回。如果非空,則首先擷取Entry,然後對其進行資源釋放,同時對需要發送的索引flushed進行更新。所有操作執行完之後,調用decrementPendingOutboundBytes減去已經發送的位元組數,該方法跟incrementPendingOutboundBytes類似,會進行發送低水位的判斷和事件通知。

繼續對消息的發送進行分析,首先将半包辨別設定為false,從DefaultSocketChannelConfig中擷取循環發送的次數,進行循環發送,對發送方法doWriteBytes展開分析,ByteBuf的readBytes()方法的功能是将目前ByteBuf中的可寫位元組數組寫入到指定的Channel中。方法的第一個參數是Channel,此處就是SocketChannel,第二個參數是寫入的位元組數組長度,它等于ByteBuf的可讀位元組數,傳回值是寫入的位元組個數。由于我們将SocketChannel設定為異步非阻塞模式,是以寫操作不會阻塞。

從寫操作中傳回,需要對寫入的位元組數進行判斷,如果為0,說明TCP發送緩沖區已滿,不能繼續再向裡面寫入消息,是以,将寫半包辨別設定為true,然後退出循環,執行後續排隊的其他任務或者讀操作,等待下一次selector的輪詢繼續觸發寫操作。

對寫入的位元組數進行累加,判斷目前的ByteBuf中是否還有沒有發送的位元組,如果沒有可發送的位元組,則将done設定為true,退出循環。

從循環發送狀态退出後,首先根據實際發送的位元組數更新發送進度,實際就是發送的位元組數和需要發送的位元組數的一個比值。執行完進度更新後,判斷本輪循環是否将需要發送的消息全部發送完成,如果發送完成則将該消息從循環隊列中删除;否則,設定多路複用器的OP_WRITE操作位,用于通知Reactor線程還有半包消息需要繼續發送。

AbstractNioUnsafe是AbstractUnsafe類的NIO實作,它主要實作了connect、finishConnect等方法。

1.connect方法

首先擷取目前的連接配接狀态進行緩存,然後發起連接配接操作,需要指出的是,SocketChannel執行connect()操作有三種可能的結果。

(1)連接配接成功,傳回true;

(2)暫時沒有連接配接上,服務端沒有傳回ACK應答,連接配接結果不确定,傳回false;

(3)連接配接失敗,直接抛出I/O異常。

如果是第(2)種結果,需要将NioSocketChannel中的selectionKey設定為OP_ CONNECT,監聽連接配接應答消息。

異步連接配接傳回之後,需要判斷連接配接結果,如果連接配接成功,則觸發ChannelActive事件,它最終會将NioSocketChannel中的 selectionKey設定為SelectionKey.OP_READ,用于監聽網絡讀操作位。如果沒有立即連接配接上服務端,則執行其他分支的操作,有兩個目的。

(1)根據連接配接逾時時間設定定時任務,逾時時間到之後觸發校驗,如果發現連接配接并沒有完成,則關閉連接配接句柄,釋放資源,設定異常堆棧并發起去注冊。

(2)設定連接配接結果監聽器,如果接收到連接配接完成通知則判斷連接配接是否被取消,如果被取消則關閉連接配接句柄,釋放資源,發起取消注冊操作。 

2.finishConnect方法

用戶端接收到服務端的TCP握手應答消息,通過SocketChannel的finishConnect方法對連接配接結果進行判斷,首先緩存連接配接狀态,目前傳回false,然後執行doFinishConnect方法判斷連接配接結果,通過 SocketChannel的finishConnect 方法判斷連接配接結果,執行該方法傳回三種可能結果。

(1)連接配接成功傳回true;

(2)連接配接失敗傳回 false;

(3)發生鍊路被關閉、鍊路中斷等異常,連接配接失敗。

隻要連接配接失敗,就抛出 Error(),由調用方執行句柄關閉等資源釋放操作,如果傳回成功,則執行fulfillConnectPromise 方法,它負責将SocketChannel修改為監聽讀操作位,用來監聽網絡的讀事件,最後對連接配接逾時進行判斷:如果連接配接逾時時仍然沒有接收到服務端的 ACK 應答消息,則由定時任務關閉用戶端連接配接,将SocketChannel從Reactor線程的多路複用器上摘除,釋放資源。

重點分析它的read方法,首先,擷取NioSocketChannel的SocketChannelConfig,它主要用于設定用戶端連接配接的TCP參數,繼續看allocHandle的初始化。如果是首次調用,從SocketChannelConfig的RecvByteBufAllocator中建立Handle。

RecvByteBufAllocator預設有兩種實作,分别是AdaptiveRecvByteBufAllocator和FixedRecvByteBufAllocator。由于FixedRecvByteBufAllocator 的實作比較簡單,重點分析AdaptiveRecvByteBufAllocator的實作。 

AdaptiveRecvByteBufAllocator指的是緩沖區大小可以動态調整的ByteBuf配置設定器。它的成員變量分别定義了三個系統預設值:最小緩沖區長度64位元組、初始容量1024位元組、最大容量65536位元組。還定義了兩個動态調整容量時的步進參數:擴張的步進索引為4、收縮的步進索引為1。最後,定義了長度的向量表SIZE_TABLE并初始化它,向量數組的每個值都對應一個Buffer容量,當容量小于512的時候,由于緩沖區已經比較小,需要降低步進值,容量每次下調的幅度要小些;當大于512時,說明需要解碼的消息碼流比較大,這時采用調大步進幅度的方式減少動态擴張的頻率,是以它采用512的倍數進行擴張。

重點分析下AdaptiveRecvByteBufAllocator的方法。

方法1:getSizeTableIndex(final int size)

根據容量Size查找容量向量表對應的索引——這是個典型的二分查找法。

下面我們分析下它的内部靜态類HandleImpl,首先,還是看下它的成員變量,

它有5個成員變量,分别是:對應向量表的最小索引、最大索引、目前索引、下一次預配置設定的Buffer大小和是否立即執行容量收縮操作。

重點分析它的record(int actualReadBytes)方法:當NioSocketChannel執行完讀操作後,會計算獲得本次輪詢讀取的總位元組數,它就是參數actualReadBytes,執行record方法,根據實際讀取的位元組數對ByteBuf進行動态伸縮和擴張。

首先,對目前索引做步進縮減,然後擷取收縮後索引對應的容量,與實際讀取的位元組數進行比對,如果發現小于收縮後的容量,則重新對目前索引進行指派,取收縮後的索引和最小索引中的較大者作為最新的索引。然後,為下一次緩沖區容量配置設定指派——新的索引對應容量向量表中的容量。相反,如果目前實際讀取的位元組數大于之前預配置設定的初始容量,則說明實際配置設定的容量不足,需要動态擴張。重新計算索引,選取目前索引+擴張步進和最大索引中的較小作為目前索引值,然後對下次緩沖區的容量值進行重新配置設定,完成緩沖區容量的動态擴張。

AdaptiveRecvByteBufAllocator就是根據本次讀取的實際位元組數對下次接收緩沖區的容量進行動态調整。使用動态緩沖區配置設定器的優點如下。

(1)Netty作為一個通用的NIO架構,并不對使用者的應用場景進行假設,可以使用它做流媒體傳輸,也可以用它做聊天工具。不同的應用場景,傳輸的碼流大小千差萬别,無論初始化配置設定的是32K還是1M,都會随着應用場景的變化而變得不适應。是以,Netty根據上次實際讀取的碼流大小對下次的接收Buffer緩沖區進行預測和調整,能夠最大限度的滿足不同行業的應用場景。

(2)性能更高,容量過大會導緻記憶體占用開銷增加,後續的Buffer處理性能會下降;容量過小時需要頻繁地記憶體擴張來接收大的請求消息,同樣會導緻性能下降。

(3)更節約記憶體。假如通常情況下請求消息平均值為1M左右,接收緩沖區大小為1.2M;突然某個客戶發送了一個10M的流媒體附件,接收緩沖區擴張為10M以接納該附件,如果緩沖區不能收縮,每次緩沖區建立都會配置設定10M的記憶體,但是後續所有的消息都是1M左右,這樣會導緻記憶體的浪費,如果并發用戶端過多,可能會發生記憶體溢出,最終當機。