天天看點

《保姆式的詳解》Reactor 模式和 Java NIO

作者:笨鳥先飛ftyjh
《保姆式的詳解》Reactor 模式和 Java NIO

概述

本文先從基本的 Socket 程式設計模式說起,介紹了 Java 傳統的同步阻塞 IO 網絡程式設計的基本實作,以及存在的性能問題,進而引出 Reactor 設計模式,最後通過 Java NIO 給出單 Reactor 單線程的實作方案。

Socket 程式設計模式

Unix 有幾個統一性的理念或象征,并塑造了它的 API 及由此形成的開發風格。其中最重要的一點應當是“一切皆檔案”模型及在此基礎上建立的管道概念。

在 Unix/Linux 環境下,網絡中的程序通過 Socket 進行通信,Socket 本質上也是一種特殊的檔案,可以按照“打開,讀寫,關閉”的模式來操作。Socket 程式設計的基本模式如圖 1 所示:

《保姆式的詳解》Reactor 模式和 Java NIO

圖 1 Socket 程式設計模式

  1. 建立 socket:本質上就是建立一個檔案,每個檔案都有一個整型的檔案描述符(fd)來指代這個檔案;
  2. 綁定端口:一台伺服器可以同時運作多個不同的應用,在 TCP/IP 協定下通過端口進行區分,是以接下來需要綁定端口,所有連接配接到該端口的請求都會被我們的服務處理;
  3. 監聽端口:執行建立 socket 和bind之後,socket 還處于closed狀态,不對外監聽,需要調用listen方法,讓 socket 進入被動監聽狀态;其 API 定義如下:
int listen(int sockfd, int backlog);
// 在TCP協定下,建立連接配接需要完成三次握手,當連接配接建立完成後會先放到一個連接配接隊列,backlog就是指定這個隊列的大小。           

4.接收請求:通過調用accept()從已完成連接配接的隊列中拿到連接配接進行處理,如果沒有連接配接則調用會被阻塞。

基于同步阻塞 IO 的 Java Socket 程式設計

下面介紹在 Java 語言下如何完成 Socket 通信。傳統的 Java Socket 程式設計模式使用同步阻塞 IO,如圖 2 所示。

《保姆式的詳解》Reactor 模式和 Java NIO

圖 2 經典的 Java Socket 程式設計模式

服務端通過new ServerSocket(端口号)完成了的工作,接着循環調用 accept() 方法擷取用戶端請求(如果沒有新的請求,程式就會阻塞),并為每一個用戶端請求建立一個處理線程,避免因為主線程正在處理請求而無法響應其他連接配接。具體實作代碼如下:

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;

public class Server {
    public static void main(String[] args) {
        try {
            // 綁定并監聽端口
            ServerSocket server = new ServerSocket(5566);
            Socket client;
            while (!Thread.interrupted()) {
                // 接受請求,沒有請求會阻塞
                client = server.accept();
                new Thread(new Handler(client)).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    static class Handler implements Runnable {
        final Socket client;

        public Handler(Socket client) {
            this.client = client;
        }

        @Override
        public void run() {
            try {
                BufferedReader reader = new BufferedReader(
                        new InputStreamReader(client.getInputStream()));
                // 接收用戶端發送的内容
                String line;
                PrintWriter writer = new PrintWriter(
                        new OutputStreamWriter(client.getOutputStream()));
                // 用戶端連接配接未關閉前,readLine傳回值不為null,沒有資料時會阻塞
                while ((line = reader.readLine()) != null) {
                    writer.println("你輸入的是:" + line);
                    writer.flush();

                    // 通過約定特定輸入,結束通訊
                    if ("end".equals(line)) {
                        break;
                    }
                }
                writer.close();
                reader.close();
                client.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}           

在本地啟動服務端程式,通過 telnet 請求連接配接。

《保姆式的詳解》Reactor 模式和 Java NIO

這個模式在并發請求量不高的情況下是完全沒有問題的,也建議使用這種模式。但在在高并發環境下,由于線程本身需要耗費系統資源,其次多線程下需要頻繁進行上下文切換也會消耗性能,再者連接配接建立後還需要等待用戶端資料到來,是以并不是最有效的解決方案。問題的關鍵在于兩點:

  1. 頻繁的上下文切換;
  2. 一個連接配接作為一個整體進行處理,粒度太粗。連接配接建立後,還需要等待資料可讀才能進行真正的處理。

基于事件驅動的 Reactor 模式

解決上述問題的方式就是分治法和事件驅動。即:

  1. 把一個連接配接的處理過程拆成多個小任務;
  2. 隻有在任務等待的事件觸發時才執行任務。

Reactor 模式的核心理念即是如此。Reactor 模式也叫 Dispatcher 模式,即 I/O 多路複用統一監聽事件,收到事件後配置設定(Dispatch)給某個程序。

論文《Reactor: An Object Behavioral Pattern forDemultiplexing and Dispatching Handles for Synchronous Events》詳細介紹了 Reactor 模式的設計和實作,下面先了解幾個關鍵的組成部分的作用:

主要概念

Handles

Handle(注意和 handler 區分開),中文常見翻譯為句柄。句柄的叫法比較抽象,這裡引用知乎的一個回答幫助大家了解。

句柄的英文是 handle。在英文中,有操作、處理、控制之類的意義。作為一個名詞時,是指某個中間媒介,通過這個中間媒介可控制、操作某樣東西。

這樣說有點抽象,舉個例子。door handle 是指門把手,通過門把手可以去控制門,但 door handle 并非 door 本身,隻是一個中間媒介。又比如 knife handle 是刀柄,通過刀柄可以使用刀。

跟 door handle 類似,我們可以用 file handle 去操作 file, 但 file handle 并非 file 本身。這個 file handle 就被翻譯成檔案句柄,同理還有各種資源句柄。

一個句柄代表作業系統的一個資源,例如檔案。event 會在句柄上觸發,是以監控 event 需要在句柄上等待。

Synchronous Event Demultiplexer

直譯過來就是同步事件分用器,它的作用是阻塞等待上面提到的一系列句柄集合産生事件。為什麼叫做分用器?因為它監控了多個句柄,當某一個事件發生了,它會傳回對應的那個句柄。一個常見的 I/O 事件分用器就是 select 系統調用。

Initiation Dispatcher

初始分發器,提供接口用于注冊事件處理器 Event Handler。當同步事件分發器檢測有一個句柄上産生事件時,會通知初始分發器,初始分發器通過查詢已注冊的事件處理器找到對應的處理對象。

Event Handler

事件處理器抽象類,定義了事件的處理方法。

Concrete Event Handler

具體的事件處理實作。

工作流程

《保姆式的詳解》Reactor 模式和 Java NIO

Java NIO 下實作 Reactor 模式

Doug Lea 在《Scalable IO in Java》中介紹 Reactor 模式時引入了三種角色:

  1. Reactor:負責響應 IO 事件,分發給對應的事件處理,相當于上文提到的 Initiation Dispatcher;
  2. Handler:事件處理器,與上文一緻;
  3. Acceptor:特殊的事件處理器,專門處理connection事件。

單 Reactor 單線程的實作方案:

《保姆式的詳解》Reactor 模式和 Java NIO

圖檔引用自李運華的《單伺服器高性能模式:Reactor 與 Proactor》一文

在給出具體實作前,需要先介紹下 Java NIO 的基本用法,它支援面向緩沖的,基于通道的 I/O 操作方法,主要由以下幾個核心部分組成:Channel、Buffer 和 Selector。

Channel 和 Buffer

一般翻譯為通道和緩存,資料可以從 Channel 讀到 Buffer 中,也可以從 Buffer 寫到 Channel 中。

《保姆式的詳解》Reactor 模式和 Java NIO

Selector

Selector 允許單線程處理多個 Channel。要使用 Selector,得向 Selector 注冊 Channel,然後調用它的 select() 方法。這個方法會一直阻塞到某個注冊的通道有事件就緒。一旦這個方法傳回,線程就可以處理這些事件,事件的例子有如新連接配接進來,資料接收等。

代碼示例

下面使用 Java NIO 實作單 Reactor 單線程。參考《Scalable IO in Java》的示例,需要實作三個角色,首先是 Reactor,負責事件監聽和分發。事件監聽使用 Selector 實作 IO 多路複用。從前面我們已經了解到,需要先建立 Channel,再向 Selector 注冊。

Java NIO 中定義了四種事件類型,分别是 OP_CONNECT(用戶端使用)、OP_ACCEPT(服務端使用)、OP_READ、OP_WRITE。

《保姆式的詳解》Reactor 模式和 Java NIO

顯然,Reactor 需要負責監聽 OP_ACCEPT 事件。

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

public class Reactor {
    final ServerSocketChannel serverSocket;
    final Selector selector;

    public Reactor(int port) throws IOException {
        // 建立Channel
        serverSocket = ServerSocketChannel.open();
        serverSocket.configureBlocking(false);

        // 綁定端口
        serverSocket.bind(new InetSocketAddress(port));

        // 建立Selector
        selector = Selector.open();
      
        // 向Selector注冊Channel,通過Selector監聽Channel上的 Accept事件
        serverSocket.register(selector, SelectionKey.OP_ACCEPT);
    }
}           

當 Selector 監聽到 OP_ACCEPT 事件後,如前面所說,連接配接建立完成後會先放到一個連接配接隊列,還需要調用 accept() 從已完成連接配接的隊列中拿到連接配接進行處理。是以引入 Acceptor 角色來專門處理。Acceptor 和 Handler 都是事件處理器,為了減少篇幅,這裡用 Runnable 作為他們共同的抽象處理類。

Acceptor 為每個用戶端連接配接綁定一個對應的處理類執行個體,後續其他事件都由對應的事件處理類處理。需要說明的是,雖然這裡用 Runnable 作為處理類的抽象類,但并沒有當成線程來使用,而是當成普通的抽象類,直接調用run()方法執行。

import java.io.IOException;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class Acceptor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocket;

    public Acceptor(Selector selector, ServerSocketChannel serverSocket) {
        this.selector = selector;
        this.serverSocket = serverSocket;
    }
  
    @Override
    public void run() {
        try {
          	// 調用accept()從已完成連接配接的隊列中拿到連接配接進行處理
            SocketChannel clientSocket = serverSocket.accept();
          
            if (clientSocket != null) {
                // 綁定連接配接對應的事件處理器執行個體
                new Handler(selector, clientSocket);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}           

接下來需要打通 Reactor 和 Acceptor 兩個角色的聯系:當Selector監聽到 OP_ACCEPT 事件後,把事件分發給 Acceptor 。在前面的基礎上,我們在 Reactor 角色上加上事件分發邏輯:

  1. 輪詢 select() 擷取有事件就緒的通道;
  2. 擷取所有就緒通道對應的 SelectedKey 集合;
  3. 依次分發處理 SelectedKey 執行個體;
  • 每個 SelectedKey 都附着了對應的處理器執行個體,擷取執行個體;
  • 執行每個執行個體的處理方法。

ServerSocketChannel 注冊到 Selector 後,生成對應的 SelectedKey 對象,在上面附着 Acceptor 執行個體。當 ServerSocketChannel 監聽的事件 OP_ACCEPT 就緒時,Selector.select() 傳回對應的 SelectedKey,進而能擷取到 Acceptor 執行個體,進而由 Acceptor 接管連接配接處理事件。

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Set;

public class Reactor {
    final ServerSocketChannel serverSocket;
    final Selector selector;

    public Reactor(int port) throws IOException {
        // 建立Channel
        serverSocket = ServerSocketChannel.open();
        serverSocket.configureBlocking(false);

        // 綁定端口
        serverSocket.bind(new InetSocketAddress(port));

        // 建立Selector
        selector = Selector.open();

        // 把Channel注冊到Selector進行監聽,監聽連接配接的 Accept事件
        SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        
      // 可以将一個對象或者更多資訊附着到SelectionKey上,這樣就能友善的識别某個給定的通道
        selectionKey.attach(new Acceptor(selector, serverSocket));
    }

    public void run() {
        while (!Thread.interrupted()) {
            try {
                // 一直阻塞到某個注冊的通道有事件就緒
                selector.select();

                // 每一個注冊的通道都對應一個SelectionKey
                Set<SelectionKey> selected = selector.selectedKeys();
                for (SelectionKey selectionKey : selected) {
                    // 分發處理
                    dispatch(selectionKey);
                }
              
                // 移除避免重複處理
                selected.clear();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void dispatch(SelectionKey selectionKey) {
        // 雖然這裡用 Runnable 作為處理類的抽象類,但并沒有當成線程來使用,而是當成普通的抽象類,直接調用run()方法執行
        Runnable r = (Runnable) selectionKey.attachment();
        if (r != null) {
            r.run();
        }
    }
}           

目前為止,我們通過在 Selector 上注冊 OP_ACCEPT 事件打通了 Reactor 和 Acceptor,Acceptor 為每個連接配接綁定了對應的 Handler 執行個體。當讀緩沖區有資料可讀時,我們希望響應事件,并且由Handler 來處理。是以,仿照前面的做法:

  1. 需要在 Selector 上注冊 OP_READ ;
  2. 在 Selector 傳回的 SelectedKey 上附着 Handler 執行個體。
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class Handler implements Runnable {
    final Selector selector;
    final SocketChannel socket;
    final SelectionKey sk;

    public Handler(Selector selector, SocketChannel socket) throws IOException {
        this.selector = selector;
        this.socket = socket;
        this.socket.configureBlocking(false);

        // 把Channel注冊到Selector進行監聽,監聽讀緩沖區資料就緒事件
        sk = socket.register(selector, SelectionKey.OP_READ);

        // 把事件處理類本身附着到SelectionKey,友善識别通道和後續處理
        sk.attach(this);
    }

    @Override
    public void run() {
    }
}           

接下來完善讀事件和寫事件的處理邏輯。

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class Handler implements Runnable {
    final Selector selector;
    final SocketChannel socket;
    final SelectionKey sk;

    ByteBuffer inputBuffer = ByteBuffer.allocate(1024);
    ByteBuffer outputBuffer = ByteBuffer.allocate(1024);

    public Handler(Selector selector, SocketChannel socket) throws IOException {
        this.selector = selector;
        this.socket = socket;
        this.socket.configureBlocking(false);

        // 把Channel注冊到Selector進行監聽,監聽讀緩沖區資料就緒事件
        sk = socket.register(selector, SelectionKey.OP_READ);

        // 把事件處理類本身附着到SelectionKey,友善識别通道和後續處理
        sk.attach(this);
    }

    @Override
    public void run() {
        if (sk.isReadable()) {
            read();
        } else if (sk.isWritable()) {
            write();
        }
    }

    public void read() {
        try {
            while (socket.read(inputBuffer) > 0) {
                inputBuffer.flip();
                while(inputBuffer.hasRemaining()){
                    System.out.print((char) inputBuffer.get());
                }
                inputBuffer.clear();

                // 注冊監聽寫緩沖區空閑事件,與前面通過channel注冊到selector的寫法等價
                sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void write() {
        try {
            outputBuffer.put("YYDS-ABCD-EFG\n".getBytes());
            outputBuffer.flip();
            while (outputBuffer.hasRemaining()) {
                socket.write(outputBuffer);
            }
            outputBuffer.clear();
            // 取消監聽緩沖區空閑事件
            sk.interestOps(sk.interestOps() & ~SelectionKey.OP_WRITE);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}           

啟動程式

import java.io.IOException;

public class Main {
    public static void main(String[] args) throws IOException {
        Reactor reactor = new Reactor(5566);
        reactor.run();
    }
}           

在本地啟動服務端程式,通過 telnet 發起請求驗證。

補充說明

水準觸發和邊緣觸發

Java 的 NIO 屬于水準觸發,即條件觸發,在使用 Java 的 NIO 程式設計的時候,在沒有資料可以往外寫的時候要取消寫事件,在有資料往外寫的時候再注冊寫事件。

水準觸發(level-triggered,也被稱為條件觸發)LT: 隻要滿足條件,就觸發一個事件(隻要有資料沒有被擷取,核心就不斷通知你)

邊緣觸發(edge-triggered)ET: 每當狀态變化時,觸發一個事件。

如果你覺得這篇文章對你有幫助 點贊關注,然後私信回複【888】即可擷取Java進階全套視訊以及源碼學習資料

《保姆式的詳解》Reactor 模式和 Java NIO

繼續閱讀