天天看点

Reactor模式与NIOReactor模式与NIO

文章目录

  • Reactor模式与NIO
    • 简单的NIO编程方式
    • Reactor模式
      • 单线程Reactor模式
      • 多线程Reactor模式
      • 主从Reactor多线程模型

Reactor模式与NIO

NIO是非阻塞IO,而Reactor是基于NIO的一种设计模式。

简单的NIO编程方式

简单的NIO编程方式其实是以面向过程的方式编写代码的。

public class ChargenServer {

	public static void main(String[] args) {
		byte[] rotation = new byte[95*2];
		for(byte i=' '; i<='~'; i++) {
			rotation[i-' '] = i;
			rotation[i+95-' '] = i;
		}
		
		Selector selector;
		try {
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
			serverSocketChannel.bind(new InetSocketAddress(19));
			serverSocketChannel.configureBlocking(false);
			selector = Selector.open();
			serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
		}catch(IOException ex) {
			ex.printStackTrace();
			return;
		}
		
		while(true) {
			try {
				selector.select();
			} catch (IOException e) {
				e.printStackTrace();
				break;
			}
			Set<SelectionKey> readyKeys = selector.selectedKeys();
			Iterator<SelectionKey> iterator = readyKeys.iterator();
			while(iterator.hasNext()) {
				SelectionKey key = iterator.next();
				iterator.remove();
				try {
					if(key.isAcceptable()) {
						ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
						SocketChannel clientSocketChannel = serverSocketChannel.accept();
						clientSocketChannel.configureBlocking(false);
						SelectionKey keyTemp = clientSocketChannel.register(selector, SelectionKey.OP_WRITE);
						ByteBuffer buffer = ByteBuffer.allocate(74);
						buffer.put(rotation, 0, 72);
						buffer.put((byte)'\r');
						buffer.put((byte)'\n');
						buffer.flip();
						keyTemp.attach(buffer);
					}else if(key.isWritable()) {
						SocketChannel clientChannel = (SocketChannel)key.channel();
						ByteBuffer buffer = (ByteBuffer)key.attachment();
						if(!buffer.hasRemaining()) {
							buffer.rewind();
							int first = buffer.get();
							buffer.rewind();
							int position = first - ' '+1;
							buffer.put(rotation,position, 72);
							buffer.put((byte)'\r');
							buffer.put((byte)'\n');
							buffer.flip();
						}
						clientChannel.write(buffer);
					}
				} catch (IOException e) {
					key.cancel();
					try {
						key.channel().close();
					} catch (IOException e1) {
						e1.printStackTrace();
					}
					e.printStackTrace();
				}
			}
		}
	}
}
           

这种编程方式有以下几个问题:

  1. 职责不明确。
  2. 不利于维护与扩展。

Reactor模式

该模式抽象出两个组件:Reactor线程和Handler处理器。

Reactor线程:负责响应IO事件,并分发到Handler处理器。新的事件包含连接建立就绪、读就绪、写就绪等。

Handler处理器:执行非阻塞的操作。

Reactor模式又分为以下三种模式。

单线程Reactor模式

这是最简单的一种Reactor模式。一个线程完成socket连接以及IO处理。

Reactor模式与NIOReactor模式与NIO

缺点:当其中某个 handler 阻塞时, 会导致其他所有的 client 的 handler 都得不到执行, 并且更严重的是, handler 的阻塞也会导致整个服务不能接收新的 client 请求(因为 acceptor 也被阻塞了)。不能充分利用多核资源。

一个线程负载过重后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往会进行重发,这更加重了 NIO 线程的负载,最终会导致大量消息积压和处理超时,成为系统的性能瓶颈。

一旦 NIO 线程意外跑飞,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障。

多线程Reactor模式

有一个专门的Acceptor线程用于监听服务端,接收客户端的TCP连接请求。一方面根据实际情况用于匹配调节CPU处理与IO读写的效率,提高系统资源的利用率,另一方面在静态或动态构造中每个反应器线程都包含对应的Selector,Thread,dispatchloop。

有一个专门处理读写的线程池。包含一个任务队列和N个可用的线程。由这些线程处理消息的读取、编解码、写出、业务处理。1个线程可以同时处理N个链路,但1个链路只对应1个NIO线程,防止发生并发操作问题。

Reactor模式与NIOReactor模式与NIO

可能存在的问题:一个Acceptor线程负责监听和处理所有的客户端连接可能存在性能问题。例如并发百万客户端连接,或者服务端需要对客户端握手进行安全认证,但认证本身非常损耗性能(如HTTPS),这这类场景下,单独一个Acceptor线程可能会存在性能不足问题。

主从Reactor多线程模型

服务端用于接收客户端连接的不再是个 1 个单独的 NIO 线程,而是一个的 NIO 线程池。解决一个Acceptor线程无法处理高并发客户端连接的性能问题。

Reactor模式与NIOReactor模式与NIO

它的工作流程如下:

  1. 从Acceptor线程池中随机选择一个线程作为acceptor线程,用于绑定监听端口,接收客户端连接。
  2. Acceptor 线程接收客户端连接请求之后创建新的 SocketChannel,将其注册到主线程池的其它 Reactor 线程上,由其负责接入认证、IP 黑白名单过滤、握手等操作;
  3. 步骤 2 完成之后,业务层的链路正式建立,将 SocketChannel 从主线程池的 Reactor 线程的多路复用器上摘除,重新注册到 Sub 线程池的线程上,用于处理 I/O 的读写操作。