天天看点

消息队列面试必问解析(下)2 RocketMQ和Kafka的消息模型实现单个队列的并行消费4 保证消息的严格顺序总结

2 RocketMQ和Kafka的消息模型

这两个消息队列产品的消息模型是一样的。通过具体案例再次讲解下。

假设有一主题MyTopic,为主题创建5个队列,分布到俩Broker。

消息队列面试必问解析(下)2 RocketMQ和Kafka的消息模型实现单个队列的并行消费4 保证消息的严格顺序总结

消息生产端

设有3个生产者实例:Produer0、Produer1、Producer2。

这3生产者如何对应到2Broker,又如何对应到5个队列?

无需对应,随便发。

每个生产者可在5个队列中轮询发送,也可随机选个队列发送,或只往某队列发,这皆可。

消费端

很多人没搞清消费组、消费者和队列区别。

消费组

每个消费组是一份订阅,它要消费主题MyTopic下所有队列的全部消息。

队列里的消息并非消费掉就没了,这里的“消费”,只是去队列里面读了消息,并不是删除,消费完这消息,还是在队列里。

多个消费组在消费同一主题时,消费组间互不影响。

比如有2个消费组:G0和G1。

  • G0消费了哪些消息,G1是不知道的,也不用知道
  • G0消费过的消息,G1还可以消费
  • 即使G0积压了很多消息,对G1来说也没有任何影响

消费组内部

一个消费组中可包含多个消费者实例。

比如消费组G1,包含2个消费者C0和C1,那这2个消费者又是怎么和主题MyTopic的5个队列对应的呢?

由于消费确认机制,在同一消费组里,每个队列只能被一个消费者实例占用。

至于如何分配,这里面有很多策略,我就不展开说了。总之保证每个队列分配一个消费者就行了。比如,我们可以让消费者C0消费Q0,Q1和Q2,C1消费Q3和Q4,如果C0宕机了,会触发重新分配,这时候C1同时消费全部5个队列。

队列占用只针对消费组内部,对其他消费组没有影响。

比如队列Q2被消费组G1的消费者C1占用,对消费组G2完全没有影响,G2也可分配它的消费者占用和消费队列Q2。

消费位置

每个消费组内部维护自己的一组消费位置,每个队列对应一个消费位置。

消费位置在服务端保存,并且消费位置和消费者没有关系。

每个消费位置一般就是个整数,记录这个消费组中,这个队列消费到哪个位置了,这位置之前的消息都成功消费了,之后的消息都没有消费或正在消费。

  • 例子的消费位置表格
消息队列面试必问解析(下)2 RocketMQ和Kafka的消息模型实现单个队列的并行消费4 保证消息的严格顺序总结

并没有消费者这一列,即消费者和消费位置没有关系。

实现单个队列的并行消费

如果不要求严格顺序,如何实现单个队列的并行消费?

有很多的实现方式,讲个实现思路。

比如队列中当前有10条消息,编号0-9,当前的消费位置是5。

同时来了三个消费者拉消息,把编号为5、6、7的消息分别给三个消费者,每人一条。

过了一段时间,三个消费成功的响应都回来了,这时候就可以把消费位置更新为8了,就实现了并行消费。

这是理想的情况。还有可能编号为6、7的消息响应回来了,编号5的消息响应一直回不来,怎么办?

这个位置5就是一个消息空洞。为了避免位置5把这个队列卡住,可以先把消费位置5这条消息,复制到一个特殊重试队列,然后依旧把消费位置更新为8,继续消费。

再有消费者来拉消息的时候,优先把重试队列中的那条消息给消费者就可以了。

这是并行消费的一种实现方式。

并行消费开销还是很大的,不应该作为一个常规的,提升消费并发的手段,如果消费慢需要增加消费者的并发数,还是需要扩容队列数。

4 保证消息的严格顺序

怎么保证消息的严格顺序?

主题层面是无法保证严格顺序的,只有在队列上才能保证消息的严格顺序。

如果说,你的业务必须要求全局严格顺序,就只能把消息队列数配置成1,生产者和消费者也只能是一个实例,才能保证全局严格顺序。

大部分情况下,我们并不需要全局严格顺序,只要保证局部有序即可满足。

比如,在传递账户流水记录的时候,只要保证每个账户的流水有序,不同账户间流水记录无需保证顺序。

保证局部严格顺序,可以这样实现。

在发送端,使用账户ID作为Key,采用一致性哈希算法计算出队列编号,指定队列来发送消息。

一致性哈希算法可以保证,相同Key的消息总是发送到同一队列,保证相同Key的消息严格有序。

如果不考虑队列扩容,也可以用队列数量取模的简单方法来计算队列编号。

消息传入kafka的函数中,参数key本身的实现是普通hash还是一致性hash?

Kafka的分区选择器是可以配置的,默认情况下,如果不传入key,采用轮询算法,传入key的话,按照key做普通hash,然后哈希值与分区总数取模,计算出分区号。

总结

使用消息队列,大部分的难点在宏观架构层面,要解决这些难点,你需要掌握消息队列宏观层面上的实现原理和最佳实践,这样,无论你使用什么消息队列,都可以做到游刃有余。在选定了合适的消息队列产品,准备写代码之前,再去文档中查看这些细节都来得及。

所以,我们先讲的是消息队列的使用,注重通用的原理。

关于事务消息的ACID那个问题没有提到,能不能找机会说下你的看法?

没有实现隔离性,一致性只能保证最终一致,而原子操作和持久化可以通过各种手段实现。

严格的说,ACI都没实现,只有D实现了。

放宽点儿限制的话,或者考虑实际效果的话,A(原子性)绝大多数情况下还是可以保证的,即“要么都成功,要么都失败”。C(一致性)通过补偿,大部分情况下也可以保证最终一致。

如果一个topic中有多个消费者,但每个消费者可能只需要其中的一部分数据,一种可行的方案是消费者消费全量消息,然后自行过滤;另一种方式是生产者将这些消息进行分类,不同类别的消息分别对应不同的topic,但这样可能会出现N多的topic,topic太多是否又会出现随机io太多导致性能问题,另外对生产端的编码也不友好,每种消息都要感知发到哪个topic中,这种情况下应该如何取舍?

可以使用RocketMQ的服务端过滤功能,正好可以满足这个需求。

JSR330的注释 Inject , 但实际上和spring自身的 Autowired 注释功能相同, 所以我平时都是直接用 spring 自带的注释。 请问老师使用 JSR330 提供的注释是有什么讲究莫

JSR是一个标准,Spring是JSR的一个实现,并做了很多的扩展。

消息队列就是个分布式存储系统。