AIO了解
用戶端代碼實作
package aio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
/**
* @Auther: 李澤
* @Date: 2019/3/4 17:26
* @Description:
*/
public class Client implements Runnable{
private AsynchronousSocketChannel asc;
public Client( ) throws IOException {
asc = AsynchronousSocketChannel.open();
}
public void connect(){
asc.connect(new InetSocketAddress("127.0.0.1",8000));
}
public void write(String request){
try {
asc.write(ByteBuffer.wrap(request.getBytes())).get();
read();
}catch (Exception e){
e.printStackTrace();
}
}
private void read() {
ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
//把數據讀入緩沖區中
asc.read(buffer).get();
//切換緩沖區的讀取模式
buffer.flip();
//構造一個位元組數組接受緩沖區中的資料
byte[] respBytes = new byte[buffer.remaining()];
buffer.get(respBytes);
System.out.println("用戶端接收伺服器端:"+new String(respBytes,"utf-8").trim());
}catch (Exception e){
e.printStackTrace();
}
}
/**
* 功能描述: 簡單讓線程不停止。
*
* @auther: 李澤
* @date: 2019/3/4 18:09
*/
@Override
public void run() {
while (true){
}
}
public static void main(String[] args) throws IOException, InterruptedException {
Client c1 = new Client();
c1.connect();
Client c2 = new Client();
c2.connect();
Client c3 = new Client();
c3.connect();
new Thread(c1,"c1").start();
new Thread(c2,"c2").start();
new Thread(c3,"c3").start();
Thread.sleep(1000);
c1.write("c1 aaa");
c2.write("c2 bbhb");
c3.write("c3 cccc");
}
}
伺服器端代碼實作
package aio;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @Auther: Administrator
* @Date: 2019/3/4 16:45
* @Description:
*/
public class Server {
//線程池
private ExecutorService executorService;
//AIO工作的線程組
private AsynchronousChannelGroup threadGroup;
//伺服器通道
public AsynchronousServerSocketChannel assc;
public Server(int port) {
try {
//建立一個可伸縮的線程池
executorService = Executors.newCachedThreadPool();
//建立幹活的線程組,負責連接配接上之前的所有的瑣碎的工作。
threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService,1);
//建立伺服器通道,并且設定為這個通道幹活的線程組
assc = AsynchronousServerSocketChannel.open(threadGroup);
//綁定端口
assc.bind(new InetSocketAddress(port));
System.out.println("server start!port:"+port);
//進行阻塞,實際上并沒有卡在這。
assc.accept(this,new ServerCompletionHandler());
//阻塞在這不讓服務停止,因為accept不會阻塞。
Thread.sleep(Integer.MAX_VALUE);
}catch (Exception e){
e.printStackTrace();
}
}
public static void main(String[] args) {
Server server = new Server(8000);
}
}
服務端業務處理handler實作
package aio;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
/**
* @Auther: 李澤
* @Date: 2019/3/4 16:57
* @Description:
*/
public class ServerCompletionHandler implements CompletionHandler<AsynchronousSocketChannel,Server> {
/**
* 功能描述: 建立連接配接之後才會調用,主要用來讀寫資料給用戶端。
*
* @auther: 李澤
* @date: 2019/3/4 17:09
*/
@Override
public void completed(AsynchronousSocketChannel result, Server attachment) {
//這個對象被使用了,accept方法被執行過了,如果不設定一個本類對象去執行任務的話,不重新監聽的話,新的請求
// 絕對進不來,是以要調用一下 server中 這句話的等效語句assc.accept(this,new ServerCompletionHandler());
//類似遞歸,讓這個類重新處于監聽狀态,處理下一個請求,沒有new新對象。各個對象之間互不幹擾。
attachment.assc.accept(attachment,this);
//讀取資料
read(result);
}
/**
* 功能描述: 具體的讀取邏輯
*
* @auther: 李澤
* @date: 2019/3/4 17:10
*/
private void read(AsynchronousSocketChannel asc) {
//建立緩沖區
ByteBuffer buffer = ByteBuffer.allocate(1024);
//拿出通道來執行讀取的業務
asc.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
//進行讀取之前重新設定pos和limit
attachment.flip();
//列印獲得的位元組數
System.out.println("resultSize = " + result);
//擷取讀取的資料
String body = new String(attachment.array()).trim();
System.out.println("server accept body = " + body);
String resp = "伺服器收到你發來的資料:"+ body;
write(asc,resp);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
}
/**
* 功能描述: 寫回用戶端,讀寫方法都是不阻塞的。
*
* @auther: 李澤
* @date: 2019/3/4 17:23
*/
private void write(AsynchronousSocketChannel asc, String resp) {
try {
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put(resp.getBytes());
buffer.flip();
//get寫不寫都行
asc.write(buffer).get();
}catch (Exception e){
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Server attachment) {
exc.printStackTrace();
}
}