天天看點

Reactor模型講解一、什麼是Reactor模型二、為什麼要用Reactor三、Reactor的組成四、Reactor模型的發展與種類

一、什麼是Reactor模型

        反應器設計模式(Reactor pattern)是一種為處理并發服務請求,并将請求送出到一個或

者多個服務處理程式的事件設計模式。當用戶端請求抵達後,服務處理程式使用多路配置設定政策,由一個非阻塞的線程來接收所有的請求,然後派發這些請求至相關的工作線程進行處理。

關于reactor 是什麼,我們先從wiki上看下:

The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.

從上述文字中我們可以看出以下關鍵點 :

  1. 事件驅動(event handling)
  2. 可以處理一個或多個輸入源(one or more inputs)
  3. 通過Service Handler同步的将輸入事件(Event)采用多路複用分發給相應的Request Handler(多個)處理
Reactor模型講解一、什麼是Reactor模型二、為什麼要用Reactor三、Reactor的組成四、Reactor模型的發展與種類

自POSA2 中的關于Reactor Pattern 介紹中,我們了解了Reactor 的處理方式:

  1. 同步的等待多個事件源到達(采用select()實作)
  2. 将事件多路分解以及配置設定相應的事件服務進行處理,這個分派采用server集中處理(dispatch)
  3. 分解的事件以及對應的事件服務應用從分派服務中分離出去(handler)

二、為什麼要用Reactor

        常見的網絡服務中,如果每一個用戶端都維持一個與登陸伺服器的連接配接。那麼伺服器将維護多個和用戶端的連接配接以出來和用戶端的contnect 、read、write ,特别是對于長連結的服務,有多少個c端,就需要在s端維護同等的IO連接配接。這對伺服器來說是一個很大的開銷。

三、Reactor的組成

首先我們基于Reactor Pattern 處理模式中,定義以下三種角色:

  • Reactor 将I/O事件分派給對應的Handler
  • Acceptor 處理用戶端新連接配接,并分派請求到處理器鍊中
  • Handlers 執行非阻塞讀/寫 任務

這是最基本的單Reactor單線程模型。其中Reactor線程,負責多路分離套接字,有新連接配接到來觸發connect 事件之後,交由Acceptor進行處理,有IO讀寫事件之後交給hanlder 處理。

Acceptor主要任務就是建構handler ,在擷取到和client相關的SocketChannel之後 ,綁定到相應的hanlder上,對應的SocketChannel有讀寫事件之後,基于racotor 分發,hanlder就可以處理了(所有的IO事件都綁定到selector上,有Reactor分發)。

該模型 适用于處理器鍊中業務處理元件能快速完成的場景。不過,這種單線程模型不能充分利用多核資源,是以實際使用的不多。

四、Reactor模型的發展與種類

4.1 單Reactor單線程模型(也叫單線程模式)

Reactor模型講解一、什麼是Reactor模型二、為什麼要用Reactor三、Reactor的組成四、Reactor模型的發展與種類

        這是最簡單的單Reactor單線程模型。Reactor線程是個多面手,負責多路分離套接字,Accept新連接配接,并分派請求到Handler處理器中。 

下面的圖,來自于“Scalable IO in Java”,和上面的圖的意思,差不多。Reactor和Hander 處于一條線程執行。

Reactor模型講解一、什麼是Reactor模型二、為什麼要用Reactor三、Reactor的組成四、Reactor模型的發展與種類

順便說一下,可以将上圖的accepter,看做是一種特殊的handler。

4.1.1 單線程Reactor的參考代碼

“Scalable IO in Java”,實作了一個單線程Reactor的參考代碼,Reactor的代碼如下:

package com.crazymakercircle.ReactorModel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

class Reactor implements Runnable
{
    final Selector selector;
    final ServerSocketChannel serverSocket;

    Reactor(int port) throws IOException
    { //Reactor初始化
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(port));
        //非阻塞
        serverSocket.configureBlocking(false);

        //分步處理,第一步,接收accept事件
        SelectionKey sk =
                serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        //attach callback object, Acceptor
        sk.attach(new Acceptor());
    }

    public void run()
    {
        try
        {
            while (!Thread.interrupted())
            {
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                while (it.hasNext())
                {
                    //Reactor負責dispatch收到的事件
                    dispatch((SelectionKey) (it.next()));
                }
                selected.clear();
            }
        } catch (IOException ex)
        { /* ... */ }
    }

    void dispatch(SelectionKey k)
    {
        Runnable r = (Runnable) (k.attachment());
        //調用之前注冊的callback對象
        if (r != null)
        {
            r.run();
        }
    }

    // inner class
    class Acceptor implements Runnable
    {
        public void run()
        {
            try
            {
                SocketChannel channel = serverSocket.accept();
                if (channel != null)
                    new Handler(selector, channel);
            } catch (IOException ex)
            { /* ... */ }
        }
    }
}
           

Handler的代碼如下:

package com.crazymakercircle.ReactorModel;


import com.crazymakercircle.config.SystemConfig;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

class Handler implements Runnable
{
    final SocketChannel channel;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(SystemConfig.INPUT_SIZE);
    ByteBuffer output = ByteBuffer.allocate(SystemConfig.SEND_SIZE);
    static final int READING = 0, SENDING = 1;
    int state = READING;

    Handler(Selector selector, SocketChannel c) throws IOException
    {
        channel = c;
        c.configureBlocking(false);
        // Optionally try first read now
        sk = channel.register(selector, 0);

        //将Handler作為callback對象
        sk.attach(this);

        //第二步,注冊Read就緒事件
        sk.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }

    boolean inputIsComplete()
    {
        /* ... */
        return false;
    }

    boolean outputIsComplete()
    {

        /* ... */
        return false;
    }

    void process()
    {
        /* ... */
        return;
    }

    public void run()
    {
        try
        {
            if (state == READING)
            {
                read();
            }
            else if (state == SENDING)
            {
                send();
            }
        } catch (IOException ex)
        { /* ... */ }
    }

    void read() throws IOException
    {
        channel.read(input);
        if (inputIsComplete())
        {

            process();

            state = SENDING;
            // Normally also do first write now

            //第三步,接收write就緒事件
            sk.interestOps(SelectionKey.OP_WRITE);
        }
    }

    void send() throws IOException
    {
        channel.write(output);

        //write完就結束了, 關閉select key
        if (outputIsComplete())
        {
            sk.cancel();
        }
    }
}
           

這兩段代碼,是建立在JAVA NIO的基礎上的,這兩段代碼建議一定要看懂。可以在IDE中去看源碼,這樣直覺感覺更佳。 

4.1.2 單線程模式的缺點

  1. 當其中某個 handler 阻塞時, 會導緻其他所有的 client 的 handler 都得不到執行, 并且更嚴重的是, handler 的阻塞也會導緻整個服務不能接收新的 client 請求(因為 acceptor 也被阻塞了)。 因為有這麼多的缺陷, 是以單線程Reactor 模型用的比較少。這種單線程模型不能充分利用多核資源,是以實際使用的不多。
  2. 是以,單線程模型僅僅适用于handler 中業務處理元件能快速完成的場景。

4.2 單Reactor多線程模型(也叫多線程模式)

4.2.1 基于線程池的改進

線上程Reactor模式基礎上,做如下改進:

  1. 将Handler處理器的執行放入線程池,多線程進行業務處理。
  2. 而對于Reactor而言,可以仍為單個線程。如果伺服器為多核的CPU,為充分利用系統資源,可以将Reactor拆分為兩個線程。

一個簡單的圖如下:

Reactor模型講解一、什麼是Reactor模型二、為什麼要用Reactor三、Reactor的組成四、Reactor模型的發展與種類

4.2.2 改進後的完整示意圖

下面的圖,來自于“Scalable IO in Java”,和上面的圖的意思,差不多,隻是更加詳細。Reactor是一條獨立的線程,Hander 處于線程池中執行。

Reactor模型講解一、什麼是Reactor模型二、為什麼要用Reactor三、Reactor的組成四、Reactor模型的發展與種類

4.2.3 多線程模式參考代碼

“Scalable IO in Java”,的多線程Reactor的參考代碼,是基于單線程做一個線程池的改進,改進的Handler的代碼如下:

package com.crazymakercircle.ReactorModel;


import com.crazymakercircle.config.SystemConfig;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class MthreadHandler implements Runnable
{
    final SocketChannel channel;
    final SelectionKey selectionKey;
    ByteBuffer input = ByteBuffer.allocate(SystemConfig.INPUT_SIZE);
    ByteBuffer output = ByteBuffer.allocate(SystemConfig.SEND_SIZE);
    static final int READING = 0, SENDING = 1;
    int state = READING;


    ExecutorService pool = Executors.newFixedThreadPool(2);
    static final int PROCESSING = 3;

    MthreadHandler(Selector selector, SocketChannel c) throws IOException
    {
        channel = c;
        c.configureBlocking(false);
        // Optionally try first read now
        selectionKey = channel.register(selector, 0);

        //将Handler作為callback對象
        selectionKey.attach(this);

        //第二步,注冊Read就緒事件
        selectionKey.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }

    boolean inputIsComplete()
    {
       /* ... */
        return false;
    }

    boolean outputIsComplete()
    {

       /* ... */
        return false;
    }

    void process()
    {
       /* ... */
        return;
    }

    public void run()
    {
        try
        {
            if (state == READING)
            {
                read();
            }
            else if (state == SENDING)
            {
                send();
            }
        } catch (IOException ex)
        { /* ... */ }
    }


    synchronized void read() throws IOException
    {
        // ...
        channel.read(input);
        if (inputIsComplete())
        {
            state = PROCESSING;
            //使用線程pool異步執行
            pool.execute(new Processer());
        }
    }

    void send() throws IOException
    {
        channel.write(output);

        //write完就結束了, 關閉select key
        if (outputIsComplete())
        {
            selectionKey.cancel();
        }
    }

    synchronized void processAndHandOff()
    {
        process();
        state = SENDING;
        // or rebind attachment
        //process完,開始等待write就緒
        selectionKey.interestOps(SelectionKey.OP_WRITE);
    }

    class Processer implements Runnable
    {
        public void run()
        {
            processAndHandOff();
        }
    }

}
           

4.3 多Reactor多線程模型(也叫主從多線程模式)

Reactor模型講解一、什麼是Reactor模型二、為什麼要用Reactor三、Reactor的組成四、Reactor模型的發展與種類

第三種模型比起第二種模型,是将Reactor分成兩部分,

  1. mainReactor負責監聽server socket,用來處理新連接配接的建立,将建立的socketChannel指定注冊給subReactor。
  2. subReactor維護自己的selector, 基于mainReactor 注冊的socketChannel多路分離IO讀寫事件,讀寫網 絡資料,對業務處理的功能,另其扔給worker線程池來完成。

第三種模型中,我們可以看到,mainReactor 主要是用來處理網絡IO 連接配接建立操作,通常一個線程就可以處理,而subReactor主要做和建立起來的socket做資料互動和事件業務處理操作,它的個數上一般是和CPU個數等同,每個subReactor一個縣城來處理。

此種模型中,每個子產品的工作更加專一,耦合度更低,性能和穩定性也大量的提升,支援的可并發用戶端數量可達到上百萬級别。

關于此種模型的應用,目前有很多優秀的礦建已經在應用了,比如mina 和netty 等。上述中去掉線程池的第三種形式的變種,也 是Netty NIO的預設模式。下一節我們将着重講解netty的架構模式。

4.3.1 主從多線程模式參考代碼

對于多個CPU的機器,為充分利用系統資源,将Reactor拆分為兩部分。代碼如下:

package com.crazymakercircle.ReactorModel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

class MthreadReactor implements Runnable
{

    //subReactors集合, 一個selector代表一個subReactor
    Selector[] selectors=new Selector[2];
    int next = 0;
    final ServerSocketChannel serverSocket;

    MthreadReactor(int port) throws IOException
    { //Reactor初始化
        selectors[0]=Selector.open();
        selectors[1]= Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(port));
        //非阻塞
        serverSocket.configureBlocking(false);


        //分步處理,第一步,接收accept事件
        SelectionKey sk =
                serverSocket.register( selectors[0], SelectionKey.OP_ACCEPT);
        //attach callback object, Acceptor
        sk.attach(new Acceptor());
    }

    public void run()
    {
        try
        {
            while (!Thread.interrupted())
            {
                for (int i = 0; i <2 ; i++)
                {
                    selectors[i].select();
                    Set selected =  selectors[i].selectedKeys();
                    Iterator it = selected.iterator();
                    while (it.hasNext())
                    {
                        //Reactor負責dispatch收到的事件
                        dispatch((SelectionKey) (it.next()));
                    }
                    selected.clear();
                }

            }
        } catch (IOException ex)
        { /* ... */ }
    }

    void dispatch(SelectionKey k)
    {
        Runnable r = (Runnable) (k.attachment());
        //調用之前注冊的callback對象
        if (r != null)
        {
            r.run();
        }
    }


    class Acceptor { // ...
        public synchronized void run() throws IOException
        {
            SocketChannel connection =
                    serverSocket.accept(); //主selector負責accept
            if (connection != null)
            {
                new Handler(selectors[next], connection); //選個subReactor去負責接收到的connection
            }
            if (++next == selectors.length) next = 0;
        }
    }
}
           

4.3.2 主從多線程模式優缺點

優點

  1. 響應快,不必為單個同步時間所阻塞,雖然Reactor本身依然是同步的;
  2. 程式設計相對簡單,可以最大程度的避免複雜的多線程及同步問題,并且避免了多線程/程序的切換開銷;
  3. 可擴充性,可以友善的通過增加Reactor執行個體個數來充分利用CPU資源;
  4. 可複用性,reactor架構本身與具體事件處理邏輯無關,具有很高的複用性;

缺點

  1. 相比傳統的簡單模型,Reactor增加了一定的複雜性,因而有一定的門檻,并且不易于調試。
  2. Reactor模式需要底層的Synchronous Event Demultiplexer支援,比如Java中的Selector支援,作業系統的select系統調用支援,如果要自己實作Synchronous Event Demultiplexer可能不會有那麼高效。
  3. Reactor模式在IO讀寫資料時還是在同一個線程中實作的,即使使用多個Reactor機制的情況下,那些共享一個Reactor的Channel如果出現一個長時間的資料讀寫,會影響這個Reactor中其他Channel的相應時間,比如在大檔案傳輸時,IO操作就會影響其他Client的相應時間,因而對這種操作,使用傳統的Thread-Per-Connection或許是一個更好的選擇,或則此時使用改進版的Reactor模式如Proactor模式。

繼續閱讀