天天看點

采用Java nio 實作的一個簡單的伺服器

伺服器代碼:

package server.nio;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Server {


    private static Logger log = LoggerFactory.getLogger(Server.class);

    private static final Integer Default_Port = ;

    private static final Integer Default_Timeout = ;

    private static ConcurrentLinkedQueue<SocketChannel> workeQueue = new ConcurrentLinkedQueue<>();

    private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    private Integer port;

    private Integer timeout;



    public Server() {
        this(Default_Port,Default_Timeout);
    }

    public Server(Integer port){
        this(port,Default_Timeout);
    }

    public Server(Integer port,Integer timeout) throws IllegalArgumentException{
        if(port == null) port = Default_Port;
        if(timeout == null) timeout = Default_Timeout;

        this.port = port;
        this.timeout = timeout;

        long st = System.currentTimeMillis();
        try {
            log.debug("伺服器開始啟動-->>");
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.bind(new InetSocketAddress(Default_Port));

            //啟動監聽workQueue隊列的線程
            log.debug("任務隊列監聽線程啟動開始--->>");
            startListener();
            log.debug("任務隊列監聽線程啟動完成---<<");
            //啟動伺服器

            long et = System.currentTimeMillis();
            log.debug("伺服器啟動完成:--<<");
            log.debug("啟動耗時:"+(et-st));
            start(serverSocketChannel);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void startListener() {
        Thread t = new Thread(new Runnable() {
            public void run() {
                while(true){
                    final SocketChannel sc = workeQueue.poll();
                    try {
                        if(sc != null){
                            if(log.isDebugEnabled()){
                                log.debug("開始處理用戶端請求:"+sc);
                            }
                            executorService.submit(new Callable<Response>() {

                                @Override
                                public Response call() throws Exception {

                                    //建立位元組緩沖區
                                    ByteBuffer bytes = ByteBuffer.allocate();
                                    ArrayList<Byte> list = new ArrayList<>();
                                    try {
                                        //從通道中讀取位元組到緩沖區
                                        while(sc.read(bytes) != -){

                                            bytes.flip();
                                            //判斷位元組緩沖區是否已滿
                                            while(bytes.remaining() > ){

                                                //将緩沖區中的位元組數組放到list中
                                                //位元組緩沖區滿,但通道中的資料可能還沒有讀取完,是以需要将位元組先儲存起來
                                                //最後統一處理(如果不這樣做,可能會出現意外的結果:如最後轉化成字元串,如不這樣做,會因位元組不完整而亂碼)
                                                list.add(bytes.get());
                                                //清空位元組緩沖區,以接收通道中剩餘的資料
                                            }
                                            bytes.clear();
                                        }

                                        byte[] temp = new byte[list.size()];
                                        //将list中的資料放到temp位元組數組中
                                        for(int i=;i<list.size();i++){
                                            temp[i] = list.get(i);
                                        }

                                        //反序列化開始
                                        try{
                                            ByteArrayInputStream bin = new ByteArrayInputStream(temp);
                                            ObjectInputStream oin = new ObjectInputStream(bin);
                                            Object obj = oin.readObject();

//                                          Object obj = ObjectUtil.deserializeJdk(temp);
                                            System.out.println(obj);
                                        }catch(Exception e){
                                            log.debug("反序列化對象失敗:請檢查是否實作Serializable接口:");
                                            e.printStackTrace();
                                        }
                                        //反序列化開始結束


                                    } catch (Exception e) {
                                        e.printStackTrace();
                                    }

                                    //處理響應對象

//                                  ByteArrayOutputStream baos = new ByteArrayOutputStream();
//                                  ObjectOutputStream oos = new ObjectOutputStream(baos);
//                                  //将傳回的對象序列化
//                                  oos.writeObject(response);
//                                  //将byte數組轉換成通道可以接受的ByteBuffer類型
//                                  ByteBuffer writeBytes = ByteBuffer.wrap(baos.toByteArray());
//                                  sc.write(writeBytes);
//                                  sc.close();
                                    return null;
                                }
                            });

                        }
                    } catch (Exception e) {

                        e.printStackTrace();
                    }finally{
                        if(sc != null){
                            log.debug("用戶端連結:"+sc+":關閉");
                        }else{
                            try {
                                Thread.sleep();
                            } catch (InterruptedException e) {
                            }
                            log.debug("等待連接配接....");
                        }
                    }
                }
            }
        });

        t.start();
    }

    private void start(ServerSocketChannel serverSocketChannel) {
        while(true){
            try {
                SocketChannel socketChannel = serverSocketChannel.accept();
                if(socketChannel != null){
                    if(log.isDebugEnabled()){
                        log.debug("用戶端連接配接放進任務隊列中:"+serverSocketChannel);
                    }
                    workeQueue.add(socketChannel);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 響應對象
     * @author Administrator
     *
     */
    public class Response implements Serializable {
//      private Integer status;
//      
//      private Integer code;
//      
//      private String desc;
//      
//      private Object data;
//
//      
//      public String getDesc() {
//          return desc;
//      }
//
//      public void setDesc(String desc) {
//          this.desc = desc;
//      }
//
//      public Integer getStatus() {
//          return status;
//      }
//
//      public void setStatus(Integer status) {
//          this.status = status;
//      }
//
//      public Integer getCode() {
//          return code;
//      }
//
//      public void setCode(Integer code) {
//          this.code = code;
//      }
//
//      public Object getData() {
//          return data;
//      }
//
//      public void setData(Object data) {
//          this.data = data;
//      }
//      
    }
}
           

用戶端類:

package client.nio;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class Client {

    public static void main(String[] args) throws Exception {

        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress(InetAddress.getLocalHost(), ));

//      String test1 = "hello world";
        String test = "Java IO的各種流是阻塞的。這意味着,當一個線程調用read() 或 write()時,該線程被阻塞,直到有一些資料被讀取,或資料完全寫入。該線程在此期間不能再幹任何事情了。 Java NIO的非阻塞模式,使一個線程從某通道發送請求讀取資料,但是它僅能得到目前可用的資料,如果目前沒有資料可用時,就什麼都不會擷取。而不是保持線程阻塞,是以直至資料變的可以讀取之前,該線程可以繼續做其他的事情。 非阻塞寫也是如此。一個線程請求寫入一些資料到某通道,但不需要等待它完全寫入,這個線程同時可以去做别的事情。 線程通常将非阻塞IO的空閑時間用于在其它通道上執行IO操作,是以一個單獨的線程現在可以管理多個輸入和輸出通道(channel)。";

        //序列化
        ByteArrayOutputStream bout = new ByteArrayOutputStream();
        ObjectOutputStream oout = new ObjectOutputStream(bout);
        oout.writeObject(test);

        //建立位元組緩沖區并發送給服務端
        ByteBuffer buffer = ByteBuffer.wrap(bout.toByteArray());
        socketChannel.write(buffer);

        socketChannel.close();

    }
}
           

測試:

啟動伺服器:
package server.nio;

public class Main {

    public static void main(String[] args) {
        Server server = new Server();

    }
}
![這裡寫圖檔描述](https://img-blog.csdn.net/20161208180036256?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvcXFfMjUwMTQ1OTk=/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast)
啟動用戶端并發送消息:

![這裡寫圖檔描述](https://img-blog.csdn.net/20161208175459426?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvcXFfMjUwMTQ1OTk=/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast)