天天看點

Mina2.0架構源碼剖析(一)

整個架構最核心的幾個包是:org.apache.mina.core.service, org.apache.mina.core.session, org.apache.mina.core.polling以及org.apache.mina.transport.socket。

      這一篇先來看org.apache.mina.core.service。第一個要說的接口是IoService,它是所有IoAcceptor和IoConnector的基接口.對于一個IoService,有哪些資訊需要我們關注呢?1)底層的中繼資料資訊TransportMetadata,比如底層的網絡服務提供者(NIO,ARP,RXTX等),2)通過這個服務建立一個新會話時,新會話的預設配置IoSessionConfig。3)此服務所管理的所有會話。4)與這個服務相關所産生的事件所對應的監聽者(IoServiceListener)。5)處理這個服務所管理的所有連接配接的處理器(IoHandler)。6)每個會話都有一個過濾器鍊(IoFilterChain),每個過濾器鍊通過其對應的IoFilterChainBuilder來負責建構。7)由于此服務管理了一系列會話,是以可以通過廣播的方式向所有會話發送消息,傳回結果是一個WriteFuture集,後者是一種表示未來預期結果的資料結構。8)服務建立的會話(IoSession)相關的資料通過IoSessionDataStructureFactory來提供。9)發送消息時有一個寫緩沖隊列。10)服務的閑置狀态有三種:讀端空閑,寫端空閑,雙端空閑。11)還提供服務的一些統計資訊,比如時間,資料量等。

      IoService這個服務是對于伺服器端的接受連接配接和用戶端發起連接配接這兩種行為的抽象。

     再來從伺服器看起,IoAcceptor是IoService 的子接口,它用于綁定到指定的ip和端口,進而接收來自用戶端的連接配接請求,同時會fire相應的用戶端連接配接成功接收/取消/失敗等事件給自己的IoHandle去處理。當伺服器端的Accpetor從早先綁定的ip和端口上取消綁定時,預設是所有的用戶端會話會被關閉,這種情況一般出現在伺服器挂掉了,則用戶端收到連接配接關閉的提示。這個接口最重要的兩個方法是bind()和unbind(),當這兩個方法被調用時,服務端的連接配接接受線程就啟動或關閉了。

     再來看一看用戶端的連接配接發起者接口IoConnector,它的功能和IoAcceptor基本對應的,它用于嘗試連接配接到伺服器指定的ip和端口,同時會fire相應的用戶端連接配接事件給自己的IoHandle去處理。當connet方法被調用後用于連接配接伺服器端的線程就啟動了,而當所有的連接配接嘗試都結束時線程就停止。嘗試連接配接的逾時時間可以自行設定。Connect方法傳回的結果是ConnectFuture,這和前面說的WriteFuture類似,在後面會有一篇專門講這個模式的應用。

     前面的IoAcceptor和IoConnector就好比是兩個負責握手的仆人,而真正代表會話的實際I/O操作的接口是IoProcessor,它對現有的Reactor模式架構的Java NIO架構繼續做了一層封裝。它的泛型參數指明了它能處理的會話類型。接口中最重要的幾個方法, add用于将指定會話加入到此Processor中, 讓它負責處理與此會話相關的所有I/O操作。由于寫操作會有一個寫請求隊列,flush就用于對指定會話的寫請求隊列進行強制刷資料。remove方法用于從此Processor中移除和關閉指定會話,這樣就可以關閉會話相關聯的連接配接并釋放所有相關資源。updateTrafficMask方法用于控制會話的I/O行為,比如是否允許讀/寫。

       然後來說說IoHandle接口,Mina中的所有I/O事件都是通過這個接口來處理的,這些事件都是上面所說的I/O Processor發出來的,要注意的一點是同一個I/O Processor線程是負責處理多個會話的。包括下面這幾個事件的處理:

複制代碼

public interface IoHandler 

{

    void sessionCreated(IoSession session) throws Exception;//會話建立

    void sessionOpened(IoSession session) throws Exception;//打開會話,與sessionCreated最大的差別是它是從另一個線程處調用的

    void sessionClosed(IoSession session) throws Exception;//會話結束,當連接配接關閉時被調用

    void sessionIdle(IoSession session, IdleStatus status) throws Exception;//會話空閑

    void exceptionCaught(IoSession session, Throwable cause) throws Exception;//異常捕獲,Mina會自動關閉此連接配接

    void messageReceived(IoSession session, Object message) throws Exception;//接收到消息

    void messageSent(IoSession session, Object message) throws Exception;//發送消息 

}

      IoHandlerAdapter就不說了,簡單地對IoHandler使用擴充卡模式封裝了下,讓具體的IoHandler子類從其繼承後,進而可以對自身需要哪些事件處理擁有自主權。

      來看看IoServiceListener接口,它用于監聽IoService相關的事件。

public interface IoServiceListener extends EventListener 

    void serviceActivated(IoService service) throws Exception;//激活了一個新service

    void serviceIdle(IoService service, IdleStatus idleStatus) throws Exception; // service閑置

    void serviceDeactivated(IoService service) throws Exception;//挂起一個service

    void sessionCreated(IoSession session) throws Exception;//建立一個新會話

    void sessionDestroyed(IoSession session) throws Exception;//摧毀一個新會話

     IoServiceListenerSupport類就是負責将上面的IoService和其對應的各個IoServiceListener包裝到一起進行管理。下面是它的成員變量:

    private final IoService service;

    private final List<IoServiceListener> listeners = new CopyOnWriteArrayList<IoServiceListener>();

    private final ConcurrentMap<Long, IoSession> managedSessions = new ConcurrentHashMap<Long, IoSession>();//被管理的會話集(其實就是服務所管理的會話集)

    private final Map<Long, IoSession> readOnlyManagedSessions = Collections.unmodifiableMap(managedSessions);//上面的會話集的隻讀版

    private final AtomicBoolean activated = new AtomicBoolean();//被管理的服務是否處于激活狀态

     激活事件就以會話建立為例來說明:

public void fireSessionCreated(IoSession session) 

        boolean firstSession = false;

        if (session.getService() instanceof IoConnector) 

{//若服務類型是Connector,則說明是用戶端的連接配接服務  

            synchronized (managedSessions) 

{//鎖住目前已經建立的會話集

                firstSession = managedSessions.isEmpty();//看服務所管理的會話集是否為空集

            }

        }

               if (managedSessions.putIfAbsent(Long.valueOf(session.getId()), session) != null) { // If already registered, ignore.

            return;

        if (firstSession)

{//第一個連接配接會話,fire一個虛拟的服務激活事件

            fireServiceActivated();

        //呼叫過濾器的事件處理

        session.getFilterChain().fireSessionCreated();// 會話建立

        session.getFilterChain().fireSessionOpened();//會話打開

        int managedSessionCount = managedSessions.size();

        //統計管理的會話數目

        if (managedSessionCount > largestManagedSessionCount)

            largestManagedSessionCount = managedSessionCount;

        cumulativeManagedSessionCount ++;

        //呼叫監聽者的事件處理函數

        for (IoServiceListener l : listeners) 

            try

 {

                l.sessionCreated(session);

            } catch (Throwable e)

                ExceptionMonitor.getInstance().exceptionCaught(e);

    }

     這裡值得注意的一個地方是斷開連接配接會話,設定了一個監聽鎖,直到所有連接配接會話被關閉後才放開這個鎖。

 複制代碼

  private void disconnectSessions() 

    {

        if (!(service instanceof IoAcceptor)) 

        {//確定服務類型是IoAcceptor

        if (!((IoAcceptor) service).isCloseOnDeactivation()) 

        {// IoAcceptor是否設定為在服務失效時關閉所有連接配接會話

        Object lock = new Object();//監聽鎖

        IoFutureListener<IoFuture> listener = new LockNotifyingListener(lock);

        for (IoSession s : managedSessions.values())

        {

            s.close().addListener(listener);//為每個會話的close動作增加一個監聽者

        try 

            synchronized (lock)

            {

                while (!managedSessions.isEmpty())

                {//所管理的會話還沒有全部結束,持鎖等待

                    lock.wait(500);

                }

        } catch (InterruptedException ie)

            // Ignored

    private static class LockNotifyingListener implements IoFutureListener<IoFuture> 

        private final Object lock;

        public LockNotifyingListener(Object lock) 

            this.lock = lock;

        public void operationComplete(IoFuture future) 

            synchronized (lock) 

                lock.notifyAll();

本文轉自Phinecos(洞庭散人)部落格園部落格,原文連結:http://www.cnblogs.com/phinecos/archive/2008/12/03/1347052.html,如需轉載請自行聯系原作者