天天看點

深入淺出了解Java NIOJAVA NIO概述NIO 和 BIO的對比NIO的三大核心元件

深入淺出 NIO 網絡程式設計

  • JAVA NIO概述
  • NIO 和 BIO的對比
  • NIO的三大核心元件
    • NIO的三大核心元件—Buffer緩沖區
      • Buffer記憶體類型
      • 堆外方式的使用場景和注意
    • NIO的三大核心元件—Channel通道
      • NIO Channel VS Java Stream
      • Channel的實作
      • SocketChannel
      • ServerSocketChannel
      • 模拟代碼
    • Selector選擇器
      • Selector監聽事件
      • Selector的使用
    • NIO 與多線程結合改進方案
      • reactor模式
    • 總結

JAVA NIO概述

Java NIO是始于java1.4版本後提出的新的Java IO操作的

非阻塞

API,用于替換标準( 傳統,或者 Blocking IO ,簡稱為 BIO ) Java IO API 的 IO API 。

NIO 和 BIO的對比

NIO BIO
非阻塞IO 阻塞IO
基于緩沖區(Buffer) 基于流(stream)
擁有選擇器Selector,是 NIO 實作非阻塞的基礎 無選擇器
深入淺出了解Java NIOJAVA NIO概述NIO 和 BIO的對比NIO的三大核心元件

NIO的三大核心元件

  1. Buffer緩沖區,本質上市一個可以寫入的記憶體塊(類似數組),可以再次讀取,該記憶體塊包含在NIO Buffer對象中,該對象提供了一系列的方法,用于操作使用記憶體塊,後文會詳細解讀Buffer緩沖區的使用。
  2. Channel通道
  3. Selector選擇器

NIO的三大核心元件—Buffer緩沖區

Buffer的基礎屬性:

capacity容量:buffer具有一定的固定的大小,也成為容量;

position位置:寫入模式下代表寫資料的位置,讀模式下代表讀取資料的位置;

limit限制:寫入模式,限制大小等于buffer的容量大小,limit=capacity,讀取模式下,limit等于寫入的資料量的大小。

mark标記: 記錄目前 position

深入淺出了解Java NIOJAVA NIO概述NIO 和 BIO的對比NIO的三大核心元件

從上述可知

position 屬性代表位置,初始值為 0

。在寫模式下,每 Buffer 中寫入一個值,position 就加 1 ,代表下一次的寫入位置。在讀模式下,每從 Buffer 中讀取一個值,position 就自動加 1 ,代表下一次的讀取位置。

limit 屬性代表上限限制

。在寫模式下,代表最大能寫入的資料上限位置,此時 limit 等于 capacity 。在讀模式下,在 Buffer 完成所有資料寫入後,通過調用 flip() 方法,切換到讀模式。此時,limit 等于 Buffer 中實際的資料大小。因為 Buffer 不一定被寫滿,是以不能使用 capacity 作為實際的資料大小。

mark 屬性為标記

,通過 mark() 方法,記錄目前 position ;通過 reset() 方法,恢複 position 為标記。

寫模式下,标記上一次寫位置。

讀模式下,标記上一次讀位置。

使用Buffer進行資料讀取和寫入操作,步驟如下

1.将資料寫入緩沖區

2.調用buffer.filp(),轉換為讀取模式

3.緩沖區讀取資料

4.調用buffer.clear() 或buffer.compact()清除緩沖區

代碼示例

package nio;

import java.nio.ByteBuffer;

/**
 * @author 潇兮
 * @date 2019/10/13 16:58
 **/
public class BufferTest {

    public static  void main(String[] args){
        //基于堆内( Non-Direct )記憶體的實作類 HeapByteBuffer 的對象,allocateDirect(int capacity)堆外記憶體
        ByteBuffer byteBuffer=ByteBuffer.allocate(5);
        //預設是寫入模式
        System.out.println(String.format("初始化: capacity容量:%s,position位置:%sm,limit上限:%s",byteBuffer.capacity(),byteBuffer.position(),byteBuffer.limit()));

        //寫入操作
        System.out.println("開始讀資料");
        byteBuffer.put((byte) 1);
        byteBuffer.put((byte) 2);
        byteBuffer.put((byte) 3);

        //寫入資料後屬性變化
        System.out.println(String.format("寫入資料後屬性變化: capacity容量:%s,position位置:%sm,limit上限:%s",byteBuffer.capacity(),byteBuffer.position(),byteBuffer.limit()));

        //轉換為讀模式,讀取資料若是不掉用flip方法,position的位置不正确
        byteBuffer.flip();
        byte a=byteBuffer.get();
        System.out.println(a);
        byte b=byteBuffer.get();
        System.out.println(b);

        //讀取資料後屬性的變化
        System.out.println(String.format("讀取資料後屬性的變化: capacity容量:%s,position位置:%sm,limit上限:%s",byteBuffer.capacity(),byteBuffer.position(),byteBuffer.limit()));

    }

}

           

Buffer記憶體類型

ByteBuffer提供了直接記憶體(direct堆外記憶體)和直接記憶體(heap堆)兩種實作。

堆外記憶體擷取的方式:ByteBuffer byteBuffer=ByteBuffer.allocateDirect(int capacity);

使用堆外記憶體的好處:

1.進行網絡 IO 比heapBuffer少一次拷貝。(file/socket——OS memory——jvm heap)因為GC會移動對象記憶體,是以在寫file或者socket時,JVM的實作會先把資料複制到堆外 ,再進行寫入。

2.GC範圍之外,降低GC壓力,實作了自動管理。DirectByteBuffer中有一個Cleaner對象(PhantomReference),Cleaner被GC前會執行clean方法,觸發DirectByteBuffer中定義的回收函數 Deallocator

堆外方式的使用場景和注意

1.在性能确實可觀的情況下才去使用堆外記憶體的方式;例如配置設定給大型、長壽命的對象或應用(網絡傳輸、檔案讀寫場景),減少記憶體拷貝的次數;

2.通過虛拟機參數MaxDirectMemorySize限制大小,防止耗盡整個機器的記憶體

NIO的三大核心元件—Channel通道

Channel的API涵蓋了UDP/TCP網絡和檔案 IO,例如FileChannel、DatagramChannel、SocketChannel

ServerSocketChannel。

NIO Channel VS Java Stream

1.對于同一個channel,我們可以在同一通道内讀取和寫入操作。而對于同一個stream中,要麼隻讀,要麼隻寫,二選一,即是單向操作的;

2.Channel可以非阻塞的讀寫 IO 操作,而stream隻能阻塞的讀寫IO操作;

3.Channel的使用必須搭配Buffer,即是總是先讀取到一個buffer中或向一個buffer中寫入,再寫入Channel。

Channel的實作

Channel 在 Java 中,作為一個接口(java.nio.channels.Channel ),定義了 IO 操作的連接配接與關閉。主要有代碼如下

深入淺出了解Java NIOJAVA NIO概述NIO 和 BIO的對比NIO的三大核心元件

Channel 最為重要的四個 Channel 實作類如下:

SocketChannel

:用戶端用于發起 TCP 的 Channel 。

ServerSocketChannel

:服務端用于監聽新進來的連接配接的 TCP 的 Channel 。對于新進來的連接配接,都會建立一個對應的 SocketChannel 。

DatagramChannel

:通過 UDP 讀寫資料。

FileChannel

:從檔案中,讀寫資料。

SocketChannel

SocketChannel用于建立TCP網絡連接配接,類似java.net.Socket。有兩種建立方式。參考《Java NIO 系列教程(八) SocketChannel》

  1. 用戶端主動發起和伺服器的連接配接
  2. 服務端擷取新連接配接
//用戶端主動發起連接配接
    SocketChannel socketChannel=SocketChannel.open();
    socketChannel.configureBlocking(false);//設定為非阻塞模式,預設是阻塞模式
    socketChannel.connect(new InetSocketAddress("http://xxxx.com",80));

    channel.write(byteBuffer);//發送請求資料——向通道内寫入資料(循環中調用)
    int bytesRead=socketChannel.read(byteBuffer);//讀取服務端傳回的資料——讀取緩沖區的資料
    socketChannel.close();//關閉連接配接

           
注意:write寫:write()在還沒寫入任何内容的時候就有可能傳回(非阻塞),是以需要循環中調用write().
read讀:read()方法可能直接傳回而讀不到任何資料,根據傳回的int值判斷讀取了多少位元組。
           

ServerSocketChannel

ServerSocketChannel可以監聽建立的TCP連接配接通道,類似ServerSocket。詳細介紹可以參考《Java NIO系列教程(九) ServerSocketChannel》

//建立網絡服務端
 ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
 serverSocketChannel.configureBlocking(false);//設定非阻塞
 //綁定端口
 serverSocketChannel.socket().bind(new InetSocketAddress(8080));
 while(true){
     //擷取新的TCP連結端口
    SocketChannel socketChannel=serverSocketChannel.accept();
      if(socketChannel !=null){
           //tcp請求,讀取響應
       }
  }
           

serverSocketChannel.accept():如果該通道處于非阻塞模式,那麼如果沒有挂起連接配接,該方法立即傳回null,是以SocketChannel 要進行非null校驗。

模拟代碼

用戶端代碼:

package nio;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

/**
 * @Author 作者 :@潇兮
 * @Date 建立時間:2019/10/14 12:09
 * 類說明:
 */
public class NIOClient {

    public static void main(String[] args) throws Exception {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
        while (!socketChannel.finishConnect()) {
            // 判斷是否連接配接完成,若沒連接配接上,則一直等待
            Thread.yield();
        }
        Scanner scanner = new Scanner(System.in);
        System.out.println("請輸入:");
        // 發送内容
        String msg = scanner.nextLine();
        ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
        while (buffer.hasRemaining()) {
            socketChannel.write(buffer);
        }
        // 讀取響應
        System.out.println("收到服務端響應:");
        ByteBuffer requestBuffer = ByteBuffer.allocate(1024);

        while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) {
            // 長連接配接情況下,需要手動判斷資料有沒有讀取結束 (此處做一個簡單的判斷: 超過0位元組就認為請求結束了)
            if (requestBuffer.position() > 0) break;
        }
        requestBuffer.flip();
        byte[] content = new byte[requestBuffer.limit()];
        requestBuffer.get(content);
        System.out.println(new String(content));
        scanner.close();
        socketChannel.close();
    }

}

           

服務端代碼(一):

package nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/**
 * @Author 作者 :@潇兮
 * @Date 建立時間:2019/10/14 11:06
 * 類說明:用于初步了解,直接基于非阻塞的寫法,後文繼續待改進
 */
public class NIOServer {

    public static void main(String[] args) throws IOException {
        //建立網絡服務端
        ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);//設定阻塞
        serverSocketChannel.socket().bind(new InetSocketAddress(8080));
        System.out.println("端口啟動");
        while (true){
            SocketChannel socketChannel=serverSocketChannel.accept();//擷取新連接配接
            //判斷socketChannel
            if (socketChannel!=null){
                System.out.println("擷取的新連接配接:"+socketChannel.getRemoteAddress());
                socketChannel.configureBlocking(false);//設定為非阻塞
                ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
                try {
                    while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) {
                        // 長連接配接情況下,需要手動判斷資料有沒有讀取結束 (此處做一個簡單的判斷: 超過0位元組就認為請求結束了)
                        if (requestBuffer.position() > 0) break;
                    }
                    if (requestBuffer.position() == 0) continue;//若無資料,不繼續處理後續
                    requestBuffer.flip();//切換為讀模式
                    byte[] content = new byte[requestBuffer.limit()];//初始化數組大小
                    requestBuffer.get(content);//擷取内容
                    System.out.println("收到消息:"+new String(content));
                    System.out.println("收到資料,來自:"+ socketChannel.getRemoteAddress());
                    // 響應結果 200,模拟請求響應
                    String response = "HTTP/1.1 200 OK\r\n" +
                            "Content-Length: 11\r\n\r\n" +
                            "Yes,He is";
                    ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());//資料存放在byte數組
                    while (buffer.hasRemaining()) {
                       // hasRemaining() 傳回是否有剩餘的可用長度
                        socketChannel.write(buffer);// 非阻塞
                    }
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }

    }

}

           

從服務端代碼(一)中,我們運作NIOServer,同時打開多個client,發現隻有當一個client發送讀取結束後第二個client才與Server建立連接配接。即是當58729端口完成發送資訊後,58737端口的用戶端才會與server建立連接配接。和 BIO 的代碼實作差別不大,此處隻是将方式改為非阻塞的形式,改進代碼,見

服務端代碼(改進一)

,服務端代碼(一)運作結果如下圖:

深入淺出了解Java NIOJAVA NIO概述NIO 和 BIO的對比NIO的三大核心元件

服務端代碼(改進一)

package nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;

/**
 * @Author 作者 :@潇兮
 * @Date 建立時間:2019/10/14 11:54
 * 類說明:直接基于非阻塞的寫法,一個線程處理輪詢所有請求, 問題: 輪詢通道的方式,低效,浪費CPU。後文繼續改進
 */
public class NIOServer1 {
    //已經建立連接配接的集合
    private static ArrayList<SocketChannel> channels = new ArrayList<>();

    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.socket().bind(new InetSocketAddress(8080));
        System.out.println("端口啟動");
        while (true){
            SocketChannel socketChannel=serverSocketChannel.accept();

            if (socketChannel!=null){
                System.out.println("收到新連接配接 : " + socketChannel.getRemoteAddress());
                socketChannel.configureBlocking(false); // 預設是阻塞的,一定要設定為非阻塞
                channels.add(socketChannel);
            }else {
                //若無新連接配接,處理現有連接配接後删除
                Iterator<SocketChannel> iterator = channels.iterator();
                while (iterator.hasNext()){
                  SocketChannel  ch=iterator.next();
                  try {
                      ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
                      if (ch.read(byteBuffer)==0){
                          //若通道内無資料處理,退出目前循環
                           continue;
                      }
                      while (ch.isOpen() && ch.read(byteBuffer)!=-1){
                          // 長連接配接情況下,需要手動判斷資料有沒有讀取結束 (此處做一個簡單的判斷: 超過0位元組就認為請求結束了)
                          if (byteBuffer.position() > 0) break;
                      }
                      if (byteBuffer.position()==0) continue; // 如果沒資料了, 則不繼續後面的處理
                      //切換為讀模式
                      byteBuffer.flip();
                      byte[] content = new byte[byteBuffer.limit()];
                      byteBuffer.get(content);
                      System.out.println(new String(content));
                      System.out.println("收到資料,來自:" + ch.getRemoteAddress());

                      // 響應結果 200,模拟響應
                      String response = "HTTP/1.1 200 OK\r\n" +
                              "Content-Length: 11\r\n\r\n" +
                              "Hello World";
                      ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
                      while (buffer.hasRemaining()) {
                          // hasRemaining() 傳回是否有剩餘的可用長度
                          ch.write(buffer);
                      }
                      //處理後删除
                      iterator.remove();
                  }catch (Exception e){
                      e.printStackTrace();
                  }
                }
            }
        }
    }
}
           

運作server開啟服務端後運作多個用戶端client,發現通過

輪詢通道

的方式可以一次建立多個連接配接,如同代碼所示,當一個程式與服務端建立連接配接的時候,fu服務端收到響—accept(),收到響應後将它加入channels中,保留多個連接配接,當無新連接配接建立時,線程處理已經建立好的連接配接。但是存在不足,上述代碼的實作方式是為低效的循環檢查的方式,NIO提供了Selector的方式解決此類問題,避免循環檢查,具體代碼見

服務端代碼(改進二)

,服務端代碼(改進一)運作結果如下圖:

深入淺出了解Java NIOJAVA NIO概述NIO 和 BIO的對比NIO的三大核心元件

Selector選擇器

Selector是一個Java NIO 元件,可以檢查一個或者多個NIO通道,确定一個或多個 NIO Channel 的狀态是否處于可讀、可寫。實作了單個線程可以管理多個通道,進而管理多個網絡連接配接。是以,Selector 也被稱為多路複用器。

Selector監聽事件

一個線程使用Selector監聽多個channel的不同僚件:四個事件分别對應SelectionKey四個常量

1.Connet連接配接:連接配接完成事件( TCP 連接配接 ),僅适用于用戶端, SelectionKey.OP_CONNECT

2.Accept:接受新連接配接事件,僅适用于服務端,SelectionKey.OP_ACCEPT

3.Read讀取:讀事件,适用于兩端,表示Buffer可讀,SelectionKey.OP_READ

4.Write寫入:讀事件,适用于兩端,表示Buffer可寫,SelectionKey.OP_WRITE

Selector的使用

建立Selector可以通過

Selector selector = Selector.open();

來建立一個Selector對象。為了Selector能夠管理Channel,将Channel注冊到Selector中(一個 Channel 要注冊到 Selector 中,那麼該 Channel 必須是非阻塞,FileChannel是阻塞的,是以不能夠注冊)

channel.configureBlocking(false); // 
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
           

若Selector 可以對 Channel 的多個事件感興趣,要注冊 Channel 的多個事件到 Selector 中時,可以使用或運算

|

來組合多個事件。示例代碼如下:

channel.configureBlocking(false); // 
SelectionKey key = channel.register(selector, SelectionKey.OP_READ |  SelectionKey.OP_WRITE);
           

服務端代碼(改進二)

package nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * @Author 作者 :@潇兮
 * @Date 建立時間:2019/10/14 15:12
 * 類說明:  此處一個selector監聽所有事件,一個線程處理所有請求事件. 會成為瓶頸! 要有多線程的運用,後文繼續改進
 */
public class NIOServer2 {

    public static void main(String[] args) throws IOException {
        // 1. 建立網絡服務端ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false); // 設定為非阻塞模式

        // 2. 建構一個Selector選擇器,将channel注冊上去
        Selector selector = Selector.open();
        // 将serverSocketChannel注冊到selector
        SelectionKey selectionKey = serverSocketChannel.register(selector, 0, serverSocketChannel);
        // 對serverSocketChannel上面的accept事件監聽(serverSocketChannel隻能支援accept操作)
        selectionKey.interestOps(SelectionKey.OP_ACCEPT);
        // 3. 綁定端口
        serverSocketChannel.socket().bind(new InetSocketAddress(8080));
        System.out.println("端口啟動");

        while (true){
            //不輪詢通道,改用輪詢事件的方式.select方法有阻塞效果,直到有事件通知才會有傳回
            selector.select();
            Set<SelectionKey> selectionKeys=selector.selectedKeys();
            //周遊查詢結果
            Iterator<SelectionKey> iterator=selectionKeys.iterator();
            while (iterator.hasNext()){
                SelectionKey key=iterator.next();
                iterator.remove();

                //監聽讀和寫事件
                if (key.isAcceptable()){
                    ServerSocketChannel server= (ServerSocketChannel) key.attachment();
                    //将通道注冊到selector上
                    SocketChannel clientSocketChannel=server.accept();//mainReactor 輪詢accept
                    clientSocketChannel.configureBlocking(false);
                    clientSocketChannel.register(selector, SelectionKey.OP_READ, clientSocketChannel);
                    System.out.println("收到新連接配接 : " + clientSocketChannel.getRemoteAddress());
                }
                if (key.isReadable()){
                    SocketChannel socketChannel = (SocketChannel) key.attachment();
                    try {
                        ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
                        while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) {
                            // 長連接配接情況下,需要手動判斷資料有沒有讀取結束 (此處做一個簡單的判斷: 超過0位元組就認為請求結束了)
                            if (requestBuffer.position() > 0) break;
                        }
                        if(requestBuffer.position() == 0) continue; // 如果沒資料了, 則不繼續後面的處理
                        requestBuffer.flip();
                        byte[] content = new byte[requestBuffer.limit()];
                        requestBuffer.get(content);
                        System.out.println(new String(content));
                        System.out.println("收到資料,來自:" + socketChannel.getRemoteAddress());
                        // TODO 業務操作 資料庫 接口調用等等

                        // 響應結果 200
                        String response = "HTTP/1.1 200 OK\r\n" +
                                "Content-Length: 11\r\n\r\n" +
                                "Hello World";
                        ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
                        while (buffer.hasRemaining()) {
                            socketChannel.write(buffer);
                        }
                    } catch (IOException e) {
                        // e.printStackTrace();
                        key.cancel(); // 取消事件訂閱
                    }
                }
            }
            selector.selectNow();
        }

    }
}
           

NIO 與多線程結合改進方案

reactor模式

reactor模式稱之為響應器模式,常用于nio的網絡通信架構。下圖來源:《Scalable IO in Java》

單Reactor模式

深入淺出了解Java NIOJAVA NIO概述NIO 和 BIO的對比NIO的三大核心元件
概括上圖:Reactor線程接收請求->分發給線程池處理請求
           

單Reactor模式,定義了兩種線程,一種線程是Reator線程。Reator線程主要負責網絡的資料接收(accept())以及網絡連接配接的處理(比如TCP連接配接中接收的資料),接收的資料處理操作(如讀資料、解析協定等)由單獨的線程池執行。 實際上就是将底層的基礎網絡處理和應用層的邏輯處理做了分離,提高效率。

多Reactor模式

深入淺出了解Java NIOJAVA NIO概述NIO 和 BIO的對比NIO的三大核心元件
概括上圖:mainReactor->分發給subReactor讀寫->具體業務邏輯分發給單獨的線程池處理
           

多Reactor模式,是将Reactor分為了多種,将處理網絡連接配接的交由mainReactor去做,資料的讀取處理交由另外一個Reactor去做,其他的和單Reactor模式無較大差別。本質上就是在網絡底層多了一次分發,将資料處理交由另外一個線程去做。

根據Reactor模型改進服務端代碼如下:、

服務端代碼(三)

package nio;


import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author 潇兮
 * @date 2019/10/15 21:26
 **/
public class NIOServer3 {
    /** 處理業務操作的線程 */
    private static ExecutorService workPool = Executors.newCachedThreadPool();

    /**
     * 封裝了selector.select()等事件輪詢的代碼
     */
    abstract class ReactorThread extends Thread {

        Selector selector;
        LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();

        /**
         * Selector監聽到有事件後,調用這個方法
         */
        public abstract void handler(SelectableChannel channel) throws Exception;

        private ReactorThread() throws IOException {
            selector = Selector.open();
        }

        volatile boolean running = false;

        @Override
        public void run() {
            // 輪詢Selector事件
            while (running) {
                try {
                    // 執行隊列中的任務
                    Runnable task;
                    while ((task = taskQueue.poll()) != null) {
                        task.run();
                    }
                    selector.select(1000);

                    // 擷取查詢結果
                    Set<SelectionKey> selected = selector.selectedKeys();
                    // 周遊查詢結果
                    Iterator<SelectionKey> iter = selected.iterator();
                    while (iter.hasNext()) {
                        // 被封裝的查詢結果
                        SelectionKey key = iter.next();
                        iter.remove();
                        int readyOps = key.readyOps();
                        // 關注 Read 和 Accept兩個事件
                        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                            try {
                                SelectableChannel channel = (SelectableChannel) key.attachment();
                                channel.configureBlocking(false);
                                handler(channel);
                                if (!channel.isOpen()) {
                                    key.cancel(); // 如果關閉了,就取消這個KEY的訂閱
                                }
                            } catch (Exception ex) {
                                key.cancel(); // 如果有異常,就取消這個KEY的訂閱
                            }
                        }
                    }
                    selector.selectNow();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        private SelectionKey register(SelectableChannel channel) throws Exception {
            // 為什麼register要以任務送出的形式,讓reactor線程去處理?
            // 因為線程在執行channel注冊到selector的過程中,會和調用selector.select()方法的線程争用同一把鎖
            // 而select()方法實在eventLoop中通過while循環調用的,争搶的可能性很高,為了讓register能更快的執行,就放到同一個線程來處理
            FutureTask<SelectionKey> futureTask = new FutureTask<>(() -> channel.register(selector, 0, channel));
            taskQueue.add(futureTask);
            return futureTask.get();
        }

        private void doStart() {
            if (!running) {
                running = true;
                start();
            }
        }
    }

    private ServerSocketChannel serverSocketChannel;
    // 1、建立多個線程 - accept處理reactor線程 (accept線程)
    private ReactorThread[] mainReactorThreads = new ReactorThread[1];
    // 2、建立多個線程 - io處理reactor線程  (I/O線程)
    private ReactorThread[] subReactorThreads = new ReactorThread[8];

    /**
     * 初始化線程組
     */
    private void newGroup() throws IOException {
        // 建立IO線程,負責處理用戶端連接配接以後socketChannel的IO讀寫
        for (int i = 0; i < subReactorThreads.length; i++) {
            subReactorThreads[i] = new ReactorThread() {
                @Override
                public void handler(SelectableChannel channel) throws IOException {
                    // work線程隻負責處理IO處理,不處理accept事件
                    SocketChannel ch = (SocketChannel) channel;
                    ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
                    while (ch.isOpen() && ch.read(requestBuffer) != -1) {
                        // 長連接配接情況下,需要手動判斷資料有沒有讀取結束 (此處做一個簡單的判斷: 超過0位元組就認為請求結束了)
                        if (requestBuffer.position() > 0) break;
                    }
                    if (requestBuffer.position() == 0) return; // 如果沒資料了, 則不繼續後面的處理
                    requestBuffer.flip();
                    byte[] content = new byte[requestBuffer.limit()];
                    requestBuffer.get(content);
                    System.out.println(new String(content));
                    System.out.println(Thread.currentThread().getName() + "收到資料,來自:" + ch.getRemoteAddress());

                    // TODO 業務操作 資料庫、接口...
                    workPool.submit(() -> {
                    });

                    // 響應結果 200
                    String response = "HTTP/1.1 200 OK\r\n" +
                            "Content-Length: 11\r\n\r\n" +
                            "Hello World";
                    ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
                    while (buffer.hasRemaining()) {
                        ch.write(buffer);
                    }
                }
            };
        }

        // 建立mainReactor線程, 隻負責處理serverSocketChannel
        for (int i = 0; i < mainReactorThreads.length; i++) {
            mainReactorThreads[i] = new ReactorThread() {
                AtomicInteger incr = new AtomicInteger(0);

                @Override
                public void handler(SelectableChannel channel) throws Exception {
                    // 隻做請求分發,不做具體的資料讀取
                    ServerSocketChannel ch = (ServerSocketChannel) channel;
                    SocketChannel socketChannel = ch.accept();
                    socketChannel.configureBlocking(false);
                    // 收到連接配接建立的通知之後,分發給I/O線程繼續去讀取資料
                    int index = incr.getAndIncrement() % subReactorThreads.length;
                    ReactorThread workEventLoop = subReactorThreads[index];
                    workEventLoop.doStart();
                    SelectionKey selectionKey = workEventLoop.register(socketChannel);
                    selectionKey.interestOps(SelectionKey.OP_READ);
                    System.out.println(Thread.currentThread().getName() + "收到新連接配接 : " + socketChannel.getRemoteAddress());
                }
            };
        }


    }

    /**
     * 初始化channel,并且綁定一個eventLoop線程
     *
     * @throws IOException IO異常
     */
    private void initAndRegister() throws Exception {
        // 1、 建立ServerSocketChannel
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        // 2、 将serverSocketChannel注冊到selector
        int index = new Random().nextInt(mainReactorThreads.length);
        mainReactorThreads[index].doStart();
        SelectionKey selectionKey = mainReactorThreads[index].register(serverSocketChannel);
        selectionKey.interestOps(SelectionKey.OP_ACCEPT);
    }

    /**
     * 綁定端口
     *
     * @throws IOException IO異常
     */
    private void bind() throws IOException {
        //  1、 正式綁定端口,對外服務
        serverSocketChannel.bind(new InetSocketAddress(8080));
        System.out.println("啟動完成,端口8080");
    }

    public static void main(String[] args) throws Exception {
        NIOServer3 nioServerV3 = new NIOServer3();
        nioServerV3.newGroup(); // 1、 建立main和sub兩組線程
        nioServerV3.initAndRegister(); // 2、 建立serverSocketChannel,注冊到mainReactor線程上的selector上
        nioServerV3.bind(); // 3、 為serverSocketChannel綁定端口
    }
}

           

附上參考連結Reactor模型了解【NIO系列】——Reactor模式

總結

Java NIO是一種新的Java IO操作的

非阻塞

API。要想提升性能,需要與多線程技術結合使用。由于網絡程式設計的複雜性,在開源社群中湧現了多款對JDK NIO封裝和增強的架構如netty、Mina等架構。

繼續閱讀