Akka Dispatcher是維持Akka Actor動作的核心元件,是整個Akka架構的引擎。它是基于Java的Executor架構來實作的。Dispatcher控制和協調消息并将其分發給運作在底層線程上的Actor,由它來負責排程資源的優化,并保證任務以最快的速度執行。
Akka的高穩定性是建立在“Let It Crash”模型之上的,該模型是基于Supervision和Monitoring實作的。通過定義Supervision和監管政策,實作系統異常處理。
Akka為了保證事務的一緻,引入了STM的概念。STM使用的是“樂觀鎖”,執行臨界區代碼後,會檢測是否産生沖突,如果産生沖突,将復原修改,重新執行臨界區代碼。
Akka中,Dispatcher基于Java Executor架構來實作,提供了異步執行任務的能力。Executor是基于生産者——消費者模型來建構的。這意味着任務的送出和任務的執行是在不同的線程中隔離執行的,即送出任務的線程與執行任務的線程是不同的。
Executor架構有兩個重要實作:
ThreadPoolExecutor:該實作從預定義的線程池中選取線程來執行任務。
ForkJoinPool:使用相同的線程池模型,提供了工作竊取的支援。
Dispatcher運作線上程之上,負責分發其郵箱裡面的Actors和Messages到executor中的線程上運作。在Akka中,提供了4種類型的Dispatcher:
- Dispatcher
- Pinned Dispatcher
- Balancing Dispatcher
-
Calling Thread Dispatcher
對應的也有4種預設的郵箱:
- Unbounded mailbox
- Bounded mailbox
- Unbounded priority mailbox
- Bounded priority mailbox
為Actor指定派發器
一般Actor都會有預設的派發器,如果要指定派發器,要做兩件事:
1)在執行個體化Actor時,指定派發器:
2)建立Actor時,使用withDispatcher指定派發器,如my-dispatcher,然後在applicaction.conf配置檔案中配置派發器
使用Dispatcher派發器
my-dispatcher{
# Dispatcher是基于事件的派發器名稱
type = Dispatcher
# 使用何種ExecutionService
executor = "fork-join-executor"
# 配置fork join池
fork-join-executor{
# 容納基于倍數的并行數的線程數下限
parallelism-min = 2
# 并行數(線程)(CPU核數*2)
parallelism-factor = 2.0
# 容納基于倍數的并行數量的線程數上限
parallelism-max = 10
}
# throughput定義了線程切換到另一個Actor之前處理的消息數上限
# 設定為1表示盡可能公平
throughput = 100
}
使用PinnedDispatcher派發器
my-dispatcher{
# Dispatcher是基于事件的派發器名稱
type = PinnedDispatcher
# 使用何種ExecutionService
executor = "thread-pool-executor"
# 配置fork join池
thread-pool-executor{
# 容納基于倍數的并行數的線程數下限
parallelism-min = 2
# 并行數(線程)(CPU核數*2)
parallelism-factor = 2.0
# 容納基于倍數的并行數量的線程數上限
parallelism-max = 10
}
# throughput定義了線程切換到另一個Actor之前處理的消息數上限
# 設定為1表示盡可能公平
throughput = 100
}
不同派發器的介紹
-
Dispatcher
Dispatcher是Akka中預設的派發器,它是基于事件的分發器,該派發器綁定一組Actor到線程池中。該派發器有如下特點:
1)每一個Actor都有自己的郵箱
2)該派發器都可以被任意數量的Actor共享
3)該派發器可以由ThreadPoolExecutor或ForkJoinPool提供支援
4)該派發器是非阻塞的。
-
Balancing Dispatcher
該派發器是基于事件的分發器,它會将任務比較多的Actor的任務重新分發到比較閑的Actor上運作。該派發器有如下特點:
1)所有Actor共用一個郵箱
2)該派發器隻能被同一種類型的Actor共享
3)該派發器可以由ThreadPoolExecutor或ForkJoinPool提供支援
-
Pinned Dispatcher
該派發器為每一個Actor提供一個單一的、專用的線程。這種做法在I/O操作或者長時間運作的計算中很有用。該派發器有如下特點:
1)每一個Actor都有自己的郵箱
2)每一個Actor都有專用的線程,該線程不能和其他Actor共享
3)該派發器有一個Executor線程池
4)該派發器在阻塞上進行了優化,如:如果程式正在進行I/O操作,那麼這個Actor将會等到任務執行完成。這種阻塞型的操作在性能上要比預設的Dispatcher要好。
-
Calling Thread Dispatcher
該派發器主要用于測試,并且在目前線程運作任務,不會建立新線程,該派發器有如下特點:
1)每一個Actor都有自己的郵箱
2)該派發器都可以被任意數量的Actor共享
3)該派發器由調用線程支援
郵箱
郵箱用于儲存接收的消息,在Akka中除使用BalancingDispather分發器的Actor以外,每個Actor都擁有自己的郵箱。使用同一個BalancingDispather的所有Actor共享同一個郵箱執行個體。
郵箱是基于Java concurrent中的隊列來實作的,它有如下特點:
1)阻塞隊列,直到隊列空間可用,或者隊列中有可用元素
2)有界隊列,它的大小是被限制的
預設的郵箱實作
-
UnboundedMailbox
底層是一個java.util.concurrent.ConcurrentLinkedQueue
是否阻塞:No
是否有界:No
-
BoundedMailbox
底層是一個java.util.concurrent.LinkedBlockingQueue
是否阻塞:Yes
是否有界:Yes
-
UnboundedPriorityMailbox
底層是一個java.util.concurrent.PriorityBlockingQueue
是否阻塞:Yes
是否有界:No
-
BoundedPriorityMailbox
底層是一個java.util.PriorityBlockingQueue
是否阻塞:Yes
是否有界:Yes
還有一些預設的持久郵箱。
Router
當處理到來的消息流時,我們需要一個actor來引導消息路由到目标actor,進而提高消息的配置設定效率。在Akka中這個 actor就是Router。它所管理的一些目标actor叫做routees
Akka定義好的一些Router:
- akka.routing.RoundRobinRouter:輪轉路由器将消息按照輪轉順序發送給routers
- akka.routing.RandomRouter:随機路由器随機選擇一個router,并将消息發送給這個router
- akka.routing.SmallestMailboxRouter:最小郵箱路由器會在routers中選擇郵箱裡資訊最少的router,然後把消息發送給它。
- akka.routing.BroadcastRouter:廣播路由器将相同的消息發送給所有的routers
- akka.routing.ScatterGatherFirstCompletedRouter:靈活路由器先将消息廣播到所有routers,傳回最先完成任務的router的結果給調用者。
路由器的使用
- RoundRobinPool 和 RoundRobinGroupRouter對routees使用輪詢機制
- RandomPool 和 RandomGroupRouter随機選擇routees發送消息
- BalancingPool嘗試從繁忙的routee重新配置設定任務到空閑routee,所有的routee共享一個mailbox
- SmallestMailboxPoolRouter建立的所有routees中誰郵箱中的消息最少發給誰
- BroadcastPool 和 BroadcastGroup廣播的路由器将接收到的消息轉發到它所有的routee。
- ScatterGatherFirstCompletedPool 和 ScatterGatherFirstCompletedGroup将消息發送給所有的routees,然後等待到收到第一個回複,将結果發送回原始發送者。其他的回複将被丢棄
- TailChoppingPool 和 TailChoppingGroup将首先發送消息到一個随機挑取的routee,短暫的延遲後發給第二個routee(從剩餘的routee中随機挑選),以此類推。它等待第一個答複,并将它轉回給原始發送者。其他答複将被丢棄。此Router的目标是通過查詢到多個routee來減少延遲,假設其他的actor可能比第一個actor更快響應。
-
ConsistentHashingPool 和 ConsistentHashingGroup對消息使用一緻性哈希(consistent hashing)選擇routee
有三種方式定義哪些資料作為一緻性哈希鍵
定義路由的hashMapping,将傳入的消息映射到它們一緻哈希鍵。這使決策對發送者透明。·
這些消息可能會實作ConsistentHashable。鍵是消息的一部分,并很友善地與消息定義一起定義。·
消息可以被包裝在一個ConsistentHashableEnvelope中,來定義哪些資料可以用來做一緻性哈希。發送者知道要使用的鍵。
路由器的使用要先建立路由器後使用。 AKKA的路由由router和衆多的routees組成,router和routees都是actor.Router即路由,是負責負載均衡和路由的抽象,有兩種方法來建立router:
1.Actor Group
2.Actor Pool
當處理到來的消息流時,我們需要一個actor來引導消息路由到目标actor,進而提高消息的配置設定效率。在Akka中這個 actor就是Router。它所管理的一些目标actor叫做routees
根據不同的情況需要,Akka提供了幾種路由政策。當然也可以建立自己的路由及政策。Akka提供的路由政策如下:
- akka.routing.RoundRobinRoutingLogic 輪詢
- akka.routing.RandomRoutingLogic 随機
- akka.routing.SmallestMailboxRoutingLogic 空閑
- akka.routing.BroadcastRoutingLogic 廣播
- akka.routing.ScatterGatherFirstCompletedRoutingLogic 分散聚集
- akka.routing.TailChoppingRoutingLogic 尾部斷續
- akka.routing.ConsistentHashingRoutingLogic 一緻性哈希
建立Router Actor
建立router actor 有兩種方式:
- Pool(池)——routees都是router 的子actor,如果routees終止,router将把它們移除
- Group(群組)——routees都建立在router的外部,router通過使用actor來選擇将消息發送到指定路徑,但不監管routees是否終止。Router actor 向 routees 發送消息,與向普通actor發送消息一樣通過其ActorRef。Router actor 不會改變消息的發送人,routees 回複消息時發送回原始發件人,而不是Router actor。
Pool(池)可以通過配置并使用代碼在配置中擷取的方法來實作 (例如建立一個輪詢Router向5個routees發送消息)
Group(群組)有時我們需要單獨地建立routees,然後提供一個Router來供其使用。可以通過将routees的路徑傳遞給Router的配置,消息将通過ActorSelection來發送到這些路徑。
有兩種方式建立路由器:
Pool(池)
import akka.actor._
import akka.routing.{ActorRefRoutee, FromConfig, RoundRobinGroup, RoundRobinPool, RoundRobinRoutingLogic, Router}
object HelloScala {
def main(args: Array[String]): Unit = {
// 建立router
val _system = ActorSystem("testRouter")
// 通知代碼來實作路由器
val hahaRouter = _system.actorOf(RoundRobinPool(5).props(Props[WorkerRoutee]),"router111")
hahaRouter ! RouteeMsg(333)
val myRouter = _system.actorOf(Props[WorkerRoutee].withRouter(RoundRobinPool(nrOfInstances = 5)))
myRouter ! RouteeMsg(22)
val masterRouter = _system.actorOf(Props[MasterRouter],"masterRouter")
masterRouter ! RouteeMsg(100)
}
}
class MasterRouter extends Actor{
var masterRouter = {
val routees = Vector.fill(3){
val r = context.actorOf(Props[WorkerRoutee])
context watch r
ActorRefRoutee(r)
}
Router(RoundRobinRoutingLogic(),routees)
}
override def receive: Receive = {
case w: RouteeMsg =>
masterRouter.route(w,sender())
case Terminated(a) =>
masterRouter = masterRouter.removeRoutee(a)
val r = context.actorOf(Props[WorkerRoutee])
context watch r
masterRouter = masterRouter.addRoutee(r)
}
}
// 定義routee對應的actor類型
case class RouteeMsg(s: Int)
class WorkerRoutee extends Actor{
override def receive: Receive = {
case RouteeMsg(s) =>
println(s"${self.path} mesage#$s")
val caleActor = context.actorOf(Props[Cale])
caleActor ! RouteeMsg(s)
case _ =>
println(s"${self.path}")
}
}
class WorkerRoutee2 extends Actor{
override def receive: Receive = {
case RouteeMsg(s) =>
println(s"${self.path} mesage#@@@@@$s")
val caleActor = context.actorOf(Props[Cale])
caleActor ! RouteeMsg(s)
case _ =>
println(s"${self.path}")
}
}
class Cale extends Actor{
override def receive: Receive = {
case RouteeMsg(s) =>
println(s"${self.path} message#$s")
case _ =>
println(s"${self.path}")
}
}
Group(群組)
import akka.actor._
import akka.routing.{ RoundRobinGroup}
object HelloScala {
def main(args: Array[String]): Unit = {
val _system = ActorSystem("AkkaTestActor")
val tActor = _system.actorOf(Props[TestActor],"testActor")
tActor ! RouteeMsg(13333)
}
}
class TestActor extends Actor{
val routee1 = context.actorOf(Props[WorkerRoutee],"w1")
val routee2 = context.actorOf(Props[WorkerRoutee],"w2")
val routee3 = context.actorOf(Props[WorkerRoutee],"w3")
val paths: Array[String] = Array(routee1.path.toString,routee2.path.toString,routee3.path.toString)
val testRouter = context.actorOf(RoundRobinGroup(paths).props(),"testRouter")
override def receive = {
case RouteeMsg(s) =>
testRouter ! RouteeMsg(s)
case _ =>
}
}
// 定義routee對應的actor類型
case class RouteeMsg(s: Int)
class WorkerRoutee extends Actor{
override def receive: Receive = {
case RouteeMsg(s) =>
println(s"${self.path} mesage#$s")
val caleActor = context.actorOf(Props[Cale])
caleActor ! RouteeMsg(s)
case _ =>
println(s"${self.path}")
}
}
class Cale extends Actor{
override def receive: Receive = {
case RouteeMsg(s) =>
println(s"${self.path} message#$s")
case _ =>
println(s"${self.path}")
}
}
特殊消息
Broadcast消息用于向Router所有的routee發送一條消息,不管該Router通常是如何路由消息的。
PoisonPill消息無論哪個actor收到PosionPill消息都會被停止。但是對于PoisonPill消息Router不會将其傳給routees。但仍然能影響到routees,因為Router停止時它的子actor也會停止,就可能會造成消息未處理。是以我們可以将PoisonPill包裝到Broadcast消息中。這樣Router所管理的所有routees将會處理完消息後再處理PoisonPill并停止。
Kill消息當Kill消息被發送到Router,Router将内部處理該消息,并且不會将它發送到其routee。Router将抛出ActorKilledException并失敗,然後Router根據監管的政策,被恢複、重新開機或終止。Router的子routee也将被暫停,也受Router監管的影響,但是獨立在Router外部建立的routee将不會被影響。
import akka.actor._
import akka.routing.{Broadcast, RoundRobinGroup}
object HelloScala {
def main(args: Array[String]): Unit = {
val _system = ActorSystem("AkkaTestActor")
val tActor = _system.actorOf(Props[TestActor],"testActor")
tActor ! PoisonPill
}
}
class TestActor extends Actor{
val routee1 = context.actorOf(Props[WorkerRoutee],"w1")
val routee2 = context.actorOf(Props[WorkerRoutee],"w2")
val routee3 = context.actorOf(Props[WorkerRoutee],"w3")
val paths: Array[String] = Array(routee1.path.toString,routee2.path.toString,routee3.path.toString)
val testRouter = context.actorOf(RoundRobinGroup(paths).props(),"testRouter")
override def receive = {
case RouteeMsg(s) =>
testRouter ! RouteeMsg(s)
case RouteeBroadcast =>
testRouter ! Broadcast // 用于向Router所有的routee發送一條消息,不管該Router通常是如何路由消息的。
case Broadcast =>
println("TestActor receive a broadcast message")
case Kill =>
testRouter ! Kill// 當Kill消息被發送到Router,Router将内部處理該消息,并且不會将它發送到其routee。
case PoisonPill =>
testRouter ! PoisonPill // 無論哪個actor收到PosionPill消息都會被停止。但是對于PoisonPill消息Router不會将其傳給routees。
case _ =>
}
}
// 定義routee對應的actor類型
case class RouteeMsg(s: Int)
// 定義廣播資訊
case object RouteeBroadcast
class WorkerRoutee extends Actor{
override def receive: Receive = {
case RouteeMsg(s) =>
println(s"${self.path} mesage#$s")
val caleActor = context.actorOf(Props[Cale])
caleActor ! RouteeMsg(s)
case Broadcast =>
println("WorkerRoutee receive a broadcast message")
case Kill =>
println("WorkerRoutee receive a Kill message")
case PoisonPill =>
println("WorkerRoutee receive a PoisonPill message")
case _ =>
println(s"${self.path}")
}
}
class Cale extends Actor{
override def receive: Receive = {
case RouteeMsg(s) =>
println(s"${self.path} message#$s")
case Broadcast =>
println("Cale receive a broadcast message")
case _ =>
println(s"${self.path}")
}
}
遠端部署Router
既可以建立本地actor來作為Router,也可以指令Router在任一遠端主機上部署子actor。需要将路由配置放在RemoteRouterConfig下,在遠端部署的路徑類中要添加akka-remote子產品:
import akka.actor._
import akka.remote.routing.{RemoteRouterConfig}
import akka.routing.{Broadcast,RoundRobinPool}
object HelloScala {
def main(args: Array[String]): Unit = {
val _system = ActorSystem("AkkaTestActor")
val addresses = Seq(
Address("akka.tcp","remotesys","otherhost",6666),
AddressFromURIString("akka.tcp://[email protected]:6666")
)
// WorkerRoutee 路由部署到遠端的主機上
val routerRemote = _system.actorOf(RemoteRouterConfig(RoundRobinPool(5),addresses).props(Props[WorkerRoutee]))
}
}
// 定義routee對應的actor類型
case class RouteeMsg(s: Int)
class WorkerRoutee extends Actor{
override def receive: Receive = {
case RouteeMsg(s) =>
println(s"${self.path} mesage#$s")
val caleActor = context.actorOf(Props[Cale])
caleActor ! RouteeMsg(s)
case Broadcast =>
println("WorkerRoutee receive a broadcast message")
case Kill =>
println("WorkerRoutee receive a Kill message")
case PoisonPill =>
println("WorkerRoutee receive a PoisonPill message")
case _ =>
println(s"${self.path}")
}
}
class Cale extends Actor{
override def receive: Receive = {
case RouteeMsg(s) =>
println(s"${self.path} message#$s")
case Broadcast =>
println("Cale receive a broadcast message")
case _ =>
println(s"${self.path}")
}
}