最近碰到的異步處理的操作比較多,異步就是不等想要的結果傳回執行接下來的其他操作,等異步結果傳回後直接調用已經注冊好的處理方法完成後續操作。異步的思想是非常棒的,相比輪詢的方式而言,異步的實作方式無疑是高效并且優雅的。本文介紹了包括Future,AIO和有點類似于單機版的Map-Reduce的fork/join架構。
Guava ListenableFuture
使用JDK提供的線程池ExcuteService的execute(Runable runable)方法來執行不需要傳回結果的線程任務,而使用submit(Callable callable)方法需要線程任務傳回T類型的執行結果,方法傳回Future對象,使用Future的get方法可以擷取執行結果,而在執行get方法線上程傳回結果之前是阻塞的,jdk這對于想要異步的處理結果沒有提供相應的接口,guava的ListenableFuture接口就是為實作異步的擷取Future中的結果而出現的。
顧名思義,ListenableFuture是可監聽的Future,可以在結果傳回的時候以方法回調的方式實作異步的後續操作。那麼如何擷取ListenableFuture呢,方法有兩種:
1. 将jdk提供的Futhure轉換成ListenableFuture
2. 将ExcutorService線程池轉換成ListeningExcutorService,繼而擷取ListenableFuture
第一種方式我們使用如下擴充卡獲得Future對應的ListenableFuture
ListenableFuture<String> listenableFuture=JdkFutureAdapters.listenInPoolThread(future);
第二種方式我們使用一個線程池的修飾類獲得
ListeningExecutorService listeningThreadPool=MoreExecutors.listeningDecorator(threadPool);
ListenableFuture<String> listenableFuture=listeningThreadPool.submit(new Callable<String>() {....}
在擷取了ListenableFuture之後,我們同樣有兩種方式異步擷取線程執行結果
1. 添加FutureCallback執行回調方法
2. 為ListenableFuture添加監聽線程
第一種方法使用Futures的addCallback方法實作
Futures.addCallback(listenableFuture,new FutureCallback<String>(){
@Override
public void onSuccess(String result) {
System.out.println(result);
}
@Override
public void onFailure(Throwable t) {
t.printStackTrace();
}
});
第二種方法不太推薦,使用listenableFuture.addListener的方法實作。
源碼解讀
我們看到guava的實作方式非常優雅,那麼它是怎麼實作這種異步回調的呢,以JdkFutureAdapters.listenInPoolThread為例,其實他是傳回了一個ListenableFutureAdapter的内部類,它實作了ListenableFuture并且繼承了ForwardingFuture類,然後調用callback的時候會建立一個Runable線程任務,其主要邏輯是使用Future的get方法阻塞擷取執行結果,在結果完成的時候回到callback類的onSuccess方法,如果出現異常則調用onFailure方法。
整個方法的設計使用了擴充卡模式,ListenableFuture是最終使用者需要的接口,ListenableFutureAdapter是擴充卡,ForwordingFuture實作了Future接口,是被适配者。
關于FutureTask
FutureTask是Future的一個實作類,它同時實作了Runable接口,直接使用線程池運作FutureTask,并且擷取阻塞擷取結果,其在異步方面并沒有做出改變。
本節代碼
所有測試代碼擷取可以點選這裡
參考文獻:guava并發,深入學習 FutureTask
Java AIO中的Future和CompleteHandler
Socket通信是Java網絡通信的一種方式,在基礎的阻塞BIO後出現了NIO,NIO采用了多路複用思想,使得一個線程可以監聽多個socket檔案描述符,使得在在一次輪訓的過程中可以檢視多個阻塞IO的狀态,相比BIO每次隻能監聽一個IO狀态,如果這個IO長期處于阻塞狀态,那麼其他IO操作如果準備好也無法執行。
BIO的另一個問題是在于對于每個準備好的IO操作必須配置設定一個線程進行實際的阻塞IO操作,這使得系統的線程數和已準備好的IO操作成線性關系。NIO的優勢在于它可以對多個已準備好的IO阻塞操作做打包操作,做線程數的縮減。
NIO對資料的操作是面向緩沖區的,而BIO是面向資料流的,NIO中面向緩沖區的IO操作是講所有的資料一次讀入到緩沖區内,進而做操作,這相比BIO的面向資料流的IO每次隻能讀入一個或多個位元組的方式,這種方式更加快捷。
NIO本質其實還是采用輪詢的方式去擷取已準備好的IO操作,實際的讀寫IO操作仍然是阻塞的,Java AIO是對NIO的又一次改進,其真正實作了異步的IO操作,包括擷取準備好的操作,讀寫操作都是異步的,其執行都會直接傳回一個Future對象,我們可以使用其get方法阻塞接下來的執行。
Java AIO主要用到的類有:
1. AsynchronousSocketChannel
2. AsynchronousServerSocketChannel
AsynchronousSocketChannel的connet方法為其準備io操作,write和read方法為其實際的IO操作,同樣的AsynchronousServerSocketChannel的accept方法為其準備io操作,讀寫操作是和AsynchronousSocketChannel一樣的。
以上方法都是異步的,也就是說方法會立即傳回,擷取異步執行結果的方法有兩種:
1. 擷取Future結果,使用Future的get方法擷取執行結果
2. 使用CompletionHandler的回調方法在結果傳回的時候擷取結果
以AsynchronousServerSocketChannel為例,其connet方法有兩個重載方式:
1. accept()無參方式傳回一個Future,可以使用get方法擷取連接配接的client。
2. accept(A attach,CompletionHandler
public class AioEchoServer {
private int PORT = ;
private String IP = "127.0.0.1";
private AsynchronousServerSocketChannel server;
public AioEchoServer() {
try {
server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(IP, PORT));
} catch (IOException e) {
e.printStackTrace();
}
}
public void startServer() {
//attachment參數可以被CompletionHandler接收
server.accept("attachment", new CompletionHandler<AsynchronousSocketChannel, String>() {
@Override
public void completed(AsynchronousSocketChannel client, String attachment) {
//擷取了對應的用戶端socket
try {
System.out.println("attachment: " + attachment);
ByteBuffer byteBuffer = ByteBuffer.allocate();
//這裡是非阻塞讀取,需要使用get等待用戶端傳回結果
int readResult = client.read(byteBuffer).get();
System.out.println("讀入資料量:" + readResult);
byteBuffer.flip();
System.out.println("Get from client:" + (new String(byteBuffer.array())));
int writeResult = client.write(byteBuffer).get();
if (writeResult > ) {
System.out.println(client.getRemoteAddress() + ": " + "response success! write length: " + writeResult);
} else {
System.out.println("write length <0");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
server.accept(null,this);
}
}
@Override
public void failed(Throwable exc, String attachment) {
exc.printStackTrace();
}
});
}
public static void main(String[] args) {
AioEchoServer aioEchoServer = new AioEchoServer();
aioEchoServer.startServer();
Scanner scanner = new Scanner(System.in);
scanner.nextLine();
}
}
用戶端的實作方式
public class AioClient {
public static void main(String[] args) {
String IP = "127.0.0.1";
int PORT = ;
final AsynchronousSocketChannel client;
try {
client = AsynchronousSocketChannel.open();
SocketAddress serverSocketAddress = new InetSocketAddress(IP, PORT);
client.connect(serverSocketAddress, "clientAttachment", new CompletionHandler<Void, String>() {
@Override
public void completed(Void result, String attachment) {
System.out.println(attachment);
try {
ByteBuffer byteBuffer = ByteBuffer.wrap("hello".getBytes());
//wrap後不用byteBuffer.flip(),此時position=0,limit是之前position的位置
//read後的postion是寫入的界限,limit=capbility,flip做的事情:limit=poition,position=0.
//須阻塞寫入
int writeResult = client.write(byteBuffer).get();
System.out.println("寫入Byte數:" + writeResult);
//須阻塞讀取
byteBuffer.clear();
int readResult = client.read(byteBuffer).get();
byteBuffer.flip();
System.out.println("server response: " + (new String(byteBuffer.array())));
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Override
public void failed(Throwable exc, String attachment) {
exc.printStackTrace();
}
});
} catch (IOException e) {
e.printStackTrace();
}
Scanner scanner=new Scanner(System.in);
scanner.nextLine();
}
}
需要注意的有兩點,不論是client還是server,其write和read都是非阻塞的,模拟的情況下我們需要阻塞等待其結果傳回;二是ByteBuffer.wrap方法之後不用flip操作,其傳回的結果已經是flip過的,作者在這裡踩了坑。
單機版Map-Reduce fork/join架構
fork/join架構類似于單機版的Map-Reduce,如果一個計算任務資料量比較大,就将任務分解交給不同的線程去處理,然後最後彙總結果,這種計算模型對于多核處理器來說具有極大的優勢。使用fork/join架構的基本過程如下。
1. 實作一個類繼承自RecursiveTask(或者無傳回值的RecursiveAction)
1. 實作compute方法,将任務拆分成多個自實作的RecursiveTask
2. invokeAll(所有自實作的RecursiveTask);
3. 使用自實作的RecursiveTask的join方法擷取執行結果
4. 彙總傳回結果并傳回
- 使用ForkJoinPool的invoke方法調用上一步自定義的計算任務
下例實作了一個計算數組中所有數和的任務
public class CountTask extends RecursiveTask<Integer>{
private static int THRESHOLD=;
private int[] integers;
private int start;
private int end;
public CountTask(int[] integers,int start,int end){
this.integers=integers;
this.start=start;
this.end=end;
}
@Override
protected Integer compute() {
int len=end-start+;
boolean isOverThreshold=len>THRESHOLD;
int sum=;
if(!isOverThreshold){
for(int i=start;i<=end;i++){
sum+=integers[i];
}
}else{
int mid=(start+end)/;
CountTask leftTask=new CountTask(this.integers,start,mid);
CountTask rightTask=new CountTask(this.integers,mid+,end);
invokeAll(leftTask,rightTask);
//fork的作用是将目前任務放到workerThread裡面去做
//invokeAll是将其中一個放在本線程做,其他的調用fork
int leftResult=leftTask.join();
int rightResult=rightTask.join();
sum=leftResult+rightResult;
}
return sum;
}
}
public class ForkjoinTest {
public static void main(String[] args){
int[] integers={,,,,,,,,,};
ForkJoinPool forkJoinPool=new ForkJoinPool();
ForkJoinTask<Integer> task=new CountTask(integers,,);
int sum=forkJoinPool.invoke(task);
System.out.println(sum);
}
}
值得一提的是fork/join架構是jdk8的stream實作的計算模型,如果想要深入了解stream的實作原理,可以參考這篇文章Java8 Stream原理深度解析。
關于源碼
本文所有源碼可以從這裡獲得,本文首發表于我的部落格,歡迎關注!轉載須注明文章出處,作者保留文章所有權。