apache mina作为一个网络应用框架,经常被用作消息系统中,也就是说经常要去处理IO,因此要实现高效的写和读。
最近又看了看写的处理,始终发现不能完全的理解他的处理思路。
涉及的类
先看一下写操作中涉及的类。
分析过程
从最底下的AbstractPollingIoProcessor.flush来分析
在mina的处理流程中,当Processor接受到可以写可以读的Sessions后,会将他们加入到flush队列,而
AbstractPollingIoProcessor.flush就是flush的具体实现。
AbstractPollingIoProcessor.flush的大致流程如下:
do
从flushSessions队列中取出session
判断session的状态,如果是OPENED,那么进行flushNow处理; 如果是关闭状态,那么就跳过; 如果是打开中,那么就加入到flushSessions队列中。
while(flushSessions队列不为空)
上述流程中对于每个session的flush处理主要的过程是这样:
flushNow:
final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation(); final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue(); // Set limitation for the number of written bytes for read-write // fairness. I used maxReadBufferSize * 3 / 2, which yields best // performance in my experience while not breaking fairness much. final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize() + (session.getConfig().getMaxReadBufferSize() >>> 1); int writtenBytes = 0; WriteRequest req = null; try { // Clear OP_WRITE setInterestedInWrite(session, false); do { // Check for pending writes. req = session.getCurrentWriteRequest(); // 获取当前session的写队列中的第一个请求。 if (req == null) { req = writeRequestQueue.poll(session); if (req == null) { break; } session.setCurrentWriteRequest(req); } int localWrittenBytes = 0; Object message = req.getMessage(); //获得写请求中的消息内容 if (message instanceof IoBuffer) { // writeBuffer其实就是将req中的消息内容写出去 localWrittenBytes = writeBuffer(session, req, hasFragmentation, maxWrittenBytes - writtenBytes, currentTime); // 如果请求中的消息内容没有写完,那么就让processor再次select去监听,等待下次flush // 是不是说明网络状态不佳,让再次select??? if ((localWrittenBytes > 0) && ((IoBuffer) message).hasRemaining()) { // the buffer isn't empty, we re-interest it in writing writtenBytes += localWrittenBytes; setInterestedInWrite(session, true); return false; } }else if{ ... } // 当前这次写的数据量为0,说明是内核缓冲区满了,这个时候也对这个session重新注册写兴趣,然后让select去处理。 if (localWrittenBytes == 0) { // Kernel buffer is full. setInterestedInWrite(session, true); return false; } writtenBytes += localWrittenBytes; // 累加的写出数据大于等于最大的写上限 // 发生这种情况,是不是说明网络状态好,有两种情况出现: // 1) 一次写出的数据的长度就超过了最大写出上限,此时,为了公平调度各个session的写操作,不将继续对当前session进行写,而是将session加入到本次flush的session队尾,在本次flush中继续对当前session的写请求队列进行处理 // 2)通过多次循环的将session 的写请求队列中的多个消息请求连续写出,累计的写出长度超过了最大写出上限,此时,也是为了公平,不能继续对当前的session进行处理,加入到队列尾部。 if (writtenBytes >= maxWrittenBytes) { // Wrote too much scheduleFlush(session); return false; } // 写了一次只要在前面的三种情况下,没有返回退出,说明当前的消息内容处理完毕,可以释放。 if (message instanceof IoBuffer) { ((IoBuffer) message).free(); } } while (writtenBytes < maxWrittenBytes); }catch(..){ .... }
flush:
大致的思路如下(省略了session状态的处理)
do{ 获取session的状态 如果是OPENED flushedAll = flushNow(...) 如果flushedAll为true,说明正在处理的session的writeRequest全部被处理完了,但是,可能在处理过程中又有writeRequeset加入到这个session的写请求队列,这个时候,将这个session放到flushingSessions队列的末尾。继续下一个循环。 如果是CLOSED: ... 如果是OPENING: ... while(flushingSessions不为空)
另外两个函数writeBuffer和write比较简单,这里不列出来了。
总结
flush函数是处理多个session的写,主要是对处理过程中不同session状态处理。
flushNow函数是处理单个session的写可能出现的各种情况
writeBuffer函数处理分片问题
write函数处理直接的写