天天看点

Java AIO初探(异步网络IO)

按照《unix网络编程》的划分,io模型可以分为:阻塞io、非阻塞io、io复用、信号驱动io和异步io,按照posix标准来划分只分为两类:同步io和异步io。如何区分呢?首先一个io操作其实分成了两个步骤:发起io请求和实际的io操作,同步io和异步io的区别就在于第二个步骤是否阻塞,如果实际的io读写阻塞请求进程,那么就是同步io,因此阻塞io、非阻塞io、io服用、信号驱动io都是同步io,如果不阻塞,而是操作系统帮你做完io操作再将结果返回给你,那么就是异步io。阻塞io和非阻塞io的区别在于第一步,发起io请求是否会被阻塞,如果阻塞直到完成那么就是传统的阻塞io,如果不阻塞,那么就是非阻塞io。

   java nio 2.0的主要改进就是引入了异步io(包括文件和网络),这里主要介绍下异步网络io api的使用以及框架的设计,以tcp服务端为例。首先看下为了支持aio引入的新的类和接口:

 java.nio.channels.asynchronouschannel

       标记一个channel支持异步io操作。

 java.nio.channels.asynchronousserversocketchannel

       serversocket的aio版本,创建tcp服务端,绑定地址,监听端口等。

 java.nio.channels.asynchronoussocketchannel

       面向流的异步socket channel,表示一个连接。

 java.nio.channels.asynchronouschannelgroup

       异步channel的分组管理,目的是为了资源共享。一个asynchronouschannelgroup绑定一个线程池,这个线程池执行两个任务:处理io事件和派发completionhandler。asynchronousserversocketchannel创建的时候可以传入一个asynchronouschannelgroup,那么通过asynchronousserversocketchannel创建的asynchronoussocketchannel将同属于一个组,共享资源。

 java.nio.channels.completionhandler

       异步io操作结果的回调接口,用于定义在io操作完成后所作的回调工作。aio的api允许两种方式来处理异步操作的结果:返回的future模式或者注册completionhandler,我更推荐用completionhandler的方式,这些handler的调用是由asynchronouschannelgroup的线程池派发的。显然,线程池的大小是性能的关键因素。asynchronouschannelgroup允许绑定不同的线程池,通过三个静态方法来创建:

 public static asynchronouschannelgroup withfixedthreadpool(int nthreads,

                                                               threadfactory threadfactory)

        throws ioexception

 public static asynchronouschannelgroup withcachedthreadpool(executorservice executor,

                                                                int initialsize)

 public static asynchronouschannelgroup withthreadpool(executorservice executor)

     需要根据具体应用相应调整,从框架角度出发,需要暴露这样的配置选项给用户。

     在介绍完了aio引入的tcp的主要接口和类之后,我们来设想下一个aio框架应该怎么设计。参考非阻塞nio框架的设计,一般都是采用reactor模式,reacot负责事件的注册、select、事件的派发;相应地,异步io有个proactor模式,proactor负责completionhandler的派发,查看一个典型的io写操作的流程来看两者的区别:

     reactor:  send(msg) -> 消息队列是否为空,如果为空  -> 向reactor注册op_write,然后返回 -> reactor select -> 触发writable,通知用户线程去处理 ->先注销writable(很多人遇到的cpu 100%的问题就在于没有注销),处理writeable,如果没有完全写入,继续注册op_write。注意到,写入的工作还是用户线程在处理。

     proactor: send(msg) -> 消息队列是否为空,如果为空,发起read异步调用,并注册completionhandler,然后返回。 -> 操作系统负责将你的消息写入,并返回结果(写入的字节数)给proactor -> proactor派发completionhandler。可见,写入的工作是操作系统在处理,无需用户线程参与。事实上在aio的api中,asynchronouschannelgroup就扮演了proactor的角色。

    completionhandler有三个方法,分别对应于处理成功、失败、被取消(通过返回的future)情况下的回调处理:

public interface completionhandler<v,a> {

     void completed(v result, a attachment);

    void failed(throwable exc, a attachment);

    void cancelled(a attachment);

}

    其中的泛型参数v表示io调用的结果,而a是发起调用时传入的attchment。

    第一步,创建一个asynchronousserversocketchannel,创建之前先创建一个asynchronouschannelgroup,上文提到asynchronousserversocketchannel可以绑定一个asynchronouschannelgroup,那么通过这个asynchronousserversocketchannel建立的连接都将同属于一个asynchronouschannelgroup并共享资源:

this.asynchronouschannelgroup = asynchronouschannelgroup

                    .withcachedthreadpool(executors.newcachedthreadpool(),

                            this.threadpoolsize);

    然后初始化一个asynchronousserversocketchannel,通过open方法:

this.serversocketchannel = asynchronousserversocketchannel

                .open(this.asynchronouschannelgroup);

    通过nio 2.0引入的socketoption类设置一些tcp选项:

this.serversocketchannel

                    .setoption(

                            standardsocketoption.so_reuseaddr,true);

                            standardsocketoption.so_rcvbuf,16*1024);

    绑定本地地址:

                    .bind(new inetsocketaddress("localhost",8080), 100);

    其中的100用于指定等待连接的队列大小(backlog)。完了吗?还没有,最重要的监听工作还没开始,监听端口是为了等待连接上来以便accept产生一个asynchronoussocketchannel来表示一个新建立的连接,因此需要发起一个accept调用,调用是异步的,操作系统将在连接建立后,将最后的结果——asynchronoussocketchannel返回给你:

public void pendingaccept() {

        if (this.started && this.serversocketchannel.isopen()) {

            this.acceptfuture = this.serversocketchannel.accept(null,

                    new acceptcompletionhandler());

        } else {

            throw new illegalstateexception("controller has been closed");

        }

    }

   注意,重复的accept调用将会抛出pendingacceptexception,后文提到的read和write也是如此。accept方法的第一个参数是你想传给completionhandler的attchment,第二个参数就是注册的用于回调的completionhandler,最后返回结果future<asynchronoussocketchannel>。你可以对future做处理,这里采用更推荐的方式就是注册一个completionhandler。那么accept的completionhandler中做些什么工作呢?显然一个赤裸裸的asynchronoussocketchannel是不够的,我们需要将它封装成session,一个session表示一个连接(mina里就叫iosession了),里面带了一个缓冲的消息队列以及一些其他资源等。在连接建立后,除非你的服务器只准备接受一个连接,不然你需要在后面继续调用pendingaccept来发起另一个accept请求:

private final class acceptcompletionhandler implements

            completionhandler<asynchronoussocketchannel, object> {

        @override

        public void cancelled(object attachment) {

            logger.warn("accept operation was canceled");

        public void completed(asynchronoussocketchannel socketchannel,

                object attachment) {

            try {

                logger.debug("accept connection from "

                        + socketchannel.getremoteaddress());

                configurechannel(socketchannel);

                aiosessionconfig sessionconfig = buildsessionconfig(socketchannel);

                session session = new aiotcpsession(sessionconfig,

                        aiotcpcontroller.this.configuration

                                .getsessionreadbuffersize(),

                        aiotcpcontroller.this.sessiontimeout);

                session.start();

                registersession(session);

            } catch (exception e) {

                e.printstacktrace();

                logger.error("accept error", e);

                notifyexception(e);

            } finally {

                pendingaccept();

            }

        public void failed(throwable exc, object attachment) {

            logger.error("accept error", exc);

                notifyexception(exc);

    注意到了吧,我们在failed和completed方法中在最后都调用了pendingaccept来继续发起accept调用,等待新的连接上来。有的同学可能要说了,这样搞是不是递归调用,会不会堆栈溢出?实际上不会,因为发起accept调用的线程与completionhandler回调的线程并非同一个,不是一个上下文中,两者之间没有耦合关系。要注意到,completionhandler的回调共用的是asynchronouschannelgroup绑定的线程池,因此千万别在回调方法中调用阻塞或者长时间的操作,例如sleep,回调方法最好能支持超时,防止线程池耗尽。

    连接建立后,怎么读和写呢?回忆下在nonblocking nio框架中,连接建立后的第一件事是干什么?注册op_read事件等待socket可读。异步io也同样如此,连接建立后马上发起一个异步read调用,等待socket可读,这个是session.start方法中所做的事情:

public class aiotcpsession 

Java AIO初探(异步网络IO)

{

    protected void start0() {

        pendingread();

    protected final void pendingread() {

        if (!isclosed() && this.asynchronoussocketchannel.isopen()) {

            if (!this.readbuffer.hasremaining()) {

                this.readbuffer = bytebufferutils

                        .increasebuffercapatity(this.readbuffer);

            this.readfuture = this.asynchronoussocketchannel.read(

                    this.readbuffer, this, this.readcompletionhandler);

            throw new illegalstateexception(

                    "session or channel has been closed");

Java AIO初探(异步网络IO)
Java AIO初探(异步网络IO)

     asynchronoussocketchannel的read调用与asynchronousserversocketchannel的accept调用类似,同样是非阻塞的,返回结果也是一个future,但是写的结果是整数,表示写入了多少字节,因此read调用返回的是future<integer>,方法的第一个参数是读的缓冲区,操作系统将io读到数据拷贝到这个缓冲区,第二个参数是传递给completionhandler的attchment,第三个参数就是注册的用于回调的completionhandler。这里保存了read的结果future,这是为了在关闭连接的时候能够主动取消调用,accept也是如此。现在可以看看read的completionhandler的实现:

public final class readcompletionhandler implements

        completionhandler<integer, abstractaiosession> {

    private static final logger log = loggerfactory

            .getlogger(readcompletionhandler.class);

    protected final aiotcpcontroller controller;

    public readcompletionhandler(aiotcpcontroller controller) {

        this.controller = controller;

    @override

    public void cancelled(abstractaiosession session) {

        log.warn("session(" + session.getremotesocketaddress()

                + ") read operation was canceled");

    public void completed(integer result, abstractaiosession session) {

        if (log.isdebugenabled())

            log.debug("session(" + session.getremotesocketaddress()

                    + ") read +" + result + " bytes");

        if (result < 0) {

            session.close();

            return;

        try {

            if (result > 0) {

                session.updatetimestamp();

                session.getreadbuffer().flip();

                session.decode();

                session.getreadbuffer().compact();

        } finally {

                session.pendingread();

            } catch (ioexception e) {

                session.onexception(e);

                session.close();

        controller.checksessiontimeout();

    public void failed(throwable exc, abstractaiosession session) {

        log.error("session read error", exc);

        session.onexception(exc);

        session.close();

   如果io读失败,会返回失败产生的异常,这种情况下我们就主动关闭连接,通过session.close()方法,这个方法干了两件事情:关闭channel和取消read调用:

if (null != this.readfuture) {

            this.readfuture.cancel(true);

this.asynchronoussocketchannel.close();

   在读成功的情况下,我们还需要判断结果result是否小于0,如果小于0就表示对端关闭了,这种情况下我们也主动关闭连接并返回。如果读到一定字节,也就是result大于0的情况下,我们就尝试从读缓冲区中decode出消息,并派发给业务处理器的回调方法,最终通过pendingread继续发起read调用等待socket的下一次可读。可见,我们并不需要自己去调用channel来进行io读,而是操作系统帮你直接读到了缓冲区,然后给你一个结果表示读入了多少字节,你处理这个结果即可。而nonblocking io框架中,是reactor通知用户线程socket可读了,然后用户线程自己去调用read进行实际读操作。这里还有个需要注意的地方,就是decode出来的消息的派发给业务处理器工作最好交给一个线程池来处理,避免阻塞group绑定的线程池。

   io写的操作与此类似,不过通常写的话我们会在session中关联一个缓冲队列来处理,没有完全写入或者等待写入的消息都存放在队列中,队列为空的情况下发起write调用:

    protected void write0(writemessage message) {

        boolean needwrite = false;

        synchronized (this.writequeue) {

            needwrite = this.writequeue.isempty();

            this.writequeue.offer(message);

        if (needwrite) {

            pendingwrite(message);

    protected final void pendingwrite(writemessage message) {

        message = preprocesswritemessage(message);

            this.asynchronoussocketchannel.write(message.getwritebuffer(),

                    this, this.writecompletionhandler);

    write调用返回的结果与read一样是一个future<integer>,而write的completionhandler处理的核心逻辑大概是这样:

@override

                    + ") writen " + result + " bytes");

        writemessage writemessage;

        queue<writemessage> writequeue = session.getwritequeue();

        synchronized (writequeue) {

            writemessage = writequeue.peek();

            if (writemessage.getwritebuffer() == null

                    || !writemessage.getwritebuffer().hasremaining()) {

                writequeue.remove();

                if (writemessage.getwritefuture() != null) {

                    writemessage.getwritefuture().setresult(boolean.true);

                }

                try {

                    session.gethandler().onmessagesent(session,

                            writemessage.getmessage());

                } catch (exception e) {

                    session.onexception(e);

                writemessage = writequeue.peek();

        if (writemessage != null) {

                session.pendingwrite(writemessage);

    }

   compete方法中的result就是实际写入的字节数,然后我们判断消息的缓冲区是否还有剩余,如果没有就将消息从队列中移除,如果队列中还有消息,那么继续发起write调用。

   重复一下,这里引用的代码都是yanf4j aio分支中的源码,感兴趣的朋友可以直接check out出来看看: http://yanf4j.googlecode.com/svn/branches/yanf4j-aio。

   在引入了aio之后,java对于网络层的支持已经非常完善,该有的都有了,java也已经成为服务器开发的首选语言之一。java的弱项在于对内存的管理上,由于这一切都交给了gc,因此在高性能的网络服务器上还是cpp的天下。java这种单一堆模型比之erlang的进程内堆模型还是有差距,很难做到高效的垃圾回收和细粒度的内存管理。

   这里仅仅是介绍了aio开发的核心流程,对于一个网络框架来说,还需要考虑超时的处理、缓冲buffer的处理、业务层和网络层的切分、可扩展性、性能的可调性以及一定的通用性要求。

文章转自庄周梦蝶  ,原文发布时间 2009-09-20