天天看点

用了这么久的RabbitMQ异步编程竟然都是错的!(上)1 适用场景2 异步处理之坑

优秀的项目都由同步、异步和定时任务三种处理模式相辅相成。其中当属异步编程充满坑点。

1 适用场景

1.1 服务于主流程的分支流程

在注册流程中,数据写DB是主流程,但注册后给用户发优惠券或欢迎短信的操作是分支流程,时效性不强,可异步处理。

1.2 用户无需实时看到结果的流程

比如,下单后的配货、送货流程完全可以进行异步处理,每个阶段处理完成后,再给用户发推送或短信让用户知晓即可。

1.3 MQ

任务的缓冲的分发,流量削峰、服务解耦和消息广播。

当然了异步处理不仅仅是通过 MQ 来实现,还有其他方式

  • 比如开新线程执行,返回 Future
  • 还有各种异步框架,比如 Vertx,它是通过 callback 的方式实现

2 异步处理之坑

异步处理流程的可靠性问题、消息发送模式的区分问题、大量死信消息堵塞队列的问题,为方便操作,本文MQ选型RabbitMQ。

2.1 异步处理需要消息补偿闭环

RabbitMQ虽可将消息落地磁盘,即使MQ异常消息数据也不会丢失,但异步流程在消息发送、传输、处理等环节,都可能发生消息丢失。MQ都无法确保百分百可用,业务设计都需考虑不可用时异步流程将如何继续。

因此,对于异步处理流程,必须考虑补偿或建立主备双活流程。

2.1.1 案例

用户注册后异步发送欢迎消息。

  • 用户注册落DB为同步流程
  • 会员服务收到消息后发送欢迎消息为异步流程
  • 用了这么久的RabbitMQ异步编程竟然都是错的!(上)1 适用场景2 异步处理之坑
  • 蓝线

    MQ异步处理(主线),消息可能丢失(虚线代表异步调用)

  • 绿线

    补偿Job定期消息补偿(备线),以补偿主线丢失的消息

  • 考虑到极端的MQ中间件失效的情况

    要求备线的处理吞吐能力达到主线性能

代码示例

  • UserController 注册+发送异步消息。注册方法,一次性注册10个用户,用户注册消息不能发送出去的概率为50%。
  • 用了这么久的RabbitMQ异步编程竟然都是错的!(上)1 适用场景2 异步处理之坑
  • MemberService 会员服务监听用户注册成功的消息,并发送欢迎短信。使用ConcurrentHashMap存放那些发过短信的用户ID实现幂等,避免相同的用户补偿时重复发短信
  • 用了这么久的RabbitMQ异步编程竟然都是错的!(上)1 适用场景2 异步处理之坑
  • 对于MQ消费程序,处理逻辑务必考虑去重(支持幂等)因为:
  • MQ消息可能会因为中间件本身配置错误、稳定性等原因出现重复
  • 自动补偿重复,比如本例,同一条消息可能既走MQ也走补偿,肯定会出现重复,而且考虑到高内聚,补偿Job本身不会做去重处理
  • 人工补偿重复。出现消息堆积时,异步处理流程必然会延迟。如果我们提供了通过后台进行补偿的功能,那么在处理遇到延迟的时候,很可能会先进行人工补偿,过了一段时间后处理程序又收到消息了,重复处理。我之前就遇到过一次由MQ故障引发的事故,MQ中堆积了几十万条发放资金的消息,导致业务无法及时处理,运营以为程序出错了就先通过后台进行了人工处理,结果MQ系统恢复后消息又被重复处理了一次,造成大量资金重复发放。

小结

异步处理的时候需要考虑消息重复的可能性,处理逻辑需要实现幂等,防止重复处理。

接着定义补偿Job即备线操作。

在CompensationJob中定义一个@Scheduled定时任务,5秒做一次补偿操作,因为Job并不知道哪些用户注册的消息可能丢失,所以是全量补偿,补偿逻辑:每5秒补偿一次,按顺序一次补偿5个用户,下一次补偿操作从上一次补偿的最后一个用户ID开始;对于补偿任务我们提交到线程池进行“异步”处理,提高处理能力。

用了这么久的RabbitMQ异步编程竟然都是错的!(上)1 适用场景2 异步处理之坑

为实现高内聚,主线和备线处理消息,最好使用同一方法。本案例的MemberService监听到MQ消息和CompensationJob补偿,调用的都是welcome。z这里的补偿逻辑简单,实际生产代码应该做到:

考虑配置补偿的频次、每次处理数量,以及补偿线程池大小等参数为合适的值,以满足补偿的吞吐量

考虑备线补偿数据进行适当延迟

比如,对注册时间在30秒之前的用户再进行补偿,以方便和主线MQ实时流程错开,避免冲突。

诸如当前补偿到哪个用户的offset数据,需要落地数据库。

补偿Job本身需要高可用,可以使用类似XXLJob或ElasticJob等任务系统。

运行程序,执行注册方法注册10个用户,输出如下:

[17:01:16.570] [http-nio-45678-exec-1] [INFO ] [o.g.t.c.a.compensation.UserController:28  ] - sent mq user 1
[17:01:16.571] [http-nio-45678-exec-1] [INFO ] [o.g.t.c.a.compensation.UserController:28  ] - sent mq user 5
[17:01:16.572] [http-nio-45678-exec-1] [INFO ] [o.g.t.c.a.compensation.UserController:28  ] - sent mq user 7
[17:01:16.573] [http-nio-45678-exec-1] [INFO ] [o.g.t.c.a.compensation.UserController:28  ] - sent mq user 8
[17:01:16.594] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:18  ] - receive mq user 1
[17:01:18.597] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 1
[17:01:18.601] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:18  ] - receive mq user 5
[17:01:20.603] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 5
[17:01:20.604] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:18  ] - receive mq user 7
[17:01:22.605] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 7
[17:01:22.606] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:18  ] - receive mq user 8
[17:01:24.611] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 8
[17:01:25.498] [scheduling-1] [INFO ] [o.g.t.c.a.compensation.CompensationJob:29  ] - 开始从用户ID 0 补偿
[17:01:27.510] [compensation-threadpool-1] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 2
[17:01:27.510] [compensation-threadpool-3] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 4
[17:01:27.511] [compensation-threadpool-2] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 3
[17:01:30.496] [scheduling-1] [INFO ] [o.g.t.c.a.compensation.CompensationJob:29  ] - 开始从用户ID 5 补偿
[17:01:32.500] [compensation-threadpool-6] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 6
[17:01:32.500] [compensation-threadpool-9] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 9
[17:01:35.496] [scheduling-1] [INFO ] [o.g.t.c.a.compensation.CompensationJob:29  ] - 开始从用户ID 9 补偿
[17:01:37.501] [compensation-threadpool-0] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 10
[17:01:40.495] [scheduling-1] [INFO ] [o.g.t.c.a.compensation.CompensationJob:29  ] - 开始从用户ID 10 补偿      

可见

  • 共10个用户,MQ发送成功的用户有四个:1、5、7、8
  • 补偿任务第一次运行,补偿了用户2、3、4,第二次运行补偿了用户6、9,第三次运行补充了用户10
  • 针对消息的补偿闭环处理的最高标准是,能够达到补偿全量数据的吞吐量。即若补偿备线足够完善,即使直接把MQ停机,虽然会略微影响处理的及时性,但至少确保流程都能正常执行。

实际开发要考虑异步流程丢消息或处理中断场景。

异步流程需有备线以补偿,比如这里的全量补偿方式,即便异步流程彻底失效,通过补偿也能让业务继续进行。