IoFilter:過濾器層
這裡我們做一個解碼的編碼的過濾層,這也是mina中最常用的。首先我們需要定義屬于我們自己的協定,也就是資料包的格式:别以為這很複雜,其實很簡單的。
我們知道資料都是位元組類型的,那麼我們的協定格式如下:前兩位表示資料包的長度(一個short類型正好兩個位元組),第三位是閑置位,後面的是資料。長度是閑置位和
資料長度的和。這樣我們就可以根據前兩位确定,我們的資料包到那裡結束。那麼我們循環這麼讀,就會取得所有的資料包。是不是很簡單啊,這個格式就是我們的協定。
為了更簡單,這裡我們用戶端發往服務端的資料進行編碼和解碼,服務端發往用戶端的就不編碼了,用戶端也就不用解碼。服務端使用mina,用戶端我們就使用基本的socket nio。
編碼工廠類:
public class CodecFactory extends DemuxingProtocolCodecFactory{
public CodecFactory(){
super.addMessageEncoder(String.class, Encoder.class);
super.addMessageDecoder(Decoder.class);
}
}
解碼類:
import java.util.ArrayList;
import java.util.List;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.apache.mina.filter.codec.demux.MessageDecoder;
import org.apache.mina.filter.codec.demux.MessageDecoderResult;
public class Decoder implements MessageDecoder {
private byte[] r_curPkg = null;
private int r_pos = -1; // 包計數器
static private final int PKG_SIZE_BYTES = 2;//包長度
public Decoder() { }
@Override
public MessageDecoderResult decodable(IoSession session, IoBuffer in) {
return MessageDecoderResult.OK;
}
@Override
public MessageDecoderResult decode(IoSession session, IoBuffer in,ProtocolDecoderOutput out) throws Exception {
List<String> list = new ArrayList<String>();
while (in.remaining() >= PKG_SIZE_BYTES || (r_pos >= 0 && in.hasRemaining())) {// 循環接收包,4為一個整型,表示包長度b, 如果上一個包未接收完成時,繼續接收
// 如果上個包已收完整,則建立新的包
if (r_pos == -1) {
//得到下一個包的長度,長度不包括前兩位,即包的長度=壓縮位長度+資料位長度
int pkgLen = in.getShort();
//如果包長度小于0,那麼此包錯誤,解碼失敗,傳回。
if (pkgLen < 0) {
return MessageDecoderResult.NOT_OK;
}
in.get();
r_curPkg = new byte[pkgLen-1]; //數組長度為資料長度
r_pos = 0;
}
int need = r_curPkg.length - r_pos; //需要讀取的資料長度
int length = in.remaining();//緩沖區中可讀的資料長度
if (length >= need) {// 可以把目前包讀完整
in.get(r_curPkg, r_pos, need); // 複制緩沖區中的資料到r_curPkg中
// 處理接收到一個完整的包資料後,把包添加到池中,判斷是否需要需要解壓
byte[] data = r_curPkg;
String str = new String(data);
list.add(str);
r_curPkg = null;
r_pos = -1;
} else {
// 如果剩下的位元組數,不夠一個包則
int remainBytes = in.remaining();
in.get(r_curPkg, r_pos, remainBytes);
r_pos += remainBytes;
return MessageDecoderResult.NEED_DATA;
}
}
for (String protocol : list) {
out.write(protocol);
}
return MessageDecoderResult.OK;
}
@Override
public void finishDecode(IoSession session, ProtocolDecoderOutput out) {
}
}
編碼類:(沒有進行編碼,隻進行了資料發送)
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
import org.apache.mina.filter.codec.demux.MessageEncoder;
public class Encoder implements MessageEncoder<String>{
public Encoder(){
}
@Override
public void encode(IoSession session, String message, ProtocolEncoderOutput out)
throws Exception {
System.out.println("encode..................");
String value = (String) message;
IoBuffer buf = IoBuffer.allocate(value.getBytes().length);
buf.setAutoExpand(true);
if (value != null){
buf.put(value.trim().getBytes());
}
buf.flip();
out.write(buf);
out.flush();
}
}
IoService層:
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
public class MinaServer {
private static final int PORT = 9123;
public static void main(String [] args) throws IOException{
IoAcceptor acceptor = new NioSocketAcceptor();
acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new CodecFactory()));
acceptor.setHandler(new ServerHandler());
acceptor.getSessionConfig().setReadBufferSize( 3 );
acceptor.getSessionConfig().setIdleTime( IdleStatus.BOTH_IDLE, 10 );
acceptor.bind( new InetSocketAddress(PORT) );
}
}
到這裡我們的服務端代碼就寫完了,
用戶端實作
<span style="font-size:12px">public class SocketClient {
public static void main(String...args)throws Exception{
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost",9123));
byte [] bytes = "aaaa".getBytes();
//對資料包進行編碼
ByteBuffer buffer = ByteBuffer.allocate(bytes.length+3);
buffer.putShort((short)(bytes.length+1)); //包長度
buffer.put((byte)1);//閑置位
buffer.put(bytes);//資料
buffer.flip();
socketChannel.write(buffer);
socketChannel.socket().shutdownOutput();
String obj = receive(socketChannel);
System.out.println(obj);
}
private static String receive(SocketChannel socketChannel)throws Exception{
ByteBuffer buffer = ByteBuffer.allocate(1024);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
int size = 0;
byte [] bytes = null;
while((size = socketChannel.read(buffer))>=0){
buffer.flip();
bytes = new byte[size];
buffer.get(bytes);
baos.write(bytes);
buffer.clear();
}
bytes = baos.toByteArray();
baos.close();
return new String(bytes);
}
}
</span>