為何要用Reactor
1BIO
下面是采用BIO的方式進行網絡連接配接
{
// 建立一個serverSocket對象,相當于伺服器,并且自己設定端口,最好設定1024以後
ServerSocket serverSocket = new ServerSocket(8888);
while (true){
// 調用accept方法監聽通路的Socket
Socket socket = serverSocket.accept();
System.out.println("接受到新socket...");
new Thread(
new Runnable() {
@Override
public void run() {
try {
// 從socket中讀資料
InputStream is = socket.getInputStream();
InputStreamReader isr = new InputStreamReader(is);
BufferedReader bs = new BufferedReader(isr);
String str = "";
while ((str = bs.readLine()) != null) {
System.out.println("我是伺服器,用戶端說:" + str);
}
// 關閉輸入流
socket.shutdownInput();
// ---->下面是伺服器響應用戶端
// 獲得輸出流
OutputStream os = socket.getOutputStream();
// 寫入資料
PrintWriter pw = new PrintWriter(os);
pw.write("歡迎您:" + new Date().toString());
pw.flush();
// 關閉輸出流資源
socket.shutdownOutput();
pw.close();
os.close();
// 關閉輸入流資源
bs.close();
isr.close();
is.close();
} catch (Exception e) {
e.printStackTrace();
}
}
})
.start();
}
}
其中出現的問題就是
- 同步阻塞IO,讀寫阻塞,線程等待時間過長
- 在制定線程政策的時候,隻能根據CPU的數目來限定可用線程資源,不能根據連接配接并發數目來制定,也就是連接配接有限制。否則很難保證對用戶端請求的高效和公平。
- 多線程之間的上下文切換,造成線程使用效率并不高,并且不易擴充
2NIO
{
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(10022));
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (selector.select() > 0) {
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey next = iterator.next();
if (next.isAcceptable()) {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (next.isReadable()) {
SocketChannel socketChannel = (SocketChannel) next.channel();
// 配置設定指定大小緩沖區
ByteBuffer buffer = ByteBuffer.allocate(1024 * 5);
int len = 0;
while ((len = socketChannel.read(buffer)) > 0) {
buffer.flip();
System.out.println(new String(buffer.array(), 0, len));
buffer.clear();
}
}
iterator.remove();
}
}
}
- 非阻塞的IO讀寫
- 基于IO事件進行分發任務,同時支援對多個fd的監聽
Reactor模式
請看參考裡的視訊,講的比較仔細
Handle(句柄或是描述符):本質上表示一種資源,是由作業系統提供的;該資源用于一個個的事件,比如說檔案描述符号,或事針對網絡程式設計中的Socket描述符。事件即可以來自外部,也可以來自内部;外部事件比如說用戶端的連接配接請求,用戶端發送過來的資料等;内部事件比如說作業系統的定時器事件等,。它本質上就是一個檔案描述符。Handle是事件産生的發源地。
Synchronous Event Demultiplexer(同步事件分離器):它本身是一個系統的調用,用于等待事件的發生(事件可能是一個,也可能是多個)。調用方在調用它的時候會阻塞,一直阻塞到同步事件分離器有事件産生為止。對于Linux來說,同步事件分離器指的就是常用的I/0多路複用機制,比如select、poll、epoll等。在Java NIO集合中,同步事件分離器對應的元件就是Selector;對應的阻塞方法就是select方法。
Event Handler(事件處理器):本身由多個回調方法構成,這些回調方法構成了與應用相關的對于某個事件的回報機制。Netty相比于Java NIO來說,在事件處理器這個角色進行了一個更新,它為我們開發者提供了大量的回調方法,供我們在特定的事件産生時實作相應的回調方法進行業務邏輯的處理。
Concrete Event Handler(具體事件處理器):是事件處理器的實作。它本身實作了事件處理器所提供的各個回調方法,進而實作了特定與業務的邏輯。它本質上就是我們所編寫的一個個處理器實作。
Initiation Dispatcher(初始分發器):事件上就是Reactor角色。他本身定義了一些規範,這些規範用于控制事件的排程方式,同時又提供了應用進行事件處理器的注冊、删除等裝置。它本身是整個事件處理器的核心所在,Initiation Dispatcher會通過同步事件分離器來等待事件的發生。一旦事件發生,Initiation Dispatcher首先會分理處每一個事件,然後調用事件處理器,最後調用相關的回調方法來處理這些事件。
Reactor模式的流程
1.當應用向Initiation Dispatcher 注冊具體的事件處理器時,應用會辨別出事件處理器希望Initiation Dispatcher 在某個事件發生時向其通知的該事件,該事件與Handle關聯。
2.Initiation Dispatcher會要求每個事件處理器向其傳遞内部的Handle。該Handle向作業系統辨別了事件處理器。
3.當所有的事件處理器注冊完畢後,應用會調用handle_events方法來啟動Initiation Dispatcher的事件循環。這時,Initiation Dispatcher會将每個注冊的事件管理器的Handle合并起來,并使用同步事件分離器等待這些事件的發生。比如,TCP協定層會使用select同步事件分離器操作來等待用戶端發送的資料到達連接配接的socket handle上。
4.當與某個事件源對應的Handle變為ready狀态時(比如說,TCP socket變為等待讀狀态時) ,同步事件分離器就會通知Initiation Dispatcher。
5. Initiation Dispatcher會觸發事件處理器的回調方法,進而響應這個處于ready狀态的Handle。當事件發生時,Initiation Dispatcher 會将被事件源激活的Handle作為key 來尋找并分發恰當的事件處理器回調方法。
6. Initiation Dispatcher會回調事件處理器的handle_ events回調方法來執行特定于應用的功能(開發者自己所編寫的功能),進而響應這個事件。所發生的事件類型可以作為該方法參數并被該方法内部使用來執行額外的特定于服務的分離與分發。
兩種主流Reactor圖檔比較
一般百度Reactor模式就是上面的兩張圖檔,其實兩張圖檔的内容差不多,現在從比較這兩張圖檔中進行分析這兩張圖檔。
左圖中的Initiation Dispatcher就是右邊的Reactor(mainReactor和subReactor)
左圖中的handle就是有圖中的Handle
左圖中的Synchronous Event Demultiplexer其實屬于Initiation Dispatcher的一部分,是以實際也相當于在右圖中的Reactor中。
左圖中的Event Handler對應右圖中的read和send
左圖中的Concrete Event Handler是Event Handler的實作。
Reactor與Netty的關系
下面是Netty伺服器端的一個HelloWorld
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
// 保持連接配接
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// TimeClientHandler是自己定義的方法
socketChannel.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
socketChannel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
socketChannel.pipeline().addLast(new MyServerHandler());
}
});
// 綁定端口
ChannelFuture f = b.bind(8888).sync();
// 等待服務端監聽端口關閉
f.channel().closeFuture().sync();
} catch (Exception e) {
} finally {
// 優雅關閉,釋放線程池資源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
圖一
其中BossEventLoopGroup和WorkBossEventLoopGroup都是Initiation Dispatcher,裡面有一個Synchronous Event Demultiplexer(Selector)
Concrete Event Handler是Event Handler的實作類,則上面的代碼中new MyServerHandler()就是實作類,而SimpleChannelInboundHandler和SimpleChannelOutboundHandler就是接口,即Event Handler
圖二
其中BossEventLoopGroup和WorkBossEventLoopGroup就是上圖中的mainRector和subReactor,
acceptor其實也是一個Event Handler,在源碼中有顯示
//public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel>
@Override
void init(Channel channel) throws Exception {
//省略
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
//acceptor
//acceptor
//acceptor
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}