天天看點

Reactive 簡介

1. 概念

Reactive 非常适合低延遲、高吞吐量的工作負載。

Reactive Processing 是一種範式(規範),它使開發人員能夠建構非阻塞的、異步的應用程式,這些應用程式能夠處理背壓(流控制)

Reactive Streams 為無阻塞背壓的異步流處理提供标準。

Reactor 是基于Reactive Streams規範的第四代響應庫,用于在JVM上建構非阻塞的應用程式。

Project Reactor 是一個完全無阻塞的基礎,其中包括背壓支援。它是Spring生态系統中的響應式堆棧的基礎,并且在諸如Spring WebFlux,Spring Data和Spring Cloud Gateway等項目中都有它的身影。利用Project Reactor可以高效的響應式系統。剛才說Reactive Streams是規範,那麼Project Reactor就是實作。

Reactive 簡介

2. 響應式程式設計

響應式程式設計是一種異步程式設計風格,它關注資料流和變化的傳播。

響應式程式設計是一種與資料流和變化傳播相關的聲明式程式設計範式。使用此範例,可以輕松地表示靜态(例如,數組)或動态(例如,事件發射器)資料流,并且還可以表示關聯執行模型中的推斷出的依賴關系,這有助于更改資料流的自動傳播。 

reactive programming   (響應式程式設計)

imperative programming(指令式程式設計)

在指令式程式設計中,a:=b+c意味着将b+c的結果指派給a,并且此後b或c的值發生變化不會影響到a的值。而在響應式程式設計中,a的值會随着b或c的改變而自動更新,并且不需要重新執行a:=b+c來确定目前配置設定給a的值。(PS:是不是很像angularjs、vuejs這種MVVM架構,視圖綁定模型,模型變了,視圖自動就跟着變了)

例如,在 model–view–controller (MVC) 架構中,響應式程式設計可以促進基礎模型中的更改,這些更改會自動反映在關聯的視圖中。

響應式程式設計與面向對象程式設計中通常使用的觀察者模式具有很多相似之處。

如果從推拉的角度來看的話,響應式程式設計是“推”,它主動将變化推送給它的訂閱者。Publisher-Subscriber是兩個非常重要的概念。

想象一下,資料流從源出發,經過一個一個節點的處理,最終達到目的地。節點就相當于操作符,處理完了以後就将流發射出去,到下一個節點再執行再發射。

我總覺得這個流程很眼熟,很像 Apache Storm 的處理方式。在一個拓撲結構中,資料流從Spout發出,經過若幹bolt的處理,最終彙集到某個地方。

還有一種了解,我覺得也很不錯,說響應式程式設計是一種通過異步和資料流來建構事務關系的程式設計模型。事物可以了解程一次處理過程,一次執行過程。響應式程式設計就是要建構關系,事務和事務之間的關系。而資料流就像是一個橋梁一樣,資料流從一個事務流向下一個事務。

想象一下,長江流經宜賓、泸州、重慶、涪陵、萬州、宜昌、荊州、武漢、黃石、鄂州、九江、安慶、銅陵、蕪湖、南京、上海,最終彙入東海。

就像CompleteFuture把Future進行編排一樣。

本質來講,響應式程式設計上是對資料流或某種變化所作出的反應,但是這個變化什麼時候發生是未知的,是以他是一種基于異步、回調的方式在處理問題

3. NIO

NIO(Non-Blocking I/O)

BIO(Blocking I/O)

在經典的線程模型中,socket.accept()、socket.read()、socket.write()三個主要函數都是同步阻塞的,當一個連接配接在處理I/O的時候,系統是阻塞的,如果使用單線程的話就阻塞在那裡了,但CPU是并沒有阻塞,如果用多線程的話,就可以讓CPU去處理更多的事情。其實這也是所有使用多線程的本質: 當I/O阻塞系統,但CPU空閑的時候,可以利用多線程使用CPU資源。然而,線程的建立、銷毀、切換成本都是很高的。

事實上,所有的系統I/O都分為兩個階段:等待就緒和操作。舉例來說,讀函數,分為等待系統可讀和真正的讀;同理,寫函數分為等待網卡可以寫和真正的寫。

需要說明的是等待就緒的阻塞是不使用CPU的,是在“空等”;而真正的讀寫操作的阻塞是使用CPU的,真正在”幹活”。

以socket.read()為例子:

傳統的BIO裡面socket.read(),如果TCP RecvBuffer裡沒有資料,函數會一直阻塞,直到收到資料,傳回讀到的資料。

對于NIO,如果TCP RecvBuffer有資料,就把資料從網卡讀到記憶體,并且傳回給使用者;反之則直接傳回0,永遠不會阻塞。 

在BIO模型中,沒有辦法知道到底能不能寫、能不能讀,隻能”傻等”。而在NIO模型中,如果一個連接配接不能讀寫(socket.read()傳回0或者socket.write()傳回0),我們可以把這件事記下來,記錄的方式通常是在Selector上注冊标記位,然後切換到其它就緒的連接配接(channel)繼續進行讀寫。

NIO的主要事件有幾個:讀就緒、寫就緒、有新連接配接到來。那麼,首先需要注冊當這幾個事件到來的時候所對應的處理器,然後在合适的時機告訴事件選擇器:我對這個事件感興趣,最後用一個死循環選擇就緒的事件。select是阻塞的,是以你可以放心大膽地在一個while(true)裡面調用這個函數而不用擔心CPU空轉。

總結起來就是:注冊所有感興趣的事件處理器,單線程輪詢選擇就緒事件,執行事件處理器。

我們大概可以總結出NIO是怎麼解決掉線程的瓶頸并處理海量連接配接的:

NIO由原來的阻塞讀寫(占用線程)變成了單線程輪詢事件,找到可以進行讀寫的網絡描述符進行讀寫。除了事件的輪詢是阻塞的(沒有可幹的事情必須要阻塞),剩餘的I/O操作都是純CPU操作,沒有必要開啟多線程。

NIO由原來的阻塞讀寫(占用線程)變成了單線程輪詢事件,找到可以進行讀寫的網絡描述符進行讀寫。除了事件的輪詢是阻塞的(沒有可幹的事情必須要阻塞),剩餘的I/O操作都是純CPU操作,沒有必要開啟多線程。并且由于線程的節約,連接配接數大的時候因為線程切換帶來的問題也随之解決,進而為處理海量連接配接提供了可能。單線程處理I/O的效率确實非常高,沒有線程切換,隻是拼命的讀、寫、選擇事件。但現在的伺服器,一般都是多核處理器,如果能夠利用多核心進行I/O,無疑對效率會有更大的提高。

Buffer(緩沖區)

在NIO中,所有資料都是用緩沖區處理的。在讀取資料時,它是直接讀到緩沖區中的;在寫入資料時,它也是寫入到緩沖區中的。 

Channel(通道)

通道是一個對象,通過它可以讀取和寫入資料,當然了所有資料都通過Buffer對象來處理。我們永遠不會将位元組直接寫入通道中,相反是将資料寫入包含一個或者多個位元組的緩沖區。同樣不會直接從通道中讀取位元組,而是将資料從通道讀入緩沖區,再從緩沖區擷取這個位元組。

Selector(選擇器)

Selector類是NIO的核心類,Selector(選擇器)選擇器提供了選擇已經就緒的任務的能力。Selector會不斷的輪詢注冊在上面的所有channel,如果某個channel為讀寫等事件做好準備,那麼就處于就緒狀态,通過Selector可以不斷輪詢發現出就緒的channel,進行後續的IO操作。一個Selector能夠同時輪詢多個channel。這樣,一個單獨的線程就可以管理多個channel,進而管理多個網絡連接配接。這樣就不用為每一個連接配接都建立一個線程,同時也避免了多線程之間上下文切換導緻的開銷。

一個簡單的讀取檔案的例子:

1 package com.cjs.example.restservice.nio;
 2 
 3 import java.io.FileInputStream;
 4 import java.nio.ByteBuffer;
 5 import java.nio.channels.FileChannel;
 6 
 7 /**
 8  * @author ChengJianSheng
 9  * @date 2020-03-26
10  */
11 public class Hello {
12 
13     public static void main(String[] args) throws Exception {
14         FileInputStream fis = new FileInputStream("/data.txt");
15         FileChannel channel = fis.getChannel();
16 
17         ByteBuffer buffer = ByteBuffer.allocate(10);
18 
19         while (true) {
20             if (channel.read(buffer) == -1) {
21                 break;
22             }
23             buffer.flip();
24             while (buffer.hasRemaining()) {
25                 System.out.print((char)buffer.get());
26             }
27             buffer.clear();
28         }
29 
30         channel.close();
31         fis.close();
32     }
33 }       

Server.java

1 package com.cjs.example.restservice.nio;
 2 
 3 import java.net.InetSocketAddress;
 4 import java.nio.ByteBuffer;
 5 import java.nio.channels.SelectionKey;
 6 import java.nio.channels.Selector;
 7 import java.nio.channels.ServerSocketChannel;
 8 import java.nio.channels.SocketChannel;
 9 import java.util.Iterator;
10 import java.util.Set;
11 
12 /**
13  * @author ChengJianSheng
14  * @date 2020-03-26
15  */
16 public class Server {
17     public static void main(String[] args) throws Exception {
18         //  建立一個Selector
19         Selector selector = Selector.open();
20 
21         ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
22         serverSocketChannel.configureBlocking(false);
23         serverSocketChannel.bind(new InetSocketAddress(9000));
24 
25         serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
26 
27         while (true) {
28             selector.select();
29 
30             Set<SelectionKey> selectedKeys = selector.selectedKeys();
31             Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
32             while (keyIterator.hasNext()) {
33                 SelectionKey key = keyIterator.next();
34                 if(key.isAcceptable()) {
35                     // a connection was accepted by a ServerSocketChannel.
36 
37                     System.out.println(1);
38                     ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
39                     SocketChannel sc = ssc.accept();
40                     sc.configureBlocking(false);
41                     sc.register(selector, SelectionKey.OP_READ);
42                 } else if (key.isConnectable()) {
43                     // a connection was established with a remote server.
44                 } else if (key.isReadable()) {
45                     // a channel is ready for reading
46 
47                     System.out.println(2);
48                     SocketChannel socketChannel = (SocketChannel) key.channel();
49                     ByteBuffer buffer = ByteBuffer.allocate(1024);
50                     int len = 0;
51                     while ((len = socketChannel.read(buffer)) != -1) {
52                         buffer.flip();
53                         System.out.println(new String(buffer.array(), 0, len));
54                     }
55 
56                     socketChannel.close();
57                 } else if (key.isWritable()) {
58                     // a channel is ready for writing
59                 }
60 
61                 keyIterator.remove();
62             }
63         }
64     }
65 }
      

Client.java

1 package com.cjs.example.restservice.nio;
 2 
 3 import java.net.InetSocketAddress;
 4 import java.nio.ByteBuffer;
 5 import java.nio.channels.SocketChannel;
 6 
 7 /**
 8  * @author ChengJianSheng
 9  * @date 2020-03-26
10  */
11 public class Client {
12 
13     public static void main(String[] args) throws Exception {
14         SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9000));
15         socketChannel.configureBlocking(false);
16 
17         ByteBuffer buffer = ByteBuffer.allocate(1024);
18         String msg = "Hello, World!";
19         buffer.put(msg.getBytes());
20         buffer.flip();
21         socketChannel.write(buffer);
22 
23         socketChannel.close();
24     }
25 }       

關于Selector的用法

1 Selector selector = Selector.open();
 2 
 3 channel.configureBlocking(false);
 4 
 5 SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
 6 
 7 while(true) {
 8 
 9     int readyChannels = selector.selectNow();
10 
11     if(readyChannels == 0) continue;
12 
13 
14     Set<SelectionKey> selectedKeys = selector.selectedKeys();
15 
16     Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
17 
18     while(keyIterator.hasNext()) {
19 
20         SelectionKey key = keyIterator.next();
21 
22         if(key.isAcceptable()) {
23             // a connection was accepted by a ServerSocketChannel.
24 
25         } else if (key.isConnectable()) {
26             // a connection was established with a remote server.
27 
28         } else if (key.isReadable()) {
29             // a channel is ready for reading
30 
31         } else if (key.isWritable()) {
32             // a channel is ready for writing
33         }
34 
35         keyIterator.remove();
36     }
37 }
      

參考:

https://spring.io/reactive

https://www.jianshu.com/p/d47835316016

https://www.cnblogs.com/haimishasha/p/10756448.html

https://tech.meituan.com/2016/11/04/nio.html 

牆裂推薦Java NIO教程

http://tutorials.jenkov.com/java-nio/index.html

http://tutorials.jenkov.com/java-nio/selectors.html

http://tutorials.jenkov.com/java-nio/server-socket-channel.html