天天看點

Akka Dispatchers和Routers

Akka Dispatcher是維持Akka Actor動作的核心元件,是整個Akka架構的引擎。它是基于Java的Executor架構來實作的。Dispatcher控制和協調消息并将其分發給運作在底層線程上的Actor,由它來負責排程資源的優化,并保證任務以最快的速度執行。

Akka的高穩定性是建立在“Let It Crash”模型之上的,該模型是基于Supervision和Monitoring實作的。通過定義Supervision和監管政策,實作系統異常處理。

Akka為了保證事務的一緻,引入了STM的概念。STM使用的是“樂觀鎖”,執行臨界區代碼後,會檢測是否産生沖突,如果産生沖突,将復原修改,重新執行臨界區代碼。

Akka中,Dispatcher基于Java Executor架構來實作,提供了異步執行任務的能力。Executor是基于生産者——消費者模型來建構的。這意味着任務的送出和任務的執行是在不同的線程中隔離執行的,即送出任務的線程與執行任務的線程是不同的。

Executor架構有兩個重要實作:

ThreadPoolExecutor:該實作從預定義的線程池中選取線程來執行任務。

ForkJoinPool:使用相同的線程池模型,提供了工作竊取的支援。

Dispatcher運作線上程之上,負責分發其郵箱裡面的Actors和Messages到executor中的線程上運作。在Akka中,提供了4種類型的Dispatcher:

  • Dispatcher
  • Pinned Dispatcher
  • Balancing Dispatcher
  • Calling Thread Dispatcher

    對應的也有4種預設的郵箱:

  • Unbounded mailbox
  • Bounded mailbox
  • Unbounded priority mailbox
  • Bounded priority mailbox

為Actor指定派發器

一般Actor都會有預設的派發器,如果要指定派發器,要做兩件事:

1)在執行個體化Actor時,指定派發器:

2)建立Actor時,使用withDispatcher指定派發器,如my-dispatcher,然後在applicaction.conf配置檔案中配置派發器

使用Dispatcher派發器

my-dispatcher{
  # Dispatcher是基于事件的派發器名稱
  type = Dispatcher
  # 使用何種ExecutionService
   executor = "fork-join-executor"
  # 配置fork join池
  fork-join-executor{
    # 容納基于倍數的并行數的線程數下限
    parallelism-min = 2
    # 并行數(線程)(CPU核數*2)
    parallelism-factor = 2.0
    # 容納基于倍數的并行數量的線程數上限
    parallelism-max = 10
  }
  # throughput定義了線程切換到另一個Actor之前處理的消息數上限
  # 設定為1表示盡可能公平
  throughput = 100
}
           

使用PinnedDispatcher派發器

my-dispatcher{
  # Dispatcher是基于事件的派發器名稱
  type = PinnedDispatcher
  # 使用何種ExecutionService
  executor = "thread-pool-executor"
  # 配置fork join池
  thread-pool-executor{
    # 容納基于倍數的并行數的線程數下限
    parallelism-min = 2
    # 并行數(線程)(CPU核數*2)
    parallelism-factor = 2.0
    # 容納基于倍數的并行數量的線程數上限
    parallelism-max = 10
  }
  # throughput定義了線程切換到另一個Actor之前處理的消息數上限
  # 設定為1表示盡可能公平
  throughput = 100
}
           

不同派發器的介紹

  • Dispatcher

    Dispatcher是Akka中預設的派發器,它是基于事件的分發器,該派發器綁定一組Actor到線程池中。該派發器有如下特點:

    1)每一個Actor都有自己的郵箱

    2)該派發器都可以被任意數量的Actor共享

    3)該派發器可以由ThreadPoolExecutor或ForkJoinPool提供支援

    4)該派發器是非阻塞的。

  • Balancing Dispatcher

    該派發器是基于事件的分發器,它會将任務比較多的Actor的任務重新分發到比較閑的Actor上運作。該派發器有如下特點:

    1)所有Actor共用一個郵箱

    2)該派發器隻能被同一種類型的Actor共享

    3)該派發器可以由ThreadPoolExecutor或ForkJoinPool提供支援

  • Pinned Dispatcher

    該派發器為每一個Actor提供一個單一的、專用的線程。這種做法在I/O操作或者長時間運作的計算中很有用。該派發器有如下特點:

    1)每一個Actor都有自己的郵箱

    2)每一個Actor都有專用的線程,該線程不能和其他Actor共享

    3)該派發器有一個Executor線程池

    4)該派發器在阻塞上進行了優化,如:如果程式正在進行I/O操作,那麼這個Actor将會等到任務執行完成。這種阻塞型的操作在性能上要比預設的Dispatcher要好。

  • Calling Thread Dispatcher

    該派發器主要用于測試,并且在目前線程運作任務,不會建立新線程,該派發器有如下特點:

    1)每一個Actor都有自己的郵箱

    2)該派發器都可以被任意數量的Actor共享

    3)該派發器由調用線程支援

郵箱

郵箱用于儲存接收的消息,在Akka中除使用BalancingDispather分發器的Actor以外,每個Actor都擁有自己的郵箱。使用同一個BalancingDispather的所有Actor共享同一個郵箱執行個體。

郵箱是基于Java concurrent中的隊列來實作的,它有如下特點:

1)阻塞隊列,直到隊列空間可用,或者隊列中有可用元素

2)有界隊列,它的大小是被限制的

預設的郵箱實作

  • UnboundedMailbox

    底層是一個java.util.concurrent.ConcurrentLinkedQueue

    是否阻塞:No

    是否有界:No

  • BoundedMailbox

    底層是一個java.util.concurrent.LinkedBlockingQueue

    是否阻塞:Yes

    是否有界:Yes

  • UnboundedPriorityMailbox

    底層是一個java.util.concurrent.PriorityBlockingQueue

    是否阻塞:Yes

    是否有界:No

  • BoundedPriorityMailbox

    底層是一個java.util.PriorityBlockingQueue

    是否阻塞:Yes

    是否有界:Yes

    還有一些預設的持久郵箱。

Router

當處理到來的消息流時,我們需要一個actor來引導消息路由到目标actor,進而提高消息的配置設定效率。在Akka中這個 actor就是Router。它所管理的一些目标actor叫做routees

Akka定義好的一些Router:

  • akka.routing.RoundRobinRouter:輪轉路由器将消息按照輪轉順序發送給routers
  • akka.routing.RandomRouter:随機路由器随機選擇一個router,并将消息發送給這個router
  • akka.routing.SmallestMailboxRouter:最小郵箱路由器會在routers中選擇郵箱裡資訊最少的router,然後把消息發送給它。
  • akka.routing.BroadcastRouter:廣播路由器将相同的消息發送給所有的routers
  • akka.routing.ScatterGatherFirstCompletedRouter:靈活路由器先将消息廣播到所有routers,傳回最先完成任務的router的結果給調用者。

路由器的使用

  • RoundRobinPool 和 RoundRobinGroupRouter對routees使用輪詢機制
  • RandomPool 和 RandomGroupRouter随機選擇routees發送消息
  • BalancingPool嘗試從繁忙的routee重新配置設定任務到空閑routee,所有的routee共享一個mailbox
  • SmallestMailboxPoolRouter建立的所有routees中誰郵箱中的消息最少發給誰
  • BroadcastPool 和 BroadcastGroup廣播的路由器将接收到的消息轉發到它所有的routee。
  • ScatterGatherFirstCompletedPool 和 ScatterGatherFirstCompletedGroup将消息發送給所有的routees,然後等待到收到第一個回複,将結果發送回原始發送者。其他的回複将被丢棄
  • TailChoppingPool 和 TailChoppingGroup将首先發送消息到一個随機挑取的routee,短暫的延遲後發給第二個routee(從剩餘的routee中随機挑選),以此類推。它等待第一個答複,并将它轉回給原始發送者。其他答複将被丢棄。此Router的目标是通過查詢到多個routee來減少延遲,假設其他的actor可能比第一個actor更快響應。
  • ConsistentHashingPool 和 ConsistentHashingGroup對消息使用一緻性哈希(consistent hashing)選擇routee

    有三種方式定義哪些資料作為一緻性哈希鍵

    定義路由的hashMapping,将傳入的消息映射到它們一緻哈希鍵。這使決策對發送者透明。·

    這些消息可能會實作ConsistentHashable。鍵是消息的一部分,并很友善地與消息定義一起定義。·

    消息可以被包裝在一個ConsistentHashableEnvelope中,來定義哪些資料可以用來做一緻性哈希。發送者知道要使用的鍵。

路由器的使用要先建立路由器後使用。 AKKA的路由由router和衆多的routees組成,router和routees都是actor.Router即路由,是負責負載均衡和路由的抽象,有兩種方法來建立router:

1.Actor Group

2.Actor Pool

當處理到來的消息流時,我們需要一個actor來引導消息路由到目标actor,進而提高消息的配置設定效率。在Akka中這個 actor就是Router。它所管理的一些目标actor叫做routees

根據不同的情況需要,Akka提供了幾種路由政策。當然也可以建立自己的路由及政策。Akka提供的路由政策如下:

  • akka.routing.RoundRobinRoutingLogic 輪詢
  • akka.routing.RandomRoutingLogic 随機
  • akka.routing.SmallestMailboxRoutingLogic 空閑
  • akka.routing.BroadcastRoutingLogic 廣播
  • akka.routing.ScatterGatherFirstCompletedRoutingLogic 分散聚集
  • akka.routing.TailChoppingRoutingLogic 尾部斷續
  • akka.routing.ConsistentHashingRoutingLogic 一緻性哈希

建立Router Actor

建立router actor 有兩種方式:

  1. Pool(池)——routees都是router 的子actor,如果routees終止,router将把它們移除
  2. Group(群組)——routees都建立在router的外部,router通過使用actor來選擇将消息發送到指定路徑,但不監管routees是否終止。Router actor 向 routees 發送消息,與向普通actor發送消息一樣通過其ActorRef。Router actor 不會改變消息的發送人,routees 回複消息時發送回原始發件人,而不是Router actor。

Pool(池)可以通過配置并使用代碼在配置中擷取的方法來實作 (例如建立一個輪詢Router向5個routees發送消息)

Group(群組)有時我們需要單獨地建立routees,然後提供一個Router來供其使用。可以通過将routees的路徑傳遞給Router的配置,消息将通過ActorSelection來發送到這些路徑。

有兩種方式建立路由器:

Pool(池)

import akka.actor._
import akka.routing.{ActorRefRoutee, FromConfig, RoundRobinGroup, RoundRobinPool, RoundRobinRoutingLogic, Router}
object HelloScala {
  def main(args: Array[String]): Unit = {
    // 建立router
    val _system = ActorSystem("testRouter")
    // 通知代碼來實作路由器
    val hahaRouter = _system.actorOf(RoundRobinPool(5).props(Props[WorkerRoutee]),"router111")
    hahaRouter ! RouteeMsg(333)
    val myRouter = _system.actorOf(Props[WorkerRoutee].withRouter(RoundRobinPool(nrOfInstances = 5)))
    myRouter ! RouteeMsg(22)

    val masterRouter = _system.actorOf(Props[MasterRouter],"masterRouter")
    masterRouter ! RouteeMsg(100)
  }
}
class MasterRouter extends Actor{
  var masterRouter = {
    val routees = Vector.fill(3){
      val r = context.actorOf(Props[WorkerRoutee])
      context watch r
      ActorRefRoutee(r)
    }
    Router(RoundRobinRoutingLogic(),routees)
  }
  override def receive: Receive = {

    case w: RouteeMsg =>
      masterRouter.route(w,sender())
    case Terminated(a) =>
      masterRouter = masterRouter.removeRoutee(a)
      val r = context.actorOf(Props[WorkerRoutee])
      context watch r
      masterRouter = masterRouter.addRoutee(r)

  }
}

// 定義routee對應的actor類型
case class RouteeMsg(s: Int)

class WorkerRoutee extends Actor{
  override def receive: Receive = {
    case RouteeMsg(s) =>
      println(s"${self.path} mesage#$s")
      val caleActor = context.actorOf(Props[Cale])
      caleActor ! RouteeMsg(s)
    case _ =>
      println(s"${self.path}")
  }
}

class WorkerRoutee2 extends Actor{
  override def receive: Receive = {
    case RouteeMsg(s) =>
      println(s"${self.path} mesage#@@@@@$s")
      val caleActor = context.actorOf(Props[Cale])
      caleActor ! RouteeMsg(s)
    case _ =>
      println(s"${self.path}")
  }
}

class Cale extends Actor{
  override def receive: Receive = {
    case RouteeMsg(s) =>
      println(s"${self.path} message#$s")
    case _ =>
      println(s"${self.path}")
  }
}



           

Group(群組)

import akka.actor._
import akka.routing.{ RoundRobinGroup}
object HelloScala {
  def main(args: Array[String]): Unit = {
    val _system = ActorSystem("AkkaTestActor")
    val tActor = _system.actorOf(Props[TestActor],"testActor")
    tActor ! RouteeMsg(13333)
 }
}

class TestActor extends  Actor{
  val routee1 = context.actorOf(Props[WorkerRoutee],"w1")
  val routee2 = context.actorOf(Props[WorkerRoutee],"w2")
  val routee3 = context.actorOf(Props[WorkerRoutee],"w3")
  val paths: Array[String] = Array(routee1.path.toString,routee2.path.toString,routee3.path.toString)

  val testRouter = context.actorOf(RoundRobinGroup(paths).props(),"testRouter")
  override def receive = {
    case RouteeMsg(s) =>
      testRouter ! RouteeMsg(s)
    case _ =>
  }
}

// 定義routee對應的actor類型
case class RouteeMsg(s: Int)

class WorkerRoutee extends Actor{
  override def receive: Receive = {
    case RouteeMsg(s) =>
      println(s"${self.path} mesage#$s")
      val caleActor = context.actorOf(Props[Cale])
      caleActor ! RouteeMsg(s)
    case _ =>
      println(s"${self.path}")
  }
}


class Cale extends Actor{
  override def receive: Receive = {
    case RouteeMsg(s) =>
      println(s"${self.path} message#$s")
    case _ =>
      println(s"${self.path}")
  }
}



           

特殊消息

Broadcast消息用于向Router所有的routee發送一條消息,不管該Router通常是如何路由消息的。

PoisonPill消息無論哪個actor收到PosionPill消息都會被停止。但是對于PoisonPill消息Router不會将其傳給routees。但仍然能影響到routees,因為Router停止時它的子actor也會停止,就可能會造成消息未處理。是以我們可以将PoisonPill包裝到Broadcast消息中。這樣Router所管理的所有routees将會處理完消息後再處理PoisonPill并停止。

Kill消息當Kill消息被發送到Router,Router将内部處理該消息,并且不會将它發送到其routee。Router将抛出ActorKilledException并失敗,然後Router根據監管的政策,被恢複、重新開機或終止。Router的子routee也将被暫停,也受Router監管的影響,但是獨立在Router外部建立的routee将不會被影響。

import akka.actor._
import akka.routing.{Broadcast, RoundRobinGroup}
object HelloScala {
  def main(args: Array[String]): Unit = {
    val _system = ActorSystem("AkkaTestActor")
    val tActor = _system.actorOf(Props[TestActor],"testActor")
    tActor ! PoisonPill
 }
}



class TestActor extends  Actor{
  val routee1 = context.actorOf(Props[WorkerRoutee],"w1")
  val routee2 = context.actorOf(Props[WorkerRoutee],"w2")
  val routee3 = context.actorOf(Props[WorkerRoutee],"w3")
  val paths: Array[String] = Array(routee1.path.toString,routee2.path.toString,routee3.path.toString)

  val testRouter = context.actorOf(RoundRobinGroup(paths).props(),"testRouter")
  override def receive = {
    case RouteeMsg(s) =>
      testRouter ! RouteeMsg(s)
    case RouteeBroadcast =>
      testRouter ! Broadcast // 用于向Router所有的routee發送一條消息,不管該Router通常是如何路由消息的。
    case Broadcast =>
      println("TestActor receive a broadcast message")
    case Kill =>
      testRouter ! Kill// 當Kill消息被發送到Router,Router将内部處理該消息,并且不會将它發送到其routee。
    case PoisonPill =>
      testRouter ! PoisonPill // 無論哪個actor收到PosionPill消息都會被停止。但是對于PoisonPill消息Router不會将其傳給routees。
    case _ =>
  }
}

// 定義routee對應的actor類型
case class RouteeMsg(s: Int)
// 定義廣播資訊
case object RouteeBroadcast

class WorkerRoutee extends Actor{
  override def receive: Receive = {
    case RouteeMsg(s) =>
      println(s"${self.path} mesage#$s")
      val caleActor = context.actorOf(Props[Cale])
      caleActor ! RouteeMsg(s)
    case Broadcast =>
      println("WorkerRoutee receive a broadcast message")
    case Kill =>
      println("WorkerRoutee receive a Kill message")

    case PoisonPill =>
      println("WorkerRoutee receive a PoisonPill message")

    case _ =>
      println(s"${self.path}")
  }
}


class Cale extends Actor{
  override def receive: Receive = {
    case RouteeMsg(s) =>
      println(s"${self.path} message#$s")
    case Broadcast =>
      println("Cale receive a broadcast message")
    case _ =>
      println(s"${self.path}")
  }
}



           

遠端部署Router

既可以建立本地actor來作為Router,也可以指令Router在任一遠端主機上部署子actor。需要将路由配置放在RemoteRouterConfig下,在遠端部署的路徑類中要添加akka-remote子產品:

import akka.actor._
import akka.remote.routing.{RemoteRouterConfig}
import akka.routing.{Broadcast,RoundRobinPool}
object HelloScala {
  def main(args: Array[String]): Unit = {
    val _system = ActorSystem("AkkaTestActor")
    val addresses = Seq(
      Address("akka.tcp","remotesys","otherhost",6666),
      AddressFromURIString("akka.tcp://[email protected]:6666")
    )
    // WorkerRoutee 路由部署到遠端的主機上
    val routerRemote = _system.actorOf(RemoteRouterConfig(RoundRobinPool(5),addresses).props(Props[WorkerRoutee]))
 }
}

// 定義routee對應的actor類型
case class RouteeMsg(s: Int)
class WorkerRoutee extends Actor{
  override def receive: Receive = {
    case RouteeMsg(s) =>
      println(s"${self.path} mesage#$s")
      val caleActor = context.actorOf(Props[Cale])
      caleActor ! RouteeMsg(s)
    case Broadcast =>
      println("WorkerRoutee receive a broadcast message")
    case Kill =>
      println("WorkerRoutee receive a Kill message")

    case PoisonPill =>
      println("WorkerRoutee receive a PoisonPill message")

    case _ =>
      println(s"${self.path}")
  }
}
class Cale extends Actor{
  override def receive: Receive = {
    case RouteeMsg(s) =>
      println(s"${self.path} message#$s")
    case Broadcast =>
      println("Cale receive a broadcast message")
    case _ =>
      println(s"${self.path}")
  }
}