天天看點

僞異步I/O

1.僞異步I/O通信是采用線程池和任務隊列實作的一種通信架構,當有新的用戶端接入時,将用戶端的Socket封裝成一個task投遞到後端的線程池中進行處理,JDK的線程池維護一個消息隊列和N個活躍線程,對消息隊列中的任務進行處理,由于線程池可以設定消息隊列的大小和最大線程數,是以,它的資源占用是可控的,無論多少個用戶端并發通路,都不會導緻醫院的耗盡和當機

僞異步i/o代碼

1.serverTime

package com.afan.wbio;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

public class TimeServer {
    public static void main(String[] args) throws IOException{
        int port = ;
        if(args != null && args.length > ){
            try{
                port = Integer.valueOf(args[]);
            }catch(Exception e){

            }
        }

        ServerSocket server = null;
        try {
            server = new ServerSocket(port);
            System.out.println("the time server is start in port:"+port);
            Socket socket = null;
            TimeServerHandlerExecutePool singleExecutor = new TimeServerHandlerExecutePool(,);
            while(true){
                socket = server.accept();
                singleExecutor.execute(new TimeServerHandler(socket));
            }
        } catch (Exception e) {
            if(server != null){
                System.out.println("the time server close");
                server.close();
                server = null;
            }
        }
    }

}
           

2.線程池代碼

package com.afan.wbio;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TimeServerHandlerExecutePool {

    private ExecutorService executor;

    public TimeServerHandlerExecutePool(int maxPoolSize,int queueSize){
        executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), maxPoolSize, L,TimeUnit.SECONDS
                , new ArrayBlockingQueue<Runnable>(queueSize));
    }

    public void execute(Runnable task){
        executor.execute(task);
    }
}
           

3.server端處理類

package com.afan.wbio;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.sql.Date;

public class TimeServerHandler implements Runnable {

    private Socket socket;

    public TimeServerHandler(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        BufferedReader in = null;
        PrintWriter out = null;
        try {
            in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
            out = new PrintWriter(this.socket.getOutputStream(),true);

            String currentTime = null;
            String body = null;

            while(true){
                body = in.readLine();
                if(body == null){
                    break;
                }
                System.out.println("the time server receive order :" + body);
                currentTime = "Query TIME ORDER".equals(body) ? new Date(System.currentTimeMillis()).toString()
                        :"BAD ORDER";
                out.println(currentTime);
            }
        } catch (Exception e) {
            if(in != null){
                try {
                    in.close();
                } catch (Exception e2) {
                    e2.printStackTrace();
                }
            }
            if(out != null){
                out.close();
                out = null;
            }
            if(this.socket != null){
                try {
                    this.socket.close();
                } catch (Exception e2) {
                    e2.printStackTrace();
                }
                this.socket = null;
            }
        }

    }

}
           

4.用戶端代碼

package com.afan.wbio;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

public class TimeClient {

    public static void main(String[] args){
        int port = ;
        if(args != null && args.length > ){
            try {
                port = Integer.valueOf(args[]);
            } catch (Exception e) {
                //采用預設值
            }
        }

        Socket socket = null;
        BufferedReader in = null;
        PrintWriter out = null;

        try {
            socket = new Socket("127.0.0.1",port);
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            out = new PrintWriter(socket.getOutputStream(),true);
            out.println("QUERY TIME ORDER");
            System.out.println("send order 2 server succeed.");
            String resp = in.readLine();
            System.out.println("NOW IS :"+ resp);
        } catch (Exception e) {
            // 不需要處理
        }finally{
            if(in != null){
                try {
                    in.close();
                } catch (Exception e2) {
                    e2.printStackTrace();
                }
            }
            if(out != null){
                out.close();
                out = null;
            }
            if(socket != null){
                try {
                    socket.close();
                } catch (Exception e2) {
                    e2.printStackTrace();
                }
                socket = null;
            }
        }
    }

}
           

僞異步i/o采用了線程池實作,避免了為每個請求都建立一個獨立線程造成的線程資源耗盡問題,但是由于它底層的通信依然采用的是同步阻塞模型,是以無法從根本上解決問題

java中對IO流處理的api在進行讀取操作擷取是寫入操作的時候,将一直阻塞下去,直到發生如下3種情況:

有資料可讀/可寫
    可用資料已經讀取完畢/寫入完成
    發生空指針或者I/O異常
           

繼續閱讀