socket server是基于TCP協定的C/S通信模式下,伺服器端的實作。一個socket server最主要的工作是處理網絡IO,同僚,高效處理網絡IO也是一個socket server最重要的性能名額。
本文會用java NIO架構實作一個同步非阻塞的socket server。
最基礎的結構
首先從最基礎的考慮。一個socket server最簡單最經典的實作必然是多線程阻塞的版本:
主線程監聽socket端口(阻塞),每當有新客戶連接配接,為這個使用者單獨建立一個線程,并在新線程裡處理業務邏輯:
class Server implements Runnable {
public void run() {
try {
ServerSocket ss = new ServerSocket(PORT);
while (!Thread.interrupted())
new Thread(new Handler(ss.accept())).start();
// or, single-threaded, or a thread pool
} catch (IOException ex) { }
}
}
class Handler implements Runnable {
final Socket socket;
Handler(Socket s) { socket = s; }
public void run() {
try {
byte[] input = new byte[MAX_INPUT];
socket.getInputStream().read(input);
byte[] output = process(input);
socket.getOutputStream().write(output);
} catch (IOException ex) { }
}
private byte[] process(byte[] cmd) { }
}
這種I/O模型的主要缺點是:線程不是免費的。作業系統配置設定給每個程序的最大線程數是有限的,在高并發的情況下,伺服器會因為不能建立新線程而不能響應請求。
這時,演變成線程池版本的多線程伺服器:
public class ExecutorServiceServer implements Runnable {
public void run() throws IOException {
ServerSocket ss = new ServerSocket(PORT);
ExecutorService service = Executors.newFixedThreadPool(MAX_POOL_SIZE);
while (true) {
Socket s = ss.accept();
service.submit(new Handler(s));
}
}
}
我們解決了線程數不足的問題,但是伺服器的IO模型依然是:one-thread-per-client,而且,每個線程都是阻塞的。這種I/O模型的主要缺點是:切換線程上下文的開銷。我們可以看到server的ss.accept()和handler的socket.getInputStream().read(input)都是阻塞調用。最糟糕的情況是:每個client都不是經常讀寫data,這樣,大部分線程都會阻塞在read或write方法上,但CPU可不管這個線程是否正在阻塞,它依然公平的給每個線程配置設定時間。 這樣,大部分CPU時間都會浪費在等待阻塞調用上。
為了解決這種無謂的上下文切換帶來的開銷,我們需要非阻塞IO。
Reactor模式下的socket server
Reactor模式是一種事件驅動的IO相關的設計模式。上一篇文章已經介紹了它是如何工作的。用Reactor實作socket server,裡面的類會有一點變種,先看圖:

我稍微搬運一下這個slide的代碼,分類結合代碼說說:
Reactor
reactor的職責沒變,還是使用JAVA NIO包中的selector監聽IO事件,然後分發到指定的handler。
public class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocketChannel;
Reactor(int port) throws IOException {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
SelectionKey selectionKey0 = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
selectionKey0.attach(new Acceptor());
}
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext()) {
dispatch((SelectionKey) (it.next()));
}
selected.clear();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
void dispatch(SelectionKey k) {
Runnable r = (Runnable) (k.attachment());
if (r != null) {
r.run();
}
}
}
可以看到,selector和handler是運作在同一線程的。reactor調用selector.select();,然後根據key.attachment()找到handler,調用run方法。
第二點是,我們看reactor的構造方法:
SelectionKey selectionKey0 = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
selectionKey0.attach(new Acceptor());
Acceptor的實作是:
public class Reactor implements Runnable {
class Acceptor implements Runnable {
public void run() {
try {
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel != null) {
new Handler(selector, socketChannel);
}
System.out.println("Connection Accepted by Reactor");
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}
我們需要把ServerSocketChannel注冊到selector中,并用Accpeter處理它。
SelectionKey 0表示這種注冊關系:它告訴selector使用Accpeter處理ServerSocketChannel的OP_ACCEPT事件:當一個client請求連接配接時,ServerSocketChannel會觸發一個IO事件(OP_ACCEPT),此時,selector的select方法會找到selectionKey0,進而找到Accpeter。
Accpeter的職責是:為新接收的socketChannel配置設定一個handler,也就是one-handler-per-client:
new Handler(selector, socketChannel);
我們即将看到handler做了什麼。
Handler
Handler的構造函數如下:
public class Handler implements Runnable {
final SocketChannel socketChannel;
final SelectionKey selectionKey;
ByteBuffer input = ByteBuffer.allocate(1024);
static final int READING = 0, SENDING = 1;
int state = READING;
String clientName = "";
Handler(Selector selector, SocketChannel c) throws IOException {
socketChannel = c;
c.configureBlocking(false);
selectionKey = socketChannel.register(selector, 0);
selectionKey.attach(this);
selectionKey.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}
}
Handler的構造過程實際上是向selector注冊一個socketChannel和一個handler。
注冊之後,下一次當selector.select()傳回這個selectionKey時,就會找到這個handler,執行它的處理邏輯:
public class Handler implements Runnable {
public void run() {
try {
if (state == READING) {
read();
} else if (state == SENDING) {
send();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
整個過程就是:一個連接配接過來,先觸發ACCEPT事件,reactor會分發給Acceptor,Acceptor調用new Handler(),把一個handler配置設定給這個socketChannel,并把兩者注冊到reactor中。保證這個handler和socketChannel的通信能被reactor配置設定。
以上就是一個reactor模式下的socketserver的基本實作。
為了追求更高的性能,這個模型還有一些變種:比如多線程運作handler,主從reactor,多線程運作reactor等。都比較複雜,你可以在參考一欄看到相關描述。
參考
http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
http://jeewanthad.blogspot.hk/2013/02/reactor-pattern-explained-part-1.html