天天看點

5.java實作socket程式設計之AIO方式

AIO了解

用戶端代碼實作

package aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;

/**
 * @Auther: 李澤
 * @Date: 2019/3/4 17:26
 * @Description:
 */
public class Client implements Runnable{
    private AsynchronousSocketChannel asc;

    public Client( ) throws IOException {
        asc = AsynchronousSocketChannel.open();
    }
    public void connect(){
        asc.connect(new InetSocketAddress("127.0.0.1",8000));
    }
    public void write(String request){
        try {
                asc.write(ByteBuffer.wrap(request.getBytes())).get();
                read();
         }catch (Exception e){
             e.printStackTrace();
         }
    }

    private void read() {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        try {
            //把數據讀入緩沖區中
            asc.read(buffer).get();
            //切換緩沖區的讀取模式
            buffer.flip();
            //構造一個位元組數組接受緩沖區中的資料
            byte[] respBytes = new byte[buffer.remaining()];
            buffer.get(respBytes);
            System.out.println("用戶端接收伺服器端:"+new String(respBytes,"utf-8").trim());
         }catch (Exception e){
             e.printStackTrace();
         }
    }
    /**
     * 功能描述: 簡單讓線程不停止。
     *
     * @auther: 李澤
     * @date: 2019/3/4 18:09
     */
    @Override
    public void run() {
        while (true){

        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        Client c1 = new Client();
        c1.connect();
        Client c2 = new Client();
        c2.connect();
        Client c3 = new Client();
        c3.connect();

        new Thread(c1,"c1").start();
        new Thread(c2,"c2").start();
        new Thread(c3,"c3").start();

        Thread.sleep(1000);

        c1.write("c1 aaa");
        c2.write("c2 bbhb");
        c3.write("c3 cccc");
    }
}      

伺服器端代碼實作

package aio;

import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @Auther: Administrator
 * @Date: 2019/3/4 16:45
 * @Description:
 */
public class Server {
    //線程池
    private ExecutorService executorService;
    //AIO工作的線程組
    private AsynchronousChannelGroup threadGroup;
    //伺服器通道
    public AsynchronousServerSocketChannel assc;

    public Server(int port) {
         try {
          //建立一個可伸縮的線程池
          executorService = Executors.newCachedThreadPool();
          //建立幹活的線程組,負責連接配接上之前的所有的瑣碎的工作。
          threadGroup =  AsynchronousChannelGroup.withCachedThreadPool(executorService,1);
          //建立伺服器通道,并且設定為這個通道幹活的線程組
          assc = AsynchronousServerSocketChannel.open(threadGroup);
          //綁定端口
             assc.bind(new InetSocketAddress(port));
             System.out.println("server start!port:"+port);
             //進行阻塞,實際上并沒有卡在這。
             assc.accept(this,new ServerCompletionHandler());
            //阻塞在這不讓服務停止,因為accept不會阻塞。
             Thread.sleep(Integer.MAX_VALUE);
         }catch (Exception e){
             e.printStackTrace();
         }
    }

    public static void main(String[] args) {
        Server server = new Server(8000);
    }
}      

服務端業務處理handler實作

package aio;

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

/**
 * @Auther: 李澤
 * @Date: 2019/3/4 16:57
 * @Description:
 */
public class ServerCompletionHandler implements CompletionHandler<AsynchronousSocketChannel,Server> {
    /**
     * 功能描述: 建立連接配接之後才會調用,主要用來讀寫資料給用戶端。
     *
     * @auther: 李澤
     * @date: 2019/3/4 17:09
     */
    @Override
    public void completed(AsynchronousSocketChannel result, Server attachment) {
        //這個對象被使用了,accept方法被執行過了,如果不設定一個本類對象去執行任務的話,不重新監聽的話,新的請求
        // 絕對進不來,是以要調用一下 server中 這句話的等效語句assc.accept(this,new ServerCompletionHandler());
        //類似遞歸,讓這個類重新處于監聽狀态,處理下一個請求,沒有new新對象。各個對象之間互不幹擾。
        attachment.assc.accept(attachment,this);
        //讀取資料
        read(result);
    }
    /**
     * 功能描述: 具體的讀取邏輯
     *
     * @auther: 李澤
     * @date: 2019/3/4 17:10
     */
    private void read(AsynchronousSocketChannel asc) {
        //建立緩沖區
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        //拿出通道來執行讀取的業務
        asc.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                //進行讀取之前重新設定pos和limit
                attachment.flip();
                //列印獲得的位元組數
                System.out.println("resultSize = " + result);
                //擷取讀取的資料
                String body = new String(attachment.array()).trim();
                System.out.println("server accept body = " + body);
                String resp = "伺服器收到你發來的資料:"+ body;
                write(asc,resp);
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                exc.printStackTrace();
            }
        });
    }
    /**
     * 功能描述: 寫回用戶端,讀寫方法都是不阻塞的。
     *
     * @auther: 李澤
     * @date: 2019/3/4 17:23
     */
    private void write(AsynchronousSocketChannel asc, String resp) {
         try {
             ByteBuffer buffer = ByteBuffer.allocate(1024);
             buffer.put(resp.getBytes());
             buffer.flip();
             //get寫不寫都行
             asc.write(buffer).get();
         }catch (Exception e){
             e.printStackTrace();
         }
    }

    @Override
    public void failed(Throwable exc, Server attachment) {
        exc.printStackTrace();
    }
}