天天看点

apache mina中写的过程分析

apache mina作为一个网络应用框架,经常被用作消息系统中,也就是说经常要去处理IO,因此要实现高效的写和读。

最近又看了看写的处理,始终发现不能完全的理解他的处理思路。

涉及的类

先看一下写操作中涉及的类。

分析过程

从最底下的AbstractPollingIoProcessor.flush来分析

在mina的处理流程中,当Processor接受到可以写可以读的Sessions后,会将他们加入到flush队列,而

AbstractPollingIoProcessor.flush就是flush的具体实现。

AbstractPollingIoProcessor.flush的大致流程如下:

do

从flushSessions队列中取出session

判断session的状态,如果是OPENED,那么进行flushNow处理;              如果是关闭状态,那么就跳过;              如果是打开中,那么就加入到flushSessions队列中。           

while(flushSessions队列不为空)

上述流程中对于每个session的flush处理主要的过程是这样:

flushNow:

final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();                  final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();                  // Set limitation for the number of written bytes for read-write              // fairness. I used maxReadBufferSize * 3 / 2, which yields best              // performance in my experience while not breaking fairness much.              final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()              + (session.getConfig().getMaxReadBufferSize() >>> 1);              int writtenBytes = 0;              WriteRequest req = null;                  try {              // Clear OP_WRITE              setInterestedInWrite(session, false);                  do {              // Check for pending writes.              req = session.getCurrentWriteRequest();  // 获取当前session的写队列中的第一个请求。                  if (req == null) {              req = writeRequestQueue.poll(session);                  if (req == null) {              break;              }                  session.setCurrentWriteRequest(req);              }                  int localWrittenBytes = 0;              Object message = req.getMessage(); //获得写请求中的消息内容                  if (message instanceof IoBuffer) {              // writeBuffer其实就是将req中的消息内容写出去              localWrittenBytes = writeBuffer(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,              currentTime);                   // 如果请求中的消息内容没有写完,那么就让processor再次select去监听,等待下次flush              // 是不是说明网络状态不佳,让再次select???              if ((localWrittenBytes > 0) && ((IoBuffer) message).hasRemaining()) {              // the buffer isn't empty, we re-interest it in writing              writtenBytes += localWrittenBytes;              setInterestedInWrite(session, true);              return false;              }              }else if{ ... }                  // 当前这次写的数据量为0,说明是内核缓冲区满了,这个时候也对这个session重新注册写兴趣,然后让select去处理。              if (localWrittenBytes == 0) {              // Kernel buffer is full.              setInterestedInWrite(session, true);              return false;              }                  writtenBytes += localWrittenBytes;                  // 累加的写出数据大于等于最大的写上限              //   发生这种情况,是不是说明网络状态好,有两种情况出现:              // 1) 一次写出的数据的长度就超过了最大写出上限,此时,为了公平调度各个session的写操作,不将继续对当前session进行写,而是将session加入到本次flush的session队尾,在本次flush中继续对当前session的写请求队列进行处理              //   2)通过多次循环的将session 的写请求队列中的多个消息请求连续写出,累计的写出长度超过了最大写出上限,此时,也是为了公平,不能继续对当前的session进行处理,加入到队列尾部。              if (writtenBytes >= maxWrittenBytes) {              // Wrote too much              scheduleFlush(session);              return false;              }                  // 写了一次只要在前面的三种情况下,没有返回退出,说明当前的消息内容处理完毕,可以释放。              if (message instanceof IoBuffer) {              ((IoBuffer) message).free();              }              } while (writtenBytes < maxWrittenBytes);              }catch(..){              .... }                

flush:

大致的思路如下(省略了session状态的处理)

do{                  获取session的状态              如果是OPENED              flushedAll = flushNow(...)                  如果flushedAll为true,说明正在处理的session的writeRequest全部被处理完了,但是,可能在处理过程中又有writeRequeset加入到这个session的写请求队列,这个时候,将这个session放到flushingSessions队列的末尾。继续下一个循环。                  如果是CLOSED:              ...              如果是OPENING:              ...              while(flushingSessions不为空)                

另外两个函数writeBuffer和write比较简单,这里不列出来了。

总结

flush函数是处理多个session的写,主要是对处理过程中不同session状态处理。

flushNow函数是处理单个session的写可能出现的各种情况

writeBuffer函数处理分片问题

write函数处理直接的写