天天看點

java nio 操作(2)異步阻塞 socket實作

一.同步阻塞io

bio就是阻塞式的io,網絡通信中對于多用戶端的連入,伺服器端總是與用戶端數量一緻的線程去處理每個用戶端任務,即,用戶端與線程數1:1,并且進行讀寫操作室阻塞的,當有你成千上完的用戶端進行連接配接,就導緻伺服器不斷的建立新的線程,最後導緻低通資源不足,後面的用戶端不能連接配接伺服器,并且連接配接入的用戶端并不是總是在于伺服器進行互動,很可能就隻是占用着資源而已。

二.僞異步io

三.javanio原理

目前在unp劃分的角度,異步io隻有aio。網絡通信中,nio也提供了socketchannel和serversocketchannel兩種不同的套接字通道來實作,可以設定阻塞餘非阻塞兩種模式,為了實作高負載高并發都采取非阻塞的模式。nio采用緩沖區buffer,實作對資料的讀寫操作,緩沖區是固定大小,并由内部狀态記錄有多少資料被放入或者取出。與阻塞io不同,阻塞io采用阻塞式流(stream)的方式進行讀寫,流是單向的隻能向一個方向讀資料或者寫資料,而通道是雙向的,可以同時在通道上發送和讀取資料,而且是非阻塞的,在沒有資料可讀可寫時可以去做别的事情。

       nio改進了上面的一對一或者m:n的模型。伺服器僅采用一個一個線程去處理所有用戶端線程,這就需要建立一個selector,并将其注冊到想要監控的信道上(注意是通過channel的方法來實作),并傳回一個selectorkey執行個體(包含通道和select以及感興趣的操作),selector就好像是一個觀察者,通過不斷輪詢所注冊的一組通道上有沒有等待的操作發生,當等待事件發生的時候可以做其他事情,當有信道出現感興趣的操作,則該信道就進入就緒狀态。

      slector的select方法阻塞等待又沒有就緒的通道,當出現就緒的信道或者等待逾時傳回,就緒信道的個數,若等待逾時則傳回-1,selectedkeys方法傳回就緒的信道。

   下面附代碼

這是處理感興趣信道的接口,因為他可以放在多個伺服器上是以把他做成了接口。

package nio;  

import java.io.ioexception;  

import java.nio.channels.selectionkey;  

public interface tcpprotocol {  

    void handleaccept(selectionkey key) throws ioexception;  

    void handleread(selectionkey key) throws ioexception;  

    void handlewrite(selectionkey key) throws ioexception;  

}  

<span style="font-family:fangsong_gb2312;">這是對上面接口的具體實作</span>  

import java.nio.bytebuffer;  

import java.nio.channels.serversocketchannel;  

import java.nio.channels.socketchannel;  

public class selectorprotocol implements tcpprotocol {  

    private int bufsize ;  

    public selectorprotocol(int buffsize){  

        this.bufsize = buffsize;  

    }  

     //服務端信道已經準備好了接收新的用戶端連接配接    

    public void handleaccept(selectionkey key) throws ioexception {    

        socketchannel clntchan = ((serversocketchannel) key.channel()).accept();    

        clntchan.configureblocking(false);    

        //将選擇器注冊到連接配接到的用戶端信道,并指定該信道key值的屬性為op_read,同時為該信道指定關聯的附件    

        clntchan.register(key.selector(), selectionkey.op_read, bytebuffer.allocate(bufsize));    

    }    

    //用戶端信道已經準備好了從信道中讀取資料到緩沖區    

    public void handleread(selectionkey key) throws ioexception{    

        socketchannel clntchan = (socketchannel) key.channel();    

        //擷取該信道所關聯的附件,這裡為緩沖區    

        bytebuffer buf = (bytebuffer) key.attachment();    

        long bytesread = clntchan.read(buf);    

        //如果read()方法傳回-1,說明用戶端關閉了連接配接,那麼用戶端已經接收到了與自己發送位元組數相等的資料,可以安全地關閉    

        if (bytesread == -1){     

            clntchan.close();    

        }else if(bytesread > 0){    

        //如果緩沖區總讀入了資料,則将該信道感興趣的操作設定為為可讀可寫    

        key.interestops(selectionkey.op_read | selectionkey.op_write);    

        }    

    //用戶端信道已經準備好了将資料從緩沖區寫入信道    

    public void handlewrite(selectionkey key) throws ioexception {    

    //擷取與該信道關聯的緩沖區,裡面有之前讀取到的資料    

    bytebuffer buf = (bytebuffer) key.attachment();    

    //重置緩沖區,準備将資料寫入信道    

    buf.flip();     

    socketchannel clntchan = (socketchannel) key.channel();    

    //将資料寫入到信道中    

    clntchan.write(buf);    

    if (!buf.hasremaining()){     

    //如果緩沖區中的資料已經全部寫入了信道,則将該信道感興趣的操作設定為可讀    

      key.interestops(selectionkey.op_read);    

    //為讀入更多的資料騰出空間    

    buf.compact();     

  }    

用戶端

import java.net.inetsocketaddress;  

import java.net.socketexception;  

public class tcpechoclientnonblocking {  

    public static void main(string args[]) throws exception{  

        //第一個參數作為要連接配接的服務端的主機名或ip  

        string server = "localhost";   

        //第二個參數為要發送到服務端的字元串  

        byte[] argument = "nihaopengyou".getbytes();  

        //如果有第三個參數,則作為端口号,如果沒有,則端口号設為7  

        int servport = 2002;  

        //建立一個信道,并設為非阻塞模式  

        socketchannel clntchan = socketchannel.open();  

        clntchan.configureblocking(false);  

        //向服務端發起連接配接  

        if (!clntchan.connect(new inetsocketaddress(server, servport))){  

            //不斷地輪詢連接配接狀态,直到完成連接配接  

            while (!clntchan.finishconnect()){  

                //在等待連接配接的時間裡,可以執行其他任務,以充分發揮非阻塞io的異步特性  

                //這裡為了示範該方法的使用,隻是一直列印"."  

                system.out.print(".");    

            }  

        }  

        //為了與後面列印的"."差別開來,這裡輸出換行符  

        system.out.print("\n");  

        //分别執行個體化用來讀寫的緩沖區  

        bytebuffer writebuf = bytebuffer.wrap(argument);  

        bytebuffer readbuf = bytebuffer.allocate(argument.length);  

        //接收到的總的位元組數  

        int totalbytesrcvd = 0;   

        //每一次調用read()方法接收到的位元組數  

        int bytesrcvd;   

        //循環執行,直到接收到的位元組數與發送的字元串的位元組數相等  

        while (totalbytesrcvd < argument.length){  

            //如果用來向通道中寫資料的緩沖區中還有剩餘的位元組,則繼續将資料寫入信道  

            if (writebuf.hasremaining()){  

                clntchan.write(writebuf);  

            //如果read()接收到-1,表明服務端關閉,抛出異常  

            if ((bytesrcvd = clntchan.read(readbuf)) == -1){  

                throw new socketexception("connection closed prematurely");  

            //計算接收到的總位元組數  

            totalbytesrcvd += bytesrcvd;  

            //在等待通信完成的過程中,程式可以執行其他任務,以展現非阻塞io的異步特性  

            //這裡為了示範該方法的使用,同樣隻是一直列印"."  

            system.out.print(".");   

        //列印出接收到的資料  

        system.out.println("received: " +  new string(readbuf.array(), 0, totalbytesrcvd));  

        //關閉信道  

        clntchan.close();  

伺服器

import java.nio.channels.selector;  

import java.util.iterator;  

public class tcpserverselector{  

    //緩沖區的長度  

    private static final int bufsize = 256;   

    //select方法等待信道準備好的最長時間  

    private static final int timeout = 3000;   

    public static void main(string[] args) throws ioexception {  

        if (args.length < 1){  

            throw new illegalargumentexception("parameter(s): <port> ...");  

        //建立一個選擇器  

        selector selector = selector.open();  

        for (string arg : args){  

            //執行個體化一個信道  

            serversocketchannel listnchannel = serversocketchannel.open();  

            //将該信道綁定到指定端口  

            listnchannel.socket().bind(new inetsocketaddress(integer.parseint(arg)));  

            system.out.println("啟動伺服器"+integer.parseint(arg));  

            //配置信道為非阻塞模式  

            listnchannel.configureblocking(false);  

            //将選擇器注冊到各個信道  

            listnchannel.register(selector, selectionkey.op_accept);  

        //建立一個實作了協定接口的對象  

        tcpprotocol protocol = new selectorprotocol(bufsize);  

        //不斷輪詢select方法,擷取準備好的信道所關聯的key集  

        while (true){  

            //一直等待,直至有信道準備好了i/o操作  

            if (selector.select(timeout) == 0){  

                //在等待信道準備的同時,也可以異步地執行其他任務,  

                //這裡隻是簡單地列印"."  

                system.out.print(".");  

                continue;  

            //擷取準備好的信道所關聯的key集合的iterator執行個體  

            iterator<selectionkey> keyiter = selector.selectedkeys().iterator();  

            //循環取得集合中的每個鍵值  

            while (keyiter.hasnext()){  

                selectionkey key = keyiter.next();   

                //如果服務端信道感興趣的i/o操作為accept  

                if (key.isacceptable()){  

                    protocol.handleaccept(key);  

                }  

                //如果用戶端信道感興趣的i/o操作為read  

                if (key.isreadable()){  

                    protocol.handleread(key);  

                //如果該鍵值有效,并且其對應的用戶端信道感興趣的i/o操作為write  

                if (key.isvalid() && key.iswritable()) {  

                    protocol.handlewrite(key);  

                //這裡需要手動從鍵集中移除目前的key  

                keyiter.remove();   

運作結果:

java nio 操作(2)異步阻塞 socket實作
java nio 操作(2)異步阻塞 socket實作