這是一個很簡單的
AKKA
的使用執行個體,功能也很簡單,就是建立一個名叫
example1
的
actor
,然後向其發送一條消息
hello akka
,而
actor
在接受到消息時,将其列印出來。
object Example1 extends App {
val actorSystem : ActorSystem = ActorSystem.create("exampleSystem")
val actorRef : ActorRef = actorSystem.actorOf(Props[Example1], "example1")
actorRef ! "hello akka"
actorSystem.shutdown()
}
class Example1 extends Actor {
override def receive = {
case msg : String => println(msg)
case _ => unhandled()
}
}
這個案例雖然很簡單,但使用
AKKA
所需的重要元件均已出現,
trait Actor
、
ActorRef
、
ActorSystem
,這三位是使用
AKKA
必須要接觸到的成員,通過繼承
trait Actor
來實作對消息的處理,
ActorRef
則用來進行消息的發送,
ActorSystem
則負責整個系統的管理。
AKKA
的世界全部都收納在
ActorSystem
中,
AKKA
的世界也是色彩缤紛,本文則主要來讨論
AKKA Actor
,需要注意這裡說的是
AKKA Actor
,而不是指上面的
trait Actor
。
AKKA Actor
是我自己定義的,用于區分
trait Actor
。
AKKA Actor
主要由
AKKA
中的三位成員組成,分别是
trait Actor
、
ActorRef
、
ActorCell
,其中
ActorCell
這個成員還沒有在上述代碼中出現,後面會介紹,就知道為啥不會出現了,用下面一張圖來簡單說明這三者之間的關系。
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsICdzFWRoRXdvN1LclHdpZXYyd2LcBzNvwVZ2x2bzNXak9CX90TQNNkRrFlQKBTSvwFbslmZvwFMwQzLcVmepNHdu9mZvwFVywUNMZTY18CX052bm9CX9UFRPNTTU5kMFpWTmZEWjZXUYpVd1kmYr50MZV3YyI2cKJDT29GRjBjUIF2LcRHelR3LcJzLctmch1mclRXY39zN1YTMyUTMxEjMxgDM3EDMy8CX0Vmbu4GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.jpg)
圖中的箭頭方向,表示互相之間的擁有關系,解釋如下:
-
是一個ActorContext
,定義了一些接口,從trait
的視角,來看Actor
是長什麼樣子;ActorCell
-
中同時擁有ActorCell
、Actor
的執行個體;ActorRef
-
也擁有ActorRef
的執行個體,這以為着ActorCell
可以通過ActorRef
來做事情;ActorCell
-
中的對象中擁有Actor
對象,所有在ActorRef
内部可以具備Actor
的行為;ActorRef
-
中擁有Actor
的執行個體,其實就是ActorContext
,但是隻能執行ActorCell
中定義的接口;ActorContext
既然這三位共同組成了
AKKA Actor
,那這三位有什麼分工呢?
首先來看
trait Actor
,這個特質的定義如下,源碼中的注釋已經把删除掉。
trait Actor {
import Actor._
// 擷取ActorContext執行個體,其實就是一個ActorCell,如果擷取不到,則會抛錯,
// 目的就是要求通過actorOf方法來建立Actor,避免直接通過new的方式建立Actor對象
implicit val context: ActorContext = {
val contextStack = ActorCell.contextStack.get
if ((contextStack.isEmpty) || (contextStack.head eq null))
throw ActorInitializationException(s"You cannot create an instance of [${getClass.getName}] explicitly using the constructor (new). " + "You have to use one of the 'actorOf' factory methods to create a new actor. See the documentation.")
val c = contextStack.head
ActorCell.contextStack.set(null :: contextStack)
c
}
// 這裡就是擷取ActorRef執行個體,
// 從這裡可以看出,其實也是通過ActorContext擷取到的,也就是擷取的就是ActorCell中的ActorRef
implicit final val self = context.self
// 擷取消息的發送者,也是通過ActorContext擷取到的
final def sender(): ActorRef = context.sender()
// 消息被真正處理的地方
def receive: Actor.Receive
// 接下來的幾個aroundXXX方法,都是對相應的XXX方法的一個包裝,模闆設計模式
protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = receive.applyOrElse(msg, unhandled)
protected[akka] def aroundPreStart(): Unit = preStart()
protected[akka] def aroundPostStop(): Unit = postStop()
protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = preRestart(reason, message)
protected[akka] def aroundPostRestart(reason: Throwable): Unit = postRestart(reason)
def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy
// Actor在啟動前需要進行的一些初始化操作
@throws(classOf[Exception])
def preStart(): Unit = ()
// Actor在停止後需要進行的一些收尾操作
@throws(classOf[Exception])
def postStop(): Unit = ()
// Actor在故障恢複時,重新開機前需要進行的操作
@throws(classOf[Exception])
def preRestart(reason: Throwable, message: Option[Any]): Unit = {
context.children foreach { child ⇒
context.unwatch(child)
context.stop(child)
}
postStop()
}
// Actor在故障恢複時,重新開機後,對Actor進行的初始化操作
@throws(classOf[Exception])
def postRestart(reason: Throwable): Unit = {
preStart()
}
def unhandled(message: Any): Unit = {
message match {
case Terminated(dead) ⇒ throw new DeathPactException(dead)
case _ ⇒ context.system.eventStream.publish(UnhandledMessage(message, sender(), self))
}
}
}
從
trait Actor
定義的接口方法來看,都是跟一個
Actor
的自身行為相關的,比如
def receive: Actor.Receive
這個接口,也就是本文開頭處,
Example1
繼承
Actor
所實作的接口,是對消息進行處理的地方。
另外對于
preStart
、
postStop
、
preRestart
、
postRestart
這四個方法,則分别是用來定義一個
Actor
,在啟動前、關閉後、重新開機前、重新開機後,所執行的操作。其中
preStart
、
postStop
這兩個的預設實作是空;
preRestart
的預設實作則是先停掉所有的
child
,然後再調用
postStop
;而
postRestart
的預設實作則直接就是調用
preStart()
。
而
aroundPreStart
、
aroundPostStop
、
aroundPreRestart
、
aroundPostRestart
的預設實作,則是分别調用了
preStart
、
postStop
、
preRestart
、
postRestart
,并且可以看出,這裡就是一個模闆模式,如果實作的
Actor
對這個模闆不喜歡,完全可以自定義相應的
aroundXXX
方法,并且在
AKKA
内部,也是調用的
aroundXXX
方法。
trait Actor
這個特質就是用來對
Actor
的行為進行自定義的地方,這也就是為什麼
trait Actor
是對外開放的接口,因為消息的處理邏輯隻有程式猿知道呀,是以将這些接口都開發出來,而程式猿也隻需要關心消息的具體處理邏輯,不用去關心
AKKA
内部是如何來實作多線程。
另外在前面說過,在
Actor
中是包含了
ActorRef
和
ActorCell
對象的,在上面代碼中,也可以看到就是
self
和
context
這兩位,并且可以看出,
self
也是通過
context
擷取到的。是以說,在
Actor
内部,是可以通過
self
給自己發送一些消息,通過
context
設定一些參數等的,比如可以通過
setReceiveTimeout
方法來設定多久沒有收到消息,就給自己發送一個
ReceiveTimeout
的消息,通過
actorOf
來建立一個
child
等。
看完
trait Actor
,咱們再來看下
ActorRef
,源碼如下:
abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable {
scalaRef: InternalActorRef ⇒
// Actor的路徑
def path: ActorPath
final def compareTo(other: ActorRef) = {
val x = this.path compareTo other.path
if (x == ) if (this.path.uid < other.path.uid) - else if (this.path.uid == other.path.uid) else
else x
}
// 向Actor發送一條消息
final def tell(msg: Any, sender: ActorRef): Unit = this.!(msg)(sender)
def forward(message: Any)(implicit context: ActorContext) = tell(message, context.sender())
@deprecated("Use context.watch(actor) and receive Terminated(actor)", "2.2")
def isTerminated: Boolean
final override def hashCode: Int = {
if (path.uid == ActorCell.undefinedUid) path.hashCode
else path.uid
}
final override def equals(that: Any): Boolean = that match {
case other: ActorRef ⇒ path.uid == other.path.uid && path == other.path
case _ ⇒ false
}
override def toString: String =
if (path.uid == ActorCell.undefinedUid) s"Actor[${path}]"
else s"Actor[${path}#${path.uid}]"
}
對于
ActorRef
的執行個體,常用的方法就是
tell
或者
!
這兩個方法,就是用來向
Actor
發送消息,從這裡提供的接口可以看出,主要的任務就是用來進行消息的接受,有點類似小蜜的作用,用來接受郵件。實際使用中,常用的也就是
ActorRef
,而作用則也主要是用來進消息的發送操作。
接下來來一下
ActorRef
的一個子類
InternalActorRef
,源碼如下:
private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRef { this: ActorRefScope ⇒
// 這幾個方法就是用來控制Actor的生命周期變化的
def start(): Unit
def resume(causedByFailure: Throwable): Unit
def suspend(): Unit
def restart(cause: Throwable): Unit
def stop(): Unit
// 發送一些系統消息
def sendSystemMessage(message: SystemMessage): Unit
// ActorRef的執行個體提供者
def provider: ActorRefProvider
// 擷取parent的ActorRef
def getParent: InternalActorRef
// 擷取child
def getChild(name: Iterator[String]): InternalActorRef
def isLocal: Boolean
def isTerminated: Boolean
}
這裡子類仍然是一個
abstract class
,主要定義了控制
Actor
生命周期的操作,另外還提供了一些擷取
parent
、
child
等接口。
再繼續看下一個子類
ActorRefWithCell
,源碼如下:
private[akka] abstract class ActorRefWithCell extends InternalActorRef { this: ActorRefScope ⇒
def underlying: Cell
def children: immutable.Iterable[ActorRef]
def getSingleChild(name: String): InternalActorRef
}
這個子類中有一個屬性
underlying
,是一個
Cell
的執行個體,前面說過,在
ActorRef
中擁有
ActorCell
,說的就是這個屬性,接下來會看到這個屬性的作用。
最後來看
ActorRef
的子類
LocalActorRef
的源碼:
private[akka] class LocalActorRef private[akka] (
_system: ActorSystemImpl,
_props: Props,
_dispatcher: MessageDispatcher,
_mailboxType: MailboxType,
_supervisor: InternalActorRef,
override val path: ActorPath)
extends ActorRefWithCell with LocalRef {
// 擷取一個ActorCell的執行個體,并進行初始化init操作
private val actorCell: ActorCell = newActorCell(_system, this, _props, _dispatcher, _supervisor)
actorCell.init(sendSupervise = true, _mailboxType)
// new一個ActorCell出來
protected def newActorCell(system: ActorSystemImpl, ref: InternalActorRef, props: Props, dispatcher: MessageDispatcher, supervisor: InternalActorRef): ActorCell =
new ActorCell(system, ref, props, dispatcher, supervisor)
protected def actorContext: ActorContext = actorCell
@deprecated("Use context.watch(actor) and receive Terminated(actor)", "2.2")
override def isTerminated: Boolean = actorCell.isTerminated
override def start(): Unit = actorCell.start()
override def suspend(): Unit = actorCell.suspend()
override def resume(causedByFailure: Throwable): Unit = actorCell.resume(causedByFailure)
override def stop(): Unit = actorCell.stop()
override def getParent: InternalActorRef = actorCell.parent
override def provider: ActorRefProvider = actorCell.provider
def children: immutable.Iterable[ActorRef] = actorCell.children
def getSingleChild(name: String): InternalActorRef = actorCell.getSingleChild(name)
override def getChild(names: Iterator[String]): InternalActorRef = {
@tailrec
def rec(ref: InternalActorRef, name: Iterator[String]): InternalActorRef =
ref match {
case l: LocalActorRef ⇒
val next = name.next() match {
case ".." ⇒ l.getParent
case "" ⇒ l
case any ⇒ l.getSingleChild(any)
}
if (next == Nobody || name.isEmpty) next else rec(next, name)
case _ ⇒
ref.getChild(name)
}
if (names.isEmpty) this
else rec(this, names)
}
// ========= AKKA PROTECTED FUNCTIONS =========
def underlying: ActorCell = actorCell
override def sendSystemMessage(message: SystemMessage): Unit = actorCell.sendSystemMessage(message)
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = actorCell.sendMessage(message, sender)
override def restart(cause: Throwable): Unit = actorCell.restart(cause)
@throws(classOf[java.io.ObjectStreamException])
protected def writeReplace(): AnyRef = SerializedActorRef(this)
}
從上面的源碼可以看出,前面介紹的
ActorRef
本身以及抽象子類中定義的接口,在這裡都有了具體的實作,但是注意,所有的實作都是委托給
actorCell
這個屬性來操作的,從這裡,也就看出了
ActorRef
名稱的由來,它既是一個代理,它自身不會進行任何邏輯操作,它隻是代表
ActorCell
對外提供一些操作接口,是對
ActorCell
進行了有效的保護。
是以說,
ActorRef
是
ActorCell
的一個代理,它将
ActorCell
中允許對外開放的接口開放給了程式猿,其實也就是進行消息發送的
tell
和
!
方法,而子類
InternalActorRef
中定義的接口,則隻能在
AKKA
内部使用,并沒有開放給程式猿,主要
InternalActorRef
的定義中有
private[akka]
,說明它并不對外開放。
AKKA Actor
的三個成員中,已經介紹完了兩個,
trait Actor
提供了定義
Actor
行為的接口,
ActorRef
則隻提供了接受消息的接口,并且内部的各個方法的實作,全都是對
ActorCell
的代理,感覺說了半天,沒啥内容。但是事實就是如此,因為程式猿對
AKKA Actor
的三個成員中,能實際接觸到的就是
trait Actor
和
ActorRef
這兩位,一個用來定義消息的處理邏輯,一個用來進行消息的發送,至于
ActorCell
到底長啥樣,who care。這裡也是我覺得
AKKA Actor
設計比較好的地方,隻開放必要的接口給程式猿,開放給你的,都是你感興趣的,你不感興趣的,都給你很好的隐藏了起來。
再來繼續看
AKKA Actor
的第三位成員,也是核心所在的
ActorCell
,但是在此之前,先看一下
ActorContext
,源碼如下,
ActorContext
是
ActorCell
對
Actor
開放的視角,也就是在
Actor
中隻能看到這些接口。
trait ActorContext extends ActorRefFactory {
// 與之相關聯的ActorRef
def self: ActorRef
// 通過Props擷取的Actor的配置
def props: Props
// 擷取目前設定的多久沒有接受到消息的timeout
def receiveTimeout: Duration
// 設定多久沒有接受到消息,就會發送一條ReceiveTimeout的消息,與上一個方法相對應
// 可以用來監控消息源發送消息是否正常
def setReceiveTimeout(timeout: Duration): Unit
// 下面的become和unbecome則是可以用來動态的變換Actor的行為
def become(behavior: Actor.Receive): Unit = become(behavior, discardOld = true)
def become(behavior: Actor.Receive, discardOld: Boolean): Unit
def unbecome(): Unit
// 擷取目前消息的發送者
def sender(): ActorRef
// 擷取所有的children
def children: immutable.Iterable[ActorRef]
// 根據name擷取相應的child
def child(name: String): Option[ActorRef]
// 消息分發器,AKKA系統的線程池所在的地方
implicit def dispatcher: ExecutionContextExecutor
// actor system
implicit def system: ActorSystem
// 父節點
def parent: ActorRef
// watch和unwatch是一對,用來設定對某個ActorRef進行監控或者不監控
// 對于watch的ActorRef,在其退出時,會有相應的通知
def watch(subject: ActorRef): ActorRef
def unwatch(subject: ActorRef): ActorRef
final protected def writeObject(o: ObjectOutputStream): Unit =
throw new NotSerializableException("ActorContext is not serializable!")
}
ActorContext
還繼承了
ActorRefFactory
,從名稱中就可以看出,是一個工廠類,主要提供了建構
Actor
的各種接口,這也就是為什麼在
Actor
中可以通過
context
來進行child的建立,這裡就不列出
ActorRefFactory
的源碼了。
ActorContext
是
ActorCell
的一個剖面,專門提供給
Actor
來使用,讓在
Actor
中可以進行child的建立和管理。
說完
ActorContext
,再來說另一個特質,也就是
Cell
,也就是細胞。
private[akka] trait Cell {
// 與之關聯的ActorRef、ActorSystem
def self: ActorRef
def system: ActorSystem
def systemImpl: ActorSystemImpl
// 生命周期控制的方法
def start(): this.type
def suspend(): Unit
def resume(causedByFailure: Throwable): Unit
def restart(cause: Throwable): Unit
def stop(): Unit
def isTerminated: Boolean
// 父節點
def parent: InternalActorRef
// child相關的通路接口
def childrenRefs: ChildrenContainer
def getChildByName(name: String): Option[ChildStats]
def getSingleChild(name: String): InternalActorRef
// 消息發送的各種姿勢,最終都是會寫入Mailbox的隊列中
def sendMessage(msg: Envelope): Unit
final def sendMessage(message: Any, sender: ActorRef): Unit =
sendMessage(Envelope(message, sender, system))
def sendSystemMessage(msg: SystemMessage): Unit
def isLocal: Boolean
// 也是檢查的Mailbox的隊列狀态
def hasMessages: Boolean
def numberOfMessages: Int
def props: Props
}
在前面說過,
ActorRef
是對
ActorCell
的代理,從這裡的可以看出,
ActorRef
中涉及的接口,基本都在這裡出現了。
主要的
trait
介紹完了,再來看下
ActorCell
的定義,這裡隻看
ActorCell
的定義,至于内部的各個方法的實作,這裡先不做解釋。
private[akka] class ActorCell(
val system: ActorSystemImpl,
val self: InternalActorRef,
final val props: Props, // Must be final so that it can be properly cleared in clearActorCellFields
val dispatcher: MessageDispatcher,
val parent: InternalActorRef)
extends UntypedActorContext with AbstractActorContext with Cell
with dungeon.ReceiveTimeout
with dungeon.Children
with dungeon.Dispatch
with dungeon.DeathWatch
with dungeon.FaultHandling {
...
}
首先看到
private[akka]
,說明
ActorCell
隻在
akka
包内可見,是以對于程式猿來說,是看不到它的,是以在使用過程中,是不會與
ActorCell
直接打交道的,最多通過
Actor
内部的
context
屬性看到
ActorCell
的一個剖面。
繼承的
UntypedActorContext
、
AbstractActorContext
,是對
ActorContext
的一層繼承,是為了适應Java環境下的使用,加入了一些Java下使用的API,
Cell
上面也介紹過。
後面還混入了五種特征,這些特質分别實作了
ActorCell
的某一方面的功能:
-
,在介紹dungeon.ReceiveTimeout
中,提到ActorContext
和receiveTimeout
方法,如何實作在設定的時間内沒有收到消息,就給出一個setReceiveTimeout
的報警消息,就是這裡來控制的;ReceiveTimeout
-
,是對dungeon.Children
自身的child進行管理的地方;Actor
-
,則是利用dispatcher進行消息發送,并将mailbox注冊到線程池中執行的地方;dungeon.Dispatch
-
,dungeon.DeathWatch
中的ActorContext
和-watch
的具體的實作的地方;unwatch
-
,容錯機制的實作地方。dungeon.FaultHandling
ActorCell
作為
AKKA Actor
三位成員中的核心所在,内容過于豐富,這裡就簡單介紹到此,後續會針對每塊功能單獨介紹。
總結:
-
作為行為定位中心,使用者可以在這裡進行目前trait Actor
的各種行為的定義,如消息如何處理,在啟動前如果初始化,停止後如何收尾,以及故障重新開機時如果過度等行為;還可以通過Actor
屬性,使用context
暴露出來的一些接口,繼續動态行為的控制,child的建立,ReceiveTimeout等控制;ActorCell
-
,這位一位小蜜,職責很明确,就是幫ActorRef
這位老闆進行郵件的接受,同時在ActorCell
内部,還可以進行AKKA
的啟停等操作,但這些所有的操作,本質都是代替Actor
發号指令,最終還是得有ActorCell
來執行;ActorCell
-
,作為ActorCell
的核心所在,所有的動作,都是在其内部來實作,五髒六腑均在此處。AKKA Actor