詳細介紹了Reactor網絡程式設計模型的概念。
此前,我們介紹了了Java中常見的四種網絡IO模型:Java 四種常見網絡IO模型以及select、poll、epoll函數的簡單介紹。現在我們來看看基于IO多路複用演變而來的Reactor網絡程式設計模型。
文章目錄
- 1 Reactor模型的介紹
- 2 單Reactor單線程模式
-
- 2.1 僞代碼
- 3 單Reactor多線程模式
-
- 3.1 僞代碼
- 4 多Reactor多線程模式
-
- 4.1 僞代碼
1 Reactor模型的介紹
常見的網絡程式設計模型并不是最基本的四種網絡IO模型,因為這涉及到了底層代碼的編寫,大佬們在基本網絡IO模型的基礎上采用面向對象的方式進行了進一步封裝,形成了更加易于了解的Reactor、Proactor、Acceptor-Connector等程式設計模型。Reactor模型是最常見的網絡程式設計模型,大名鼎鼎的Netty、Tomcat等架構或者軟體都是使用Reactor模型實作高并發、高性能的網絡通信。
Java并發程式設計之父Doug Lea早在多年之前就對Reactor模型進行了詳盡的闡述:http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf,Java nio包中的
Selector
就是基于最簡單的Reactor模型實作的,在Doug Lea的文章中還給出了一些使用案例。
IO 多路複用模型我們此前就學過了,好處主要有兩個:一是該模型能夠在同一個線程内同時監聽多個IO請求,系統不必建立大量的線程,進而大大減小了系統的開銷。二是阻塞等待的方式能減少無效的系統調用,減少了對 CPU 資源的消耗。
Reactor模型是IO 多路複用模型的進一步面向對象的封裝,讓使用者不用考慮底層網絡 API 的細節,隻需要關注應用代碼的編寫。Reactor直譯過來就是反應器,這裡的反應是指對事件的反應:當IO多路複用程式監聽并收到事件通知之後,根據事件類型配置設定給不同的處理器處理,是以Reactor 模型也被稱為 Dispatcher (分派器)模型,或者稱為基于事件驅動的模型。
Reactor模型可以抽象出兩個重要元件:
- Reactor,專門用于監聽和響應各種IO事件,比如連接配接建立就緒(ACCEPT)、讀就緒(READ)、寫就緒(WRITE)等,當檢測到一個新的事件到來時,将其發送給相應的Handler去處理。
- Handler,專門用于處理特定的事件,比如讀取資料,業務邏輯執行,寫出響應等。
Reactor模型發展至今,包含了多種實作,常見的有單Reactor單線程模式、單Reactor多線程模式,多Reactor多線程模式。
2 單Reactor單線程模式
Doug Lea文章中給出的該模式的流程圖如下:
上圖中,Reactor用于監聽各種IO事件,并配置設定(dispatch)給特定的Handler,accepter元件專門用于處理建立連接配接事件,可以看做是一個特殊的Handler。
總體流程為:
- 服務端的Reactor 線程對象通過循環 select調用(IO 多路複用)監聽各種IO事件,還會注冊一個accepter事件處理器到Reactor中,accepter專用于處理建立連接配接事件。
- 用戶端首先發起一個建立連接配接的請求,Reactor監聽到ACCEPT事件的到來後将該ACCEPT事件分派給accepter元件,accepter通過accept()方法與用戶端建立對應的連接配接(SocketChannel),然後将該連接配接所關注的READ事件以及對應的READ事件處理器注冊到Reactor中,這樣Reactor就會監聽該連接配接的READ事件。
- 當Reactor監聽到該連接配接有讀或者寫事件發生時,将相關的事件派發給對應的處理器進行處理。比如,讀處理器會通過SocketChannel的read()方法直接讀取到資料,随後可進行各種業務處理,之後需要向用戶端發送資料時,也可以注冊該連接配接的WRITE事件和其對應的處理器,當channel可寫時,通過SocketChannel的wtite()方法寫資料。
- 每當處理完所有就緒的IO事件後,Reactor線程會再次執行select()操作阻塞等待新的事件就緒并将其分派給對應處理器進行處理。
單Reactor單線程模式的意思就是以上的Reactor和Hander的所有操作都是在同一個線程中完成的。上面的select、accept、read、wtite等調用以及業務邏輯處理,都是在同一個線程中完成的。
單Reactor單線程模式是最基礎的Reactor模型,實作起來比較簡單,由于是單線程,業務代碼編寫也不用考慮有任何的并發問題,Java的NIO模式的Selector底層其實就是最簡單的單Reactor單線程模式。
但是單Reactor單線程模式隻有一個線程工作,無法充分利用現代多核CPU的性能,并且如果某個client的業務邏輯耗時較長,将會造成後續其他client的請求阻塞執行。
因為Redis的業務處理主要都是在記憶體中完成,記憶體操作的速度很快,Redis性能瓶頸不在 CPU 上(在網絡IO的消耗以及記憶體上),加上這種模式實作起來也很簡單,是以Redis 6之前的對于指令的執行也是單Reactor單線程模型。
但是在Redis 6之後還是引入了多線程機制(多線程真香),但Redis 的多線程隻是在網絡IO資料的讀寫這類耗時操作上使用,降低了網絡IO帶來的性能損耗,而實際執行指令(Handler)仍然是單線程順序執行的,是以也不需要擔心Redis的線程安全問題。
2.1 僞代碼
Doug Lea文章中給出的單Reactor單線程模式的僞代碼如下:
Reactor:
/**
* Reactor
* 負責監聽并分發事件
*/
class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;
Reactor(int port) throws IOException { //Reactor初始化
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
//socket設定為非阻塞
serverSocket.configureBlocking(false);
//注冊監聽accept事件
SelectionKey sk =
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
//注冊一個Acceptor作為accept事件的回調
sk.attach(new Acceptor());
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
//循環調用select等到事件就緒
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext()) {
//Reactor負責dispatch收到的事件
dispatch((SelectionKey) (it.next()));
}
selected.clear();
}
} catch (IOException ex) { /* ... */ }
}
void dispatch(SelectionKey k) {
Runnable r = (Runnable) (k.attachment());
//調用之前注冊的callback對象
if (r != null) {
r.run();
}
}
/**
* Acceptor
* 處理accept事件的回調函數
*/
class Acceptor implements Runnable {
@Override
public void run() {
try {
SocketChannel channel = serverSocket.accept();
if (channel != null) {
new Handler(selector, channel);
}
} catch (IOException ex) { /* ... */ }
}
}
}
Handler:
class Handler implements Runnable {
final SocketChannel channel;
final SelectionKey sk;
final int MAXIN = 2048;
final int MAXOUT = 2048;
//配置設定緩沖區
ByteBuffer input = ByteBuffer.allocate(MAXIN);
ByteBuffer output = ByteBuffer.allocate(MAXOUT);
static final int READING = 0, SENDING = 1;
int state = READING;
Handler(Selector selector, SocketChannel c) throws IOException {
channel = c;
c.configureBlocking(false);
// 預設不注冊任何事件
// 0表示對這個channel的任何事件都不感興趣,這樣會導緻永遠select不到這個channel
sk = channel.register(selector, 0);
//将目前Handler對象作為事件就緒時的callback對象
sk.attach(this);
//注冊Read就緒事件
sk.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}
boolean inputIsComplete() {
/* ... */
return false;
}
boolean outputIsComplete() {
/* ... */
return false;
}
void process() {
/* ... */
return;
}
@Override
public void run() {
try {
//處理讀就緒事件
if (state == READING) {
read();
//處理寫就緒事件
} else if (state == SENDING) {
send();
}
} catch (IOException ex) { /* ... */ }
}
/**
* 處理讀就緒事件
*/
void read() throws IOException {
//讀取資料
channel.read(input);
if (inputIsComplete()) {
//處理資料
process();
state = SENDING;
//資料處理完畢,需要寫資料
//開始監聽寫就緒事件
sk.interestOps(SelectionKey.OP_WRITE);
}
}
/**
* 處理寫就緒事件
*/
void send() throws IOException {
channel.write(output);
//write完就表示一次事件處理完畢結,關閉SelectionKey
if (outputIsComplete()) {
sk.cancel();
}
}
}
3 單Reactor多線程模式
為了克服單Reactor單線程模型下無法利用多核CPU的優勢以及可能因為某個請求的業務執行時間過長造成後續請求IO阻塞的問題,發展出了單Reactor多線程模型。
Doug Lea文章中給出的該模式的流程圖如下:
上圖中,單個Reactor線程用于監聽各種IO事件,并配置設定(dispatch)給特定的Handler,這一點和單Reactor單線程模型是一樣的,差別該改模型還添加了一個工作線程池,将非IO操作(除了read、send調用之外的業務操作)從Reactor線程中移出轉交給工作線程池來并發的執行。
總體流程為:
- 服務端的Reactor 線程對象通過循環 select調用(IO 多路複用)監聽各種IO事件,還會注冊一個accepter事件處理器到Reactor中,accepter專用于處理建立連接配接事件。
- 用戶端首先發起一個建立連接配接的請求,Reactor監聽到ACCEPT事件的到來後将該ACCEPT事件分派給accepter元件,accepter通過accept()方法與用戶端建立對應的連接配接(SocketChannel),然後将該連接配接所關注的READ事件以及對應的READ事件處理器注冊到Reactor中,這樣Reactor就會監聽該連接配接的READ事件。
- 當Reactor監聽到該連接配接有讀或者寫事件發生時,将相關的事件派發給對應的處理器進行處理。比如,讀處理器會通過SocketChannel的read()方法直接讀取到資料,随後可進行各種業務處理,之後需要向用戶端發送資料時,也可以注冊該連接配接的WRITE事件和其對應的處理器,當channel可寫時,通過SocketChannel的wtite()方法寫資料。
- 這裡和單Reactor單線程模型的不同點就是,Reactor線程隻負責Hander中的網絡IO調用,即read讀取資料和send發送資料調用,讀取到資料之後的處理,比如反序列化、執行業務邏輯、序列化等操作則是通過一個線程池來并行執行的。
- 每當處理完所有就緒的IO事件後,Reactor線程會再次執行select()操作阻塞等待新的事件就緒并将其分派給對應處理器進行處理。
該模式中,Handler處理時除了read和send調用之外的其他業務邏輯都是多線程執行的,這樣就可以讓Reactor線程更快的進行下一輪的select操作,提升了對于請求的IO響應速度,不至于因為一些耗時的業務邏輯而延遲對後面IO請求的處理。
該模式中能夠充分利用多核 CPU 性能,但是會帶來多線程并發的問題,對于業務邏輯的編寫需要特别注意共享資料的處理。
另外,雖然該模式下業務處理使用了異步執行,效率有所提升,但是仍然是采用單個 Reactor 線程承擔所有事件的監聽和基本IO操作,比如accept、read、send、connect操作,在面對瞬間到來的成百上千個連接配接這樣的高并發場景時,仍然會成為性能瓶頸。
3.1 僞代碼
Doug Lea文章中給出的單Reactor多線程模式的僞代碼如下:
Handler類變成了支援多線程處理業務的MthreadHandler,Reactor類沒有太大變化,在Acceptor中,new Handler變成了new MthreadHandler:
class MthreadHandler implements Runnable {
final SocketChannel channel;
final SelectionKey selectionKey;
final int MAXIN = 2048;
final int MAXOUT = 2048;
ByteBuffer input = ByteBuffer.allocate(MAXIN);
ByteBuffer output = ByteBuffer.allocate(MAXOUT);
static final int READING = 0, SENDING = 1;
int state = READING;
/**
* 設定一個靜态線程池
*/
static ExecutorService pool = Executors.newFixedThreadPool(2);
static final int PROCESSING = 3;
MthreadHandler(Selector selector, SocketChannel c) throws IOException {
channel = c;
c.configureBlocking(false);
selectionKey = channel.register(selector, 0);
selectionKey.attach(this);
selectionKey.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}
boolean inputIsComplete() {
/* ... */
return false;
}
boolean outputIsComplete() {
/* ... */
return false;
}
@Override
public void run() {
try {
if (state == READING) {
read();
} else if (state == SENDING) {
send();
}
} catch (IOException ex) { /* ... */ }
}
synchronized void read() throws IOException {
//接受資料
channel.read(input);
if (inputIsComplete()) {
state = PROCESSING;
/*
* 使用線程池中的線程異步的處理資料,執行業務邏輯
*
* 該調用執行之後Reactor線程可以馬上傳回,不需要等到業務執行完畢
*/
pool.execute(new Processer());
}
}
void send() throws IOException {
channel.write(output);
if (outputIsComplete()) {
selectionKey.cancel();
}
}
/**
* 異步任務
*/
class Processer implements Runnable {
@Override
public void run() {
processAndHandOff();
}
}
synchronized void processAndHandOff() {
//執行業務
process();
state = SENDING;
//資料處理完畢,需要寫資料
//開始監聽寫就緒事件
selectionKey.interestOps(SelectionKey.OP_WRITE);
}
void process() {
/* ... */
return;
}
}
4 多Reactor多線程模式
為了不讓單個Reactor成為性能瓶頸,我們可以繼續改造,将一個Reactor的功能拆分為“連接配接用戶端”和“與用戶端通信”兩部分,由不同的Reactor執行個體(多個Reactor線程)來共同完成,這就是多Reactor多線程模式,也被稱為Reactor主從多線程模式。
Doug Lea文章中給出的該模式的流程圖如下:
mainReactor擁有自己的Selector,通過 select 專門監控連接配接建立事件,事件準備就緒後通過 Acceptor 對象中的 accept 檢核與用戶端的連接配接,随後将新的連接配接配置設定給某個subReactor,subReactor也有自己的Selector,在subReactor中對該連接配接繼續進行監聽并執行其他事件,比如讀就緒和寫就緒事件,這樣就将Reactor的工作分為兩部分,這兩部分可以在獨立的線程中執行,進一步提升性能。
總體流程為:
- 服務端的mainReactor線程通過循環 select調用(IO 多路複用)監聽連接配接建立事件,還會注冊一個accepter事件處理器到Reactor中,accepter專用于處理建立連接配接事件。
- 用戶端首先發起一個建立連接配接的請求,mainReactor監聽到ACCEPT事件的到來後将該ACCEPT事件分派給accepter元件,accepter通過accept()方法與用戶端建立對應的連接配接(SocketChannel),然後将該連接配接配置設定給一個subReactor。随後mainReactor線程傳回,繼續執行下一輪的select監聽操作。
- subReactor也有自己的Selector,它會将該連接配接将所關注的READ事件以及對應的READ事件處理器注冊并通過select監聽該連接配接的READ事件。
- 當subReactor監聽到該連接配接有讀或者寫事件發生時,将相關的事件派發給對應的處理器進行處理。比如,讀處理器會通過SocketChannel的read()方法直接讀取到資料,随後可進行各種業務處理,之後需要向用戶端發送資料時,也可以注冊該連接配接的WRITE事件和其對應的處理器,當channel可寫時,通過SocketChannel的wtite()方法寫資料。
- subReactor線程隻負責Hander中的網絡IO調用,即read讀取資料和send發送資料調用,讀取到資料之後的處理,比如反序列化、執行業務邏輯、序列化等操作則是通過一個線程池來并行執行的。
- 每當處理完所有就緒的IO事件後,subReactor線程會再次執行select()操作阻塞等待新的事件就緒并将其分派給對應處理器進行處理。
多 Reactor 多線程模式中,mainReactor和subReactor都可以有多個,每一個都有自己的Selector,都在一個獨立的線程中工作,這樣進一步利用了多核CPU的多線程優勢,讓Reactor不會輕易成為性能瓶頸,提升了連接配接速度以及IO讀寫的速度。
但多 Reactor 多線程模式仍然不能從根源上解決耗時的IO操作對其他的client的影響,因為一個subReactor仍有可能對應多個client連接配接,為此,可以使用真正的異步IO模型演化而來的設計模式—Proactor模式來實作真正的異步IO。
Netty 和 Memcached 都是采用的多 Reactor 多線程模式。Nginx 也是采用多 Reactor 多程序模式。實際上Netty的多 Reactor 多線程模式實作更為簡單,subReactor處理read、write等IO操作的同時還處理業務的執行,即去掉了工作者線程池(Thread Pool),或者說SubReactor和Worker線程在同一個線程池中:
- mainReactor對應Netty中配置的BossGroup線程組,主要負責接受用戶端連接配接的建立。一般隻暴露一個服務端口,BossGroup線程組一般一個線程工作即可
- subReactor對應Netty中配置的WorkerGroup線程組,BossGroup線程組接受并建立完用戶端的連接配接後,将網絡socket轉交給WorkerGroup線程組,然後在WorkerGroup線程組内選擇一個線程,進行I/O的處理。WorkerGroup線程組主要處理I/O,一般設定2*CPU核數個線程。
Netty 可以通過配置的參數同時支援 Reactor 單線程模型、多線程模型,預設模式時上面的多 Reactor 多線程模式變體。
4.1 僞代碼
Doug Lea文章中給出的多Reactor多線程模式的僞代碼如下:
class MthreadReactor implements Runnable {
/**
* subReactors集合, 一個selector代表一個subReactor
*/
Selector[] selectors = new Selector[2];
int next = 0;
final ServerSocketChannel serverSocket;
/**
* mainSelector
*/
final Selector selector;
MthreadReactor(int port) throws IOException {
selector = Selector.open();
selectors[0] = Selector.open();
selectors[1] = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
serverSocket.configureBlocking(false);
//監聽accept事件
SelectionKey sk =
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
//Acceptor用于建立連接配接
sk.attach(new Acceptor());
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext()) {
//Reactor負責dispatch收到的事件
dispatch((SelectionKey) (it.next()));
}
selected.clear();
}
} catch (IOException ex) { /* ... */ }
}
void dispatch(SelectionKey k) {
Runnable r = (Runnable) (k.attachment());
//調用之前注冊的callback對象
if (r != null) {
r.run();
}
}
class Acceptor implements Runnable { // ...
@SneakyThrows
@Override
public synchronized void run() {
//mainSelector負責accept建立連接配接
try (SocketChannel connection = serverSocket.accept()) {
//連接配接建立之後将連接配接傳給一個subSelector,監聽read和write事件
if (connection != null) {
new Handler(selectors[next], connection); //選個subReactor去負責接收到的connection
}
}
if (++next == selectors.length) {
next = 0;
}
}
}
}
參考資料:
- Scalable IO in Java(Doug Lea)
- Reactor模式詳解
- 原來 8 張圖,就能學廢 Reactor 和 Proactor
如有需要交流,或者文章有誤,請直接留言。另外希望點贊、收藏、關注,我将不間斷更新各種Java學習部落格!