天天看點

Java NIO的基本概念Channel、Buffer、Selector以及非阻塞網絡通信案例1 基本概念2 通道(Channel)與緩沖區(Buffer)3 選擇器Selector4 NIO非阻塞網絡通信案例

詳細介紹了Java NIO中的基本概念,Buffer、Channel、Selector,以及NIO非阻塞網絡通信的基本案例。

文章目錄

  • 1 基本概念
      • 1.1 同步和異步
      • 1.2 阻塞和非阻塞
  • 2 通道(Channel)與緩沖區(Buffer)
    • 2.1 緩沖區(Buffer)
      • 2.1.1 緩沖區的基本屬性
      • 2.1.2 緩沖區的基本操作與常用方法
      • 2.1.3 緩沖區的資料操作
      • 2.1.4 直接與非直接緩沖區
    • 2.2 通道(Channel)
      • 2.2.1 Channel的實作
        • 2.2.2 擷取通道
  • 3 選擇器Selector
    • 3.1 建立Selector
    • 3.2 向選擇器注冊通道
    • 3.3 監聽事件
    • 3.4 Selector 的常用方法
  • 4 NIO非阻塞網絡通信案例
    • 4.1 非阻塞式TCP通信
    • 4.2 非阻塞式UDP通信

1 基本概念

1.1 同步和異步

  1. 同步I/O:每個請求必須逐個地被處理,一個請求的處理會導緻整個流程的暫時等待,這些事件無法并發地執行。使用者線程發起I/O請求後需要等待或者輪詢核心I/O操作完成後才能繼續執行。
  2. 異步I/O:多個請求可以并發地執行,一個請求或者任務的執行不會導緻整個流程的暫時等待。使用者線程發起I/O請求後仍然繼續執行,當核心I/O操作完成後會通知使用者線程,或者調用使用者線程注冊的回調函數。

傳統IO和NIO都是同步的,AIO是異步的。

1.2 阻塞和非阻塞

阻塞和非阻塞是程序在通路資料的時候,請求操作是否準備就緒的一種處理方式。

  1. 阻塞:某個請求發出後,由于該請求操作需要的條件不滿足,請求操作一直阻塞,不會傳回,直到條件滿足。
  2. 非阻塞:請求發出後,若該請求需要的條件不滿足,則立即傳回一個标志資訊告知條件不滿足,而不會一直等待。一般需要通過循環判斷請求條件是否滿足來擷取請求結果。

同步和異步着重點在于多個任務執行過程中,後發起的任務是否必須等先發起的任務完成之後再進行。不管先發起的任務請求是阻塞等待完成,還是立即傳回通過循環等待請求成功。而阻塞和非阻塞重點在于請求的方法是否在條件不滿足時被阻塞,是否立即傳回。

傳統的IO 流都是阻塞式的。也就是說,當一個線程調用read() 或write() 時,該線程被阻塞,直到有一些資料被讀取或寫入,該線程在此期間不能執行其他任務。是以,在完成網絡通信進行IO 操作時,由于線程會阻塞,是以伺服器端必須為每個用戶端都提供一個獨立的線程進行處理,即多線程方案,但是當伺服器端需要處理大量用戶端時,仍然會造成大量線程等待,性能仍然急劇下降。

Java NIO 是非阻塞模式的。當線程從某通道進行讀寫資料時,若沒有資料可用時,該線程可以進行其他任務。線程通常将非阻塞IO 的空閑時間用于在其他通道上執行IO 操作,是以單獨的線程可以管理多個輸入和輸出通道。是以,NIO 可以讓伺服器端使用一個或有限幾個線程來同時處理連接配接到伺服器端的所有用戶端。

2 通道(Channel)與緩沖區(Buffer)

Java NIO系統的核心在于:通道(Channel)和緩沖區(Buffer)。通道表示打開到IO 裝置(例如:檔案、套接字)的連接配接。若需要使用NIO 系統,需要擷取用于連接配接IO 裝置的通道以及用于容納資料的緩沖區。然後操作緩沖區,對資料進行處理。

簡而言之,Channel 負責傳輸,Buffer 負責存儲。

2.1 緩沖區(Buffer)

一個用于特定基本資料類型的容器。由java.nio 包定義的,所有緩沖區都是Buffer 抽象類的子類。Java NIO 中的Buffer 主要用于與NIO 通道進行互動。在讀取資料時,它是直接讀到緩沖區中的; 在寫入資料時,寫入到緩沖區中。任何時候通路NIO中的資料,都是通過緩沖區進行操作。

Buffer就像一個數組,可以儲存多個相同類型的資料。但它不僅僅是一個數組。緩沖區提供了對資料的結構化通路,而且還可以跟蹤系統的讀/寫程序。根據資料類型不同(boolean 除外),有以下Buffer 常用子類:

ByteBuffer、CharBuffer、ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。

上述Buffer 類他們都采用相似的方法進行管理資料,隻是各自管理的資料類型不同而已。都是通過對應的allocate(int capacity)方法擷取一個Buffer 對象,該方法表示建立一個緩沖區容量為capacity 的XxxBuffer對象!

2.1.1 緩沖區的基本屬性

public abstract class Buffer {
    private int mark = -1;
    private int position = 0;
    private int limit;
    private int capacity;
   .............
}
           

容量(capacity) :表示Buffer 最大資料容量,緩沖區容量不能為負,并且建立後不能更改。

限制/界限(limit):第一個不可讀取或寫入的資料的索引,即位于limit索引以及之後的資料不可讀寫。緩沖區的限制不能為負,并且不能大于其容量。寫模式下,limit等于Buffer的capacity。讀模式時,limit表示你最多能讀到多少資料,即limit會被設定成寫模式下的position值。

位置(position):将要讀取或寫入的資料的索引。緩沖區的位置不能為負,并且不能大于其限制。最小為0,最大可為capacity – 1.

标記(mark)與重置(reset):标記是一個索引,通過Buffer中的mark()方法指定Buffer中一個特定的(目前的)position,之後可以通過調用reset()方法恢複到這個position.mark應該小于等于position和limit,如果調整了位置之後mark大于這兩數的位置的話,maark将被置為-1.

-1 <= mark <= position <= limit <= capacity

2.1.2 緩沖區的基本操作與常用方法

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-S0jOdIl3-1631502083167)(C:\Users\lx\AppData\Roaming\Typora\typora-user-images\image-20210913105354392.png)]

Buffer 的常用方法:

方法 描述
Buffer clear() 清空緩沖區并傳回對緩沖區的引用。但是緩沖區的資料還在,隻是處于被遺忘的狀态,即指針初始化。可以通過get()驗證,還能得到第一個資料。或者hasRemaining驗證
Buffer flip() 将緩沖區的限制設定為目前位置所在的索引,并将目前位置的索引重置為0。切換到讀模式。
int capacity() 傳回Buffer 的capacity大小
boolean hasRemaining() 判斷緩沖區中是否還有元素
int limit() 傳回Buffer 的限制(limit)
Bufferlimit(int n) 将設定緩沖區界限為n,并傳回一個具有新limit 的緩沖區對象
Buffer mark() 對緩沖區的位置設定标記(标記目前position的索引位置)
int position() 傳回緩沖區的目前位置position
Buffer position(int n) 将設定緩沖區的目前位置為n,并傳回修改後的Buffer 對象
int remaining() 傳回position 和limit 之間的元素個數
Buffer reset() 将位置position 轉到以前設定的mark 所在的位置。
Buffer rewind() 将位置設為為0,取消設定的mark。可重複讀。

2.1.3 緩沖區的資料操作

Buffer 所有子類提供了兩個用于資料操作的方法:get() 與put() 方法

擷取Buffer 中的資料:

byte get() 讀取單個位元組
ByteBuffer get(byte[] dst) 批量讀取多個位元組到dst 中
byte get(int index) 讀取指定索引位置的位元組(不會移動position)

放入資料到Buffer 中:

ByteBuffer put(byte b) 将給定單個位元組寫入緩沖區的目前位置
ByteBuffer put(byte[] src) 将src 中的位元組寫入緩沖區的目前位置
ByteBuffer put(int index, byte b) 将指定位元組寫入緩沖區的索引位置(不會移動position)

2.1.4 直接與非直接緩沖區

位元組緩沖區要麼是直接的,要麼是非直接的。如果為直接位元組緩沖區,則Java虛拟機會盡最大努力直接在此緩沖區上執行本機I/O操作。也就是說,在每次調用基礎作業系統的一個本機I/O操作之前(或之後),虛拟機都會盡量避免将緩沖區的内容複制到中間緩沖區中(或從中間緩沖區中複制内容)。直接緩沖區可以有效減少資料拷貝次數,提升性能,這種方式的學名叫做記憶體映射。

傳統的io和nio的XxxBuffer#allocate(capacity)方法擷取的緩沖區都是非直接緩沖區,直接位元組緩沖區可以通過調用此NIO類的allocateDirect(capacity)方法來建立。此方法傳回的直接緩沖區進行配置設定和取消配置設定所需成本通常高于非直接緩沖區。

直接緩沖區的内容可以駐留在正常的垃圾回收堆之外。是以,它們對應用程式的記憶體需求量造成的影響可能并不明顯,Java的GC也不會對其進行回收。是以,建議将直接緩沖區主要配置設定給那些易受基礎系統的本機I/O 操作影響的大型、持久的緩沖區。一般情況下,最好僅在直接緩沖區能在程式性能方面帶來明顯好處時配置設定它們。

直接位元組緩沖區還可以通過FileChannel#map()方法将檔案區域直接映射到記憶體中來建立,這就是mmap技術,即記憶體映射檔案技術,該方法傳回MappedByteBuffer。

mmap是一種記憶體映射檔案的方法,即将一個檔案或者其它對象映射到程序的位址空間,實作檔案磁盤位址和程序虛拟位址空間中一段虛拟位址的一一對映關系。實作這樣的映射關系後,程序就可以采用指針的方式讀寫操作這一段記憶體,而系統會自動回寫髒頁面到對應的檔案磁盤上,即完成了對檔案的操作而不必再調用read,write等系統調用函數。相反,核心空間對這段區域的修改也直接反映使用者空間,進而可以實作不同程序間的檔案共享。

位元組緩沖區是直接緩沖區還是非直接緩沖區可通過調用其isDirect()方法來确定。提供此方法是為了能夠在性能關鍵型代碼中執行顯式緩沖區管理。

2.2 通道(Channel)

通道 Channel 是對原 I/O 包中的流的模拟,可以通過它讀取和寫入資料。

通道與流的不同之處在于,流隻能在一個方向上移動(一個流必須是 InputStream 或者 OutputStream 的子類),而通道是雙向的,可以用于讀、寫或者同時用于讀寫。

Channel 本身不能直接通路資料,Channel 隻能與Buffer 進行互動。

2.2.1 Channel的實作

Java 為Channel 接口提供的最主要實作類如下:

  1. FileChannel:用于讀取、寫入、映射和操作檔案的通道。
  2. DatagramChannel:通過UDP 讀寫網絡中的資料通道。
  3. SocketChannel:通過TCP 讀寫網絡中的資料,用戶端通道。
  4. ServerSocketChannel:可以監聽新進來的TCP 連接配接,對每一個新進來的連接配接都會建立一個SocketChannel,服務端通道。

2.2.2 擷取通道

  1. 擷取通道的一種方式是對支援通道的對象調用getChannel()方法。支援通道的類如下:
  2. 本地IO的相關類,FileInputStream、FileOutputStream、RandomAccessFile,擷取FileChannel。
  3. 網絡IO相關類,DatagramSocket擷取DatagramChannel,Socket擷取SocketChannel,ServerSocket擷取ServerSocketChannel
  4. 在JDK1.7中的NIO.2中,新增了Files#newByteChannel()擷取FileChannel的方式,以及通過各個通道實作類的靜态方法open()打開并傳回指定通道的方式。

3 選擇器Selector

NIO 常常被叫做非阻塞 IO,主要是因為 NIO 在網絡通信中的非阻塞特性被廣泛使用。

Java NIO 實作了 IO 多路複用中的 Reactor 模型,一個線程使用一個選擇器 Selector 通過輪詢的方式去監聽多個通道 Channel 上的事件,進而讓一個線程就可以處理多個事件。

通過配置監聽的通道 Channel 為非阻塞,那麼當 Channel 上的 IO 事件還未到達時,就不會進入阻塞狀态一直等待,而是繼續輪詢其它Channel,找到IO事件已經到達的 Channel 執行。

因為建立和切換線程的開銷很大,是以使用一個線程來處理多個事件而不是一個線程處理一個事件,對于 IO 密集型的應用具有很好地性能。

應該注意的是,隻有套接字 Channel 才能配置為非阻塞,而 FileChannel 不能,為 FileChannel 配置非阻塞也沒有意義。

Java NIO的基本概念Channel、Buffer、Selector以及非阻塞網絡通信案例1 基本概念2 通道(Channel)與緩沖區(Buffer)3 選擇器Selector4 NIO非阻塞網絡通信案例

選擇器(Selector)是SelectableChannel 對象的多路複用器,Selector可以同時監控多個SelectableChannel 的IO 狀況,也就是說,利用Selector 可使一個單獨的線程管理多個Channel。Selector 是非阻塞IO 的核心。

SelectableChannel 的結構如下圖:

Java NIO的基本概念Channel、Buffer、Selector以及非阻塞網絡通信案例1 基本概念2 通道(Channel)與緩沖區(Buffer)3 選擇器Selector4 NIO非阻塞網絡通信案例

3.1 建立Selector

通過調用Selector#open() 方法建立一個Selector。

3.2 向選擇器注冊通道

該方法的傳回值是一個SelectionKey對象,SelectionKey表示一個Selector和Channel 之間的注冊關系。當Channel 注冊到Selector 上時,就相當于确立了兩者的服務關系,而SelectionKey 就是這個契約。當Selector或者Channel被關閉時, 它們對應的SelectionKey 就會失效。

當調用register方法将通道注冊選擇器時,選擇器對通道的監聽事件需要通過第二個參數ops 指定。可以監聽的事件類型(可使用SelectionKey 的四個常量表示):

  1. 讀: SelectionKey.OP_READ (1)
  2. 寫: SelectionKey.OP_WRITE (4)
  3. 連接配接: SelectionKey.OP_CONNECT(8)
  4. 接收: SelectionKey.OP_ACCEPT (16)

若注冊時不止監聽一個事件,則可以使用“位或”操作符連接配接。例:

SelectionKey的方法:

方法 描述
int interestOps() 擷取感興趣事件集合
int readyOps() 擷取通道已經準備就緒的操作的集合
SelectableChannel channel() 擷取注冊通道
Selector selector() 傳回選擇器
boolean isReadable() 檢測Channal 中讀事件是否就緒
boolean isWritable() 檢測Channal 中寫事件是否就緒
booleanisConnectable() 檢測Channel 中連接配接是否就緒
booleanisAcceptable() 檢測Channel 中接收是否就緒

3.3 監聽事件

使用 select()方法來監聽到達的事件,它會一直阻塞直到有至少一個事件到達。這個方法應該被循環調用。

3.4 Selector 的常用方法

方法 描述
Set keys() 所有的SelectionKey 集合。代表注冊在該Selector上的Channel
selectedKeys() 被選擇的SelectionKey 集合。傳回此Selector的已選擇鍵集
intselect() 監控所有注冊的Channel,當它們中間有需要處理的IO 操作時,該方法傳回,并将對應得的SelectionKey 加入被選擇的SelectionKey 集合中,該方法傳回這些Channel 的數量。
int select(long timeout) 可以設定逾時時長的select() 操作
intselectNow() 執行一個立即傳回的select() 操作,該方法不會阻塞線程
Selectorwakeup() 使一個還未傳回的select() 方法立即傳回
void close() 關閉該選擇器

4 NIO非阻塞網絡通信案例

4.1 非阻塞式TCP通信

Java NIO中的SocketChannel是一個連接配接到TCP網絡套接字的通道。Java NIO中的ServerSocketChannel是一個可以監聽新進來的TCP連接配接的通道,就像标準IO中的ServerSocket一樣。

TcpClient:

/**
 * @author lx
 */
public class TcpClient {
    public static void main(String[] args) throws IOException {
        //1. 擷取通道
        SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));
        //2. 切換非阻塞模式
        sChannel.configureBlocking(false);
        //3. 配置設定指定大小的緩沖區
        ByteBuffer buf = ByteBuffer.allocate(1024);
        //4. 發送資料給服務端
        Scanner scan = new Scanner(System.in);
        while (scan.hasNext()) {
            String str = scan.next();
            buf.put((new Date().toString() + "\n" + str).getBytes());
            buf.flip();
            sChannel.write(buf);
            buf.clear();
        }
        //5. 關閉通道
        sChannel.close();
    }
}
           

TcpServer:

/**
 * @author lx
 */
public class TcpServer {
    public static void main(String[] args) throws IOException {
        //1. 擷取通道
        ServerSocketChannel ssChannel = ServerSocketChannel.open();
        //2. 切換非阻塞模式
        ssChannel.configureBlocking(false);
        //3. 綁定連接配接
        ssChannel.bind(new InetSocketAddress(9898));
        //4. 擷取選擇器
        Selector selector = Selector.open();
        //5. 将通道注冊到選擇器上, 并且指定“監聽接收事件”
        ssChannel.register(selector, SelectionKey.OP_ACCEPT);
        //6. 輪詢式的擷取選擇器上的一個已經“準備就緒”的事件
        while (selector.select() > 0) {
            //7. 擷取目前選擇器中所有注冊的“選擇鍵(已就緒的監聽事件)”
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                //8. 擷取準備就緒的事件
                SelectionKey sk = it.next();
                //9. 判斷具體是什麼事件準備就緒
                if (sk.isAcceptable()) {
                    //10. 若“接收就緒”,擷取用戶端連接配接
                    SocketChannel sChannel = ssChannel.accept();
                    //11. 切換非阻塞模式
                    sChannel.configureBlocking(false);
                    //12. 将該通道注冊到選擇器上
                    sChannel.register(selector, SelectionKey.OP_READ);
                } else if (sk.isReadable()) {
                    //13. 擷取目前選擇器上“讀就緒”狀态的通道
                    SocketChannel sChannel = (SocketChannel) sk.channel();
                    //14. 讀取資料
                    ByteBuffer buf = ByteBuffer.allocate(1024);
                    int len = 0;
                    while ((len = sChannel.read(buf)) > 0) {
                        buf.flip();
                        System.out.println(new String(buf.array(), 0, len));
                        buf.clear();
                    }
                }
                //15. 取消選擇鍵 SelectionKey
                //這個非常重要,當處理完一個SelectionKey後,務必将其從集合内删除
                //否則下次就會重複處理相同的SelectionKey
                it.remove();
            }
        }
    }
}
           

4.2 非阻塞式UDP通信

Java NIO中的DatagramChannel是一個能收發UDP包的通道。

UdpClient:

/**
 * @author lx
 */
public class UdpClient {
    public static void main(String[] args) throws IOException {
        DatagramChannel dc = DatagramChannel.open();
        dc.configureBlocking(false);
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String next = scanner.next();
            byteBuffer.put((new Date().toString() + "\n" + next).getBytes());
            byteBuffer.flip();
            dc.send(byteBuffer, new InetSocketAddress("127.0.0.1", 9898));
            byteBuffer.clear();
        }
        dc.close();
    }
}
           

UdpServer:

/**
 * @author lx
 */
public class UdpServer {
    public static void main(String[] args) throws IOException {
        DatagramChannel dc = DatagramChannel.open();
        dc.configureBlocking(false);
        dc.bind(new InetSocketAddress(9898));
        Selector selector = Selector.open();
        dc.register(selector, SelectionKey.OP_READ);
        while (selector.select() > 0) {
            Iterator<SelectionKey> selectionKeyIterator = selector.selectedKeys().iterator();
            while (selectionKeyIterator.hasNext()) {
                SelectionKey next = selectionKeyIterator.next();
                if (next.isReadable()) {
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    dc.receive(byteBuffer);
                    byteBuffer.flip();
                    System.out.println(new String(byteBuffer.array(), 0, byteBuffer.limit()));
                    byteBuffer.clear();
                }
            }
            selectionKeyIterator.remove();
        }
    }
}
           
如有需要交流,或者文章有誤,請直接留言。另外希望點贊、收藏、關注,我将不間斷更新各種Java學習部落格!