之前講了Reactor單線程模型的優缺點以及實作,接下來看看Reactor模型的優化和改進
這裡有兩步改進,先貼出第一次改進的圖(借用),這兩張圖是一個意思,提出一個線程來接收請求以及讀寫,然後由線程池來處理具體的業務,具體的代碼就不貼的,下面貼出最終優化的代碼,這個部分相對簡單,隻需要一個selector負責注冊接收,然後交給Worker Thread執行就行了

進一步的改進就是,用一個線程池負責接收線程,用一個線程池來處理讀寫(負載均衡的選擇selector),每個selector中的線程池來處理業務,這樣就達到了需要的效果了n個selector,每個selector m個業務線程,感覺太能造了。
貼出代碼如下,
//主從Reactor的抽象方法
public abstract class AbstractReactor implements Runnable {
@Override
public void run() {
}
protected void dispatch(SelectionKey key){
try {
Runnable task = (Runnable) key.attachment();
if (task != null){
task.run();
}
}catch (Exception e){
e.printStackTrace();
}
}
}
這是主Reactor,用于接收事件
//用于接收的MainReactor
public class MainReactor extends AbstractReactor implements Runnable {
//多路複用器
private Selector selector;
public MainReactor(ServerSocketChannel serverSocketChannel,int port) {
try {
this.selector =Selector.open();
//綁定端口
serverSocketChannel.socket().bind(new InetSocketAddress(port));
//配置非阻塞模式
serverSocketChannel.configureBlocking(false);
SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//依附處理Handler
key.attach(new Acceptor(serverSocketChannel));
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
try {
while(!Thread.interrupted()){
selector.select();
//周遊準備好的事件
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
//分發任務
dispatch(key);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public class SubReactor extends AbstractReactor implements Runnable {
private Selector selector;
public SubReactor(Selector selector) {
this.selector = selector;
}
private boolean register = false;
@Override
public void run() {
while (!Thread.interrupted()){
System.out.println("等待注冊中。。。。。。");
//接收事件
while(!Thread.interrupted()){
try {
//如果沒有事件準備好,繼續監聽,
if (selector.select() == 0){
continue;
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
//分發任務
dispatch(key);
}
}catch (Exception e){
e.printStackTrace();
}
}
}
}
public Selector getSelector() {
return selector;
}
}
處理事件的Handler
public class AsyncHandler implements Runnable{
/**
* 被選中的selector
*/
private Selector selector;
/**
* task socketChannel
*/
private SocketChannel socketChannel;
private SelectionKey selectionKey;
ExecutorService workers = Executors.newFixedThreadPool(5);
private ByteBuffer readBuffer = ByteBuffer.allocate(1024);
private ByteBuffer sendBuffer = ByteBuffer.allocate(2048);
private final static int READ = 0;
private final static int SEND = 1;
private final static int PROCESSING = 2;
private int status = READ;
public AsyncHandler(SocketChannel socketChannel, Selector selector) {
this.selector = selector;
this.socketChannel = socketChannel;
try {
socketChannel.configureBlocking(false);
selectionKey = socketChannel.register(selector,0);
} catch (IOException e) {
e.printStackTrace();
}
selectionKey.attach(this);
selectionKey.interestOps(SelectionKey.OP_READ);
}
//建立用來處理業務的線程池
@Override
public void run() {
//對事件的處理
switch (status){
case READ:
//讀事件準備好
read();
break;
case SEND:
//發送事件準備好
send();
break;
default:
}
}
private void send() {
if (selectionKey.isValid()){
status = PROCESSING;
workers.execute(this::sendWorker);
selectionKey.interestOps(SelectionKey.OP_READ);
}
}
private void read() {
if (selectionKey.isValid()){
readBuffer.clear();
try {
int count = socketChannel.read(readBuffer);
if (count > 0){
status = PROCESSING;
workers.execute(this::readWorker);
}else{
selectionKey.cancel();
socketChannel.close();
}
} catch (IOException e) {
e.printStackTrace();
selectionKey.cancel();
try {
socketChannel.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}
//讀入資訊後的業務處理
private void readWorker () {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("收到來自用戶端的消息: %s",
new String(readBuffer.array())));
status = SEND;
selectionKey.interestOps(SelectionKey.OP_WRITE); //注冊寫事件
this.selector.wakeup(); //喚醒阻塞在select的線程,因為該interestOps寫事件是放到子線程的,select在該channel還是對read事件感興趣時又被調用,是以如果不主動喚醒,select可能并不會立刻select該讀就緒事件(在該例中,可能永遠不會被select到)
}
private void sendWorker() {
try {
sendBuffer.clear();
sendBuffer.put(String.format("我收到來自%s的資訊辣:%s, 200ok;",
socketChannel.getRemoteAddress(),
new String(readBuffer.array())).getBytes());
sendBuffer.flip();
int count = socketChannel.write(sendBuffer);
if (count < 0) {
selectionKey.cancel();
socketChannel.close();
System.out.println("send時-------連接配接關閉");
} else {
//再次切換到讀
status = READ;
}
} catch (IOException e) {
System.err.println("異步處理send業務時發生異常!異常資訊:" + e.getMessage());
selectionKey.cancel();
try {
socketChannel.close();
} catch (IOException e1) {
System.err.println("異步處理send業務關閉通道時發生異常!異常資訊:" + e.getMessage());
}
}
}
}
//隻負責接收請求的元件
public class Acceptor implements Runnable {
private int coreSize = Runtime.getRuntime().availableProcessors();
private ServerSocketChannel serverSocketChannel;
private Selector[] selectors = new Selector[coreSize];
private SubReactor[] reactors = new SubReactor[coreSize];
private Thread[] threads = new Thread[coreSize];
private AtomicInteger index = new AtomicInteger();
public Acceptor(ServerSocketChannel serverSocketChannel) throws IOException {
this.serverSocketChannel = serverSocketChannel;
for (int i = 0; i < coreSize; i++) {
selectors[i] = Selector.open();
reactors[i] = new SubReactor(selectors[i]);
threads[i] = new Thread(reactors[i]);
threads[i].start();
}
}
@Override
public void run() {
SocketChannel socketChannel;
try {
socketChannel = serverSocketChannel.accept();
if (socketChannel != null){
System.out.println(String.format("收到%s的連接配接",socketChannel.getRemoteAddress()));
socketChannel.configureBlocking(false);
Selector selector = getSeletor();
selector.wakeup();
SelectionKey key = socketChannel.register(selector,SelectionKey.OP_READ);
key.attach(new AsyncHandler(socketChannel,selector));
}
} catch (IOException e) {
e.printStackTrace();
}
}
public Selector getSeletor(){
return selectors[Math.abs(index.getAndIncrement() % selectors.length)];
}