In the previous post we looked at how an ActorSystem is created; in this one we study how an actor is created.
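import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
class HelloActor extends Actor { def receive: Receive = { case msg => println(msg) } } // illustrative body
val system = ActorSystem("firstActorSystem", ConfigFactory.load())
val helloActor = system.actorOf(Props(new HelloActor), "HelloActor")
helloActor ! "Hello"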
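Normally we create actors with ActorSystem's actorOf. As covered in the previous post, actorOf is declared in ActorRefFactory, which ActorSystem extends. ActorSystemImpl implements it like this: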
def actorOf(props: Props, name: String): ActorRef =
if (guardianProps.isEmpty) guardian.underlying.attachChild(props, name, systemService = false)
else throw new UnsupportedOperationException(
s"cannot create top-level actor [$name] from the outside on ActorSystem with custom user guardian")
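This is fairly simple: it checks whether guardianProps is empty and, if it is, calls guardian.underlying.attachChild to create the ActorRef. The line new ActorSystemImpl(name, appConfig, cl, defaultEC, None, setup).start() shows that guardianProps is always None when an ActorSystemImpl is created this way, so we will set aside what guardianProps is for.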
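To make the control flow concrete, here is a tiny model of that guard. It is a sketch, not Akka code: MiniProps, MiniRef, MiniGuardian and MiniSystem are hypothetical names, and attachChild only computes a /user path instead of creating an actor.

```scala
// Hypothetical model of the guard in ActorSystemImpl.actorOf; not Akka's API.
final case class MiniProps(description: String)
final case class MiniRef(path: String)

final class MiniGuardian {
  // Stand-in for guardian.underlying.attachChild(props, name, systemService = false):
  // here it only computes the path the new top-level actor would get.
  def attachChild(props: MiniProps, name: String): MiniRef = MiniRef(s"/user/$name")
}

final class MiniSystem(guardianProps: Option[MiniProps]) {
  private val guardian = new MiniGuardian

  // Same shape as the quoted actorOf: delegate to the user guardian unless
  // the system was built with a custom guardian.
  def actorOf(props: MiniProps, name: String): MiniRef =
    if (guardianProps.isEmpty) guardian.attachChild(props, name)
    else throw new UnsupportedOperationException(
      s"cannot create top-level actor [$name] from the outside on ActorSystem with custom user guardian")
}
```

With guardianProps = None, as in ActorSystem(...) above, every call goes through the guardian.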
def guardian: LocalActorRef = provider.guardian
/**
* Reference to the supervisor used for all top-level user actors.
*/
def guardian: LocalActorRef
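Following guardian, we find that it is a LocalActorRef, and the scaladoc says it is the supervisor of all top-level user actors (it is not the root itself: as the provider code below shows, its parent is rootGuardian). Akka actors form a tree in which every actor has its place in the hierarchy. An actor path typically looks like /user/actorParent1/actorChild1, and guardian is the actor at /user.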
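A toy model of such paths, purely as an illustration: MiniPath and its / operator are hypothetical (the / mimics the rootPath / "user" expression in the provider code below), and a real Akka ActorPath also carries the system address and a uid.

```scala
// Toy actor-path model; not Akka's ActorPath.
final case class MiniPath(elements: List[String]) {
  // Append a child name, like rootPath / "user"
  def /(child: String): MiniPath = MiniPath(elements :+ child)
  // Drop the last element to get the parent (the root's parent is the root)
  def parent: MiniPath = MiniPath(elements.dropRight(1))
  override def toString: String = elements.mkString("/", "/", "")
}

object MiniPath {
  val root: MiniPath = MiniPath(Nil)
}
```

For example, MiniPath.root / "user" / "actorParent1" / "actorChild1" renders as /user/actorParent1/actorChild1, and walking parent twice lands on /user, the guardian's position.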
/**
* Local (serializable) ActorRef that is used when referencing the Actor on its "home" node.
*
* INTERNAL API
*/
private[akka] class LocalActorRef private[akka] (
_system: ActorSystemImpl,
_props: Props,
_dispatcher: MessageDispatcher,
_mailboxType: MailboxType,
_supervisor: InternalActorRef,
override val path: ActorPath)
extends ActorRefWithCell with LocalRef
That is LocalActorRef's definition. The previous post also covered how provider is created: by default it is a LocalActorRefProvider, so that is where we can find how guardian is actually created.
override lazy val guardian: LocalActorRef = {
val cell = rootGuardian.underlying
cell.reserveChild("user")
val ref = new LocalActorRef(system, system.guardianProps.getOrElse(Props(classOf[LocalActorRefProvider.Guardian], guardianStrategy)),
defaultDispatcher, defaultMailbox, rootGuardian, rootPath / "user")
cell.initChild(ref)
ref.start()
ref
}
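In this code the LocalActorRef is constructed with several important arguments: defaultDispatcher, defaultMailbox, rootGuardian and rootPath / "user". They matter because they lead us deeper into actor creation. Dispatchers and mailboxes are central to how actors run: the mailbox stores the messages an actor receives, and the dispatcher takes messages out of the mailbox and assigns a thread on which the actor runs its business logic. Let's look at each of them briefly.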
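The division of labour between mailbox and dispatcher can be sketched in plain Scala. This is a simplified model under my own assumptions, not Akka's implementation: MiniMailbox and MiniDispatcher are hypothetical, the mailbox is a ConcurrentLinkedQueue that doubles as the Runnable handed to an ExecutorService, and throughput caps how many messages one run processes (loosely mirroring the throughput setting read by DispatcherConfigurator later in this post).

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, ExecutorService}
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical dispatcher: decides when, and on which thread, a mailbox runs.
final class MiniDispatcher(val throughput: Int, executor: ExecutorService) {
  // Only hand the mailbox to a thread if nobody is already draining it.
  def schedule(mbox: MiniMailbox): Unit =
    if (mbox.trySchedule()) executor.execute(mbox)
}

// Hypothetical mailbox: stores messages and, when run, feeds them to the behavior.
final class MiniMailbox(dispatcher: MiniDispatcher, behavior: Any => Unit) extends Runnable {
  private val queue = new ConcurrentLinkedQueue[Any]()
  private val scheduled = new AtomicBoolean(false)

  def enqueue(msg: Any): Unit = {
    queue.offer(msg)
    dispatcher.schedule(this)
  }

  // Claim the right to run; at most one thread drains this mailbox at a time.
  def trySchedule(): Boolean = scheduled.compareAndSet(false, true)

  override def run(): Unit =
    try {
      var processed = 0
      while (processed < dispatcher.throughput && !queue.isEmpty) {
        behavior(queue.poll())
        processed += 1
      }
    } finally {
      scheduled.set(false)
      // A message may have arrived while we were finishing; run again if so.
      if (!queue.isEmpty) dispatcher.schedule(this)
    }
}
```

The scheduled flag is what keeps a single mailbox on one thread at a time, so the behavior sees messages one by one without locks; the re-check in finally prevents a message that arrives just as a run ends from being stranded.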
/**
* The one and only default dispatcher.
*/
def defaultGlobalDispatcher: MessageDispatcher = lookup(DefaultDispatcherId)
/**
* The id of the default dispatcher, also the full key of the
* configuration of the default dispatcher.
*/
final val DefaultDispatcherId = "akka.actor.default-dispatcher"
Tracing how defaultDispatcher is created leads to the code above: it looks up, based on the configuration, the MessageDispatcher instance for akka.actor.default-dispatcher. So what is akka.actor.default-dispatcher? For that we need to look at reference.conf.
default-dispatcher {
# Must be one of the following
# Dispatcher, PinnedDispatcher, or a FQCN to a class inheriting
# MessageDispatcherConfigurator with a public constructor with
# both com.typesafe.config.Config parameter and
# akka.dispatch.DispatcherPrerequisites parameters.
# PinnedDispatcher must be used together with executor=thread-pool-executor.
type = "Dispatcher"
# Which kind of ExecutorService to use for this dispatcher
# Valid options:
# - "default-executor" requires a "default-executor" section
# - "fork-join-executor" requires a "fork-join-executor" section
# - "thread-pool-executor" requires a "thread-pool-executor" section
# - "affinity-pool-executor" requires an "affinity-pool-executor" section
# - A FQCN of a class extending ExecutorServiceConfigurator
executor = "default-executor"
# This will be used if you have set "executor = "default-executor"".
# If an ActorSystem is created with a given ExecutionContext, this
# ExecutionContext will be used as the default executor for all
# dispatchers in the ActorSystem configured with
# executor = "default-executor". Note that "default-executor"
# is the default value for executor, and therefore used if not
# specified otherwise. If no ExecutionContext is given,
# the executor configured in "fallback" will be used.
default-executor {
fallback = "fork-join-executor"
}
}
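The executor here is "default-executor". Because our ActorSystem was created without an ExecutionContext (defaultEC is None), it falls back to "fork-join-executor". So how is that executor actually instantiated? Judging from the code behind lookup, it is built through a MessageDispatcherConfigurator, which, going by the name, reads the configuration and creates a MessageDispatcher instance. So what exactly is MessageDispatcherConfigurator?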
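The resolution rule can be written down as a small pure function. It is a hypothetical model (ExecutorResolution, resolve and the ExecutorChoice cases are my names), based on configureExecutor as quoted below and on the reference.conf comments above: "default-executor" prefers an ExecutionContext passed to ActorSystem(...) and otherwise uses default-executor.fallback.

```scala
// Hypothetical model of executor resolution; returns a description, not an executor.
object ExecutorResolution {
  sealed trait ExecutorChoice
  case object ForkJoin extends ExecutorChoice
  case object ThreadPool extends ExecutorChoice
  case object AffinityPool extends ExecutorChoice
  final case class Custom(fqcn: String) extends ExecutorChoice
  final case class ProvidedEC(description: String) extends ExecutorChoice

  // Mirrors the inner configurator(executor) match in configureExecutor()
  def configurator(executor: String): ExecutorChoice = executor match {
    case null | "" | "fork-join-executor" => ForkJoin
    case "thread-pool-executor"           => ThreadPool
    case "affinity-pool-executor"         => AffinityPool
    case fqcn                             => Custom(fqcn) // would be loaded reflectively
  }

  // "default-executor": use the ExecutionContext given to the ActorSystem, if any,
  // otherwise fall back to the configured default-executor.fallback.
  def resolve(executor: String, fallback: String, providedEC: Option[String]): ExecutorChoice =
    executor match {
      case "default-executor" => providedEC.map(ProvidedEC(_)).getOrElse(configurator(fallback))
      case other              => configurator(other)
    }
}
```

With the default configuration and no ExecutionContext, resolve("default-executor", "fork-join-executor", None) yields ForkJoin, which is the case this post follows.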
abstract class MessageDispatcherConfigurator(_config: Config, val prerequisites: DispatcherPrerequisites) {
val config: Config = new CachingConfig(_config)
/**
* Returns an instance of MessageDispatcher given the configuration.
* Depending on the needs the implementation may return a new instance for
* each invocation or return the same instance every time.
*/
def dispatcher(): MessageDispatcher
def configureExecutor(): ExecutorServiceConfigurator = {
def configurator(executor: String): ExecutorServiceConfigurator = executor match {
case null | "" | "fork-join-executor" ⇒ new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites)
case "thread-pool-executor" ⇒ new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites)
case "affinity-pool-executor" ⇒ new AffinityPoolConfigurator(config.getConfig("affinity-pool-executor"), prerequisites)
case fqcn ⇒
val args = List(
classOf[Config] → config,
classOf[DispatcherPrerequisites] → prerequisites)
prerequisites.dynamicAccess.createInstanceFor[ExecutorServiceConfigurator](fqcn, args).recover({
case exception ⇒ throw new IllegalArgumentException(
("""Cannot instantiate ExecutorServiceConfigurator ("executor = [%s]"), defined in [%s],
make sure it has an accessible constructor with a [%s,%s] signature""")
.format(fqcn, config.getString("id"), classOf[Config], classOf[DispatcherPrerequisites]), exception)
}).get
}
config.getString("executor") match {
case "default-executor" ⇒ new DefaultExecutorServiceConfigurator(config.getConfig("default-executor"), prerequisites, configurator(config.getString("default-executor.fallback")))
case other ⇒ configurator(other)
}
}
}
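MessageDispatcherConfigurator is not long, and a quick read shows that fork-join-executor maps to ForkJoinExecutorConfigurator (wrapped in a DefaultExecutorServiceConfigurator when executor is "default-executor"). MessageDispatcherConfigurator itself is an abstract class whose dispatcher function returns a MessageDispatcher, so which subclass implements it? Looking at lookupConfigurator, the call configuratorFrom(config(id)) stands out: it creates the MessageDispatcherConfigurator instance.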
private def lookupConfigurator(id: String): MessageDispatcherConfigurator = {
dispatcherConfigurators.get(id) match {
case null ⇒
// It doesn't matter if we create a dispatcher configurator that isn't used due to concurrent lookup.
// That shouldn't happen often and in case it does the actual ExecutorService isn't
// created until used, i.e. cheap.
val newConfigurator =
if (cachingConfig.hasPath(id)) configuratorFrom(config(id))
else throw new ConfigurationException(s"Dispatcher [$id] not configured")
dispatcherConfigurators.putIfAbsent(id, newConfigurator) match {
case null ⇒ newConfigurator
case existing ⇒ existing
}
case existing ⇒ existing
}
}
private def configuratorFrom(cfg: Config): MessageDispatcherConfigurator = {
if (!cfg.hasPath("id")) throw new ConfigurationException("Missing dispatcher 'id' property in config: " + cfg.root.render)
cfg.getString("type") match {
case "Dispatcher" ⇒ new DispatcherConfigurator(cfg, prerequisites)
case "BalancingDispatcher" ⇒
// FIXME remove this case in 2.4
throw new IllegalArgumentException("BalancingDispatcher is deprecated, use a BalancingPool instead. " +
"During a migration period you can still use BalancingDispatcher by specifying the full class name: " +
classOf[BalancingDispatcherConfigurator].getName)
case "PinnedDispatcher" ⇒ new PinnedDispatcherConfigurator(cfg, prerequisites)
case fqn ⇒
val args = List(classOf[Config] → cfg, classOf[DispatcherPrerequisites] → prerequisites)
prerequisites.dynamicAccess.createInstanceFor[MessageDispatcherConfigurator](fqn, args).recover({
case exception ⇒
throw new ConfigurationException(
("Cannot instantiate MessageDispatcherConfigurator type [%s], defined in [%s], " +
"make sure it has constructor with [com.typesafe.config.Config] and " +
"[akka.dispatch.DispatcherPrerequisites] parameters")
.format(fqn, cfg.getString("id")), exception)
}).get
}
}
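Inside configuratorFrom, a different MessageDispatcherConfigurator is created depending on the type field of the configuration, and in the reference.conf above type is "Dispatcher". That maps to DispatcherConfigurator, a subclass of MessageDispatcherConfigurator that implements dispatcher() and builds the final MessageDispatcher. It passes the result of configureExecutor() to the Dispatcher constructor as the ExecutorServiceConfigurator. Given the configuration above, that is a DefaultExecutorServiceConfigurator which, with no ExecutionContext supplied, delegates to ForkJoinExecutorConfigurator.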
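A stripped-down model of this type-based dispatch, with a plain Map standing in for the Config object. The names are hypothetical and FQCN loading is left out. The per-call configurator imitates the behaviour documented for PinnedDispatcherConfigurator (a new dispatcher per dispatcher() call), in contrast to DispatcherConfigurator, which returns the same instance every time.

```scala
// Hypothetical model of configuratorFrom; a Map[String, String] replaces Config.
object DispatcherFactory {
  final class SketchDispatcher(val id: String, val throughput: Int)

  abstract class SketchConfigurator(val cfg: Map[String, String]) {
    def dispatcher(): SketchDispatcher
  }

  // Like DispatcherConfigurator: build once, return the same instance every call.
  final class SharedConfigurator(cfg: Map[String, String]) extends SketchConfigurator(cfg) {
    private val instance = new SketchDispatcher(cfg("id"), cfg("throughput").toInt)
    override def dispatcher(): SketchDispatcher = instance
  }

  // Imitates the pinned case: a fresh dispatcher for every dispatcher() call.
  final class PerCallConfigurator(cfg: Map[String, String]) extends SketchConfigurator(cfg) {
    override def dispatcher(): SketchDispatcher = new SketchDispatcher(cfg("id"), cfg("throughput").toInt)
  }

  def configuratorFrom(cfg: Map[String, String]): SketchConfigurator = {
    if (!cfg.contains("id")) throw new IllegalArgumentException("Missing dispatcher 'id' property")
    cfg.get("type") match {
      case Some("Dispatcher")       => new SharedConfigurator(cfg)
      case Some("PinnedDispatcher") => new PerCallConfigurator(cfg)
      case other => throw new IllegalArgumentException(s"unsupported type $other (FQCN loading omitted)")
    }
  }
}
```

Caching the instance inside the configurator, and the configurator inside lookupConfigurator, is what makes every actor using akka.actor.default-dispatcher share one dispatcher.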
/**
* Configurator for creating [[akka.dispatch.Dispatcher]].
* Returns the same dispatcher instance for each invocation
* of the `dispatcher()` method.
*/
class DispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
extends MessageDispatcherConfigurator(config, prerequisites) {
private val instance = new Dispatcher(
this,
config.getString("id"),
config.getInt("throughput"),
config.getNanosDuration("throughput-deadline-time"),
configureExecutor(),
config.getMillisDuration("shutdown-timeout"))
/**
* Returns the same dispatcher instance for each invocation
*/
override def dispatcher(): MessageDispatcher = instance
}
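At this point the MessageDispatcher has been created. Quite a winding road, ha! Some of the indirection is there for abstraction and encapsulation and some for configurability, which makes it a bit complex. Next, let's see how defaultMailbox is created.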
private lazy val defaultMailbox = system.mailboxes.lookup(Mailboxes.DefaultMailboxId)
Much like the dispatcher, it is created through lookup, again for the sake of configurability (DefaultMailboxId = "akka.actor.default-mailbox"). Following lookup brings us to this code:
private def lookupConfigurator(id: String): MailboxType = {
mailboxTypeConfigurators.get(id) match {
case null ⇒
// It doesn't matter if we create a mailbox type configurator that isn't used due to concurrent lookup.
val newConfigurator = id match {
// TODO RK remove these two for Akka 2.3
case "unbounded" ⇒ UnboundedMailbox()
case "bounded" ⇒ new BoundedMailbox(settings, config(id))
case _ ⇒
if (!settings.config.hasPath(id)) throw new ConfigurationException(s"Mailbox Type [${id}] not configured")
val conf = config(id)
val mailboxType = conf.getString("mailbox-type") match {
case "" ⇒ throw new ConfigurationException(s"The setting mailbox-type, defined in [$id] is empty")
case fqcn ⇒
val args = List(classOf[ActorSystem.Settings] → settings, classOf[Config] → conf)
dynamicAccess.createInstanceFor[MailboxType](fqcn, args).recover({
case exception ⇒
throw new IllegalArgumentException(
s"Cannot instantiate MailboxType [$fqcn], defined in [$id], make sure it has a public" +
" constructor with [akka.actor.ActorSystem.Settings, com.typesafe.config.Config] parameters",
exception)
}).get
}
if (!mailboxNonZeroPushTimeoutWarningIssued) {
mailboxType match {
case m: ProducesPushTimeoutSemanticsMailbox if m.pushTimeOut.toNanos > 0L ⇒
warn(s"Configured potentially-blocking mailbox [$id] configured with non-zero pushTimeOut (${m.pushTimeOut}), " +
s"which can lead to blocking behavior when sending messages to this mailbox. " +
s"Avoid this by setting `$id.mailbox-push-timeout-time` to `0`.")
mailboxNonZeroPushTimeoutWarningIssued = true
case _ ⇒ // good; nothing to see here, move along, sir.
}
}
mailboxType
}
mailboxTypeConfigurators.putIfAbsent(id, newConfigurator) match {
case null ⇒ newConfigurator
case existing ⇒ existing
}
case existing ⇒ existing
}
}
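This resembles dispatcher creation: look for an existing entry first and create one only if there is none. The difference is that this code creates only a MailboxType, not the actual message queue; we will come back to that later. So what is akka.actor.default-mailbox? Again we need to check reference.conf.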
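Both lookupConfigurator methods use the same "get, else create and putIfAbsent" idiom. Here it is as a generic sketch; LookupCache is a hypothetical name, and the counter in the usage only shows that the factory runs once for sequential lookups.

```scala
import java.util.concurrent.ConcurrentHashMap

// Generic sketch of the lookup-or-create idiom; not an Akka class.
final class LookupCache[V <: AnyRef](create: String => V) {
  private val cache = new ConcurrentHashMap[String, V]()

  def lookup(id: String): V = cache.get(id) match {
    case null =>
      // A racing thread may also build a value; only the first one stored wins.
      val fresh = create(id)
      cache.putIfAbsent(id, fresh) match {
        case null     => fresh    // we won the race
        case existing => existing // someone else stored first; use theirs
      }
    case existing => existing
  }
}
```

As the comment in the dispatcher version says, two racing threads may each build a value, but putIfAbsent guarantees both end up using the first one stored. ConcurrentHashMap.computeIfAbsent would run the factory at most once per key; the Akka code appears to accept the rare duplicate because creating a configurator is cheap.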
default-mailbox {
# FQCN of the MailboxType. The Class of the FQCN must have a public
# constructor with
# (akka.actor.ActorSystem.Settings, com.typesafe.config.Config) parameters.
mailbox-type = "akka.dispatch.UnboundedMailbox"
# If the mailbox is bounded then it uses this setting to determine its
# capacity. The provided value must be positive.
# NOTICE:
# Up to version 2.1 the mailbox type was determined based on this setting;
# this is no longer the case, the type must explicitly be a bounded mailbox.
mailbox-capacity = 1000
# If the mailbox is bounded then this is the timeout for enqueueing
# in case the mailbox is full. Negative values signify infinite
# timeout, which should be avoided as it bears the risk of dead-lock.
mailbox-push-timeout-time = 10s
# For Actor with Stash: The default capacity of the stash.
# If negative (or zero) then an unbounded stash is used (default)
# If positive then a bounded stash is used and the capacity is set using
# the property
stash-capacity = -1
}
lookupConfigurator contains one important line: dynamicAccess.createInstanceFor[MailboxType](fqcn, args). Here, too, dynamicAccess is used, this time to create a MailboxType instance whose class is the value of mailbox-type. So what does akka.dispatch.UnboundedMailbox look like?
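Conceptually, createInstanceFor boils down to Java reflection. The following is a simplified stand-in, not Akka's ReflectiveDynamicAccess: MiniDynamicAccess is hypothetical. It looks up the class by name, checks it against the expected type via a ClassTag, picks the constructor whose parameter types match args, and wraps everything in a Try, which is why the Akka code above can call .recover on failure.

```scala
import scala.reflect.ClassTag
import scala.util.Try

// Hypothetical, simplified reflective factory; not Akka's DynamicAccess.
object MiniDynamicAccess {
  def createInstanceFor[T: ClassTag](fqcn: String, args: List[(Class[_], AnyRef)]): Try[T] = Try {
    val clazz = Class.forName(fqcn)
    val expected = implicitly[ClassTag[T]].runtimeClass
    // Refuse classes that are not a subtype of the requested type.
    if (!expected.isAssignableFrom(clazz))
      throw new ClassCastException(s"$fqcn is not a subtype of ${expected.getName}")
    // Pick the constructor whose parameter types match the declared arg types.
    val ctor = clazz.getDeclaredConstructor(args.map(_._1): _*)
    ctor.newInstance(args.map(_._2): _*).asInstanceOf[T]
  }
}
```

For instance, MiniDynamicAccess.createInstanceFor[CharSequence]("java.lang.StringBuilder", List(classOf[String] -> "Hello")) builds a StringBuilder through its (String) constructor, the same way the mailbox code passes (ActorSystem.Settings, Config) to a MailboxType constructor.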
/**
* MailboxType is a factory to create MessageQueues for an optionally
* provided ActorContext.
*
* <b>Possibly Important Notice</b>
*
* When implementing a custom mailbox type, be aware that there is special
* semantics attached to `system.actorOf()` in that sending to the returned
* ActorRef may—for a short period of time—enqueue the messages first in a
* dummy queue. Top-level actors are created in two steps, and only after the
* guardian actor has performed that second step will all previously sent
* messages be transferred from the dummy queue into the real mailbox.
*/
trait MailboxType {
def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue
}
trait ProducesMessageQueue[T <: MessageQueue]
/**
* UnboundedMailbox is the default unbounded MailboxType used by Akka Actors.
*/
final case class UnboundedMailbox() extends MailboxType with ProducesMessageQueue[UnboundedMailbox.MessageQueue] {
def this(settings: ActorSystem.Settings, config: Config) = this()
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
new UnboundedMailbox.MessageQueue
}
The scaladoc describes MailboxType clearly: it is a factory that creates MessageQueues. The name is odd, though. Why not MailboxFactory, or MessageQueueFactory? Who knows.
That covers how the MailboxType is created. We won't go further into what the UnboundedMailbox.MessageQueue class looks like or how its inheritance hierarchy is structured.
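To illustrate the factory relationship without going into the real queue classes, here is a minimal sketch. MiniMailboxType, MiniMessageQueue and MiniUnboundedMailbox are hypothetical, the owner parameter is simplified to an optional path string, and backing the queue with ConcurrentLinkedQueue is an assumption of this sketch.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical mirror of the MailboxType / MessageQueue split; not Akka's traits.
trait MiniMessageQueue {
  def enqueue(msg: Any): Unit
  def dequeue(): Option[Any]
  def numberOfMessages: Int
}

// The "type" is only a factory: each call to create() yields a fresh queue.
trait MiniMailboxType {
  def create(owner: Option[String]): MiniMessageQueue
}

final case class MiniUnboundedMailbox() extends MiniMailboxType {
  override def create(owner: Option[String]): MiniMessageQueue = new MiniMessageQueue {
    private val q = new ConcurrentLinkedQueue[Any]()
    def enqueue(msg: Any): Unit = { q.offer(msg); () }
    def dequeue(): Option[Any] = Option(q.poll()) // poll() returns null when empty
    def numberOfMessages: Int = q.size
  }
}
```

The point is that one MailboxType instance, cached by lookupConfigurator, can serve many actors, each of which gets its own queue from create().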
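Next, let's look at underlying, the first method guardian calls. The word means "lying beneath", and its type is ActorCell. Judging by the classes it inherits from, it is fairly complex.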
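In the end ActorCell's attachChild is called, and it calls makeChild. The most important code (marked with a red box in the original screenshot) calls ActorCell.provider's actorOf, adds the new ref to the current children via initChild, and calls the actor's start method, which completes the creation of the actor. We won't dig into the data structure behind children for now.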
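The reserve, create, initChild, start sequence can be modelled as below. It is a hypothetical sketch: it assumes makeChild reserves the child name before creating the ref, as the guardian code earlier does with cell.reserveChild("user"), and it uses a synchronized mutable map instead of Akka's real children container, which this post does not examine.

```scala
import scala.collection.mutable

// Hypothetical child ref; start() just flips a flag.
final class MiniChildRef(val name: String) {
  @volatile var started = false
  def start(): Unit = started = true
}

// Hypothetical parent cell modelling reserveChild / initChild / makeChild.
final class MiniCell {
  private sealed trait Slot
  private case object Reserved extends Slot
  private final case class Initialized(ref: MiniChildRef) extends Slot
  private val children = mutable.Map.empty[String, Slot]

  // Phase 1: claim the name so a concurrent attempt with the same name fails early.
  def reserveChild(name: String): Unit = synchronized {
    if (children.contains(name)) throw new IllegalArgumentException(s"actor name [$name] is not unique!")
    children(name) = Reserved
  }

  // Phase 2: replace the reservation with the created ref.
  def initChild(ref: MiniChildRef): Unit = synchronized {
    children.get(ref.name) match {
      case Some(Reserved) => children(ref.name) = Initialized(ref)
      case _              => throw new IllegalStateException(s"[${ref.name}] was not reserved")
    }
  }

  def makeChild(name: String): MiniChildRef = {
    reserveChild(name)
    val ref = new MiniChildRef(name) // stand-in for provider.actorOf(...)
    initChild(ref)
    ref.start()
    ref
  }

  def child(name: String): Option[MiniChildRef] = synchronized {
    children.get(name).collect { case Initialized(r) => r }
  }
}
```

Splitting reservation from initialization lets the name be claimed before the possibly slow actorOf call, so a second child with the same name is rejected immediately.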
From ActorCell's constructor and inheritance we know that provider in this code is the provider from ActorSystemImpl, i.e. the default LocalActorRefProvider, so we have to go back and look at its actorOf method.
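The method is long, so the irrelevant parts can be folded, as in the figure above. It first checks whether there is a router; clearly there isn't. It then uses the deployer's configuration to check whether the dispatcher and mailboxType are overridden; again they aren't, so everything stays as it is. The final if statement creates a RepointableActorRef when async is true, and from the code analysed above async is true (attachChild calls makeChild with async = true). Once the RepointableActorRef has been created, initialize is called to finish initialization.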
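The decisions described above can be summarised as a small pure function. This is a hypothetical model (ProviderDecision and its case classes are my names), reduced to the three questions in the text: is there a router, does the deployment override the dispatcher or mailbox, and is async set.

```scala
// Hypothetical decision model of LocalActorRefProvider.actorOf; no real refs are built.
object ProviderDecision {
  final case class Deploy(dispatcher: Option[String], mailbox: Option[String])

  sealed trait RefKind
  case object Routed extends RefKind
  final case class Repointable(dispatcher: String, mailbox: String) extends RefKind
  final case class Local(dispatcher: String, mailbox: String) extends RefKind

  def actorOf(
      hasRouter: Boolean,
      propsDispatcher: String,
      propsMailbox: String,
      deploy: Option[Deploy],
      async: Boolean): RefKind =
    if (hasRouter) Routed // the routed branch is out of scope for this post
    else {
      // Deployment settings, when present, override what Props asked for.
      val dispatcher = deploy.flatMap(_.dispatcher).getOrElse(propsDispatcher)
      val mailbox = deploy.flatMap(_.mailbox).getOrElse(propsMailbox)
      if (async) Repointable(dispatcher, mailbox) else Local(dispatcher, mailbox)
    }
}
```

For system.actorOf there is no router, no deployment override and async is true, so the model lands on Repointable with the defaults unchanged.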
/**
* Initialize: make a dummy cell which holds just a mailbox, then tell our
* supervisor that we exist so that he can create the real Cell in
* handleSupervise().
*
* Call twice on your own peril!
*
* This is protected so that others can have different initialization.
*/
def initialize(async: Boolean): this.type =
underlying match {
case null ⇒
swapCell(new UnstartedCell(system, this, props, supervisor))
swapLookup(underlying)
supervisor.sendSystemMessage(Supervise(this, async))
if (!async) point(false)
this
case other ⇒ throw new IllegalStateException("initialize called more than once!")
}
/**
* This method is supposed to be called by the supervisor in handleSupervise()
* to replace the UnstartedCell with the real one. It assumes no concurrent
* modification of the `underlying` field, though it is safe to send messages
* at any time.
*/
def point(catchFailures: Boolean): this.type
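In initialize, the ref sends its supervisor a Supervise message so that the supervisor takes charge of it. Because async is true here, initialize does not call point itself; according to the scaladoc, point is meant to be called later by the supervisor in handleSupervise() to replace the UnstartedCell with the real cell. RepointableActorRef is fairly involved, and interested readers can study it on their own. My impression is that it exists mainly so that messages are not lost while an actor is being created or re-created, but I haven't fully figured it out and will look into it more later.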
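Based on my reading of the scaladoc above (and of the MailboxType note about a dummy queue), the core idea can be sketched as follows: park messages in a placeholder cell, then repoint to the real cell and replay them. All names are hypothetical and the sketch is deliberately single-threaded; Akka's UnstartedCell also has to handle system messages and concurrent senders.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical sketch of the repointable-ref idea; not Akka's RepointableActorRef.
trait MiniCellLike { def send(msg: Any): Unit }

// Placeholder cell: just buffers messages until the real cell exists.
final class MiniUnstartedCell extends MiniCellLike {
  val queued: ListBuffer[Any] = ListBuffer.empty[Any]
  def send(msg: Any): Unit = { queued += msg; () }
}

// Real cell: delivers messages to the actor's behavior.
final class MiniRealCell(handler: Any => Unit) extends MiniCellLike {
  def send(msg: Any): Unit = handler(msg)
}

final class MiniRepointableRef(makeReal: () => MiniRealCell) {
  private var underlying: MiniCellLike = null

  // Like initialize(async = true): install the placeholder, do not point yet.
  def initialize(): this.type = underlying match {
    case null =>
      underlying = new MiniUnstartedCell
      this
    case _ => throw new IllegalStateException("initialize called more than once!")
  }

  // What the supervisor would trigger from handleSupervise(): swap and replay.
  def point(): this.type = underlying match {
    case u: MiniUnstartedCell =>
      val real = makeReal()
      underlying = real
      u.queued.foreach(real.send) // replay buffered messages in arrival order
      this
    case _ => this
  }

  def !(msg: Any): Unit = underlying.send(msg)
}
```

Usage: after initialize(), ref ! "Hello" is only buffered; once point() runs, "Hello" reaches the handler, and later sends go straight to the real cell.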
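At this point system.actorOf has essentially finished. The ref it returns is an InternalActorRef (here a RepointableActorRef), a subclass of ActorRef, so subsequent code can send messages to the actor with ! or tell. Although we have roughly traced how an actor is created, we haven't gone deep: for example, we still don't know when our own actor implementation class gets instantiated. That doesn't stop us from continuing to study Akka's source, though.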