天天看點

mina socket底層主流程源碼實作

一,mina的架構

mina socket底層主流程源碼實作

mina 架構可以大緻分為三部分,ioService ,ioFilterChain , IoHandler

ioService:用于接受服務或者連接配接服務,例如socket 接收器,連接配接器。

ioFilterChain:對接受的資料或發送的資料進行處理,例如接收到的位元組數組轉化成字元串,打日志等。

ioHandler:業務處理類,我們自己寫的業務邏輯,例如對接受到的資料進行怎麼樣的資料,需要傳回什麼資料。

二,聊天室代碼。

下面看下mina-2.0.9中聊天室例子。聊天室伺服器的代碼:

public static void main(String[] args) throws Exception {
        NioSocketAcceptor acceptor = new NioSocketAcceptor();//建立接收器,用于接受socket連接配接。
        DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();//擷取filterChain
 
        MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();//擷取mdc inject filter
        chain.addLast("mdc", mdcInjectionFilter);//設定filterChain 鍊中最好一個過濾器為mdcInjectionFilter
 
        // Add SSL filter if SSL is enabled.
        if (USE_SSL) {
            addSSLSupport(chain);
        }
 
        chain.addLast("codec", new ProtocolCodecFilter(
                new TextLineCodecFactory()));//位元組組和字元串轉換 filter
 
        addLogger(chain); //日志 filter
 
        // Bind
        acceptor.setHandler(new ChatProtocolHandler()); //設定接受的資訊的業務處理類
        acceptor.bind(new InetSocketAddress(PORT));//綁定端口
 
        System.out.println("Listening on port " + PORT);
    }      

然後看下ChatProtocolHandler ,該類是繼承了IoHandlerAdapter類,看下具體實作邏輯的方法,簡化了大部分代碼。

@Override
    public void messageReceived(IoSession session, Object message) {
        String theMessage = (String) message;//接受得來的字元串
        session.write("LOGIN OK");//傳回登入成功,
    }      

上面代碼出現了一個session的概念,這個session的概念在mina中是比較重要的,下面來看下,mina在底層幫我們做了哪些事情。

主要是看下

NioSocketAcceptor acceptor = new NioSocketAcceptor();//建立接收器,用于接受socket連接配接。
 acceptor.bind(new InetSocketAddress(PORT));//綁定端口      

這兩行代碼背後mina都做了哪些事情。

先看下下面這行代碼初始化的時候,都初始化了哪些屬性,下圖的屬性是初始化中比較關鍵的。

三,接受新連接配接

NioSocketAcceptor acceptor = new NioSocketAcceptor();//建立接收器,用于接受socket連接配接。      

NioSocketAcceptor的繼承圖如下,屬性隻列出了相對重要的屬性。

mina socket底層主流程源碼實作

1.初始化sessionConfig的實作是 DefaultSocketSessionConfig(),裡面配置了一些參數用于後續建立session.

2.初始化executor, 

executor = Executors.newCachedThreadPool(); 

newCachedThreadPool線程池的用法如下:

“建立一個可根據需要建立新線程的線程池,但是在以前構造的線程可用時将重用它們。對于執行很多短期異步任務的程式而言,這些線程池通常可提高程式性能。調用 execute 将重用以前構造的線程(如果線程可用)。如果現有線程沒有可用的,則建立一個新線程并添加到池中。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。是以,長時間保持空閑的線程池不會使用任何資源。注意,可以使用 ThreadPoolExecutor 構造方法建立具有類似屬性但細節不同(例如逾時參數)的線程池。 ”

3.初始化processor,建立了下面這個對象。

mina socket底層主流程源碼實作

new SimpleIoProcessorPool<S>( NioProcessor.class),該對象有兩個比較關鍵的屬性,分别是executor和pool。executor和上面一樣是一個Executors.newCachedThreadPool(),pool為一個NioProcessor的數組。NioProcessor如下,也有兩個關鍵屬性,selector選擇器和executor線程池(newCachedThreadPool)。

mina socket底層主流程源碼實作

看下這行代碼背後都做了哪些東西。

acceptor.bind(new InetSocketAddress(PORT));//綁定端口

1.這塊代碼主要做的事情就是啟動了一個線程,使用者監聽連接配接。

主要的runnable為AbstractPollingIoAcceptor.Acceptor

Acceptor中會調用NioSocketAcceptor.open方法,在open方法中,我們能看到熟悉的建立接收器的代碼。簡化代碼如下:

protected ServerSocketChannel open(SocketAddress localAddress) throws Exception {
        // Creates the listening ServerSocket
        ServerSocketChannel channel = null;
        if (selectorProvider != null) {
            channel = selectorProvider.openServerSocketChannel();
        } else {
            channel = ServerSocketChannel.open();//建立一個ServerSocketChannel對象
        }
  
            // This is a non blocking socket channel
            channel.configureBlocking(false);//設定管道為非阻塞
            // Configure the server socket,
            ServerSocket socket = channel.socket();//擷取管道中關聯的ServerSocket
            // Set the reuseAddress flag accordingly with the setting
            socket.setReuseAddress(isReuseAddress());
            // and bind.
            
                socket.bind(localAddress, getBacklog());//綁定位址
         ...
            // Register the channel within the selector for ACCEPT event
            channel.register(selector, SelectionKey.OP_ACCEPT);//管道注冊到選擇器,監聽socket的接受事件,該selector為NioSocketAcceptor的屬性
           
       ....
        return channel;
    }      

看下Acceptor的run方法,同樣可以看到熟悉的selector代碼:

private class Acceptor implements Runnable {
        public void run() {
            assert (acceptorRef.get() == this);
            int nHandles = 0;
            // Release the lock
            lock.release();
            while (selectable) {
                try {
                    // Detect if we have some keys ready to be processed
                    // The select() will be woke up if some new connection
                    // have occurred, or if the selector has been explicitly
                    // woke up
                    int selected = select();//第一次通路到這個地方的時候,會在AbstractPollingIoAcceptor. bindInternal()中的wakeUp()方法中會被打斷,直接執行下面registerHandles的初始化。
                    // this actually sets the selector to OP_ACCEPT,
                    // and binds to the port on which this class will
                    // listen on
                    nHandles += registerHandles();//主要是執行上面的open方法。
                    // Now, if the number of registred handles is 0, we can
                    // quit the loop: we don't have any socket listening
                    // for incoming connection.
                    if (nHandles == 0) {
                       .....
                    }
                    if (selected > 0) {//如果有新連接配接進來
                        // We have some connection request, let's process
                        // them here.
                        processHandles(selectedHandles());
                    }
                    // check to see if any cancellation request has been made.
                    nHandles -= unregisterHandles();
                } .....
            // Cleanup all the processors, and shutdown the acceptor.
            if (selectable && isDisposing()) {
                selectable = false;
               ......
        }}}      

然後看下下面這段代碼背後都做了哪些事情。

if (selected > 0) {//如果有新連接配接進來
    // We have some connection request, let's process
    // them here.
    processHandles(selectedHandles());
                    }      

selectedHandles()方法中主要是擷取目前selector中已經就緒的selectedKey 的集合,具體的方法如下:

@Override
    protected Iterator<ServerSocketChannel> selectedHandles() {
        return new ServerSocketChannelIterator(selector.selectedKeys());
    }      

processHandles的的方法如下:

private void processHandles(Iterator<H> handles) throws Exception {
            while (handles.hasNext()) {
                H handle = handles.next();
                handles.remove();
                // Associates a new created connection to a processor,
                // and get back a session
                S session = accept(processor, handle);//生成一個NioSocketSession
                if (session == null) {
                    continue;
                }
                initSession(session, null, null);//對session中的部分attribute和writeRequestQueue等進行預設的初始化。
                // add the session to the SocketIoProcessor
                session.getProcessor().add(session);
            }
        }      

看一下accept方法

@Override
    protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception {
        SelectionKey key = null;
        if (handle != null) {
            key = handle.keyFor(selector);//
        }
        if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {
            return null;
        }
        // accept the connection from the client
        SocketChannel ch = handle.accept();//接受一個新連接配接
        if (ch == null) {
            return null;
        }
        return new NioSocketSession(this, processor, ch); 
    }      

new NioSocketSession(this, processor, ch)中有幾個主要的操作:

this.service = service;//設定目前目前session關聯的IoService在這裡是NioSocketAcceptor
 this.handler = service.getHandler();//設定目前session關聯的handler,在這裡對應的是new ChatProtocolHandler()
 sessionId = idGenerator.incrementAndGet();//擷取唯一的sessionId
 this.channel = channel;//設定目前的Channel,這裡為新進來的socket連接配接對應的SocketChannel
 this.processor = processor;//設定目前IoProcessor<NioSession> processor ,這裡設為AbstractPollingIoAcceptor.processor 也就是上面初始化的SimpleIoProcessorPool(..)
 filterChain = new DefaultIoFilterChain(this);//初始化預設的filterChain
 this.config.setAll(service.getSessionConfig());//設定SessionCofig 為NioSocketAcceptor.getSessionConfig();      

NioSocketSession的繼承圖如下:屬性隻列出了關鍵的屬性。

mina socket底層主流程源碼實作

再看下

initSession(session, null, null);//對session中的部分attribute和writeRequestQueue等進行預設的初始化。

最後是:

session.getProcessor().add(session);

session.getProcessor()擷取剛才初始化processor,SimpleIoProcessorPool 的一個對象

在看下SimpleIoProcessorPool的add(Session session)方法:

public final void add(S session) {
        getProcessor(session).add(session);//toread
    }      

getProcessor(session) 中首先會在SimpleIoProcessorPool.pool中去取一個IoProcessor<S> 的執行個體,這裡是NioProcessor執行個體,然後session和這個NioProcessor的執行個體關聯起來。簡化代碼如下:

IoProcessor<S> processor  = pool[Math.abs((int) session.getId()) % pool.length];
 session.setAttributeIfAbsent(PROCESSOR, processor);      

然後再看下NioProcessor.add(Session session)方法,其實是執行了NioProcessor的父類AbstractPollingIoProcessor的add方法。

該add方法,首先是在Queue<S> newSessions這個隊列中增加了上面傳進來的session,然後啟動了一個新的線程AbstractPollingIoProcessor的内部類 Processor。簡化的run方法如下:

private class Processor implements Runnable {
        public void run() {
      int selected = select(SELECT_TIMEOUT);//擷取目前是否有新的請求進來。
      nSessions += handleNewSessions();
     if (selected > 0) {
                        //LOG.debug("Processing ..."); // This log hurts one of the MDCFilter test...
                        process();
                    }
      flush(currentTime);//如果有session有寫請求在這裡面處理
        } 
}      

hanldeNewSession()中對剛才在

Queue<S> newSessions

增加的新session進行初始化操作,首先是:NioProcessor中的初始化方法:

@Override
    protected void init(NioSession session) throws Exception {
        SelectableChannel ch = (SelectableChannel) session.getChannel();//擷取session關聯的SocketChannel
        ch.configureBlocking(false);//設定為非阻塞
        session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));//把socketChannel注冊到NioProcessor的selector中,并且監聽的時間為OP_READ,可讀事件。
    }      

然後是初始session的filterChain,初始化為session關聯的IoService的FilterChainBuilder,這裡就是我們一開始初始化NioSocketAcceptor的filterChain

代碼如下:

// Build the filter chain of this session.
IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
chainBuilder.buildFilterChain(session.getFilterChain());      

四,處理請求

然後執行下面的代碼,如果目前NioProcessor.selector有可讀的channel的話,執行process()方法

if (selected > 0) {
                        //LOG.debug("Processing ..."); // This log hurts one of the MDCFilter test...
                        process();
                    }      

process()方法就是真正處理資料和執行業務邏輯的地方了。首先會調用filterChain然後執行設定的handler.

process()方法中主要是read()方法,該方法主要是讀取socket中的資料,并且執行filterChain和handler  簡化的代碼如下:

private void read(S session) {
        IoSessionConfig config = session.getConfig();
        int bufferSize = config.getReadBufferSize();
        IoBuffer buf = IoBuffer.allocate(bufferSize);
        final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
        try {
            int readBytes = 0;
            int ret;
            try {
                if (hasFragmentation) {
                    while ((ret = read(session, buf)) > 0) {//讀取資料
                        readBytes += ret;
                        if (!buf.hasRemaining()) {
                            break;
                        }
                    }
                } else {
                    ret = read(session, buf);
                    if (ret > 0) {
                        readBytes = ret;
                    }
                }
            } finally {
                buf.flip();
            }
            if (readBytes > 0) {
                IoFilterChain filterChain = session.getFilterChain();//擷取filterChain
                filterChain.fireMessageReceived(buf);//執行filterChain中的messageReceived事件。
                buf = null;
            IoFilterChain filterChain = session.getFilterChain();
            filterChain.fireExceptionCaught(e);
        }
    }      

session如有寫的請求的話,先把請求封裝成WriteRequest,最後把結果存在session.writeRequestQueue中,待執行flush方法的時候,才真正的把資料寫進去。

寫操作具體的代碼在AbstractPollingIoProcessor的flushNow裡,最終寫操作是在NioProcessor的write方法中完成的

NioProcessor. write如下代碼 , 就是主要就是Channnel的write方法了:

@Override
    protected int write(NioSession session, IoBuffer buf, int length) throws Exception {
        if (buf.remaining() <= length) {
            return session.getChannel().write(buf.buf());
        }
        int oldLimit = buf.limit();
        buf.limit(buf.position() + length);
        try {
            return session.getChannel().write(buf.buf());
        } finally {
            buf.limit(oldLimit);
        }
    }      

五,概括

最後來看下mina中IoService,IoSession,IoProcessor,IoFilterChain,Handler之間的關系。還是以上面聊天室的為例,從伺服器開始監聽端口,到有第一個請求進來,流程如下。

mina socket底層主流程源碼實作

1.NioSocketAcceptor 會啟動一個線程用于接受socket連接配接,另外還會建立一個NioProcessor池.在建立NioSocketAcceptor之後需要設定IoFilterChain和Handler,Handler就是具體的業務處理邏輯。

2.當有一個新的連接配接進來的時候,會建立一個NioSocketSession,這個session會和新建立的socketChannel關聯起來.

3.在NioProcessor池中取出一個NioProcessor來處理該session.

4.session關聯IoFilterChain,當有Channel有讀寫事件的時候都需要經過FilterChain,FilterChain中根據不同的事件,例如接受消息,寫事件,發送事件都有特定的方法。

5.NioProcessor中會啟動一個線程用于監聽該socketChannel是否有新消息進來,sochetChannel在selector中注冊了OP_READ這個事件。NioProcessor中主要就是處理socketChannel中發生的各種事件,包括讀寫操作。

end~