一,mina的架構

mina 架構可以大緻分為三部分,ioService ,ioFilterChain , IoHandler
ioService:用于接受服務或者連接配接服務,例如socket 接收器,連接配接器。
ioFilterChain:對接受的資料或發送的資料進行處理,例如接收到的位元組數組轉化成字元串,打日志等。
ioHandler:業務處理類,我們自己寫的業務邏輯,例如對接受到的資料進行怎麼樣的資料,需要傳回什麼資料。
二,聊天室代碼。
下面看下mina-2.0.9中聊天室例子。聊天室伺服器的代碼:
public static void main(String[] args) throws Exception {
NioSocketAcceptor acceptor = new NioSocketAcceptor();//建立接收器,用于接受socket連接配接。
DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();//擷取filterChain
MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();//擷取mdc inject filter
chain.addLast("mdc", mdcInjectionFilter);//設定filterChain 鍊中最好一個過濾器為mdcInjectionFilter
// Add SSL filter if SSL is enabled.
if (USE_SSL) {
addSSLSupport(chain);
}
chain.addLast("codec", new ProtocolCodecFilter(
new TextLineCodecFactory()));//位元組組和字元串轉換 filter
addLogger(chain); //日志 filter
// Bind
acceptor.setHandler(new ChatProtocolHandler()); //設定接受的資訊的業務處理類
acceptor.bind(new InetSocketAddress(PORT));//綁定端口
System.out.println("Listening on port " + PORT);
}
然後看下ChatProtocolHandler ,該類是繼承了IoHandlerAdapter類,看下具體實作邏輯的方法,簡化了大部分代碼。
@Override
public void messageReceived(IoSession session, Object message) {
String theMessage = (String) message;//接受得來的字元串
session.write("LOGIN OK");//傳回登入成功,
}
上面代碼出現了一個session的概念,這個session的概念在mina中是比較重要的,下面來看下,mina在底層幫我們做了哪些事情。
主要是看下
NioSocketAcceptor acceptor = new NioSocketAcceptor();//建立接收器,用于接受socket連接配接。
acceptor.bind(new InetSocketAddress(PORT));//綁定端口
這兩行代碼背後mina都做了哪些事情。
先看下下面這行代碼初始化的時候,都初始化了哪些屬性,下圖的屬性是初始化中比較關鍵的。
三,接受新連接配接
NioSocketAcceptor acceptor = new NioSocketAcceptor();//建立接收器,用于接受socket連接配接。
NioSocketAcceptor的繼承圖如下,屬性隻列出了相對重要的屬性。
1.初始化sessionConfig的實作是 DefaultSocketSessionConfig(),裡面配置了一些參數用于後續建立session.
2.初始化executor,
executor = Executors.newCachedThreadPool();
newCachedThreadPool線程池的用法如下:
“建立一個可根據需要建立新線程的線程池,但是在以前構造的線程可用時将重用它們。對于執行很多短期異步任務的程式而言,這些線程池通常可提高程式性能。調用 execute 将重用以前構造的線程(如果線程可用)。如果現有線程沒有可用的,則建立一個新線程并添加到池中。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。是以,長時間保持空閑的線程池不會使用任何資源。注意,可以使用 ThreadPoolExecutor 構造方法建立具有類似屬性但細節不同(例如逾時參數)的線程池。 ”
3.初始化processor,建立了下面這個對象。
new SimpleIoProcessorPool<S>( NioProcessor.class),該對象有兩個比較關鍵的屬性,分别是executor和pool。executor和上面一樣是一個Executors.newCachedThreadPool(),pool為一個NioProcessor的數組。NioProcessor如下,也有兩個關鍵屬性,selector選擇器和executor線程池(newCachedThreadPool)。
看下這行代碼背後都做了哪些東西。
acceptor.bind(new InetSocketAddress(PORT));//綁定端口
1.這塊代碼主要做的事情就是啟動了一個線程,使用者監聽連接配接。
主要的runnable為AbstractPollingIoAcceptor.Acceptor
Acceptor中會調用NioSocketAcceptor.open方法,在open方法中,我們能看到熟悉的建立接收器的代碼。簡化代碼如下:
protected ServerSocketChannel open(SocketAddress localAddress) throws Exception {
// Creates the listening ServerSocket
ServerSocketChannel channel = null;
if (selectorProvider != null) {
channel = selectorProvider.openServerSocketChannel();
} else {
channel = ServerSocketChannel.open();//建立一個ServerSocketChannel對象
}
// This is a non blocking socket channel
channel.configureBlocking(false);//設定管道為非阻塞
// Configure the server socket,
ServerSocket socket = channel.socket();//擷取管道中關聯的ServerSocket
// Set the reuseAddress flag accordingly with the setting
socket.setReuseAddress(isReuseAddress());
// and bind.
socket.bind(localAddress, getBacklog());//綁定位址
...
// Register the channel within the selector for ACCEPT event
channel.register(selector, SelectionKey.OP_ACCEPT);//管道注冊到選擇器,監聽socket的接受事件,該selector為NioSocketAcceptor的屬性
....
return channel;
}
看下Acceptor的run方法,同樣可以看到熟悉的selector代碼:
private class Acceptor implements Runnable {
public void run() {
assert (acceptorRef.get() == this);
int nHandles = 0;
// Release the lock
lock.release();
while (selectable) {
try {
// Detect if we have some keys ready to be processed
// The select() will be woke up if some new connection
// have occurred, or if the selector has been explicitly
// woke up
int selected = select();//第一次通路到這個地方的時候,會在AbstractPollingIoAcceptor. bindInternal()中的wakeUp()方法中會被打斷,直接執行下面registerHandles的初始化。
// this actually sets the selector to OP_ACCEPT,
// and binds to the port on which this class will
// listen on
nHandles += registerHandles();//主要是執行上面的open方法。
// Now, if the number of registred handles is 0, we can
// quit the loop: we don't have any socket listening
// for incoming connection.
if (nHandles == 0) {
.....
}
if (selected > 0) {//如果有新連接配接進來
// We have some connection request, let's process
// them here.
processHandles(selectedHandles());
}
// check to see if any cancellation request has been made.
nHandles -= unregisterHandles();
} .....
// Cleanup all the processors, and shutdown the acceptor.
if (selectable && isDisposing()) {
selectable = false;
......
}}}
然後看下下面這段代碼背後都做了哪些事情。
if (selected > 0) {//如果有新連接配接進來
// We have some connection request, let's process
// them here.
processHandles(selectedHandles());
}
selectedHandles()方法中主要是擷取目前selector中已經就緒的selectedKey 的集合,具體的方法如下:
@Override
protected Iterator<ServerSocketChannel> selectedHandles() {
return new ServerSocketChannelIterator(selector.selectedKeys());
}
processHandles的的方法如下:
private void processHandles(Iterator<H> handles) throws Exception {
while (handles.hasNext()) {
H handle = handles.next();
handles.remove();
// Associates a new created connection to a processor,
// and get back a session
S session = accept(processor, handle);//生成一個NioSocketSession
if (session == null) {
continue;
}
initSession(session, null, null);//對session中的部分attribute和writeRequestQueue等進行預設的初始化。
// add the session to the SocketIoProcessor
session.getProcessor().add(session);
}
}
看一下accept方法
@Override
protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception {
SelectionKey key = null;
if (handle != null) {
key = handle.keyFor(selector);//
}
if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {
return null;
}
// accept the connection from the client
SocketChannel ch = handle.accept();//接受一個新連接配接
if (ch == null) {
return null;
}
return new NioSocketSession(this, processor, ch);
}
new NioSocketSession(this, processor, ch)中有幾個主要的操作:
this.service = service;//設定目前目前session關聯的IoService在這裡是NioSocketAcceptor
this.handler = service.getHandler();//設定目前session關聯的handler,在這裡對應的是new ChatProtocolHandler()
sessionId = idGenerator.incrementAndGet();//擷取唯一的sessionId
this.channel = channel;//設定目前的Channel,這裡為新進來的socket連接配接對應的SocketChannel
this.processor = processor;//設定目前IoProcessor<NioSession> processor ,這裡設為AbstractPollingIoAcceptor.processor 也就是上面初始化的SimpleIoProcessorPool(..)
filterChain = new DefaultIoFilterChain(this);//初始化預設的filterChain
this.config.setAll(service.getSessionConfig());//設定SessionCofig 為NioSocketAcceptor.getSessionConfig();
NioSocketSession的繼承圖如下:屬性隻列出了關鍵的屬性。
再看下
initSession(session, null, null);//對session中的部分attribute和writeRequestQueue等進行預設的初始化。
最後是:
session.getProcessor().add(session);
session.getProcessor()擷取剛才初始化processor,SimpleIoProcessorPool 的一個對象
在看下SimpleIoProcessorPool的add(Session session)方法:
public final void add(S session) {
getProcessor(session).add(session);//toread
}
getProcessor(session) 中首先會在SimpleIoProcessorPool.pool中去取一個IoProcessor<S> 的執行個體,這裡是NioProcessor執行個體,然後session和這個NioProcessor的執行個體關聯起來。簡化代碼如下:
IoProcessor<S> processor = pool[Math.abs((int) session.getId()) % pool.length];
session.setAttributeIfAbsent(PROCESSOR, processor);
然後再看下NioProcessor.add(Session session)方法,其實是執行了NioProcessor的父類AbstractPollingIoProcessor的add方法。
該add方法,首先是在Queue<S> newSessions這個隊列中增加了上面傳進來的session,然後啟動了一個新的線程AbstractPollingIoProcessor的内部類 Processor。簡化的run方法如下:
private class Processor implements Runnable {
public void run() {
int selected = select(SELECT_TIMEOUT);//擷取目前是否有新的請求進來。
nSessions += handleNewSessions();
if (selected > 0) {
//LOG.debug("Processing ..."); // This log hurts one of the MDCFilter test...
process();
}
flush(currentTime);//如果有session有寫請求在這裡面處理
}
}
hanldeNewSession()中對剛才在
Queue<S> newSessions
增加的新session進行初始化操作,首先是:NioProcessor中的初始化方法:
@Override
protected void init(NioSession session) throws Exception {
SelectableChannel ch = (SelectableChannel) session.getChannel();//擷取session關聯的SocketChannel
ch.configureBlocking(false);//設定為非阻塞
session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));//把socketChannel注冊到NioProcessor的selector中,并且監聽的時間為OP_READ,可讀事件。
}
然後是初始session的filterChain,初始化為session關聯的IoService的FilterChainBuilder,這裡就是我們一開始初始化NioSocketAcceptor的filterChain
代碼如下:
// Build the filter chain of this session.
IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
chainBuilder.buildFilterChain(session.getFilterChain());
四,處理請求
然後執行下面的代碼,如果目前NioProcessor.selector有可讀的channel的話,執行process()方法
if (selected > 0) {
//LOG.debug("Processing ..."); // This log hurts one of the MDCFilter test...
process();
}
process()方法就是真正處理資料和執行業務邏輯的地方了。首先會調用filterChain然後執行設定的handler.
process()方法中主要是read()方法,該方法主要是讀取socket中的資料,并且執行filterChain和handler 簡化的代碼如下:
private void read(S session) {
IoSessionConfig config = session.getConfig();
int bufferSize = config.getReadBufferSize();
IoBuffer buf = IoBuffer.allocate(bufferSize);
final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
try {
int readBytes = 0;
int ret;
try {
if (hasFragmentation) {
while ((ret = read(session, buf)) > 0) {//讀取資料
readBytes += ret;
if (!buf.hasRemaining()) {
break;
}
}
} else {
ret = read(session, buf);
if (ret > 0) {
readBytes = ret;
}
}
} finally {
buf.flip();
}
if (readBytes > 0) {
IoFilterChain filterChain = session.getFilterChain();//擷取filterChain
filterChain.fireMessageReceived(buf);//執行filterChain中的messageReceived事件。
buf = null;
IoFilterChain filterChain = session.getFilterChain();
filterChain.fireExceptionCaught(e);
}
}
session如有寫的請求的話,先把請求封裝成WriteRequest,最後把結果存在session.writeRequestQueue中,待執行flush方法的時候,才真正的把資料寫進去。
寫操作具體的代碼在AbstractPollingIoProcessor的flushNow裡,最終寫操作是在NioProcessor的write方法中完成的
NioProcessor. write如下代碼 , 就是主要就是Channnel的write方法了:
@Override
protected int write(NioSession session, IoBuffer buf, int length) throws Exception {
if (buf.remaining() <= length) {
return session.getChannel().write(buf.buf());
}
int oldLimit = buf.limit();
buf.limit(buf.position() + length);
try {
return session.getChannel().write(buf.buf());
} finally {
buf.limit(oldLimit);
}
}
五,概括
最後來看下mina中IoService,IoSession,IoProcessor,IoFilterChain,Handler之間的關系。還是以上面聊天室的為例,從伺服器開始監聽端口,到有第一個請求進來,流程如下。
1.NioSocketAcceptor 會啟動一個線程用于接受socket連接配接,另外還會建立一個NioProcessor池.在建立NioSocketAcceptor之後需要設定IoFilterChain和Handler,Handler就是具體的業務處理邏輯。
2.當有一個新的連接配接進來的時候,會建立一個NioSocketSession,這個session會和新建立的socketChannel關聯起來.
3.在NioProcessor池中取出一個NioProcessor來處理該session.
4.session關聯IoFilterChain,當有Channel有讀寫事件的時候都需要經過FilterChain,FilterChain中根據不同的事件,例如接受消息,寫事件,發送事件都有特定的方法。
5.NioProcessor中會啟動一個線程用于監聽該socketChannel是否有新消息進來,sochetChannel在selector中注冊了OP_READ這個事件。NioProcessor中主要就是處理socketChannel中發生的各種事件,包括讀寫操作。
end~