天天看點

AKKA-源碼-Actor的結構設計

這是一個很簡單的

AKKA

的使用執行個體,功能也很簡單,就是建立一個名叫

example1

actor

,然後向其發送一條消息

hello akka

,而

actor

在接受到消息時,将其列印出來。

object Example1 extends App {
    val actorSystem : ActorSystem = ActorSystem.create("exampleSystem")
    val actorRef : ActorRef = actorSystem.actorOf(Props[Example1], "example1")
    actorRef ! "hello akka"    
    actorSystem.shutdown()
}

class Example1 extends Actor {
    override def receive = {
        case msg : String => println(msg)
        case _ => unhandled()
    }
}
           

這個案例雖然很簡單,但使用

AKKA

所需的重要元件均已出現,

trait Actor

ActorRef

ActorSystem

,這三位是使用

AKKA

必須要接觸到的成員,通過繼承

trait Actor

來實作對消息的處理,

ActorRef

則用來進行消息的發送,

ActorSystem

則負責整個系統的管理。

AKKA

的世界全部都收納在

ActorSystem

中,

AKKA

的世界也是色彩缤紛,本文則主要來讨論

AKKA Actor

,需要注意這裡說的是

AKKA Actor

,而不是指上面的

trait Actor

AKKA Actor

是我自己定義的,用于區分

trait Actor

AKKA Actor

主要由

AKKA

中的三位成員組成,分别是

trait Actor

ActorRef

ActorCell

,其中

ActorCell

這個成員還沒有在上述代碼中出現,後面會介紹,就知道為啥不會出現了,用下面一張圖來簡單說明這三者之間的關系。

AKKA-源碼-Actor的結構設計

圖中的箭頭方向,表示互相之間的擁有關系,解釋如下:

  1. ActorContext

    是一個

    trait

    ,定義了一些接口,從

    Actor

    的視角,來看

    ActorCell

    是長什麼樣子;
  2. ActorCell

    中同時擁有

    Actor

    ActorRef

    的執行個體;
  3. ActorRef

    也擁有

    ActorCell

    的執行個體,這以為着

    ActorRef

    可以通過

    ActorCell

    來做事情;
  4. Actor

    中的對象中擁有

    ActorRef

    對象,所有在

    Actor

    内部可以具備

    ActorRef

    的行為;
  5. Actor

    中擁有

    ActorContext

    的執行個體,其實就是

    ActorCell

    ,但是隻能執行

    ActorContext

    中定義的接口;

既然這三位共同組成了

AKKA Actor

,那這三位有什麼分工呢?

首先來看

trait Actor

,這個特質的定義如下,源碼中的注釋已經把删除掉。

trait Actor {
  import Actor._

  // 擷取ActorContext執行個體,其實就是一個ActorCell,如果擷取不到,則會抛錯,
  // 目的就是要求通過actorOf方法來建立Actor,避免直接通過new的方式建立Actor對象
  implicit val context: ActorContext = {
    val contextStack = ActorCell.contextStack.get
    if ((contextStack.isEmpty) || (contextStack.head eq null))
      throw ActorInitializationException(s"You cannot create an instance of [${getClass.getName}] explicitly using the constructor (new). " + "You have to use one of the 'actorOf' factory methods to create a new actor. See the documentation.")
    val c = contextStack.head
    ActorCell.contextStack.set(null :: contextStack)
    c
  }

  // 這裡就是擷取ActorRef執行個體,
  // 從這裡可以看出,其實也是通過ActorContext擷取到的,也就是擷取的就是ActorCell中的ActorRef
  implicit final val self = context.self

  // 擷取消息的發送者,也是通過ActorContext擷取到的
  final def sender(): ActorRef = context.sender()

  // 消息被真正處理的地方
  def receive: Actor.Receive

  // 接下來的幾個aroundXXX方法,都是對相應的XXX方法的一個包裝,模闆設計模式

  protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = receive.applyOrElse(msg, unhandled)

  protected[akka] def aroundPreStart(): Unit = preStart()

  protected[akka] def aroundPostStop(): Unit = postStop()

  protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = preRestart(reason, message)

  protected[akka] def aroundPostRestart(reason: Throwable): Unit = postRestart(reason)

  def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy

  // Actor在啟動前需要進行的一些初始化操作
  @throws(classOf[Exception])
  def preStart(): Unit = ()

  // Actor在停止後需要進行的一些收尾操作
  @throws(classOf[Exception]) 
  def postStop(): Unit = ()

  // Actor在故障恢複時,重新開機前需要進行的操作
  @throws(classOf[Exception]) 
  def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    context.children foreach { child ⇒
      context.unwatch(child)
      context.stop(child)
    }
    postStop()
  }

  // Actor在故障恢複時,重新開機後,對Actor進行的初始化操作
  @throws(classOf[Exception])  
  def postRestart(reason: Throwable): Unit = {
    preStart()
  }

  def unhandled(message: Any): Unit = {
    message match {
      case Terminated(dead) ⇒ throw new DeathPactException(dead)
      case _                ⇒ context.system.eventStream.publish(UnhandledMessage(message, sender(), self))
    }
  }
}
           

trait Actor

定義的接口方法來看,都是跟一個

Actor

的自身行為相關的,比如

def receive: Actor.Receive
           

這個接口,也就是本文開頭處,

Example1

繼承

Actor

所實作的接口,是對消息進行處理的地方。

另外對于

preStart

postStop

preRestart

postRestart

這四個方法,則分别是用來定義一個

Actor

,在啟動前、關閉後、重新開機前、重新開機後,所執行的操作。其中

preStart

postStop

這兩個的預設實作是空;

preRestart

的預設實作則是先停掉所有的

child

,然後再調用

postStop

;而

postRestart

的預設實作則直接就是調用

preStart()

aroundPreStart

aroundPostStop

aroundPreRestart

aroundPostRestart

的預設實作,則是分别調用了

preStart

postStop

preRestart

postRestart

,并且可以看出,這裡就是一個模闆模式,如果實作的

Actor

對這個模闆不喜歡,完全可以自定義相應的

aroundXXX

方法,并且在

AKKA

内部,也是調用的

aroundXXX

方法。

trait Actor

這個特質就是用來對

Actor

的行為進行自定義的地方,這也就是為什麼

trait Actor

是對外開放的接口,因為消息的處理邏輯隻有程式猿知道呀,是以将這些接口都開發出來,而程式猿也隻需要關心消息的具體處理邏輯,不用去關心

AKKA

内部是如何來實作多線程。

另外在前面說過,在

Actor

中是包含了

ActorRef

ActorCell

對象的,在上面代碼中,也可以看到就是

self

context

這兩位,并且可以看出,

self

也是通過

context

擷取到的。是以說,在

Actor

内部,是可以通過

self

給自己發送一些消息,通過

context

設定一些參數等的,比如可以通過

setReceiveTimeout

方法來設定多久沒有收到消息,就給自己發送一個

ReceiveTimeout

的消息,通過

actorOf

來建立一個

child

等。

看完

trait Actor

,咱們再來看下

ActorRef

,源碼如下:

abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable {
  scalaRef: InternalActorRef ⇒

  // Actor的路徑
  def path: ActorPath

  final def compareTo(other: ActorRef) = {
    val x = this.path compareTo other.path
    if (x == ) if (this.path.uid < other.path.uid) - else if (this.path.uid == other.path.uid)  else     
    else x
  }

  // 向Actor發送一條消息
  final def tell(msg: Any, sender: ActorRef): Unit = this.!(msg)(sender)

  def forward(message: Any)(implicit context: ActorContext) = tell(message, context.sender())

  @deprecated("Use context.watch(actor) and receive Terminated(actor)", "2.2") 
  def isTerminated: Boolean

  final override def hashCode: Int = {
    if (path.uid == ActorCell.undefinedUid) path.hashCode
    else path.uid
  }

  final override def equals(that: Any): Boolean = that match {
    case other: ActorRef ⇒ path.uid == other.path.uid && path == other.path
    case _               ⇒ false  
  }

  override def toString: String =
    if (path.uid == ActorCell.undefinedUid) s"Actor[${path}]"    
    else s"Actor[${path}#${path.uid}]"
}
           

對于

ActorRef

的執行個體,常用的方法就是

tell

或者

!

這兩個方法,就是用來向

Actor

發送消息,從這裡提供的接口可以看出,主要的任務就是用來進行消息的接受,有點類似小蜜的作用,用來接受郵件。實際使用中,常用的也就是

ActorRef

,而作用則也主要是用來進消息的發送操作。

接下來來一下

ActorRef

的一個子類

InternalActorRef

,源碼如下:

private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRef { this: ActorRefScope ⇒

  // 這幾個方法就是用來控制Actor的生命周期變化的

  def start(): Unit
  def resume(causedByFailure: Throwable): Unit
  def suspend(): Unit
  def restart(cause: Throwable): Unit
  def stop(): Unit

  // 發送一些系統消息
  def sendSystemMessage(message: SystemMessage): Unit

  // ActorRef的執行個體提供者
  def provider: ActorRefProvider

  // 擷取parent的ActorRef
  def getParent: InternalActorRef

  // 擷取child
  def getChild(name: Iterator[String]): InternalActorRef

  def isLocal: Boolean

  def isTerminated: Boolean
}
           

這裡子類仍然是一個

abstract class

,主要定義了控制

Actor

生命周期的操作,另外還提供了一些擷取

parent

child

等接口。

再繼續看下一個子類

ActorRefWithCell

,源碼如下:

private[akka] abstract class ActorRefWithCell extends InternalActorRef { this: ActorRefScope ⇒
  def underlying: Cell
  def children: immutable.Iterable[ActorRef]
  def getSingleChild(name: String): InternalActorRef
}
           

這個子類中有一個屬性

underlying

,是一個

Cell

的執行個體,前面說過,在

ActorRef

中擁有

ActorCell

,說的就是這個屬性,接下來會看到這個屬性的作用。

最後來看

ActorRef

的子類

LocalActorRef

的源碼:

private[akka] class LocalActorRef private[akka] (
  _system: ActorSystemImpl,
  _props: Props,
  _dispatcher: MessageDispatcher,
  _mailboxType: MailboxType,
  _supervisor: InternalActorRef,
  override val path: ActorPath)
  extends ActorRefWithCell with LocalRef {

  // 擷取一個ActorCell的執行個體,并進行初始化init操作
  private val actorCell: ActorCell = newActorCell(_system, this, _props, _dispatcher, _supervisor)
  actorCell.init(sendSupervise = true, _mailboxType)

  // new一個ActorCell出來
  protected def newActorCell(system: ActorSystemImpl, ref: InternalActorRef, props: Props, dispatcher: MessageDispatcher, supervisor: InternalActorRef): ActorCell =
    new ActorCell(system, ref, props, dispatcher, supervisor)

  protected def actorContext: ActorContext = actorCell

  @deprecated("Use context.watch(actor) and receive Terminated(actor)", "2.2") 
  override def isTerminated: Boolean = actorCell.isTerminated

  override def start(): Unit = actorCell.start()

  override def suspend(): Unit = actorCell.suspend()

  override def resume(causedByFailure: Throwable): Unit = actorCell.resume(causedByFailure)

  override def stop(): Unit = actorCell.stop()

  override def getParent: InternalActorRef = actorCell.parent

  override def provider: ActorRefProvider = actorCell.provider

  def children: immutable.Iterable[ActorRef] = actorCell.children

  def getSingleChild(name: String): InternalActorRef = actorCell.getSingleChild(name)

  override def getChild(names: Iterator[String]): InternalActorRef = {  
    @tailrec    
    def rec(ref: InternalActorRef, name: Iterator[String]): InternalActorRef =
      ref match {
        case l: LocalActorRef ⇒
          val next = name.next() match {
            case ".." ⇒ l.getParent
            case ""   ⇒ l
            case any  ⇒ l.getSingleChild(any)
          }
          if (next == Nobody || name.isEmpty) next else rec(next, name)
        case _ ⇒
          ref.getChild(name)
      }

    if (names.isEmpty) this    
    else rec(this, names)
  }

  // ========= AKKA PROTECTED FUNCTIONS =========
  def underlying: ActorCell = actorCell
  override def sendSystemMessage(message: SystemMessage): Unit = actorCell.sendSystemMessage(message)

  override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = actorCell.sendMessage(message, sender)

  override def restart(cause: Throwable): Unit = actorCell.restart(cause)

  @throws(classOf[java.io.ObjectStreamException])
  protected def writeReplace(): AnyRef = SerializedActorRef(this)
}
           

從上面的源碼可以看出,前面介紹的

ActorRef

本身以及抽象子類中定義的接口,在這裡都有了具體的實作,但是注意,所有的實作都是委托給

actorCell

這個屬性來操作的,從這裡,也就看出了

ActorRef

名稱的由來,它既是一個代理,它自身不會進行任何邏輯操作,它隻是代表

ActorCell

對外提供一些操作接口,是對

ActorCell

進行了有效的保護。

是以說,

ActorRef

ActorCell

的一個代理,它将

ActorCell

中允許對外開放的接口開放給了程式猿,其實也就是進行消息發送的

tell

!

方法,而子類

InternalActorRef

中定義的接口,則隻能在

AKKA

内部使用,并沒有開放給程式猿,主要

InternalActorRef

的定義中有

private[akka]

,說明它并不對外開放。

AKKA Actor

的三個成員中,已經介紹完了兩個,

trait Actor

提供了定義

Actor

行為的接口,

ActorRef

則隻提供了接受消息的接口,并且内部的各個方法的實作,全都是對

ActorCell

的代理,感覺說了半天,沒啥内容。但是事實就是如此,因為程式猿對

AKKA Actor

的三個成員中,能實際接觸到的就是

trait Actor

ActorRef

這兩位,一個用來定義消息的處理邏輯,一個用來進行消息的發送,至于

ActorCell

到底長啥樣,who care。這裡也是我覺得

AKKA Actor

設計比較好的地方,隻開放必要的接口給程式猿,開放給你的,都是你感興趣的,你不感興趣的,都給你很好的隐藏了起來。

再來繼續看

AKKA Actor

的第三位成員,也是核心所在的

ActorCell

,但是在此之前,先看一下

ActorContext

,源碼如下,

ActorContext

ActorCell

Actor

開放的視角,也就是在

Actor

中隻能看到這些接口。

trait ActorContext extends ActorRefFactory {

  // 與之相關聯的ActorRef
  def self: ActorRef

  // 通過Props擷取的Actor的配置
  def props: Props

  // 擷取目前設定的多久沒有接受到消息的timeout
  def receiveTimeout: Duration

  // 設定多久沒有接受到消息,就會發送一條ReceiveTimeout的消息,與上一個方法相對應
  // 可以用來監控消息源發送消息是否正常
  def setReceiveTimeout(timeout: Duration): Unit

  // 下面的become和unbecome則是可以用來動态的變換Actor的行為
  def become(behavior: Actor.Receive): Unit = become(behavior, discardOld = true)

  def become(behavior: Actor.Receive, discardOld: Boolean): Unit

  def unbecome(): Unit

  // 擷取目前消息的發送者
  def sender(): ActorRef

  // 擷取所有的children
  def children: immutable.Iterable[ActorRef]

  // 根據name擷取相應的child
  def child(name: String): Option[ActorRef]

  // 消息分發器,AKKA系統的線程池所在的地方
  implicit def dispatcher: ExecutionContextExecutor

  // actor system
  implicit def system: ActorSystem

  // 父節點
  def parent: ActorRef

  // watch和unwatch是一對,用來設定對某個ActorRef進行監控或者不監控
  // 對于watch的ActorRef,在其退出時,會有相應的通知
  def watch(subject: ActorRef): ActorRef

  def unwatch(subject: ActorRef): ActorRef

  final protected def writeObject(o: ObjectOutputStream): Unit =
    throw new NotSerializableException("ActorContext is not serializable!")
}
           

ActorContext

還繼承了

ActorRefFactory

,從名稱中就可以看出,是一個工廠類,主要提供了建構

Actor

的各種接口,這也就是為什麼在

Actor

中可以通過

context

來進行child的建立,這裡就不列出

ActorRefFactory

的源碼了。

ActorContext

ActorCell

的一個剖面,專門提供給

Actor

來使用,讓在

Actor

中可以進行child的建立和管理。

說完

ActorContext

,再來說另一個特質,也就是

Cell

,也就是細胞。

private[akka] trait Cell {
  // 與之關聯的ActorRef、ActorSystem
  def self: ActorRef
  def system: ActorSystem
  def systemImpl: ActorSystemImpl

  // 生命周期控制的方法
  def start(): this.type
  def suspend(): Unit
  def resume(causedByFailure: Throwable): Unit
  def restart(cause: Throwable): Unit  
  def stop(): Unit

  def isTerminated: Boolean

  // 父節點
  def parent: InternalActorRef

  // child相關的通路接口
  def childrenRefs: ChildrenContainer
  def getChildByName(name: String): Option[ChildStats]  
  def getSingleChild(name: String): InternalActorRef

  // 消息發送的各種姿勢,最終都是會寫入Mailbox的隊列中
  def sendMessage(msg: Envelope): Unit
  final def sendMessage(message: Any, sender: ActorRef): Unit =
    sendMessage(Envelope(message, sender, system)) 
  def sendSystemMessage(msg: SystemMessage): Unit

  def isLocal: Boolean

  // 也是檢查的Mailbox的隊列狀态
  def hasMessages: Boolean 
  def numberOfMessages: Int

  def props: Props
}
           

在前面說過,

ActorRef

是對

ActorCell

的代理,從這裡的可以看出,

ActorRef

中涉及的接口,基本都在這裡出現了。

主要的

trait

介紹完了,再來看下

ActorCell

的定義,這裡隻看

ActorCell

的定義,至于内部的各個方法的實作,這裡先不做解釋。

private[akka] class ActorCell(
  val system: ActorSystemImpl,
  val self: InternalActorRef,
  final val props: Props, // Must be final so that it can be properly cleared in clearActorCellFields  
  val dispatcher: MessageDispatcher,
  val parent: InternalActorRef)
  extends UntypedActorContext with AbstractActorContext with Cell
  with dungeon.ReceiveTimeout
  with dungeon.Children
  with dungeon.Dispatch
  with dungeon.DeathWatch
  with dungeon.FaultHandling {
  ...
}
           

首先看到

private[akka]

,說明

ActorCell

隻在

akka

包内可見,是以對于程式猿來說,是看不到它的,是以在使用過程中,是不會與

ActorCell

直接打交道的,最多通過

Actor

内部的

context

屬性看到

ActorCell

的一個剖面。

繼承的

UntypedActorContext

AbstractActorContext

,是對

ActorContext

的一層繼承,是為了适應Java環境下的使用,加入了一些Java下使用的API,

Cell

上面也介紹過。

後面還混入了五種特征,這些特質分别實作了

ActorCell

的某一方面的功能:

  • dungeon.ReceiveTimeout

    ,在介紹

    ActorContext

    中,提到

    receiveTimeout

    setReceiveTimeout

    方法,如何實作在設定的時間内沒有收到消息,就給出一個

    ReceiveTimeout

    的報警消息,就是這裡來控制的;
  • dungeon.Children

    ,是對

    Actor

    自身的child進行管理的地方;
  • dungeon.Dispatch

    ,則是利用dispatcher進行消息發送,并将mailbox注冊到線程池中執行的地方;
  • dungeon.DeathWatch

    ActorContext

    中的

    watch

    和-

    unwatch

    的具體的實作的地方;
  • dungeon.FaultHandling

    ,容錯機制的實作地方。

ActorCell

作為

AKKA Actor

三位成員中的核心所在,内容過于豐富,這裡就簡單介紹到此,後續會針對每塊功能單獨介紹。

總結:

  1. trait Actor

    作為行為定位中心,使用者可以在這裡進行目前

    Actor

    的各種行為的定義,如消息如何處理,在啟動前如果初始化,停止後如何收尾,以及故障重新開機時如果過度等行為;還可以通過

    context

    屬性,使用

    ActorCell

    暴露出來的一些接口,繼續動态行為的控制,child的建立,ReceiveTimeout等控制;
  2. ActorRef

    ,這位一位小蜜,職責很明确,就是幫

    ActorCell

    這位老闆進行郵件的接受,同時在

    AKKA

    内部,還可以進行

    Actor

    的啟停等操作,但這些所有的操作,本質都是代替

    ActorCell

    發号指令,最終還是得有

    ActorCell

    來執行;
  3. ActorCell

    ,作為

    AKKA Actor

    的核心所在,所有的動作,都是在其内部來實作,五髒六腑均在此處。

繼續閱讀