天天看點

Java NIO Reactor網絡程式設計模型的深度了解1 Reactor模型的介紹2 單Reactor單線程模式3 單Reactor多線程模式4 多Reactor多線程模式

詳細介紹了Reactor網絡程式設計模型的概念。

此前,我們介紹了了Java中常見的四種網絡IO模型:Java 四種常見網絡IO模型以及select、poll、epoll函數的簡單介紹。現在我們來看看基于IO多路複用演變而來的Reactor網絡程式設計模型。

文章目錄

  • 1 Reactor模型的介紹
  • 2 單Reactor單線程模式
    • 2.1 僞代碼
  • 3 單Reactor多線程模式
    • 3.1 僞代碼
  • 4 多Reactor多線程模式
    • 4.1 僞代碼

1 Reactor模型的介紹

常見的網絡程式設計模型并不是最基本的四種網絡IO模型,因為這涉及到了底層代碼的編寫,大佬們在基本網絡IO模型的基礎上采用面向對象的方式進行了進一步封裝,形成了更加易于了解的Reactor、Proactor、Acceptor-Connector等程式設計模型。Reactor模型是最常見的網絡程式設計模型,大名鼎鼎的Netty、Tomcat等架構或者軟體都是使用Reactor模型實作高并發、高性能的網絡通信。

Java并發程式設計之父Doug Lea早在多年之前就對Reactor模型進行了詳盡的闡述:http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf,Java nio包中的

Selector

就是基于最簡單的Reactor模型實作的,在Doug Lea的文章中還給出了一些使用案例。

IO 多路複用模型我們此前就學過了,好處主要有兩個:一是該模型能夠在同一個線程内同時監聽多個IO請求,系統不必建立大量的線程,進而大大減小了系統的開銷。二是阻塞等待的方式能減少無效的系統調用,減少了對 CPU 資源的消耗。

Reactor模型是IO 多路複用模型的進一步面向對象的封裝,讓使用者不用考慮底層網絡 API 的細節,隻需要關注應用代碼的編寫。Reactor直譯過來就是反應器,這裡的反應是指對事件的反應:當IO多路複用程式監聽并收到事件通知之後,根據事件類型配置設定給不同的處理器處理,是以Reactor 模型也被稱為 Dispatcher (分派器)模型,或者稱為基于事件驅動的模型。

Reactor模型可以抽象出兩個重要元件:

  1. Reactor,專門用于監聽和響應各種IO事件,比如連接配接建立就緒(ACCEPT)、讀就緒(READ)、寫就緒(WRITE)等,當檢測到一個新的事件到來時,将其發送給相應的Handler去處理。
  2. Handler,專門用于處理特定的事件,比如讀取資料,業務邏輯執行,寫出響應等。

Reactor模型發展至今,包含了多種實作,常見的有單Reactor單線程模式、單Reactor多線程模式,多Reactor多線程模式。

2 單Reactor單線程模式

Doug Lea文章中給出的該模式的流程圖如下:

Java NIO Reactor網絡程式設計模型的深度了解1 Reactor模型的介紹2 單Reactor單線程模式3 單Reactor多線程模式4 多Reactor多線程模式

上圖中,Reactor用于監聽各種IO事件,并配置設定(dispatch)給特定的Handler,accepter元件專門用于處理建立連接配接事件,可以看做是一個特殊的Handler。

總體流程為:

  1. 服務端的Reactor 線程對象通過循環 select調用(IO 多路複用)監聽各種IO事件,還會注冊一個accepter事件處理器到Reactor中,accepter專用于處理建立連接配接事件。
  2. 用戶端首先發起一個建立連接配接的請求,Reactor監聽到ACCEPT事件的到來後将該ACCEPT事件分派給accepter元件,accepter通過accept()方法與用戶端建立對應的連接配接(SocketChannel),然後将該連接配接所關注的READ事件以及對應的READ事件處理器注冊到Reactor中,這樣Reactor就會監聽該連接配接的READ事件。
  3. 當Reactor監聽到該連接配接有讀或者寫事件發生時,将相關的事件派發給對應的處理器進行處理。比如,讀處理器會通過SocketChannel的read()方法直接讀取到資料,随後可進行各種業務處理,之後需要向用戶端發送資料時,也可以注冊該連接配接的WRITE事件和其對應的處理器,當channel可寫時,通過SocketChannel的wtite()方法寫資料。
  4. 每當處理完所有就緒的IO事件後,Reactor線程會再次執行select()操作阻塞等待新的事件就緒并将其分派給對應處理器進行處理。

單Reactor單線程模式的意思就是以上的Reactor和Hander的所有操作都是在同一個線程中完成的。上面的select、accept、read、wtite等調用以及業務邏輯處理,都是在同一個線程中完成的。

單Reactor單線程模式是最基礎的Reactor模型,實作起來比較簡單,由于是單線程,業務代碼編寫也不用考慮有任何的并發問題,Java的NIO模式的Selector底層其實就是最簡單的單Reactor單線程模式。

但是單Reactor單線程模式隻有一個線程工作,無法充分利用現代多核CPU的性能,并且如果某個client的業務邏輯耗時較長,将會造成後續其他client的請求阻塞執行。

因為Redis的業務處理主要都是在記憶體中完成,記憶體操作的速度很快,Redis性能瓶頸不在 CPU 上(在網絡IO的消耗以及記憶體上),加上這種模式實作起來也很簡單,是以Redis 6之前的對于指令的執行也是單Reactor單線程模型。

但是在Redis 6之後還是引入了多線程機制(多線程真香),但Redis 的多線程隻是在網絡IO資料的讀寫這類耗時操作上使用,降低了網絡IO帶來的性能損耗,而實際執行指令(Handler)仍然是單線程順序執行的,是以也不需要擔心Redis的線程安全問題。

2.1 僞代碼

Doug Lea文章中給出的單Reactor單線程模式的僞代碼如下:

Reactor:

/**
 * Reactor
 * 負責監聽并分發事件
 */
class Reactor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocket;

    Reactor(int port) throws IOException { //Reactor初始化
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(port));
        //socket設定為非阻塞
        serverSocket.configureBlocking(false);

        //注冊監聽accept事件
        SelectionKey sk =
                serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        //注冊一個Acceptor作為accept事件的回調
        sk.attach(new Acceptor());
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                //循環調用select等到事件就緒
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                while (it.hasNext()) {
                    //Reactor負責dispatch收到的事件
                    dispatch((SelectionKey) (it.next()));
                }
                selected.clear();
            }
        } catch (IOException ex) { /* ... */ }
    }

    void dispatch(SelectionKey k) {
        Runnable r = (Runnable) (k.attachment());
        //調用之前注冊的callback對象
        if (r != null) {
            r.run();
        }
    }

    /**
     * Acceptor
     * 處理accept事件的回調函數
     */
    class Acceptor implements Runnable {
        @Override
        public void run() {
            try {
                SocketChannel channel = serverSocket.accept();
                if (channel != null) {
                    new Handler(selector, channel);
                }
            } catch (IOException ex) { /* ... */ }
        }
    }
}
           

Handler:

class Handler implements Runnable {
    final SocketChannel channel;
    final SelectionKey sk;
    final int MAXIN = 2048;
    final int MAXOUT = 2048;
    //配置設定緩沖區
    ByteBuffer input = ByteBuffer.allocate(MAXIN);
    ByteBuffer output = ByteBuffer.allocate(MAXOUT);
    static final int READING = 0, SENDING = 1;
    int state = READING;

    Handler(Selector selector, SocketChannel c) throws IOException {
        channel = c;
        c.configureBlocking(false);
        // 預設不注冊任何事件
        // 0表示對這個channel的任何事件都不感興趣,這樣會導緻永遠select不到這個channel
        sk = channel.register(selector, 0);

        //将目前Handler對象作為事件就緒時的callback對象
        sk.attach(this);

        //注冊Read就緒事件
        sk.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }

    boolean inputIsComplete() {
        /* ... */
        return false;
    }

    boolean outputIsComplete() {

        /* ... */
        return false;
    }

    void process() {
        /* ... */
        return;
    }

    @Override
    public void run() {
        try {
            //處理讀就緒事件
            if (state == READING) {
                read();
                //處理寫就緒事件
            } else if (state == SENDING) {
                send();
            }
        } catch (IOException ex) { /* ... */ }
    }

    /**
     * 處理讀就緒事件
     */
    void read() throws IOException {
        //讀取資料
        channel.read(input);
        if (inputIsComplete()) {
            //處理資料
            process();

            state = SENDING;
            //資料處理完畢,需要寫資料
            //開始監聽寫就緒事件
            sk.interestOps(SelectionKey.OP_WRITE);
        }
    }

    /**
     * 處理寫就緒事件
     */
    void send() throws IOException {
        channel.write(output);

        //write完就表示一次事件處理完畢結,關閉SelectionKey
        if (outputIsComplete()) {
            sk.cancel();
        }
    }
}
           

3 單Reactor多線程模式

為了克服單Reactor單線程模型下無法利用多核CPU的優勢以及可能因為某個請求的業務執行時間過長造成後續請求IO阻塞的問題,發展出了單Reactor多線程模型。

Doug Lea文章中給出的該模式的流程圖如下:

Java NIO Reactor網絡程式設計模型的深度了解1 Reactor模型的介紹2 單Reactor單線程模式3 單Reactor多線程模式4 多Reactor多線程模式

上圖中,單個Reactor線程用于監聽各種IO事件,并配置設定(dispatch)給特定的Handler,這一點和單Reactor單線程模型是一樣的,差別該改模型還添加了一個工作線程池,将非IO操作(除了read、send調用之外的業務操作)從Reactor線程中移出轉交給工作線程池來并發的執行。

總體流程為:

  1. 服務端的Reactor 線程對象通過循環 select調用(IO 多路複用)監聽各種IO事件,還會注冊一個accepter事件處理器到Reactor中,accepter專用于處理建立連接配接事件。
  2. 用戶端首先發起一個建立連接配接的請求,Reactor監聽到ACCEPT事件的到來後将該ACCEPT事件分派給accepter元件,accepter通過accept()方法與用戶端建立對應的連接配接(SocketChannel),然後将該連接配接所關注的READ事件以及對應的READ事件處理器注冊到Reactor中,這樣Reactor就會監聽該連接配接的READ事件。
  3. 當Reactor監聽到該連接配接有讀或者寫事件發生時,将相關的事件派發給對應的處理器進行處理。比如,讀處理器會通過SocketChannel的read()方法直接讀取到資料,随後可進行各種業務處理,之後需要向用戶端發送資料時,也可以注冊該連接配接的WRITE事件和其對應的處理器,當channel可寫時,通過SocketChannel的wtite()方法寫資料。
    1. 這裡和單Reactor單線程模型的不同點就是,Reactor線程隻負責Hander中的網絡IO調用,即read讀取資料和send發送資料調用,讀取到資料之後的處理,比如反序列化、執行業務邏輯、序列化等操作則是通過一個線程池來并行執行的。
  4. 每當處理完所有就緒的IO事件後,Reactor線程會再次執行select()操作阻塞等待新的事件就緒并将其分派給對應處理器進行處理。

該模式中,Handler處理時除了read和send調用之外的其他業務邏輯都是多線程執行的,這樣就可以讓Reactor線程更快的進行下一輪的select操作,提升了對于請求的IO響應速度,不至于因為一些耗時的業務邏輯而延遲對後面IO請求的處理。

該模式中能夠充分利用多核 CPU 性能,但是會帶來多線程并發的問題,對于業務邏輯的編寫需要特别注意共享資料的處理。

另外,雖然該模式下業務處理使用了異步執行,效率有所提升,但是仍然是采用單個 Reactor 線程承擔所有事件的監聽和基本IO操作,比如accept、read、send、connect操作,在面對瞬間到來的成百上千個連接配接這樣的高并發場景時,仍然會成為性能瓶頸。

3.1 僞代碼

Doug Lea文章中給出的單Reactor多線程模式的僞代碼如下:

Handler類變成了支援多線程處理業務的MthreadHandler,Reactor類沒有太大變化,在Acceptor中,new Handler變成了new MthreadHandler:

class MthreadHandler implements Runnable {
    final SocketChannel channel;
    final SelectionKey selectionKey;
    final int MAXIN = 2048;
    final int MAXOUT = 2048;
    ByteBuffer input = ByteBuffer.allocate(MAXIN);
    ByteBuffer output = ByteBuffer.allocate(MAXOUT);
    static final int READING = 0, SENDING = 1;
    int state = READING;

    /**
     * 設定一個靜态線程池
     */
    static ExecutorService pool = Executors.newFixedThreadPool(2);

    static final int PROCESSING = 3;

    MthreadHandler(Selector selector, SocketChannel c) throws IOException {
        channel = c;
        c.configureBlocking(false);
        selectionKey = channel.register(selector, 0);
        selectionKey.attach(this);
        selectionKey.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }

    boolean inputIsComplete() {
        /* ... */
        return false;
    }

    boolean outputIsComplete() {

        /* ... */
        return false;
    }


    @Override
    public void run() {
        try {
            if (state == READING) {
                read();
            } else if (state == SENDING) {
                send();
            }
        } catch (IOException ex) { /* ... */ }
    }


    synchronized void read() throws IOException {
        //接受資料
        channel.read(input);
        if (inputIsComplete()) {
            state = PROCESSING;
            /*
             * 使用線程池中的線程異步的處理資料,執行業務邏輯
             *
             * 該調用執行之後Reactor線程可以馬上傳回,不需要等到業務執行完畢
             */
            pool.execute(new Processer());
        }
    }

    void send() throws IOException {
        channel.write(output);

        if (outputIsComplete()) {
            selectionKey.cancel();
        }
    }

    /**
     * 異步任務
     */
    class Processer implements Runnable {
        @Override
        public void run() {
            processAndHandOff();
        }
    }


    synchronized void processAndHandOff() {
        //執行業務
        process();

        state = SENDING;
        //資料處理完畢,需要寫資料
        //開始監聽寫就緒事件
        selectionKey.interestOps(SelectionKey.OP_WRITE);
    }


    void process() {
        /* ... */
        return;
    }

}
           

4 多Reactor多線程模式

為了不讓單個Reactor成為性能瓶頸,我們可以繼續改造,将一個Reactor的功能拆分為“連接配接用戶端”和“與用戶端通信”兩部分,由不同的Reactor執行個體(多個Reactor線程)來共同完成,這就是多Reactor多線程模式,也被稱為Reactor主從多線程模式。

Doug Lea文章中給出的該模式的流程圖如下:

Java NIO Reactor網絡程式設計模型的深度了解1 Reactor模型的介紹2 單Reactor單線程模式3 單Reactor多線程模式4 多Reactor多線程模式

mainReactor擁有自己的Selector,通過 select 專門監控連接配接建立事件,事件準備就緒後通過 Acceptor 對象中的 accept 檢核與用戶端的連接配接,随後将新的連接配接配置設定給某個subReactor,subReactor也有自己的Selector,在subReactor中對該連接配接繼續進行監聽并執行其他事件,比如讀就緒和寫就緒事件,這樣就将Reactor的工作分為兩部分,這兩部分可以在獨立的線程中執行,進一步提升性能。

總體流程為:

  1. 服務端的mainReactor線程通過循環 select調用(IO 多路複用)監聽連接配接建立事件,還會注冊一個accepter事件處理器到Reactor中,accepter專用于處理建立連接配接事件。
  2. 用戶端首先發起一個建立連接配接的請求,mainReactor監聽到ACCEPT事件的到來後将該ACCEPT事件分派給accepter元件,accepter通過accept()方法與用戶端建立對應的連接配接(SocketChannel),然後将該連接配接配置設定給一個subReactor。随後mainReactor線程傳回,繼續執行下一輪的select監聽操作。
  3. subReactor也有自己的Selector,它會将該連接配接将所關注的READ事件以及對應的READ事件處理器注冊并通過select監聽該連接配接的READ事件。
  4. 當subReactor監聽到該連接配接有讀或者寫事件發生時,将相關的事件派發給對應的處理器進行處理。比如,讀處理器會通過SocketChannel的read()方法直接讀取到資料,随後可進行各種業務處理,之後需要向用戶端發送資料時,也可以注冊該連接配接的WRITE事件和其對應的處理器,當channel可寫時,通過SocketChannel的wtite()方法寫資料。
  5. subReactor線程隻負責Hander中的網絡IO調用,即read讀取資料和send發送資料調用,讀取到資料之後的處理,比如反序列化、執行業務邏輯、序列化等操作則是通過一個線程池來并行執行的。
  6. 每當處理完所有就緒的IO事件後,subReactor線程會再次執行select()操作阻塞等待新的事件就緒并将其分派給對應處理器進行處理。

多 Reactor 多線程模式中,mainReactor和subReactor都可以有多個,每一個都有自己的Selector,都在一個獨立的線程中工作,這樣進一步利用了多核CPU的多線程優勢,讓Reactor不會輕易成為性能瓶頸,提升了連接配接速度以及IO讀寫的速度。

但多 Reactor 多線程模式仍然不能從根源上解決耗時的IO操作對其他的client的影響,因為一個subReactor仍有可能對應多個client連接配接,為此,可以使用真正的異步IO模型演化而來的設計模式—Proactor模式來實作真正的異步IO。

Netty 和 Memcached 都是采用的多 Reactor 多線程模式。Nginx 也是采用多 Reactor 多程序模式。實際上Netty的多 Reactor 多線程模式實作更為簡單,subReactor處理read、write等IO操作的同時還處理業務的執行,即去掉了工作者線程池(Thread Pool),或者說SubReactor和Worker線程在同一個線程池中:

  1. mainReactor對應Netty中配置的BossGroup線程組,主要負責接受用戶端連接配接的建立。一般隻暴露一個服務端口,BossGroup線程組一般一個線程工作即可
  2. subReactor對應Netty中配置的WorkerGroup線程組,BossGroup線程組接受并建立完用戶端的連接配接後,将網絡socket轉交給WorkerGroup線程組,然後在WorkerGroup線程組内選擇一個線程,進行I/O的處理。WorkerGroup線程組主要處理I/O,一般設定2*CPU核數個線程。

Netty 可以通過配置的參數同時支援 Reactor 單線程模型、多線程模型,預設模式時上面的多 Reactor 多線程模式變體。

4.1 僞代碼

Doug Lea文章中給出的多Reactor多線程模式的僞代碼如下:

class MthreadReactor implements Runnable {

    /**
     * subReactors集合, 一個selector代表一個subReactor
     */
    Selector[] selectors = new Selector[2];

    int next = 0;
    final ServerSocketChannel serverSocket;
    /**
     * mainSelector
     */
    final Selector selector;

    MthreadReactor(int port) throws IOException {
        selector = Selector.open();
        selectors[0] = Selector.open();
        selectors[1] = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(port));
        serverSocket.configureBlocking(false);


        //監聽accept事件
        SelectionKey sk =
                serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        //Acceptor用于建立連接配接
        sk.attach(new Acceptor());
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                while (it.hasNext()) {
                    //Reactor負責dispatch收到的事件
                    dispatch((SelectionKey) (it.next()));
                }
                selected.clear();
            }
        } catch (IOException ex) { /* ... */ }
    }

    void dispatch(SelectionKey k) {
        Runnable r = (Runnable) (k.attachment());
        //調用之前注冊的callback對象
        if (r != null) {
            r.run();
        }
    }


    class Acceptor implements Runnable { // ...
        @SneakyThrows
        @Override
        public synchronized void run() {
            //mainSelector負責accept建立連接配接
            try (SocketChannel connection = serverSocket.accept()) {
                //連接配接建立之後将連接配接傳給一個subSelector,監聽read和write事件
                if (connection != null) {
                    new Handler(selectors[next], connection); //選個subReactor去負責接收到的connection
                }
            }
            if (++next == selectors.length) {
                next = 0;
            }
        }
    }
}
           

參考資料:

  1. Scalable IO in Java(Doug Lea)
  2. Reactor模式詳解
  3. 原來 8 張圖,就能學廢 Reactor 和 Proactor
如有需要交流,或者文章有誤,請直接留言。另外希望點贊、收藏、關注,我将不間斷更新各種Java學習部落格!