天天看点

Akka RoutingAkka Routing

Akka Routing

引言

Akka分布式集群组件中对Routing做了简单介绍,本文将对Routing做更详细的说明。

概念

上篇文章中说Routing由Router和Routee组成,可以理解成Router是个actor,负责接收外界消息,然后根据一定的策略把消息分发给Routee,Routee负责真实的运算。

值得注意的是Router是透明的,Router消息转发给Routee,Routee可以直接给sender(非Router)返回消息,后边举例子说明。

对比下Group Router和Pool Router:

Pool Router中Router是所有Routee的父actor,因此Routees都由Router监管,而且是由Router批量创建的。所以需要特别要注意Router的监管策略,可以设置成SupervisorStrategy.Escalate,保证在Router重启时,不会重新生成Routees。

Group Router的Routee由外界创建的,可以有不同的监管策略,需要多少Router用户自己决定。
           

可以发现,如果服务是主-从模式选择Pool Router,如果服务所有节点都是一样的,建议选择Group Router,当然Pool模式可以,我们可以结合SingleActor使用,保证Router唯一。

分布式集群

如果是cluster模式,Group Router和Pool Router还能一样开箱即用吗?答案是毋庸置疑的。创建Router时稍稍的不一样,用到ClusterRouterGroup、ClusterRouterPool

逻辑策略

它们都从RoutingLogic继承而来:

akka.routing.BalancingRoutingLogic dispatcher共享邮箱均衡

akka.cluster.metrics.AdaptiveLoadBalancingRoutingLogic 资源负载均衡

akka.routing.RoundRobinRoutingLogic 轮询

akka.routing.RandomRoutingLogic 随机

akka.routing.SmallestMailboxRoutingLogic 消息量空闲

akka.routing.BroadcastRoutingLogic 广播

akka.routing.ScatterGatherFirstCompletedRoutingLogic 分散聚集

akka.routing.TailChoppingRoutingLogic 尾部断续

akka.routing.ConsistentHashingRoutingLogic 一致性哈希
           

策略的使用

根据上边的逻辑策略,Group Router 和 Pool Router都有自己的封装。

Group

AdaptiveLoadBalancingGroup
BroadcastGroup
ConsistentHashingGroup
RandomGroup
RoundRobinGroup
ScatterGatherFirstCompletedGroup
TailChoppingGroup
ClusterRouterGroup 用于集群模式,构建时传入local group
           

Pool

AdaptiveLoadBalancingPool
BalancingPool
BroadcastPool
ConsistentHashingPool
RandomPool
RoundRobinPool
ScatterGatherFirstCompletedPool
SmallestMailboxPool
TailChoppingPool
ClusterRouterPool 用于集群模式,构建时传入local pool,可以remote创建Routee
           

案例一

单节点下,class类中创建Router样例:

object RouterFirstTest extends App{

  implicit val system:ActorSystem = ActorSystem()
  implicit val mat:ActorMaterializer = ActorMaterializer()
  implicit val ec:ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global

  //创建RobinPool Router
  val router = system.actorOf(RoundRobinPool(5).props(Props[FirstWorkerActor]), "routerFirst")

  //发10条小学
  for(i<-1 to 10){
    router ! s"a$i"
  }
  
  system.terminate()
}

class FirstWorkerActor extends Actor with ActorLogging{
  override def receive: Receive ={
    case s:String=> 
        //打印日志
        log.info(s)
  }
}
           

案例二

单节点下,Actor中创建Router样例:

object RouterSecondTest extends App{

  implicit val system:ActorSystem = ActorSystem()
  implicit val mat:ActorMaterializer = ActorMaterializer()
  implicit val ec:ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global

  val ra = system.actorOf(Props[RouterActor],"RA")

  for(i<-1 to 10){
    ra ! s"a$i"
  }

  Thread.sleep(5000)
  system.terminate()
}

class RouterActor extends Actor with ActorLogging{

  //动态调整Routee个数
  val reSizer = DefaultResizer(
    lowerBound = 2, upperBound = 5, pressureThreshold = 1
    ,rampupRate = 1, backoffRate = 0.25
    ,backoffThreshold = 0.25, messagesPerResize = 1
  )
  val router = context.actorOf(RoundRobinPool(2,Option(reSizer)).props(Props[SecondWorkerActor]), "routerSecond")

  override def receive: Receive = {
    case s:String=>
      router ! s
    case BackMessage(s)=>
      log.info(s"back message$s")
  }
}

class SecondWorkerActor extends Actor with ActorLogging{
  override def receive: Receive ={
    case s:String=>
      log.info(s)
      //向上级actor发送消息,RouterActor能收到
      sender() ! BackMessage(s)
  }
}

case class BackMessage(m:String)
           

案例三

集群模式,创建Router

class RouteActor extends Actor{
  val logger = Logger(this.getClass)
  val workerRouter = context.actorOf(
    ClusterRouterGroup(
      AdaptiveLoadBalancingGroup(HeapMetricsSelector),
      ClusterRouterGroupSettings(
        totalInstances = 100,
        routeesPaths = List("/user/Worker"),
        allowLocalRoutees = true,
        useRoles = Set("worker-node"))
      ).props(),
      name = "workerRouter"
    )

  override def receive: Receive = {
    case Start(s)=>
      logger.info(s"${self.path.name} router $s")
      workerRouter ! Start(s)

    case s:String =>
      logger.info(s"test return info $s")
  }
}
           

参考文献

Akka Routing

Akka Cluster Routes

Akka Metrics Extenion