天天看點

帶你讀《Netty、Redis、ZooKeeper高并發實戰》之三:Java NIO通信基礎詳解第3章

點選檢視第一章 點選檢視第二章

第3章

Java NIO通信基礎詳解

高性能的Java通信,絕對離不開Java NIO技術,現在主流的技術架構或中間件伺服器,都使用了Java NIO技術,譬如Tomcat、Jetty、Netty。學習和掌握NIO技術,已經不是一項加分技能,而是一項必備技能。不管是面試,還是實際開發,作為Java的“攻城獅”(工程師的諧音),都必須掌握NIO的原理和開發實踐技能。

3.1 Java NIO簡介

在1.4版本之前,Java IO類庫是阻塞IO;從1.4版本開始,引進了新的異步IO庫,被稱為Java New IO類庫,簡稱為JAVA NIO。New IO類庫的目标,就是要讓Java支援非阻塞IO,基于這個原因,更多的人喜歡稱Java NIO為非阻塞IO(Non-Block IO),稱“老的”阻塞式Java IO為OIO(Old IO)。總體上說,NIO彌補了原來面向流的OIO同步阻塞的不足,它為标準Java代碼提供了高速的、面向緩沖區的IO。

Java NIO由以下三個核心元件組成:

  • Channel(通道)
  • Buffer(緩沖區)
  • Selector(選擇器)

如果了解了第1章的四種IO模型,大家一眼就能識别出來,Java NIO,屬于第三種模型—— IO 多路複用模型。當然,Java NIO元件,提供了統一的API,為大家屏蔽了底層的不同作業系統的差異。

後面的章節,我們會對以上的三個Java NIO的核心元件,展開詳細介紹。先來看看Java的NIO和OIO的簡單對比。

3.1.1 NIO和OIO的對比

在Java中,NIO和OIO的差別,主要展現在三個方面:

(1)OIO是面向流(Stream Oriented)的,NIO是面向緩沖區(Buffer Oriented)的。

何謂面向流,何謂面向緩沖區呢?

OIO是面向位元組流或字元流的,在一般的OIO操作中,我們以流式的方式順序地從一個流(Stream)中讀取一個或多個位元組,是以,我們不能随意地改變讀取指針的位置。而在NIO操作中則不同,NIO中引入了Channel(通道)和Buffer(緩沖區)的概念。讀取和寫入,隻需要從通道中讀取資料到緩沖區中,或将資料從緩沖區中寫入到通道中。NIO不像OIO那樣是順序操作,可以随意地讀取Buffer中任意位置的資料。

(2)OIO的操作是阻塞的,而NIO的操作是非阻塞的。

NIO如何做到非阻塞的呢?大家都知道,OIO操作都是阻塞的,例如,我們調用一個read方法讀取一個檔案的内容,那麼調用read的線程會被阻塞住,直到read操作完成。

而在NIO的非阻塞模式中,當我們調用read方法時,如果此時有資料,則read讀取資料并傳回;如果此時沒有資料,則read直接傳回,而不會阻塞目前線程。NIO的非阻塞,是如何做到的呢?其實在上一章,答案已經揭曉了,NIO使用了通道和通道的多路複用技術。

(3)OIO沒有選擇器(Selector)概念,而NIO有選擇器的概念。

NIO的實作,是基于底層的選擇器的系統調用。NIO的選擇器,需要底層作業系統提供支援。而OIO不需要用到選擇器。

3.1.2 通道(Channel)

在OIO中,同一個網絡連接配接會關聯到兩個流:一個輸入流(Input Stream),另一個輸出流(Output Stream)。通過這兩個流,不斷地進行輸入和輸出的操作。

在NIO中,同一個網絡連接配接使用一個通道表示,所有的NIO的IO操作都是從通道開始的。一個通道類似于OIO中的兩個流的結合體,既可以從通道讀取,也可以向通道寫入。

3.1.3 Selector 選擇器

首先,回顧一個基礎的問題,什麼是IO多路複用?指的是一個程序/線程可以同時監視多個檔案描述符(一個網絡連接配接,作業系統底層使用一個檔案描述符來表示),一旦其中的一個或者多個檔案描述符可讀或者可寫,系統核心就通知該程序/線程。在Java應用層面,如何實作對多個檔案描述符的監視呢?需要用到一個非常重要的Java NIO元件——Selector 選擇器。

選擇器的神奇功能是什麼呢?它一個IO事件的查詢器。通過選擇器,一個線程可以查詢多個通道的IO事件的就緒狀态。

實作IO多路複用,從具體的開發層面來說,首先把通道注冊到選擇器中,然後通過選擇器内部的機制,可以查詢(select)這些注冊的通道是否有已經就緒的IO事件(例如可讀、可寫、網絡連接配接完成等)。

一個選擇器隻需要一個線程進行監控,換句話說,我們可以很簡單地使用一個線程,通過選擇器去管理多個通道。這是非常高效的,這種高效來自于Java的選擇器元件Selector,以及其背後的作業系統底層的IO多路複用的支援。

與OIO相比,使用選擇器的最大優勢:系統開銷小,系統不必為每一個網絡連接配接(檔案描述符)建立程序/線程,進而大大減小了系統的開銷。

3.1.4 緩沖區(Buffer)

應用程式與通道(Channel)主要的互動操作,就是進行資料的read讀取和write寫入。為了完成如此大任,NIO為大家準備了第三個重要的元件——NIO Buffer(NIO緩沖區)。通道的讀取,就是将資料從通道讀取到緩沖區中;通道的寫入,就是将資料從緩沖區中寫入到通道中。

緩沖區的使用,是面向流的OIO所沒有的,也是NIO非阻塞的重要前提和基礎之一。

下面從緩沖區開始,詳細介紹NIO的Buffer(緩沖區)、Channel(通道)、Selector(選擇器)三大核心元件。

3.2 詳解NIO Buffer類及其屬性

NIO的Buffer(緩沖區)本質上是一個記憶體塊,既可以寫入資料,也可以從中讀取資料。NIO的Buffer類,是一個抽象類,位于java.nio包中,其内部是一個記憶體塊(數組)。

NIO的Buffer與普通的記憶體塊(Java數組)不同的是:NIO Buffer對象,提供了一組更加有效的方法,用來進行寫入和讀取的交替通路。

需要強調的是:Buffer類是一個非線程安全類。

3.2.1 Buffer類

Buffer類是一個抽象類,對應于Java的主要資料類型,在NIO中有8種緩沖區類,分别如下:ByteBuffer、CharBuffer、DoubleBuffer、FloatBuffer、IntBuffer、LongBuffer、ShortBuffer、MappedByteBuffer。

前7種Buffer類型,覆寫了能在IO中傳輸的所有的Java基本資料類型。第8種類型MappedByteBuffer是專門用于記憶體映射的一種ByteBuffer類型。

實際上,使用最多的還是ByteBuffer二進制位元組緩沖區類型,後面會看到。

3.2.2 Buffer類的重要屬性

Buffer類在其内部,有一個byte[]數組記憶體塊,作為記憶體緩沖區。為了記錄讀寫的狀态和位置,Buffer類提供了一些重要的屬性。其中,有三個重要的成員屬性:capacity(容量)、position(讀寫位置)、limit(讀寫的限制)。

除此之外,還有一個标記屬性:mark(标記),可以将目前的position臨時存入mark中;需要的時候,可以再從mark标記恢複到position位置。

1. capacity屬性

Buffer類的capacity屬性,表示内部容量的大小。一旦寫入的對象數量超過了capacity容量,緩沖區就滿了,不能再寫入了。

Buffer類的capacity屬性一旦初始化,就不能再改變。原因是什麼呢?Buffer類的對象在初始化時,會按照capacity配置設定内部的記憶體。在記憶體配置設定好之後,它的大小當然就不能改變了。

再強調一下,capacity容量不是指記憶體塊byte[]數組的位元組的數量。capacity容量指的是寫入的資料對象的數量。

前面講到,Buffer類是一個抽象類,Java不能直接用來建立對象。使用的時候,必須使用Buffer的某個子類,例如使用DoubleBuffer,則寫入的資料是double類型,如果其capacity是100,那麼我們最多可以寫入100個double資料。

2. position屬性

Buffer類的position屬性,表示目前的位置。position屬性與緩沖區的讀寫模式有關。在不同的模式下,position屬性的值是不同的。當緩沖區進行讀寫的模式改變時,position會進行調整。

在寫入模式下,position的值變化規則如下:(1)在剛進入到寫模式時,position值為0,表示目前的寫入位置為從頭開始。(2)每當一個資料寫到緩沖區之後,position會向後移動到下一個可寫的位置。(3)初始的position值為0,最大可寫值position為limit– 1。當position值達到limit時,緩沖區就已經無空間可寫了。

在讀模式下,position的值變化規則如下:(1)當緩沖區剛開始進入到讀模式時,position會被重置為0。(2)當從緩沖區讀取時,也是從position位置開始讀。讀取資料後,position向前移動到下一個可讀的位置。(3)position最大的值為最大可讀上限limit,當position達到limit時,表明緩沖區已經無資料可讀。

起點在哪裡呢?當建立一個緩沖區時,緩沖區處于寫入模式,這時是可以寫資料的。資料寫入後,如果要從緩沖區讀取資料,這就要進行模式的切換,可以使用(即調用)flip翻轉方法,将緩沖區變成讀取模式。

在這個flip翻轉過程中,position會進行非常巨大的調整,具體的規則是:position由原來的寫入位置,變成新的可讀位置,也就是0,表示可以從頭開始讀。flip翻轉的另外一半工作,就是要調整limit屬性。

3. limit屬性

Buffer類的limit屬性,表示讀寫的最大上限。limit屬性,也與緩沖區的讀寫模式有關。在不同的模式下,limit的值的含義是不同的。

在寫模式下,limit屬性值的含義為可以寫入的資料最大上限。在剛進入到寫模式時,limit的值會被設定成緩沖區的capacity容量值,表示可以一直将緩沖區的容量寫滿。

在讀模式下,limit的值含義為最多能從緩沖區中讀取到多少資料。

一般來說,是先寫入再讀取。當緩沖區寫入完成後,就可以開始從Buffer讀取資料,可以使用flip翻轉方法,這時,limit的值也會進行非常大的調整。

具體如何調整呢?将寫模式下的position值,設定成讀模式下的limit值,也就是說,将之前寫入的最大數量,作為可以讀取的上限值。

在flip翻轉時,屬性的調整,将涉及position、limit兩個屬性,這種調整比較微妙,不是太好了解,舉一個簡單例子:

首先,建立緩沖區。剛開始,緩沖區處于寫模式。position為0,limit為最大容量。

然後,向緩沖區寫資料。每寫入一個資料,position向後面移動一個位置,也就是position的值加1。假定寫入了5個數,當寫入完成後,position的值為5。

這時,使用(即調用)flip方法,将緩沖區切換到讀模式。limit的值,先會被設定成寫模式時的position值。這裡新的limit是5,表示可以讀取的最大上限是5個數。同時,新的position會被重置為0,表示可以從0開始讀。

3.2.3 4個屬性的小結

除了前面的3個屬性,第4個屬性mark(标記)比較簡單。就是相當一個暫存屬性,暫時儲存position的值,友善後面的重複使用position值。

下面用一個表格總結一下 Buffer類的4個重要屬性,參見表3-1。

表3-1 Buffer四個重要屬性的取值說明

帶你讀《Netty、Redis、ZooKeeper高并發實戰》之三:Java NIO通信基礎詳解第3章

3.3 詳解NIO Buffer類的重要方法

本小節将詳細介紹Buffer類使用中常用的幾個方法,包含Buffer執行個體的擷取、對Buffer執行個體的寫入、讀取、重複讀、标記和重置等。

3.3.1 allocate()建立緩沖區

在使用Buffer(緩沖區)之前,我們首先需要擷取Buffer子類的執行個體對象,并且配置設定記憶體空間。

為了擷取一個Buffer執行個體對象,這裡并不是使用子類的構造器new來建立一個執行個體對象,而是調用子類的allocate()方法。

下面的程式片段就是用來擷取一個整型Buffer類的緩沖區執行個體對象,代碼如下:

package com.crazymakercircle.bufferDemo;
//...
public class UseBuffer
{
static IntBufferintBuffer = null;
    public static void allocatTest()
    {
               //調用allocate方法,而不是使用new 
            intBuffer = IntBuffer.allocate(20);
            //輸出buffer的主要屬性值    
Logger.info("------------after allocate------------------");
            Logger.info("position=" + intBuffer.position());
            Logger.info("limit=" + intBuffer.limit());
            Logger.info("capacity=" + intBuffer.capacity());
    }
//...
}           

例子中,IntBuffer是具體的Buffer子類,通過調用IntBuffer.allocate(20),建立了一個Intbuffer執行個體對象,并且配置設定了20 * 4個位元組的記憶體空間。

通過程式的輸出結果,我們可以檢視一個建立緩沖區執行個體對象的主要屬性值,如下所示:

allocatTest |> ------------after allocate------------------

allocatTest |> position=0

allocatTest |> limit=20

allocatTest |> capacity=20

從上面的運作結果,可以看出:

一個緩沖區在建立後,處于寫入的模式,position寫入位置為0,最大可寫上限limit為的初始化值(這裡是20),而緩沖區的容量capacity也是初始化值。

3.3.2 put()寫入到緩沖區

在調用allocate方法配置設定記憶體、傳回了執行個體對象後,緩沖區執行個體對象處于寫模式,可以寫入對象。要寫入緩沖區,需要調用put方法。put方法很簡單,隻有一個參數,即為所需要寫入的對象。不過,寫入的資料類型要求與緩沖區的類型保持一緻。

接着前面的例子,向剛剛建立的intBuffer緩存執行個體對象中,寫入的5個整數,代碼如下:

package com.crazymakercircle.bufferDemo;
//...
public class UseBuffer
{
    static IntBufferintBuffer = null;
     //省略了建立緩沖區的代碼,具體看源代碼工程
    public static void putTest()
    {
            for (int i = 0; i< 5; i++)
            {
                    //寫入一個整數到緩沖區
                    intBuffer.put(i);
            }
            //輸出緩沖區的主要屬性值            
            Logger.info("------------after put------------------");
            Logger.info("position=" + intBuffer.position());
            Logger.info("limit=" + intBuffer.limit());
            Logger.info("capacity=" + intBuffer.capacity());
    }
    //...
}           

寫入5個元素後,同樣輸出緩沖區的主要屬性值,輸出的結果如下:

putTest |>  ------------after putTest------------------            

putTest |> position=5

putTest |> limit=20

putTest |> capacity=20

從結果可以看到,position變成了5,指向了第6個可以寫入的元素位置。而limit最大寫入元素的上限、capacity最大容量的值,并沒有發生變化。

3.3.3 flip()翻轉

向緩沖區寫入資料之後,是否可以直接從緩沖區中讀取資料呢?呵呵,不能。

這時緩沖區還處于寫模式,如果需要讀取資料,還需要将緩沖區轉換成讀模式。flip()翻轉方法是Buffer類提供的一個模式轉變的重要方法,它的作用就是将寫入模式翻轉成讀取模式。

接着前面的例子,示範一下flip()方法的使用:

package com.crazymakercircle.bufferDemo;
//...
public class UseBuffer
{
    static IntBufferintBuffer = null;
    //省略了緩沖區的建立、寫入的代碼,具體看源代碼工程
    public static void flipTest()
    {
            //翻轉緩沖區,從寫模式翻轉成讀模式            
            intBuffer.flip();
            //輸出緩沖區的主要屬性值            
            Logger.info("------------after flip ------------------");
            Logger.info("position=" + intBuffer.position());
            Logger.info("limit=" + intBuffer.limit());
            Logger.info("capacity=" + intBuffer.capacity());
    }
    //...
}           

在調用flip進行模式翻轉之後,緩沖區的屬性有了奇妙的變化,輸出如下:

flipTest |> ------------after flipTest ------------------

flipTest |> position=0

flipTest |> limit=5

flipTest |> capacity=20

調用flip方法後,之前寫入模式下的position值5,變成了可讀上限limit值5;而新的讀取模式下的position值,簡單粗暴地變成了0,表示從頭開始讀取。

對flip()方法的從寫入到讀取轉換的規則,詳細的介紹如下:

首先,設定可讀的長度上限limit。将寫模式下的緩沖區中内容的最後寫入位置position值,作為讀模式下的limit上限值。

其次,把讀的起始位置position的值設為0,表示從頭開始讀。

最後,清除之前的mark标記,因為mark儲存的是寫模式下的臨時位置。在讀模式下,如果繼續使用舊的mark标記,會造成位置混亂。

有關上面的三步,其實可以檢視flip方法的源代碼,Buffer.flip()方法的源代碼如下:

public final Buffer flip() {

limit = position;  //設定可讀的長度上限limit,為寫入的position
position = 0;       //把讀的起始位置position的值設為0,表示從頭開始讀           

mark = UNSET_MARK; // 清除之前的mark标記

return this;

}

至此,大家都知道了,如何将緩沖區切換成讀取模式。

新的問題來了,在讀取完成後,如何再一次将緩沖區切換成寫入模式呢?可以調用Buffer.clear() 清空或者Buffer.compact()壓縮方法,它們可以将緩沖區轉換為寫模式。

Buffer的模式轉換,大緻如圖3-1所示。

帶你讀《Netty、Redis、ZooKeeper高并發實戰》之三:Java NIO通信基礎詳解第3章

圖3-1 緩沖區讀寫模式的轉換

3.3.4 get()從緩沖區讀取

調用flip方法,将緩沖區切換成讀取模式。這時,可以開始從緩沖區中進行資料讀取了。讀資料很簡單,調用get方法,每次從position的位置讀取一個資料,并且進行相應的緩沖區屬性的調整。

接着前面flip的使用執行個體,示範一下緩沖區的讀取操作,代碼如下:

package com.crazymakercircle.bufferDemo;
//...
public class UseBuffer
{
static IntBufferintBuffer = null;
//省略了緩沖區的建立、寫入、翻轉的代碼,具體看源代碼工程
  public static void getTest()
{
//先讀2個
       for (int i = 0; i< 2; i++)
        {
            int j = intBuffer.get();
            Logger.info("j = " + j);
        }
//輸出緩沖區的主要屬性值            
Logger.info("------------after get 2 int ------------------");
Logger.info("position=" + intBuffer.position());
Logger.info("limit=" + intBuffer.limit());
Logger.info("capacity=" + intBuffer.capacity());
//再讀3個
        for (int i = 0; i< 3; i++)
        {
            int j = intBuffer.get();
            Logger.info("j = " + j);
        }
//輸出緩沖區的主要屬性值            
Logger.info("------------after get 3 int ------------------");
Logger.info("position=" + intBuffer.position());
Logger.info("limit=" + intBuffer.limit());
Logger.info("capacity=" + intBuffer.capacity());
    }
//...
}
先讀2個,再讀3個,運作後,輸出的結果如下:
getTest |>  ------------after get 2 int ------------------ 
getTest |>  position=2 
getTest |>  limit=5 
getTest |>  capacity=20 
getTest |>  ------------after get 3 int ------------------ 
getTest |>  position=5 
getTest |>  limit=5 
getTest |>  capacity=20           

從程式的輸出結果,我們可以看到,讀取操作會改變可讀位置position的值,而limit值不會改變。如果position值和limit的值相等,表示所有資料讀取完成,position指向了一個沒有資料的元素位置,已經不能再讀了。此時再讀,會抛出BufferUnderflowException異常。

這裡強調一下,在讀完之後,是否可以立即進行寫入模式呢?不能。現在還處于讀取模式,我們必須調用Buffer.clear()或Buffer.compact(),即清空或者壓縮緩沖區,才能變成寫入模式,讓其重新可寫。

另外,還有一個問題:緩沖區是不是可以重複讀呢?答案是可以的。

3.3.5 rewind()倒帶

已經讀完的資料,如果需要再讀一遍,可以調用rewind()方法。rewind()也叫倒帶,就像播放錄音帶一樣倒回去,再重新播放。

接着前面的代碼,繼續rewind方法使用的示範,示例代碼如下:

package com.crazymakercircle.bufferDemo;
//...
public class UseBuffer
{
    static IntBufferintBuffer = null;
     //省略了緩沖區的建立、寫入、讀取的代碼,具體看源代碼工程
    public static void rewindTest() {
              //倒帶
            intBuffer.rewind();
              //輸出緩沖區屬性
            Logger.info("------------after rewind ------------------");
            Logger.info("position=" + intBuffer.position());
            Logger.info("limit=" + intBuffer.limit());
            Logger.info("capacity=" + intBuffer.capacity());
    }
//...
}           

這個範例程式的執行結果如下:

rewindTest |> ------------after rewind ------------------

rewindTest |> position=0

rewindTest |> limit=5

rewindTest |> capacity=20

rewind ()方法,主要是調整了緩沖區的position屬性,具體的調整規則如下:

(1)position重置為0,是以可以重讀緩沖區中的所有資料。

(2)limit保持不變,資料量還是一樣的,仍然表示能從緩沖區中讀取多少個元素。

(3)mark标記被清理,表示之前的臨時位置不能再用了。

Buffer.rewind()方法的源代碼如下:

public final Buffer rewind() {
            position = 0;//重置為0,是以可以重讀緩沖區中的所有資料
            mark = -1; //  mark标記被清理,表示之前的臨時位置不能再用了
            return this;
        }           

通過源代碼,我們可以看到rewind()方法與flip()很相似,差別在于:rewind()不會影響limit屬性值;而flip()會重設limit屬性值。

在rewind倒帶之後,就可以再一次讀取,重複讀取的示例代碼如下:

package com.crazymakercircle.bufferDemo;
//...
public class UseBuffer
{
   static IntBufferintBuffer = null;
   //省略了緩沖區的讀取、倒帶的代碼,具體看源代碼工程
   public static void reRead() {
        for (int i = 0; i< 5; i++) {
            if (i == 2) {
                //臨時儲存,标記一下第3個位置
                intBuffer.mark();
            }
//讀取元素
            int j = intBuffer.get();
            Logger.info("j = " + j);
        }
//輸出緩沖區的屬性值
Logger.info("------------after reRead------------------");
Logger.info("position=" + intBuffer.position());
Logger.info("limit=" + intBuffer.limit());
Logger.info("capacity=" + intBuffer.capacity());
    }
//...
}           

這段代碼,和前面的讀取示例代碼基本相同,隻是增加了一個mark調用。

3.3.6 mark( )和reset( )

Buffer.mark()方法的作用是将目前position的值儲存起來,放在mark屬性中,讓mark屬性記住這個臨時位置;之後,可以調用Buffer.reset()方法将mark的值恢複到position中。

也就是說,Buffer.mark()和Buffer.reset()方法是配套使用的。兩種方法都需要内部mark屬性的支援。

在前面重複讀取緩沖區的示例代碼中,讀到第3個元素(i= =2時),調用mark()方法,把目前位置position的值儲存到mark屬性中,這時mark屬性的值為2。

接下來,就可以調用reset方法,将mark屬性的值恢複到position中。然後可以從位置2(第三個元素)開始讀。

繼續接着前面的重複讀取的代碼,進行reset的示例示範,代碼如下:

package com.crazymakercircle.bufferDemo;
//...
public class UseBuffer
{
    static IntBufferintBuffer = null;
    //省略了緩沖區之前的mark等代碼,具體看源代碼工程
    public static void afterReset() {
            Logger.info("------------after reset------------------");
            //把前面儲存在mark中的值恢複到position
            intBuffer.reset();
            //輸出緩沖區的屬性值
            Logger.info("position=" + intBuffer.position());
            Logger.info("limit=" + intBuffer.limit());
            Logger.info("capacity=" + intBuffer.capacity());
            //讀取并且輸出元素
            for (int i =2; i< 5; i++) {
                    int j = intBuffer.get();
                    Logger.info("j = " + j);
            }
    }
//...
}           

在上面的代碼中,首先調用reset()把mark中的值恢複到position中,是以讀取的位置position就是2,表示可以再次開始從第3個元素開始讀取資料。上面的程式代碼的輸出結果是:

afterReset |> ------------after reset------------------

afterReset |> position=2

afterReset |> limit=5

afterReset |> capacity=20

afterReset |> j = 2

afterReset |> j = 3

afterReset |> j = 4

調用reset方法之後,position的值為2。此時去讀取緩沖區,輸出後面的三個元素為2、3、4。

3.3.7 clear( )清空緩沖區

在讀取模式下,調用clear()方法将緩沖區切換為寫入模式。此方法會将position清零,limit設定為capacity最大容量值,可以一直寫入,直到緩沖區寫滿。

接着上面的執行個體,示範一下clear方法。代碼如下:

package com.crazymakercircle.bufferDemo;
//...
public class UseBuffer
{
    static IntBufferintBuffer = null;
    //省略了之前的buffer操作代碼,具體看源代碼工程
    public static void clearDemo() {
        Logger.info("------------after clear------------------");
        //清空緩沖區,進入寫入模式
        intBuffer.clear();
        //輸出緩沖區的屬性值
        Logger.info("position=" + intBuffer.position());
        Logger.info("limit=" + intBuffer.limit());
        Logger.info("capacity=" + intBuffer.capacity());
    }
//...
}           

這個程式運作之後,結果如下:

main |>清空

clearDemo |> ------------after clear------------------

clearDemo |> position=0

clearDemo |> limit=20

clearDemo |> capacity=20

在緩沖區處于讀取模式時,調用clear(),緩沖區會被切換成寫入模式。調用clear()之後,我們可以看到清空了position的值,即設定寫入的起始位置為0,并且寫入的上限為最大容量。

3.3.8 使用Buffer類的基本步驟

總體來說,使用Java NIO Buffer類的基本步驟如下:

(1)使用建立子類執行個體對象的allocate()方法,建立一個Buffer類的執行個體對象。

(2)調用put方法,将資料寫入到緩沖區中。

(3)寫入完成後,在開始讀取資料前,調用Buffer.flip()方法,将緩沖區轉換為讀模式。

(4)調用get方法,從緩沖區中讀取資料。

(5)讀取完成後,調用Buffer.clear() 或Buffer.compact()方法,将緩沖區轉換為寫入模式。

3.4 詳解NIO Channel(通道)類

前面講到,NIO中一個連接配接就是用一個Channel(通道)來表示。大家知道,從更廣泛的層面來說,一個通道可以表示一個底層的檔案描述符,例如硬體裝置、檔案、網絡連接配接等。然而,遠遠不止如此,除了可以對應到底層檔案描述符,Java NIO的通道還可以更加細化。例如,對應不同的網絡傳輸協定類型,在Java中都有不同的NIO Channel(通道)實作。

3.4.1 Channel(通道)的主要類型

這裡不對紛繁複雜的Java NIO通道類型進行過多的描述,僅僅聚焦于介紹其中最為重要的四種Channel(通道)實作:FileChannel、SocketChannel、ServerSocketChannel、DatagramChannel。

對于以上四種通道,說明如下:

(1)FileChannel檔案通道,用于檔案的資料讀寫。

(2)SocketChannel套接字通道,用于Socket套接字TCP連接配接的資料讀寫。

(3)ServerSocketChannel伺服器嵌套字通道(或伺服器監聽通道),允許我們監聽TCP連接配接請求,為每個監聽到的請求,建立一個SocketChannel套接字通道。

(4)DatagramChannel資料報通道,用于UDP協定的資料讀寫。

這個四種通道,涵蓋了檔案IO、TCP網絡、UDP IO基礎IO。下面從Channel(通道)的擷取、讀取、寫入、關閉四個重要的操作,來對四種通道進行簡單的介紹。

3.4.2 FileChannel檔案通道

FileChannel是專門操作檔案的通道。通過FileChannel,既可以從一個檔案中讀取資料,也可以将資料寫入到檔案中。特别申明一下,FileChannel為阻塞模式,不能設定為非阻塞模式。

下面分别介紹:FileChannel的擷取、讀取、寫入、關閉四個操作。

1. 擷取FileChannel通道

可以通過檔案的輸入流、輸出流擷取FileChannel檔案通道,示例如下:

//建立一條檔案輸入流
FileInputStreamfis = new FileInputStream(srcFile);
//擷取檔案流的通道
FileChannelinChannel = fis.getChannel();

//建立一條檔案輸出流
FileOutputStreamfos = new FileOutputStream(destFile);
//擷取檔案流的通道
FileChanneloutchannel = fos.getChannel();
也可以通過RandomAccessFile檔案随機通路類,擷取FileChannel檔案通道:
// 建立RandomAccessFile随機通路對象
RandomAccessFileaFile = new RandomAccessFile("filename.txt","rw");
//擷取檔案流的通道
FileChannelinChannel = aFile.getChannel();           

2. 讀取FileChannel通道

在大部分應用場景,從通道讀取資料都會調用通道的int read(ByteBufferbuf)方法,它從通道讀取到資料寫入到ByteBuffer緩沖區,并且傳回讀取到的資料量。

RandomAccessFileaFile = new RandomAccessFile(fileName, "rw");
//擷取通道
FileChannelinChannel=aFile.getChannel();
//擷取一個位元組緩沖區
ByteBufferbuf = ByteBuffer.allocate(CAPACITY);
int length = -1;
//調用通道的read方法,讀取資料并買入位元組類型的緩沖區
while ((length = inChannel.read(buf)) != -1) {
//省略……處理讀取到的buf中的資料
}           

注 意

雖然對于通道來說是讀取資料,但是對于ByteBuffer緩沖區來說是寫入資料,這時,ByteBuffer緩沖區處于寫入模式。

3. 寫入FileChannel通道

寫入資料到通道,在大部分應用場景,都會調用通道的int write(ByteBufferbuf)方法。此方法的參數——ByteBuffer緩沖區,是資料的來源。write方法的作用,是從ByteBuffer緩沖區中讀取資料,然後寫入到通道自身,而傳回值是寫入成功的位元組數。

//如果buf剛寫完資料,需要flip翻轉buf,使其變成讀取模式
buf.flip();
int outlength = 0;
//調用write方法,将buf的資料寫入通道
while ((outlength = outchannel.write(buf)) != 0) {
        System.out.println("寫入的位元組數:" + outlength);
}           

此時的ByteBuffer緩沖區要求是可讀的,處于讀模式下。

4.關閉通道

當通道使用完成後,必須将其關閉。關閉非常簡單,調用close方法即可。

//關閉通道

channel.close();

5.強制重新整理到磁盤

在将緩沖區寫入通道時,出于性能原因,作業系統不可能每次都實時将資料寫入磁盤。如果需要保證寫入通道的緩沖資料,最終都真正地寫入磁盤,可以調用FileChannel的force()方法。

//強制重新整理到磁盤

channel.force(true);

3.4.3 使用FileChannel完成檔案複制的實踐案例

下面是一個簡單的實戰案例:使用檔案通道複制檔案。其功能是:使用FileChannel檔案通道,将原檔案複制一份,也就是把原文中的資料都複制到目标檔案中。完整代碼如下:

package com.crazymakercircle.iodemo.fileDemos;
//...省略import的類,具體請參見源代碼工程
public class FileNIOCopyDemo {
    public static void main(String[] args) {
        //示範複制資源檔案
        nioCopyResouceFile();
    }
    /**
     * 複制兩個資源目錄下的檔案
     */
    public static void nioCopyResouceFile() {
        String sourcePath = NioDemoConfig.FILE_RESOURCE_SRC_PATH;
        String srcPath = IOUtil.getResourcePath(sourcePath);
        Logger.info("srcPath=" + srcPath);

        String destPath = NioDemoConfig.FILE_RESOURCE_DEST_PATH;
        String destDecodePath = IOUtil.builderResourcePath(destPath);
        Logger.info("destDecodePath=" + destDecodePath);

        nioCopyFile(srcDecodePath, destDecodePath);
    }
    /**
     * nio方式複制檔案
     * @param srcPath
     * @param destPath
     */
    public static void nioCopyFile(String srcPath, String destPath) {
        File srcFile = new File(srcPath);
        File destFile = new File(destPath);
        try {
            //如果目标檔案不存在,則建立
            if (!destFile.exists()) {
                destFile.createNewFile();
            }
        long startTime = System.currentTimeMillis();
        FileInputStreamfis = null;
        FileOutputStreamfos = null;
        FileChannelinChannel = null;
        FileChanneloutchannel = null;
        try {
            fis = new FileInputStream(srcFile);
            fos = new FileOutputStream(destFile);
            inChannel = fis.getChannel();
            outchannel = fos.getChannel();
            int length = -1;
            ByteBufferbuf = ByteBuffer.allocate(1024);
            //從輸入通道讀取到buf
            while ((length = inChannel.read(buf)) != -1) {
            //第一次切換:翻轉buf,變成讀取模式
                buf.flip();

                int outlength = 0;
                //将buf寫入到輸出的通道
                while ((outlength = outchannel.write(buf)) != 0) {
                    System.out.println("寫入的位元組數:" + outlength);
                }
                //第二次切換:清除buf,變成寫入模式
                buf.clear();
            }
            //強制重新整理到磁盤
            outchannel.force(true);
        } finally {
             //關閉所有的可關閉對象
             IOUtil.closeQuietly(outchannel);
             IOUtil.closeQuietly(fos);
             IOUtil.closeQuietly(inChannel);
             IOUtil.closeQuietly(fis);
          }
        long endTime = System.currentTimeMillis();
        Logger.info("base複制毫秒數:" + (endTime - startTime));
    } catch (IOException e) {
              e.printStackTrace();
       }
}           

特别強調一下,除了FileChannel的通道操作外,還需要注意ByteBuffer的模式切換。建立的ByteBuffer,預設是寫入模式,可以作為inChannel.read(ByteBuffer)的參數。inChannel.read方法将從通道inChannel讀到的資料寫入到ByteBuffer。

此後,需要調用緩沖區的flip方法,将ByteBuffer切換成讀取模式,才能作為outchannel.write(ByteBuffer)方法的參數,從ByteBuffer讀取資料,再寫入到outchannel輸出通道。

如此,便是完成一次複制。在進入下一次複制前,還要進行一次緩沖區的模式切換。ByteBuffer資料讀完之後,需要将通過clear方法切換成寫入模式,才能進入下一次的複制。

在示例代碼中,外層的每一輪while循環,都需要兩次模式ByteBuffer切換:第一次切換時,翻轉buf,變成讀取模式;第二次切換時,清除buf,變成寫入模式。

上面的示例代碼,主要的目的在于:示範檔案通道以及位元組緩沖區的使用。作為檔案複制的程式來說,實戰代碼的效率不是最高的。

更高效的檔案複制,可以調用檔案通道的transferFrom方法。具體的代碼,可以參見源代碼工程中的FileNIOFastCopyDemo類,完整源檔案的路徑為:

com.crazymakercircle.iodemo.fileDemos.FileNIOFastCopyDemo

3.4.4 SocketChannel套接字通道

在NIO中,涉及網絡連接配接的通道有兩個,一個是SocketChannel負責連接配接傳輸,另一個是ServerSocketChannel負責連接配接的監聽。

NIO中的SocketChannel傳輸通道,與OIO中的Socket類對應。

NIO中的ServerSocketChannel監聽通道,對應于OIO中的ServerSocket類。

ServerSocketChannel應用于伺服器端,而SocketChannel同時處于伺服器端和用戶端。換句話說,對應于一個連接配接,兩端都有一個負責傳輸的SocketChannel傳輸通道。

無論是ServerSocketChannel,還是SocketChannel,都支援阻塞和非阻塞兩種模式。如何進行模式的設定呢?調用configureBlocking方法,具體如下:

(1)socketChannel.configureBlocking(false)設定為非阻塞模式。

(2)socketChannel.configureBlocking(true)設定為阻塞模式。

在阻塞模式下,SocketChannel通道的connect連接配接、read讀、write寫操作,都是同步的和阻塞式的,在效率上與Java舊的OIO的面向流的阻塞式讀寫操作相同。是以,在這裡不介紹阻塞模式下的通道的具體操作。在非阻塞模式下,通道的操作是異步、高效率的,這也是相對于傳統的OIO的優勢所在。下面詳細介紹在非阻塞模式下通道的打開、讀寫和關閉操作等操作。

1. 擷取SocketChannel傳輸通道

在用戶端,先通過SocketChannel靜态方法open()獲得一個套接字傳輸通道;然後,将socket套接字設定為非阻塞模式;最後,通過connect()執行個體方法,對伺服器的IP和端口發起連接配接。

//獲得一個套接字傳輸通道           

SocketChannelsocketChannel = SocketChannel.open();

//設定為非阻塞模式           

socketChannel.configureBlocking(false);

//對伺服器的IP和端口發起連接配接           

socketChannel.connect(new InetSocketAddress("127.0.0.1",80));

非阻塞情況下,與伺服器的連接配接可能還沒有真正建立,socketChannel.connect方法就傳回了,是以需要不斷地自旋,檢查目前是否是連接配接到了主機:

while(! socketChannel.finishConnect() ){

//不斷地自旋、等待,或者做一些其他的事情……               

在伺服器端,如何擷取傳輸套接字呢?

當新連接配接事件到來時,在伺服器端的ServerSocketChannel能成功地查詢出一個新連接配接事件,并且通過調用伺服器端ServerSocketChannel監聽套接字的accept()方法,來擷取新連接配接的套接字通道:

//新連接配接事件到來,首先通過事件,擷取伺服器監聽通道

ServerSocketChannel server = (ServerSocketChannel) key.channel();

//擷取新連接配接的套接字通道

SocketChannelsocketChannel = server.accept();

//設定為非阻塞模式

強調一下,NIO套接字通道,主要用于非阻塞應用場景。是以,需要調用configureBlocking(false),從阻塞模式設定為非阻塞模式。

2. 讀取SocketChannel傳輸通道

當SocketChannel通道可讀時,可以從SocketChannel讀取資料,具體方法與前面的檔案通道讀取方法是相同的。調用read方法,将資料讀入緩沖區ByteBuffer。

ByteBufferbuf = ByteBuffer.allocate(1024);

int bytesRead = socketChannel.read(buf);

在讀取時,因為是異步的,是以我們必須檢查read的傳回值,以便判斷目前是否讀取到了資料。read()方法的傳回值,是讀取的位元組數。如果傳回-1,那麼表示讀取到對方的輸出結束标志,對方已經輸出結束,準備關閉連接配接。實際上,通過read方法讀資料,本身是很簡單的,比較困難的是,在非阻塞模式下,如何知道通道何時是可讀的呢?這就需要用到NIO的新元件——Selector通道選擇器,稍後介紹。

3. 寫入到SocketChannel傳輸通道

和前面的把資料寫入到FileChannel檔案通道一樣,大部分應用場景都會調用通道的int write(ByteBufferbuf)方法。

//寫入前需要讀取緩沖區,要求ByteBuffer是讀取模式

buffer.flip();

socketChannel.write(buffer);

4. 關閉SocketChannel傳輸通道

在關閉SocketChannel傳輸通道前,如果傳輸通道用來寫入資料,則建議調用一次shutdownOutput()終止輸出方法,向對方發送一個輸出的結束标志(-1)。然後調用socketChannel.close()方法,關閉套接字連接配接。

//終止輸出方法,向對方發送一個輸出的結束标志

socketChannel.shutdownOutput();

//關閉套接字連接配接

IOUtil.closeQuietly(socketChannel);

3.4.5 使用SocketChannel發送檔案的實踐案例

下面的實踐案例是使用FileChannel檔案通道讀取本地檔案内容,然後在用戶端使用SocketChannel套接字通道,把檔案資訊和檔案内容發送到伺服器。用戶端的完整代碼如下:

package com.crazymakercircle.iodemo.socketDemos;
//...
public class NioSendClient {
    private Charset charset = Charset.forName("UTF-8");
    /**
     * 向伺服器端傳輸檔案
     */
    public void sendFile() throws Exception {
        try {
            String sourcePath = NioDemoConfig.SOCKET_SEND_FILE;
            String srcPath = IOUtil.getResourcePath(sourcePath);
            Logger.info("srcPath=" + srcPath);
            String destFile = NioDemoConfig.SOCKET_RECEIVE_FILE;
            Logger.info("destFile=" + destFile);
            File file = new File(srcPath);
            if (!file.exists()) {
                Logger.info("檔案不存在");
                return;
            }
            FileChannelfileChannel = new FileInputStream(file).getChannel();
            SocketChannelsocketChannel = SocketChannel.open();
            socketChannel.socket().connect(
                   InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
                                             NioDemoConfig.SOCKET_SERVER_PORT));
                socketChannel.configureBlocking(false);
                while(! socketChannel.finishConnect() ){
                    //不斷地自旋、等待,或者做一些其他的事情
                }
                Logger.info("Client成功連接配接伺服器端");
                //發送檔案名稱
                ByteBufferfileNameByteBuffer = charset.encode(destFile);
                socketChannel.write(fileNameByteBuffer);
                //發送檔案長度
                ByteBuffer buffer = ByteBuffer.allocate
                                               (NioDemoConfig.SEND_BUFFER_SIZE);
                buffer.putLong(file.length());
                buffer.flip();
                socketChannel.write(buffer);
                buffer.clear();
                //發送檔案内容
                Logger.info("開始傳輸檔案");
                int length = 0;
                long progress = 0;
                while ((length = fileChannel.read(buffer)) > 0) {
                    buffer.flip();
                    socketChannel.write(buffer);
                    buffer.clear();
                    progress += length;
                    Logger.info("| "+(100 * progress / file.length()) + "% |");
                }
                if (length == -1) {
                    IOUtil.closeQuietly(fileChannel);
                    //在SocketChannel傳輸通道關閉前,盡量發送一個輸出結束标志到對端
                    socketChannel.shutdownOutput();
                    IOUtil.closeQuietly(socketChannel);
                }
                Logger.info("======== 檔案傳輸成功 ========");
            } catch (Exception e) {
                e.printStackTrace();
                }
        }
        public static void main(String[] args) {
        NioSendClient client = new NioSendClient(); // 啟動用戶端連接配接
        client.sendFile(); // 傳輸檔案
    }
}           

以上代碼中的檔案發送過程:首先發送目标檔案名稱(不帶路徑),然後發送檔案長度,最後是發送檔案内容。代碼中的配置項,如伺服器的IP、伺服器端口、待發送的源檔案名稱(帶路徑)、遠端的目标檔案名稱等配置資訊,都是從system.properties配置檔案中讀取的,通過自定義的NioDemoConfig配置類來完成配置。

在運作以上用戶端的程式之前,需要先運作伺服器端的程式。伺服器端的類與用戶端的源代碼在同一個包下,類名為NioReceiveServer,具體參見源代碼工程,我們稍後再詳細介紹這個類。

3.4.6 DatagramChannel資料報通道

和Socket套接字的TCP傳輸協定不同,UDP協定不是面向連接配接的協定。使用UDP協定時,隻要知道伺服器的IP和端口,就可以直接向對方發送資料。在Java中使用UDP協定傳輸資料,比TCP協定更加簡單。在Java NIO中,使用DatagramChannel資料報通道來處理UDP協定的資料傳輸。

1. 擷取DatagramChannel資料報通道

擷取資料報通道的方式很簡單,調用DatagramChannel類的open靜态方法即可。然後調用configureBlocking(false)方法,設定成非阻塞模式。

//擷取DatagramChannel資料報通道

DatagramChannel channel = DatagramChannel.open();

datagramChannel.configureBlocking(false);

如果需要接收資料,還需要調用bind方法綁定一個資料報的監聽端口,具體如下:

//調用bind方法綁定一個資料報的監聽端口

channel.socket().bind(new InetSocketAddress(18080));

2. 讀取DatagramChannel資料報通道資料

當DatagramChannel通道可讀時,可以從DatagramChannel讀取資料。和前面的SocketChannel的讀取方式不同,不是調用read方法,而是調用receive(ByteBufferbuf)方法将資料從DatagramChannel讀入,再寫入到ByteBuffer緩沖區中。

//建立緩沖區

//從DatagramChannel讀入,再寫入到ByteBuffer緩沖區

SocketAddressclientAddr= datagramChannel.receive(buffer);

通道讀取receive(ByteBufferbuf)方法的傳回值,是SocketAddress類型,表示傳回發送端的連接配接位址(包括IP和端口)。通過receive方法讀資料非常簡單,但是,在非阻塞模式下,如何知道DatagramChannel通道何時是可讀的呢?和SocketChannel一樣,同樣需要用到NIO的新元件—Selector通道選擇器,稍後介紹。

3. 寫入DatagramChannel資料報通道

向DatagramChannel發送資料,和向SocketChannel通道發送資料的方法也是不同的。這裡不是調用write方法,而是調用send方法。示例代碼如下:

//把緩沖區翻轉到讀取模式

//調用send方法,把資料發送到目标IP+端口

dChannel.send(buffer, new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,

NioDemoConfig.SOCKET_SERVER_PORT));           

//清空緩沖區,切換到寫入模式

buffer.clear();

由于UDP是面向非連接配接的協定,是以,在調用send方法發送資料的時候,需要指定接收方的位址(IP和端口)。

4. 關閉DatagramChannel資料報通道

這個比較簡單,直接調用close()方法,即可關閉資料報通道。

//簡單關閉即可

dChannel.close();

3.4.7 使用DatagramChannel資料包通道發送資料的實踐案例

下面是一個使用DatagramChannel資料包通到發送資料的用戶端示例程式代碼。其功能是:擷取使用者的輸入資料,通過DatagramChannel資料報通道,将資料發送到遠端的伺服器。用戶端的完整程式代碼如下:

package com.crazymakercircle.iodemo.udpDemos;
//...
public class UDPClient {
    public void send() throws IOException {
        //擷取DatagramChannel資料報通道
        DatagramChanneldChannel = DatagramChannel.open();
        //設定為非阻塞
        dChannel.configureBlocking(false);
        ByteBuffer buffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_
                                     SIZE);
        Scanner scanner = new Scanner(System.in);
        Print.tcfo("UDP用戶端啟動成功!");
        Print.tcfo("請輸入發送内容:");
        while (scanner.hasNext()) {
            String next = scanner.next();
            buffer.put((Dateutil.getNow() + " >>" + next).getBytes());
            buffer.flip();
            //通過DatagramChannel資料報通道發送資料
            dChannel.send(buffer, new InetSocketAddress(NioDemoConfig.
                              SOCKET_SERVER_IP, NioDemoConfig.SOCKET_SERVER_PORT));
            buffer.clear();
        }
        //操作四:關閉DatagramChannel資料報通道
        dChannel.close();
    }
    public static void main(String[] args) throws IOException {
        new UDPClient().send();
    }
}           

通過示例程式代碼可以看出,在用戶端使DatagramChannel資料報通道發送資料,比起在用戶端使用套接字SocketChannel發送資料,簡單很多。

接下來看看在伺服器端應該如何使用DatagramChannel資料包通道接收資料呢?

下面貼出伺服器端通過DatagramChannel資料包通道接收資料的程式代碼,可能大家目前不一定可以看懂,因為代碼中用到了Selector選擇器,但是不要緊,下一個小節就介紹它。

伺服器端的接收功能是:通過DatagramChannel資料報通道,綁定一個伺服器位址(IP+端口),接收用戶端發送過來的UDP資料報。伺服器端的完整代碼如下:

package com.crazymakercircle.iodemo.udpDemos;
//...
public class UDPServer {
    public void receive() throws IOException {
        //擷取DatagramChannel資料報通道
        DatagramChanneldatagramChannel = DatagramChannel.open();
        //設定為非阻塞模式
        datagramChannel.configureBlocking(false);
        //綁定監聽位址
        datagramChannel.bind(new InetSocketAddress(NioDemoConfig.SOCKET
                                     _SERVER_IP, NioDemoConfig.SOCKET_SERVER_PORT));
        Print.tcfo("UDP伺服器啟動成功!");
       //開啟一個通道選擇器
       Selector selector = Selector.open();
       //将通道注冊到選擇器
       datagramChannel.register(selector, SelectionKey.OP_READ);
       //通過選擇器,查詢IO事件
       while (selector.select() > 0) {
           Iterator<SelectionKey> iterator = selector.selectedKeys()
                                                        .iterator();
           ByteBuffer buffer = ByteBuffer.allocate(NioDemoConfig.SEND
                                        _BUFFER_SIZE);
           //疊代IO事件
           while (iterator.hasNext()) {
               SelectionKeyselectionKey = iterator.next();
               //可讀事件,有資料到來
               if (selectionKey.isReadable()) {
                    //讀取DatagramChannel資料報通道的資料
                    SocketAddress client = datagramChannel.receive(buffer);
                    buffer.flip();
                    Print.tcfo(new String(buffer.array(), 0, buffer.limit()));
                    buffer.clear();
               }
           }
           iterator.remove();
       }
       //關閉選擇器和通道
       selector.close();
       datagramChannel.close();
    }
    public static void main(String[] args) throws IOException {
        new UDPServer().receive();
    }
}           

在伺服器端,首先調用了bind方法綁定datagramChannel的監聽端口。當資料到來後,調用了receive方法,從datagramChannel資料包通道接收資料,再寫入到ByteBuffer緩沖區中。

除此之外,在伺服器端代碼中,為了監控資料的到來,使用了Selector選擇器。什麼是選擇器?如何使用選擇器呢?欲知後事如何,請聽下節分解。

3.5 詳解NIO Selector選擇器

Java NIO的三大核心元件:Channel(通道)、Buffer(緩沖區)、Selector(選擇器)。其中通道和緩沖區,二者的聯系也比較密切:資料總是從通道讀到緩沖區内,或者從緩沖區寫入到通道中。

至此,前面兩個元件已經介紹完畢,下面迎來了最後一個非常重要的角色——選擇器(Selector)。

3.5.1 選擇器以及注冊

選擇器(Selector)是什麼呢?選擇器和通道的關系又是什麼?

簡單地說:選擇器的使命是完成IO的多路複用。一個通道代表一條連接配接通路,通過選擇器可以同時監控多個通道的IO(輸入輸出)狀況。選擇器和通道的關系,是監控和被監控的關系。

選擇器提供了獨特的API方法,能夠選出(select)所監控的通道擁有哪些已經準備好的、就緒的IO操作事件。

一般來說,一個單線程處理一個選擇器,一個選擇器可以監控很多通道。通過選擇器,一個單線程可以處理數百、數千、數萬、甚至更多的通道。在極端情況下(數萬個連接配接),隻用一個線程就可以處理所有的通道,這樣會大量地減少線程之間上下文切換的開銷。

通道和選擇器之間的關系,通過register(注冊)的方式完成。調用通道的Channel.register(Selector sel,int ops)方法,可以将通道執行個體注冊到一個選擇器中。register方法有兩個參數:第一個參數,指定通道注冊到的選擇器執行個體;第二個參數,指定選擇器要監控的IO事件類型。

可供選擇器監控的通道IO事件類型,包括以下四種:

(1)可讀:SelectionKey.OP_READ

(2)可寫:SelectionKey.OP_WRITE

(3)連接配接:SelectionKey.OP_CONNECT

(4)接收:SelectionKey.OP_ACCEPT

事件類型的定義在SelectionKey類中。如果選擇器要監控通道的多種事件,可以用“按位或”運算符來實作。例如,同時監控可讀和可寫IO事件:

//監控通道的多種事件,用“按位或”運算符來實作

int key = SelectionKey.OP_READ | SelectionKey.OP_WRITE ;

什麼是IO事件呢?這個概念容易混淆,這裡特别說明一下。這裡的IO事件不是對通道的IO操作,而是通道的某個IO操作的一種就緒狀态,表示通道具備完成某個IO操作的條件。

比方說,某個SocketChannel通道,完成了和對端的握手連接配接,則處于“連接配接就緒”(OP_CONNECT)狀态。

再比方說,某個ServerSocketChannel伺服器通道,監聽到一個新連接配接的到來,則處于“接收就緒”(OP_ACCEPT)狀态。

還比方說,一個有資料可讀的SocketChannel通道,處于“讀就緒”(OP_READ)狀态;一個等待寫入資料的,處于“寫就緒”(OP_WRITE)狀态。

3.5.2 SelectableChannel可選擇通道

并不是所有的通道,都是可以被選擇器監控或選擇的。比方說,FileChannel檔案通道就不能被選擇器複用。判斷一個通道能否被選擇器監控或選擇,有一個前提:判斷它是否繼承了抽象類SelectableChannel(可選擇通道)。如果繼承了SelectableChannel,則可以被選擇,否則不能。

簡單地說,一條通道若能被選擇,必須繼承SelectableChannel類。

SelectableChannel類,是何方神聖呢?它提供了實作通道的可選擇性所需要的公共方法。Java NIO中所有網絡連結Socket套接字通道,都繼承了SelectableChannel類,都是可選擇的。而FileChannel檔案通道,并沒有繼承SelectableChannel,是以不是可選擇通道。

3.5.3 SelectionKey選擇鍵

通道和選擇器的監控關系注冊成功後,就可以選擇就緒事件。具體的選擇工作,和調用選擇器Selector的select()方法來完成。通過select方法,選擇器可以不斷地選擇通道中所發生操作的就緒狀态,傳回注冊過的感興趣的那些IO事件。換句話說,一旦在通道中發生了某些IO事件(就緒狀态達成),并且是在選擇器中注冊過的IO事件,就會被選擇器選中,并放入SelectionKey選擇鍵的集合中。

這裡出現一個新的概念——SelectionKey選擇鍵。SelectionKey選擇鍵是什麼呢?簡單地說,SelectionKey選擇鍵就是那些被選擇器選中的IO事件。前面講到,一個IO事件發生(就緒狀态達成)後,如果之前在選擇器中注冊過,就會被選擇器選中,并放入SelectionKey選擇鍵集合中;如果之前沒有注冊過,即使發生了IO事件,也不會被選擇器選中。SelectionKey選擇鍵和IO的關系,可以簡單地了解為:選擇鍵,就是被選中了的IO事件。

在程式設計時,選擇鍵的功能是很強大的。通過SelectionKey選擇鍵,不僅僅可以獲得通道的IO事件類型,比方說SelectionKey.OP_READ;還可以獲得發生IO事件所在的通道;另外,也可以獲得選出選擇鍵的選擇器執行個體。

3.5.4 選擇器使用流程

使用選擇器,主要有以下三步:

(1)擷取選擇器執行個體;(2)将通道注冊到選擇器中;(3)輪詢感興趣的IO就緒事件(選擇鍵集合)。

第一步:擷取選擇器執行個體

選擇器執行個體是通過調用靜态工廠方法open()來擷取的,具體如下:

//調用靜态工廠方法open()來擷取Selector執行個體

Selector selector = Selector.open();

Selector選擇器的類方法open()的内部,是向選擇器SPI(SelectorProvider)送出請求,通過預設的SelectorProvider(選擇器提供者)對象,擷取一個新的選擇器執行個體。Java中SPI全稱為(Service Provider Interface,服務提供者接口),是JDK的一種可以擴充的服務提供和發現機制。Java通過SPI的方式,提供選擇器的預設實作版本。也就是說,其他的服務提供商可以通過SPI的方式,提供定制化版本的選擇器的動态替換或者擴充。

第二步:将通道注冊到選擇器執行個體

要實作選擇器管理通道,需要将通道注冊到相應的選擇器上,簡單的示例代碼如下:

// 2.擷取通道
ServerSocketChannelserverSocketChannel = ServerSocketChannel.open();
// 3.設定為非阻塞
serverSocketChannel.configureBlocking(false);
// 4.綁定連接配接
serverSocketChannel.bind(new InetSocketAddress(SystemConfig.SOCKET_SERVER_PORT));
// 5.将通道注冊到選擇器上,并制定監聽事件為:“接收連接配接”事件
serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);           

上面通過調用通道的register()方法,将ServerSocketChannel通道注冊到了一個選擇器上。當然,在注冊之前,首先要準備好通道。

這裡需要注意:注冊到選擇器的通道,必須處于非阻塞模式下,否則将抛出IllegalBlockingModeException異常。這意味着,FileChannel檔案通道不能與選擇器一起使用,因為FileChannel檔案通道隻有阻塞模式,不能切換到非阻塞模式;而Socket套接字相關的所有通道都可以。

其次,還需要注意:一個通道,并不一定要支援所有的四種IO事件。例如伺服器監聽通道ServerSocketChannel,僅僅支援Accept(接收到新連接配接)IO事件;而SocketChannel傳輸通道,則不支援Accept(接收到新連接配接)IO事件。

如何判斷通道支援哪些事件呢?可以在注冊之前,可以通過通道的validOps()方法,來擷取該通道所有支援的IO事件集合。

第三步:選出感興趣的IO就緒事件(選擇鍵集合)

通過Selector選擇器的select()方法,選出已經注冊的、已經就緒的IO事件,儲存到SelectionKey選擇鍵集合中。SelectionKey集合儲存在選擇器執行個體内部,是一個元素為SelectionKey類型的集合(Set)。調用選擇器的selectedKeys()方法,可以取得選擇鍵集合。

接下來,需要疊代集合的每一個選擇鍵,根據具體IO事件類型,執行對應的業務操作。大緻的處理流程如下:

//輪詢,選擇感興趣的IO就緒事件(選擇鍵集合)
while (selector.select() > 0) {
    Set selectedKeys = selector.selectedKeys();
    Iterator keyIterator = selectedKeys.iterator();
    while(keyIterator.hasNext()) {
        SelectionKey key = keyIterator.next();
//根據具體的IO事件類型,執行對應的業務操作
        if(key.isAcceptable()) {
        // IO事件:ServerSocketChannel伺服器監聽通道有新連接配接
        } else if (key.isConnectable()) {
        // IO事件:傳輸通道連接配接成功
        } else if (key.isReadable()) {
        // IO事件:傳輸通道可讀
        } else if (key.isWritable()) {
        // IO事件:傳輸通道可寫
        }
        //處理完成後,移除選擇鍵
        keyIterator.remove();
    }
}           

處理完成後,需要将選擇鍵從這個SelectionKey集合中移除,防止下一次循環的時候,被重複的處理。SelectionKey集合不能添加元素,如果試圖向SelectionKey選擇鍵集合中添加元素,則将抛出java.lang.UnsupportedOperationException異常。

用于選擇就緒的IO事件的select()方法,有多個重載的實作版本,具體如下:

(1)select():阻塞調用,一直到至少有一個通道發生了注冊的IO事件。

(2)select(long timeout):和select()一樣,但最長阻塞時間為timeout指定的毫秒數。

(3)selectNow():非阻塞,不管有沒有IO事件,都會立刻傳回。

select()方法傳回的整數值(int整數類型),表示發生了IO事件的通道數量。更準确地說,是從上一次select到這一次select之間,有多少通道發生了IO事件。強調一下,select()方法傳回的數量,指的是通道數,而不是IO事件數,準确地說,是指發生了選擇器感興趣的IO事件的通道數。

3.5.5 使用NIO實作Discard伺服器的實踐案例

Discard伺服器的功能很簡單:僅僅讀取用戶端通道的輸入資料,讀取完成後直接關閉用戶端通道;并且讀取到的資料直接抛棄掉(Discard)。Discard伺服器足夠簡單明了,作為第一個學習NIO的通信執行個體,較有參考價值。

下面的Discard伺服器代碼,将選擇器使用流程中的步驟進行了細化:

package com.crazymakercircle.iodemo.NioDiscard;
//...
public class NioDiscardServer {
    public static void startServer() throws IOException {
        // 1.擷取選擇器
        Selector selector = Selector.open();
        // 2.擷取通道
        ServerSocketChannelserverSocketChannel = ServerSocketChannel.open();
        // 3.設定為非阻塞
        serverSocketChannel.configureBlocking(false);
        // 4.綁定連接配接
        serverSocketChannel.bind(newInetSocketAddress(NioDemoConfig
                                          .SOCKET_SERVER_PORT));
        Logger.info("伺服器啟動成功");
        // 5.将通道注冊的“接收新連接配接”IO事件,注冊到選擇器上
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        // 6.輪詢感興趣的IO就緒事件(選擇鍵集合)
        while (selector.select() > 0) {
            // 7.擷取選擇鍵集合
            Iterator<SelectionKey>selectedKeys = selector.selectedKeys().
                                                             iterator();
            while (selectedKeys.hasNext()) {
                // 8.擷取單個的選擇鍵,并處理
                SelectionKeyselectedKey = selectedKeys.next();
                // 9.判斷key是具體的什麼事件
                if (selectedKey.isAcceptable()) {
                    // 10.若選擇鍵的IO事件是“連接配接就緒”事件,就擷取用戶端連接配接
                    SocketChannelsocketChannel = serverSocketChannel.accept();
                    // 11.切換為非阻塞模式
                    socketChannel.configureBlocking(false);
                    // 12.将該新連接配接的通道的可讀事件,注冊到選擇器上
                    socketChannel.register(selector, SelectionKey.OP_READ);
                } else if (selectedKey.isReadable()) {
                    // 13.若選擇鍵的IO事件是“可讀”事件, 讀取資料
                    SocketChannelsocketChannel = (SocketChannel) selectedKey.
                                                             channel();
                    // 14.讀取資料,然後丢棄
                    ByteBufferbyteBuffer = ByteBuffer.allocate(1024);
                    int length = 0;
                    while ((length = socketChannel.read(byteBuffer)) >0) {
                        byteBuffer.flip();
                        Logger.info(new String(byteBuffer.array(), 0, length));
                        byteBuffer.clear();
                    }
                    socketChannel.close();
                }
                // 15.移除選擇鍵
                selectedKeys.remove();
            }
        }
        // 16.關閉連接配接
        serverSocketChannel.close();
    }
    public static void main(String[] args) throws IOException {
        startServer();
    }
}           

實作DiscardServer一共分為16步,其中第7到第15步是循環執行的。不斷選擇感興趣的IO事件到選擇器的選擇鍵集合中,然後通過selector.selectedKeys()擷取該選擇鍵集合,并且進行疊代處理。對于建立立的socketChannel用戶端傳輸通道,也要注冊到同一個選擇器上,使用同一個選擇線程,不斷地對所有的注冊通道進行選擇鍵的選擇。

在DiscardServer程式中,涉及到兩次選擇器注冊:一次是注冊serverChannel伺服器通道;另一次,注冊接收到的socketChannel用戶端傳輸通道。serverChannel伺服器通道注冊的,是新連接配接的IO事件SelectionKey.OP_ACCEPT;用戶端socketChannel傳輸通道注冊的,是可讀IO事件SelectionKey.OP_READ。

DiscardServer在對選擇鍵進行處理時,通過對類型進行判斷,然後進行相應的處理

(1)如果是SelectionKey.OP_ACCEPT新連接配接事件類型,代表serverChannel伺服器通道發生了新連接配接事件,則通過伺服器通道的accept方法,擷取新的socketChannel傳輸通道,并且将新通道注冊到選擇器。

(2)如果是SelectionKey.OP_READ可讀事件類型,代表某個用戶端通道有資料可讀,則讀取選擇鍵中socketChannel傳輸通道的資料,然後丢棄。

用戶端的DiscardClient代碼,則更為簡單。用戶端首先建立到伺服器的連接配接,發送一些簡單的資料,然後直接關閉連接配接。代碼如下:

package com.crazymakercircle.iodemo.NioDiscard;
//...
public class NioDiscardClient {
    public static void startClient() throws IOException {
        InetSocketAddress address =new InetSocketAddress(NioDemoConfig.
                            SOCKET_SERVER_IP, NioDemoConfig.SOCKET_SERVER_PORT);
        // 1.擷取通道
        SocketChannelsocketChannel = SocketChannel.open(address);
        // 2.切換成非阻塞模式
        socketChannel.configureBlocking(false);
       //不斷地自旋、等待連接配接完成,或者做一些其他的事情
        while (!socketChannel.finishConnect()) {
        }
        Logger.info("用戶端連接配接成功");
        // 3.配置設定指定大小的緩沖區
        ByteBufferbyteBuffer = ByteBuffer.allocate(1024);
        byteBuffer.put("hello world".getBytes());
        byteBuffer.flip();
        //發送到伺服器
        socketChannel.write(byteBuffer);
        socketChannel.shutdownOutput();
        socketChannel.close();
    }
     public static void main(String[] args) throws IOException {
        startClient();
    }
}           

如果需要執行整個程式,首先要執行前面的伺服器端程式,然後執行後面的用戶端程式。

通過Discard伺服器的開發實踐,大家對NIO Selector(選擇)的使用流程,應該了解得非常清楚了。

下面來看一個稍微複雜一點的案例:在伺服器端接收檔案和内容。

3.5.6 使用SocketChannel在伺服器端接收檔案的實踐案例

本示例示範檔案的接收,是伺服器端的程式。和前面介紹的檔案發送的SocketChannel用戶端程式是互相配合使用的。由于在伺服器端,需要用到選擇器,是以在介紹完選擇器後,才開始介紹NIO檔案傳輸的Socket伺服器端程式。伺服器端接收檔案的示例代碼如下所示:

package com.crazymakercircle.iodemo.socketDemos;
//...
public class NioReceiveServer {
    private Charset charset = Charset.forName("UTF-8");
    /**
     * 内部類,伺服器端儲存的用戶端對象,對應一個用戶端檔案
     */
    static class Client {
        //檔案名稱
        String fileName;
        //長度
        long fileLength;
        //開始傳輸的時間
        long startTime;
        //用戶端的位址
        InetSocketAddressremoteAddress;
        //輸出的檔案通道
        FileChanneloutChannel;
    }
    private ByteBuffer buffer
            = ByteBuffer.allocate(NioDemoConfig.SERVER_BUFFER_SIZE);
    //使用Map儲存每個檔案傳輸,當OP_READ可讀時,根據通道找到對應的對象
    Map<SelectableChannel, Client>clientMap = new HashMap<SelectableChannel, Client>();

    public void startServer() throws IOException {
        // 1.擷取選擇器
        Selector selector = Selector.open();
        // 2.擷取通道
        ServerSocketChannelserverChannel = ServerSocketChannel.open();
        ServerSocketserverSocket = serverChannel.socket();
        // 3.設定為非阻塞
        serverChannel.configureBlocking(false);
        // 4.綁定連接配接
        InetSocketAddress address
                = new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_PORT);
        serverSocket.bind(address);
        // 5.将通道注冊到選擇器上,并注冊的IO事件為:“接收新連接配接”
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        Print.tcfo("serverChannel is listening...");
        // 6.選擇感興趣的IO就緒事件(選擇鍵集合)
        while (selector.select() > 0) {
            // 7.擷取選擇鍵集合
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                // 8.擷取單個的選擇鍵,并處理
                SelectionKey key = it.next();

                // 9.判斷key是具體的什麼事件,是否為新連接配接事件
                if (key.isAcceptable()) {
                    // 10.若接受的事件是“新連接配接”事件,就擷取用戶端新連接配接
                    ServerSocketChannel server 
                                     = (ServerSocketChannel) key.channel();
                    SocketChannelsocketChannel = server.accept();
                    if (socketChannel == null) continue;
                    // 11.用戶端新連接配接,切換為非阻塞模式
                    socketChannel.configureBlocking(false);
                    // 12.将用戶端新連接配接通道注冊到選擇器上
                    SelectionKeyselectionKey =
                        socketChannel.register(selector, SelectionKey.OP_READ);
                    // 為每一條傳輸通道,建立一個Client用戶端對象,放入map,供後面使用
                    Client client = new Client();
                    client.remoteAddress  
                            = (InetSocketAddress) socketChannel.getRemoteAddress();
                    clientMap.put(socketChannel, client);
                    Logger.info(socketChannel.getRemoteAddress() + "連接配接成功...");

                } else if (key.isReadable()) {
                    // 13.若接收的事件是“資料可讀”事件,就讀取用戶端新連接配接
                    processData(key);
                  }
                // NIO的特點隻會累加,已選擇的鍵的集合不會删除
                // 如果不删除,下一次又會被select函數選中
                it.remove();
            }
        }
    }

    /**
     * 處理用戶端傳輸過來的資料
     */
    private void processData(SelectionKey key) throws IOException {
        Client client = clientMap.get(key.channel());
        SocketChannelsocketChannel = (SocketChannel) key.channel();
        int num = 0;
        try {
            buffer.clear();
            while ((num = socketChannel.read(buffer)) > 0) {
                buffer.flip();
                if (null == client.fileName) {
                    //用戶端發送過來的,首先是檔案名
                    //根據檔案名,建立伺服器端的檔案,将檔案通道儲存到用戶端
                    String fileName = charset.decode(buffer).toString();
                    String destPath = IOUtil.getResourcePath(
                    NioDemoConfig.SOCKET_RECEIVE_PATH);
                    File directory = new File(destPath);
                    if (!directory.exists()) {
                        directory.mkdir();
                    }
                    client.fileName = fileName;
                    String fullName = directory.getAbsolutePath()
                            + File.separatorChar + fileName;
                    Logger.info("NIO  傳輸目标檔案:" + fullName);
                    File file = new File(fullName);
                    FileChannelfileChannel 
                                  = new FileOutputStream(file).getChannel();
                    client.outChannel = fileChannel;
                }else if (0 == client.fileLength) {
                    //用戶端發送過來的,其次是檔案長度
                    long fileLength = buffer.getLong();
                    client.fileLength = fileLength;
                    client.startTime = System.currentTimeMillis();
                    Logger.info("NIO  傳輸開始:");
                    } else {
                        //用戶端發送過來的,最後是檔案内容,寫入檔案内容
                        client.outChannel.write(buffer);
                    }
                buffer.clear();
            }
           key.cancel();
        } catch (IOException e) {
            key.cancel();
            e.printStackTrace();
            return;
        }
        // 讀取數量-1,表示用戶端傳輸結束标志到了
        if (num == -1) {
            IOUtil.closeQuietly(client.outChannel);
            System.out.println("上傳完畢");
            key.cancel();
            Logger.info("檔案接收成功,File Name:" + client.fileName);
            Logger.info(" Size:" + 
                                IOUtil.getFormatFileSize(client.fileLength));
            long endTime = System.currentTimeMillis();
            Logger.info("NIO IO傳輸毫秒數:" + (endTime - client.startTime));
        }
    }

    public static void main(String[] args) throws Exception {
        NioReceiveServer server = new NioReceiveServer();
        server.startServer();
    }
}
           

由于用戶端每次傳輸檔案,都會分為多次傳輸:

(1)首先傳入檔案名稱。

(2)其次是檔案大小。

(3)然後是檔案内容。

對應于每一個用戶端socketChannel,建立一個Client用戶端對象,用于儲存用戶端狀态,分别儲存檔案名、檔案大小和寫入的目标檔案通道outChannel。

socketChannel和Client對象之間是一對一的對應關系:建立連接配接的時候,以socketChannel作為鍵(Key),Client對象作為值(Value),将Client儲存在map中。當socketChannel傳輸通道有資料可讀時,通過選擇鍵key.channel()方法,取出IO事件所在socketChannel通道。然後通過socketChannel通道,從map中取到對應的Client對象。

接收到資料時,如果檔案名為空,先處理檔案名稱,并把檔案名儲存到Client對象,同時建立伺服器上的目标檔案;接下來再讀到資料,說明接收到了檔案大小,把檔案大小儲存到Client對象;接下來再接到資料,說明是檔案内容了,則寫入Client對象的outChannel檔案通道中,直到資料讀取完畢。

運作方式:啟動這個NioReceiveServer伺服器程式後,再啟動前面介紹的用戶端程式NioSendClient,即可以完成檔案的傳輸。

3.6 本章小結

在程式設計難度上,Java NIO程式設計的難度比同步阻塞Java OIO程式設計大很多。請注意,前面的實踐案例,是比較簡單的,并不是複雜的通信程式,沒有看到“粘包”和“拆包”等問題。如果加上這些問題,代碼将會更加複雜。

與Java OIO相比,Java NIO程式設計大緻的特點如下:

(1)在NIO中,伺服器接收新連接配接的工作,是異步進行的。不像Java的OIO那樣,伺服器監聽連接配接,是同步的、阻塞的。NIO可以通過選擇器(也可以說成:多路複用器),後續不斷地輪詢選擇器的選擇鍵集合,選擇新到來的連接配接。

(2)在NIO中,SocketChannel傳輸通道的讀寫操作都是異步的。如果沒有可讀寫的資料,負責IO通信的線程不會同步等待。這樣,線程就可以處理其他連接配接的通道;不需要像OIO那樣,線程一直阻塞,等待所負責的連接配接可用為止。

(3)在NIO中,一個選擇器線程可以同時處理成千上萬個用戶端連接配接,性能不會随着用戶端的增加而線性下降。

總之,有了Linux底層的epoll支援,有了Java NIO Selector選擇器這樣的應用層IO複用技術,Java程式進而可以實作IO通信的高TPS、高并發,使伺服器具備并發數十萬、數百萬的連接配接能力。Java的NIO技術非常适合用于高性能、高負載的網絡伺服器。鼎鼎大名的通信伺服器中間件Netty,就是基于Java的NIO技術實作的。

當然,Java NIO技術僅僅是基礎,如果要實作通信的高性能和高并發,還離不開高效率的設計模式。下一章将開始為大家介紹高性能服務必備的設計模式:Reactor反應器模式。