天天看點

Java aio(異步網絡IO)初探

按照《unix網絡程式設計》的劃分,io模型可以分為:阻塞io、非阻塞io、io複用、信号驅動io和異步io,按照posix标準來劃分隻分為兩類:同步io和異步io。如何區分呢?首先一個io操作其實分成了兩個步驟:發起io請求和實際的io操作,同步io和異步io的差別就在于第二個步驟是否阻塞,如果實際的io讀寫阻塞請求程序,那麼就是同步io,是以阻塞io、非阻塞io、io服用、信号驅動io都是同步io,如果不阻塞,而是作業系統幫你做完io操作再将結果傳回給你,那麼就是異步io。阻塞io和非阻塞io的差別在于第一步,發起io請求是否會被阻塞,如果阻塞直到完成那麼就是傳統的阻塞io,如果不阻塞,那麼就是非阻塞io。

   java nio 2.0的主要改進就是引入了異步io(包括檔案和網絡),這裡主要介紹下異步網絡io api的使用以及架構的設計,以tcp服務端為例。首先看下為了支援aio引入的新的類和接口:

 java.nio.channels.asynchronouschannel

       标記一個channel支援異步io操作。

 java.nio.channels.asynchronousserversocketchannel

       serversocket的aio版本,建立tcp服務端,綁定位址,監聽端口等。

 java.nio.channels.asynchronoussocketchannel

       面向流的異步socket channel,表示一個連接配接。

 java.nio.channels.asynchronouschannelgroup

       異步channel的分組管理,目的是為了資源共享。一個asynchronouschannelgroup綁定一個線程池,這個線程池執行兩個任務:處理io事件和派發completionhandler。asynchronousserversocketchannel建立的時候可以傳入一個 asynchronouschannelgroup,那麼通過asynchronousserversocketchannel建立的 asynchronoussocketchannel将同屬于一個組,共享資源。

 java.nio.channels.completionhandler

       異步io操作結果的回調接口,用于定義在io操作完成後所作的回調工作。aio的api允許兩種方式來處理異步操作的結果:傳回的future模式或者注冊completionhandler,我更推薦用completionhandler的方式,這些handler的調用是由 asynchronouschannelgroup的線程池派發的。顯然,線程池的大小是性能的關鍵因素。asynchronouschannelgroup允許綁定不同的線程池,通過三個靜态方法來建立:

Java aio(異步網絡IO)初探

public static asynchronouschannelgroup withfixedthreadpool(int nthreads,  

                                                              threadfactory threadfactory)  

       throws ioexception  

public static asynchronouschannelgroup withcachedthreadpool(executorservice executor,  

                                                               int initialsize)  

public static asynchronouschannelgroup withthreadpool(executorservice executor)  

     需要根據具體應用相應調整,從架構角度出發,需要暴露這樣的配置選項給使用者。

     在介紹完了aio引入的tcp的主要接口和類之後,我們來設想下一個aio架構應該怎麼設計。參考非阻塞nio架構的設計,一般都是采用reactor模式,reacot負責事件的注冊、select、事件的派發;相應地,異步io有個proactor模式,proactor負責 completionhandler的派發,檢視一個典型的io寫操作的流程來看兩者的差別:

     reactor:  send(msg) -> 消息隊列是否為空,如果為空  -> 向reactor注冊op_write,然後傳回 -> reactor select -> 觸發writable,通知使用者線程去處理 ->先登出writable(很多人遇到的cpu 100%的問題就在于沒有登出),處理writeable,如果沒有完全寫入,繼續注冊op_write。注意到,寫入的工作還是使用者線程在處理。

     proactor: send(msg) -> 消息隊列是否為空,如果為空,發起read異步調用,并注冊completionhandler,然後傳回。 -> 作業系統負責将你的消息寫入,并傳回結果(寫入的位元組數)給proactor -> proactor派發completionhandler。可見,寫入的工作是作業系統在處理,無需使用者線程參與。事實上在aio的api 中,asynchronouschannelgroup就扮演了proactor的角色。

    completionhandler有三個方法,分别對應于處理成功、失敗、被取消(通過傳回的future)情況下的回調處理:

Java aio(異步網絡IO)初探

public interface completionhandler<v,a> {  

     void completed(v result, a attachment);  

    void failed(throwable exc, a attachment);  

    void cancelled(a attachment);  

}  

    其中的泛型參數v表示io調用的結果,而a是發起調用時傳入的attchment。

    第一步,建立一個asynchronousserversocketchannel,建立之前先建立一個 asynchronouschannelgroup,上文提到asynchronousserversocketchannel可以綁定一個 asynchronouschannelgroup,那麼通過這個asynchronousserversocketchannel建立的連接配接都将同屬于一個asynchronouschannelgroup并共享資源:

Java aio(異步網絡IO)初探

this.asynchronouschannelgroup = asynchronouschannelgroup  

                    .withcachedthreadpool(executors.newcachedthreadpool(),  

                            this.threadpoolsize);  

     然後初始化一個asynchronousserversocketchannel,通過open方法:

Java aio(異步網絡IO)初探

this.serversocketchannel = asynchronousserversocketchannel  

                .open(this.asynchronouschannelgroup);  

    通過nio 2.0引入的socketoption類設定一些tcp選項:

Java aio(異步網絡IO)初探

this.serversocketchannel  

                    .setoption(  

                            standardsocketoption.so_reuseaddr,true);  

                            standardsocketoption.so_rcvbuf,16*1024);  

    綁定本地位址:

Java aio(異步網絡IO)初探

                    .bind(new inetsocketaddress("localhost",8080), 100);  

    其中的100用于指定等待連接配接的隊列大小(backlog)。完了嗎?還沒有,最重要的監聽工作還沒開始,監聽端口是為了等待連接配接上來以便accept産生一個asynchronoussocketchannel來表示一個建立立的連接配接,是以需要發起一個accept調用,調用是異步的,作業系統将在連接配接建立後,将最後的結果——asynchronoussocketchannel傳回給你:

Java aio(異步網絡IO)初探

public void pendingaccept() {  

        if (this.started && this.serversocketchannel.isopen()) {  

            this.acceptfuture = this.serversocketchannel.accept(null,  

                    new acceptcompletionhandler());  

        } else {  

            throw new illegalstateexception("controller has been closed");  

        }  

    }  

   注意,重複的accept調用将會抛出pendingacceptexception,後文提到的read和write也是如此。accept方法的第一個參數是你想傳給completionhandler的attchment,第二個參數就是注冊的用于回調的completionhandler,最後傳回結果future<asynchronoussocketchannel>。你可以對future做處理,這裡采用更推薦的方式就是注冊一個completionhandler。那麼accept的completionhandler中做些什麼工作呢?顯然一個赤裸裸的

asynchronoussocketchannel是不夠的,我們需要将它封裝成session,一個session表示一個連接配接(mina裡就叫 iosession了),裡面帶了一個緩沖的消息隊列以及一些其他資源等。在連接配接建立後,除非你的伺服器隻準備接受一個連接配接,不然你需要在後面繼續調用pendingaccept來發起另一個accept請求:

Java aio(異步網絡IO)初探

private final class acceptcompletionhandler implements  

            completionhandler<asynchronoussocketchannel, object> {  

        @override  

        public void cancelled(object attachment) {  

            logger.warn("accept operation was canceled");  

        public void completed(asynchronoussocketchannel socketchannel,  

                object attachment) {  

            try {  

                logger.debug("accept connection from "  

                        + socketchannel.getremoteaddress());  

                configurechannel(socketchannel);  

                aiosessionconfig sessionconfig = buildsessionconfig(socketchannel);  

                session session = new aiotcpsession(sessionconfig,  

                        aiotcpcontroller.this.configuration  

                                .getsessionreadbuffersize(),  

                        aiotcpcontroller.this.sessiontimeout);  

                session.start();  

                registersession(session);  

            } catch (exception e) {  

                e.printstacktrace();  

                logger.error("accept error", e);  

                notifyexception(e);  

            } finally {  

                <strong>pendingaccept</strong>();  

            }  

        public void failed(throwable exc, object attachment) {  

            logger.error("accept error", exc);  

                notifyexception(exc);  

    注意到了吧,我們在failed和completed方法中在最後都調用了pendingaccept來繼續發起accept調用,等待新的連接配接上來。有的同學可能要說了,這樣搞是不是遞歸調用,會不會堆棧溢出?實際上不會,因為發起accept調用的線程與completionhandler回調的線程并非同一個,不是一個上下文中,兩者之間沒有耦合關系。要注意到,completionhandler的回調共用的是 asynchronouschannelgroup綁定的線程池,是以千萬别在completionhandler回調方法中調用阻塞或者長時間的操作,例如sleep,回調方法最好能支援逾時,防止線程池耗盡。

    連接配接建立後,怎麼讀和寫呢?回憶下在nonblocking nio架構中,連接配接建立後的第一件事是幹什麼?注冊op_read事件等待socket可讀。異步io也同樣如此,連接配接建立後馬上發起一個異步read調用,等待socket可讀,這個是session.start方法中所做的事情:

Java aio(異步網絡IO)初探

public class aiotcpsession {  

    protected void start0() {  

        pendingread();  

    protected final void pendingread() {  

        if (!isclosed() && this.asynchronoussocketchannel.isopen()) {  

            if (!this.readbuffer.hasremaining()) {  

                this.readbuffer = bytebufferutils  

                        .increasebuffercapatity(this.readbuffer);  

            this.readfuture = this.asynchronoussocketchannel.read(  

                    this.readbuffer, this, this.readcompletionhandler);  

            throw new illegalstateexception(  

                    "session or channel has been closed");  

     asynchronoussocketchannel的read調用與asynchronousserversocketchannel的accept調用類似,同樣是非阻塞的,傳回結果也是一個future,但是寫的結果是整數,表示寫入了多少位元組,是以read調用傳回的是 future<integer>,方法的第一個參數是讀的緩沖區,作業系統将io讀到資料拷貝到這個緩沖區,第二個參數是傳遞給 completionhandler的attchment,第三個參數就是注冊的用于回調的completionhandler。這裡儲存了read的結果future,這是為了在關閉連接配接的時候能夠主動取消調用,accept也是如此。現在可以看看read的completionhandler的實作:

Java aio(異步網絡IO)初探

public final class readcompletionhandler implements  

        completionhandler<integer, abstractaiosession> {  

    private static final logger log = loggerfactory  

            .getlogger(readcompletionhandler.class);  

    protected final aiotcpcontroller controller;  

    public readcompletionhandler(aiotcpcontroller controller) {  

        this.controller = controller;  

    @override  

    public void cancelled(abstractaiosession session) {  

        log.warn("session(" + session.getremotesocketaddress()  

                + ") read operation was canceled");  

    public void completed(integer result, abstractaiosession session) {  

        if (log.isdebugenabled())  

            log.debug("session(" + session.getremotesocketaddress()  

                    + ") read +" + result + " bytes");  

        if (result < 0) {  

            session.close();  

            return;  

        try {  

            if (result > 0) {  

                session.updatetimestamp();  

                session.getreadbuffer().flip();  

                session.decode();  

                session.getreadbuffer().compact();  

        } finally {  

                session.pendingread();  

            } catch (ioexception e) {  

                session.onexception(e);  

                session.close();  

        controller.checksessiontimeout();  

    public void failed(throwable exc, abstractaiosession session) {  

        log.error("session read error", exc);  

        session.onexception(exc);  

        session.close();  

   如果io讀失敗,會傳回失敗産生的異常,這種情況下我們就主動關閉連接配接,通過session.close()方法,這個方法幹了兩件事情:關閉channel和取消read調用:

Java aio(異步網絡IO)初探

if (null != this.readfuture) {  

            this.readfuture.cancel(true);  

this.asynchronoussocketchannel.close();  

   在讀成功的情況下,我們還需要判斷結果result是否小于0,如果小于0就表示對端關閉了,這種情況下我們也主動關閉連接配接并傳回。如果讀到一定位元組,也就是result大于0的情況下,我們就嘗試從讀緩沖區中decode出消息,并派發給業務處理器的回調方法,最終通過pendingread繼續發起read調用等待socket的下一次可讀。可見,我們并不需要自己去調用channel來進行io讀,而是作業系統幫你直接讀到了緩沖區,然後給你一個結果表示讀入了多少位元組,你處理這個結果即可。而nonblocking

io架構中,是reactor通知使用者線程socket可讀了,然後使用者線程自己去調用read進行實際讀操作。這裡還有個需要注意的地方,就是decode出來的消息的派發給業務處理器工作最好交給一個線程池來處理,避免阻塞group綁定的線程池。

   io寫的操作與此類似,不過通常寫的話我們會在session中關聯一個緩沖隊列來處理,沒有完全寫入或者等待寫入的消息都存放在隊列中,隊列為空的情況下發起write調用:

Java aio(異步網絡IO)初探

protected void write0(writemessage message) {  

      boolean needwrite = false;  

      synchronized (this.writequeue) {  

          needwrite = this.writequeue.isempty();  

          this.writequeue.offer(message);  

      }  

      if (needwrite) {  

          pendingwrite(message);  

  }  

  protected final void pendingwrite(writemessage message) {  

      message = preprocesswritemessage(message);  

      if (!isclosed() && this.asynchronoussocketchannel.isopen()) {  

          this.asynchronoussocketchannel.write(message.getwritebuffer(),  

                  this, this.writecompletionhandler);  

      } else {  

          throw new illegalstateexception(  

                  "session or channel has been closed");  

    write調用傳回的結果與read一樣是一個future<integer>,而write的completionhandler處理的核心邏輯大概是這樣:

Java aio(異步網絡IO)初探

@override  

                    + ") writen " + result + " bytes");  

        writemessage writemessage;  

        queue<writemessage> writequeue = session.getwritequeue();  

        synchronized (writequeue) {  

            writemessage = writequeue.peek();  

            if (writemessage.getwritebuffer() == null  

                    || !writemessage.getwritebuffer().hasremaining()) {  

                writequeue.remove();  

                if (writemessage.getwritefuture() != null) {  

                    writemessage.getwritefuture().setresult(boolean.true);  

                }  

                try {  

                    session.gethandler().onmessagesent(session,  

                            writemessage.getmessage());  

                } catch (exception e) {  

                    session.onexception(e);  

                writemessage = writequeue.peek();  

        if (writemessage != null) {  

                session.pendingwrite(writemessage);  

   compete方法中的result就是實際寫入的位元組數,然後我們判斷消息的緩沖區是否還有剩餘,如果沒有就将消息從隊列中移除,如果隊列中還有消息,那麼繼續發起write調用。

   在引入了aio之後,java對于網絡層的支援已經非常完善,該有的都有了,java也已經成為伺服器開發的首選語言之一。java的弱項在于對記憶體的管理上,由于這一切都交給了gc,是以在高性能的網絡伺服器上還是cpp的天下。java這種單一堆模型比之erlang的程序内堆模型還是有差距,很難做到高效的垃圾回收和細粒度的記憶體管理。

   這裡僅僅是介紹了aio開發的核心流程,對于一個網絡架構來說,還需要考慮逾時的處理、緩沖buffer的處理、業務層和網絡層的切分、可擴充性、性能的可調性以及一定的通用性要求。