天天看點

網際網路架構(7):Socket網絡通信程式設計--BIO/NIO/AIO

二、Socket網絡通信程式設計

1、IO(BIO阻塞IO)

1.1基本概念

Socket又稱”套接字”,應用程式通常通過“套接字”向網絡送出請求或者應答網絡請求。

Socket和ServerSocket類庫位于java.net包中。ServerSocket用于伺服器端,Socket是履歷網絡連接配接時使用的。在連接配接成功是,應用程式兩端都會産生一個Socket執行個體,操作這個執行個體完成所需的會話。對于一個網絡連接配接來說,套接字是平等的,不因為在伺服器端或在用戶端而産生不同級别。不管是Socket還是ServerSocket他們的工作都是通過SocketImpl類及其子類完成的。

套接字之間的連接配接過程可以分為四個步驟(根據我們的例子來表述的,傳統TCP連接配接遵循3次握手,4次揮手即可):伺服器監聽,用戶端請求伺服器,伺服器确認,用戶端确認,進行通信。

  • (1)伺服器監聽:伺服器端套接字并不定位具體的用戶端套接字,而是出去等待連接配接的狀态,實時監控網絡狀态。
  • (2)用戶端請求:是指由用戶端的套接字提出的連接配接請求,要連接配接的目标是伺服器端的套接字。是以,用戶端的套接字必須首先描述它要連接配接的伺服器的套接字,指出伺服器端套接字的位址和端口,然後就想伺服器端套接字提出連接配接請求
  • (3)伺服器端連接配接确認:是指當伺服器端套接字監聽到或者說接收到用戶端套接字的連接配接請求,他就響應用戶端套接字的請求,建立一個新的線程,把伺服器端套接字的描述發給用戶端。
  • (4)用戶端連接配接确認:一旦用戶端确認了此描述,連接配接就建立好了。雙方開始進行通信。而伺服器端套接字繼續處于監聽狀态,繼續接受其他用戶端套接字的連接配接請求。

三次握手:首先Client端發送連接配接請求封包,Server段接受連接配接後回複ACK封包,并為這次連接配接配置設定資源。Client端接收到ACK封包後也向Server段發生ACK封包,并配置設定資源,這樣TCP連接配接就建立了。

四次揮手:假設Client端發起中斷連接配接請求,也就是發送FIN封包。Server端接到FIN封包後,意思是說”我Client端沒有資料要發給你了”,但是如果你還有資料沒有發送完成,則不必急着關閉Socket,可以繼續發送資料。是以你先發送ACK,”告訴Client端,你的請求我收到了,但是我還沒準備好,請繼續你等我的消息”。這個時候Client端就進入FIN_WAIT狀态,繼續等待Server端的FIN封包。當Server端确定資料已發送完成,則向Client端發送FIN封包,”告訴Client端,好了,我這邊資料發完了,準備好關閉連接配接了”。Client端收到FIN封包後,”就知道可以關閉連接配接了,但是他還是不相信網絡,怕Server端不知道要關閉,是以發送ACK後進入TIME_WAIT狀态,如果Server端沒有收到ACK則可以重傳。“,Server端收到ACK後,”就知道可以斷開連接配接了”。Client端等待了2MSL後依然沒有收到回複,則證明Server端已正常關閉,那好,我Client端也可以關閉連接配接了。Ok,TCP連接配接就這樣關閉了!

2、NIO(非阻塞IO)

2.1 基本概念

IO(BIO)與NIO的差別:其本質就是阻塞和非阻塞的差別。

  • 阻塞:應用程式在擷取網絡資料的時候,如果網絡傳輸資料很慢,那麼程式就一直等着,知道傳輸完畢為止。
  • 非阻塞:程式直接可以擷取已經準備就緒的資料,無需等待。

BIO為同步阻塞形式,NIO為同步非阻塞形式。NIO并沒有實作異步,在JDK1.7之後,更新了NIO包,支援異步非阻塞通信模型,即NIO2.0(AIO)

同步與異步:同步和異步一定是面向作業系統與應用程式對IO操作的層面上來差別的。

  • 同步:應用程式會直接參與IO讀寫操作,并且我們的應用程式會直接阻塞到某個方法上,直到資料準備就緒;或者采用輪詢的政策實時檢查資料的就緒狀态,如果就緒則擷取資料。
  • 異步:異步時所有的IO讀寫操作都交個作業系統處理,與我們的應用程式沒有直接關系,我們程式不需要關心IO讀寫,當作業系統完成了IO讀寫操作時,會給我們應用程式發送通知,我們的應用程式直接拿走資料即可。

同步說的是你的Server伺服器端的執行方式

阻塞說的是具體的技術,接收資料的方式、狀态(IO,NIO)

2.2 NIO程式設計介紹

幾個概念:

  • Buffer(緩沖區)
  • Channel(管道,通道)
  • Selectir(選擇器、多路複用器)

NIO的本質就是避免原始的TCP建立連接配接使用3次握手的操作,減少連接配接的開銷

2.2.1 Buffer(緩沖區)

Buffer是一個對象,它包含一些要寫入或者要讀取的資料。在NIO類庫中加入了Buffer對象,展現了新庫與原IO的一個重要的差別。在面向流的IO中,可以将資料直接寫入或者讀取到Stream對象中。在NIO庫中,所有資料都是用緩沖區處理的(讀寫)。緩沖區實質上是一個數組,通常它是一個位元組數組(ByteBuffer),也可以使用其他類型的資料。這個數組為緩沖區提供了資料的通路讀寫等操作屬性,如位置、容量、上限等概念。

  • Buffer類型:我們最常用的就是ByteBuffer,實際上每一種java基本類型都對應了一個種緩存區(除了Boolean類型以外)
    • ByteBuffer
    • CharBuffer
    • ShortBuffer
    • IntBuffer
    • LongBuffer
    • FloatBuffer
    • DoubleBuffer

2.2.2 NIO應用

  • SelectionKey.OP_ACCEPT 服務端接收用戶端連接配接事件
  • SelectionKey.OP_CONNECT 用戶端連接配接服務端事件
  • SelectionKey.OP_READ 讀事件
  • SelectionKey.OP_WRITE 寫事件

服務端和用戶端各自維護一個管理通道的對象,我們稱之為selector,該對象能檢測一個或多個通道 (channel) 上的事件。我們以服務端為例,如果服務端的selector上注冊了讀事件,某時刻用戶端給服務端發送了一些資料,阻塞I/O這時會調用read()方法阻塞地讀取資料,而NIO的服務端會在selector中添加一個讀事件。服務端的處理線程會輪詢地通路selector,如果通路selector時發現有感興趣的事件到達,則處理這些事件,如果沒有感興趣的事件到達,則處理線程會一直阻塞直到感興趣的事件到達為止。下面是我了解的java NIO的通信模型示意圖:

  • Server.java
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    
    public class Server implements Runnable{
        //1 多路複用器(管理所有的通道)
        private Selector seletor;
        //2 建立緩沖區
        private ByteBuffer readBuf = ByteBuffer.allocate(1024);
        //3 
        private ByteBuffer writeBuf = ByteBuffer.allocate(1024);
        public Server(int port){
            try {
                //1 打開路複用器
                this.seletor = Selector.open();
                //2 打開伺服器通道
                ServerSocketChannel ssc = ServerSocketChannel.open();
                //3 設定伺服器通道為非阻塞模式
                ssc.configureBlocking(false);
                //4 綁定位址
                ssc.bind(new InetSocketAddress(port));
                //5 把伺服器通道注冊到多路複用器上,并且監聽阻塞事件
                ssc.register(this.seletor, SelectionKey.OP_ACCEPT);
    
                System.out.println("Server start, port :" + port);
    
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void run() {
            while(true){
                try {
                    //1 必須要讓多路複用器開始監聽
                    this.seletor.select();
                    //2 傳回多路複用器已經選擇的結果集
                    Iterator<SelectionKey> keys = this.seletor.selectedKeys().iterator();
                    //3 進行周遊
                    while(keys.hasNext()){
                        //4 擷取一個選擇的元素
                        SelectionKey key = keys.next();
                        //5 直接從容器中移除就可以了
                        keys.remove();
                        //6 如果是有效的
                        if(key.isValid()){
                            //7 如果為阻塞狀态
                            if(key.isAcceptable()){
                                this.accept(key);
                            }
                            //8 如果為可讀狀态
                            if(key.isReadable()){
                                this.read(key);
                            }
                            //9 寫資料
                            if(key.isWritable()){
                                this.write(key); //ssc
                            }
                        }
    
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        private void write(SelectionKey key){
            System.out.println("Server Write...");
            try {
    
                SocketChannel sc = (SocketChannel)key.channel();
                String nameString = "[" + sc.socket().getInetAddress().toString().substring(1) + "]";
                String respStr = nameString + " 你好,用戶端,Server已經準備就緒,可以傳輸資料." + System.currentTimeMillis();
                System.out.println(respStr);
                byte[] response = respStr.getBytes();
                this.writeBuf.put(response);
                this.writeBuf.flip();
                sc.write(this.writeBuf);
                this.writeBuf.clear();
                sc.register(this.seletor, SelectionKey.OP_READ);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
    
        private void result(SelectionKey key, String info){
            SocketChannel sc = (SocketChannel)key.channel();
            try {
                String nameString = "[" + sc.socket().getInetAddress().toString().substring(1) + "]";
                this.writeBuf.clear();
                this.writeBuf.put((nameString + ":" + info).getBytes());
                this.writeBuf.flip();
                sc.write(this.writeBuf);
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
        private void read(SelectionKey key) {
            System.out.println("Server Read...");
            try {
                //1 清空緩沖區舊的資料
                this.readBuf.clear();
                //2 擷取之前注冊的socket通道對象
                SocketChannel sc = (SocketChannel) key.channel();
                //3 讀取資料
                int count = sc.read(this.readBuf);
                //4 如果沒有資料
                if(count == -1){
                    key.channel().close();
                    key.cancel();
                    return;
                }
                //5 有資料則進行讀取 讀取之前需要進行複位方法(把position 和limit進行複位)
                this.readBuf.flip();
                //6 根據緩沖區的資料長度建立相應大小的byte數組,接收緩沖區的資料
                byte[] bytes = new byte[this.readBuf.remaining()];
                //7 接收緩沖區資料
                this.readBuf.get(bytes);
                //8 列印結果
                String body = new String(bytes).trim();
                System.out.println("我是Server : " + body);
                sc.register(this.seletor, SelectionKey.OP_WRITE);
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }
    
        private void accept(SelectionKey key) {
            System.out.println("Server Accept...");
            try {
                //1 擷取服務通道
                ServerSocketChannel ssc =  (ServerSocketChannel) key.channel();
                //2 執行阻塞方法
                SocketChannel sc = ssc.accept();
                //3 設定阻塞模式
                sc.configureBlocking(false);
                //4 注冊到多路複用器上,并設定讀取辨別
                sc.register(this.seletor, SelectionKey.OP_WRITE);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) {
    
            new Thread(new Server(8765)).start();;
        }
    
    
    }
               
  • Client.java
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    
    public class Client implements Runnable{
    
        private ByteBuffer writeBuf = ByteBuffer.allocate(1024);
        private ByteBuffer readBuf = ByteBuffer.allocate(1024);
        private Selector selector;
    
        private String IP = "127.0.0.1";
        private int PORT = 8765;
        private InetSocketAddress address;
    
        public Client() {
            // TODO Auto-generated constructor stub
            try {
                this.address = new InetSocketAddress(IP,PORT);
                this.selector = Selector.open();
                SocketChannel clientChannel = SocketChannel.open();
                clientChannel.configureBlocking(false);
                //clientChannel.connect(address);
                //clientChannel.register(selector, SelectionKey.OP_CONNECT);
                //測試發現,如果先connect再register OP_CONNECT會導緻用戶端選擇不到connect事件
               clientChannel.register(selector, SelectionKey.OP_CONNECT);
                clientChannel.connect(address);
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
    
        }
    
        @Override
        public void run() {
            // TODO Auto-generated method stub
            while(true){
                try {
                    this.selector.select();
                    Iterator<SelectionKey> skIterator = this.selector.selectedKeys().iterator();
                    while(skIterator.hasNext()){
                        SelectionKey nextKey = skIterator.next();
                        skIterator.remove();
                        if( nextKey.isValid() ){
                            if( nextKey.isAcceptable() ){
                                this.accept(nextKey);
                            }else if(nextKey.isReadable()){
                                this.read(nextKey);
                            }else if(nextKey.isWritable()){
                                this.write(nextKey);
                            }else if(nextKey.isConnectable()){
                                this.connect(nextKey);
                            }
                        }
                    }
    
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
    
        private void connect(SelectionKey key){
            System.out.println("Client Connect...");
            SocketChannel sc = (SocketChannel)key.channel();
            try {
                if( sc.isConnectionPending() ){
                    sc.finishConnect();
                }
                sc.configureBlocking(false);
                 //在和服務端連接配接成功之後,為了可以接收到服務端的資訊,需要給通道設定讀的權限
                sc.register(this.selector, SelectionKey.OP_READ);
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
    
        }   
    
        private void accept(SelectionKey key){
            System.out.println("Client Accept...");
        }
    
        private void write(SelectionKey key){
            System.out.println("Client Write...");
            SocketChannel sc = (SocketChannel)key.channel();
            try {
                //定義一個位元組數組,然後使用系統錄入功能:
                byte[] bytes = new byte[1024];
                System.in.read(bytes);
                //清空緩沖區資料
                writeBuf.clear();
                //把資料放到緩沖區中
                writeBuf.put(bytes);
                //對緩沖區進行複位
                writeBuf.flip();
                //寫出資料
                sc.write(writeBuf);
                sc.register(this.selector, SelectionKey.OP_READ);
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
        private void read(SelectionKey key){
            System.out.println("Client Read...");
            SocketChannel sc = (SocketChannel)key.channel();
            try {
                this.readBuf.clear();
                sc.read(this.readBuf);
                this.readBuf.flip();
                byte[] bytes = new byte[this.readBuf.remaining()];
                this.readBuf.get(bytes);
                System.out.println("Client列印:" + new String(bytes).trim());
                sc.register(this.selector, SelectionKey.OP_WRITE);
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
    
        }
    
        //需要一個Selector 
        public static void main(String[] args) {
            new Thread(new Client()).start();
        }
    
    }
               
2.3 AIO程式設計介紹

AIO程式設計,在NIO基礎之上移入了異步通道的概念。并提供了異步檔案和異步套接字通道的實作,進而在真正意義上實作了異步非阻塞,在之前我們學習的NIO隻能是非阻塞而并非異步。而AIO他不需要通過多路複用器對注冊的通道進行輪詢操作即可實作異步讀寫,進而簡化了NIO程式設計模型。也可以稱之為NIO2.0,這種模式才真正的屬于我們異步非阻塞的模型。

AsynchronousServerSocketChannel

AsynchronousSocketChannel

  • Server.java
    import java.net.InetSocketAddress;
    import java.nio.channels.AsynchronousChannelGroup;
    import java.nio.channels.AsynchronousServerSocketChannel;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class Server {
        //線程池
        private ExecutorService executorService;
        //線程組
        private AsynchronousChannelGroup threadGroup;
        //伺服器通道
        public AsynchronousServerSocketChannel assc;
    
        public Server(int port){
            try {
                //建立一個緩存池
                executorService = Executors.newCachedThreadPool();
                //建立線程組
                threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);
                //建立伺服器通道
                assc = AsynchronousServerSocketChannel.open(threadGroup);
                //進行綁定
                assc.bind(new InetSocketAddress(port));
    
                System.out.println("server start , port : " + port);
                //進行阻塞
                assc.accept(this, new ServerCompletionHandler());
                //一直阻塞 不讓伺服器停止
                Thread.sleep(Integer.MAX_VALUE);
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) {
            Server server = new Server(8765);
        }
    
    }
               
  • Client.java
    import java.io.UnsupportedEncodingException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.util.concurrent.ExecutionException;
    
    public class Client implements Runnable{
    
        private AsynchronousSocketChannel asc ;
    
        public Client() throws Exception {
            asc = AsynchronousSocketChannel.open();
        }
    
        public void connect(){
            asc.connect(new InetSocketAddress("127.0.0.1", 8765));
        }
    
        public void write(String request){
            try {
                asc.write(ByteBuffer.wrap(request.getBytes())).get();
                read();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        private void read() {
            ByteBuffer buf = ByteBuffer.allocate(1024);
            try {
                asc.read(buf).get();
                buf.flip();
                byte[] respByte = new byte[buf.remaining()];
                buf.get(respByte);
                System.out.println(new String(respByte,"utf-8").trim());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void run() {
            while(true){
    
            }
        }
    
        public static void main(String[] args) throws Exception {
            Client c1 = new Client();
            c1.connect();
    
            Client c2 = new Client();
            c2.connect();
    
            Client c3 = new Client();
            c3.connect();
    
            new Thread(c1, "c1").start();
            new Thread(c2, "c2").start();
            new Thread(c3, "c3").start();
    
            Thread.sleep(1000);
    
            c1.write("c1 aaa");
            c2.write("c2 bbbb");
            c3.write("c3 ccccc");
        }
    
    }
               
  • ServerCompletionHandler.java
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.CompletionHandler;
    import java.util.concurrent.ExecutionException;
    
    /**
     * @param AsynchronousSocketChannel異步的Socket對象
     * @param Server就是Server.java中accept方法中傳入的this,這裡為了能夠遞歸調用而傳入
     * @author jliu10
     *
     */
    public class ServerCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, Server> {
    
        @Override
        public void completed(AsynchronousSocketChannel asc, Server attachment) {
            //當有下一個用戶端接入的時候 直接調用Server的accept方法,這樣反複執行下去,保證多個用戶端都可以阻塞
            //遞歸調用,以保證能夠接收新的連接配接請求
            attachment.assc.accept(attachment, this);//隻有調用了accept方法之後,server才能接收到新的client請求,并且調用一次accept隻能接收一個請求,是以再消耗了前一個accept之後需要遞歸調用accept方法來保證新的連接配接能夠接入
            read(asc);
        }
    
        private void read(final AsynchronousSocketChannel asc) {
            //讀取資料
            ByteBuffer buf = ByteBuffer.allocate(1024);
            asc.read(buf, buf, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer resultSize, ByteBuffer attachment) {
                    //進行讀取之後,重置辨別位
                    attachment.flip();
                    //獲得讀取的位元組數
                    System.out.println("Server -> " + "收到用戶端的資料長度為:" + resultSize);
                    //擷取讀取的資料
                    String resultData = new String(attachment.array()).trim();
                    System.out.println("Server -> " + "收到用戶端的資料資訊為:" + resultData);
                    String response = "伺服器響應, 收到了用戶端發來的資料: " + resultData;
                    write(asc, response);
                }
                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    exc.printStackTrace();
                }
            });
        }
    
        private void write(AsynchronousSocketChannel asc, String response) {
            try {
                ByteBuffer buf = ByteBuffer.allocate(1024);
                buf.put(response.getBytes());
                buf.flip();
                asc.write(buf).get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void failed(Throwable exc, Server attachment) {
            exc.printStackTrace();
        }
    
    }
               

繼續閱讀