天天看點

Socket網絡架構 MINA

<a href="http://apps.hi.baidu.com/share/user/65276d79647265616d5f7869616f3608">http://apps.hi.baidu.com/share/user/65276d79647265616d5f7869616f3608</a>

MINA是一個網絡應用架構,在不犧牲性能和可擴充性的前提下用于解決如下問題:

1: 快速開發自己的英勇。

2:高可維護性,高可複用性:網絡I/O編碼,消息的編/解碼,業務邏輯互相分離。

3:相對容易的進行單元測試。

3.1 IoFilters:

IoFilter為MINA的功能擴充提供了接口。它攔截所有的IO事件進行事件的預處理和後處理 (AOP)。我們可以把它想象成

Servlet的filters。

IoFilter能夠實作以下幾種目的:

事件日志

性能 檢測

資料轉換(e.g. SSL support),codec

防火牆…等等

3.2 codec: ProtocolCodecFactory

MINA提供了友善的Protocol支援。如上說講,codec在 IoFilters中設定。

通過它的Encoder和Decoder,可以友善的擴充并支援各種基于Socket的網絡協定,比如HTTP服務 器、FTP伺服器、Telnet伺服器等等。

要實作自己的編碼/解碼器(codec)隻需要實作interface: ProtocolCodecFactory即可.

在MINA 1.0版本,MINA已經實作了幾個常用的(codec factory):

DemuxingProtocolCodecFactory, 

NettyCodecFactory, 

ObjectSerializationCodecFactory, 

TextLineCodecFactory

其中: 

TextLineCodecFactory:

A ProtocolCodecFactory that performs encoding and decoding between a text line data and a Java 

string object. This codec is useful especially when you work with a text-based protocols such as SMTP and IMAP.

ObjectSerializationCodecFactory:

A ProtocolCodecFactory that serializes and deserializes Java objects. This codec is very useful when 

you have to prototype your application rapidly without any specific codec.

DemuxingProtocolCodecFactory:

A composite ProtocolCodecFactory that consists of multiple MessageEncoders and MessageDecoders. ProtocolEncoder

and ProtocolDecoder this factory returns demultiplex incoming messages and buffers to appropriate MessageEncoders 

and MessageDecoders.

NettyCodecFactory:

A MINA ProtocolCodecFactory that provides encoder and decoder for Netty2 Messages and MessageRecognizers.

3.3 business logic: IoHandler

MINA中,所有的業務邏輯都有實作了IoHandler的class完成

interfaceHandles:

all protocol events fired by MINA. There are 6 event handler methods, and they are all invoked by MINA automatically. 

當事件發生時,将觸發IoHandler中的方 法:

sessionCreated, sessionOpened, sessionClosed, sessionIdle, exceptionCaught, messageReceived, messageSent

MINA 1.O中,IoHandler的實作類:

ChainedIoHandler, DemuxingIoHandler, IoHandlerAdapter, SingleSessionIoHandlerDelegate, StreamIoHandler 

具體 細節可參考javadoc。

3.4   MINA的進階主題:線程模式

MINA通過它靈活的filter機制來提供多種線程模型。

沒有線程池過濾器被使用時 MINA運作在一個單線程模式。

如果添加了一個IoThreadPoolFilter到IoAcceptor,将得到一個leader- follower模式的線程池。

如果再添加一個ProtocolThreadPoolFilter,server将有兩個線程池;

一個 (IoThreadPoolFilter)被用于對message對象進行轉換,另外一個(ProtocolThreadPoolFilter)被用于處 理業務邏輯。 

SimpleServiceRegistry加上IoThreadPoolFilter和 ProtocolThreadPoolFilter的預設實作即可适用于需

要高伸縮性的應用。如果想使用自己的線程模型,請參考 SimpleServiceRegistry的源代碼,并且自己

初始化Acceptor。

IoThreadPoolFilter threadPool = new IoThreadPoolFilter();threadPool.start();

IoAcceptor acceptor = new SocketAcceptor();

acceptor.getFilterChain().addLast( "threadPool", threadPool);

ProtocolThreadPoolFilter threadPool2 = new ProtocolThreadPoolFilter();

threadPool2.start();

ProtocolAcceptor acceptor2 = new IoProtocolAcceptor( acceptor );

acceptor2.getFilterChain().addLast( "threadPool", threadPool2 );

...

threadPool2.stop();

threadPool.stop();

采用MINA進行socket開發,一般步驟如下:

1:

server:

IoAcceptor acceptor = new SocketAcceptor(); //建立client接收器

or client:

SocketConnector connector = new SocketConnector();  //建立一個連接配接器

2:server的屬性配置:

SocketAcceptorConfig cfg = new SocketAcceptorConfig();

cfg.setReuseAddress(true);

cfg.getFilterChain().addLast(

"codec",

new ProtocolCodecFilter( new ObjectSerializationCodecFactory() ) ); //對象序列化 codec factory

cfg.getFilterChain().addLast( "logger", new LoggingFilter() );

3:綁定 address和business logic

acceptor.bind(

new InetSocketAddress( SERVER_PORT ),

new ServerSessionHandler( ), cfg ); // 綁定address和handler

client:

connector.connect(new InetSocketAddress( HOSTNAME, PORT ),

new ClientSessionHandler(msg), cfg );

下面的這個簡單的example示範client和server傳遞object的過程:

Message.java

public class Message implements Serializable {

    private int type;

private int status;

private String msgBody;

public Message(int type, int status, String msgBody) 

{

this.type = type;

this.status = status;

this.msgBody = msgBody;

}

    public String getMsgBody() {

return msgBody;

    public void setMsgBody(String msgBody) {

    public int getStatus() {

return status;

    public void setStatus(int status) {

    public int getType() {

return type;

    public void setType(int type) {

Client.java

public class Client

private static final String HOSTNAME = "localhost";

private static final int PORT = 8080;

private static final int CONNECT_TIMEOUT = 30; // seconds

public static void main( String[] args ) throws Throwable

SocketConnector connector = new SocketConnector();        

// Configure the service.

SocketConnectorConfig cfg = new SocketConnectorConfig();

cfg.setConnectTimeout( CONNECT_TIMEOUT );

new ProtocolCodecFilter( new ObjectSerializationCodecFactory() ) );

        cfg.getFilterChain().addLast( "logger", new LoggingFilter() );

IoSession session; 

Message msg = new Message(0,1,"hello");

    }

ClientSessionHandler.java

public class ClientSessionHandler extends IoHandlerAdapter

private Object msg;

public ClientSessionHandler(Object msg) 

this.msg = msg;

public void sessionOpened( IoSession session )

session.write(this.msg);

    public void messageReceived( IoSession session, Object message )

System.out.println("in messageReceived!");

Message rm = (Message ) message;        

SessionLog.debug(session, rm.getMsgBody());

System.out.println("message is: " + rm.getMsgBody());

session.write(rm);

    public void exceptionCaught( IoSession session, Throwable cause )

session.close();

Server.java

public class Server

private static final int SERVER_PORT = 8080;

    public static void main( String[] args ) throws Throwable

// Prepare the service configuration. 

cfg.setReuseAddress( true );

        cfg.getFilterChain().addLast(

        acceptor.bind(

new ServerSessionHandler( ), cfg );

        System.out.println( "The server Listening on port " + SERVER_PORT );

ServerSessionHandler.java

public class ServerSessionHandler extends IoHandlerAdapter

// set idle time to 60 seconds

session.setIdleTime( IdleStatus.BOTH_IDLE, 60 );

session.setAttribute("times",new Integer(0));

System.out.println("in messageReceived");

int times = ((Integer)(session.getAttribute("times"))).intValue();

System.out.println("tiems = " + times);

// communicate 30 times,then close the session.

if (times &lt; 30)

times++;

session.setAttribute("times", new Integer(times));           

Message msg;

msg = (Message) message;

msg.setMsgBody("in server side: " + msg.getMsgBody()); 

System.out.println("begin send msg: " + msg.getMsgBody());

session.write(msg);

else

    public void sessionIdle( IoSession session, IdleStatus status )

SessionLog.info( session, "Disconnecting the idle." );

// disconnect an idle client

// close the connection on exceptional situation

MINA自己附帶的Demo已經很好的說明了它的運用。

值得一提的是它 的SumUp:用戶端發送幾個數字,服務端求和後并傳回結果。這個簡單的程式示範了如何自己實作CODEC。

補充提示:

下載下傳并運作MINA的demo程式還頗非周折:

運作MINA demo appli擦tion:

1:在JDK5

産 生錯誤:

Exception in thread "main" java.lang.NoClassDefFoundError: edu/emory/mathcs/backport/java/util/concurrent/Executor

at org.apache.mina.example.reverser.Main.main(Main.java:44)

察看mina的 QA email: 

<a>http://www.mail-archive.com/[email protected]/msg02252.html</a>

原來需要下載下傳:backport-util-concurrent.jar并加入classpath

<a>http://dcl.mathcs.emory.edu/util/backport-util-concurrent/</a>

繼續運作還是報錯:

Exception in thread "main" java.lang.NoClassDefFoundError: org/slf4j/LoggerFactory

原來MINA采用了slf4j項目作為log,繼續下載下傳

slf4j-simple.jar等,并加入classpath:

<a>http://www.slf4j.org/download.html</a>