天天看點

Mina2.0架構源碼剖析(七)

前面介紹完了org.apache.mina.core.session這個包,現在開始進入org.apache.mina.core. polling包。這個包裡包含了實作基于輪詢政策(比如NIO的select調用或其他類型的I/O輪詢系統調用(如epoll,poll,kqueue等)的基類。

先來看AbstractPollingIoAcceptor這個抽象基類,它繼承自AbstractIoAcceptor,兩個泛型參數分别是所處理的會話和伺服器端socket連接配接。底層的sockets會被不斷檢測,并當有任何一個socket需要被處理時就會被喚醒去處理。這個類封裝了伺服器端socket的bind,accept和dispose等動作,其成員變量Executor負責接受來自用戶端的連接配接請求,另一個AbstractPollingIoProcessor用于處理用戶端的I/O操作請求,如讀寫和關閉連接配接。

其最重要的幾個成員變量是:

  private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();//注冊隊列

    private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();//取消注冊隊列

    private final Map<SocketAddress, H> boundHandles = Collections

            .synchronizedMap(new HashMap<SocketAddress, H>());//本地位址到伺服器socket的映射表

先來看看當服務端調用bind後的處理過程:

複制代碼

protected final Set<SocketAddress> bind0(

            List<? extends SocketAddress> localAddresses) throws Exception {

        AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);//注冊請求

        registerQueue.add(request);//加入注冊隊列中,等待worker處理

        //建立一個Worker執行個體,開始工作

        startupWorker();

        wakeup();

        request.awaitUninterruptibly();

        // 更新本地綁定位址

        Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();

        for (H handle : boundHandles.values()) {

            newLocalAddresses.add(localAddress(handle));

        }

        return newLocalAddresses;

    }

     真正的負責接收用戶端請求的工作都是Worker線程完成的,

private class Worker implements Runnable {

        public void run() {

            int nHandles = 0;

            while (selectable) {

                try {

                    // Detect if we have some keys ready to be processed

                    boolean selected = select();//檢測是否有SelectionKey已經可以被處理了

                    nHandles += registerHandles();//注冊伺服器sockets句柄,這樣做的目的是将Selector的狀态置于OP_ACCEPT,并綁定到所監聽的端口上,表明接受了可以接收的來自用戶端的連接配接請求,

                    if (selected) {

                        processHandles(selectedHandles());//處理可以被處理的SelectionKey狀态為OP_ACCEPT的伺服器socket句柄集(即真正處理來自用戶端的連接配接請求)

                    }

                    nHandles -= unregisterHandles();//檢查是否有取消連接配接的用戶端請求

                    if (nHandles == 0) {

                        synchronized (lock) {

                            if (registerQueue.isEmpty()

                                    && cancelQueue.isEmpty()) {//完成工作

                                worker = null;

                                break;

                            }

                        }

                } catch (Throwable e) {

                    ExceptionMonitor.getInstance().exceptionCaught(e);

                    try {

                        Thread.sleep(1000);//線程休眠一秒

                    } catch (InterruptedException e1) {

                        ExceptionMonitor.getInstance().exceptionCaught(e1);

                }

            }

            if (selectable && isDisposing()) {//釋放資源

                selectable = false;

                    if (createdProcessor) {

                        processor.dispose();

                } finally {

                        synchronized (disposalLock) {

                            if (isDisposing()) {

                                destroy();

                    } catch (Exception e) {

                        ExceptionMonitor.getInstance().exceptionCaught(e);

                    } finally {

                        disposalFuture.setDone();

private int registerHandles() {//注冊伺服器sockets句柄

        for (;;) {

            AcceptorOperationFuture future = registerQueue.poll();

            Map<SocketAddress, H> newHandles = new HashMap<SocketAddress, H>();

            List<SocketAddress> localAddresses = future.getLocalAddresses();

            try {

                for (SocketAddress a : localAddresses) {

                    H handle = open(a);//打開指定位址,傳回伺服器socket句柄

                    newHandles.put(localAddress(handle), handle);//加入位址—伺服器socket映射表中

                boundHandles.putAll(newHandles);//更新本地綁定位址集

                // and notify.

                future.setDone();//完成注冊過程

                return newHandles.size();

            } catch (Exception e) {

                future.setException(e);

            } finally {

                // Roll back if failed to bind all addresses.

                if (future.getException() != null) {

                    for (H handle : newHandles.values()) {

                        try {

                            close(handle);//關閉伺服器socket句柄

                        } catch (Exception e) {

                            ExceptionMonitor.getInstance().exceptionCaught(e);

                    wakeup();

        private void processHandles(Iterator<H> handles) throws Exception {//處理來自用戶端的連接配接請求

            while (handles.hasNext()) {

                H handle = handles.next();

                handles.remove();

                T session = accept(processor, handle);//為一個伺服器socket句柄handle真正接收來自用戶端的請求,在給定的所關聯的processor上傳回會話session

                if (session == null) {

                    break;

                finishSessionInitialization(session, null, null);//結束會話初始化

                // add the session to the SocketIoProcessor

                session.getProcessor().add(session);

     這個類中有個地方值得注意,就是wakeup方法,它是用來中斷select方法的,當注冊隊列或取消注冊隊列發生變化時需要調用它,可以參看本類的一個子類NioSocketAcceptor的實作:

    protected boolean select() throws Exception {

        return selector.select() > 0;

    protected void wakeup() {

        selector.wakeup();

     我們可以查閱jdk文檔,它對Selector的select方法有如下解釋:選擇一組鍵,其相應的通道已為 I/O 操作準備就緒。 此方法執行處于阻塞模式的選擇操作。僅在至少選擇一個通道、調用此選擇器的 wakeup 方法、目前的線程已中斷,或者給定的逾時期滿(以先到者為準)後此方法才傳回。

參考資料

1,《Java NIO非阻塞伺服器示例》

本文轉自Phinecos(洞庭散人)部落格園部落格,原文連結:http://www.cnblogs.com/phinecos/archive/2008/12/08/1350315.html,如需轉載請自行聯系原作者