天天看點

java nio socket_Java Nio(三) - 用NIO實作高性能socketserver

socket server是基于TCP協定的C/S通信模式下,伺服器端的實作。一個socket server最主要的工作是處理網絡IO,同僚,高效處理網絡IO也是一個socket server最重要的性能名額。

本文會用java NIO架構實作一個同步非阻塞的socket server。

最基礎的結構

首先從最基礎的考慮。一個socket server最簡單最經典的實作必然是多線程阻塞的版本:

主線程監聽socket端口(阻塞),每當有新客戶連接配接,為這個使用者單獨建立一個線程,并在新線程裡處理業務邏輯:

class Server implements Runnable {

public void run() {

try {

ServerSocket ss = new ServerSocket(PORT);

while (!Thread.interrupted())

new Thread(new Handler(ss.accept())).start();

// or, single-threaded, or a thread pool

} catch (IOException ex) { }

}

}

class Handler implements Runnable {

final Socket socket;

Handler(Socket s) { socket = s; }

public void run() {

try {

byte[] input = new byte[MAX_INPUT];

socket.getInputStream().read(input);

byte[] output = process(input);

socket.getOutputStream().write(output);

} catch (IOException ex) { }

}

private byte[] process(byte[] cmd) { }

}

這種I/O模型的主要缺點是:線程不是免費的。作業系統配置設定給每個程序的最大線程數是有限的,在高并發的情況下,伺服器會因為不能建立新線程而不能響應請求。

這時,演變成線程池版本的多線程伺服器:

public class ExecutorServiceServer implements Runnable {

public void run() throws IOException {

ServerSocket ss = new ServerSocket(PORT);

ExecutorService service = Executors.newFixedThreadPool(MAX_POOL_SIZE);

while (true) {

Socket s = ss.accept();

service.submit(new Handler(s));

}

}

}

我們解決了線程數不足的問題,但是伺服器的IO模型依然是:one-thread-per-client,而且,每個線程都是阻塞的。這種I/O模型的主要缺點是:切換線程上下文的開銷。我們可以看到server的ss.accept()和handler的socket.getInputStream().read(input)都是阻塞調用。最糟糕的情況是:每個client都不是經常讀寫data,這樣,大部分線程都會阻塞在read或write方法上,但CPU可不管這個線程是否正在阻塞,它依然公平的給每個線程配置設定時間。 這樣,大部分CPU時間都會浪費在等待阻塞調用上。

為了解決這種無謂的上下文切換帶來的開銷,我們需要非阻塞IO。

Reactor模式下的socket server

Reactor模式是一種事件驅動的IO相關的設計模式。上一篇文章已經介紹了它是如何工作的。用Reactor實作socket server,裡面的類會有一點變種,先看圖:

java nio socket_Java Nio(三) - 用NIO實作高性能socketserver

我稍微搬運一下這個slide的代碼,分類結合代碼說說:

Reactor

reactor的職責沒變,還是使用JAVA NIO包中的selector監聽IO事件,然後分發到指定的handler。

public class Reactor implements Runnable {

final Selector selector;

final ServerSocketChannel serverSocketChannel;

Reactor(int port) throws IOException {

selector = Selector.open();

serverSocketChannel = ServerSocketChannel.open();

serverSocketChannel.socket().bind(new InetSocketAddress(port));

serverSocketChannel.configureBlocking(false);

SelectionKey selectionKey0 = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

selectionKey0.attach(new Acceptor());

}

public void run() {

try {

while (!Thread.interrupted()) {

selector.select();

Set selected = selector.selectedKeys();

Iterator it = selected.iterator();

while (it.hasNext()) {

dispatch((SelectionKey) (it.next()));

}

selected.clear();

}

} catch (IOException ex) {

ex.printStackTrace();

}

}

void dispatch(SelectionKey k) {

Runnable r = (Runnable) (k.attachment());

if (r != null) {

r.run();

}

}

}

可以看到,selector和handler是運作在同一線程的。reactor調用selector.select();,然後根據key.attachment()找到handler,調用run方法。

第二點是,我們看reactor的構造方法:

SelectionKey selectionKey0 = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

selectionKey0.attach(new Acceptor());

Acceptor的實作是:

public class Reactor implements Runnable {

class Acceptor implements Runnable {

public void run() {

try {

SocketChannel socketChannel = serverSocketChannel.accept();

if (socketChannel != null) {

new Handler(selector, socketChannel);

}

System.out.println("Connection Accepted by Reactor");

} catch (IOException ex) {

ex.printStackTrace();

}

}

}

}

我們需要把ServerSocketChannel注冊到selector中,并用Accpeter處理它。

SelectionKey 0表示這種注冊關系:它告訴selector使用Accpeter處理ServerSocketChannel的OP_ACCEPT事件:當一個client請求連接配接時,ServerSocketChannel會觸發一個IO事件(OP_ACCEPT),此時,selector的select方法會找到selectionKey0,進而找到Accpeter。

Accpeter的職責是:為新接收的socketChannel配置設定一個handler,也就是one-handler-per-client:

new Handler(selector, socketChannel);

我們即将看到handler做了什麼。

Handler

Handler的構造函數如下:

public class Handler implements Runnable {

final SocketChannel socketChannel;

final SelectionKey selectionKey;

ByteBuffer input = ByteBuffer.allocate(1024);

static final int READING = 0, SENDING = 1;

int state = READING;

String clientName = "";

Handler(Selector selector, SocketChannel c) throws IOException {

socketChannel = c;

c.configureBlocking(false);

selectionKey = socketChannel.register(selector, 0);

selectionKey.attach(this);

selectionKey.interestOps(SelectionKey.OP_READ);

selector.wakeup();

}

}

Handler的構造過程實際上是向selector注冊一個socketChannel和一個handler。

注冊之後,下一次當selector.select()傳回這個selectionKey時,就會找到這個handler,執行它的處理邏輯:

public class Handler implements Runnable {

public void run() {

try {

if (state == READING) {

read();

} else if (state == SENDING) {

send();

}

} catch (IOException ex) {

ex.printStackTrace();

}

}

}

整個過程就是:一個連接配接過來,先觸發ACCEPT事件,reactor會分發給Acceptor,Acceptor調用new Handler(),把一個handler配置設定給這個socketChannel,并把兩者注冊到reactor中。保證這個handler和socketChannel的通信能被reactor配置設定。

以上就是一個reactor模式下的socketserver的基本實作。

為了追求更高的性能,這個模型還有一些變種:比如多線程運作handler,主從reactor,多線程運作reactor等。都比較複雜,你可以在參考一欄看到相關描述。

參考

http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

http://jeewanthad.blogspot.hk/2013/02/reactor-pattern-explained-part-1.html