第三部分:與裝置Actor一起工作
akka版本2.5.8
版權聲明:本文為部落客原創文章,未經部落客允許不得轉載。
在之前的話題中,我們解釋了如何在高層次來看待actor系統,即要如何去表示元件,如何安排actor的層次結構。在本節中,我們會看到如何實作其中的裝置actor。
如果我們使用對象,我們會将API設計為接口,并擁有一組會被實作類實作的抽象的方法。但是在actor的世界裡,協定(protocols)取代了接口。雖然我們不能在程式設計語言内形式化通用協定,但是我們可以編寫它們最基本的元素——消息。是以,我們會從定義我們希望發給裝置的消息開始我們的程式。
給裝置的消息
裝置actor的工作很簡單:
1、收集溫度測量資訊
2、當被查詢時,報告最後一次的測量值
然而,在裝置啟動時不會立刻就獲得溫度測量資訊,是以,我們需要考慮溫度測量資訊不存在的情況。這也允許我們的actor在沒有寫子產品的時候來測試讀子產品,因為裝置可以簡單地報告一個空結果。
從裝置擷取但前溫度的協定很簡單,actor需要:
1、等待取目前溫度的請求
2、回應這個請求:
①擁有目前的溫度資料
②辨別目前溫度資料還不可用
我們需要兩個消息,一個用來請求,一個用來回複。我們的第一次嘗試可能如下所示:
final case object ReadTemperature
final case class RespondTemperature(value: Option[Double])
這兩條消息貌似涵蓋了所有我們所需要的功能,然而,我們選擇方法的時候必須要考慮應用程式的分布式特性。雖然actor在JVM本地通信與遠端通信的基本機制相同,但是我們需要牢記以下幾點:
1、本地資訊與遠端資訊的傳輸延遲有很大的不同,有些因素,如網絡帶寬、資訊大小都會産生作用。
2、可靠性必須被重視,因為在遠端資訊傳遞中會涉及到很多的步驟,這也會增大失敗的幾率。
3、本地消息僅僅是在JVM内部傳遞引用,是以不會對消息有很多的限制,但是遠端傳輸可能會限制消息的大小。
另外,在JVM内部傳遞消息顯然是可靠性很高的,但是當actor因為程式員的錯誤而在處理資訊時失敗了,那麼系統的表現就會和遠端網絡請求中遠端處理消息崩潰一緻。盡管這是兩個場景,服務一會就會被恢複(actor會被監管者重新開機,主機會被操作員或監控系統重新開機),但是個别的請求可能會在故障中丢失。是以,我們要悲觀一些,在丢失任何資訊的情況下都要保證系統安全
進一步了解協定中的靈活性需求,将有助于我們去考慮Akka消息順序和消息傳遞保證。Akka為消息發送提供了以下行為:
1、最多隻有一次傳遞,即不保證送達
2、資訊是被每個發送者接收者對來維護的
以下章節将讨論行為中的更多細節:
1、資訊傳遞
2、資訊排序
資訊傳遞
消息傳遞子系統提供的消息傳遞語義通常分為以下幾類:
1、最多傳遞一次(At-most-once delivery),每個消息被發送零或一次,這意味着資訊可能會丢失,但永遠不會被重複接收到
2、至少傳遞一次(At-least-once delivery),每個消息都可能被潛在地發送很多次,直到有一次成功。這意味着資訊可能會被重複接收,但永遠不會丢失
3、準确地發送一次(Exactly-once delivery),每個消息都被精準地發送給接收者一次,消息不會丢失也不會重複接收
Akka使用第一種行為,它是最節省資源的,并且性能最好。它擁有最小的實作開銷,因為可以使用發送即忘(fire-and-forget)政策,而不用在發送者内儲存發送狀态。第二點,也不需要對傳輸丢失進行計數。這些增加了發送結束後保持狀态、發送完畢确認的開銷。準确地發送一次資訊的方式開銷是最大的,由于其很差的性能表現,除了在發送端增加上述所說的開銷外,還需要在接收端增加過濾重複消息的機制。
在actor系統中,我們需要确定一個消息被保證的含義,在哪種情況下認為傳輸已經完成:
1、當消息被送出到網絡上時?
2、當消息被接收者主機接收到時?
3、當消息被放到接收者actor的郵箱裡時?
4、當消息接收者actor開始處理這個資訊時?
5、當消息接受者actor處理完這個消息時
大多數架構和協定聲稱保證傳輸,實際上它們提供了類似于4和5的東西。雖然這聽起來是合理的,但是實際上真的有用嗎?要了解其中的含義,請考慮一個簡單的問題:使用者嘗試下一個訂單,并且我們認為一旦它進入了訂單資料庫,就代表它已經被成功處理了。
如果我們依賴于第五點,即消息被成功處理,那麼actor需要盡快在處理完後報告成功狀态,這個actor就有義務在訂單被送出到它的API後進行校驗、處理,然後放入訂單資料庫。不幸的是,當API被調用後,這些情況可能會發生:
1、主機崩潰
2、反序列化失敗
3、校驗失敗
4、資料庫不可通路
5、發生程式錯誤
這說明傳輸保證不能被認為是領域級别的保證。我們隻想讓它在完全處理完訂單并将其持久化後報告成功狀态。唯一能報告成功狀态的實體是應用程式本身,因為隻有它了解領域内保證傳輸需要有哪些需求。沒有一個通用的系統可以搞清楚某個特定領域中什麼情況才會被認為是成功。
在這個特定的例子中,我們隻想在成功寫入資料庫之後發出成功信号,資料庫确認已經安全地将訂單存儲起來。由于這些原因,Akka将保證程式的責任提升給了應用程式本身,即你必須自己去實作這些。這給了你完全的控制權,讓你可以保護你需要保護的内容。現在,讓我們考慮下Akka為我們提供的消息排序,以便輕松推理應用程式邏輯。
資訊排序
在Akka裡對于一個給定的發送接收actor對。直接從A到B的消息不會被無序接收。直接這個詞強調這隻适用于直接向接收者發動消息,而不包括中間有協調員的情況。
如果:
1、actor向
A1
發送了資訊
A2
,
M1
,
M2
2、actor
M3
向
A3
發送了資訊
A2
,
M4
,
M5
M6
這意味着對于Akka消息:
1、必須在
M1
和
M2
M3
前被發送
2、
必須在
M2
M3
前被發送
3、
必須在
M4
和
M5
M6
前被發送
4、
必須在
M5
M6
前被發送
5、
看到的
A2
和
A1
A3
的資訊可能是交錯出現的
6、目前我們沒有保證傳輸,所有消息都有可能會被丢棄,比如沒有到達
A2
這些保證達到了一個很好的平衡:從一個actor接收到有序的消息使我們可以友善地建構易于推理的系統。另一方面,允許不同actor的消息交錯接受給了我們足夠的自由度,讓我們可以實作高性能的actor系統。
有關傳輸保證的完整細節,棄權那個參考參考頁面。
為裝置消息添加靈活性
我們的第一個查詢協定是正确的,但是沒有考慮分布式應用程式的執行。如果我們想在actor中實作重傳(因為請求逾時),以便查詢裝置actor,或者我們想在查詢多個actor時關聯請求和回複。是以,我們在消息裡添加了一個字段,以便請求者可以提供一個ID(我們會在接下來的步驟裡把代碼添加到應用程式裡):
final case class ReadTemperature(requestId: Long)
final case class RespondTemperature(requestId: Long, value: Option[Double])
定義裝置actor和讀取協定
正如我們在Hello World執行個體裡學習到的,每個actor定義了其能接受到的消息種類。我們的裝置actor有義務使用相同的ID參數來回應請求,這将看起來如下所示:
import akka.actor.{ Actor, ActorLogging, Props }
object Device {
def props(groupId: String, deviceId: String): Props = Props(new Device(groupId, deviceId))
final case class ReadTemperature(requestId: Long)
final case class RespondTemperature(requestId: Long, value: Option[Double])
}
class Device(groupId: String, deviceId: String) extends Actor with ActorLogging {
import Device._
var lastTemperatureReading: Option[Double] = None
override def preStart(): Unit = log.info("Device actor {}-{} started", groupId, deviceId)
override def postStop(): Unit = log.info("Device actor {}-{} stopped", groupId, deviceId)
override def receive: Receive = {
case ReadTemperature(id) ⇒
sender() ! RespondTemperature(id, lastTemperatureReading)
}
}
注意代碼中的:
1、伴生對象定義了如何建立actor,期中
Device
props
方法的參數包含裝置的ID和所屬的組ID,這在之後将會用到。
2、伴生對象包含了我們之前所述的消息的定義。
3、在
類裡,
Device
的值初始化為
lastTemperatureReading
,并且actor可以簡單地将它傳回。
None
測試actor
基于上面的簡單actor,我們可以寫一個簡單的測試用例。在測試代碼路徑下的
com.lightbend.akka.sample
包裡添加
DeviceSpec.scala
檔案。(我們使用ScalaTest,你也可以使用其他測試架構)
你可以通過在sbt提示符下運作
test
來運作測試。
"reply with empty reading if no temperature is known" in {
val probe = TestProbe()
val deviceActor = system.actorOf(Device.props("group", "device"))
deviceActor.tell(Device.ReadTemperature(requestId = ), probe.ref)
val response = probe.expectMsgType[Device.RespondTemperature]
response.requestId should ===()
response.value should ===(None)
}
現在當actor接收到傳感器的資訊時,需要一種方式來改變其溫度狀态。
添加一個寫入協定
寫入協定的目的是在接受到包含溫度的資訊時更新
currentTemperature
字段。同樣,我們使用一個簡單的消息來定義寫入協定,就像這樣:
然而,這種方式沒有考慮讓發送者知道溫度記錄是否被處理,我們已經看到Akka并不保證消息傳輸,并且把提供消息成功提示留給了應用程式來做。在我們的場景下,我們希望在更新溫度之後給發送者一個确認消息。例如:
final case class TemperatureRecorded(requestId: Long)
。就像之前場景中溫度的請求和回應一樣,添加一個ID字段提供了極大的靈活性。
有讀寫消息的actor
将讀寫協定放在一起,裝置actor看起來就會像這樣:
import akka.actor.{ Actor, ActorLogging, Props }
object Device {
def props(groupId: String, deviceId: String): Props = Props(new Device(groupId, deviceId))
final case class RecordTemperature(requestId: Long, value: Double)
final case class TemperatureRecorded(requestId: Long)
final case class ReadTemperature(requestId: Long)
final case class RespondTemperature(requestId: Long, value: Option[Double])
}
class Device(groupId: String, deviceId: String) extends Actor with ActorLogging {
import Device._
var lastTemperatureReading: Option[Double] = None
override def preStart(): Unit = log.info("Device actor {}-{} started", groupId, deviceId)
override def postStop(): Unit = log.info("Device actor {}-{} stopped", groupId, deviceId)
override def receive: Receive = {
case RecordTemperature(id, value) ⇒
log.info("Recorded temperature reading {} with {}", value, id)
lastTemperatureReading = Some(value)
sender() ! TemperatureRecorded(id)
case ReadTemperature(id) ⇒
sender() ! RespondTemperature(id, lastTemperatureReading)
}
}
我們現在還需要寫一個新的測試用例,同時執行讀/請求和寫/記錄:
"reply with latest temperature reading" in {
val probe = TestProbe()
val deviceActor = system.actorOf(Device.props("group", "device"))
deviceActor.tell(Device.RecordTemperature(requestId = , ), probe.ref)
probe.expectMsg(Device.TemperatureRecorded(requestId = ))
deviceActor.tell(Device.ReadTemperature(requestId = ), probe.ref)
val response1 = probe.expectMsgType[Device.RespondTemperature]
response1.requestId should ===()
response1.value should ===(Some())
deviceActor.tell(Device.RecordTemperature(requestId = , ), probe.ref)
probe.expectMsg(Device.TemperatureRecorded(requestId = ))
deviceActor.tell(Device.ReadTemperature(requestId = ), probe.ref)
val response2 = probe.expectMsgType[Device.RespondTemperature]
response2.requestId should ===()
response2.value should ===(Some())
}
接下來
到目前為止,我們已經開始設計我們的整體架構,并且我們編寫了與領域直接對應的第一個actor。我們之後需要建立一個用來維護裝置組和裝置actor的元件。