天天看點

Reids高性能原理(六)

一、Redis知識系統觀

Redis從應用次元有:緩存使用、叢集運用、資料結構的巧妙使用;

Redis從系統次元有:可以歸類為三類:

  1. 高性能:線程模型、網絡 IO 模型、資料結構、持久化機制;
  2. 高可用:主從複制、哨兵叢集;
  3. 高拓展:Cluster 分片叢集

Redis 為了高性能,從各方各面都進行了優化。根據官方資料,Redis 的 QPS 可以達到約 100000(每秒請求數),有興趣的可以參考官方的基準程式測試《How fast is Redis?》,官方位址:https://redis.io/topics/benchmarks

Reids高性能原理(六)

 橫軸是連接配接數,縱軸是 QPS。此時,這張圖反映了一個數量級

二、Redis為什麼這麼快

一般我們在分析一個軟體性能的時候會從幾個主要方面進行分析:存儲方式、CPU、和網絡互動;Redis 的高性能主要依賴于幾個方面。

  • C 語言實作,C 語言在一定程度上還是比 Java 語言性能要高一些,因為 C 語言不需要經過 JVM 進行翻譯。
  • Redis是基于記憶體操作,需要的時候需要我們手動持久化到硬碟中
  • Redis高效資料結構,對資料的操作也比較簡單
  • Redis是單線程模型,進而避開了多線程中上下文頻繁切換的操作
  • 使用多路I/O複用模型,非阻塞I/O
  • 使用底層模型不同,它們之間底層實作方式以及與用戶端之間通信的應用協定不一樣,Redis直接自己建構了VM 機制 ,因為一般的系統調用系統函數的話,會浪費一定的時間去移動和請求

 下面分别從上述幾個方面進行展開說明,先來看網絡I/O的多路複用模型。

三、從請求處理開始分析

當我們在用戶端向Redis Server發送一條指令,并且得到Redis回複的整個過程中,Redis做了什麼呢?

Reids高性能原理(六)

要處理指令,則redis必須完整地接收用戶端的請求,并将指令解析出來,再将結果讀出來,通過網絡回寫到用戶端。整個工序分為以下幾個部分:

  • 接收,通過TCP接收到指令,可能會曆經多次TCP包、ack、IO操作
  • 解析,将指令取出來
  • 執行,到對應的地方将value讀出來
  • 傳回,将value通過TCP傳回給用戶端,如果value較大,則IO負荷會更重

其中解析和執行是純cpu/記憶體操作,而接收和傳回主要是IO操作,首先我們先來看通信的過程。

四、網絡IO通信原理

        首先,對于TCP通信來說,每個TCP Socket的核心中都有一個發送緩沖區和一個接收緩沖區

        接收緩沖區把資料緩存到核心,若應用程序一直沒有調用Socket的read方法進行讀取,那麼該資料會一直被緩存在接收緩沖區内。不管程序是否讀取Socket,對端發來的資料都會經過核心接收并緩存到Socket的核心接收緩沖區。

        read所要做的工作,就是把核心接收緩沖區中的資料複制到應用層使用者的Buffer裡。

         程序調用Socket的send發送資料的時候,一般情況下是将資料從應用層使用者的Buffer裡複制到Socket的核心發送緩沖區,然後send就會在上層傳回。換句話說,send傳回時,資料不一定會被發送到對端。

Reids高性能原理(六)

       網卡中的緩沖區既不屬于核心空間,也不屬于使用者空間。它屬于硬體緩沖,允許網卡與作業系統之間有個緩沖;核心緩沖區在核心空間,在記憶體中,用于核心程式,做為讀自或寫往硬體的資料緩沖區;使用者緩沖區在使用者空間,在記憶體中,用于使用者程式,做為讀自或寫往硬體的資料緩沖區

       網卡晶片收到網絡資料會以中斷的方式通知CPU,我有資料了,存在我的硬體緩沖裡了,來讀我啊。CPU收到這個中斷信号後,會調用相應的驅動接口函數從網卡的硬體緩沖裡把資料讀到核心緩沖區,正常情況下會向上傳遞給TCP/IP子產品一層一層的處理。

BIO模型

  Redis的通信采用的是多路複用機制,什麼是多路複用機制呢? 由于Redis是C語言實作,為了簡化大家的了解,我們采用Java語言來描述這個過程。 在了解多路複用之前,我們先來了解一下BIO。

  BIO模型 在Java中,如果要實作網絡通信,我們會采用Socket套接字來完成。

  Socket這不是一個協定,而是一個通信模型。其實它最初是BSD發明的,主要用來一台電腦的兩個程序 間通信,然後把它用到了兩台電腦的程序間通信。是以,可以把它簡單了解為程序間通信,不是什麼高 級的東西。主要做的事情不就是:

  • A發包:發請求包給某個已經綁定的端口(是以我們經常會通路這樣的位址182.13.15.16:1235, 1235就是端口);收到B的允許;然後正式發送;發送完了,告訴B要斷開連結;收到斷開允許, 馬上斷開,然後發送已經斷開資訊給B。
  • B收包:綁定端口和IP;然後在這個端口監聽;接收到A的請求,發允許給A,并做好接收準備,主 要就是清理緩存等待接收新資料;然後正式接收;接受到斷開請求,允許斷開;确認斷開後,繼續 監聽其它請求。

  可見,Socket其實就是I/O操作,Socket并不僅限于網絡通信,在網絡通信中,它涵蓋了網絡層、傳輸 層、會話層、表示層、應用層——其實這都不需要記,因為Socket通信時候用到了IP和端口,僅這兩個 就表明了它用到了網絡層和傳輸層;而且它無視多台電腦通信的系統差别,是以它涉及了表示層;一般 Socket都是基于一個應用程式的,是以會涉及到會話層和應用層。

  建構基礎的BIO通信模型 ,BIO有什麼弊端呢? 當服務端收到用戶端的請求後,不直接傳回,而是等待20s。

public class BIOServerSocket {
    //先定義一個端口号,這個端口的值是可以自己調整的。
    static final int DEFAULT_PORT = 8080;

    public static void main(String[] args) throws IOException,
            InterruptedException {
        ServerSocket serverSocket = null;
        serverSocket = new ServerSocket(DEFAULT_PORT);
        System.out.println("啟動服務,監聽端口:" + DEFAULT_PORT);
        while (true) { //case1: 增加循環,允許循環接收請求
            Socket socket = serverSocket.accept();
            System.out.println("用戶端:" + socket.getPort() + "已連接配接");
            BufferedReader bufferedReader = new BufferedReader(new
                    InputStreamReader (socket.getInputStream()));
            String clientStr = bufferedReader.readLine(); //讀取一行資訊
            System.out.println("用戶端發了一段消息:" + clientStr);
            Thread.sleep(20000); //case2: 修改:增加等待時間
            BufferedWriter bufferedWriter = new BufferedWriter(new
                    OutputStreamWriter (socket.getOutputStream()));
            bufferedWriter.write("我已經收到你的消息了\n");
            bufferedWriter.flush(); //清空緩沖區觸發消息發送
        }
    }
}      

這個情況會導緻一個問題,如果服務端在同一個時刻隻能處理一個用戶端的連接配接,而如果一個網站同時 有1000個使用者通路,那麼剩下的999個使用者都需要等待,而這個等待的耗時取決于前面的請求的處理時長.

Reids高性能原理(六)

   

  基于多線程優化BIO 為了讓服務端能夠同時處理更多的用戶端連接配接,避免因為某個用戶端連接配接阻塞導緻後續請求被阻塞,于是引入多線程技術。如圖所示,當引入了多線程之後,每個用戶端的連結(Socket),我們可以直接給到線程池去執 行,而由于這個過程是異步的,是以并不會同步阻塞影響後續連結的監聽,是以在一定程度上可以提升 服務端連結的處理數量。

Reids高性能原理(六)
public class BIOServerSocketWithThread {

    static ExecutorService executorService= Executors.newFixedThreadPool (10);
    public static void main(String[] args) {
        ServerSocket serverSocket=null;
        try{
            serverSocket=new ServerSocket ( 8080 );
            System.out.println("啟動服務:監聽端口:8080");
            while (true){
                Socket socket = serverSocket.accept(); //連接配接阻塞
                System.out.println("用戶端:" + socket.getPort());
                //IO變成了異步執行
                executorService.submit ( new SocketThread (socket) );
            }
        }catch (IOException e){
            e.printStackTrace ();
        }finally {
            if (serverSocket!=null){
                try {
                    serverSocket.close ();
                }catch (IOException e){
                    e.printStackTrace ();
                }
            }


        }
    }
}      
public class SocketThread implements Runnable{

    private Socket socket;
    public SocketThread(Socket socket) {
        this.socket=socket;
    }

    @Override
    public void run() {
        try {
            //inputstream是阻塞的(***)
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader (socket.getInputStream())); //表示擷取用戶端的請求封包
            String clientStr = bufferedReader.readLine();
            System.out.println("收到用戶端發送的消息:" + clientStr);
            BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter (socket.getOutputStream()));
            bufferedWriter.write("receive a message:" + clientStr + "\n");
            bufferedWriter.flush();

        }catch (Exception e){
            e.printStackTrace ();
        }finally {

        }
    }
}      

傳統 I/O 資料拷貝:

  以讀操作為例:當應用程式執行 read 系統調用讀取檔案描述符(FD)的時候,如果這塊資料已經存在于使用者程序的頁記憶體中,就直接從記憶體中讀取資料。如果資料不存在,則先将資料從磁盤加載資料到核心緩沖區中,再從核心緩沖區拷貝到使用者程序的頁記憶體中。(兩次拷貝,兩次 user 和 kernel 的上下文切換)。

Reids高性能原理(六)

  I/O 的阻塞到底阻塞在哪裡?Blocking I/O:

  當使用 read 或 write 對某個檔案描述符進行過讀寫時,如果目前 FD 不可讀,系統就不會對其他的操作做出響應。從裝置複制資料到核心緩沖區是阻塞的,從核心緩沖區拷貝到使用者空間,也是阻塞的,直到 copy complete,核心傳回結果,使用者程序才解除block 的狀态。

Reids高性能原理(六)

  為了解決阻塞的問題,我們有幾個思路。

  1. 在服務端建立多個線程或者使用線程池,但是在高并發的情況下需要的線程會很多,系統無法承受,而且建立和釋放線程都需要消耗資源。
  2. 由請求方定期輪詢,在資料準備完畢後再從核心緩存緩沖區複制資料到使用者空間(非阻塞式 I/O),這種方式會存在一定的延遲。

  能不能用一個線程處理多個用戶端請求?

NIO非阻塞IO:

使用多線程的方式來解決這個問題,仍然有一個缺點,線程的數量取決于硬體配置,是以線程數量是有 限的,如果請求量比較大的時候,線程本身會收到限制進而并發量也不會太高。那怎麼辦呢,我們可以 采用非阻塞IO。 NIO 從JDK1.4 提出的,本意是New IO,它的出現為了彌補原本IO的不足,提供了更高效的方式,提出 一個通道(channel)的概念,在IO中它始終以流的形式對資料的傳輸和接受,下面我們示範一下NIO 的使用。  所謂的NIO(非阻塞IO),其實就是取消了IO阻塞和連接配接阻塞,當服務端不存在阻塞的時候,就可以不 斷輪詢處理用戶端的請求,如圖所示,表示NIO下的運作流程。

Reids高性能原理(六)

   

  上述這種NIO的使用方式,仍然存在一個問題,就是用戶端或者服務端需要通過一個線程不斷輪詢才能 獲得結果,而這個輪詢過程中會浪費線程資源。

public class NIOServerSocket {
    public static void main(String[] args) {
        try {
            ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false); //設定連接配接非阻塞
            serverSocketChannel.socket().bind(new InetSocketAddress(8080));
            while(true){
                //是非阻塞的
                SocketChannel socketChannel=serverSocketChannel.accept(); //獲得一個用戶端連接配接
//                socketChannel.configureBlocking(false);//IO非阻塞
                if(socketChannel!=null){
                    ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
                    int i=socketChannel.read(byteBuffer);
                    Thread.sleep(10000);
                    byteBuffer.flip(); //反轉
                    socketChannel.write(byteBuffer);
                }else{
                    Thread.sleep(1000);
                    System.out.println("連接配接位就緒");
                }
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}      

 大家站在全局的角度再思考一下整個過程,有哪些地方可以優化呢?  

NIO多路複用機制:

  • I/O 指的是網絡 I/O。
  • 多路指的是多個 TCP 連接配接(Socket 或 Channel)。
  • 複用指的是複用一個或多個線程。

  它的基本原理就是不再由應用程式自己監視連接配接,而是由核心替應用程式監視檔案描述符。用戶端在操作的時候,會産生具有不同僚件類型的 socket。在服務端,I/O 多路複用程式(I/O Multiplexing Module)會把消息放入隊列中,然後通過檔案事件分派器(Fileevent Dispatcher),轉發到不同的事件處理器中。

Reids高性能原理(六)

  多路複用有很多的實作,以 select 為例,當使用者程序調用了多路複用器,程序會被阻塞。核心會監視多路複用器負責的所有 socket,當任何一個 socket 的資料準備好了,多路複用器就會傳回。這時候使用者程序再調用 read 操作,把資料從核心緩沖區拷貝到使用者空間。

Reids高性能原理(六)

  是以,I/O 多路複用的特點是通過一種機制一個程序能同時等待多個檔案描述符,而這些檔案描述符(套接字描述符)其中的任意一個進入讀就緒(readable)狀态,select()函數就可以傳回。Redis 的多路複用, 提供了 select, epoll, evport, kqueue 幾種選擇,在編譯的時候來選擇一種。

  • evport 是 Solaris 系統核心提供支援的;
  • epoll 是 LINUX 系統核心提供支援的;
  • kqueue 是 Mac 系統提供支援的;
  • select 是 POSIX 提供的,一般的作業系統都有支撐(保底方案);

  我們看到  NIOClientSocket中下面這段代碼,當用戶端通過 read 方法去讀取服務端傳回的資料時,如果 此時服務端資料未準備好,對于用戶端來說就是一次無效的輪詢。

while(true) {
    int i = socketChannel.read(byteBuffer);
    if (i > 0) {
        System.out.println("收到服務端的資料:" + new String(byteBuffer.array()));
    } else {
        System.out.println("服務端資料未準備好");
        Thread.sleep(1000);
    }
}      

 我們能不能夠設計成,當用戶端調用 read 方法之後,不僅僅不阻塞,同時也不需要輪詢。而是等到服 務端的資料就緒之後, 告訴用戶端。然後用戶端再去讀取服務端傳回的資料呢?就像點外賣一樣,我們在網上下單之後,繼續做其他事情,等到外賣到了公司,外賣小哥主動打 電話告訴你,你直接去前台取餐即可。

  是以為了優化這個問題,引入了多路複用機制。

  I/O多路複用的本質是通過一種機制(系統核心緩沖I/O資料),讓單個程序可以監視多個檔案描述符, 一旦某個描述符就緒(一般是讀就緒或寫就緒),能夠通知程式進行相應的讀寫操作

  什麼是fd:在linux中,核心把所有的外部裝置都當成是一個檔案來操作,對一個檔案的讀寫會調 用核心提供的系統指令,傳回一個fd(檔案描述符)。而對于一個socket的讀寫也會有相應的檔案描 述符,成為socketfd。

  常見的IO多路複用方式有【select、poll、epoll】,都是Linux API提供的IO複用方式,那麼接下來重 點講一下select、和epoll這兩個模型

  • select:程序可以通過把一個或者多個fd傳遞給select系統調用,程序會阻塞在select操作上,這 樣select可以幫我們檢測多個fd是否處于就緒狀态,這個模式有兩個缺點 由于他能夠同時監聽多個檔案描述符,假如說有1000個,這個時候如果其中一個fd 處于就緒 狀态了,那麼目前程序需要線性輪詢所有的fd,也就是監聽的fd越多,性能開銷越大。 同時,select在單個程序中能打開的fd是有限制的,預設是1024,對于那些需要支援單機上 萬的TCP連接配接來說确實有點少
  • epoll:linux還提供了epoll的系統調用,epoll是基于事件驅動方式來代替順序掃描,是以性能相 對來說更高,主要原理是,當被監聽的fd中,有fd就緒時,會告知目前程序具體哪一個fd就緒,那 麼目前程序隻需要去從指定的fd上讀取資料即可,另外,epoll所能支援的fd上線是作業系統的最 大檔案句柄,這個數字要遠遠大于1024

  由于epoll能夠通過事件告知應用程序哪個fd是可讀的,是以我們也稱這種IO為異步非阻塞IO, 當然它是僞異步的,因為它還需要去把資料從核心同步複制到使用者空間中,真正的異步非阻塞, 應該是資料已經完全準備好了,我隻需要從使用者空間讀就行.

  同步和異步,指的是使用者線程和核心的互動方式,阻塞和非阻塞,指使用者線程調用核心IO操作的方式是阻塞還是非阻塞就像在Java中使用多線程做異步處理的概念,通過多線程去執行一個流程,主線程可以不用等待。而阻塞和非阻塞我們可以了解為假如在同步流程或者異步流程中做IO操作,如果緩沖區資料還沒準備好,IO的這個過程會阻塞。

  I/O多路複用的好處是可以通過把多個I/O的阻塞複用到同一個select的阻塞上,進而使得系統在單線程 的情況下可以同時處理多個用戶端請求。它的最大優勢是系統開銷小,并且不需要建立新的程序或者線 程,降低了系統的資源開銷,它的整體實作思想如下圖所示。

  用戶端請求到服務端後,此時用戶端在傳輸資料過程中,為了避免Server端在read用戶端資料過程中阻 塞,服務端會把該請求注冊到Selector複路器上,服務端此時不需要等待,隻需要啟動一個線程,通過 selector.select()阻塞輪詢複路器上就緒的channel即可。

  也就是說,如果某個用戶端連接配接資料傳輸完 成,那麼select()方法會傳回就緒的channel,然後執行相關的處理即可。

Reids高性能原理(六)

 代碼如下:

public class NIOSelectorServerSocket implements Runnable{

    Selector selector;
    ServerSocketChannel serverSocketChannel;

    public NIOSelectorServerSocket(int port) throws IOException {
        selector=Selector.open();
        serverSocketChannel=ServerSocketChannel.open();
        //如果采用selector模型,必須要設定非阻塞
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    @Override
    public void run() {
        while(!Thread.interrupted()){
            try {
                selector.select(); //阻塞等待事件就緒
                Set selected=selector.selectedKeys(); //事件清單
                Iterator it=selected.iterator();
                while(it.hasNext()){
                    //說明有連接配接進來
                    dispatch((SelectionKey) it.next());
                    it.remove();//移除目前就緒的事件
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    private void dispatch(SelectionKey key) throws IOException {
        if(key.isAcceptable()){ //是連接配接事件?
            register(key);
        }else if(key.isReadable()){ //讀事件
            read(key);
        }else if(key.isWritable()){ //寫事件
            //TODO
        }
    }
    private void register(SelectionKey key) throws IOException {
        ServerSocketChannel channel= (ServerSocketChannel) key.channel(); //用戶端連接配接
        SocketChannel socketChannel=channel.accept(); //獲得用戶端連接配接
        socketChannel.configureBlocking(false);
        socketChannel.register(selector,SelectionKey.OP_READ);
    }
    private void read(SelectionKey key) throws IOException {
        //得到的是socketChannel
        SocketChannel channel= (SocketChannel) key.channel();
        ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
        channel.read(byteBuffer);
        System.out.println("Server Receive Msg:"+new String(byteBuffer.array()));
    }

    public static void main(String[] args) throws IOException {
        NIOSelectorServerSocket selectorServerSocket=new NIOSelectorServerSocket(8080);
        new Thread(selectorServerSocket).start();
    }
}      

事實上NIO已經解決了上述BIO暴露的下面兩個問題:

  1.  同步阻塞IO,讀寫阻塞,線程等待時間過長。
  2.  在制定線程政策的時候,隻能根據CPU的數目來限定可用線程資源,不能根據連接配接并發數目來制 定,也就是連接配接有限制。否則很難保證對用戶端請求的高效和公平。

   到這裡為止,通過NIO的多路複用機制,解決了IO阻塞導緻用戶端連接配接處理受限的問題,服務端隻需要 一個線程就可以維護多個用戶端,并且用戶端的某個連接配接如果準備就緒時,會通過事件機制告訴應用程 序某個channel可用,應用程式通過select方法選出就緒的channel進行處理。

單線程Reactor 模型(高性能I/O設計模式):

  了解了NIO多路複用後,就有必要再和大家說一下Reactor多路複用高性能I/O設計模式,Reactor本質 上就是基于NIO多路複用機制提出的一個高性能IO設計模式。

  它的核心思想是把響應IO事件和業務處理 進行分離,通過一個或者多個線程來處理IO事件,然後将就緒得到事件分發到業務處理handlers線程去步非阻塞處理,如下圖所示。 Reactor模型有三個重要的元件:

  1. Reactor :将I/O事件發派給對應的Handler
  2. Acceptor :處理用戶端連接配接請求
  3. Handlers :執行非阻塞讀/寫
Reids高性能原理(六)

   

  代碼實作如下:  

  Reactor:

public class Reactor implements Runnable {

    private final Selector selector;
    private final ServerSocketChannel serverSocketChannel;

    public Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, new Acceptor(selector, serverSocketChannel));
    }

    @Override
    public void run() {
        while (!Thread.interrupted()) {
            try {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    dispatch(iterator.next());
                    iterator.remove();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void dispatch(SelectionKey key) {
        //可能拿到的對象有兩個
        // Acceptor
        // Handler
        Runnable runnable = (Runnable) key.attachment();
        if (runnable != null) {
            runnable.run(); //
        }
    }
}      

Acceptor:

public class Acceptor implements Runnable {

    private final Selector selector;
    private final ServerSocketChannel serverSocketChannel;

    public Acceptor(Selector selector, ServerSocketChannel serverSocketChannel) {
        this.selector = selector;
        this.serverSocketChannel = serverSocketChannel;
    }

    @Override
    public void run() {
        SocketChannel channel;

        try {
            channel = serverSocketChannel.accept();//得到一個用戶端連接配接
            System.out.println(channel.getRemoteAddress() + ":收到一個用戶端連接配接");
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_READ, new Handler(channel));
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}      

Handler :

public class Handler implements Runnable {
    SocketChannel channel;

    public Handler(SocketChannel channe) {
        this.channel = channe;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "------");
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int len = 0, total = 0;
        String msg = "";
        try {
            do {
                len = channel.read(buffer);
                if (len > 0) {
                    total += len;
                    msg += new String(buffer.array());
                }
            } while (len > buffer.capacity());
            System.out.println("total:" + total);

            //msg=表示通信傳輸封包
            //耗時2s
            //登入: username:password
            //ServetRequets: 請求資訊
            //資料庫的判斷
            //傳回資料,通過channel寫回到用戶端

            System.out.println(channel.getRemoteAddress() + ": Server receive Msg:" + msg);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}      

 以上代碼碼是最基本的單Reactor單線程模型(整體的I/O操作是由同一個線程完成的)。

  其中Reactor線程,負責多路分離套接字,有新連接配接到來觸發connect 事件之後,交由Acceptor進行處 理,有IO讀寫事件之後交給hanlder 處理。

  Acceptor主要任務就是建構handler ,在擷取到和client相關的SocketChannel之後 ,綁定到相應的 hanlder上,對應的SocketChannel有讀寫事件之後,基于racotor 分發,hanlder就可以處理了(所有的 IO事件都綁定到selector上,有Reactor分發)。

  Reactor 模式本質上指的是使用 I/O 多路複用(I/O multiplexing) + 非阻塞 I/O(nonblocking I/O) 的模式。

多線程單Reactor模型:

  單線程Reactor這種實作方式有存在着缺點,從執行個體代碼中可以看出,handler的執行是串行的,如果其 中一個handler處理線程阻塞将導緻其他的業務處理阻塞。由于handler和reactor在同一個線程中的執 行,這也将導緻新的無法接收新的請求,我們做一個小實驗:

  • 在上述Reactor代碼的DispatchHandler的run方法中,增加一個Thread.sleep()。
  • 打開多個用戶端視窗連接配接(可以通過telnet連接配接)到Reactor Server端,其中一個視窗發送一個資訊後被阻塞,另外一個窗 口再發資訊時由于前面的請求阻塞導緻後續請求無法被處理。

  Redis6.0中多線程帶來的性能提升。Redis中的特殊的多線程單Reactor模型。下圖是美團技術團隊使用阿裡雲伺服器壓測GET/SET指令在4個線程IO時性能上的對比結果,可以明顯 的看到,Redis 在使用多線程模式之後性能大幅提升,達到了一倍。

  Redis Server 阿裡雲 Ubuntu 18.04 , 8CPU 2.5GHZ,8G記憶體,主機型号: ecs.ic5.2xlarge Redis Benchmark client: 阿裡雲 Unbuntu 18.04 , 8CPU 2.5GHZ,8G記憶體,主機型号: ecs.ic5.2xlarge

Reids高性能原理(六)
Reids高性能原理(六)

  為了解決這種問題,有人提出使用多線程的方式來處理業務,也就是在業務處理的地方加入線程池異步 處理,将reactor和handler在不同的線程來執行,如下圖所示。

Reids高性能原理(六)

   

  改造代碼代碼如下:

public class MutilDispatchHandler implements Runnable {

    SocketChannel channel;

    private Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    public MutilDispatchHandler(SocketChannel channel) {
        this.channel = channel;
    }

    @Override
    public void run() {
        processor();
    }

    private void processor() {
        executor.execute(new ReaderHandler(channel));
    }

    static class ReaderHandler implements Runnable {
        private SocketChannel channel;

        public ReaderHandler(SocketChannel channel) {
            this.channel = channel;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + ":-----");
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            try {
                Thread.sleep(100000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            int len = 0, total = 0;
            String msg = "";
            try {
                do {
                    len = channel.read(buffer);
                    if (len > 0) {
                        total += len;
                        msg += new String(buffer.array());
                    }
                } while (len > buffer.capacity());
                System.out.println("total:" + total);

                //msg=表示通信傳輸封包
                //耗時2s
                //登入: username:password
                //ServetRequets: 請求資訊
                //資料庫的判斷
                //傳回資料,通過channel寫回到用戶端
                System.out.println(channel.getRemoteAddress() + ": Server receive Msg:" + msg);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (channel != null) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}      

  在多線程Reactor模型中,添加了一個工作者線程池,并将非I/O操作從Reactor線程中移出轉交給工作 者線程池來執行。這樣能夠提高Reactor線程的I/O響應,不至于因為一些耗時的業務邏輯而延遲對後面 I/O請求的處理。

主從多線程多Reactor模式:

Reids高性能原理(六)

  mainReactor負責監聽連接配接,accept連接配接給subReactor處理,為什麼要單獨分一個Reactor來處理監聽呢?因為像TCP這樣需要經過3次握手才能建立連接配接,這個建立連接配接的過程也是要耗時間和資源的,單獨分一個Reactor來處理,可以提高性能。

  代碼實作如下:

  Acceptor

public class Acceptor implements Runnable{

    final Selector sel;
    final ServerSocketChannel serverSocketChannel;
    //擷取目前核心數
    private final int POOL_SIZE=Runtime.getRuntime().availableProcessors();
    //線程池
    private Executor subReactorExecutor= Executors.newFixedThreadPool(POOL_SIZE);
    //subReactors 個數
    private Reactor[] subReactors=new Reactor[POOL_SIZE];
    //任務分發輪詢分發計數
    int handerNext=0;

    public Acceptor(Selector sel,int port) throws IOException {
        this.sel=sel;//打開連結
        this.serverSocketChannel=ServerSocketChannel.open();
        this.serverSocketChannel.socket().bind(new InetSocketAddress(port));
        this.serverSocketChannel.configureBlocking(false);
        this.serverSocketChannel.register(this.sel, SelectionKey.OP_ACCEPT,this);
        init();
        System.out.println("Main Reactor Acceptor: Listening on port:"+port);
    }
    private void init() throws IOException {
        //初始化subReactors
        for (int i = 0; i < subReactors.length; i++) {
            subReactors[i]=new Reactor();
            subReactorExecutor.execute(subReactors[i]);
        }
    }
    @Override
    public void run() {
        //負責處理連接配接事件和IO事件
        try {
            SocketChannel socketChannel=serverSocketChannel.accept(); //擷取連接配接
            if(socketChannel!=null){
                socketChannel.write(ByteBuffer.wrap("Multiply Reactor Patterm\r\nreactor> ".getBytes()));
                System.out.println(Thread.currentThread().getName()+": Main-Reactor-Acceptor:"+socketChannel.getLocalAddress()+"連接配接");
                //輪詢分發
                Reactor subReactor=subReactors[handerNext];
                //異步處理
                subReactor.register(new AsyncHandler(socketChannel));
                if(++handerNext==subReactors.length){
                    handerNext=0;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}      

Reactor:

public class Reactor implements Runnable{

    private final Selector selector;
    //任務隊列
    private ConcurrentLinkedQueue<AsyncHandler> events=new ConcurrentLinkedQueue<>();

    public Reactor() throws IOException {
        //打開selector
        this.selector = Selector.open();
    }

    public Selector getSelector() {
        return selector;
    }

    @Override
    public void run() {
        while(!Thread.interrupted()){
            AsyncHandler handler;
            try {
                //阻塞。擷取任務隊列,為空的時候會阻塞
                while((handler=events.poll())!=null){
                    handler.getChannel().configureBlocking(false);
                    SelectionKey selectionKey=handler.getChannel().register(selector,SelectionKey.OP_READ);
                    selectionKey.attach(handler);
                    handler.setSk(selectionKey);
                }
                //擷取到了任務隊列,此刻進入輪詢監聽IO事件
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while(iterator.hasNext()){
                    SelectionKey key=iterator.next();
                    Runnable runnable=(Runnable) key.attachment(); //得到AsyncHandler執行個體
                    if(runnable!=null){
                        //調用AsyncHandler 處理任務
                        runnable.run();
                    }
                    iterator.remove();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }

        }
    }

    public void register(AsyncHandler handler){
        events.offer(handler); //有一個事件注冊,添加任務隊列
        selector.wakeup();//喚醒阻塞selector
    }
}      

AsyncHandler:

public class AsyncHandler implements Runnable{

    private SocketChannel channel;
    private SelectionKey sk;
    
    StringBuilder stringBuilder=new StringBuilder();

    ByteBuffer inputBuffer=ByteBuffer.allocate(1024);
    ByteBuffer outputBuffer=ByteBuffer.allocate(1024);

    public AsyncHandler(SocketChannel channel) {
        this.channel = channel;
    }

    public SocketChannel getChannel() {
        return channel;
    }

    public SelectionKey getSk() {
        return sk;
    }

    public void setSk(SelectionKey sk) {
        this.sk = sk;
    }

    @Override
    public void run() {
        try {
            if (sk.isReadable()) {
                read();
            } else if (sk.isWritable()) {
                write();
            }
        }catch (Exception e){
        }
    }
    private void read() throws IOException {
        inputBuffer.clear();
        int n=channel.read(inputBuffer);
        if(inputBufferComplete(n)){
            System.out.println(Thread.currentThread().getName()+": Server端收到用戶端的請求消息:"+stringBuilder.toString());
            outputBuffer.put(stringBuilder.toString().getBytes(StandardCharsets.UTF_8));
            this.sk.interestOps(SelectionKey.OP_WRITE);
        }
    }
    private boolean inputBufferComplete(int bytes) throws EOFException {
        if(bytes>0){
            inputBuffer.flip();
            while(inputBuffer.hasRemaining()){
                byte ch=inputBuffer.get(); //得到輸入的字元
                if(ch==3) { //表示Ctrl+c
                    throw new EOFException();
                }else if(ch=='\r'||ch=='\n'){
                    return true;
                }else {
                    stringBuilder.append((char)ch);
                }
            }
        }else if(bytes==1){
            throw new EOFException();
        }
        return false;
    }

    private void write() throws IOException {
        int write=-1;
        outputBuffer.flip();
        if(outputBuffer.hasRemaining()){
            write=channel.write(outputBuffer); //把收到的資料寫回到用戶端
        }
        outputBuffer.clear();
        stringBuilder.delete(0,stringBuilder.length());
        if(write<=0){
            this.sk.channel().close();
        }else{
            channel.write(ByteBuffer.wrap("\r\nreactor> ".getBytes()));
            this.sk.interestOps(SelectionKey.OP_READ);//又轉化為讀事件
        }
    }
}      

MultiplyReactor:

public class MultiplyReactor {

    private int port;

    private Reactor mainReactor; //main Reactor

    Executor mainReactorExecutor= Executors.newFixedThreadPool(10);

    public MultiplyReactor(int port) throws IOException {
        this.port = port;
        mainReactor=new Reactor();
    }

    public void start() throws IOException {
        new Acceptor(mainReactor.getSelector(),port);
        mainReactorExecutor.execute(mainReactor);
    }

    public static void main(String[] args) throws IOException {
        new MultiplyReactor(8080).start();
    }
}      

 父線程與子線程的資料互動簡單職責明确,父線程隻需要接收新連接配接,子線程完成後續的業務處理。父線程與子線程的資料互動簡單,Reactor 主線程隻需要把新連接配接傳給子線程,子線程無需傳回資料。

  這種模型在許多項目中廣泛使用,包括 Nginx 主從 Reactor 多程序模型,Memcached 主從多線程,Netty 主從多線程模型的支援。

 源碼位址:https://gitee.com/TongHuaShuShuoWoDeJieJu/redis.git

這短短的一生我們最終都會失去,不妨大膽一點,愛一個人,攀一座山,追一個夢