天天看點

基于Java的Socket程式設計的簡單學習

什麼是Socket

網絡由下往上分為 實體層 、資料鍊路層 、 網絡層 、 傳輸層 、 會話層 、 表現層 和 應用層。IP協定對應于網絡層,TCP協定對應于傳輸層,而HTTP協定對應于應用層。TCP/IP協定是傳輸層協定,主要解決資料如何在網絡中傳輸,而HTTP協定是應用層協定,主要解決如何包裝資料。

socket,又稱套接字,是在不同的程序間進行網絡通訊的一種協定、約定或者說是規範。對于socket程式設計,它更多的時候像是基于TCP/UDP等協定做的一層封裝或者說抽象,是一套系統所提供的用于進行網絡通信相關程式設計的接口。

Socket的建立過程

基于Java的Socket程式設計的簡單學習

本質上,socket是對tcp連接配接(當然也有可能是udp等其他連接配接)協定,在程式設計層面上的簡化和抽象。

我們可以從最簡單的單次通信做一個小demo進行學習

BaseSocket:

public class BaseSocket {

    public int port;
    public String host;
    public static final int MAX_BUFFER_SIZE = 1024; 
    public ServerSocket serverSocket;
    public Socket socket;
    public InputStream inputStream;
    public OutputStream outputStream;

    public void close() {

        try {
            if (this.inputStream != null) {
                this.inputStream.close();
            }
            if (this.outputStream != null) {
                this.outputStream.close();
            }
            if (this.socket != null) {
                this.socket.close();
            }
            if (this.serverSocket != null) {
                this.serverSocket.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
           

BaseSocketServer:

public class BaseSocketServer extends BaseSocket {


    public int getPort() {
        return this.port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    BaseSocketServer(int port) {
        this.port = port;
    }

    /**
     * 單次通信
     */
    public void runServerSingle() {
        try {
            this.serverSocket = new ServerSocket(this.port);
            System.out.println("----------base socket server started------------");
            this.socket = serverSocket.accept();
            this.inputStream = socket.getInputStream();
            byte[] readBytes = new byte[MAX_BUFFER_SIZE];

            int msgLen;
            StringBuilder stringBuilder = new StringBuilder();

            while ((msgLen = inputStream.read(readBytes)) != -1) {
                stringBuilder.append(new String(readBytes, 0, msgLen, "UTF-8"));
            }

            System.out.println("Get message from client : " + stringBuilder.toString());

            this.close();

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 雙向通信
     */
    public void runServer() {
        try {
            this.serverSocket = new ServerSocket(port);
            System.out.println("----------base socket server started------------");
            this.socket = serverSocket.accept();
            this.inputStream = socket.getInputStream();
            byte[] readBytes = new byte[MAX_BUFFER_SIZE];

            int msgLen;
            StringBuilder stringBuilder = new StringBuilder();

            while ((msgLen = this.inputStream.read(readBytes)) != -1) {
                stringBuilder.append(new String(readBytes, 0, msgLen, "UTF-8"));
            }

            System.out.println("received message: " + stringBuilder.toString());

            //告訴用戶端接受完畢,之後隻能發送
            this.socket.shutdownInput();

            this.outputStream = socket.getOutputStream();

            String receipt = "we received your message : " + stringBuilder.toString();

            this.outputStream.write(receipt.getBytes("UTF-8"));

            this.close();

        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    public static void main(String[] args) {
        BaseSocketServer baseSocketServer = new BaseSocketServer(8888);
        //單向通信
        //baseSocketServer.runServerSingle();
        //雙向通信
        baseSocketServer.runServer();
    }
}
           

BaseSocketClient:

public class BaseSocketClient extends BaseSocket {


    BaseSocketClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * 擷取連接配接
     */
    public void connectServer() {
        try {
            this.socket = new Socket(this.host, this.port);
            this.outputStream = this.socket.getOutputStream();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 單向通信
     *
     * @param message 消息内容
     */
    public void sendSingle(String message) {
        try {
            this.outputStream.write(message.getBytes("UTF-8"));

            this.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
     /**
     * 雙向通信
     * @param message 消息
     */
    public void sendMessage(String message) {
        try {

            this.outputStream.write(message.getBytes("UTF-8"));

            //發送完畢
            this.socket.shutdownOutput();

            this.inputStream = this.socket.getInputStream();
            byte[] readBytes = new byte[MAX_BUFFER_SIZE];
            int msgLen;
            StringBuilder stringBuilder = new StringBuilder();

            while ((msgLen = inputStream.read(readBytes)) != -1) {
                stringBuilder.append(new String(readBytes, 0, msgLen, "UTF-8"));
            }
            System.out.println("got receipt: " + stringBuilder.toString());

            this.inputStream.close();
            this.close();

        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    public static void main(String[] args) {
        BaseSocketClient baseSocketClient = new BaseSocketClient("127.0.0.1", 8888);
        baseSocketClient.connectServer();
        //單向通信
        //baseSocketClient.sendSingle("hello");
        //雙向通信
        baseSocketClient.sendMessage("hello");

    }
}
           
  • 先運作Server後運作Client便可以得到Client發送給Server的消息。這裡的IO操作實作,我們使用了一個大小為MAX_BUFFER_SIZE的byte數組作為緩沖區,然後從輸入流中取出位元組放置到緩沖區,再從緩沖區中取出位元組建構到字元串中去,這在輸入流檔案很大時非常有用。

單向通信詳情可以參考,runServerSingle與sendSingle.

單向通信顯然有點浪費通道。socket連接配接支援全雙工的雙向通信(底層是tcp),上邊的例子中,雙向通信,服務端在收到用戶端的消息後,将傳回給用戶端一個回執。

雙向通信與單向類似,不同的一點是,在 進行一次消息傳遞之後不是真正意義上的close資源。而是調用了

this.socket.shutdownOutput();
this.socket.shutdownInput();
           

借此告知服務端或者用戶端消息已經發送\接受完畢。調用stream的close會導緻sockt的關閉,雖然調用上面兩個方法也會關閉流,但不會關閉socket,隻是無法繼續發送消息。

如何發送多條消息呢?

我們的上述例子在進行一次發送\回執之後就會close,沒有辦法進行接下來的消息發送。下次需要重建立立連接配接,這樣是很耗費資源的。其實我們可以做到建立一次連接配接分次發送多條消息,我們有兩張方式進行多條消息的區分。

我們先看一個例子:

CycleSocketServer:

public class CycleSocketServer extends BaseSocket{

    public int getPort() {
        return this.port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    CycleSocketServer(int port) {
        this.port = port;
    }

    public void runServerForSign() {
        try {
            this.serverSocket = new ServerSocket(this.port);
            System.out.println("base socket server started.");

            this.socket = serverSocket.accept();

            this.inputStream = socket.getInputStream();

            Scanner scanner = new Scanner(inputStream);

            //循環接收并列印消息
            while (scanner.hasNextLine()) {
                System.out.println("get info from client: " + scanner.nextLine());
            }
            scanner.close();
            this.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 根據長度界定,消息傳遞分為兩步
     * 1.發送長度
     * 2.擷取消息
     */
    public void runServer() {
        try {

            this.serverSocket = new ServerSocket(this.port);
            this.socket = serverSocket.accept();
            System.out.println("server socket running!");
            this.inputStream = socket.getInputStream();
            byte[] bytes;

            while (true) {
                //先讀取第一個位元組
                int first = this.inputStream.read();
                //是-1則表示輸入流已經關閉
                if (first == -1) {
                    this.close();
                    break;
                }
                //讀取第二個位元組
                int second = this.inputStream.read();

                //用位運算将兩個位元組拼起來成為真正的長度
                int length = (first <<8 ) +second;

                bytes = new byte[length];

                this.inputStream.read(bytes);

                System.out.println("receive message : " + new String(bytes,"UTF-8"));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    public static void main(String[] args) {
        CycleSocketServer cycleSocketServer = new CycleSocketServer(8888);

        //cycleSocketServer.runServerForSign();

        cycleSocketServer.runServer();

    }
}
           

CycleSocketClient:

public class CycleSocketClient extends BaseSocket {


    CycleSocketClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * 擷取連接配接
     */
    public void connectServer() {
        try {
            this.socket = new Socket(this.host, this.port);
            this.outputStream = this.socket.getOutputStream();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void sendForSign(String message) {
        //約定 \n為消息結束标記
        String sendMsg = message + "\n";

        try {
            this.outputStream.write(sendMsg.getBytes("UTF-8"));


        } catch (IOException e) {
            System.out.println(sendMsg);
            e.printStackTrace();
        }
    }

    public void sendMessage(String message){

        try {
            //将message轉化為bytes數組
            byte[] bytes = message.getBytes("UTF-8");

            //傳輸兩個位元組長度。采用位移實作
            int length = bytes.length;
            this.outputStream.write(length >> 8);
            this.outputStream.write(length);

            //傳輸完長度之後
            this.outputStream.write(bytes);

        } catch (IOException e) {
            e.printStackTrace();
        }

    }


    public static void main(String[] args) {
        CycleSocketClient cycleSocketClient = new CycleSocketClient("127.0.0.1", 8888);
        cycleSocketClient.connectServer();
        Scanner scanner = new Scanner(System.in);

        while (scanner.hasNext()) {
            String message = scanner.nextLine();

            //cycleSocketClient.send(message);
            cycleSocketClient.sendMessage(message);
        }
    }

}
           
  • 使用特殊符号作為結束限定

    最簡單的辦法是使用一些特殊的符号來标記一次發送完成,服務端隻要讀到對應的符号就可以完成一次讀取,然後進行相關的處理操作。

    上面的例子中我們使用換行符\n來标記一次發送的結束,服務端每接收到一個消息,就列印一次,并且使用了Scanner來簡化操作.

    運作後效果是,用戶端每輸入一行文字按下回車後,服務端就會列印出對應的消息讀取記錄。

    詳情可以參考:runServerForSign與runServerForSign

  • 根據長度限定

    我們之是以不好定位消息什麼時候結束,是因為我們不能夠确定每次消息的長度。

    那麼其實可以先将消息的長度發送出去,當服務端知道消息的長度後,就能夠完成一次消息的接收了。

    總的來說,發送一次消息變成了兩個步驟

  1. 發送消息的長度
  2. 發送消息

    最後的問題就是,“發送消息的長度”這一步驟所發送的位元組量必須是固定的,否則我們仍然會陷入僵局。

    一般來說,我們可以使用固定的位元組數來儲存消息的長度,比如規定前2個位元組就是消息的長度,不過這樣我們能夠傳送的消息最大長度也就被固定死了,以2個位元組為例,我們發送的消息最大長度不超過2^16個位元組即64K。

    在上面的例子中,sendMessage與runServer 便是按照長度進行界定的。詳情可以參考code

利用多線程實作server與client的互動

在上述的例子中,消息的接收方并不能主動地向對方發送消息,換句話說我們并沒有實作真正的互相對話,這主要是因為消息的發送和接收這兩個動作并不能同時進行,是以我們需要使用兩個線程,其中一個用于監聽鍵盤輸入并将其寫入socket,另一個則負責監聽socket并将接受到的消息顯示。出于簡單考慮,我們直接讓主線程負責鍵盤監聽和消息發送,同時另外開啟一個線程用于拉取消息并顯示。

ListenThread:

public class ListenThread extends BaseSocket implements Runnable {
    ListenThread(Socket socket) {
        this.socket = socket;
        try {
            this.inputStream = socket.getInputStream();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        while (true) {
            try {
                int first = this.inputStream.read();

                if (first == -1) {
                    this.close();
                    throw new RuntimeException("disconnected");
                    //break;
                }

                int second = this.inputStream.read();
                int msglen = (first << 8) + second;
                byte[] bytes = new byte[msglen];

                this.inputStream.read(bytes);

                System.out.println("message from  [ " + this.socket.getInetAddress() + " ] is " + new String(bytes, "UTF-8"));

            } catch (IOException e) {
                e.printStackTrace();
            }

        }
    }
}
           

ChatSocket:

public class ChatSocket extends BaseSocket {

    //ExecutorService threadPool = Executors.newFixedThreadPool(100);

    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();

    /**
     * 使用factory
     * 建立線程池
     */
    ExecutorService threadPool = new ThreadPoolExecutor(5,10,1,
            TimeUnit.SECONDS,new ArrayBlockingQueue<>(1024),threadFactory,new ThreadPoolExecutor.AbortPolicy());

    public void runAsServer(int port) {
        try {
            this.serverSocket = new ServerSocket(port);

            System.out.println("server started at port " + port);

            //等待用戶端的加入
            this.socket = this.serverSocket.accept();
            System.out.println("successful connected with " + socket.getInetAddress());

            //啟動監聽線程
             this.threadPool.submit(new ListenThread(this.socket));

            waitAndSend();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void runAsClient(String host, int port) {
        try {
            this.socket = new Socket(host, port);
            System.out.println("successful connected to server " + socket.getInetAddress());
            this.threadPool.submit(new ListenThread(this.socket));
            waitAndSend();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void waitAndSend() {
        try {
            this.outputStream = this.socket.getOutputStream();
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                sendMessage(scanner.nextLine());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    public void sendMessage(String message) {
        try {
            byte[] bytes = message.getBytes("UTF-8");
            int length = bytes.length;
            this.outputStream.write(length >> 8);
            this.outputStream.write(length);
            this.outputStream.write(bytes);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Scanner scanner = new Scanner(System.in);
        ChatSocket chatSocket = new ChatSocket();
        System.out.println("select connect type: 1 for server and 2 for client");
        int type = scanner.nextInt();

        if (type == 1) {
            System.out.print("input server port :");
            int port = scanner.nextInt();
            chatSocket.runAsServer(port);
        } else if (type == 2) {
            System.out.print("input server host: ");
            String host = scanner.next();
            System.out.print("input server port: ");
            int port = scanner.nextInt();
            chatSocket.runAsClient(host, port);
        }
    }
}
           

作為服務端,如果一次隻跟一個用戶端建立socket連接配接,未免顯得太過浪費資源,是以我們完全可以讓服務端和多個用戶端建立多個socket。

那麼既然要處理多個連接配接,就不得不面對并發問題了(當然,你也可以寫循環輪流處理)。我們可以使用多線程來處理并發,不過線程的建立和銷毀都會消耗大量的資源和時間,是以最好一步到位,我們用一個線程池來實作。線程池的相關用法感興趣的同學可以了解以下。

demo連結

繼續閱讀