天天看點

apache mina中寫的過程分析

apache mina作為一個網絡應用架構,經常被用作消息系統中,也就是說經常要去處理IO,是以要實作高效的寫和讀。

最近又看了看寫的處理,始終發現不能完全的了解他的處理思路。

涉及的類

先看一下寫操作中涉及的類。

分析過程

從最底下的AbstractPollingIoProcessor.flush來分析

在mina的處理流程中,當Processor接受到可以寫可以讀的Sessions後,會将他們加入到flush隊列,而

AbstractPollingIoProcessor.flush就是flush的具體實作。

AbstractPollingIoProcessor.flush的大緻流程如下:

do

從flushSessions隊列中取出session

判斷session的狀态,如果是OPENED,那麼進行flushNow處理;              如果是關閉狀态,那麼就跳過;              如果是打開中,那麼就加入到flushSessions隊列中。           

while(flushSessions隊列不為空)

上述流程中對于每個session的flush處理主要的過程是這樣:

flushNow:

final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();                  final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();                  // Set limitation for the number of written bytes for read-write              // fairness. I used maxReadBufferSize * 3 / 2, which yields best              // performance in my experience while not breaking fairness much.              final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()              + (session.getConfig().getMaxReadBufferSize() >>> 1);              int writtenBytes = 0;              WriteRequest req = null;                  try {              // Clear OP_WRITE              setInterestedInWrite(session, false);                  do {              // Check for pending writes.              req = session.getCurrentWriteRequest();  // 擷取目前session的寫隊列中的第一個請求。                  if (req == null) {              req = writeRequestQueue.poll(session);                  if (req == null) {              break;              }                  session.setCurrentWriteRequest(req);              }                  int localWrittenBytes = 0;              Object message = req.getMessage(); //獲得寫請求中的消息内容                  if (message instanceof IoBuffer) {              // writeBuffer其實就是将req中的消息内容寫出去              localWrittenBytes = writeBuffer(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,              currentTime);                   // 如果請求中的消息内容沒有寫完,那麼就讓processor再次select去監聽,等待下次flush              // 是不是說明網絡狀态不佳,讓再次select???              if ((localWrittenBytes > 0) && ((IoBuffer) message).hasRemaining()) {              // the buffer isn't empty, we re-interest it in writing              writtenBytes += localWrittenBytes;              setInterestedInWrite(session, true);              return false;              }              }else if{ ... }                  // 目前這次寫的資料量為0,說明是核心緩沖區滿了,這個時候也對這個session重新注冊寫興趣,然後讓select去處理。              if (localWrittenBytes == 0) {              // Kernel buffer is full.              setInterestedInWrite(session, true);              return false;              }                  writtenBytes += localWrittenBytes;                  // 累加的寫出資料大于等于最大的寫上限              //   發生這種情況,是不是說明網絡狀态好,有兩種情況出現:              // 1) 一次寫出的資料的長度就超過了最大寫出上限,此時,為了公平排程各個session的寫操作,不将繼續對目前session進行寫,而是将session加入到本次flush的session隊尾,在本次flush中繼續對目前session的寫請求隊列進行處理              //   2)通過多次循環的将session 的寫請求隊列中的多個消息請求連續寫出,累計的寫出長度超過了最大寫出上限,此時,也是為了公平,不能繼續對目前的session進行處理,加入到隊列尾部。              if (writtenBytes >= maxWrittenBytes) {              // Wrote too much              scheduleFlush(session);              return false;              }                  // 寫了一次隻要在前面的三種情況下,沒有傳回退出,說明目前的消息内容處理完畢,可以釋放。              if (message instanceof IoBuffer) {              ((IoBuffer) message).free();              }              } while (writtenBytes < maxWrittenBytes);              }catch(..){              .... }                

flush:

大緻的思路如下(省略了session狀态的處理)

do{                  擷取session的狀态              如果是OPENED              flushedAll = flushNow(...)                  如果flushedAll為true,說明正在處理的session的writeRequest全部被處理完了,但是,可能在處理過程中又有writeRequeset加入到這個session的寫請求隊列,這個時候,将這個session放到flushingSessions隊列的末尾。繼續下一個循環。                  如果是CLOSED:              ...              如果是OPENING:              ...              while(flushingSessions不為空)                

另外兩個函數writeBuffer和write比較簡單,這裡不列出來了。

總結

flush函數是處理多個session的寫,主要是對處理過程中不同session狀态處理。

flushNow函數是處理單個session的寫可能出現的各種情況

writeBuffer函數處理分片問題

write函數處理直接的寫