天天看點

netty(十三)源碼分析之Channel

類似于NIO的Channel,Netty提供了自己的Channel和其子類實作,用于異步I/O操作和其他相關的操作。

Unsafe是個内部接口,聚合在Channel中協助進行網絡讀寫相關的操作,因為它的設計初衷就是Channel的内部輔助類,不應該被Netty架構的上層使用者調用,是以被命名為Unsafe。這裡不能僅從字面了解認為它是不安全的操作,而要從這個架構的設計層面體會它的設計初衷和職責。

Channel功能說明

io.netty.channel.Channel是Netty網絡操作抽象類,它聚合了一組功能,包括但不限于網路的讀,寫,用戶端發起連接配接,主動關閉連接配接,鍊路關閉,擷取通信雙方的網絡位址等。它也包含了Netty架構相關的一些功能,包括擷取該Channel的EventLoop,擷取緩沖配置設定器ByteBufAllocator和pipeline等。

下面我們先從Channel的接口分析,講解它的主要API和功能,然後再一起看下它的子類的相關功能實作,最後再對重要子類和接口進行源碼分析。

Channel的工作原理

Channel是Netty抽象出來的網絡I/O讀寫相關的接口,為什麼不适用JDK NIO原生的Channel而要另起爐竈呢,主要原因如下:

(1)JDK的SocketChannel和ServerSocketChannel的主要職責就是網絡I/O操作,由于它們是SPI類接口,由具體的虛拟機廠家來提供,是以通過繼承SPI功能類來擴充其功能的難度很大;直接實作ServerSocketChannel和SocketChannel抽象類,其工作量和重新開發一個新的Channel功能類是差不多的。

(2)Netty的Channel需要能夠跟Netty的整體架構融合在一起,例如I/O模型,基于ChannelPipeline的定制模型,以及基于中繼資料描述配置化的TCP參數等,這些JDK的SocketChannel和ServerSocketChanel都沒有提供,需要重新封裝。

(3)自定義的Channel,功能實作更加靈活。

基于上述4個原因,Netty重新設計了Channel接口,并且給予了很多不同的實作。它的設計原理比較簡單,但是功能卻比較複雜,主要的設計理念如下。

(1)在Channel接口層,采用Facade模式進行統一封裝,将網絡I/O操作,網絡I/O相關的其他操作封裝起來,統一對外提供。

(2)Channel接口的定義盡量大而全,為SocketChannel和ServerSocketChannel提供統一的視圖,由不同子類實作不同的功能,公共功能在抽象父類中實作,最大程度地實作功能和接口的重用。

(3)具體實作采用聚合而非包含的方式,将相關的功能類聚合在Channel中,由Channel統一配置設定和排程,功能實作更加靈活。

Channel的功能實作

1.網絡I/O操作和其他相關的操作。

Channel網絡I/O相關的方法定義如下:

netty(十三)源碼分析之Channel

下面對這些API的功能進行分類說明:

(1)Channel read():從目前的Channel中讀取資料到第一個inbound緩沖區中,如果資料被成功讀取,觸發ChannelHandler.channelRead(ChannelHandlerContext,Object)事件。讀取操作API調用完成後,緊接着會觸發ChannelHander.channelReadConplete(ChannelHandlerContext)事件,這樣業務的ChannelHandler可以決定是否需要繼續讀取資料。如果已經有操作請求被挂起,則後續的讀操作會被忽略。

(2)ChannelFuture write(Object msg):請求将目前的msg通過ChannelPipeline寫入到目标Channel中。注意,write操作隻是将消息存入到消息發送環形數組中,并沒有真正被發送,隻有調用flush操作才會被寫入到Channel中,發送給對方。

(3)ChannelFuture close(ChannelPromise promise):主動關閉目前連接配接,通過ChannelPromise設定操作結果并進行結果通知,無論操作是否成功,都可以通過ChannelPromise擷取操作結果。該操作會級聯觸發ChannelPipeline中所有ChannelHandler的ChannelHandler.close(ChannelHandlerContext,ChannelPromise)事件。

(4)ChannelFuture disconnect(ChannelPromise promise):請求斷開與遠端通信對端的連接配接并使用ChannelPromise來擷取操作結果的通知資訊。該方法會級聯觸發ChannelPipeline中所有ChannelHandler的ChannelHandler.close(ChannelHandlerContext,ChannelPromise)事件。

(5)ChnnelConfig config():擷取目前Channel的配置資訊,例如CONNECT_TIMEOUT_MILLS。

(6)boolean isOpen():判斷目前Channel是否已經打開。

(7)ChannelMetadata metadata():擷取目前Channel的中繼資料描述資訊,包括TCP參數配置等。

(8)SocketAddress localAddress():擷取目前Channel的本地綁定位址。

(9)SocketAddress remoteAddress():擷取目前Channel通信的遠端Socket位址。

2.其他常用的API功能說明

第一個比較重要的方法是eventLoop().Channel需要注冊到EventLoop的多路複用器上,用于處理I/O事件,通過eventLoop()方法可以擷取到Channel注冊的EventLoop。EventLoop本質上就是處理網絡讀寫事件的Reactor線程。在Netty中,它不僅僅用來處理網絡事件,也可以用來執行定時任務和使用者自定義NioTask等任務。

第二個比較常用的方法是metadata()方法。熟悉TCP協定的讀者可能知道,當建立Socket的時候需要指定TCP參數,例如接收和發送的TCP緩沖區大小,TCP的逾時時間。是否重用位址等。在Netty中,每個Channel對應一個實體連結,每個連接配接都有自己的TCP參數配置。是以,Channel會聚合一個ChannelMetadata用來對TCP參數提供中繼資料描述資訊,通過metadata()方法就可以擷取目前Channel的TCP參數配置。

第三個方法是parent()。對于服務端Channel而言,它的父Channel為空;對于用戶端Channel,它的父Channel就是建立它的ServerSocketChannel。

第四個方法是使用者擷取Channel辨別的id(),它傳回ChannelId對象,ChannelId是Channel的唯一辨別。

Channel源碼分析

Channel的實作子類非常多,繼承關系複雜,從學習的角度我們抽取最重要的兩個————io.netty.channel.socket.nio.NioServerSocketChannel和io.netty.channel.socket.nio.NioSocketChannel進行重點分析。

netty(十三)源碼分析之Channel
netty(十三)源碼分析之Channel

AbstractChannel源碼分析

1.成員變量定義

在分析AbstractChannel源碼之前,我們看下它的成員變量定義,首先定了兩個靜态全局異常,如下:

static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();鍊路已經關閉異常

static final NotYetConnectedException NOT_YET_CONNECTED_EXCEPTION = new NotYetConnectedException();鍊路尚未建立異常

聲明完上述兩個異常之後,通過靜态塊将它們的堆棧設定為空的StackTraceElement。

estimatorHandle用于預測下一個封包的大小,它基于之前的資料的采樣進行分析預測。

根據之前的Channel原理分析,我們知道AbstractChannel采用聚合的方式封裝各種功能。

AbstractChannel聚合了所有Channel使用到的能力對象,由AbstractChannel提供初始化和統一封裝,如果功能和子類強相關,則定義成抽象方法由子類具體實作,下面對它的主要API進行源碼分析。

2.核心API源碼分析

首先看下網絡讀寫操作,前面介紹網絡I/O操作時講到,它會觸發ChannelPipeline中對應的事件方法。Netty基于事件驅動,我們也可以了解為當Channel進行I/O操作時會産生對應的I/O事件,然後驅動事件在ChannnelPipeline中傳播,由對應的ChannelHandler對事件進行攔截和處理,不關心的事件可以直接忽略。采用事件驅動的方式可以非常輕松地通過事件定義來劃分事件攔截切面,友善業務的定制和功能擴充,相比AOP,其性能更高,但是功能卻基本等價。但使用場景,遠遠還是及不上AOP。

網絡I/O操作直接調用DefaultChannelPipelien的相關方法,由DefaultChannnelPipeline中對應的ChannnelHandler進行具體的邏輯處理,如下所示:

@Override
    public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
        return pipeline.connect(remoteAddress, localAddress);
    }

    @Override
    public ChannelFuture disconnect() {
        return pipeline.disconnect();
    }

    @Override
    public ChannelFuture close() {
        return pipeline.close();
    }

    @Override
    public Channel flush() {
        pipeline.flush();
        return this;
    }
           

AbstractChannel也提供了一些公共API的具體實作,例如localAddress()和remoteAddress()方法,它的源碼實作如下所示:

@Override
    public SocketAddress remoteAddress() {
        SocketAddress remoteAddress = this.remoteAddress;
        if (remoteAddress == null) {
            try {
                this.remoteAddress = remoteAddress = unsafe().remoteAddress();
            } catch (Throwable t) {
                // Sometimes fails on a closed socket in Windows.
                return null;
            }
        }
        return remoteAddress;
    }
           

首先從緩存的成員變量中擷取,如果第一次調用為空,需要通過unsafe的remoteAddress擷取,它是個抽象方法,具體由對應的Channel子類實作。

AbstractNioChannel源碼分析

首先,還是從成員變量定義入手,來了解下它的功能實作,成員變量定義如下:

private static final InternalLogger logger =
            InternalLoggerFactory.getInstance(AbstractNioChannel.class);

    private final SelectableChannel ch;
    protected final int readInterestOp;
    private volatile SelectionKey selectionKey;
    private volatile boolean inputShutdown;

    /**
     * The future of the current connection attempt.  If not null, subsequent
     * connection attempts will fail.
     */
    private ChannelPromise connectPromise;
    private ScheduledFuture<?> connectTimeoutFuture;
    private SocketAddress requestedRemoteAddress;
           

由于NIO Channel、NioSocketChannel和NioServerSocketChannel需要共用,是以定義了一個java.nio.SocketChannel和java.nio.ServerSocketChannel的公共父類SelectableChannel,用于設定SelectableChannel參數和進行I/O操作。

第二個參數是readInterestOp,它代表了JDK SelectionKey的OP_READ.

随後定義了一個volatile修飾的SelectionKey,該SelectionKey是注冊到EventLoop後傳回的選擇鍵。由于Channel會面臨多個業務線程的并發寫操作,當SelectionKey由SelectionKey修改後,為了能讓其他業務線程感覺到變化,是以需要使用volatile保證修飾的可見性。

最後定義了代表連接配接操作結果的ChannelPromise以及連接配接逾時定時器ScheduledFuture和請求的通信位址資訊。

2.核心API源碼分析

我們看下AbstractNioChannel實作的主要API,首先是Channel的注冊,如下:

protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }
           

定義一個局部變量selected來辨別注冊操作是否成功,調用SelectableChannel的register()方法,将目前的Channel注冊到EventLoop的多路複用器上。

注冊Channel的時候需要指定監聽的網絡操作位來表示Channel對哪幾類網絡事件感興趣,具體的定義如下:

  • public static final int OP_READ = 1 <<9:讀操作位
  • public static final int OP_WRITE=1 << 2 寫操作位
  • public static final int OP_CONNECT = 1<< 3讀用戶端連接配接服務端操作位
  • public static final int OP_ACCEPT = 1 << 4 服務端接收用戶端連接配接操作位。

AbstractNioChannel注冊的是0,說明對任何事件都不感興趣,僅僅完成注冊操作。注冊的時候可以指定附件,後續Channel接收到網絡事件通知時可以從SelectionKey中重新擷取之前的附件進行處理,此處将AbstractNioChannel的實作子類自身當作附件注冊。如果注冊Channel成功,則傳回selectionKey,通過selectionKey可以從多路複用器中擷取Channel對象。

下面繼續看另一個比較重要的方法:準備處理讀操作之前需要設定網絡操作位為讀,如下所示:

protected void doBeginRead() throws Exception {
        if (inputShutdown) {
            return;
        }

        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
           

繼續閱讀