Actor的生命周期是使用Hooks展現和控制的,我們可以重寫相關的Hooks,進而實作對Actor生命周期各環節的細粒度控制。各事件發生順序關系如下:
-
在構造函數之後調用。prestart():
-
在重新開機之前調用。postStop():
-
預設情況下會調用 postStop()。preRestart(reason, message):
-
預設情況下會調用 preStart()。postRestart():
注意: preRestart 和 postRestart 隻在重新開機的時候才會被調用。它們預設調用了 preStart 和 postStop,但是調用它們的時候就不再直接調用 preStart 和 postStop 了。 這樣我們就能夠決定, 到底是隻在 Actor 啟動或停止的時候調用一次 preStart 和postStop,還是每次重新開機一個 Actor 的時候就調用 preStart 和postStop。
從上圖我們可以發現Actor的生命周期主要包含三個狀态:啟動、停止和重新開機。
啟動Start政策
Start政策一般用于初始化資源,調用preStart Hook。當Akka通過Props建構一個Actor後,會調用構造函數,之後調用preStart。
override def preStart={
log.info ("Starting storage actor...")
initDB
}
停止Stop政策
Stop政策一般用于回收資源,一個Actor可能因為完成運算、發生異常又或者人為通過發送Kill,PoisonPill強行終止等而進入停止(stopping)狀态。而這個終止過程分為兩步:
- Actor将挂起對郵箱的處理,并向所有子Actor發送終止指令,然後處理來自子Actor的終止消息直到所有的子Actor都完成終止。
- 終止自己,調用postStop方法,清空郵箱,向DeathWatch釋出
,通知其監管者。Terminated
整個人過程保證Actor系統中的子樹以一種有序的方式終止,将終止指令傳播到葉子結點并收集它們回送的确認消息給被終止的監管者。如果其中某個Actor沒有響應(即由于處理消息用了太長時間以至于沒有收到終止指令),整個過程将會被阻塞。
override def postStop={
log.info ("Stopping storage actor...")
db.release
}
重新開機Restart政策
Restart政策是最為複雜的一種情況,在一個Actor的生命周期裡可能因為多種原因發生重新開機。造成一個Actor需要重新開機的原因可能有下面幾個:
(1)在處理某特定消息時造成了系統性的異常,必須通過重新開機來清理系統錯誤;
(2)内部狀态毀壞,必須通過重新開機來重新建構狀态;
(3)在處理消息時無法使用到一些依賴資源,需要重新開機來重新配置資源;
Actor的重新開機過程也是一個遞歸的過程,由于其過程比較複雜,先上個圖:
在預設情況下 ,重新開機過程主要分為以下幾步:
- actor被挂起;
- 調用舊執行個體的supervisionStrategy.handleSupervisorFailing方法(預設實作為挂起所有的子actor);
- 調用preRestart方法,從上面的源碼可以看出來,preRestart方法将所有的children stop掉了!(Stop動作,大家注意!),并調用postStop回收資源;
- 調用舊執行個體的supervisionStrategy.handleSupervisorRestarted方法(預設實作為向所有剩下的子actor發送重新開機請求);
- 等待所有子actor終止直到preRestart最終結束;
- 再次調用之前提供的actor工廠建立新的actor執行個體;
- 對新執行個體調用postRestart;
- 恢複運作新的actor;
預設實作:比如說構造方法不帶參數的,就是一個預設實作。
常見疑難點
Restart政策,和Stop政策有什麼不同的地方?
Stop政策會調用postStop(),Restart政策也會調用postStop(),但是Restart政策不是通過Stop政策來停止舊的Actor,UID和Path都沒變。也就是說,在被Restart之後,不用重新擷取ActorRef。
Actor是由UID和Path來唯一辨別的,也就是說ActorRef也是通過UID和Path來定位。在Actor被Stop之後,新的Actor是可以用這個Path的,但是舊的ActorRef是不能用的,因為UID不一樣。
preRestart Hook有什麼特别之處?
預設的preRestart Hook會将所有的Children通過Stop政策停止,這個時候Children就是通過Stop政策->Start政策啟動的,而不是被遞歸Restart。如果有外部的Actor持有舊的Chidren ActorRef,那這個Ref就是不能用的,因為雖然Path是對的,但是UID已經變了!
postRestart Hook有什麼特别之處?
預設postRestart是調用preStart(),這樣在重新開機的過程中,構造函數和preStart方法都會被重新調用,如果有個資源隻想初始化一次,那麼就必須重寫掉這個方法。是以一般建立children是放在preStart裡面。
override def preStart(): Unit = {
// 初始化children
}
// 重寫postRestart防止preStart每次重新開機都被調用
override def postRestart(reason: Throwable): Unit = ()
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
// 任然要清理自己,但是不Stop children
postStop()
}
Actor生命周期測試代碼
ParentActor.scala :接收從 Main.scala 裡發送的消息,初始化或控制 ChildActor。
package com.lp.akka.notes.lifecycle
import akka.actor.{Actor, ActorLogging, Props, ReceiveTimeout}
/**
* @author li.pan
* @version 1.0.0
* @Description 父Actor
* @createTime 2021年01月13日 13:04:00
*/
class ParentActor extends Actor with ActorLogging {
println("start pActor ")
def receive = {
case "test" => log.info("received test")
case ("newChild", name: String) => context.actorOf(Props[ChildActor], name)
case ("stop", name: String) => {
val child = context.actorFor(self.path + "/" + name);
context.stop(child)
}
case ReceiveTimeout => throw new RuntimeException("received timeout"); // 每隔逾時時間沒收到消息就抛出異常
case "suicide" =>
case x: Any => log.info("received unknown message :" + x)
}
/**
* 在 actor 執行個體化後執行,重新開機時不會執行
*/
override def preStart {
println("actor:" + self.path + ", parent preStart ")
}
/**
* 在 actor 正常終止後執行,異常重新開機時不會執行。
*/
override def postStop {
println("actor:" + self.path + ",parent postStop .")
}
/**
* 在 actor 異常重新開機前儲存狀态
*/
override def preRestart(reason: Throwable, message: Option[Any]) {
println("actor:" + self.path + ", preRestart parent, reason:" + reason + ", message:" + message)
}
/**
* 在 actor 異常重新開機後恢複狀态
*/
override def postRestart(reason: Throwable) {
println("actor:" + self.path + ", postRestart parent, reason:" + reason)
}
}
ChildActor.scala:子 Actor,被 ParentActor 控制。
package com.lp.akka.notes.lifecycle
import akka.actor.Actor
/**
* @author li.pan
* @version 1.0.0
* @Description 子 Actor
* @createTime 2021年01月13日 13:03:00
*/
class ChildActor extends Actor {
override def receive() = {
case "abc" => println("get abc string ")
case "exception" => throw new NullPointerException()
case _ => println("children cann't handle unknown message")
}
override def preStart {
println("actor:" + self.path + ",child preStart .")
}
override def postStop {
println("actor:" + self.path + ",child postStop .")
}
override def preRestart(reason: Throwable, message: Option[Any]) {
println("actor:" + self.path + ",preRestart child, reason:" + reason + ", message:" + message)
}
override def postRestart(reason: Throwable) {
println("actor:" + self.path + ",postRestart child, reason:" + reason)
}
}
LifeCycleMainApp:初始化、發送消息給Actor。
package com.lp.akka.notes.lifecycle
import akka.actor.{ActorSystem, Kill, Props}
/**
* @author li.pan
* @version 1.0.0
* @Description 主類
* @createTime 2021年01月13日 13:05:00
*/
object LifeCycleMainApp extends App {
// 建構Actor系統
val system = ActorSystem("lpLocalSys")
// 通過Props方式建立父Actor
val pActor = system.actorOf(Props[ParentActor], name = "pActor")
pActor ! ("newChild", "child-1")
pActor ! ("newChild", "child-2")
pActor ! "test"
val parent = system.actorSelection("akka://lpLocalSys/user/pActor")
parent ! "test"
// parent ! ("stop", "child-2")
val child2 = system.actorSelection("akka://lpLocalSys/user/pActor/child-2")
child2 ! Kill // 殺死 child2
// child2 ! "exception"
Thread.sleep(5000) // 等待 child2 被殺死
pActor ! ("newChild", "child-2")
// Thread.sleep(5000)
// myActor ! ("newChild", "child-2")
}
運作結果:
在 LifeCycleMainApp 裡,如果不等的 child-2 被殺死就建立同名的 actor将導緻名為 myactor 的父 actor 異常,使它重新開機,而根據 Akka Actor 的監管政策,它也會重新開機它的子 Actor,是以 child-1 和child-2 也會被重新開機,輸出如下
從輸出可以看到:父 Actor 首先調用 preRestart ,然後被執行個體化,再調用 postRestart,最後再重新開機它的子 Actor,子 Actor 也遵循上述的步驟。
參考文獻
- 《Akka入門與實踐》
- https://coderbee.net/index.php/akka/20140814/1000
- https://www.jianshu.com/p/16de393ec5b4
- https://www.codetd.com/article/4668375
- https://www.cnblogs.com/junjiang3/p/9747594.html
關注公衆号 ,專注于java大資料領域離線、實時技術幹貨定期分享!回複
Akka
領取《Akka入門與實踐》書籍,個人網站 www.lllpan.top