天天看點

Java網絡程式設計-Socket程式設計初涉五(AIO模型的簡易用戶端-伺服器)

推薦:​​Java網絡程式設計彙總​​

Java網絡程式設計-Socket程式設計初涉五(AIO模型的簡易用戶端-伺服器)

什麼是AIO模型?

​​Java新一代網絡程式設計模型AIO原理​​

功能

這裡實作一個很簡單的用戶端-伺服器,用戶端連接配接伺服器後,發送消息給伺服器,伺服器直接傳回該消息給用戶端即可,這隻是個嘗鮮版,以後也會用AIO模型來實作多人聊天室。

伺服器

伺服器的主要邏輯如下:

  • 打開管道​

    ​AsynchronousServerSocketChannel​

    ​,綁定、監聽端口(啟動伺服器)。
  • 伺服器異步調用​

    ​accept​

    ​​(通過​

    ​CompletionHandler​

    ​​的實作類​

    ​AcceptHandler​

    ​​來實作異步調用,因為有回調機制),使用​

    ​System.in.read()​

    ​​來減少​

    ​while​

    ​​循環中頻繁異步調用​

    ​accept​

    ​。
  • 當有用戶端連接配接成功後,觸發回調函數​

    ​completed​

    ​​(​

    ​AcceptHandler​

    ​​類),然後再異步調用一次​

    ​accept​

    ​​,等待下一個用戶端連接配接成功時觸發(先存着);回調函數​

    ​completed​

    ​​會把​

    ​AsynchronousSocketChannel​

    ​​傳過來,調用​

    ​AsynchronousSocketChannel​

    ​​的​

    ​read​

    ​方法,來讀取用戶端發送過來的消息。
  • 将​

    ​AsynchronousSocketChannel​

    ​​的​

    ​read​

    ​​和​

    ​write​

    ​​方法實作異步調用(通過​

    ​CompletionHandler​

    ​​的實作類​

    ​ClientHandler​

    ​​ 來實作異步調用,因為有回調機制),為了厘清楚是​

    ​read​

    ​​還是​

    ​write​

    ​​,​

    ​attachment​

    ​​傳一個​

    ​Map​

    ​​(Map裡面記錄必要資料​

    ​buffer​

    ​​、​

    ​type​

    ​​)。當讀取用戶端發送的消息後,再将該消息發送給用戶端(都是通過​

    ​AsynchronousSocketChannel​

    ​​,是以将它定義為​

    ​ClientHandler​

    ​​ 類的屬性),即異步調用​

    ​read​

    ​​,觸發回調函數(​

    ​read​

    ​​的回調函數),在回調函數裡面再異步調用​

    ​write​

    ​​,觸發回調函數(​

    ​write​

    ​​的回調函數),在回調函數裡面再異步調用​

    ​read​

    ​。

可能會看起來懵懵懂懂,可以結合代碼來了解,代碼有一定的注釋。

伺服器完整代碼:

package aio.test;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.HashMap;
import java.util.Map;

public class Server {

    final String LOCALHOST = "localhost";
    final int DEFAULT_PORT = 8888;
    AsynchronousServerSocketChannel serverChannel;

    private void close(Closeable closeable){
        if(closeable != null){
            try {
                closeable.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public void start(){
        try {
            // 打開通道
            serverChannel = AsynchronousServerSocketChannel.open();
            // 綁定、監聽端口
            serverChannel.bind(new InetSocketAddress(DEFAULT_PORT));
            System.out.println("啟動伺服器,監聽端口:"+DEFAULT_PORT+"...");

            while(true){
                // 異步調用
                serverChannel.accept(null,new AcceptHandler());
                // 不會頻繁調用accept的小技巧
                System.in.read();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally{
            close(serverChannel);
        }
    }

    private class AcceptHandler implements
            CompletionHandler<AsynchronousSocketChannel , Object> {
        @Override
        public void completed(AsynchronousSocketChannel result, Object attachment) {
            //此次調用accept完成,再一次調用accept,等待下一個用戶端連接配接
            if(serverChannel.isOpen()) {
                serverChannel.accept(null , this);
            }

            AsynchronousSocketChannel clientChannel = result;
            if(clientChannel != null && clientChannel.isOpen()){
                ClientHandler handler = new ClientHandler(clientChannel);

                ByteBuffer buffer = ByteBuffer.allocate(1024);
                Map<String , Object> info = new HashMap<>();
                info.put("type", "read");
                info.put("buffer" , buffer);
                clientChannel.read(buffer , info, handler);
            }
        }

        @Override
        public void failed(Throwable exc, Object attachment) {
            // 處理錯誤
        }
    }

    private class ClientHandler implements
            CompletionHandler<Integer , Object>{

        private AsynchronousSocketChannel clientChannel;

        public ClientHandler(AsynchronousSocketChannel channel){
            this.clientChannel = channel;
        }
        @Override
        public void completed(Integer result, Object attachment) {
            Map<String , Object> info = (Map<String, Object>) attachment;
            String type = (String) info.get("type");

            if(type.equals("read")){
                ByteBuffer buffer = (ByteBuffer) info.get("buffer");
                // 讀模式
                buffer.flip();
                info.put("type" , "write");
                clientChannel.write(buffer ,info , this);
                // 寫模式(也相當于清空)
                buffer.clear();
            }
            else if(type.equals("write")){
                ByteBuffer buffer = ByteBuffer.allocate(1024);

                info.put("type" , "read");
                info.put("buffer" , buffer);

                clientChannel.read(buffer , info , this);
            }
        }

        @Override
        public void failed(Throwable exc, Object attachment) {
            // 處理錯誤
        }
    }

    public static void main(String[] args) {
        Server server = new Server();
        server.start();
    }
}      

用戶端

為了讓大家了解其他實作異步調用的方法,用戶端使用​

​Future​

​​來實作異步調用(正如該單詞的英文意思​

​将來​

​​、​

​未來​

​)。

​Future​

​​實作簡單的異步調用,會簡單一點。

調用AIO模型元件的一些方法會傳回一個​​

​Future​

​​執行個體,再調用該執行個體的​

​get​

​方法會阻塞,等待該方法調用完成,阻塞便解除(具體看代碼吧,用戶端比較簡單,注釋還是比較全的)。

package aio.test;

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class Client {
    final String LOCALHOST = "localhost";
    final int DEFAULT_PORT = 8888;

    AsynchronousSocketChannel clientChannel;

    private void close(Closeable closeable){
        if(closeable != null){
            try {
                closeable.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public void start(){
        try {
            // 建立Channel
            clientChannel = AsynchronousSocketChannel.open();
            Future<Void> future = clientChannel.connect(
                    new InetSocketAddress(LOCALHOST , DEFAULT_PORT));
            // 當未連接配接成功前,這裡是阻塞的
            future.get();

            // 等待使用者的輸入
            BufferedReader consoleReader =
                    new BufferedReader(new InputStreamReader(System.in));

            while(true){
                String input = consoleReader.readLine();

                byte[] inputBytes = input.getBytes();
                // 得到buffer的模式是讀模式,可以Debug看一看
                ByteBuffer buffer = ByteBuffer.wrap(inputBytes);
                Future<Integer> writeResult = clientChannel.write(buffer);

                // 等待成功寫入使用者管道
                writeResult.get();
                // 寫模式
                buffer.clear();
                Future<Integer> readResult = clientChannel.read(buffer);
                // 等待成功讀取使用者管道
                readResult.get();
                String echo = new String(buffer.array());
                System.out.println(echo);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            close(clientChannel);
        }
    }

    public static void main(String[] args) {
        Client client = new Client();
        client.start();
    }
}      

測試

Java網絡程式設計-Socket程式設計初涉五(AIO模型的簡易用戶端-伺服器)
Java網絡程式設計-Socket程式設計初涉五(AIO模型的簡易用戶端-伺服器)

測試沒什麼問題。