Akka中通過下面的方法向actor發送消息
- ! tell 意味着 “fire-and-forget”,即異步的發送消息無需等待傳回結果
-
? ask 異步發送消息并傳回代表可能回複的Future。
消息在每個發件人的基礎上是有序的。
MailBox
Akka郵箱包含發往Actor的消息。通常每個Actor都有自己的郵箱,但是也有例外,比如BalancingPool所有路由将共享一個郵箱執行個體。
其中MessageQueue(akka.dispatch.MessageQueue)是形成Akka郵箱的心元件之一。
發送給Actor的普通消息将被排入隊列(并随後出隊列)它至少需要支援N個生産者和1個消費者的線程安全。 它實作了入隊列,出隊列等方法
def enqueue(receiver: ActorRef, handle: Envelope): Unit
def dequeue(): Envelope
def numberOfMessages: Int
def hasMessages: Boolean
def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit
其中Envelope封裝了message:Any和sender:ActorRef兩個成員
SystemMessageQueue提供了systemEnqueue(入隊列)和systemDrain(全部出隊列)方法。MailBox繼承自系統消息隊列SystemMessageQueue和ForkJoinTask,實作了Runnable接口,同時包含ActorCell成員和MessageQueue成員
private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
extends ForkJoinTask[Unit] with SystemMessageQueue with Runnable {
var actor: ActorCell = _
}
其中ForkJoinTask是用少數線程執行海量獨立任務的極好架構(獨立任務指的是任務和任務之間不要有共享資料,否則會有并發通路的問題)
MailBox代理了MessageQueue的所有方法。MessageQueue的具體類型,根據MailBoxType的不同而不同。
tell 操作
在建立ActorSystem時,初始化預設的dispatcher,預設ForkJoinPool(ExecutorService)
在使用actorRef ! Message發送消息時,調用了actorCell對應的sendMessage方法,其中調用了dispatcher.dispatch方法
可以在ActorRef中可以看到
def ! (message: Any)(implicit sender: ActorRef = Actor.noSender): Unit
在ActorCell.scala中
final def sendMessage(message: Any, sender: ActorRef): Unit =
sendMessage(Envelope(message, sender, system))
之後可以追蹤到dungeon的Dispatch.scala檔案
def sendMessage(msg: Envelope): Unit =
try {
val msgToDispatch =
if (system.settings.SerializeAllMessages) serializeAndDeserialize(msg)
else msg
dispatcher.dispatch(this, msgToDispatch)
} catch handleException
而代碼裡的dispatcher.dispatch可以在dispatch.Dispatcher中找到:
/**
* INTERNAL API
*/
protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = {
val mbox = receiver.mailbox
mbox.enqueue(receiver.self, invocation)
registerForExecution(mbox, true, false)
}
protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = {
if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races
if (mbox.setAsScheduled()) {
try {
executorService execute mbox
true
} catch {
case e: RejectedExecutionException ⇒
try {
executorService execute mbox
true
} catch { //Retry once
case e: RejectedExecutionException ⇒
mbox.setAsIdle()
eventStream.publish(Error(e, getClass.getName, getClass, "registerForExecution was rejected twice!"))
throw e
}
}
} else false
} else false
}
dispatch方法做了兩件事情:
一是将消息放到actorCell的消息隊列中(maiBox 是 ActorCell 的成員變量)
二是調用dispather底層的線程池executor execute mbox執行mbox.run()(mailBox繼承了
Runnable 接口是以能放入ExecutorService 中執行),
override final def run(): Unit = {
try {
if (!isClosed) { //Volatile read, needed here
processAllSystemMessages() //First, deal with any system messages
processMailbox() //Then deal with messages
}
} finally {
setAsIdle() //Volatile write, needed here
dispatcher.registerForExecution(this, false, false)
}
}
/**
* Process the messages in the mailbox
*/
@tailrec private final def processMailbox(
left: Int = java.lang.Math.max(dispatcher.throughput, ),
deadlineNs: Long = if (dispatcher.isThroughputDeadlineTimeDefined == true) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else L): Unit =
if (shouldProcessMessage) {
val next = dequeue()
if (next ne null) {
if (Mailbox.debug) println(actor.self + " processing message " + next)
actor invoke next
if (Thread.interrupted())
throw new InterruptedException("Interrupted while processing actor messages")
processAllSystemMessages()
if ((left > ) && ((dispatcher.isThroughputDeadlineTimeDefined == false) || (System.nanoTime - deadlineNs) < ))
processMailbox(left - , deadlineNs)
}
}
執行mbox.run()中,先從SystemMessage連結清單中處理系統消息,
然後從MessageQueue成員中處理使用者消息。
處理使用者消息時,run 是一個遞歸函數,每次調用處理一個消息,
處理邏輯通過調用actorCell的invoke方法實作,根據dispatcher
的throughput決定處理多少條消息,
根據dispatcher的throughputDeadlineTime決定處理多長時間,
長度和時間在處理完一條消息後檢查一次。
final def invoke(messageHandle: Envelope): Unit = {
val influenceReceiveTimeout = !messageHandle.message.isInstanceOf[NotInfluenceReceiveTimeout]
try {
currentMessage = messageHandle
if (influenceReceiveTimeout)
cancelReceiveTimeout()
messageHandle.message match {
case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle)
case msg ⇒ receiveMessage(msg)
}
currentMessage = null // reset current message after successful invocation
} catch handleNonFatalOrInterruptedException { e ⇒
handleInvokeFailure(Nil, e)
} finally {
if (influenceReceiveTimeout)
checkReceiveTimeout // Reschedule receive timeout
}
}
final def receiveMessage(msg: Any): Unit = actor.aroundReceive(behaviorStack.head, msg)
對 PoisonKill, Terminate 系統消息的處理在 autoReceiveMessage 中,
對普通消息的處理在 receiveMessage 中,
private var behaviorStack: List[Actor.Receive] = emptyBehaviorStack
可以看到behaviorStack 是一個 List[Actor.Receive],
type Receive = PartialFunction[Any, Unit]
其中Receive (PartialFunction[Any, Unit])函數就是我們寫的對 message 的處理邏輯。
因為 Actor 支援通過 become/unbecome 切換形态,
是以behaviorStack.head就是目前的Receive處理邏輯。
對于ForkJoinPool這種executor,每次執行execute(mbox)時,實
際上都是先建立一個繼承自ForkJoinTask的MailboxExecutionTask,
其中的exec方法調用mbox.run方法,是以每次執行都會建立一個ForkJoinTask對象。
還有一點,消息隊列都是放到actor對應的mailbox中(以Envelope的形式封裝消息本身和sender),
而執行的task對象會放到Executor的每個線程對應的工作隊列中,task和消息分别使用不同的隊列。
參考
https://doc.akka.io/docs/akka/snapshot/mailboxes.html
https://doc.akka.io/docs/akka/snapshot/actors.html#send-messages
http://spartan1.iteye.com/blog/1641322