天天看點

Java網絡程式設計-Socket程式設計初涉三(僞異步I/O模型的簡易多人聊天室)

推薦:​​Java網絡程式設計彙總​​

Java網絡程式設計-Socket程式設計初涉三(僞異步I/O模型的簡易多人聊天室)

什麼是僞異步I/O模型?

​​BIO模型與僞異步I/O模型​​

現在應該了解了什麼是​

​僞異步I/O模型​

​​了吧,看起來它和​

​BIO模型​

​​其實差别不大,隻不過​

​僞異步I/O模型​

​​是用線程池來管理線程去和用戶端進行資料互動,而​

​BIO模型​

​​是每次用戶端的連接配接請求成功後,都建立新的線程去與用戶端進行資料互動,雖然差别不大,但​

​僞異步I/O模型​

​的優勢還是很明顯的,尤其當用戶端連接配接請求并發數比較大時。

兩個模型差别不大,也就意味着需要改動代碼的地方很少,如果需要代碼解釋,請看下面這篇部落格。

​Java網絡程式設計-Socket程式設計初涉二(基于BIO模型的簡易多人聊天室)​​

改進

隻需要改動伺服器端的代碼,将每次用戶端連接配接成功後都建立新線程來與用戶端進行資料互動,改進成由線程池來管理線程去與用戶端進行資料互動。

ChatServer類(​

​有改動​

​)。

package bio.chatroom.server;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ChatServer {

    private final int DEFAULT_PORT = 8888;
    private final String QUIT = "quit";

    private ExecutorService executorService;
    private ServerSocket serverSocket;

    // 把用戶端的port當作用戶端的id
    private Map<Integer , Writer> connectedClients;

    public ChatServer(){
        executorService = Executors.newFixedThreadPool(10);
        connectedClients = new HashMap<>();
    }

    public synchronized void addClient(Socket socket) throws IOException {
        if(socket != null){
            int port = socket.getPort();
            BufferedWriter writer = new BufferedWriter(
                    new OutputStreamWriter(socket.getOutputStream())
            );
            connectedClients.put(port , writer);
            System.out.println("用戶端["+port+"]已連接配接到伺服器");
        }
    }

    public synchronized void removeClient(Socket socket) throws IOException {
        if(socket != null){
            int port = socket.getPort();
            if(connectedClients.containsKey(port)){
                connectedClients.get(port).close();
                connectedClients.remove(port);
                System.out.println("用戶端["+port+"]已斷開連接配接");
            }
        }
    }

    public synchronized void forwardMessage(Socket socket , String fwdMsg) throws IOException {
        // 發送消息的端口
        int sendMessagePort = socket.getPort();
        for(Integer port : connectedClients.keySet()){
            if(!port.equals(sendMessagePort)){
                Writer writer = connectedClients.get(port);
                writer.write(fwdMsg);
                writer.flush();
            }
        }
    }

    public boolean readyToQuit(String msg){
        return QUIT.equalsIgnoreCase(msg);
    }

    public synchronized void close(){
        if(serverSocket != null){
            try {
                serverSocket.close();
                System.out.println("關閉了ServerSocket");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public void start(){
        try {
            // 建立ServerSocket,綁定和監聽端口
            serverSocket = new ServerSocket(DEFAULT_PORT);
            System.out.println("啟動伺服器,監聽端口"+DEFAULT_PORT+"...");

            while(true){
                // 等待用戶端連接配接
                Socket socket = serverSocket.accept();
                // 建立ChatHandler線程
//                new Thread(new ChatHandler(this , socket)).start();
                executorService.execute(new ChatHandler(this , socket));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            close();
        }
    }

    public static void main(String[] args) {
        ChatServer server = new ChatServer();
        server.start();
    }
}      

ChatHandler類(​

​無改動​

​)。

package bio.chatroom.server;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;

public class ChatHandler implements Runnable{

    private ChatServer server;
    private Socket socket;

    public ChatHandler(ChatServer server , Socket socket){
        this.server = server;
        this.socket = socket;
    }

    @Override
    public void run() {
        try {
            // 存儲新上線使用者
            server.addClient(socket);

            // 讀取使用者發送的消息
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(socket.getInputStream())
            );

            String msg = null;
            while((msg = reader.readLine()) != null){
                String fwdMsg = "用戶端["+socket.getPort()+"]:"+msg+"\n";
                System.out.print(fwdMsg);

                // 将消息轉發給聊天室裡線上的其他使用者
                server.forwardMessage(socket , fwdMsg);

                // 檢查使用者是否準備退出
                if(server.readyToQuit(msg)){
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            try {
                // 從伺服器移除退出的使用者
                server.removeClient(socket);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}      

ChatClient類(​

​無改動​

​)。

package bio.chatroom.client;

import java.io.*;
import java.net.Socket;

public class ChatClient {

    private final String DEFAULT_SERVER_HOST = "127.0.0.1";
    private final int DEFAULT_PORT = 8888;
    private final String QUIT = "quit";

    private Socket socket;
    private BufferedReader reader;
    private BufferedWriter writer;

    // 發送消息給伺服器
    public void send(String msg) throws IOException {
        if(!socket.isOutputShutdown()){
            writer.write(msg+"\n");
            writer.flush();
        }
    }

    // 接收伺服器的消息
    public String receive() throws IOException {
        String msg = null;
        if(!socket.isInputShutdown()){
            msg = reader.readLine();
        }
        return msg;
    }

    // 檢查使用者是否準備退出
    public boolean readyToQuit(String msg){
        return QUIT.equalsIgnoreCase(msg);
    }

    public void close(){
        if(writer != null){
            try {
                writer.close();
                System.out.println("關閉socket");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public void start(){
        try {
            // 建立socket
            socket = new Socket(DEFAULT_SERVER_HOST , DEFAULT_PORT);

            // 建立IO流
            reader = new BufferedReader(
                    new InputStreamReader(socket.getInputStream())
            );
            writer = new BufferedWriter(
                    new OutputStreamWriter(socket.getOutputStream())
            );

            // 處理使用者的輸入
            new Thread(new UserInputHandler(this)).start();

            // 讀取伺服器轉發的消息
            String msg = null;
            while((msg = receive()) != null){
                System.out.println(msg);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally{
            close();
        }
    }

    public static void main(String[] args) {
        ChatClient client = new ChatClient();
        client.start();
    }
}      

UserInputHandler類(​

​無改動​

​)。

package bio.chatroom.client;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class UserInputHandler implements Runnable{

    private ChatClient client;

    public UserInputHandler(ChatClient client){
        this.client = client;
    }

    @Override
    public void run() {
        try {
            // 等待使用者輸入消息
            BufferedReader consoleReader = new BufferedReader(
                    new InputStreamReader(System.in)
            );

            while(true){
                String input = consoleReader.readLine();

                // 向伺服器發送消息
                client.send(input);

                //檢查使用者是否準備退出
                if(client.readyToQuit(input)){
                    break;
                }
            }
        } catch (IOException e){
            e.printStackTrace();
        }
    }
}      

改動的地方

增加了一個屬性。

private ExecutorService executorService;      

構造器也需要改,需要初始化這個​

​executorService​

​。

public ChatServer(){
        executorService = Executors.newFixedThreadPool(10);
        connectedClients = new HashMap<>();
    }      

當用戶端連接配接成功後,建立新線程的地方也需要改。

// 建立ChatHandler線程
//                new Thread(new ChatHandler(this , socket)).start();
                executorService.execute(new ChatHandler(this , socket));      

就需要改這三個地方。

由改進的代碼可以知道,當大量用戶端向伺服器發出連接配接請求後,伺服器雖然可以與這些用戶端進行連接配接,但最多隻有​

​10個用戶端同時線上​

​​,而其他的用戶端會處于等待狀态,也就是伺服器最多同時存在​

​10個線程來與用戶端進行資料互動​

​。

關于線程池的使用,這裡也隻是涉及皮毛,也不打算多講,相信大家都看得懂。

這裡便完成了​

​僞異步I/O模型​

​的簡易多人聊天室,大家可以自己實作一下。

測試

為了友善測試,将線程池允許容納的線程數設定為​

​2​

​。

Java網絡程式設計-Socket程式設計初涉三(僞異步I/O模型的簡易多人聊天室)
Java網絡程式設計-Socket程式設計初涉三(僞異步I/O模型的簡易多人聊天室)
Java網絡程式設計-Socket程式設計初涉三(僞異步I/O模型的簡易多人聊天室)
Java網絡程式設計-Socket程式設計初涉三(僞異步I/O模型的簡易多人聊天室)