天天看点

mina中责任链模式的实现

1.责任链模式在mina中有重要的作用,其中Filter机制就是基于责任链实现的,再来回顾mina框架组成

mina中责任链模式的实现

从上图看到消息的接受从IoService层先经过Filter层过滤处理后最后交给IoHander,消息的发送则是反过来从IoHander层经过Filter层再到IoService层。 我们来想想这样做的好处: 第一点就是可以把消息从原始字节封装成对象方便处理,或者从对象到原始字节,那这就是decode和encode过程 第二点过滤消息,业务层只需要处理感兴趣的消息 当然还有其他... 再来想想Filter层是怎么实现的呢 从图中看到接收消息和发送消息经过Filter层是相反处理的,那么每个Filter就必须知道前一个和后一个Filter,我们就很容易想到了双向链表结构。 没错Filter层就是用双向链表实现的,那么让我们来看看Filter层具体是如何实现,即mina中责任链模式的实现

2.先来看下mina的filterchain包结构

mina中责任链模式的实现

Filter层的每个filter都是对上图IoFilter接口的实现,我们将具体讲解IoFilter,IoFilterChain,DefaultIoFilterChain这几个类 IoFilterChainBuilder接口和DefaultIoFilterChainBuilder实现不再细讲,从字面意思就是IoFilterChain的建造者 IoFilterEvent是代表filter事件,IoFilterLifeCycleException是指filter中出现循环链表异常 下面的图是我们要重点讲解的几个类的关系

mina中责任链模式的实现

我们重点讲解的只有四个类,为什么会出现这么多的类呢,有些类使内部实现,不太好表示就都画出来了,先来说明下 IoFilter接口:NextFilter接口是其内部接口 IoFilterAdapter类:对IoFilter接口的实现,是所有Filter的基类 IoFilterChain接口:Entry接口是其内部接口 DefaultIoFilterChain类:是对IoFilterChain接口的实现,有EntryImpl,HeadFilter,TailFilter三个内部类,其中EntryImpl类中又有NextFilter接口的内部实现 还需要说明下:IoFilter还有相关接口就写了两个方法,一个接受消息触发的方法还有一个是发送消息触发的方法,剩下的都是这两类消息处理方法就不表示了 HeadFilter类只对发送消息处理方法重载,TailFilter类只对接受消息处理方法重载,想一想为什么?再想想为什么用内部类来实现? 从上图看到EntryImp类是重点,我们就来看看EntryImpl类的实现

private class EntryImpl implements Entry {
        private EntryImpl prevEntry ;

        private EntryImpl nextEntry ;

        private final String name;

        private IoFilter filter ;

        private final NextFilter nextFilter;

        private EntryImpl(EntryImpl prevEntry, EntryImpl nextEntry, String name, IoFilter filter) {
            if (filter == null) {
                throw new IllegalArgumentException("filter");
            }
            if (name == null) {
                throw new IllegalArgumentException("name");
            }

            this.prevEntry = prevEntry;
            this.nextEntry = nextEntry;
            this.name = name;
            this.filter = filter;
            this.nextFilter = new NextFilter() {
              
                public void sessionOpened(IoSession session) {
                    Entry nextEntry = EntryImpl. this.nextEntry ;
                    callNextSessionOpened(nextEntry, session);
                }

                
                public void filterWrite(IoSession session, WriteRequest writeRequest) {
                    Entry nextEntry = EntryImpl. this.prevEntry ;
                    callPreviousFilterWrite(nextEntry, session, writeRequest);
                }

               
            };
        }
           

从EntryImpl类的构造方法看到,EntryImpl中保持对上一个节点和下一个节点引用,双向链表结构,name即过滤层名称,filter即过滤层的具体实现,而nextFilter是在构造方法中的内部实现。 我们再来看看DefaultIoFilterChain的实现

public class DefaultIoFilterChain implements IoFilterChain {
 /** The associated session */
    private final AbstractIoSession session ;

    private final Map<String, Entry> name2entry = new ConcurrentHashMap<String, Entry>();

    /** The chain head */
    private final EntryImpl head ;

    /** The chain tail */
    private final EntryImpl tail ;

    public DefaultIoFilterChain(AbstractIoSession session) {
        if (session == null) {
            throw new IllegalArgumentException("session");
        }

        this.session = session;
        head = new EntryImpl(null, null, "head" , new HeadFilter ());
        tail = new EntryImpl(head, null, "tail", new TailFilter());
        head.nextEntry = tail;
    }
           

从上图看到,head顾名思义就是头节点,tail即尾节点,两个节点在构造方法中已经构造好了,head里装的是HeadFilter,tail中即TailFilter。上面我们说到HeadFilter只实现发送消息处理方法,TailFilter只实现接受消息处理方法。 想一想便得知,HeadFilter是发送消息最后的处理节点,TailFilter是接受消息最后的处理节点。最后的节点处理就是将写消息交给Io线程处理,将读消息交给IoHander的业务层处理。所以说HeadFilter和TailFilter只需对某一方消息处理,反面消息默认交给下一个节点处理。 HeadFilter

private class HeadFilter extends IoFilterAdapter {
        @SuppressWarnings("unchecked" )
        @Override
        public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {

            AbstractIoSession s = (AbstractIoSession) session;

            // Maintain counters.
            if (writeRequest.getMessage() instanceof IoBuffer) {
                IoBuffer buffer = (IoBuffer) writeRequest.getMessage();
                // I/O processor implementation will call buffer.reset()
                // it after the write operation is finished, because
                // the buffer will be specified with messageSent event.
                buffer.mark();
                int remaining = buffer.remaining();

                if (remaining == 0) {
                    // Zero-sized buffer means the internal message
                    // delimiter.
                    s.increaseScheduledWriteMessages();
                } else {
                  s.increaseScheduledWriteBytes(remaining);
                }
            } else {
                s.increaseScheduledWriteMessages();
            }

            WriteRequestQueue writeRequestQueue = s.getWriteRequestQueue();

            if (!s.isWriteSuspended()) {
                if (writeRequestQueue.size() == 0) {
                    // We can write directly the message
                    s.getProcessor().write(s, writeRequest);
                } else {
                    s.getWriteRequestQueue().offer(s, writeRequest);
                    s.getProcessor().flush(s);
                }
            } else {
                s.getWriteRequestQueue().offer(s, writeRequest);
            }
        }
}
           

TailFilter

private static class TailFilter extends IoFilterAdapter {
@Override
        public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception {
            session.getHandler().sessionOpened(session);
        }
}
           

下面我们来看看sessionOpen消息处理的完整过程,首先是IoFilterChain收到这个消息触发 fireSessionOpened方法

public void fireSessionOpened() {
        Entry head = this.head ;
        callNextSessionOpened(head, session);
    }

    private void callNextSessionOpened(Entry entry, IoSession session) {
        try {
            IoFilter filter = entry.getFilter();
            NextFilter nextFilter = entry.getNextFilter();
            filter.sessionOpened(nextFilter, session);
        } catch (Throwable e) {
            fireExceptionCaught(e);
        }
    }
           

fireSessionOpened方法获取当前的头节点,然后调用callNextSessionOpened方法,而callNextSessionOpened方法是从entry中获取filter和nextfitler,触发filter的sessionOpened方法,同时将nextfilter作为参数传进去,而filter层如果对这个消息感兴趣可以处理完成后调用nextfilter的sessionOpened方法,不感兴趣的话,可能消息到此就结束了。 再回到上面EntryImpl中对NextFilter的实现,我们看到NextFilter收到sessionOpen消息,获取当前节点的下一个节点,然后触发IoFilterChain的callNextSessionOpened方法,即上图所示。再然后就是传递到下一节点处理,要么filter层拦截过滤结束,要不就是传到最后一层由TailFilter交给业务层处理。而写消息恰好相反,nextFilter是获取前一个节点,这就实现了双向过滤的功能。 到这我们就明白了为什么EntryImpl还有NextFilter选择内部类实现了。 NextFilter其实是起到中转站的作用,收到Reveceive消息转交给后一节点,收到Send消息转交给前一个消息。那我们再来想想为什么要用NextFilter来作为中转呢?我想应该是接口隔离的原则。Filter只需要关心如何处理接受到的消息,至于如何转交到下一个Filter不应该由他实现。 至此Mina的责任链实现就已经讲完了,其余细节的东西由读者自己体会吧,在此思考下 IoFilterChain是如何检测链表中是否有环呢?