Akka Source Code Analysis: How an Actor Sends Messages

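The previous two articles briefly covered how the ActorSystem, actors, dispatchers and mailboxes are created. Now let's look at the internal mechanics of sending a message to an actor.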

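import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory
// HelloActor is assumed to be a simple Actor subclass
val system = ActorSystem("firstActorSystem", ConfigFactory.load())
val helloActor = system.actorOf(Props(new HelloActor), "HelloActor")
helloActor ! "Hello"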

Let's go back to a simple Akka application. From the earlier analysis we know that helloActor is a RepointableActorRef, so calling ! on it invokes RepointableActorRef's ! method.

def !(message: Any)(implicit sender: ActorRef = Actor.noSender) = underlying.sendMessage(message, sender)           

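Above is RepointableActorRef's implementation of !: it simply calls underlying.sendMessage. Does underlying look familiar? Let's look at its definition again. It is a Cell, but the way it is read is a little involved: instead of accessing the field directly, it performs a volatile read of _cellDoNotCallMeDirectly through Unsafe.getObjectVolatile at offset cellOffset.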

/*
  * H E R E   B E   D R A G O N S !
  *
  * There are two main functions of a Cell: message queueing and child lookup.
  * When switching out the UnstartedCell for its real replacement, the former
  * must be switched after all messages have been drained from the temporary
  * queue into the real mailbox, while the latter must be switched before
  * processing the very first message (i.e. before Cell.start()). Hence there
  * are two refs here, one for each function, and they are switched just so.
  */
 @volatile private var _cellDoNotCallMeDirectly: Cell = _
 @volatile private var _lookupDoNotCallMeDirectly: Cell = _
 
 def underlying: Cell = Unsafe.instance.getObjectVolatile(this, cellOffset).asInstanceOf[Cell]
 def lookup = Unsafe.instance.getObjectVolatile(this, lookupOffset).asInstanceOf[Cell]
 
 @tailrec final def swapCell(next: Cell): Cell = {
   val old = underlying
   if (Unsafe.instance.compareAndSwapObject(this, cellOffset, old, next)) old else swapCell(next)
 }
 
 @tailrec final def swapLookup(next: Cell): Cell = {
   val old = lookup
   if (Unsafe.instance.compareAndSwapObject(this, lookupOffset, old, next)) old else swapLookup(next)
 }           

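As the source comment explains, the two cells have strictly separate roles: one handles message queueing, the other handles child lookup. From the logic of initialize, underlying starts out as an UnstartedCell instance. Note that the two-argument sendMessage(message, sender) called by ! is defined on the Cell trait: it wraps the message and sender into an Envelope and delegates to sendMessage(msg: Envelope). The UnstartedCell version of that method is listed below.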
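Both swapCell and swapLookup use the classic compare-and-swap retry loop: read the current value, try to CAS it to the new one, and retry if another thread got there first. Below is a minimal sketch of the same loop, assuming java.util.concurrent.atomic.AtomicReference as a stand-in for the Unsafe field access Akka uses; CellSlot is a hypothetical name, and strings stand in for the cell objects.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

// Stand-in for the Unsafe-based fields in RepointableActorRef: AtomicReference
// offers the same volatile read plus compare-and-set, without sun.misc.Unsafe.
final class CellSlot[A <: AnyRef](initial: A) {
  private val ref = new AtomicReference[A](initial)

  // Volatile read, playing the role of `underlying` / `lookup`.
  def get: A = ref.get()

  // Retry until our compare-and-set wins and return the replaced value,
  // mirroring swapCell / swapLookup.
  @tailrec final def swap(next: A): A = {
    val old = ref.get()
    if (ref.compareAndSet(old, next)) old else swap(next)
  }
}

// Strings stand in for the UnstartedCell and ActorCell objects.
val slot = new CellSlot[String]("UnstartedCell")
val previous = slot.swap("ActorCell")
println(s"$previous -> ${slot.get}") // prints "UnstartedCell -> ActorCell"
```

In ordinary JDK code, AtomicReference.getAndSet(next) performs the same swap in one call; the explicit loop is kept only to mirror swapCell and swapLookup.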

def sendMessage(msg: Envelope): Unit = {
   if (lock.tryLock(timeout.length, timeout.unit)) {
     try {
       val cell = self.underlying
       if (cellIsReady(cell)) {
         cell.sendMessage(msg)
       } else if (!queue.offer(msg)) {
         system.eventStream.publish(Warning(self.path.toString, getClass, "dropping message of type " + msg.message.getClass + " due to enqueue failure"))
         system.deadLetters.tell(DeadLetter(msg.message, msg.sender, self), msg.sender)
       } else if (Mailbox.debug) println(s"$self temp queueing ${msg.message} from ${msg.sender}")
     } finally lock.unlock()
   } else {
     system.eventStream.publish(Warning(self.path.toString, getClass, "dropping message of type" + msg.message.getClass + " due to lock timeout"))
     system.deadLetters.tell(DeadLetter(msg.message, msg.sender, self), msg.sender)
   }
 }           

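Above is UnstartedCell's sendMessage. If underlying is already ready, it forwards the message to that cell's sendMessage; otherwise it buffers the message in queue, a JLinkedList, which is just java.util.LinkedList imported under another name. If buffering fails, it publishes a Warning to the eventStream and forwards the message to deadLetters as a DeadLetter; the same happens when the lock cannot be acquired within the timeout. So how does it decide whether underlying is ready?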

private[this] final def cellIsReady(cell: Cell): Boolean = (cell ne this) && (cell ne null)           

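The check is simple: the cell counts as ready when it is neither null nor this UnstartedCell itself. Remember how underlying was initialized? That's right, with an UnstartedCell, which is exactly the object running this check. So when does underlying change, or in other words, when does it become ready? To find out, we need to look at the places in RepointableActorRef that use underlying.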
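To make the buffering behaviour concrete, here is a minimal sketch of an UnstartedCell-style cell: it takes a lock with a timeout, forwards messages once its underlying pointer refers to another cell, and otherwise buffers them. SimpleCell, RecordingCell, BufferingCell and the string messages are hypothetical simplifications; the real code works with Envelope objects, publishes a Warning to the eventStream, and sends DeadLetter messages instead of appending to a list.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable.ListBuffer

// Hypothetical, simplified model of UnstartedCell.sendMessage (not Akka code).
trait SimpleCell { def sendMessage(msg: String): Unit }

// Plays the role of the real ActorCell: it just records what it receives.
final class RecordingCell extends SimpleCell {
  val received = ListBuffer.empty[String]
  def sendMessage(msg: String): Unit = received += msg
}

final class BufferingCell(lockTimeoutMs: Long) extends SimpleCell {
  private val lock = new ReentrantLock()
  private val queue = new java.util.LinkedList[String]()
  val deadLetters = ListBuffer.empty[String]

  // Starts out pointing at this buffering cell itself, like `underlying`.
  @volatile var underlying: SimpleCell = this

  // Same test as cellIsReady: ready once underlying points elsewhere.
  private def cellIsReady(cell: SimpleCell): Boolean = (cell ne this) && (cell ne null)

  def sendMessage(msg: String): Unit =
    if (lock.tryLock(lockTimeoutMs, TimeUnit.MILLISECONDS)) {
      try {
        val cell = underlying
        if (cellIsReady(cell)) cell.sendMessage(msg) // forward to the real cell
        else if (!queue.offer(msg)) deadLetters += msg // buffering failed
      } finally lock.unlock()
    } else deadLetters += msg // could not get the lock in time

  def buffered: Int = queue.size
}

val temp = new BufferingCell(lockTimeoutMs = 100)
temp.sendMessage("a") // not ready: buffered
temp.sendMessage("b") // not ready: buffered
val real = new RecordingCell
temp.underlying = real // what swapCell eventually does
temp.sendMessage("c") // ready: goes straight to the real cell
```

Here "a" and "b" stay in the temporary queue, and "c" overtakes them because the sketch flips underlying directly. The real replaceWith avoids that: it runs under the same lock that sendMessage acquires, drains the queue, and only then calls swapCell.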

def point(catchFailures: Boolean): this.type =
    underlying match {
      case u: UnstartedCell ⇒
        val cell =
          try newCell(u)
          catch {
            case NonFatal(ex) if catchFailures ⇒
              val safeDispatcher = system.dispatchers.defaultGlobalDispatcher
              new ActorCell(system, this, props, safeDispatcher, supervisor).initWithFailure(ex)
          }
        /*
         * The problem here was that if the real actor (which will start running
         * at cell.start()) creates children in its constructor, then this may
         * happen before the swapCell in u.replaceWith, meaning that those
         * children cannot be looked up immediately, e.g. if they shall become
         * routees.
         */
        swapLookup(cell)
        cell.start()
        u.replaceWith(cell)
        this
      case null ⇒ throw new IllegalStateException("underlying cell is null")
      case _    ⇒ this // this happens routinely for things which were created async=false
    }           

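Remember that initialize leads to a call to point? (Strictly, initialize calls point directly only when async is false; in the asynchronous case the supervisor calls point when it handles the Supervise system message that initialize sends.) Let's see what this function does. It checks the type of underlying. If it is an UnstartedCell, point creates a new ActorCell with newCell, publishes it for child lookup with swapLookup, calls the new cell's start method, and finally calls the UnstartedCell's replaceWith. So what does replaceWith do?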

def replaceWith(cell: Cell): Unit = locked {
  try {
    def drainSysmsgQueue(): Unit = {
      // using while in case a sys msg enqueues another sys msg
      while (sysmsgQueue.nonEmpty) {
        var sysQ = sysmsgQueue.reverse
        sysmsgQueue = SystemMessageList.LNil
        while (sysQ.nonEmpty) {
          val msg = sysQ.head
          sysQ = sysQ.tail
          msg.unlink()
          cell.sendSystemMessage(msg)
        }
      }
    }
 
    drainSysmsgQueue()
 
    while (!queue.isEmpty) {
      cell.sendMessage(queue.poll())
      // drain sysmsgQueue in case a msg enqueues a sys msg
      drainSysmsgQueue()
    }
  } finally {
    self.swapCell(cell)
  }
}           

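This code is straightforward too: it first drains the buffered system messages into the new cell, then forwards the temporarily queued user messages through sendMessage, draining system messages again after each one in case a message enqueues a system message. Finally, in the finally block, it calls RepointableActorRef's swapCell, shown earlier, to replace underlying with the newly created ActorCell. From then on cellIsReady returns true and messages bypass the temporary queue.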
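The ordering in point and replaceWith is what the "H E R E B E D R A G O N S" comment is about: child lookup switches before the real cell starts, while message routing switches only after every buffered message has been drained. The toy model below, with hypothetical RealCell and TempCell classes that only log events, reproduces that sequence; it omits the lock timeout, failure handling and the real system message types, and "s1"/"s2" are placeholder system messages.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical toy model (not Akka code) of the ordering in point/replaceWith.
val events = ListBuffer.empty[String]

final class RealCell(events: ListBuffer[String]) {
  def start(): Unit = events += "start"
  def sendSystemMessage(m: String): Unit = events += s"sys:$m"
  def sendMessage(m: String): Unit = events += s"msg:$m"
}

final class TempCell(events: ListBuffer[String]) {
  var sysQueue: List[String] = Nil // newest first, like SystemMessageList
  val queue = new java.util.LinkedList[String]()

  def replaceWith(cell: RealCell): Unit = synchronized {
    try {
      // Deliver system messages oldest first; loop in case more arrive meanwhile.
      def drainSys(): Unit =
        while (sysQueue.nonEmpty) {
          val batch = sysQueue.reverse
          sysQueue = Nil
          batch.foreach(cell.sendSystemMessage)
        }
      drainSys()
      while (!queue.isEmpty) {
        cell.sendMessage(queue.poll())
        drainSys() // a user message may have triggered a system message
      }
    } finally events += "swapCell" // message routing switches over last
  }
}

def point(temp: TempCell, cell: RealCell): Unit = {
  events += "swapLookup" // child lookup switches first...
  cell.start()           // ...before the real actor starts
  temp.replaceWith(cell) // drain buffered messages, then swapCell
}

val temp = new TempCell(events)
temp.sysQueue = List("s2", "s1") // newest first: "s1" arrived before "s2"
temp.queue.add("hello")
temp.queue.add("world")
point(temp, new RealCell(events))
println(events.mkString(", "))
// prints "swapLookup, start, sys:s1, sys:s2, msg:hello, msg:world, swapCell"
```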

/**
   * This is called by activate() to obtain the cell which is to replace the
   * unstarted cell. The cell must be fully functional.
   */
  def newCell(old: UnstartedCell): Cell =
    new ActorCell(system, this, props, dispatcher, supervisor).init(sendSupervise = false, mailboxType)           

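Here is the code that creates the new ActorCell, and it is simple as well: it constructs an ActorCell and calls init on it. At this point the picture is mostly clear: helloActor ! "Hello" ultimately calls ActorCell's sendMessage. Yet ActorCell's own body contains no sendMessage method. Why? Did we get the analysis wrong? Looking at newCell again, it does not return the bare ActorCell but the result of calling init on it (init returns this.type, i.e. the same ActorCell). We haven't looked at init yet, so let's continue there.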

Tracing the code, we find that init is a method ActorCell inherits from the Dispatch trait.

/**
   * Initialize this cell, i.e. set up mailboxes and supervision. The UID must be
   * reasonably different from the previous UID of a possible actor with the same path,
   * which can be achieved by using ThreadLocalRandom.current.nextInt().
   */
  final def init(sendSupervise: Boolean, mailboxType: MailboxType): this.type = {
    /*
     * Create the mailbox and enqueue the Create() message to ensure that
     * this is processed before anything else.
     */
    val mbox = dispatcher.createMailbox(this, mailboxType)
 
    /*
     * The mailboxType was calculated taking into account what the MailboxType
     * has promised to produce. If that was more than the default, then we need
     * to reverify here because the dispatcher may well have screwed it up.
     */
    // we need to delay the failure to the point of actor creation so we can handle
    // it properly in the normal way
    val actorClass = props.actorClass
    val createMessage = mailboxType match {
      case _: ProducesMessageQueue[_] if system.mailboxes.hasRequiredType(actorClass) ⇒
        val req = system.mailboxes.getRequiredType(actorClass)
        if (req isInstance mbox.messageQueue) Create(None)
        else {
          val gotType = if (mbox.messageQueue == null) "null" else mbox.messageQueue.getClass.getName
          Create(Some(ActorInitializationException(
            self,
            s"Actor [$self] requires mailbox type [$req] got [$gotType]")))
        }
      case _ ⇒ Create(None)
    }
 
    swapMailbox(mbox)
    mailbox.setActor(this)
 
    //  NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
    mailbox.systemEnqueue(self, createMessage)
 
    if (sendSupervise) {
      //  NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
      parent.sendSystemMessage(akka.dispatch.sysmsg.Supervise(self, async = false))
    }
    this
  }           

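init first creates the mailbox through dispatcher. Where does dispatcher come from? Dispatch's definition declares ActorCell as its self-type, so any class that mixes in Dispatch must be an ActorCell. Clearly, then, the dispatcher used here is the ActorCell's dispatcher field.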

private[akka] trait Dispatch { this: ActorCell ⇒           
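The self-type is what lets Dispatch use members such as dispatcher and mailbox as if they were its own, and it is also why sendMessage never appears in ActorCell's body. Here is a minimal sketch of the same pattern with hypothetical ToyDispatch and ToyActorCell types, where the mailbox is just a list of strings.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical miniature of the Dispatch pattern (not Akka code). The self-type
// `this: ToyActorCell =>` means ToyDispatch can only be mixed into a
// ToyActorCell, so its methods may use the cell's members directly.
trait ToyDispatch { this: ToyActorCell =>
  def init(): this.type = {
    mailbox += "Create" // enqueued first, before any user message
    this
  }
  def sendMessage(msg: String): Unit = mailbox += s"$msg via $dispatcherName"
}

final class ToyActorCell(val dispatcherName: String) extends ToyDispatch {
  val mailbox = ListBuffer.empty[String]
  // No sendMessage here: it is inherited from ToyDispatch.
}

val cell = new ToyActorCell("default-dispatcher").init()
cell.sendMessage("Hello")
println(cell.mailbox.mkString(", ")) // prints "Create, Hello via default-dispatcher"
```

Writing class Other extends ToyDispatch would not compile, because Other does not conform to the required self-type ToyActorCell.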

From the earlier analysis we know that dispatcher is an instance of akka.dispatch.Dispatcher. Below is its createMailbox implementation.

/**
 * INTERNAL API
 */
protected[akka] def createMailbox(actor: akka.actor.Cell, mailboxType: MailboxType): Mailbox = {
  new Mailbox(mailboxType.create(Some(actor.self), Some(actor.system))) with DefaultSystemMessageQueue
}           

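createMailbox builds the MessageQueue with mailboxType.create and mixes DefaultSystemMessageQueue into the new Mailbox. Below is Mailbox's definition. It extends ForkJoinTask[Unit], SystemMessageQueue and Runnable, which suggests it can be submitted to a thread pool; we'll set that aside for now.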

/**
 * Mailbox and InternalMailbox is separated in two classes because ActorCell is needed for implementation,
 * but can't be exposed to user defined mailbox subclasses.
 *
 * INTERNAL API
 */
private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
  extends ForkJoinTask[Unit] with SystemMessageQueue with Runnable           

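Continuing with init: it assigns the newly created mbox to the cell's mailbox via swapMailbox, links the ActorCell and the mailbox with setActor, and finally enqueues createMessage (a Create system message) with systemEnqueue, so that it is processed before anything else. Since newCell passes sendSupervise = false, no Supervise message is sent to the parent here. We won't dig any deeper; let's return to the Dispatch trait.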

It turns out that although ActorCell does not implement sendMessage itself, the Dispatch trait it inherits from does.

def sendMessage(msg: Envelope): Unit =
    try {
      val msgToDispatch =
        if (system.settings.SerializeAllMessages) serializeAndDeserialize(msg)
        else msg
 
      dispatcher.dispatch(this, msgToDispatch)
    } catch handleException           

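Clearly, sendMessage ends up calling the dispatcher's dispatch method to deliver the message. If akka.actor.serialize-messages is enabled (SerializeAllMessages), the envelope is first round-tripped through serialization; otherwise it is passed on unchanged.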
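The SerializeAllMessages branch is easy to overlook. The idea is that every message is serialized and deserialized even though it never leaves the JVM, so a message that could not travel to a remote node fails early. The sketch below approximates that round trip with plain Java serialization; roundTrip is a hypothetical helper, and Akka's real serializeAndDeserialize goes through its own Serialization extension and the serializers bound in configuration, which this does not reproduce.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, NotSerializableException, ObjectInputStream, ObjectOutputStream}

// Approximation of the serialize-then-deserialize check using Java serialization.
def roundTrip[T <: AnyRef](value: T): T = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  try out.writeObject(value) finally out.close()
  val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
  try in.readObject().asInstanceOf[T] finally in.close()
}

val original = List("Hello", "world")
val copy = roundTrip(original) // equal content, but a different object

// A message that cannot be serialized fails immediately.
val rejected =
  try { roundTrip(new Object); false }
  catch { case _: NotSerializableException => true }
```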

/**
  * INTERNAL API
  */
 protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = {
   val mbox = receiver.mailbox
   mbox.enqueue(receiver.self, invocation)
   registerForExecution(mbox, true, false)
 }           

Above is dispatch: it enqueues the message into receiver.mailbox with enqueue and then calls registerForExecution.

/**
 * Returns if it was registered
 *
 * INTERNAL API
 */
protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = {
  if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races
    if (mbox.setAsScheduled()) {
      try {
        executorService execute mbox
        true
      } catch {
        case e: RejectedExecutionException ⇒
          try {
            executorService execute mbox
            true
          } catch { //Retry once
            case e: RejectedExecutionException ⇒
              mbox.setAsIdle()
              eventStream.publish(Error(e, getClass.getName, getClass, "registerForExecution was rejected twice!"))
              throw e
          }
      }
    } else false
  } else false
}           

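What does registerForExecution do? It first checks canBeScheduledForExecution, then tries to switch the Mailbox's status to Scheduled with setAsScheduled. If that succeeds, it hands the Mailbox to executorService for execution. A RejectedExecutionException is retried once; if the second attempt is also rejected, the mailbox is set back to idle, an Error is published to the eventStream, and the exception is rethrown. Remember which interfaces Mailbox implements: ForkJoinTask[Unit], SystemMessageQueue and Runnable. Naturally, the thread pool can run it.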
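The scheduling guard can be sketched with an AtomicBoolean: only the caller that wins the idle-to-scheduled transition submits the mailbox, a rejected submission is retried once, and a second rejection rolls the flag back before rethrowing. ToyMailbox and registerForExecution below are hypothetical simplifications; they skip the canBeScheduledForExecution check, the Closed state, and the Error event that the real Dispatcher publishes.

```scala
import java.util.concurrent.{Executor, RejectedExecutionException}
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ListBuffer

// Hypothetical mailbox that only tracks the idle/scheduled flag.
final class ToyMailbox extends Runnable {
  private val scheduled = new AtomicBoolean(false)
  def setAsScheduled(): Boolean = scheduled.compareAndSet(false, true)
  def setAsIdle(): Unit = scheduled.set(false)
  def isScheduled: Boolean = scheduled.get()
  def run(): Unit = ()
}

def registerForExecution(mbox: ToyMailbox, executor: Executor): Boolean =
  if (mbox.setAsScheduled()) {
    try { executor.execute(mbox); true }
    catch {
      case _: RejectedExecutionException =>
        try { executor.execute(mbox); true } // retry once
        catch {
          case e: RejectedExecutionException =>
            mbox.setAsIdle() // allow a later attempt to schedule it
            throw e
        }
    }
  } else false // already scheduled: nothing to do

// An executor that only records submissions, so the example is deterministic.
val submitted = ListBuffer.empty[Runnable]
val recording: Executor = (r: Runnable) => { submitted += r; () }

val mbox = new ToyMailbox
val first = registerForExecution(mbox, recording)  // submitted
val second = registerForExecution(mbox, recording) // still scheduled, skipped

// An executor that always refuses work.
val rejecting: Executor = (_: Runnable) => throw new RejectedExecutionException("full")
val other = new ToyMailbox
val failed =
  try { registerForExecution(other, rejecting); false }
  catch { case _: RejectedExecutionException => true }
```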

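That completes the analysis of sending. To sum up: the message is first put into the queue with Mailbox's enqueue (the default mailbox type is akka.dispatch.UnboundedMailbox). The Mailbox is then scheduled onto a thread of the executor (a ForkJoinPool by default), which eventually executes its run method.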

override final def run(): Unit = {
   try {
     if (!isClosed) { //Volatile read, needed here
       processAllSystemMessages() //First, deal with any system messages
       processMailbox() //Then deal with messages
     }
   } finally {
     setAsIdle() //Volatile write, needed here
     dispatcher.registerForExecution(this, false, false)
   }
 }           

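Above is run, and it is simple too: processAllSystemMessages handles system messages and processMailbox handles user messages. Not every queued message is processed in one go; each batch is bounded by the dispatcher's throughput setting (and, if configured, its throughput deadline). Finally run sets the mailbox status back to idle and calls dispatcher.registerForExecution again, which reschedules the mailbox only if messages are still waiting. In this way the mailbox processes its queue in a loop.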
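The run/registerForExecution cycle can be modelled with a small mailbox that handles at most throughput messages per run, goes idle, and re-registers itself only if messages remain. CyclingMailbox is hypothetical and ignores system messages and the closed state; a deque of pending tasks stands in for the thread pool so that the example runs deterministically on one thread.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executor}
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ListBuffer

// Hypothetical miniature of Mailbox.run: handle at most `throughput` messages,
// go idle, then re-register so remaining messages get another turn.
final class CyclingMailbox(throughput: Int, executor: Executor, handle: String => Unit) extends Runnable {
  private val queue = new ConcurrentLinkedQueue[String]()
  private val scheduled = new AtomicBoolean(false)
  var runs = 0

  def enqueue(msg: String): Unit = { queue.offer(msg); register() }

  // Schedule only if there is work and we win the idle -> scheduled race.
  def register(): Unit =
    if (!queue.isEmpty && scheduled.compareAndSet(false, true)) executor.execute(this)

  def run(): Unit =
    try {
      runs += 1
      var left = math.max(throughput, 1)
      var next = queue.poll()
      while (next ne null) {
        handle(next)
        left -= 1
        next = if (left > 0) queue.poll() else null
      }
    } finally {
      scheduled.set(false) // setAsIdle
      register()           // re-register if messages remain
    }
}

// A deterministic "thread pool": submitted tasks wait in a deque until we run them.
val pending = new java.util.ArrayDeque[Runnable]()
val queuedExecutor: Executor = (r: Runnable) => { pending.add(r); () }

val handled = ListBuffer.empty[String]
val mbox = new CyclingMailbox(3, queuedExecutor, m => { handled += m; () })
(1 to 7).foreach(i => mbox.enqueue(s"m$i"))
while (!pending.isEmpty) pending.poll().run()
println(s"${handled.size} messages in ${mbox.runs} runs") // prints "7 messages in 3 runs"
```

With a throughput of 3, seven messages take three runs (3 + 3 + 1). On a real pool those runs would interleave with other actors' mailboxes; that fairness trade-off is what the throughput setting controls.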

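That's all for today. We now know what happens inside !: it merely puts the message into the mailbox's queue, and the mailbox is then scheduled asynchronously on the thread pool and processes its queue in a loop. Since several threads are involved, the queue must be a thread-safe concurrent queue; for the default UnboundedMailbox it is backed by java.util.concurrent.ConcurrentLinkedQueue. In the next post we'll walk through processMailbox in detail. For now, here is its source, for readers who want to analyze it first.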

/**
  * Process the messages in the mailbox
  */
 @tailrec private final def processMailbox(
   left:       Int  = java.lang.Math.max(dispatcher.throughput, 1),
   deadlineNs: Long = if (dispatcher.isThroughputDeadlineTimeDefined == true) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0L): Unit =
   if (shouldProcessMessage) {
     val next = dequeue()
     if (next ne null) {
       if (Mailbox.debug) println(actor.self + " processing message " + next)
       actor invoke next
       if (Thread.interrupted())
         throw new InterruptedException("Interrupted while processing actor messages")
       processAllSystemMessages()
       if ((left > 1) && ((dispatcher.isThroughputDeadlineTimeDefined == false) || (System.nanoTime - deadlineNs) < 0))
         processMailbox(left - 1, deadlineNs)
     }
   }
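For readers who want a head start on processMailbox: it stops when the queue is empty, when the message budget left is used up, or when the optional throughput deadline has passed, and it handles pending system messages after every user message. The sketch below reproduces just those stopping rules with a hypothetical BudgetedMailbox; strings replace envelopes, appending to a log replaces actor invoke, and the interrupt check is omitted.

```scala
import scala.annotation.tailrec
import scala.collection.mutable.{ListBuffer, Queue}

// Hypothetical sketch of processMailbox's stopping rules (not Akka code).
// deadlineNanos = None models "no throughput deadline configured".
final class BudgetedMailbox(throughput: Int, deadlineNanos: Option[Long]) {
  val userQueue = Queue.empty[String]
  val systemQueue = Queue.empty[String]
  val log = ListBuffer.empty[String]

  private def processAllSystemMessages(): Unit =
    while (systemQueue.nonEmpty) log += s"sys:${systemQueue.dequeue()}"

  @tailrec final def processMailbox(
      left: Int = math.max(throughput, 1),
      deadlineNs: Long = deadlineNanos.fold(0L)(d => System.nanoTime() + d)): Unit =
    if (userQueue.nonEmpty) {
      log += userQueue.dequeue() // stands in for `actor invoke next`
      processAllSystemMessages() // system messages are handled between user messages
      // Continue only while budget remains and the deadline (if any) has not passed.
      if (left > 1 && (deadlineNanos.isEmpty || System.nanoTime() - deadlineNs < 0))
        processMailbox(left - 1, deadlineNs)
    }
}

val byCount = new BudgetedMailbox(throughput = 3, deadlineNanos = None)
byCount.userQueue ++= Seq("a", "b", "c", "d", "e")
byCount.processMailbox() // handles three messages, leaves "d" and "e" for the next run

val byDeadline = new BudgetedMailbox(throughput = 100, deadlineNanos = Some(0L))
byDeadline.userQueue ++= Seq("a", "b", "c")
byDeadline.processMailbox() // zero-length deadline: stops after the first message
```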