天天看點

Akka源碼分析-Actor建立(續)

在上一遍部落格中,我們已經分析了actor建立的大緻過程,但隻是涉及到了Dipatcher/Mailbox/ActorCell/InternalActorRef等對象的建立,并沒有介紹我們自定義的繼承Actor特質的類如何完成初始化。這篇文章對這一部分内容進行簡單的補充。

在akka.actor.dungeon.init代碼中,有一段代碼我們當時沒有分析,此處對此代碼進行深入分析,然後才能找到Actor子類完成建立的真實過程。

Akka源碼分析-Actor建立(續)
上面是init的代碼片段,其中有一個局部變量createMessage,根據前後分析,它的值應該是Create這個case class。最後mailbox.systemEnqueue(self, createMessage)這個代碼給actor對應的郵箱發送了該消息。

/**
 * INTERNAL API
 */
@SerialVersionUID(1L)
private[akka] final case class Create(failure: Option[ActorInitializationException]) extends SystemMessage // sent to self from Dispatcher.register           

根據Create類名以及前後上下文分析,這應該是訓示Actor完成初始化的。那麼我們要分析一下actor是如何對該消息響應的。那麼究竟是哪段代碼對這個消息進行響應的呢?

如果讀過之前的文章,你肯定能想起來Mailbox在循環處理消息時,有一個processAllSystemMessages方法,這個方法裡面調用了actor的systemInvoke方法。具體源碼如下:

/**
   * Will at least try to process all queued system messages: in case of
   * failure simply drop and go on to the next, because there is nothing to
   * restart here (failure is in ActorCell somewhere …). In case the mailbox
   * becomes closed (because of processing a Terminate message), dump all
   * already dequeued message to deadLetters.
   */
  final def processAllSystemMessages() {
    var interruption: Throwable = null
    var messageList = systemDrain(SystemMessageList.LNil)
    while ((messageList.nonEmpty) && !isClosed) {
      val msg = messageList.head
      messageList = messageList.tail
      msg.unlink()
      if (debug) println(actor.self + " processing system message " + msg + " with " + actor.childrenRefs)
      // we know here that systemInvoke ensures that only "fatal" exceptions get rethrown
      actor systemInvoke msg
      if (Thread.interrupted())
        interruption = new InterruptedException("Interrupted while processing system messages")
      // don’t ever execute normal message when system message present!
      if ((messageList.isEmpty) && !isClosed) messageList = systemDrain(SystemMessageList.LNil)
    }
    /*
     * if we closed the mailbox, we must dump the remaining system messages
     * to deadLetters (this is essential for DeathWatch)
     */
    // 忽略剩餘源碼
  }           

我們來研究一下systemInvoke的代碼

/*
   * MESSAGE PROCESSING
   */
  //Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status
  final def systemInvoke(message: SystemMessage): Unit = {
    /*
     * When recreate/suspend/resume are received while restarting (i.e. between
     * preRestart and postRestart, waiting for children to terminate), these
     * must not be executed immediately, but instead queued and released after
     * finishRecreate returns. This can only ever be triggered by
     * ChildTerminated, and ChildTerminated is not one of the queued message
     * types (hence the overwrite further down). Mailbox sets message.next=null
     * before systemInvoke, so this will only be non-null during such a replay.
     */

    def calculateState: Int =
      if (waitingForChildrenOrNull ne null) SuspendedWaitForChildrenState
      else if (mailbox.isSuspended) SuspendedState
      else DefaultState

    @tailrec def sendAllToDeadLetters(messages: EarliestFirstSystemMessageList): Unit =
      if (messages.nonEmpty) {
        val tail = messages.tail
        val msg = messages.head
        msg.unlink()
        provider.deadLetters ! msg
        sendAllToDeadLetters(tail)
      }

    def shouldStash(m: SystemMessage, state: Int): Boolean =
      (state: @switch) match {
        case DefaultState                  ⇒ false
        case SuspendedState                ⇒ m.isInstanceOf[StashWhenFailed]
        case SuspendedWaitForChildrenState ⇒ m.isInstanceOf[StashWhenWaitingForChildren]
      }

    @tailrec
    def invokeAll(messages: EarliestFirstSystemMessageList, currentState: Int): Unit = {
      val rest = messages.tail
      val message = messages.head
      message.unlink()
      try {
        message match {
          case message: SystemMessage if shouldStash(message, currentState) ⇒ stash(message)
          case f: Failed ⇒ handleFailure(f)
          case DeathWatchNotification(a, ec, at) ⇒ watchedActorTerminated(a, ec, at)
          case Create(failure) ⇒ create(failure)
          case Watch(watchee, watcher) ⇒ addWatcher(watchee, watcher)
          case Unwatch(watchee, watcher) ⇒ remWatcher(watchee, watcher)
          case Recreate(cause) ⇒ faultRecreate(cause)
          case Suspend() ⇒ faultSuspend()
          case Resume(inRespToFailure) ⇒ faultResume(inRespToFailure)
          case Terminate() ⇒ terminate()
          case Supervise(child, async) ⇒ supervise(child, async)
          case NoMessage ⇒ // only here to suppress warning
        }
      } catch handleNonFatalOrInterruptedException { e ⇒
        handleInvokeFailure(Nil, e)
      }
      val newState = calculateState
      // As each state accepts a strict subset of another state, it is enough to unstash if we "walk up" the state
      // chain
      val todo = if (newState < currentState) unstashAll() reverse_::: rest else rest

      if (isTerminated) sendAllToDeadLetters(todo)
      else if (todo.nonEmpty) invokeAll(todo, newState)
    }

    invokeAll(new EarliestFirstSystemMessageList(message), calculateState)
  }           

由于我們隻是準備分析actor的建立過程,是以上面的代碼,我們隻關注對Create消息的處理:create(failure)。也就是說調用了create函數。

protected def create(failure: Option[ActorInitializationException]): Unit = {

    def clearOutActorIfNonNull(): Unit = {
      if (actor != null) {
        clearActorFields(actor, recreate = false)
        actor = null // ensure that we know that we failed during creation
      }
    }

    failure.foreach { throw _ }

    try {
      val created = newActor()
      actor = created
      created.aroundPreStart()
      checkReceiveTimeout
      if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(created), "started (" + created + ")"))
    } catch {
      case e: InterruptedException ⇒
        clearOutActorIfNonNull()
        Thread.currentThread().interrupt()
        throw ActorInitializationException(self, "interruption during creation", e)
      case NonFatal(e) ⇒
        clearOutActorIfNonNull()
        e match {
          case i: InstantiationException ⇒ throw ActorInitializationException(
            self,
            """exception during creation, this problem is likely to occur because the class of the Actor you tried to create is either,
               a non-static inner class (in which case make it a static inner class or use Props(new ...) or Props( new Creator ... )
               or is missing an appropriate, reachable no-args constructor.
              """, i.getCause)
          case x ⇒ throw ActorInitializationException(self, "exception during creation", x)
        }
    }
  }           

我們來分析一下這個create函數。其中主要的邏輯都在try中,首先調用newActor函數,建立了Actor執行個體,然後指派給actor字段。actor字段我們已經知道,這是ActorCell的最終actor執行個體。

/*
   * ACTOR INSTANCE HANDLING
   */

  //This method is in charge of setting up the contextStack and create a new instance of the Actor
  protected def newActor(): Actor = {
    contextStack.set(this :: contextStack.get)
    try {
      behaviorStack = emptyBehaviorStack
      val instance = props.newActor()

      if (instance eq null)
        throw ActorInitializationException(self, "Actor instance passed to actorOf can't be 'null'")

      // If no becomes were issued, the actors behavior is its receive method
      behaviorStack = if (behaviorStack.isEmpty) instance.receive :: behaviorStack else behaviorStack
      instance
    } finally {
      val stackAfter = contextStack.get
      if (stackAfter.nonEmpty)
        contextStack.set(if (stackAfter.head eq null) stackAfter.tail.tail else stackAfter.tail) // pop null marker plus our context
    }
  }           

newActor函數源碼如上,抛去其他代碼,該函數調用了props.newActor建立了最終的Actor執行個體,也就是我們自定義的Actor子類。通過源碼注釋我們知道behaviorStack是actor目前行為的一個棧。如果讀者用過become的話,對這段代碼應該比較好了解。我們在actor内部使用become方法改變目前actor執行個體的時候,其實是把新的receive函數壓入棧頂,mailbox在調用receive時,其實是取出目前棧頂的receive函數進行處理的。當然這是akka以前版本的預設行為。為什麼這樣說呢?因為新版本預設行為就是簡單的把最新的receive函數替換舊receive函數,如果想恢複舊receive函數,需要開發者在編碼時,再次調用become用舊receive函數替換目前receive。為什麼要這麼做?當然是為了防止開發者惡意或者無意中胡亂調用become,造成棧溢出喽。

props.newActor我們不再深入分析,這應該就是通過反射建立Actor特質的子類,也就是我們自定義的actor。

至此,我們自定義的actor就真正完成了初始化。細心的讀者一定會發現,就連actor最終的執行個體化,都是異步的。因為newActor是通過Create消息觸發的,而Mailbox對所有消息的處理都是在單獨的線程處理的。如果actor的建立過程中有一些線程不安全的代碼,就需要注意喽。