天天看點

學習socket nio 之 mina執行個體(2)

IoFilter:過濾器層

        這裡我們做一個解碼的編碼的過濾層,這也是mina中最常用的。首先我們需要定義屬于我們自己的協定,也就是資料包的格式:别以為這很複雜,其實很簡單的。

       我們知道資料都是位元組類型的,那麼我們的協定格式如下:前兩位表示資料包的長度(一個short類型正好兩個位元組),第三位是閑置位,後面的是資料。長度是閑置位和

資料長度的和。這樣我們就可以根據前兩位确定,我們的資料包到那裡結束。那麼我們循環這麼讀,就會取得所有的資料包。是不是很簡單啊,這個格式就是我們的協定。

     為了更簡單,這裡我們用戶端發往服務端的資料進行編碼和解碼,服務端發往用戶端的就不編碼了,用戶端也就不用解碼。服務端使用mina,用戶端我們就使用基本的socket nio。

編碼工廠類:

public class CodecFactory extends DemuxingProtocolCodecFactory{
    public CodecFactory(){
        super.addMessageEncoder(String.class, Encoder.class);
        super.addMessageDecoder(Decoder.class);
    }
}      

解碼類:

import java.util.ArrayList;
import java.util.List;
 
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.apache.mina.filter.codec.demux.MessageDecoder;
import org.apache.mina.filter.codec.demux.MessageDecoderResult;
public class Decoder implements MessageDecoder {
    
    private byte[] r_curPkg = null;
    private int r_pos = -1; // 包計數器
    static private final int PKG_SIZE_BYTES = 2;//包長度
    
    public Decoder() { }
 
    @Override
    public MessageDecoderResult decodable(IoSession session, IoBuffer in) {
        return MessageDecoderResult.OK;
    }
    
    @Override
    public MessageDecoderResult decode(IoSession session, IoBuffer in,ProtocolDecoderOutput out) throws Exception {
        List<String> list = new ArrayList<String>();
        while (in.remaining() >= PKG_SIZE_BYTES || (r_pos >= 0 && in.hasRemaining())) {// 循環接收包,4為一個整型,表示包長度b, 如果上一個包未接收完成時,繼續接收
            // 如果上個包已收完整,則建立新的包
            if (r_pos == -1) {
                //得到下一個包的長度,長度不包括前兩位,即包的長度=壓縮位長度+資料位長度
                int pkgLen = in.getShort(); 
                
                //如果包長度小于0,那麼此包錯誤,解碼失敗,傳回。
                if (pkgLen < 0) {
                    return MessageDecoderResult.NOT_OK;
                }
                in.get(); 
                r_curPkg = new byte[pkgLen-1]; //數組長度為資料長度
                r_pos = 0;
            }
            int need = r_curPkg.length - r_pos; //需要讀取的資料長度
            int length = in.remaining();//緩沖區中可讀的資料長度
            if (length >= need) {// 可以把目前包讀完整
                in.get(r_curPkg, r_pos, need); // 複制緩沖區中的資料到r_curPkg中
                // 處理接收到一個完整的包資料後,把包添加到池中,判斷是否需要需要解壓
                byte[] data = r_curPkg;
                String str = new String(data);
                list.add(str);
                r_curPkg = null;
                r_pos = -1;
            } else {
                // 如果剩下的位元組數,不夠一個包則
                int remainBytes = in.remaining();
                in.get(r_curPkg, r_pos, remainBytes);
                r_pos += remainBytes;
                return MessageDecoderResult.NEED_DATA;
            }
        }
        for (String protocol : list) {
            out.write(protocol);
        }
        return MessageDecoderResult.OK;
    }
 
    @Override
    public void finishDecode(IoSession session, ProtocolDecoderOutput out) {
 
    }
    
    
}      

編碼類:(沒有進行編碼,隻進行了資料發送)

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
import org.apache.mina.filter.codec.demux.MessageEncoder;
public class Encoder implements MessageEncoder<String>{
 
    public Encoder(){
        
    }
 
    @Override
    public void encode(IoSession session, String message, ProtocolEncoderOutput out)
            throws Exception {
            System.out.println("encode..................");
            String value = (String) message;  
            IoBuffer buf = IoBuffer.allocate(value.getBytes().length);  
            buf.setAutoExpand(true);  
            if (value != null){
                buf.put(value.trim().getBytes());  
            }  
            buf.flip();  
            out.write(buf); 
            out.flush();
    }
}      

IoService層:

import java.io.IOException;
import java.net.InetSocketAddress;
 
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
 
public class MinaServer {
 
    private static final int PORT = 9123;
    
    public static void main(String [] args) throws IOException{
        
        IoAcceptor acceptor = new NioSocketAcceptor();
        acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
                acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new CodecFactory()));
        acceptor.setHandler(new ServerHandler());
        
        
        acceptor.getSessionConfig().setReadBufferSize( 3 );
                acceptor.getSessionConfig().setIdleTime( IdleStatus.BOTH_IDLE, 10 );
        acceptor.bind( new InetSocketAddress(PORT) );
    }
}      

到這裡我們的服務端代碼就寫完了,

用戶端實作

<span style="font-size:12px">public class SocketClient {
 
    public static void main(String...args)throws Exception{
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("localhost",9123));
        byte [] bytes = "aaaa".getBytes();
        
        //對資料包進行編碼
        ByteBuffer buffer = ByteBuffer.allocate(bytes.length+3);
        buffer.putShort((short)(bytes.length+1)); //包長度
        buffer.put((byte)1);//閑置位
        buffer.put(bytes);//資料
        buffer.flip();
        socketChannel.write(buffer);
        socketChannel.socket().shutdownOutput();
        
        String obj = receive(socketChannel);
        System.out.println(obj);
    }
    
    private static String receive(SocketChannel socketChannel)throws Exception{
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        int size = 0;
        byte [] bytes = null;
        while((size = socketChannel.read(buffer))>=0){
            buffer.flip();
            bytes = new byte[size];
            buffer.get(bytes);
            baos.write(bytes);
            buffer.clear();
        }
        bytes = baos.toByteArray();
        baos.close();
        return new String(bytes);
    }
}
</span>      

所有的代碼都寫完了,先啟動服務端的MinaServer,然後再啟動用戶端,我們就會看到結果。

繼續閱讀