apache mina作為一個網絡應用架構,經常被用作消息系統中,也就是說經常要去處理IO,是以要實作高效的寫和讀。
最近又看了看寫的處理,始終發現不能完全的了解他的處理思路。
涉及的類
先看一下寫操作中涉及的類。
分析過程
從最底下的AbstractPollingIoProcessor.flush來分析
在mina的處理流程中,當Processor接受到可以寫可以讀的Sessions後,會将他們加入到flush隊列,而
AbstractPollingIoProcessor.flush就是flush的具體實作。
AbstractPollingIoProcessor.flush的大緻流程如下:
do
從flushSessions隊列中取出session
判斷session的狀态,如果是OPENED,那麼進行flushNow處理; 如果是關閉狀态,那麼就跳過; 如果是打開中,那麼就加入到flushSessions隊列中。
while(flushSessions隊列不為空)
上述流程中對于每個session的flush處理主要的過程是這樣:
flushNow:
final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation(); final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue(); // Set limitation for the number of written bytes for read-write // fairness. I used maxReadBufferSize * 3 / 2, which yields best // performance in my experience while not breaking fairness much. final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize() + (session.getConfig().getMaxReadBufferSize() >>> 1); int writtenBytes = 0; WriteRequest req = null; try { // Clear OP_WRITE setInterestedInWrite(session, false); do { // Check for pending writes. req = session.getCurrentWriteRequest(); // 擷取目前session的寫隊列中的第一個請求。 if (req == null) { req = writeRequestQueue.poll(session); if (req == null) { break; } session.setCurrentWriteRequest(req); } int localWrittenBytes = 0; Object message = req.getMessage(); //獲得寫請求中的消息内容 if (message instanceof IoBuffer) { // writeBuffer其實就是将req中的消息内容寫出去 localWrittenBytes = writeBuffer(session, req, hasFragmentation, maxWrittenBytes - writtenBytes, currentTime); // 如果請求中的消息内容沒有寫完,那麼就讓processor再次select去監聽,等待下次flush // 是不是說明網絡狀态不佳,讓再次select??? if ((localWrittenBytes > 0) && ((IoBuffer) message).hasRemaining()) { // the buffer isn't empty, we re-interest it in writing writtenBytes += localWrittenBytes; setInterestedInWrite(session, true); return false; } }else if{ ... } // 目前這次寫的資料量為0,說明是核心緩沖區滿了,這個時候也對這個session重新注冊寫興趣,然後讓select去處理。 if (localWrittenBytes == 0) { // Kernel buffer is full. setInterestedInWrite(session, true); return false; } writtenBytes += localWrittenBytes; // 累加的寫出資料大于等于最大的寫上限 // 發生這種情況,是不是說明網絡狀态好,有兩種情況出現: // 1) 一次寫出的資料的長度就超過了最大寫出上限,此時,為了公平排程各個session的寫操作,不将繼續對目前session進行寫,而是将session加入到本次flush的session隊尾,在本次flush中繼續對目前session的寫請求隊列進行處理 // 2)通過多次循環的将session 的寫請求隊列中的多個消息請求連續寫出,累計的寫出長度超過了最大寫出上限,此時,也是為了公平,不能繼續對目前的session進行處理,加入到隊列尾部。 if (writtenBytes >= maxWrittenBytes) { // Wrote too much scheduleFlush(session); return false; } // 寫了一次隻要在前面的三種情況下,沒有傳回退出,說明目前的消息内容處理完畢,可以釋放。 if (message instanceof IoBuffer) { ((IoBuffer) message).free(); } } while (writtenBytes < maxWrittenBytes); }catch(..){ .... }
flush:
大緻的思路如下(省略了session狀态的處理)
do{ 擷取session的狀态 如果是OPENED flushedAll = flushNow(...) 如果flushedAll為true,說明正在處理的session的writeRequest全部被處理完了,但是,可能在處理過程中又有writeRequeset加入到這個session的寫請求隊列,這個時候,将這個session放到flushingSessions隊列的末尾。繼續下一個循環。 如果是CLOSED: ... 如果是OPENING: ... while(flushingSessions不為空)
另外兩個函數writeBuffer和write比較簡單,這裡不列出來了。
總結
flush函數是處理多個session的寫,主要是對處理過程中不同session狀态處理。
flushNow函數是處理單個session的寫可能出現的各種情況
writeBuffer函數處理分片問題
write函數處理直接的寫