天天看點

Java I/O模型從BIO到NIO和Reactor模式 從IO到NIO 阻塞I/O下的伺服器實作 Reactor模式

Unix下五種I/O模型

Unix 下共有五種 I/O 模型:

  • 阻塞 I/O
  • 非阻塞 I/O
  • I/O 多路複用(select和poll)
  • 信号驅動 I/O(SIGIO)
  • 異步 I/O(Posix.1的aio_系列函數)

阻塞I/O

如上文所述,阻塞I/O下請求無法立即完成則保持阻塞。阻塞I/O分為如下兩個階段。

  • 階段1:等待資料就緒。網絡 I/O 的情況就是等待遠端資料陸續抵達;磁盤I/O的情況就是等待磁盤資料從磁盤上讀取到核心态記憶體中。
  • 階段2:資料拷貝。出于系統安全,使用者态的程式沒有權限直接讀取核心态記憶體,是以核心負責把核心态記憶體中的資料拷貝一份到使用者态記憶體中。

非阻塞I/O

非阻塞I/O請求包含如下三個階段

  • socket設定為 NONBLOCK(非阻塞)就是告訴核心,當所請求的I/O操作無法完成時,不要将線程睡眠,而是傳回一個錯誤碼(EWOULDBLOCK) ,這樣請求就不會阻塞。
  • I/O操作函數将不斷的測試資料是否已經準備好,如果沒有準備好,繼續測試,直到資料準備好為止。整個I/O 請求的過程中,雖然使用者線程每次發起I/O請求後可以立即傳回,但是為了等到資料,仍需要不斷地輪詢、重複請求,消耗了大量的 CPU 的資源。
  • 資料準備好了,從核心拷貝到使用者空間。

一般很少直接使用這種模型,而是在其他I/O模型中使用非阻塞I/O 這一特性。這種方式對單個I/O 請求意義不大,但給I/O多路複用提供了條件。

I/O多路複用(異步阻塞 I/O)

I/O多路複用會用到select或者poll函數,這兩個函數也會使線程阻塞,但是和阻塞I/O所不同的是,這兩個函數可以同時阻塞多個I/O操作。而且可以同時對多個讀操作,多個寫操作的I/O函數進行檢測,直到有資料可讀或可寫時,才真正調用I/O操作函數。

從流程上來看,使用select函數進行I/O請求和同步阻塞模型沒有太大的差別,甚至還多了添加監視Channel,以及調用select函數的額外操作,增加了額外工作。但是,使用 select以後最大的優勢是使用者可以在一個線程内同時處理多個Channel的I/O請求。使用者可以注冊多個Channel,然後不斷地調用select讀取被激活的Channel,即可達到在同一個線程内同時處理多個I/O請求的目的。而在同步阻塞模型中,必須通過多線程的方式才能達到這個目的。

調用select/poll該方法由一個使用者态線程負責輪詢多個Channel,直到某個階段1的資料就緒,再通知實際的使用者線程執行階段2的拷貝。 通過一個專職的使用者态線程執行非阻塞I/O輪詢,模拟實作了階段一的異步化。

信号驅動I/O(SIGIO)

首先我們允許socket進行信号驅動I/O,并安裝一個信号處理函數,線程繼續運作并不阻塞。當資料準備好時,線程會收到一個SIGIO 信号,可以在信号處理函數中調用I/O操作函數處理資料。

異步I/O

調用aio_read 函數,告訴核心描述字,緩沖區指針,緩沖區大小,檔案偏移以及通知的方式,然後立即傳回。當核心将資料拷貝到緩沖區後,再通知應用程式。是以異步I/O模式下,階段1和階段2全部由核心完成,完成不需要使用者線程的參與。

幾種I/O模型對比

除異步I/O外,其它四種模型的階段2基本相同,都是從核心态拷貝資料到使用者态。差別在于階段1不同。前四種都屬于同步I/O。

Java中四種I/O模型

上一章所述Unix中的五種I/O模型,除信号驅動I/O外,Java對其它四種I/O模型都有所支援。其中Java最早提供的blocking I/O即是阻塞I/O,而NIO即是非阻塞I/O,同時通過NIO實作的Reactor模式即是I/O複用模型的實作,通過AIO實作的Proactor模式即是異步I/O模型的實作。

從IO到NIO

面向流 vs. 面向緩沖

Java IO是面向流的,每次從流(InputStream/OutputStream)中讀一個或多個位元組,直到讀取完所有位元組,它們沒有被緩存在任何地方。另外,它不能前後移動流中的資料,如需前後移動處理,需要先将其緩存至一個緩沖區。

Java NIO面向緩沖,資料會被讀取到一個緩沖區,需要時可以在緩沖區中前後移動處理,這增加了處理過程的靈活性。但與此同時在處理緩沖區前需要檢查該緩沖區中是否包含有所需要處理的資料,并需要確定更多資料讀入緩沖區時,不會覆寫緩沖區内尚未處理的資料。

阻塞 vs. 非阻塞

Java IO的各種流是阻塞的。當某個線程調用read()或write()方法時,該線程被阻塞,直到有資料被讀取到或者資料完全寫入。阻塞期間該線程無法處理任何其它事情。

Java NIO為非阻塞模式。讀寫請求并不會阻塞目前線程,在資料可讀/寫前目前線程可以繼續做其它事情,是以一個單獨的線程可以管理多個輸入和輸出通道。

選擇器(Selector)

Java NIO的選擇器允許一個單獨的線程同時監視多個通道,可以注冊多個通道到同一個選擇器上,然後使用一個單獨的線程來“選擇”已經就緒的通道。這種“選擇”機制為一個單獨線程管理多個通道提供了可能。

零拷貝

Java NIO中提供的FileChannel擁有transferTo和transferFrom兩個方法,可直接把FileChannel中的資料拷貝到另外一個Channel,或者直接把另外一個Channel中的資料拷貝到FileChannel。該接口常被用于高效的網絡/檔案的資料傳輸和大檔案拷貝。在作業系統支援的情況下,通過該方法傳輸資料并不需要将源資料從核心态拷貝到使用者态,再從使用者态拷貝到目标通道的核心态,同時也避免了兩次使用者态和核心态間的上下文切換,也即使用了“零拷貝”,是以其性能一般高于Java IO中提供的方法。

使用FileChannel的零拷貝将本地檔案内容傳輸到網絡的示例代碼如下所示。

1
     
     
      2
     
     
      3
     
     
      4
     
     
      5
     
     
      6
     
     
      7
     
     
      8
     
     
      9
     
     
      10
     
     
      11
     
     
      12
     
     
      13
     
     
      14
     
     
      15
     
     
      16
           
public 
      class NIOClient {
     
     
       
      public static void main(String[] args) throws IOException, InterruptedException {
     
     
          SocketChannel socketChannel = SocketChannel.open();
     
     
          InetSocketAddress address = 
      new InetSocketAddress(
      1234);
     
     
          socketChannel.connect(address);
     
     
     
          RandomAccessFile file = 
      new RandomAccessFile(
     
     
              NIOClient.class.getClassLoader().getResource(
      "test.txt").getFile(), 
      "rw");
     
     
          FileChannel channel = file.getChannel();
     
     
          channel.transferTo(
      0, channel.size(), socketChannel);
     
     
          channel.close();
     
     
          file.close();
     
     
          socketChannel.close();
     
     
        }
     
     
      }
           

阻塞I/O下的伺服器實作

單線程逐個處理所有請求

使用阻塞I/O的伺服器,一般使用循環,逐個接受連接配接請求并讀取資料,然後處理下一個請求。

1
     
     
      2
     
     
      3
     
     
      4
     
     
      5
     
     
      6
     
     
      7
     
     
      8
     
     
      9
     
     
      10
     
     
      11
     
     
      12
     
     
      13
     
     
      14
     
     
      15
     
     
      16
     
     
      17
     
     
      18
     
     
      19
     
     
      20
     
     
      21
     
     
      22
     
     
      23
     
     
      24
     
     
      25
     
     
      26
           
public 
      class IOServer {
     
     
       
      private 
      static 
      final Logger LOGGER = LoggerFactory.getLogger(IOServer.class);
     
     
       
      public static void main(String[] args) {
     
     
          ServerSocket serverSocket = 
      null;
     
         
      try {
     
     
            serverSocket = 
      new ServerSocket();
     
     
            serverSocket.bind(
      new InetSocketAddress(
      2345));
     
     
          } 
      catch (IOException ex) {
     
     
            LOGGER.error(
      "Listen failed", ex);
     
           
      return;
     
     
          }
     
         
      try{
     
           
      while(
      true) {
     
     
              Socket socket = serverSocket.accept();
     
     
              InputStream inputstream = socket.getInputStream();
     
     
              LOGGER.info(
      "Received message {}", IOUtils.toString(inputstream));
     
     
              IOUtils.closeQuietly(inputstream);
     
     
            }
     
     
          } 
      catch(IOException ex) {
     
     
            IOUtils.closeQuietly(serverSocket);
     
     
            LOGGER.error(
      "Read message failed", ex);
     
     
          }
     
     
        }
     
     
      }
           

為每個請求建立一個線程

上例使用單線程逐個處理所有請求,同一時間隻能處理一個請求,等待I/O的過程浪費大量CPU資源,同時無法充分使用多CPU的優勢。下面是使用多線程對阻塞I/O模型的改進。一個連接配接建立成功後,建立一個單獨的線程處理其I/O操作。

Java I/O模型從BIO到NIO和Reactor模式 從IO到NIO 阻塞I/O下的伺服器實作 Reactor模式
1
     
     
      2
     
     
      3
     
     
      4
     
     
      5
     
     
      6
     
     
      7
     
     
      8
     
     
      9
     
     
      10
     
     
      11
     
     
      12
     
     
      13
     
     
      14
     
     
      15
     
     
      16
     
     
      17
     
     
      18
     
     
      19
     
     
      20
     
     
      21
     
     
      22
     
     
      23
     
     
      24
     
     
      25
     
     
      26
     
     
      27
     
     
      28
     
     
      29
     
     
      30
           
public 
      class IOServerMultiThread {
     
       
      private 
      static 
      final Logger LOGGER = LoggerFactory.getLogger(IOServerMultiThread.class);
     
       
      public static void main(String[] args) {
     
     
        ServerSocket serverSocket = 
      null;
     
         
      try {
     
     
            serverSocket = 
      new ServerSocket();
     
     
            serverSocket.bind(
      new InetSocketAddress(
      2345));
     
     
          } 
      catch (IOException ex) {
     
     
            LOGGER.error(
      "Listen failed", ex);
     
           
      return;
     
     
          }
     
         
      try{
     
           
      while(
      true) {
     
     
              Socket socket = serverSocket.accept();
     
             
      new Thread( () -> {
     
               
      try{
     
     
                  InputStream inputstream = socket.getInputStream();
     
     
                  LOGGER.info(
      "Received message {}", IOUtils.toString(inputstream));
     
     
                  IOUtils.closeQuietly(inputstream);
     
     
                } 
      catch (IOException ex) {
     
     
                  LOGGER.error(
      "Read message failed", ex);
     
     
                }
     
     
              }).start();
     
     
            }
     
     
          } 
      catch(IOException ex) {
     
     
            IOUtils.closeQuietly(serverSocket);
     
     
            LOGGER.error(
      "Accept connection failed", ex);
     
     
          }
     
     
        }
     
     
      }
           

使用線程池處理請求

為了防止連接配接請求過多,導緻伺服器建立的線程數過多,造成過多線程上下文切換的開銷。可以通過線程池來限制建立的線程數,如下所示。

1
     
     
      2
     
     
      3
     
     
      4
     
     
      5
     
     
      6
     
     
      7
     
     
      8
     
     
      9
     
     
      10
     
     
      11
     
     
      12
     
     
      13
     
     
      14
     
     
      15
     
     
      16
     
     
      17
     
     
      18
     
     
      19
     
     
      20
     
     
      21
     
     
      22
     
     
      23
     
     
      24
     
     
      25
     
     
      26
     
     
      27
     
     
      28
     
     
      29
     
     
      30
     
     
      31
     
     
      32
     
     
      33
     
     
      34
     
     
      35
           
public 
      class IOServerThreadPool {
     
     
       
      private 
      static 
      final Logger LOGGER = LoggerFactory.getLogger(IOServerThreadPool.class);
     
     
       
      public static void main(String[] args) {
     
     
          ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
     
     
          ServerSocket serverSocket = 
      null;
     
         
      try {
     
     
            serverSocket = 
      new ServerSocket();
     
     
            serverSocket.bind(
      new InetSocketAddress(
      2345));
     
     
          } 
      catch (IOException ex) {
     
     
            LOGGER.error(
      "Listen failed", ex);
     
           
      return;
     
     
          }
     
         
      try{
     
           
      while(
      true) {
     
     
              Socket socket = serverSocket.accept();
     
     
              executorService.submit(() -> {
     
               
      try{
     
     
                  InputStream inputstream = socket.getInputStream();
     
     
                  LOGGER.info(
      "Received message {}", IOUtils.toString(
      new InputStreamReader(inputstream)));
     
     
                } 
      catch (IOException ex) {
     
     
                  LOGGER.error(
      "Read message failed", ex);
     
     
                }
     
     
              });
     
     
            }
     
     
          } 
      catch(IOException ex) {
     
           
      try {
     
     
              serverSocket.close();
     
     
            } 
      catch (IOException e) {
     
     
            }
     
     
            LOGGER.error(
      "Accept connection failed", ex);
     
     
          }
     
     
        }
     
     
      }
           

Reactor模式

精典Reactor模式

精典的Reactor模式示意圖如下所示。

Java I/O模型從BIO到NIO和Reactor模式 從IO到NIO 阻塞I/O下的伺服器實作 Reactor模式

在Reactor模式中,包含如下角色

  • Reactor 将I/O事件發派給對應的Handler
  • Acceptor 處理用戶端連接配接請求
  • Handlers 執行非阻塞讀/寫

最簡單的Reactor模式實作代碼如下所示。

1
     
     
      2
     
     
      3
     
     
      4
     
     
      5
     
     
      6
     
     
      7
     
     
      8
     
     
      9
     
     
      10
     
     
      11
     
     
      12
     
     
      13
     
     
      14
     
     
      15
     
     
      16
     
     
      17
     
     
      18
     
     
      19
     
     
      20
     
     
      21
     
     
      22
     
     
      23
     
     
      24
     
     
      25
     
     
      26
     
     
      27
     
     
      28
     
     
      29
     
     
      30
     
     
      31
     
     
      32
     
     
      33
     
     
      34
     
     
      35
     
     
      36
     
     
      37
     
     
      38
     
     
      39
     
     
      40
           
public 
      class NIOServer {
     
     
       
      private 
      static 
      final Logger LOGGER = LoggerFactory.getLogger(NIOServer.class);
     
     
       
      public static void main(String[] args) throws IOException {
     
     
          Selector selector = Selector.open();
     
     
          ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
     
     
          serverSocketChannel.configureBlocking(
      false);
     
     
          serverSocketChannel.bind(
      new InetSocketAddress(
      1234));
     
     
          serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
     
     
         
      while (selector.select() > 
      0) {
     
     
            Set<SelectionKey> keys = selector.selectedKeys();
     
     
            Iterator<SelectionKey> iterator = keys.iterator();
     
           
      while (iterator.hasNext()) {
     
     
              SelectionKey key = iterator.next();
     
     
              iterator.remove();
     
             
      if (key.isAcceptable()) {
     
     
                ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
     
     
                SocketChannel socketChannel = acceptServerSocketChannel.accept();
     
     
                socketChannel.configureBlocking(
      false);
     
     
                LOGGER.info(
      "Accept request from {}", socketChannel.getRemoteAddress());
     
     
                socketChannel.register(selector, SelectionKey.OP_READ);
     
     
              } 
      else 
      if (key.isReadable()) {
     
     
                SocketChannel socketChannel = (SocketChannel) key.channel();
     
     
                ByteBuffer buffer = ByteBuffer.allocate(
      1024);
     
               
      int count = socketChannel.read(buffer);
     
               
      if (count <= 
      0) {
     
     
                  socketChannel.close();
     
     
                  key.cancel();
     
     
                  LOGGER.info(
      "Received invalide data, close the connection");
     
                 
      continue;
     
     
                }
     
     
                LOGGER.info(
      "Received message {}", 
      new String(buffer.array()));
     
     
              }
     
     
              keys.remove(key);
     
     
            }
     
     
          }
     
     
        }
     
     
      }
           

為了友善閱讀,上示代碼将Reactor模式中的所有角色放在了一個類中。

從上示代碼中可以看到,多個Channel可以注冊到同一個Selector對象上,實作了一個線程同時監控多個請求狀态(Channel)。同時注冊時需要指定它所關注的事件,例如上示代碼中socketServerChannel對象隻注冊了OP_ACCEPT事件,而socketChannel對象隻注冊了OP_READ事件。

selector.select()

是阻塞的,當有至少一個通道可用時該方法傳回可用通道個數。同時該方法隻捕獲Channel注冊時指定的所關注的事件。

多工作線程Reactor模式

經典Reactor模式中,盡管一個線程可同時監控多個請求(Channel),但是所有讀/寫請求以及對新連接配接請求的處理都在同一個線程中處理,無法充分利用多CPU的優勢,同時讀/寫操作也會阻塞對新連接配接請求的處理。是以可以引入多線程,并行處理多個讀/寫操作,如下圖所示。

Java I/O模型從BIO到NIO和Reactor模式 從IO到NIO 阻塞I/O下的伺服器實作 Reactor模式

多線程Reactor模式示例代碼如下所示。

1
     
     
      2
     
     
      3
     
     
      4
     
     
      5
     
     
      6
     
     
      7
     
     
      8
     
     
      9
     
     
      10
     
     
      11
     
     
      12
     
     
      13
     
     
      14
     
     
      15
     
     
      16
     
     
      17
     
     
      18
     
     
      19
     
     
      20
     
     
      21
     
     
      22
     
     
      23
     
     
      24
     
     
      25
     
     
      26
     
     
      27
     
     
      28
     
     
      29
     
     
      30
     
     
      31
     
     
      32
     
     
      33
     
     
      34
     
     
      35
           
public 
      class NIOServer {
     
     
       
      private 
      static 
      final Logger LOGGER = LoggerFactory.getLogger(NIOServer.class);
     
     
       
      public static void main(String[] args) throws IOException {
     
     
          Selector selector = Selector.open();
     
     
          ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
     
     
          serverSocketChannel.configureBlocking(
      false);
     
     
          serverSocketChannel.bind(
      new InetSocketAddress(
      1234));
     
     
          serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
     
     
         
      while (
      true) {
     
           
      if(selector.selectNow() < 
      0) {
     
             
      continue;
     
     
            }
     
     
            Set<SelectionKey> keys = selector.selectedKeys();
     
     
            Iterator<SelectionKey> iterator = keys.iterator();
     
           
      while(iterator.hasNext()) {
     
     
              SelectionKey key = iterator.next();
     
     
              iterator.remove();
     
             
      if (key.isAcceptable()) {
     
     
                ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
     
     
                SocketChannel socketChannel = acceptServerSocketChannel.accept();
     
     
                socketChannel.configureBlocking(
      false);
     
     
                LOGGER.info(
      "Accept request from {}", socketChannel.getRemoteAddress());
     
     
                SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ);
     
     
                readKey.attach(
      new Processor());
     
     
              } 
      else 
      if (key.isReadable()) {
     
     
                Processor processor = (Processor) key.attachment();
     
     
                processor.process(key);
     
     
              }
     
     
            }
     
     
          }
     
     
        }
     
     
      }
           

從上示代碼中可以看到,注冊完SocketChannel的OP_READ事件後,可以對相應的SelectionKey attach一個對象(本例中attach了一個Processor對象,該對象處理讀請求),并且在擷取到可讀事件後,可以取出該對象。

注:attach對象及取出該對象是NIO提供的一種操作,但該操作并非Reactor模式的必要操作,本文使用它,隻是為了友善示範NIO的接口。

具體的讀請求處理在如下所示的Processor類中。該類中設定了一個靜态的線程池處理所有請求。而process方法并不直接處理I/O請求,而是把該I/O操作送出給上述線程池去處理,這樣就充分利用了多線程的優勢,同時将對新連接配接的處理和讀/寫操作的處理放在了不同的線程中,讀/寫操作不再阻塞對新連接配接請求的處理。

1
     
     
      2
     
     
      3
     
     
      4
     
     
      5
     
     
      6
     
     
      7
     
     
      8
     
     
      9
     
     
      10
     
     
      11
     
     
      12
     
     
      13
     
     
      14
     
     
      15
     
     
      16
     
     
      17
     
     
      18
     
     
      19
     
     
      20
     
     
      21
     
     
      22
           
public 
      class Processor {
     
       
      private 
      static 
      final Logger LOGGER = LoggerFactory.getLogger(Processor.class);
     
       
      private 
      static 
      final ExecutorService service = Executors.newFixedThreadPool(
      16);
     
     
       
      public void process(SelectionKey selectionKey) {
     
     
          service.submit(() -> {
     
     
            ByteBuffer buffer = ByteBuffer.allocate(
      1024);
     
     
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
     
           
      int count = socketChannel.read(buffer);
     
           
      if (count < 
      0) {
     
     
              socketChannel.close();
     
     
              selectionKey.cancel();
     
     
              LOGGER.info(
      "{}\t Read ended", socketChannel);
     
             
      return 
      null;
     
     
            } 
      else 
      if(count == 
      0) {
     
             
      return 
      null;
     
     
            }
     
     
            LOGGER.info(
      "{}\t Read message {}", socketChannel, 
      new String(buffer.array()));
     
           
      return 
      null;
     
     
          });
     
     
        }
     
     
      }
           

多Reactor

Netty中使用的Reactor模式,引入了多Reactor,也即一個主Reactor負責監控所有的連接配接請求,多個子Reactor負責監控并處理讀/寫請求,減輕了主Reactor的壓力,降低了主Reactor壓力太大而造成的延遲。

并且每個子Reactor分别屬于一個獨立的線程,每個成功連接配接後的Channel的所有操作由同一個線程處理。這樣保證了同一請求的所有狀态和上下文在同一個線程中,避免了不必要的上下文切換,同時也友善了監控請求響應狀态。

多Reactor模式示意圖如下所示。

Java I/O模型從BIO到NIO和Reactor模式 從IO到NIO 阻塞I/O下的伺服器實作 Reactor模式

多Reactor示例代碼如下所示。

1
     
     
      2
     
     
      3
     
     
      4
     
     
      5
     
     
      6
     
     
      7
     
     
      8
     
     
      9
     
     
      10
     
     
      11
     
     
      12
     
     
      13
     
     
      14
     
     
      15
     
     
      16
     
     
      17
     
     
      18
     
     
      19
     
     
      20
     
     
      21
     
     
      22
     
     
      23
     
     
      24
     
     
      25
     
     
      26
     
     
      27
     
     
      28
     
     
      29
     
     
      30
     
     
      31
     
     
      32
     
     
      33
     
     
      34
     
     
      35
           
public 
      class NIOServer {
     
     
       
      private 
      static 
      final Logger LOGGER = LoggerFactory.getLogger(NIOServer.class);
     
     
       
      public static void main(String[] args) throws IOException {
     
     
          Selector selector = Selector.open();
     
     
          ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
     
     
          serverSocketChannel.configureBlocking(
      false);
     
     
          serverSocketChannel.bind(
      new InetSocketAddress(
      1234));
     
     
          serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
     
     
         
      int coreNum = Runtime.getRuntime().availableProcessors();
     
     
          Processor[] processors = 
      new Processor[coreNum];
     
         
      for (
      int i = 
      0; i < processors.length; i++) {
     
     
            processors[i] = 
      new Processor();
     
     
          }
     
     
         
      int index = 
      0;
     
         
      while (selector.select() > 
      0) {
     
     
            Set<SelectionKey> keys = selector.selectedKeys();
     
           
      for (SelectionKey key : keys) {
     
     
              keys.remove(key);
     
             
      if (key.isAcceptable()) {
     
     
                ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
     
     
                SocketChannel socketChannel = acceptServerSocketChannel.accept();
     
     
                socketChannel.configureBlocking(
      false);
     
     
                LOGGER.info(
      "Accept request from {}", socketChannel.getRemoteAddress());
     
     
                Processor processor = processors[(
      int) ((index++) % coreNum)];
     
     
                processor.addChannel(socketChannel);
     
     
                processor.wakeup();
     
     
              }
     
     
            }
     
     
          }
     
     
        }
     
     
      }
           

如上代碼所示,本文設定的子Reactor個數是目前機器可用核數的兩倍(與Netty預設的子Reactor個數一緻)。對于每個成功連接配接的SocketChannel,通過round robin的方式交給不同的子Reactor。

子Reactor對SocketChannel的處理如下所示。

1
     
     
      2
     
     
      3
     
     
      4
     
     
      5
     
     
      6
     
     
      7
     
     
      8
     
     
      9
     
     
      10
     
     
      11
     
     
      12
     
     
      13
     
     
      14
     
     
      15
     
     
      16
     
     
      17
     
     
      18
     
     
      19
     
     
      20
     
     
      21
     
     
      22
     
     
      23
     
     
      24
     
     
      25
     
     
      26
     
     
      27
     
     
      28
     
     
      29
     
     
      30
     
     
      31
     
     
      32
     
     
      33
     
     
      34
     
     
      35
     
     
      36
     
     
      37
     
     
      38
     
     
      39
     
     
      40
     
     
      41
     
     
      42
     
     
      43
     
     
      44
     
     
      45
     
     
      46
     
     
      47
     
     
      48
     
     
      49
     
     
      50
     
     
      51
     
     
      52
           
public 
      class Processor {
     
       
      private 
      static 
      final Logger LOGGER = LoggerFactory.getLogger(Processor.class);
     
       
      private 
      static 
      final ExecutorService service =
     
     
            Executors.newFixedThreadPool(
      2 * Runtime.getRuntime().availableProcessors());
     
     
       
      private Selector selector;
     
     
       
      public Processor() throws IOException {
     
         
      this.selector = SelectorProvider.provider().openSelector();
     
     
          start();
     
     
        }
     
     
       
      public void addChannel(SocketChannel socketChannel) throws ClosedChannelException {
     
     
          socketChannel.register(
      this.selector, SelectionKey.OP_READ);
     
     
        }
     
     
       
      public void wakeup() {
     
         
      this.selector.wakeup();
     
     
        }
     
     
       
      public void start() {
     
     
          service.submit(() -> {
     
           
      while (
      true) {
     
             
      if (selector.select(
      500) <= 
      0) {
     
               
      continue;
     
     
              }
     
     
              Set<SelectionKey> keys = selector.selectedKeys();
     
     
              Iterator<SelectionKey> iterator = keys.iterator();
     
             
      while (iterator.hasNext()) {
     
     
                SelectionKey key = iterator.next();
     
     
                iterator.remove();
     
               
      if (key.isReadable()) {
     
     
                  ByteBuffer buffer = ByteBuffer.allocate(
      1024);
     
     
                  SocketChannel socketChannel = (SocketChannel) key.channel();
     
                 
      int count = socketChannel.read(buffer);
     
                 
      if (count < 
      0) {
     
     
                    socketChannel.close();
     
     
                    key.cancel();
     
     
                    LOGGER.info(
      "{}\t Read ended", socketChannel);
     
                   
      continue;
     
     
                  } 
      else 
      if (count == 
      0) {
     
     
                    LOGGER.info(
      "{}\t Message size is 0", socketChannel);
     
                   
      continue;
     
     
                  } 
      else {
     
     
                    LOGGER.info(
      "{}\t Read message {}", socketChannel, 
      new String(buffer.array()));
     
     
                  }
     
     
                }
     
     
              }
     
     
            }
     
     
          });
     
     
        }
     
     
      }
           

在Processor中,同樣建立了一個靜态的線程池,且線程池的大小為機器核數的兩倍。每個Processor執行個體均包含一個Selector執行個體。同時每次擷取Processor執行個體時均送出一個任務到該線程池,并且該任務正常情況下一直循環處理,不會停止。而送出給該Processor的SocketChannel通過在其Selector注冊事件,加入到相應的任務中。由此實作了每個子Reactor包含一個Selector對象,并由一個獨立的線程處理。

轉自:http://www.jasongj.com/java/nio_reactor/

繼續閱讀