天天看點

Actor與并發

Future:(用來存放将來可能成功或失敗的結果)的使用

一、響應式四準則

設計目标:靈敏性、伸縮性、容錯性、事件驅動性

二、剖析Actor

1.AbstractActor:首先,我們繼承了AbstractActor

2.Receive:AbstractActor 類有一個receive 方法,其子類必須實作這個方法或是通過

構造函數調用該方法。receive 方法傳回的類型是PartialFunction。

Akka 為我們提供了一個抽象的構造方法類receiveBuilder,用于生成PartialFunction 作為傳回值。

3.receiveBuilder:連續調用receiveBuilder 的方法,為所有需要比對處理的輸入消

息類型提供響應方法的描述, 然後調用build() 方法生成需要傳回的PartialFunction

4.Match:receiveBuilder 提供了一些值得一提的match 方法,我們将提供一些示例,

展示如何分别使用這些方法來比對“ping”消息。

(1)match(class, function):描述了對于任何尚未比對的該類型的示例,應該如

何響應。

match(String.class, s -> {if(s.equals("Ping")) respondToPing(s);})      

(2)match(class, predicate, function):描述了對于predicate 條件函數為真的某特

定類型的消息,應該如何響應。

match(String.class, s -> s.equals("Ping"), s -> respondToPing(s))      

(3)matchEquals(object, function):描述了對于和傳入的第一個參數相等的消

息,應該如何響應。

matchEquals("Ping", s -> respondToPing(s))      

(4) matchAny(function):該函數比對所有尚未比對的消息。通常來說,最佳實

踐是傳回錯誤資訊,或者至少将錯誤資訊記錄到日志,幫助開發過程中的錯誤調試。

match 函數從上至下進行模式比對。是以可以先定義特殊情況,最後定義一般情況。

receiveBuilder
.matchEquals("Ping", s -> System.out.println("It's Ping: " + s))
.match(String.class, s -> System.out.println("It's a string: " + s))
.matchAny(x -> System.out.println("It's something else: " + x))
.build      

向sender()傳回消息:調用了sender()方法後,我們就可以傳回所收到的消息的響

應了。響應的對象既可能是一個Actor,也可能是來自于Actor 系統外部的請求。

第一種情況相當直接:傳回的消息會直接發送到該Actor 的收件信箱中

tell():sender()函數會傳回一個ActorRef。在上面的例子中,我們調用了sender().tell()。

tell()是最基本的單向消息傳輸模式。第一個參數是我們想要發送至對方信箱的消息。

第二個參數則是希望對方Actor 看到的發送者。

和第1 章初識Actor 中使用過的方法類似,我們描述了接收到的消息是String

時應該做出的響應。由于需要檢查接收到的字元串是否為“Ping”,是以需要進

行判斷,是以這裡使用的match 方法略有不同。然後描述響應行為:通過tell()

方法向sender()傳回一條消息。我們傳回的消息是字元串“Pong”。Java 的tell

方法要求提供消息發送者的身份:這裡使用ActorRef.noSender()表示沒有傳回

位址。

傳回akka.actor.Status.Failure:為了向發送方報告錯誤資訊,需要向其發送一條

消息。如果Actor 中抛出了異常,就會通知對其進行監督的Actor(将在第3 章

傳遞消息中進行介紹)。不過無論如何,如果想要報告錯誤消息,需要将錯誤發

送給發送方。如果發送方使用Future 來接收響應,那麼傳回錯誤消息會導緻

Future 的結果為失敗。

三、Actor的建立

通路Actor 的方法和通路普通對象的方法有所不同。我們從來都不會得到Actor 的

執行個體,從不調用Actor 的方法,也不直接改變Actor 的狀态,反之,隻向Actor 發送消息。

除此之外,我們也不會直接通路Actor 的成員,而是通過消息傳遞來請求擷取關于Actor

狀态的資訊。

在Akka 中,這個指向Actor 執行個體的引用叫做ActorRef。ActorRef 是一個無類型的引

用,将其指向的Actor 封裝起來,提供了更高層的抽象,并且給使用者提供了一種與Actor

進行通信的機制。

Actor 系統就是包含所有Actor 的地方。有一點可能相當明顯:我們也正是在Actor 系統中建立新的Actor 并擷取指向Actor 的引用。actorOf 方法會生成一個新的Actor,并傳回指向該Actor 的引用。

ActorRef actor = actorSystem.actorOf(Props.create(JavaPongActor.class));      

四、Props

為了保證能夠将Actor 的執行個體封裝起來,不讓其被外部直接通路,我們将所有構造

函數的參數傳給一個Props 的執行個體。Props 允許我們傳入Actor 的類型以及一個變長的參

數清單。

Props.create(PongActor.class, arg1, arg2, argn);      

如果Actor 的構造函數有參數,那麼推薦的做法是通過一個工廠方法來建立Props。

假如我們不希望Pong Actor 傳回“Pong”,而是希望其傳回另一條消息,那麼可能就會

需要這樣的構造參數。我們可以建立一個工廠方法,用于生成這樣的Props 示例:

public static Props props(String response) {
return Props.create(this.class, response);
}      

然後就可以使用Props 的工廠方法來建立Actor:

ActorRef actor = actorSystem.actorOf(JavaPongActor.props("PongFoo"));      

actorOf 建立一個Actor,并傳回指向該Actor 的引用ActorRef。除此之外,還有另

一種方法可以擷取指向Actor 的引用:actorSelection。

回顧:

。我們可以建立一個Actor,傳入一個構造參數清單構

建一個Props 執行個體,并将該Props 執行個體作為參數傳給system.actorOf 并調用system.actorOf

方法,得到指向該Actor 的引用。要為Actor 指定名字的話,隻需要将該名字作為參數傳

給actorOf 方法即可。最後,我們可以使用actorSelection 在本地或遠端系統上查找已有

的Actor。

五、Promise、Future 和事件驅動的程式設計模型

1.阻塞與事件驅動API

簡單的使用阻塞IO 調用資料庫的例子:

String username = getUsernameFromDatabase(userId);
System.out.println(username);      

“事件驅動”這個術語正是描述了這種方法:在發生某些特定事件時,就執行某些對應的代碼

有一點必須要再次強調:列印語句并不會運作在進行事件注冊的線程上。它會運作在

另一個線程上,該線程資訊由ExecutionContext 維護。Future 永遠是通過Execution Context

來建立的,是以我們可以選擇在哪裡運作Future 中真正需要執行的代碼。

2.使用Future 進行響應的Actor

首先建立一個ActorSystem,然後通過actorOf 在剛建立的Actor 系統中建立一個Actor

ActorSystem system = ActorSystem.create();
ActorRef actorRef = system.actorOf(Props.create(JavaPongActor.class));      

現在向Actor 詢問其對于某個消息的響應:

final Future sFuture = ask(actorRef, "Ping", 1000);      

我們調用ask 方法,傳入以下參數:

消息發送至的Actor 引用;

想要發送給Actor 的消息;

Future 的逾時參數:等待結果多久以後就認為詢問失敗。

ask 會傳回一個Scala Future,作為響應的占位符。在Actor 的代碼中,Actor 會

向sender()發送回一條消息,這條消息就是在ask 傳回的Scala Future 中将接收到的

響應。

雖然我們無法在Java 8 中使用Scala Future,但是可以通過之前導入的庫将其轉換為

CompletableFuture:
final CompletionStage<String> cs = toJava(sFuture);
final CompletableFuture<String> jFuture = (CompletableFuture<String>) cs;      

最後,我們調用get()方法将測試線程阻塞,并得到結果。在查詢失敗的例子中,get

方法會抛出一個從Actor 發送出的akka.status.Failure 異常。

六、了解Future 和Promise

1. Future——在類型中表達失敗與延遲

2.構造消息:

Get 消息:如果key 存在,就傳回value;