mina架構詳解
轉:http://blog.csdn.net/w13770269691/article/details/8614584
分類: web2013-02-26 17:13 12651人閱讀 評論(5) 收藏 舉報
Apache Mina Server 是一個網絡通信應用架構,也就是說,它主要是對基于TCP/IP、UDP/IP協定棧的通信架構(當然,也可以提供JAVA 對象的序列化服務、虛拟機管道通信服務等),Mina 可以幫助我們快速開發高性能、高擴充性的網絡通信應用,Mina 提供了事件驅動、異步(Mina 的異步IO 預設使用的是JAVA NIO 作為底層支援)操作的程式設計模型。Mina 主要有1.x 和2.x 兩個分支,這裡我們講解最新版本2.0,如果你使用的是Mina 1.x,那麼可能會有一些功能并不适用。學習本文檔,需要你已掌握JAVA IO、JAVA NIO、JAVASocket、JAVA 線程及并發庫(java.util.concurrent.*)的知識。Mina 同時提供了網絡通信的Server 端、Client 端的封裝,無論是哪端,Mina 在整個網通通信結構中都處于如下的位置:可見Mina 的API 将真正的網絡通信與我們的應用程式隔離開來,你隻需要關心你要發送、接收的資料以及你的業務邏輯即可。同樣的,無論是哪端,Mina 的執行流程如下所示:
(1.) IoService:這個接口在一個線程上負責套接字的建立,擁有自己的Selector,監聽是否有連接配接被建立。
(2.) IoProcessor:這個接口在另一個線程上,負責檢查是否有資料在通道上讀寫,也就是說它也擁有自己的Selector,這是與我們使用JAVA NIO 編碼時的一個不同之處,通常在JAVA NIO 編碼中,我們都是使用一個Selector,也就是不區分IoService與IoProcessor 兩個功能接口。另外,IoProcessor 負責調用注冊在IoService 上的過濾器,并在過濾器鍊之後調用IoHandler。
(3.) IoFilter:這個接口定義一組攔截器,這些攔截器可以包括日志輸出、黑名單過濾、資料的編碼(write 方向)與解碼(read 方向)等功能,其中資料的encode 與decode是最為重要的、也是你在使用Mina 時最主要關注的地方。
(4.) IoHandler:這個接口負責編寫業務邏輯,也就是接收、發送資料的地方。
1. 簡單的TCPServer:
(1.) 第一步:編寫IoService
按照上面的執行流程,我們首先需要編寫IoService,IoService 本身既是服務端,又是用戶端,我們這裡編寫服務端,是以使用IoAcceptor 實作,由于IoAcceptor 是與協定無關的,因為我們要編寫TCPServer,是以我們使用IoAcceptor 的實作NioSocketAcceptor,實際上底層就是調用java.nio.channels.ServerSocketChannel 類。當然,如果你使用了Apache 的APR 庫,那麼你可以選擇使AprSocketAcceptor 作為TCPServer 的實作,據傳說Apache APR庫的性能比JVM 自帶的本地庫高出很多。那麼IoProcessor 是由指定的IoService 内部建立并調用的,我們并不需要關心。
[java] view plaincopy
- IoAcceptor acceptor=new NioSocketAcceptor();
- acceptor.getSessionConfig().setReadBufferSize(2048);
- acceptor.getSessionConfig.setIdleTime(IdleStatus.BOTH_IDLE,10);
- acceptor.bind(new InetSocketAddress(9123));
這段代碼我們初始化了服務端的TCP/IP 的基于NIO 的套接字,然後調用IoSessionConfig設定讀取資料的緩沖區大小、讀寫通道均在10 秒内無任何操作就進入空閑狀态。
(2.) 第二步:編寫過濾器
這裡我們處理最簡單的字元串傳輸,Mina 已經為我們提供了TextLineCodecFactory 編解碼器工廠來對字元串進行編解碼處理。
- acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(
- <span style="white-space:pre"> </span>Charset.forName("UTF-8"),
- <span style="white-space:pre"> </span>LineDelimeter.WINDOWS.getValue(),
- <span style="white-space:pre"> </span>LineDelimiter. WINDOWS.getValue()))
- );
這段代碼要在acceptor.bind()方法之前執行,因為綁定套接字之後就不能再做這些準備工作了。這裡先不用清楚編解碼器是如何工作的,這個是後面重點說明的内容,這裡你隻需要清楚,我們傳輸的以換行符為辨別的資料,是以使用了Mina 自帶的換行符編解碼器工廠。
(3.) 第三步:編寫IoHandler
這裡我們隻是簡單的列印Client 傳說過來的資料。
- public class MyIoHandler extends IoHandlerAdapter {
- // 這裡我們使用的SLF4J作為日志門面,至于為什麼在後面說明。
- private final static Logger log = LoggerFactory
- .getLogger(MyIoHandler.class);
- @Override
- public void messageReceived(IoSession session, Object message)
- throws Exception {
- String str = message.toString();
- log.info("The message received is [" + str + "]");
- if (str.endsWith("quit")) {
- session.close(true);
- return;
- }
然後我們把這個IoHandler 注冊到IoService:
- acceptor.setHandler(new MyIoHandler());
當然這段代碼也要在acceptor.bind()方法之前執行。然後我們運作MyServer 中的main 方法,你可以看到控制台一直處于阻塞狀态,此時,我們用telnet 127.0.0.1 9123 通路,然後輸入一些内容,當按下Enter鍵,你會發現資料在Server 端被輸出,但要注意不要輸入中文,因為Windows 的指令行視窗不會對傳輸的資料進行UTF-8 編碼。當輸入quit 結尾的字元串時,連接配接被斷開。這裡注意你如果使用的作業系統,或者使用的Telnet 軟體的換行符是什麼,如果不清楚,可以删掉第二步中的兩個紅色的參數,使用TextLineCodec 内部的自動識别機制。
2. 簡單的TCPClient:
這裡我們實作Mina 中的TCPClient,因為前面說過無論是Server 端還是Client 端,在Mina中的執行流程都是一樣的。唯一不同的就是IoService 的Client 端實作是IoConnector。
(1.) 第一步:編寫IoService并注冊過濾器
- public class MyClient {
- main方法:
- IoConnector connector=new NioSocketConnector();
- connector.setConnectTimeoutMillis(30000);
- connector.getFilterChain().addLast("codec",
- new ProtocolCodecFilter(
- new TextLineCodecFactory(
- Charset.forName("UTF-8"),
- LineDelimiter.WINDOWS.getValue(),
- LineDelimiter.WINDOWS.getValue()
- )
- connector.connect(new InetSocketAddress("localhost", 9123));
(2.) 第三步:編寫IoHandler
- public class ClientHandler extends IoHandlerAdapter {
- private final static Logger LOGGER = LoggerFactory
- .getLogger(ClientHandler.class);
- private final String values;
- public ClientHandler(String values) {
- this.values = values;
- public void sessionOpened(IoSession session) {
- session.write(values);
注冊IoHandler:
- connector.setHandler(new ClientHandler("你好!\r\n 大家好!"));
然後我們運作MyClient,你會發現MyServer 輸出如下語句:
The message received is [你好!]
The message received is [大家好!]
我們看到服務端是按照收到兩條消息輸出的,因為我們用的編解碼器是以換行符判斷資料是否讀取完畢的。
3. 介紹Mina的TCP的主要接口:
通過上面的兩個示例,你應該對Mina 如何編寫TCP/IP 協定棧的網絡通信有了一些感性的認識。
(1.)IoService:
這個接口是服務端IoAcceptor、用戶端IoConnector 的抽象,提供IO 服務和管理IoSession的功能,它有如下幾個常用的方法:
A. TransportMetadata getTransportMetadata():
這個方法擷取傳輸方式的中繼資料描述資訊,也就是底層到底基于什麼的實作,譬如:nio、apr 等。
B. void addListener(IoServiceListener listener):
這個方法可以為IoService 增加一個監聽器,用于監聽IoService 的建立、活動、失效、空閑、銷毀,具體可以參考IoServiceListener 接口中的方法,這為你參與IoService 的生命周期提供了機會。
C. void removeListener(IoServiceListener listener):
這個方法用于移除上面的方法添加的監聽器。
D. void setHandler(IoHandler handler):
這個方法用于向IoService 注冊IoHandler,同時有getHandler()方法擷取Handler。
E. Map<Long,IoSession> getManagedSessions():
這個方法擷取IoService 上管理的所有IoSession,Map 的key 是IoSession 的id。
F. IoSessionConfig getSessionConfig():
這個方法用于擷取IoSession 的配置對象,通過IoSessionConfig 對象可以設定Socket 連接配接的一些選項。
(2.)IoAcceptor:
這個接口是TCPServer 的接口,主要增加了void bind()監聽端口、void unbind()解除對套接字的監聽等方法。這裡與傳統的JAVA 中的ServerSocket 不同的是IoAcceptor 可以多次調用bind()方法(或者在一個方法中傳入多個SocketAddress 參數)同時監聽多個端口。
3.)IoConnector:
這個接口是TCPClient 的接口, 主要增加了ConnectFuture connect(SocketAddressremoteAddress,SocketAddress localAddress)方法,用于與Server 端建立連接配接,第二個參數如果不傳遞則使用本地的一個随機端口通路Server 端。這個方法是異步執行的,同樣的,也可以同時連接配接多個服務端。
(4.)IoSession:
這個接口用于表示Server 端與Client 端的連接配接,IoAcceptor.accept()的時候傳回執行個體。
這個接口有如下常用的方法:
A. WriteFuture write(Object message):
這個方法用于寫資料,該操作是異步的。
B. CloseFuture close(boolean immediately):
這個方法用于關閉IoSession,該操作也是異步的,參數指定true 表示立即關閉,否則就在所有的寫操作都flush 之後再關閉。
C. Object setAttribute(Object key,Object value):
這個方法用于給我們向會話中添加一些屬性,這樣可以在會話過程中都可以使用,類似于HttpSession 的setAttrbute()方法。IoSession 内部使用同步的HashMap 存儲你添加的自
定義屬性。
D. SocketAddress getRemoteAddress():
這個方法擷取遠端連接配接的套接字位址。
E. void suspendWrite():
這個方法用于挂起寫操作,那麼有void resumeWrite()方法與之配對。對于read()方法同樣适用。
F. ReadFuture read():
這個方法用于讀取資料, 但預設是不能使用的, 你需要調用IoSessionConfig 的setUseReadOperation(true)才可以使用這個異步讀取的方法。一般我們不會用到這個方法,因為這個方法的内部實作是将資料儲存到一個BlockingQueue,假如是Server 端,因為大量的Client 端發送的資料在Server 端都這麼讀取,那麼可能會導緻記憶體洩漏,但對于Client,可能有的時候會比較便利。
G. IoService getService():
這個方法傳回與目前會話對象關聯的IoService 執行個體。
關于TCP連接配接的關閉:
無論在用戶端還是服務端,IoSession 都用于表示底層的一個TCP 連接配接,那麼你會發現無論是Server 端還是Client 端的IoSession 調用close()方法之後,TCP 連接配接雖然顯示關閉, 但主線程仍然在運作,也就是JVM 并未退出,這是因為IoSession 的close()僅僅是關閉了TCP的連接配接通道,并沒有關閉Server 端、Client 端的程式。你需要調用IoService 的dispose()方法停止Server 端、Client 端。
(5.)IoSessionConfig:
這個方法用于指定此次會話的配置,它有如下常用的方法:
A. void setReadBufferSize(int size):
這個方法設定讀取緩沖的位元組數,但一般不需要調用這個方法,因為IoProcessor 會自動調整緩沖的大小。你可以調用setMinReadBufferSize()、setMaxReadBufferSize()方法,這樣無論IoProcessor 無論如何自動調整,都會在你指定的區間。
B. void setIdleTime(IdleStatus status,int idleTime):
這個方法設定關聯在通道上的讀、寫或者是讀寫事件在指定時間内未發生,該通道就進入空閑狀态。一旦調用這個方法,則每隔idleTime 都會回調過濾器、IoHandler 中的sessionIdle()方法。
C. void setWriteTimeout(int time):
這個方法設定寫操作的逾時時間。
D. void setUseReadOperation(boolean useReadOperation):
這個方法設定IoSession 的read()方法是否可用,預設是false。
(6.)IoHandler:
這個接口是你編寫業務邏輯的地方,從上面的示例代碼可以看出,讀取資料、發送資料基本都在這個接口總完成,這個執行個體是綁定到IoService 上的,有且隻有一個執行個體(沒有給一個IoService 注入一個IoHandler 執行個體會抛出異常)。它有如下幾個方法:
A. void sessionCreated(IoSession session):
這個方法當一個Session 對象被建立的時候被調用。對于TCP 連接配接來說,連接配接被接受的時候調用,但要注意此時TCP 連接配接并未建立,此方法僅代表字面含義,也就是連接配接的對象IoSession 被建立完畢的時候,回調這個方法。對于UDP 來說,當有資料包收到的時候回調這個方法,因為UDP 是無連接配接的。
B. void sessionOpened(IoSession session):
這個方法在連接配接被打開時調用,它總是在sessionCreated()方法之後被調用。對于TCP 來說,它是在連接配接被建立之後調用,你可以在這裡執行一些認證操作、發送資料等。對于UDP 來說,這個方法與sessionCreated()沒什麼差別,但是緊跟其後執行。如果你每隔一段時間,發送一些資料,那麼sessionCreated()方法隻會在第一次調用,但是sessionOpened()方法每次都會調用。
C. void sessionClosed(IoSession session) :
對于TCP 來說,連接配接被關閉時,調用這個方法。對于UDP 來說,IoSession 的close()方法被調用時才會毀掉這個方法。
D. void sessionIdle(IoSession session, IdleStatus status) :
這個方法在IoSession 的通道進入空閑狀态時調用,對于UDP 協定來說,這個方法始終不會被調用。
E. void exceptionCaught(IoSession session, Throwable cause) :
這個方法在你的程式、Mina 自身出現異常時回調,一般這裡是關閉IoSession。
F. void messageReceived(IoSession session, Object message) :
接收到消息時調用的方法,也就是用于接收消息的方法,一般情況下,message 是一個IoBuffer 類,如果你使用了協定編解碼器,那麼可以強制轉換為你需要的類型。通常我們都是會使用協定編解碼器的, 就像上面的例子, 因為協定編解碼器是
TextLineCodecFactory,是以我們可以強制轉message 為String 類型。
G. void messageSent(IoSession session, Object message) :
當發送消息成功時調用這個方法,注意這裡的措辭,發送成功之後,也就是說發送消息是不能用這個方法的。
發送消息的時機:
發送消息應該在sessionOpened()、messageReceived()方法中調用IoSession.write()方法完成。因為在sessionOpened()方法中,TCP 連接配接已經真正打開,同樣的在messageReceived()方法TCP 連接配接也是打開狀态,隻不過兩者的時機不同。sessionOpened()方法是在TCP 連接配接建立之後,接收到資料之前發送;messageReceived()方法是在接收到資料之後發送,你可以完成依據收到的内容是什麼樣子,決定發送什麼樣的資料。因為這個接口中的方法太多,是以通常使用擴充卡模式IoHandlerAdapter,覆寫你所感興趣的方法即可。
(7.)IoBuffer:
這個接口是對JAVA NIO 的ByteBuffer 的封裝,這主要是因為ByteBuffer 隻提供了對基本資料類型的讀寫操作,沒有提供對字元串等對象類型的讀寫方法,使用起來更為友善,另外,ByteBuffer 是定長的,如果想要可變,将很麻煩。IoBuffer 的可變長度的實作類似于StringBuffer。IoBuffer 與ByteBuffer 一樣,都是非線程安全的。本節的一些内容如果不清楚,可以參考java.nio.ByteBuffer 接口。這個接口有如下常用的方法:
A. static IoBuffer allocate(int capacity,boolean useDirectBuffer):
這個方法内部通過SimpleBufferAllocator 建立一個執行個體,第一個參數指定初始化容量,第二個參數指定使用直接緩沖區還是JAVA 記憶體堆的緩存區,預設為false。
B. void free():
釋放緩沖區,以便被一些IoBufferAllocator 的實作重用,一般沒有必要調用這個方法,除非你想提升性能(但可能未必效果明顯)。
C. IoBuffer setAutoExpand(boolean autoExpand):
這個方法設定IoBuffer 為自動擴充容量,也就是前面所說的長度可變,那麼可以看出長度可變這個特性預設是不開啟的。
D. IoBuffer setAutoShrink(boolean autoShrink):
這個方法設定IoBuffer 為自動收縮,這樣在compact()方法調用之後,可以裁減掉一些沒有使用的空間。如果這個方法沒有被調用或者設定為false,你也可以通過調用shrink()方法手動收縮空間。
E. IoBuffer order(ByteOrder bo):
這個方法設定是Big Endian 還是Little Endian,JAVA 中預設是Big Endian,C++和其他語言一般是Little Endian。
F. IoBuffer asReadOnlyBuffer():
這個方法設定IoBuffer 為隻讀的。
G. Boolean prefixedDataAvailable(int prefixLength,int maxDataLength):
這個方法用于資料的最開始的1、2、4 個位元組表示的是資料的長度的情況,
prefixLentgh表示這段資料的前幾個位元組(隻能是1、2、4 的其中一個),代表的是這段資料的長度,
maxDataLength 表示最多要讀取的位元組數。傳回結果依賴于等式
remaining()-prefixLength>=maxDataLength,也就是總的資料-表示長度的位元組,剩下的位元組數要比打算讀取的位元組數大或者相等。
H. String getPrefixedString(int prefixLength,CharsetDecoder decoder):
如果上面的方法傳回true,那麼這個方法将開始讀取表示長度的位元組之後的資料,注意要保持這兩個方法的prefixLength 的值是一樣的。
G、H 兩個方法在後面講到的PrefixedStringDecoder 中的内部實作使用。
IoBuffer 剩餘的方法與ByteBuffer 都是差不多的,額外增加了一些便利的操作方法,例如:
IoBuffer putString(String value,CharsetEncoder encoder)可以友善的以指定的編碼方式存儲字元串、InputStream asInputStream()方法從IoBuffer 剩餘的未讀的資料中轉為輸入流等。
(8.)IoFuture:
在Mina 的很多操作中,你會看到傳回值是XXXFuture,實際上他們都是IoFuture 的子類,看到這樣的傳回值,這個方法就說明是異步執行的,主要的子類有ConnectFuture、CloseFuture 、ReadFuture 、WriteFuture 。這個接口的大部分操作都和
java.util.concurrent.Future 接口是類似的,譬如:await()、awaitUninterruptibly()等,一般我們常用awaitUninterruptibly()方法可以等待異步執行的結果傳回。這個接口有如下常用的方法:
A. IoFuture addListener(IoFutureListener<?> listener):
這個方法用于添加一個監聽器, 在異步執行的結果傳回時監聽器中的回調方法operationComplete(IoFuture future),也就是說,這是替代awaitUninterruptibly()方法另一種等待異步執行結果的方法,它的好處是不會産生阻塞。
B. IoFuture removeListener(IoFutureListener<?> listener):
這個方法用于移除指定的監聽器。
C. IoSession getSession():
這個方法傳回目前的IoSession。舉個例子,我們在用戶端調用connect()方法通路Server 端的時候,實際上這就是一個異步執行的方法,也就是調用connect()方法之後立即傳回,執行下面的代碼,而不管是否連
接成功。那麼如果我想在連接配接成功之後執行一些事情(譬如:擷取連接配接成功後的IoSession對象),該怎麼辦呢?按照上面的說明,你有如下兩種辦法:
第一種:
- ConnectFuture future = connector.connect(new InetSocketAddress(
- HOSTNAME, PORT));
- // 等待是否連接配接成功,相當于是轉異步執行為同步執行。
- future.awaitUninterruptibly();
- // 連接配接成功後擷取會話對象。如果沒有上面的等待,由于connect()方法是異步的,session
- 可能會無法擷取。
- session = future.getSession();
第二種:
- future.addListener(new IoFutureListener<ConnectFuture>() {
- public void operationComplete(ConnectFuture future) {
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- IoSession session = future.getSession();
- System.out.println("++++++++++++++++++++++++++++");
- });
- System.out.println("*************");
為了更好的看清楚使用監聽器是異步的,而不是像awaitUninterruptibly()那樣會阻塞主線程的執行,我們在回調方法中暫停5 秒鐘,然後輸出+++,在最後輸出***。我們執行代碼之後,你會發現首先輸出***(這證明了監聽器是異步執行的),然後IoSession 對象Created,系統暫停5 秒,然後輸出+++,最後IoSession 對象Opened,也就是TCP 連接配接建立。
4.日志配置:
前面的示例代碼中提到了使用SLF4J 作為日志門面,這是因為Mina 内部使用的就是SLF4J,你也使用SLF4J 可以與之保持一緻性。Mina 如果想啟用日志跟蹤Mina 的運作細節,你可以配置LoggingFilter 過濾器,這樣你可
以看到Session 建立、打開、空閑等一系列細節在日志中輸出,預設SJF4J 是按照DEBUG級别輸出跟蹤資訊的,如果你想給某一類别的Mina 運作資訊輸出指定日志輸出級别,可以調用LoggingFilter 的setXXXLogLevel(LogLevel.XXX)。
例:
- LoggingFilter lf = new LoggingFilter();
- lf.setSessionOpenedLogLevel(LogLevel.ERROR);
- acceptor.getFilterChain().addLast("logger", lf);
這裡IoSession 被打開的跟蹤資訊将以ERROR 級别輸出到日志。
5.過濾器:
前面我們看到了LoggingFilter、ProtocolCodecFilter 兩個過濾器,一個負責日志輸出,一個負責資料的編解碼,通過最前面的Mina 執行流程圖,在IoProcessor 與IoHandler 之間可以有很多的過濾器,這種設計方式為你提供可插拔似的擴充功能提供了非常便利的方式,目前的Apache CXF、Apache Struts2 中的攔截器也都是一樣的設計思路。Mina 中的IoFilter 是單例的,這與CXF、Apache Struts2 沒什麼差別。IoService 執行個體上會綁定一個DefaultIoFilterChainBuilder 執行個體,DefaultIoFilterChainBuilder 會把使用内部的EntryImpl 類把所有的過濾器按照順序連在一起,組成一個過濾器鍊。
DefaultIoFilterChainBuilder 類如下常用的方法:
A. void addFirst(String name,IoFilter filter):
這個方法把過濾器添加到過濾器鍊的頭部,頭部就是IoProcessor 之後的第一個過濾器。同樣的addLast()方法把過濾器添加到過濾器鍊的尾部。
B. void addBefore(String baseName,String name,IoFilter filter):
這個方法将過濾器添加到baseName 指定的過濾器的前面,同樣的addAfter()方法把過濾器添加到baseName 指定的過濾器的後面。這裡要注意無論是那種添加方法,每個過濾器的名字(參數name)必須是唯一的。
C. IoFilter remove(Stirng name):
這個方法移除指定名稱的過濾器,你也可以調用另一個重載的remove()方法,指定要移除的IoFilter 的類型。
D. List<Entry> getAll():
這個方法傳回目前IoService 上注冊的所有過濾器。預設情況下,過濾器鍊中是空的,也就是getAll()方法傳回長度為0 的List,但實際Mina内部有兩個隐藏的過濾器:HeadFilter、TailFilter,分别在List 的最開始和最末端,很明顯,TailFilter 在最末端是為了調用過濾器鍊之後,調用IoHandler。但這兩個過濾器對你來說是透明的,可以忽略它們的存在。編寫一個過濾器很簡單,你需要實作IoFilter 接口,如果你隻關注某幾個方法,可以繼承IoFilterAdapter 擴充卡類。IoFilter 接口中主要包含兩類方法,一類是與IoHandler 中的方法名一緻的方法,相當于攔截IoHandler 中的方法,另一類是IoFilter 的生命周期回調方法,這些回調方法的執行順序和解釋如下所示:
(1.)init()在首次添加到鍊中的時候被調用,但你必須将這個IoFilter 用
ReferenceCountingFilter 包裝起來,否則init()方法永遠不會被調用。
(2.)onPreAdd()在調用添加到鍊中的方法時被調用,但此時還未真正的加入到鍊。
(3.)onPostAdd()在調用添加到鍊中的方法後被調,如果在這個方法中有異常抛出,則過濾器會立即被移除,同時destroy()方法也會被調用(前提是使用ReferenceCountingFilter包裝)。
(4.)onPreRemove()在從鍊中移除之前調用。
(5.)onPostRemove()在從鍊中移除之後調用。
(6.)destory()在從鍊中移除時被調用,使用方法與init()要求相同。
無論是哪個方法,要注意必須在實作時調用參數nextFilter 的同名方法,否則,過濾器鍊的執行将被中斷,IoHandler 中的同名方法一樣也不會被執行,這就相當于Servlet 中的Filter 必須調用filterChain.doFilter(request,response)才能繼續前進是一樣的道理。
示例:
- public class MyIoFilter implements IoFilter {
- public void destroy() throws Exception {
- System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%�stroy");
- public void exceptionCaught(NextFilter nextFilter, IoSession
- session,
- Throwable cause) throws Exception {
- System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%exceptionCaught");
- nextFilter.exceptionCaught(session, cause);
- public void filterClose(NextFilter nextFilter, IoSession session)
- System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%filterClose");
- nextFilter.filterClose(session);
- public void filterWrite(NextFilter nextFilter, IoSession session,
- WriteRequest writeRequest) throws Exception {
- System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%filterWrite");
- nextFilter.filterWrite(session, writeRequest);
- public void init() throws Exception {
- System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%init");
- public void messageReceived(NextFilter nextFilter, IoSession
- Object message) throws Exception {
- System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%messageReceived");
- nextFilter.messageReceived(session, message);
- public void messageSent(NextFilter nextFilter, IoSession session,
- System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%messageSent");
- nextFilter.messageSent(session, writeRequest);
- public void onPostAdd(IoFilterChain parent, String name,
- NextFilter nextFilter) throws Exception {
- System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPostAdd");
- public void onPostRemove(IoFilterChain parent, String name,
- System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPostRemove");
- public void onPreAdd(IoFilterChain parent, String name,
- System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPreAdd");
- public void onPreRemove(IoFilterChain parent, String name,
- System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPreRemove");
- public void sessionClosed(NextFilter nextFilter, IoSession session)
- System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionClosed");
- nextFilter.sessionClosed(session);
- public void sessionCreated(NextFilter nextFilter, IoSession session)
- System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionCreated");
- nextFilter.sessionCreated(session);
- public void sessionIdle(NextFilter nextFilter, IoSession session,
- IdleStatus status) throws Exception {
- System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionIdle");
- nextFilter.sessionIdle(session, status);
- public void sessionOpened(NextFilter nextFilter, IoSession session)
- System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionOpened");
- nextFilter.sessionOpened(session);
我們将這個攔截器注冊到上面的TCPServer 的IoAcceptor 的過濾器鍊中的最後一個:
- acceptor.getFilterChain().addLast("myIoFilter",
- new ReferenceCountingFilter(new MyIoFilter()));
這裡我們将MyIoFilter 用ReferenceCountingFilter 包裝起來,這樣你可以看到init()、destroy()方法調用。我們啟動用戶端通路,然後關閉用戶端,你會看到執行順序如下所示:
init onPreAdd onPostAdd sessionCreated sessionOpened messageReceived filterClose sessionClosed onPreRemove onPostRemove destroy。
IoHandler 的對應方法會跟在上面的對應方法之後執行,這也就是說從橫向(單獨的看一個過濾器中的所有方法的執行順序)上看,每個過濾器的執行順序是上面所示的順序;從縱向(方法鍊的調用)上看,如果有filter1、filter2 兩個過濾器,sessionCreated()方法的執行順序如下所示:
filter1-sessionCreated filter2-sessionCreated IoHandler-sessionCreated。
這裡你要注意init、onPreAdd、onPostAdd 三個方法并不是在Server 啟動時調用的,而是IoSession 對象建立之前調用的,也就是說IoFilterChain.addXXX()方法僅僅負責初始化過濾器并注冊過濾器,但并不調用任何方法,包括init()初始化方法也是在IoProcessor 開始工作的時候被調用。IoFilter 是單例的,那麼init()方法是否隻被執行一次呢?這個是不一定的,因為IoFilter是被IoProcessor 調用的,而每個IoService 通常是關聯多個IoProcessor,是以IoFilter的init()方法是在每個IoProcessor 線程上隻執行一次。關于Mina 的線程問題,我們後面會詳細讨論,這裡你隻需要清楚,init()與destroy()的調用次數與IoProceesor 的個數有關,假如一個IoService 關聯了3 個IoProcessor,有五個并發的用戶端請求,那麼你會看到三次init()方法被調用,以後将不再會調用。Mina中自帶的過濾器:
過濾器 說明
BlacklistFilter 設定一些IP 位址為黑名單,不允許通路。
BufferedWriteFilter 設定輸出時像BufferedOutputStream 一樣進行緩沖。
CompressionFilter 設定在輸入、輸出流時啟用JZlib 壓縮。
ConnectionThrottleFilter 這個過濾器指定同一個IP 位址(不含端口号)上的請求在多長的毫秒值内可以有一個請求,如果小于指定的時間間隔就有連續兩個請求,那麼第二個請求将被忽略(IoSession.close())。正如Throttle 的名字一樣,調節通路的頻率這個過濾器最好放在過濾器鍊的前面。
FileRegionWriteFilter 如果你想使用File 對象進行輸出,請使用這個過濾器。要注意,你需要使用WriteFuture 或者在
messageSent() 方法中關閉File 所關聯的FileChannel 通道。
StreamWriteFilter 如果你想使用InputStream 對象進行輸出,請使用這個過濾器。要注意,你需要使用WriteFuture或者在messageSent()方法中關閉File 所關聯的
FileChannel 通道。NoopFilter 這個過濾器什麼也不做,如果你想測試過濾器鍊是否起作用,可以用它來測試。
ProfilerTimerFilter 這個過濾器用于檢測每個事件方法執行的時間,是以最好放在過濾器鍊的前面。
ProxyFilter 這個過濾器在用戶端使用ProxyConnector 作為實作時,會自動加入到過濾器鍊中,用于完成代理功能。
RequestResponseFilter 暫不知曉。
SessionAttributeInitializingFilter 這個過濾器在IoSession 中放入一些屬性(Map),通常放在過濾器的前面,用于放置一些初始化的資訊。
MdcInjectionFilter 針對日志輸出做MDC 操作,可以參考LOG4J 的MDC、NDC 的文檔。
WriteRequestFilter CompressionFilter、RequestResponseFilter 的基類,用于包裝寫請求的過濾器。
還有一些過濾器,會在各節中詳細讨論,這裡沒有列出,譬如:前面的LoggingFilger 日志過濾器。
6.協定編解碼器:
前面說過,協定編解碼器是在使用Mina 的時候你最需要關注的對象,因為在網絡傳輸的資料都是二進制資料(byte),而你在程式中面向的是JAVA 對象,這就需要你實作在發送資料時将JAVA 對象編碼二進制資料,而接收資料時将二進制資料解碼為JAVA 對象(這個可不是JAVA 對象的序列化、反序列化那麼簡單的事情)。Mina 中的協定編解碼器通過過濾器ProtocolCodecFilter 構造,這個過濾器的構造方法需要一個ProtocolCodecFactory,這從前面注冊TextLineCodecFactory 的代碼就可以看出來。
ProtocolCodecFactory 中有如下兩個方法:
public interface ProtocolCodecFactory {
ProtocolEncoder getEncoder(IoSession session) throws Exception;
ProtocolDecoder getDecoder(IoSession session) throws Exception;
}
是以,建構一個ProtocolCodecFactory 需要ProtocolEncoder、ProtocolDecoder 兩個執行個體。你可能要問JAVA 對象和二進制資料之間如何轉換呢?這個要依據具體的通信協定,也就是Server 端要和Client 端約定網絡傳輸的資料是什麼樣的格式,譬如:第一個位元組表示資料長度,第二個位元組是資料類型,後面的就是真正的資料(有可能是文字、有可能是圖檔等等),然後你可以依據長度從第三個位元組向後讀,直到讀取到指定第一個位元組指定長度的資料。
簡單的說,HTTP 協定就是一種浏覽器與Web 伺服器之間約定好的通信協定,雙方按照指定的協定編解碼資料。我們再直覺一點兒說,前面一直使用的TextLine 編解碼器就是在讀取網絡上傳遞過來的資料時,隻要發現哪個位元組裡存放的是ASCII 的10、13 字元(/r、/n),就認為之前的位元組就是一個字元串(預設使用UTF-8 編碼)。以上所說的就是各種協定實際上就是網絡七層結構中的應用層協定,它位于網絡層(IP)、傳輸層(TCP)之上,Mina 的協定編解碼器就是讓你實作一套自己的應用層協定棧。
(6-1.)簡單的編解碼器示例:
下面我們舉一個模拟電信營運商短信協定的編解碼器實作,假設通信協定如下所示:
M sip:wap.fetion.com.cn SIP-C/2.0
S: 1580101xxxx
R: 1889020xxxx
L: 21
Hello World!
這裡的第一行表示狀态行,一般表示協定的名字、版本号等,第二行表示短信的發送号碼,第三行表示短信接收的号碼,第四行表示短信的位元組數,最後的内容就是短信的内容。上面的每一行的末尾使用ASC II 的10(/n)作為換行符,因為這是純文字資料,協定要
求雙方使用UTF-8 對字元串編解碼。實際上如果你熟悉HTTP 協定,上面的這個精簡的短信協定和HTTP 協定的組成是非常像的,第一行是狀态行,中間的是消息報頭,最後面的是消息正文。在解析這個短信協定之前,你需要知曉TCP 的一個事項,那就是資料的發送沒有規模性,所謂的規模性就是作為資料的接收端,不知道到底什麼時候資料算是讀取完畢,是以應用層協定在制定的時候,必須指定資料讀取的截至點。一般來說,有如下三種方式設定資料讀取的長度:
(1.)使用分隔符,譬如:TextLine 編解碼器。你可以使用/r、/n、NUL 這些ASC II 中的特殊的字元來告訴資料接收端,你隻要遇見分隔符,就表示資料讀完了,不用在那裡傻等着不知道還有沒有資料沒讀完啊?我可不可以開始把已經讀取到的位元組解碼為指定的資料類型了啊?
(2.)定長的位元組數,這種方式是使用長度固定的資料發送,一般适用于指令發送,譬如:資料發送端規定發送的資料都是雙位元組,AA 表示啟動、BB 表示關閉等等。
(3.)在資料中的某個位置使用一個長度域,表示資料的長度,這種處理方式最為靈活,上面的短信協定中的那個L 就是短信文字的位元組數,其實HTTP 協定的消息報頭中的Content-Length 也是表示消息正文的長度,這樣資料的接收端就知道我到底讀到多長的
位元組數就表示不用再讀取資料了。相比較解碼(位元組轉為JAVA 對象,也叫做拆包)來說,編碼(JAVA 對象轉為位元組,也叫做打包)就很簡單了,你隻需要把JAVA 對象轉為指定格式的位元組流,write()就可以了。下面我們開始對上面的短信協定進行編解碼處理。
第一步,協定對象:
- public class SmsObject {
- private String sender;// 短信發送者
- private String receiver;// 短信接受者
- private String message;// 短信内容
- public String getSender() {
- return sender;
- public void setSender(String sender) {
- this.sender = sender;
- public String getReceiver() {
- return receiver;
- public void setReceiver(String receiver) {
- this.receiver = receiver;
- public String getMessage() {
- return message;
- public void setMessage(String message) {
- this.message = message;
第二步,編碼器:
在Mina 中編寫編碼器可以實作ProtocolEncoder,其中有encode()、dispose()兩個方法需要實作。這裡的dispose()方法用于在銷毀編碼器時釋放關聯的資源,由于這個方法一般我們并不關心,是以通常我們直接繼承擴充卡ProtocolEncoderAdapter。
- public class CmccSipcEncoder extends ProtocolEncoderAdapter {
- private final Charset charset;
- public CmccSipcEncoder(Charset charset) {
- this.charset = charset;
- public void encode(IoSession session, Object message,
- ProtocolEncoderOutput out) throws Exception {
- SmsObject sms = (SmsObject) message;
- CharsetEncoder ce = charset.newEncoder();
- IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true);
- String statusLine = "M sip:wap.fetion.com.cn SIP-C/2.0";
- String sender = sms.getSender();
- String receiver = sms.getReceiver();
- String smsContent = sms.getMessage();
- buffer.putString(statusLine + '/n', ce);
- buffer.putString("S: " + sender + '/n', ce);
- buffer.putString("R: " + receiver + '/n', ce);
- buffer
- .putString("L: " + (smsContent.getBytes(charset).length)
- + "/n",
- ce);
- buffer.putString(smsContent, ce);
- buffer.flip();
- out.write(buffer);
這裡我們依據傳入的字元集類型對message 對象進行編碼,編碼的方式就是按照短信協定拼裝字元串到IoBuffer 緩沖區,然後調用ProtocolEncoderOutput 的write()方法輸出位元組流。這裡要注意生成短信内容長度時的紅色代碼,我們使用String 類與Byte[]類型之間的轉換方法獲得轉為位元組流後的位元組數。
解碼器的編寫有以下幾個步驟:
A. 将 encode()方法中的message 對象強制轉換為指定的對象類型;
B. 建立IoBuffer 緩沖區對象,并設定為自動擴充;
C. 将轉換後的message 對象中的各個部分按照指定的應用層協定進行組裝,并put()到IoBuffer 緩沖區;
D. 當你組裝資料完畢之後,調用flip()方法,為輸出做好準備,切記在write()方法之前,要調用IoBuffer 的flip()方法,否則緩沖區的position 的後面是沒有資料可以用來輸出的,你必須調用flip()方法将position 移至0,limit 移至剛才的position。這個flip()方法的含義請參看java.nio.ByteBuffer。
E. 最後調用ProtocolEncoderOutput 的write()方法輸出IoBuffer 緩沖區執行個體。
第三步,解碼器:
在Mina 中編寫解碼器,可以實作ProtocolDecoder 接口,其中有decode()、finishDecode()、dispose()三個方法。這裡的finishDecode()方法可以用于處理在IoSession 關閉時剩餘的未讀取資料,一般這個方法并不會被使用到,除非協定中未定義任何辨別資料什麼時候截止的約定,譬如:Http 響應的Content-Length 未設定,那麼在你認為讀取完資料後,關閉TCP連接配接(IoSession 的關閉)後,就可以調用這個方法處理剩餘的資料,當然你也可以忽略調剩餘的資料。同樣的,一般情況下,我們隻需要繼承擴充卡ProtocolDecoderAdapter,關注decode()方法即可。但前面說過解碼器相對編碼器來說,最麻煩的是資料發送過來的規模,以聊天室為例,一個TCP 連接配接建立之後,那麼隔一段時間就會有聊天内容發送過來,也就是decode()方法會被往複調用,這樣處理起來就會非常麻煩。那麼Mina 中幸好提供了CumulativeProtocolDecoder類,從名字上可以看出累積性的協定解碼器,也就是說隻要有資料發送過來,這個類就會去讀取資料,然後累積到内部的IoBuffer 緩沖區,但是具體的拆包(把累積到緩沖區的資料解碼為JAVA 對象)交由子類的doDecode()方法完成,實際上CumulativeProtocolDecoder就是在decode()反複的調用暴漏給子類實作的doDecode()方法。
具體執行過程如下所示:
A. 你的doDecode()方法傳回true 時,CumulativeProtocolDecoder 的decode()方法會首先判斷你是否在doDecode()方法中從内部的IoBuffer 緩沖區讀取了資料,如果沒有,則會抛出非法的狀态異常,也就是你的doDecode()方法傳回true 就表示你已經消費了本次資料(相當于聊天室中一個完整的消息已經讀取完畢),進一步說,也就是此時你必須已經消費過内部的IoBuffer 緩沖區的資料(哪怕是消費了一個位元組的資料)。如果驗證過通過,那麼CumulativeProtocolDecoder 會檢查緩沖區内是否還有資料未讀取,如果有就繼續調用doDecode()方法,沒有就停止對doDecode()方法的調用,直到有新的資料被緩沖。
B. 當你的doDecode()方法傳回false 時,CumulativeProtocolDecoder 會停止對doDecode()方法的調用,但此時如果本次資料還有未讀取完的,就将含有剩餘資料的IoBuffer 緩沖區儲存到IoSession 中,以便下一次資料到來時可以從IoSession 中提取合并。如果發現本次資料全都讀取完畢,則清空IoBuffer 緩沖區。簡而言之,當你認為讀取到的資料已經夠解碼了,那麼就傳回true,否則就傳回false。這個CumulativeProtocolDecoder 其實最重要的工作就是幫你完成了資料的累積,因為這個工作是很煩瑣的。
- public class CmccSipcDecoder extends CumulativeProtocolDecoder {
- public CmccSipcDecoder(Charset charset) {
- protected boolean doDecode(IoSession session, IoBuffer in,
- ProtocolDecoderOutput out) throws Exception {
- CharsetDecoder cd = charset.newDecoder();
- int matchCount = 0;
- String statusLine = "", sender = "", receiver = "", length = "",
- sms = "";
- int i = 1;
- while (in.hasRemaining()) {
- byte b = in.get();
- buffer.put(b);
- if (b == 10 && i < 5) {
- matchCount++;
- if (i == 1) {
- statusLine = buffer.getString(matchCount, cd);
- statusLine = statusLine.substring(0,
- statusLine.length() - 1);
- matchCount = 0;
- buffer.clear();
- if (i == 2) {
- sender = buffer.getString(matchCount, cd);
- sender = sender.substring(0, sender.length() -1);
- if (i == 3) {
- receiver = buffer.getString(matchCount, cd);
- receiver = receiver.substring(0, receiver.length()
- 1);
- if (i == 4) {
- length = buffer.getString(matchCount, cd);
- length = length.substring(0, length.length() -1);
- i++;
- } else if (i == 5) {
- if (matchCount == Long.parseLong(length.split(": ")[1]))
- {
- sms = buffer.getString(matchCount, cd);
- break;
- } else {
- SmsObject smsObject = new SmsObject();
- smsObject.setSender(sender.split(": ")[1]);
- smsObject.setReceiver(receiver.split(": ")[1]);
- smsObject.setMessage(sms);
- out.write(smsObject);
- return false;
我們的這個短信協定解碼器使用/n(ASCII 的10 字元)作為分解點,一個位元組一個位元組的讀取,那麼第一次發現/n 的位元組位置之前的部分,必然就是短信協定的狀态行,依次類推,你就可以解析出來發送者、接受者、短信内容長度。然後我們在解析短信内容時,使用擷取到的長度進行讀取。全部讀取完畢之後, 然後構造SmsObject 短信對象, 使用ProtocolDecoderOutput 的write()方法輸出,最後傳回false,也就是本次資料全部讀取完畢,告知CumulativeProtocolDecoder 在本次資料讀取中不需要再調用doDecode()方法了。這裡需要注意的是兩個狀态變量i、matchCount,i 用于記錄解析到了短信協定中的哪一行(/n),matchCount 記錄在目前行中讀取到了哪一個位元組。狀态變量在解碼器中經常被使用,我們這裡的情況比較簡單,因為我們假定短信發送是在一次資料發送中完成的,是以狀态變量的使用也比較簡單。假如資料的發送被拆成了多次(譬如:短信協定的短信内容、消息報頭被拆成了兩次資料發送),那麼上面的代碼勢必就會存在問題,因為當第二次調用doDecode()方法時,狀态變量i、matchCount 勢必會被重置,也就是原來的狀态值并沒有被儲存。那麼我們如何解決狀态儲存的問題呢?答案就是将狀态變量儲存在IoSession 中或者是Decoder 執行個體自身,但推薦使用前者,因為雖然Decoder 是單例的,其中的執行個體變量儲存的狀态在Decoder 執行個體銷毀前始終保持,但Mina 并不保證每次調用doDecode()方法時都是同一個線程(這也就是說第一次調用doDecode()是IoProcessor-1 線程,第二次有可能就是IoProcessor-2 線程),這就會産生多線程中的執行個體變量的可視性(Visibility,具體請參考JAVA 的多線程知識)問題。IoSession中使用一個同步的HashMap 儲存對象,是以你不需要擔心多線程帶來的問題。使用IoSession 儲存解碼器的狀态變量通常的寫法如下所示:
A. 在解碼器中定義私有的内部類Context,然後将需要儲存的狀态變量定義在Context 中存儲。
B. 在解碼器中定義方法擷取這個Context 的執行個體,這個方法的實作要優先從IoSession 中擷取Context。
具體代碼示例如下所示:
// 上下文作為儲存狀态的内部類的名字,意思很明顯,就是讓狀态跟随上下文,在整個調用過程中都可以被保持。
- public class XXXDecoder extends CumulativeProtocolDecoder{
- private final AttributeKey CONTEXT =
- new AttributeKey(getClass(), "context" );
- public Context getContext(IoSession session){
- Context ctx=(Context)session.getAttribute(CONTEXT);
- if(ctx==null){
- ctx=new Context();
- session.setAttribute(CONTEXT,ctx);
- private class Context {
- //狀态變量
注意這裡我們使用了Mina 自帶的AttributeKey 類來定義儲存在IoSession 中的對象的鍵值,這樣可以有效的防止鍵值重複。另外,要注意在全部處理完畢之後,狀态要複位,譬如:聊天室中的一條消息讀取完畢之後,狀态變量要變為初始值,以便下次處理時重新使用。
第四步,編解碼工廠:
- public class CmccSipcCodecFactory implements ProtocolCodecFactory {
- private final CmccSipcEncoder encoder;
- private final CmccSipcDecoder decoder;
- public CmccSipcCodecFactory() {
- this(Charset.defaultCharset());
- public CmccSipcCodecFactory(Charset charSet) {
- this.encoder = new CmccSipcEncoder(charSet);
- this.decoder = new CmccSipcDecoder(charSet);
- public ProtocolDecoder getDecoder(IoSession session) throws
- Exception {
- return decoder;
- public ProtocolEncoder getEncoder(IoSession session) throws
- return encoder;
實際上這個工廠類就是包裝了編碼器、解碼器,通過接口中的getEncoder()、getDecoder()方法向ProtocolCodecFilter 過濾器傳回編解碼器執行個體,以便在過濾器中對資料進行編解碼處理。
第五步,運作示例:
下面我們修改最一開始的示例中的MyServer、MyClient 的代碼,如下所示:
- acceptor.getFilterChain().addLast(
- "codec",
- new ProtocolCodecFilter(new CmccSipcCodecFactory(Charset
- .forName("UTF-8"))));
- connector.getFilterChain().addLast(
- new ProtocolCodecFilter(new
- CmccSipcCodecFactory(
- Charset.forName("UTF-8"))));
- 然後我們在ClientHandler 中發送一條短信:
- SmsObject sms = new SmsObject();
- sms.setSender("15801012253");
- sms.setReceiver("18869693235");
- sms.setMessage("你好!Hello World!");
- session.write(sms);
最後我們在MyIoHandler 中接收這條短資訊:
- log.info("The message received is [" + sms.getMessage() + "]");
你會看到Server 端的控制台輸出如下資訊:
The message received is [你好!Hello World!]
(6-2.)複雜的解碼器:
下面我們講解一下如何在解碼器中儲存狀态變量,也就是真正的實作上面所說的Context。
我們假設這樣一種情況,有兩條短信:
他們按照上面的顔色辨別發送,也就是說紅色部分、藍色部分、綠色部分分别發送(調用三次IoSession.write()方法),那麼如果你還用上面的CmccSipcDecoder,将無法工作,因為第一次資料流(紅色部分)發送過取時,資料是不完整的,無法解析出一條短資訊,當二次資料流(藍色部分)發送過去時,已經可以解析出第一條短資訊了,但是第二條短信還是不完整的,需要等待第三次資料流(綠色部分)的發送。注意:由于模拟資料發送的規模性問題很麻煩,是以這裡采用了這種極端的例子說明問題,雖不具有典型性,但很能說明問題,這就足夠了,是以不要追究這種發送消息是否在真實環境中存在,更不要追究其合理性。
CmccSispcDecoder 類改為如下的寫法:
- private final AttributeKey CONTEXT = new AttributeKey(getClass(),
- "context");
- Context ctx = getContext(session);
- int matchCount = ctx.getMatchCount();
- int line = ctx.getLine();
- IoBuffer buffer = ctx.innerBuffer;
- String statusLine = ctx.getStatusLine(),
- sender = ctx.getSender(),
- receiver = ctx.getReceiver(),
- length = ctx.getLength(),
- sms = ctx.getSms();
- if (line < 4 && b == 10) {
- if (line == 0) {
- ctx.setStatusLine(statusLine);
- if (line == 1) {
- sender = sender.substring(0, sender.length() - 1);
- ctx.setSender(sender);
- if (line == 2) {
- receiver = receiver.substring(0, receiver.length() -
- ctx.setReceiver(receiver);
- if (line == 3) {
- length = length.substring(0, length.length() - 1);
- ctx.setLength(length);
- line++;
- } else if (line == 4) {
- ctx.setSms(sms);
- // 由于下面的break,這裡需要調用else外面的兩行代碼
- ctx.setMatchCount(matchCount);
- ctx.setLine(line);
- if (ctx.getLine() == 4
- && Long.parseLong(ctx.getLength().split(": ")[1]) == ctx
- .getMatchCount()) {
- ctx.reset();
- return true;
- private Context getContext(IoSession session) {
- Context context = (Context) session.getAttribute(CONTEXT);
- if (context == null){
- context = new Context();
- session.setAttribute(CONTEXT, context);
- return context;
- private final IoBuffer innerBuffer;
- private String statusLine = "";
- private String sender = "";
- private String receiver = "";
- private String length = "";
- private String sms = "";
- public Context() {
- innerBuffer = IoBuffer.allocate(100).setAutoExpand(true);
- private int matchCount = 0;
- private int line = 0;
- public int getMatchCount() {
- return matchCount;
- public void setMatchCount(int matchCount) {
- this.matchCount = matchCount;
- public int getLine() {
- return line;
- public void setLine(int line) {
- this.line = line;
- public String getStatusLine() {
- return statusLine;
- public void setStatusLine(String statusLine) {
- this.statusLine = statusLine;
- public String getLength() {
- return length;
- public void setLength(String length) {
- this.length = length;
- public String getSms() {
- return sms;
- public void setSms(String sms) {
- this.sms = sms;
- public void reset() {
- this.innerBuffer.clear();
- this.matchCount = 0;
- this.line = 0;
- this.statusLine = "";
- this.sender = "";
- this.receiver = "";
- this.length = "";
- this.sms = "";
這裡我們做了如下的幾步操作:
(1.) 所有記錄狀态的變量移到了Context 内部類中,包括記錄讀到短信協定的哪一行的line。每一行讀取了多少個位元組的matchCount,還有記錄解析好的狀态行、發送者、接受者、短信内容、累積資料的innerBuffer 等。這樣就可以在資料不能完全解碼,等待下一次doDecode()方法的調用時,還能承接上一次調用的資料。
(2.) 在 doDecode()方法中主要的變化是各種狀态變量首先是從Context 中擷取,然後操作之後,将最新的值setXXX()到Context 中儲存。
(3.) 這裡注意doDecode()方法最後的判斷,當認為不夠解碼為一條短資訊時,傳回false,也就是在本次資料流解碼中不要再調用doDecode()方法;當認為已經解碼出一條短資訊時,輸出短消息,然後重置所有的狀态變量,傳回true,也就是如果本次資料流解碼中還有沒解碼完的資料,繼續調用doDecode()方法。下面我們對用戶端稍加改造,來模拟上面的紅、藍、綠三次發送聊天短資訊的情況:
MyClient:
- for (int i = 0; i < 3; i++) {
- System.out.println("****************" + i);
這裡我們為了友善示範,不在IoHandler 中發送消息,而是直接在MyClient 中發送,你要注意的是三次發送都要使用同一個IoSession,否則就不是從同一個通道發送過去的了。
CmccSipcEncoder:
- String sender = "15801012253";
- String receiver = "15866332698";
- String smsContent = "你好!Hello World!";
- IoBuffer buffer2 = IoBuffer.allocate(100).setAutoExpand(true);
- buffer2.putString("L: " + (smsContent.getBytes(charset).length)
- + "/n",ce);
- buffer2.putString(smsContent, ce);
- buffer2.putString(statusLine + '/n', ce);
- buffer2.flip();
- out.write(buffer2);
- IoBuffer buffer3 = IoBuffer.allocate(100).setAutoExpand(true);
- buffer3.putString("S: " + sender + '/n', ce);
- buffer3.putString("R: " + receiver + '/n', ce);
- buffer3.putString("L: " + (smsContent.getBytes(charset).length)
- buffer3.putString(smsContent, ce);
- buffer3.putString(statusLine + '/n', ce);
- buffer3.flip();
- out.write(buffer3);
上面的這段代碼要配合MyClient來操作,你需要做的是在MyClient中的紅色輸出語句處設定斷點,然後第一調用時CmccSipcEncoder中注釋掉藍、綠色的代碼,也就是發送兩條短資訊的第一部分(紅色的代碼),依次類推,也就是MyClient的中的三次斷點中,分别執行CmccSipcEncoder中的紅、藍、綠三段代碼,也就是模拟兩條短信的三段發送。你會看到Server端的運作結果是:當MyClient第一次到達斷點時,沒有短資訊被讀取到,當MyClient第二次到達斷點時,第一條短資訊輸出,當MyClient第三次到達斷點時,第二條短資訊輸出。
Mina中自帶的解碼器:
解碼器 說明
CumulativeProtocolDecoder 累積性解碼器,上面我們重點說明了這個解碼器的用法。
SynchronizedProtocolDecoder 這個解碼器用于将任何一個解碼器包裝為一個線程安全的解碼器,用于解決上面說的每次執行decode()方法時可能線程不是上一次的線程的問題,但這樣會在高并發時,大大降低系統的性能。
TextLineDecoder 按照文本的換行符( Windows:/r/n 、Linux:/n、Mac:/r)解碼資料。
PrefixedStringDecoder 這個類繼承自CumulativeProtocolDecoder類,用于讀取資料最前端的1、2、4 個位元組表示後面的資料長度的資料。譬如:一個段資料的前兩個位元組表示後面的真實資料的長度,那麼你就可以用這個方法進行解碼。
(6-3.)多路分離的解碼器:
假設一段資料發送過來之後,需要根據某種條件決定使用哪個解碼器,而不是像上面的例子,固定使用一個解碼器,那麼該如何做呢?幸好Mina 提供了org.apache.mina.filter.codec.demux 包來完成這種多路分離(Demultiplexes)的解碼工作,也就是同時注冊多個解碼器,然後運作時依據傳入的資料決定到底使用哪個解碼器來工作。所謂多路分離就是依據條件分發到指定的解碼器,譬如:上面的短信協定進行擴充,可以依據狀态行來判斷使用1.0 版本的短信協定解碼器還是2.0版本的短信協定解碼器。
下面我們使用一個簡單的例子,說明這個多路分離的解碼器是如何使用的,需求如下所示:
(1.) 用戶端傳入兩個int 類型的數字,還有一個char 類型的符号。
(2.) 如果符号是+,服務端就是用1 号解碼器,對兩個數字相加,然後把結果傳回給用戶端。
(3.) 如果符号是-,服務端就使用2 号解碼器,将兩個數字變為相反數,然後相加,把結果傳回給用戶端。
Demux 開發編解碼器主要有如下幾個步驟:
A. 定義Client 端、Server 端發送、接收的資料對象。
B. 使用Demux 編寫編碼器是實作MessageEncoder<T>接口,T 是你要編碼的資料對象,這個MessageEncoder 會在DemuxingProtocolEncoder 中調用。
C. 使用Demux 編寫編碼器是實作MessageDecoder 接口,這個MessageDecoder 會在DemuxingProtocolDecoder 中調用。
D. 在 DemuxingProtocolCodecFactory 中調用addMessageEncoder()、addMessageDecoder()方法組裝編解碼器。
MessageEncoder的接口如下所示:
- public interface MessageEncoder<T> {
- void encode(IoSession session, T message, ProtocolEncoderOutput out)
- throws Exception;
你注意到消息編碼器接口與在ProtocolEncoder 中沒什麼不同,差別就是Object message被泛型具體化了類型,你不需要手動的類型轉換了。
MessageDecoder的接口如下所示:
- public interface MessageDecoder {
- static MessageDecoderResult OK = MessageDecoderResult.OK;
- static MessageDecoderResult NEED_DATA =
- MessageDecoderResult.NEED_DATA;
- static MessageDecoderResult NOT_OK = MessageDecoderResult.NOT_OK;
- MessageDecoderResult decodable(IoSession session, IoBuffer in);
- MessageDecoderResult decode(IoSession session, IoBuffer in,
- ProtocolDecoderOutput out) throws Exception;
- void finishDecode(IoSession session, ProtocolDecoderOutput out)
(1.)decodable()方法有三個傳回值,分别表示如下的含義:
A. MessageDecoderResult.NOT_OK:表示這個解碼器不适合解碼資料,然後檢查其它解碼器,如果都不滿足會抛異常;
B. MessageDecoderResult.NEED_DATA:表示目前的讀入的資料不夠判斷是否能夠使用這個解碼器解碼,然後再次調用decodable()方法檢查其它解碼器,如果都是NEED_DATA,則等待下次輸入;
C. MessageDecoderResult.OK: 表示這個解碼器可以解碼讀入的資料, 然後則調用MessageDecoder 的decode()方法。這裡注意decodable()方法對參數IoBuffer in 的任何操作在方法結束之後,都會複原,也就是你不必擔心在調用decode()方法時,position 已經不在緩沖區的起始位置。這個方法相當于是預讀取,用于判斷是否是可用的解碼器。
(2.)decode()方法有三個傳回值,分别表示如下的含義:
A. MessageDecoderResult.NOT_OK:表示解碼失敗,會抛異常;
B. MessageDecoderResult.NEED_DATA:表示資料不夠,需要讀到新的資料後,再次調用decode()方法。
C. MessageDecoderResult.OK:表示解碼成功。
代碼示範:
(1.)用戶端發送的資料對象:
- public class SendMessage {
- private int i = 0;
- private int j = 0;
- private char symbol = '+';
- public char getSymbol() {
- return symbol;
- public void setSymbol(char symbol) {
- this.symbol = symbol;
- public int getI() {
- return i;
- public void setI(int i) {
- this.i = i;
- public int getJ() {
- return j;
- public void setJ(int j) {
- this.j = j;
(2.)服務端發送的傳回結果對象:
- public class ResultMessage {
- private int result = 0;
- public int getResult() {
- return result;
- public void setResult(int result) {
- this.result = result;
(3.)用戶端使用的SendMessage的編碼器:
- public class SendMessageEncoder implements MessageEncoder<SendMessage>
- public void encode(IoSession session, SendMessage message,
- IoBuffer buffer = IoBuffer.allocate(10);
- buffer.putChar(message.getSymbol());
- buffer.putInt(message.getI());
- buffer.putInt(message.getJ());
這裡我們的SendMessage、ResultMessage 中的字段都是用長度固定的基本資料類型,這樣IoBuffer 就不需要自動擴充了,提高性能。按照一個char、兩個int 計算,這裡的IoBuffer隻需要10 個位元組的長度就可以了。
(4.)服務端使用的SendMessage的1号解碼器:
- public class SendMessageDecoderPositive implements MessageDecoder {
- public MessageDecoderResult decodable(IoSession session, IoBuffer in)
- if (in.remaining() < 2)
- return MessageDecoderResult.NEED_DATA;
- else {
- char symbol = in.getChar();
- if (symbol == '+') {
- return MessageDecoderResult.OK;
- return MessageDecoderResult.NOT_OK;
- public MessageDecoderResult decode(IoSession session, IoBuffer in,
- SendMessage sm = new SendMessage();
- sm.setSymbol(in.getChar());
- sm.setI(in.getInt());
- sm.setJ(in.getInt());
- out.write(sm);
- public void finishDecode(IoSession session, ProtocolDecoderOutput
- out)
- // undo
因為用戶端發送的SendMessage 的前兩個位元組(char)就是符号位,是以我們在decodable()方法中對此條件進行了判斷,之後讀到兩個位元組,并且這兩個位元組表示的字元是+時,才認為這個解碼器可用。
(5.)服務端使用的SendMessage的2号解碼器:
- public class SendMessageDecoderNegative implements MessageDecoder {
- if (symbol == '-') {
- sm.setI(-in.getInt());
- sm.setJ(-in.getInt());
(6.)服務端使用的ResultMessage的編碼器:
- public class ResultMessageEncoder implements
- MessageEncoder<ResultMessage> {
- public void encode(IoSession session, ResultMessage message,
- IoBuffer buffer = IoBuffer.allocate(4);
- buffer.putInt(message.getResult());
(7.)用戶端使用的ResultMessage的解碼器:
- public class ResultMessageDecoder implements MessageDecoder {
- if (in.remaining() < 4)
- else if (in.remaining() == 4)
- else
- ResultMessage rm = new ResultMessage();
- rm.setResult(in.getInt());
- out.write(rm);
(8.)組裝這些編解碼器的工廠:
- public class MathProtocolCodecFactory extends
- DemuxingProtocolCodecFactory {
- public MathProtocolCodecFactory(boolean server) {
- if (server) {
- super.addMessageEncoder(ResultMessage.class,
- ResultMessageEncoder.class);
- super.addMessageDecoder(SendMessageDecoderPositive.class);
- super.addMessageDecoder(SendMessageDecoderNegative.class);
- super
- .addMessageEncoder(SendMessage.class,
- SendMessageEncoder.class);
- super.addMessageDecoder(ResultMessageDecoder.class);
這個工廠類我們使用了構造方法的一個布爾類型的參數,以便其可以在Server 端、Client端同時使用。我們以Server 端為例,你可以看到調用兩次addMessageDecoder()方法添加了1 号、2 号解碼器,其實DemuxingProtocolDecoder 内部在維護了一個MessageDecoder數組,用于儲存添加的所有的消息解碼器,每次decode()的時候就調用每個MessageDecoder的decodable()方法逐個檢查,隻要發現一個MessageDecoder 不是對應的解碼器,就從數組中移除,直到找到合适的MessageDecoder,如果最後發現數組為空,就表示沒找到對應的MessageDecoder,最後抛出異常。
(9.)Server端:
- public class Server {
- public static void main(String[] args) throws Exception {
- IoAcceptor acceptor = new NioSocketAcceptor();
- acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE,
- 5);
- acceptor.getFilterChain().addLast("codec",
- MathProtocolCodecFactory(true)));
- acceptor.setHandler(new ServerHandler());
- acceptor.bind(new InetSocketAddress(9123));
(10.)Server端使用的IoHandler:
- public class ServerHandler extends IoHandlerAdapter {
- .getLogger(ServerHandler.class);
- public void sessionIdle(IoSession session, IdleStatus status)
- SendMessage sm = (SendMessage) message;
- log.info("The message received is [ " + sm.getI() + " "
- + sm.getSymbol() + " " + sm.getJ() + " ]");
- rm.setResult(sm.getI() + sm.getJ());
- session.write(rm);
(11.)Client端:
- public class Client {
- public static void main(String[] args) throws Throwable {
- IoConnector connector = new NioSocketConnector();
- connector.getFilterChain().addLast("logger", new
- LoggingFilter());
- MathProtocolCodecFactory(false)));
- connector.setHandler(new ClientHandler());
(12.)Client端的IoHandler:
- public void sessionOpened(IoSession session) throws Exception {
- sm.setI(100);
- sm.setJ(99);
- sm.setSymbol('+');
- session.write(sm);
- public void messageReceived(IoSession session, Object message) {
- ResultMessage rs = (ResultMessage) message;
- LOGGER.info(String.valueOf(rs.getResult()));
你嘗試改變(12.)中的紅色代碼中的正負号,會看到服務端使用了兩個不同的解碼器對其進行處理。
7.線程模型配置:
Mina 中的很多執行環節都使用了多線程機制,用于提高性能。Mina 中預設在三個地方使用了線程:
(1.) IoAcceptor:
這個地方用于接受用戶端的連接配接建立,每監聽一個端口(每調用一次bind()方法),都啟用一個線程,這個數字我們不能改變。這個線程監聽某個端口是否有請求到來,一旦發現,則建立一個IoSession 對象。因為這個動作很快,是以有一個線程就夠了。
(2.) IoConnector:
這個地方用于與服務端建立連接配接,每連接配接一個服務端(每調用一次connect()方法),就啟用一個線程,我們不能改變。同樣的,這個線程監聽是否有連接配接被建立,一旦發現,則建立一個IoSession 對象。因為這個動作很快,是以有一個線程就夠了。
(3.) IoProcessor:
這個地方用于執行真正的IO 操作,預設啟用的線程個數是CPU 的核數+1,譬如:單CPU 雙核的電腦,預設的IoProcessor 線程會建立3 個。這也就是說一個IoAcceptor 或者IoConnector 預設會關聯一個IoProcessor 池,這個池中有3 個IoProcessor。因為IO 操作耗費資源,是以這裡使用IoProcessor 池來完成資料的讀寫操作,有助于提高性能。這也就是前面說的IoAccetor、IoConnector 使用一個Selector,而IoProcessor 使用自己單獨的Selector 的原因。那麼為什麼IoProcessor 池中的IoProcessor 數量隻比CPU 的核數大1 呢?因為IO 讀寫操作是耗費CPU 的操作,而每一核CPU 同時隻能運作一個線程,是以IoProcessor 池中的IoProcessor 的數量并不是越多越好。
這個IoProcessor 的數量可以調整,如下所示:
IoAcceptor acceptor=new NioSocketAcceptor(5);
IoConnector connector=new NioSocketConnector(5);
這樣就會将IoProcessor 池中的數量變為5 個,也就是說可以同時處理5 個讀寫操作。還記得前面說過Mina 的解碼器要使用IoSession 儲存狀态變量,而不是Decoder 本身,這是因為Mina 不保證每次執行doDecode()方法的都是同一個IoProcessor 這句話嗎?其實這個問題的根本原因是IoProcessor 是一個池,每次IoSession 進入空閑狀态時(無讀些資料發生),IoProcessor 都會被回收到池中,以便其他的IoSession 使用,是以當IoSession從空閑狀态再次進入繁忙狀态時,IoProcessor 會再次配置設定給其一個IoProcessor 執行個體,而此時已經不能保證還是上一次繁忙狀态時的那個IoProcessor 了。你還會發現IoAcceptor 、IoConnector 還有一個構造方法, 你可以指定一個java.util.concurrent.Executor 類作為線程池對象,那麼這個線程池對象是做什麼用的呢?其實就是用于建立(1.)、(2.)中的用于監聽是否有TCP 連接配接建立的那個線程,預設情況下,使用Executors.newCachedThreadPool()方法建立Executor 執行個體,也就是一個無界的線程池(具體内容請參看JAVA 的并發庫)。大家不要試圖改變這個Executor 的執行個體,也就是使用内置的即可,否則可能會造成一些莫名其妙的問題,譬如:性能在某個通路量級别時,突然下降。因為無界線程池是有多少個Socket 建立,就配置設定多少個線程,如果你改為Executors 的其他建立線程池的方法,建立了一個有界線程池,那麼一些請求将無法得到及時響應,進而出現一些問題。
下面我們完整的綜述一下Mina 的工作流程:
(1.) 當 IoService 執行個體建立的時候,同時一個關聯在IoService 上的IoProcessor 池、線程池也被建立;
(2.) 當 IoService 建立套接字(IoAcceptor 的bind()或者是IoConnector 的connect()方法被調用)時,IoService 從線程池中取出一個線程,監聽套接字端口;
(3.) 當 IoService 監聽到套接字上有連接配接請求時,建立IoSession 對象,從IoProcessor池中取出一個IoProcessor 執行個體執行這個會話通道上的過濾器、IoHandler;
(4.) 當這條IoSession 通道進入空閑狀态或者關閉時,IoProcessor 被回收。上面說的是Mina 預設的線程工作方式,那麼我們這裡要講的是如何配置IoProcessor 的多線程工作方式。因為一個IoProcessor 負責執行一個會話上的所有過濾器、IoHandler,也
就是對于IO 讀寫操作來說,是單線程工作方式(就是按照順序逐個執行)。假如你想讓某個事件方法(譬如:sessionIdle()、sessionOpened()等)在單獨的線程中運作(也就是非IoProcessor 所在的線程),那麼這裡就需要用到一個ExecutorFilter 的過濾器。你可以看到IoProcessor 的構造方法中有一個參數是java.util.concurrent.Executor,也就是可以讓IoProcessor 調用的過濾器、IoHandler 中的某些事件方法線上程池中配置設定的線程上獨立運作,而不是運作在IoProcessor 所在的線程。
acceptor.getFilterChain().addLast("exceutor", new ExecutorFilter());
我們看到是用這個功能,簡單的一行代碼就可以了。那麼ExecutorFilter 還有許多重載的構造方法,這些重載的有參構造方法,參數主要用于指定如下資訊:
(1.) 指定線程池的屬性資訊,譬如:核心大小、最大大小、等待隊列的性質等。你特别要關注的是ExecutorFilter 内部預設使用的是OrderedThreadPoolExecutor 作為線程池的實作,從名字上可以看出是保證各個事件在多線程執行中的順序(譬如:各個事件方
法的執行是排他的,也就是不可能出現兩個事件方法被同時執行;messageReceived()總是在sessionClosed() 方法之前執行), 這是因為多線程的執行是異步的, 如果沒有OrderedThreadPoolExecutor 來保證IoHandler 中的方法的調用順序,可能會出現嚴重的問題。但是如果你的代碼确實沒有依賴于IoHandler 中的事件方法的執行順序,那麼你可以使用UnorderedThreadPoolExecutor 作為線程池的實作。是以,你也最好不要改變預設的Executor 實作,否則,事件的執行順序就會混亂,譬如:messageReceived()、messageSent()方法被同時執行。
(2.) 哪些事件方法被關注,也就哪些事件方法用這個線程池執行。線程池可以異步執行的事件類型是位于IoEventType 中的九個枚舉值中除了SESSION_CREATED 之外的其餘八個,這說明Session 建立的事件隻能與IoProcessor 在同一個線程上執行。
- public enum IoEventType {
- SESSION_CREATED,
- SESSION_OPENED,
- SESSION_CLOSED,
- MESSAGE_RECEIVED,
- MESSAGE_SENT,
- SESSION_IDLE,
- EXCEPTION_CAUGHT,
- WRITE,
- CLOSE,
預設情況下,沒有配置關注的事件類型,有如下六個事件方法會被自動使用線程池異步執行:
IoEventType.EXCEPTION_CAUGHT,
IoEventType.MESSAGE_RECEIVED,
IoEventType.MESSAGE_SENT,
IoEventType.SESSION_CLOSED,
IoEventType.SESSION_IDLE,
IoEventType.SESSION_OPENED
其實ExecutorFilter 的工作機制很簡單,就是在調用下一個過濾器的事件方法時,把其交給Executor 的execute(Runnable runnable)方法來執行,其實你自己在IoHandler 或者某個過濾器的事件方法中開啟一個線程,也可以完成同樣的功能,隻不過這樣做,你就失去了程式的可配置性,線程調用的代碼也會完全耦合在代碼中。但要注意的是絕對不能開啟線程讓其執行sessionCreated()方法。如果你真的打算使用這個ExecutorFilter,那麼最好想清楚它該放在過濾器鍊的哪個位置,針對哪些事件做異步處理機制。一般ExecutorFilter 都是要放在ProtocolCodecFilter 過濾器的後面,也就是不要讓編解碼運作在獨立的線程上,而是要運作在IoProcessor 所在的線程,因為編解碼處理的資料都是由IoProcessor 讀取和發送的,沒必要開啟新的線程,否則性能反而會下降。一般使用ExecutorFilter 的典型場景是将業務邏輯(譬如:耗時的資料庫操作)放在單獨的線程中運作,也就是說與IO 處理無關的操作可以考慮使用ExecutorFilter 來異步執行。