Akka Routing
引言
Akka分布式集群组件中对Routing做了简单介绍,本文将对Routing做更详细的说明。
概念
上篇文章中说Routing由Router和Routee组成,可以理解成Router是个actor,负责接收外界消息,然后根据一定的策略把消息分发给Routee,Routee负责真实的运算。
值得注意的是Router是透明的,Router消息转发给Routee,Routee可以直接给sender(非Router)返回消息,后边举例子说明。
对比下Group Router和Pool Router:
Pool Router中Router是所有Routee的父actor,因此Routees都由Router监管,而且是由Router批量创建的。所以需要特别要注意Router的监管策略,可以设置成SupervisorStrategy.Escalate,保证在Router重启时,不会重新生成Routees。
Group Router的Routee由外界创建的,可以有不同的监管策略,需要多少Router用户自己决定。
可以发现,如果服务是主-从模式选择Pool Router,如果服务所有节点都是一样的,建议选择Group Router,当然Pool模式可以,我们可以结合SingleActor使用,保证Router唯一。
分布式集群
如果是cluster模式,Group Router和Pool Router还能一样开箱即用吗?答案是毋庸置疑的。创建Router时稍稍的不一样,用到ClusterRouterGroup、ClusterRouterPool
逻辑策略
它们都从RoutingLogic继承而来:
akka.routing.BalancingRoutingLogic dispatcher共享邮箱均衡
akka.cluster.metrics.AdaptiveLoadBalancingRoutingLogic 资源负载均衡
akka.routing.RoundRobinRoutingLogic 轮询
akka.routing.RandomRoutingLogic 随机
akka.routing.SmallestMailboxRoutingLogic 消息量空闲
akka.routing.BroadcastRoutingLogic 广播
akka.routing.ScatterGatherFirstCompletedRoutingLogic 分散聚集
akka.routing.TailChoppingRoutingLogic 尾部断续
akka.routing.ConsistentHashingRoutingLogic 一致性哈希
策略的使用
根据上边的逻辑策略,Group Router 和 Pool Router都有自己的封装。
Group
AdaptiveLoadBalancingGroup
BroadcastGroup
ConsistentHashingGroup
RandomGroup
RoundRobinGroup
ScatterGatherFirstCompletedGroup
TailChoppingGroup
ClusterRouterGroup 用于集群模式,构建时传入local group
Pool
AdaptiveLoadBalancingPool
BalancingPool
BroadcastPool
ConsistentHashingPool
RandomPool
RoundRobinPool
ScatterGatherFirstCompletedPool
SmallestMailboxPool
TailChoppingPool
ClusterRouterPool 用于集群模式,构建时传入local pool,可以remote创建Routee
案例一
单节点下,class类中创建Router样例:
object RouterFirstTest extends App{
implicit val system:ActorSystem = ActorSystem()
implicit val mat:ActorMaterializer = ActorMaterializer()
implicit val ec:ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global
//创建RobinPool Router
val router = system.actorOf(RoundRobinPool(5).props(Props[FirstWorkerActor]), "routerFirst")
//发10条小学
for(i<-1 to 10){
router ! s"a$i"
}
system.terminate()
}
class FirstWorkerActor extends Actor with ActorLogging{
override def receive: Receive ={
case s:String=>
//打印日志
log.info(s)
}
}
案例二
单节点下,Actor中创建Router样例:
object RouterSecondTest extends App{
implicit val system:ActorSystem = ActorSystem()
implicit val mat:ActorMaterializer = ActorMaterializer()
implicit val ec:ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global
val ra = system.actorOf(Props[RouterActor],"RA")
for(i<-1 to 10){
ra ! s"a$i"
}
Thread.sleep(5000)
system.terminate()
}
class RouterActor extends Actor with ActorLogging{
//动态调整Routee个数
val reSizer = DefaultResizer(
lowerBound = 2, upperBound = 5, pressureThreshold = 1
,rampupRate = 1, backoffRate = 0.25
,backoffThreshold = 0.25, messagesPerResize = 1
)
val router = context.actorOf(RoundRobinPool(2,Option(reSizer)).props(Props[SecondWorkerActor]), "routerSecond")
override def receive: Receive = {
case s:String=>
router ! s
case BackMessage(s)=>
log.info(s"back message$s")
}
}
class SecondWorkerActor extends Actor with ActorLogging{
override def receive: Receive ={
case s:String=>
log.info(s)
//向上级actor发送消息,RouterActor能收到
sender() ! BackMessage(s)
}
}
case class BackMessage(m:String)
案例三
集群模式,创建Router
class RouteActor extends Actor{
val logger = Logger(this.getClass)
val workerRouter = context.actorOf(
ClusterRouterGroup(
AdaptiveLoadBalancingGroup(HeapMetricsSelector),
ClusterRouterGroupSettings(
totalInstances = 100,
routeesPaths = List("/user/Worker"),
allowLocalRoutees = true,
useRoles = Set("worker-node"))
).props(),
name = "workerRouter"
)
override def receive: Receive = {
case Start(s)=>
logger.info(s"${self.path.name} router $s")
workerRouter ! Start(s)
case s:String =>
logger.info(s"test return info $s")
}
}
参考文献
Akka Routing
Akka Cluster Routes
Akka Metrics Extenion