按照《unix網絡程式設計》的劃分,io模型可以分為:阻塞io、非阻塞io、io複用、信号驅動io和異步io,按照posix标準來劃分隻分為兩類:同步io和異步io。如何區分呢?首先一個io操作其實分成了兩個步驟:發起io請求和實際的io操作,同步io和異步io的差別就在于第二個步驟是否阻塞,如果實際的io讀寫阻塞請求程序,那麼就是同步io,是以阻塞io、非阻塞io、io服用、信号驅動io都是同步io,如果不阻塞,而是作業系統幫你做完io操作再将結果傳回給你,那麼就是異步io。阻塞io和非阻塞io的差別在于第一步,發起io請求是否會被阻塞,如果阻塞直到完成那麼就是傳統的阻塞io,如果不阻塞,那麼就是非阻塞io。
java nio 2.0的主要改進就是引入了異步io(包括檔案和網絡),這裡主要介紹下異步網絡io api的使用以及架構的設計,以tcp服務端為例。首先看下為了支援aio引入的新的類和接口:
java.nio.channels.asynchronouschannel
标記一個channel支援異步io操作。
java.nio.channels.asynchronousserversocketchannel
serversocket的aio版本,建立tcp服務端,綁定位址,監聽端口等。
java.nio.channels.asynchronoussocketchannel
面向流的異步socket channel,表示一個連接配接。
java.nio.channels.asynchronouschannelgroup
異步channel的分組管理,目的是為了資源共享。一個asynchronouschannelgroup綁定一個線程池,這個線程池執行兩個任務:處理io事件和派發completionhandler。asynchronousserversocketchannel建立的時候可以傳入一個 asynchronouschannelgroup,那麼通過asynchronousserversocketchannel建立的 asynchronoussocketchannel将同屬于一個組,共享資源。
java.nio.channels.completionhandler
異步io操作結果的回調接口,用于定義在io操作完成後所作的回調工作。aio的api允許兩種方式來處理異步操作的結果:傳回的future模式或者注冊completionhandler,我更推薦用completionhandler的方式,這些handler的調用是由 asynchronouschannelgroup的線程池派發的。顯然,線程池的大小是性能的關鍵因素。asynchronouschannelgroup允許綁定不同的線程池,通過三個靜态方法來建立:

public static asynchronouschannelgroup withfixedthreadpool(int nthreads,
threadfactory threadfactory)
throws ioexception
public static asynchronouschannelgroup withcachedthreadpool(executorservice executor,
int initialsize)
public static asynchronouschannelgroup withthreadpool(executorservice executor)
需要根據具體應用相應調整,從架構角度出發,需要暴露這樣的配置選項給使用者。
在介紹完了aio引入的tcp的主要接口和類之後,我們來設想下一個aio架構應該怎麼設計。參考非阻塞nio架構的設計,一般都是采用reactor模式,reacot負責事件的注冊、select、事件的派發;相應地,異步io有個proactor模式,proactor負責 completionhandler的派發,檢視一個典型的io寫操作的流程來看兩者的差別:
reactor: send(msg) -> 消息隊列是否為空,如果為空 -> 向reactor注冊op_write,然後傳回 -> reactor select -> 觸發writable,通知使用者線程去處理 ->先登出writable(很多人遇到的cpu 100%的問題就在于沒有登出),處理writeable,如果沒有完全寫入,繼續注冊op_write。注意到,寫入的工作還是使用者線程在處理。
proactor: send(msg) -> 消息隊列是否為空,如果為空,發起read異步調用,并注冊completionhandler,然後傳回。 -> 作業系統負責将你的消息寫入,并傳回結果(寫入的位元組數)給proactor -> proactor派發completionhandler。可見,寫入的工作是作業系統在處理,無需使用者線程參與。事實上在aio的api 中,asynchronouschannelgroup就扮演了proactor的角色。
completionhandler有三個方法,分别對應于處理成功、失敗、被取消(通過傳回的future)情況下的回調處理:

public interface completionhandler<v,a> {
void completed(v result, a attachment);
void failed(throwable exc, a attachment);
void cancelled(a attachment);
}
其中的泛型參數v表示io調用的結果,而a是發起調用時傳入的attchment。
第一步,建立一個asynchronousserversocketchannel,建立之前先建立一個 asynchronouschannelgroup,上文提到asynchronousserversocketchannel可以綁定一個 asynchronouschannelgroup,那麼通過這個asynchronousserversocketchannel建立的連接配接都将同屬于一個asynchronouschannelgroup并共享資源:

this.asynchronouschannelgroup = asynchronouschannelgroup
.withcachedthreadpool(executors.newcachedthreadpool(),
this.threadpoolsize);
然後初始化一個asynchronousserversocketchannel,通過open方法:

this.serversocketchannel = asynchronousserversocketchannel
.open(this.asynchronouschannelgroup);
通過nio 2.0引入的socketoption類設定一些tcp選項:

this.serversocketchannel
.setoption(
standardsocketoption.so_reuseaddr,true);
standardsocketoption.so_rcvbuf,16*1024);
綁定本地位址:

.bind(new inetsocketaddress("localhost",8080), 100);
其中的100用于指定等待連接配接的隊列大小(backlog)。完了嗎?還沒有,最重要的監聽工作還沒開始,監聽端口是為了等待連接配接上來以便accept産生一個asynchronoussocketchannel來表示一個建立立的連接配接,是以需要發起一個accept調用,調用是異步的,作業系統将在連接配接建立後,将最後的結果——asynchronoussocketchannel傳回給你:

public void pendingaccept() {
if (this.started && this.serversocketchannel.isopen()) {
this.acceptfuture = this.serversocketchannel.accept(null,
new acceptcompletionhandler());
} else {
throw new illegalstateexception("controller has been closed");
}
}
注意,重複的accept調用将會抛出pendingacceptexception,後文提到的read和write也是如此。accept方法的第一個參數是你想傳給completionhandler的attchment,第二個參數就是注冊的用于回調的completionhandler,最後傳回結果future<asynchronoussocketchannel>。你可以對future做處理,這裡采用更推薦的方式就是注冊一個completionhandler。那麼accept的completionhandler中做些什麼工作呢?顯然一個赤裸裸的
asynchronoussocketchannel是不夠的,我們需要将它封裝成session,一個session表示一個連接配接(mina裡就叫 iosession了),裡面帶了一個緩沖的消息隊列以及一些其他資源等。在連接配接建立後,除非你的伺服器隻準備接受一個連接配接,不然你需要在後面繼續調用pendingaccept來發起另一個accept請求:

private final class acceptcompletionhandler implements
completionhandler<asynchronoussocketchannel, object> {
@override
public void cancelled(object attachment) {
logger.warn("accept operation was canceled");
public void completed(asynchronoussocketchannel socketchannel,
object attachment) {
try {
logger.debug("accept connection from "
+ socketchannel.getremoteaddress());
configurechannel(socketchannel);
aiosessionconfig sessionconfig = buildsessionconfig(socketchannel);
session session = new aiotcpsession(sessionconfig,
aiotcpcontroller.this.configuration
.getsessionreadbuffersize(),
aiotcpcontroller.this.sessiontimeout);
session.start();
registersession(session);
} catch (exception e) {
e.printstacktrace();
logger.error("accept error", e);
notifyexception(e);
} finally {
<strong>pendingaccept</strong>();
}
public void failed(throwable exc, object attachment) {
logger.error("accept error", exc);
notifyexception(exc);
注意到了吧,我們在failed和completed方法中在最後都調用了pendingaccept來繼續發起accept調用,等待新的連接配接上來。有的同學可能要說了,這樣搞是不是遞歸調用,會不會堆棧溢出?實際上不會,因為發起accept調用的線程與completionhandler回調的線程并非同一個,不是一個上下文中,兩者之間沒有耦合關系。要注意到,completionhandler的回調共用的是 asynchronouschannelgroup綁定的線程池,是以千萬别在completionhandler回調方法中調用阻塞或者長時間的操作,例如sleep,回調方法最好能支援逾時,防止線程池耗盡。
連接配接建立後,怎麼讀和寫呢?回憶下在nonblocking nio架構中,連接配接建立後的第一件事是幹什麼?注冊op_read事件等待socket可讀。異步io也同樣如此,連接配接建立後馬上發起一個異步read調用,等待socket可讀,這個是session.start方法中所做的事情:

public class aiotcpsession {
protected void start0() {
pendingread();
protected final void pendingread() {
if (!isclosed() && this.asynchronoussocketchannel.isopen()) {
if (!this.readbuffer.hasremaining()) {
this.readbuffer = bytebufferutils
.increasebuffercapatity(this.readbuffer);
this.readfuture = this.asynchronoussocketchannel.read(
this.readbuffer, this, this.readcompletionhandler);
throw new illegalstateexception(
"session or channel has been closed");
asynchronoussocketchannel的read調用與asynchronousserversocketchannel的accept調用類似,同樣是非阻塞的,傳回結果也是一個future,但是寫的結果是整數,表示寫入了多少位元組,是以read調用傳回的是 future<integer>,方法的第一個參數是讀的緩沖區,作業系統将io讀到資料拷貝到這個緩沖區,第二個參數是傳遞給 completionhandler的attchment,第三個參數就是注冊的用于回調的completionhandler。這裡儲存了read的結果future,這是為了在關閉連接配接的時候能夠主動取消調用,accept也是如此。現在可以看看read的completionhandler的實作:

public final class readcompletionhandler implements
completionhandler<integer, abstractaiosession> {
private static final logger log = loggerfactory
.getlogger(readcompletionhandler.class);
protected final aiotcpcontroller controller;
public readcompletionhandler(aiotcpcontroller controller) {
this.controller = controller;
@override
public void cancelled(abstractaiosession session) {
log.warn("session(" + session.getremotesocketaddress()
+ ") read operation was canceled");
public void completed(integer result, abstractaiosession session) {
if (log.isdebugenabled())
log.debug("session(" + session.getremotesocketaddress()
+ ") read +" + result + " bytes");
if (result < 0) {
session.close();
return;
try {
if (result > 0) {
session.updatetimestamp();
session.getreadbuffer().flip();
session.decode();
session.getreadbuffer().compact();
} finally {
session.pendingread();
} catch (ioexception e) {
session.onexception(e);
session.close();
controller.checksessiontimeout();
public void failed(throwable exc, abstractaiosession session) {
log.error("session read error", exc);
session.onexception(exc);
session.close();
如果io讀失敗,會傳回失敗産生的異常,這種情況下我們就主動關閉連接配接,通過session.close()方法,這個方法幹了兩件事情:關閉channel和取消read調用:

if (null != this.readfuture) {
this.readfuture.cancel(true);
this.asynchronoussocketchannel.close();
在讀成功的情況下,我們還需要判斷結果result是否小于0,如果小于0就表示對端關閉了,這種情況下我們也主動關閉連接配接并傳回。如果讀到一定位元組,也就是result大于0的情況下,我們就嘗試從讀緩沖區中decode出消息,并派發給業務處理器的回調方法,最終通過pendingread繼續發起read調用等待socket的下一次可讀。可見,我們并不需要自己去調用channel來進行io讀,而是作業系統幫你直接讀到了緩沖區,然後給你一個結果表示讀入了多少位元組,你處理這個結果即可。而nonblocking
io架構中,是reactor通知使用者線程socket可讀了,然後使用者線程自己去調用read進行實際讀操作。這裡還有個需要注意的地方,就是decode出來的消息的派發給業務處理器工作最好交給一個線程池來處理,避免阻塞group綁定的線程池。
io寫的操作與此類似,不過通常寫的話我們會在session中關聯一個緩沖隊列來處理,沒有完全寫入或者等待寫入的消息都存放在隊列中,隊列為空的情況下發起write調用:

protected void write0(writemessage message) {
boolean needwrite = false;
synchronized (this.writequeue) {
needwrite = this.writequeue.isempty();
this.writequeue.offer(message);
}
if (needwrite) {
pendingwrite(message);
}
protected final void pendingwrite(writemessage message) {
message = preprocesswritemessage(message);
if (!isclosed() && this.asynchronoussocketchannel.isopen()) {
this.asynchronoussocketchannel.write(message.getwritebuffer(),
this, this.writecompletionhandler);
} else {
throw new illegalstateexception(
"session or channel has been closed");
write調用傳回的結果與read一樣是一個future<integer>,而write的completionhandler處理的核心邏輯大概是這樣:

@override
+ ") writen " + result + " bytes");
writemessage writemessage;
queue<writemessage> writequeue = session.getwritequeue();
synchronized (writequeue) {
writemessage = writequeue.peek();
if (writemessage.getwritebuffer() == null
|| !writemessage.getwritebuffer().hasremaining()) {
writequeue.remove();
if (writemessage.getwritefuture() != null) {
writemessage.getwritefuture().setresult(boolean.true);
}
try {
session.gethandler().onmessagesent(session,
writemessage.getmessage());
} catch (exception e) {
session.onexception(e);
writemessage = writequeue.peek();
if (writemessage != null) {
session.pendingwrite(writemessage);
compete方法中的result就是實際寫入的位元組數,然後我們判斷消息的緩沖區是否還有剩餘,如果沒有就将消息從隊列中移除,如果隊列中還有消息,那麼繼續發起write調用。
在引入了aio之後,java對于網絡層的支援已經非常完善,該有的都有了,java也已經成為伺服器開發的首選語言之一。java的弱項在于對記憶體的管理上,由于這一切都交給了gc,是以在高性能的網絡伺服器上還是cpp的天下。java這種單一堆模型比之erlang的程序内堆模型還是有差距,很難做到高效的垃圾回收和細粒度的記憶體管理。
這裡僅僅是介紹了aio開發的核心流程,對于一個網絡架構來說,還需要考慮逾時的處理、緩沖buffer的處理、業務層和網絡層的切分、可擴充性、性能的可調性以及一定的通用性要求。