天天看点

基于kafka的消息队列的设计kafka的设计kafka的api消息队列服务

kafka的设计

动机

建⽴一个⾼吞吐量、低延迟、分布式的消息系统。但从设计来看,它可能更像是⼀个数据库的⽇志系统。

持久性

文件系统其实也挺快!

⾸先,SSD硬盘的顺序读写速度可以达到⼏百兆(普遍在300M-600M,有些好的硬盘读取的速度甚⾄可以接近千兆)。 其次,操作系统也做了许多策略来优化硬盘的读写速度。

为什么不使用内存来存储数据呢?理由如下:

1. 现代操作系统为了提高随机读硬盘的性能,会”胆大包天“地使⽤内存:将空的内存来作为硬盘的⻚缓存,所以,即使基于文件系统,可能也会获得不错的性能。如果再在进程里维护⼀个数据内存,可能会导致同一份数据在内存中存储了两份。

2. 此外,kafka是基于JVM开发的(使⽤了scala语言)。JVM的内存管理有两个显而易见的问题:

a) 存储对象的时候,jvm还需要存储对象头等额外的信息,经常会导致需要使用接近数据⼤小两倍的内存。

b) 当内存增⻓的时候,内存的垃圾回收将越来越慢

综上所述,基于⽂件系统并依赖于页缓存来设计一个系统,会⽐基于内存更优。有⼏个优势:

a) 32G内存的机器,可以利用到的页缓存可达到 28-30G,⽽且没有GC的困扰

b) 重启服务时⾮常快,⽽纯内存的系统,10G的数据在服务重启时至少需要10分钟

c) 硬盘和内存之间的数据映射更为简单了,因为操作系统已经帮你做了大部分的工作

性能

在很多系统中,由于某些下游基础服务很容易成为瓶颈,涉及到这些瓶颈服务的小变化也会导致产生问题。

硬盘的顺序读写性能,我们已经阐述过了,事实上,硬盘的顺序读写性能是很有保证的,并不会成为系统瓶颈。除此之外,可能还存在着两个隐藏的瓶颈:

1. 过多的⼩数据的零碎的IO操作

2. 昂贵的字节复制:例如要将一段数据发送到⽹卡,要经过以下步骤:

1) 从硬盘读取到内核空间(read系统调⽤用)

2) 从内核空间复制到⽤户空间(read系统调用)

3) 从⽤户空间⼜写⼊内核空间(需要写入socket buffer)

4) 从socket buffer复制到NIC buffer

如何解决以上两个问题?

1. 使用“message set”的模型,⼀次性传输/写入/读取多条数据

2. 使用sendfile系统调⽤,将数据从⻚缓存(pagecache)直接拷⻉到NIC缓存 (NIC:Network Interface Card),省去中间的许多步骤

消费者的消费position消费者的消费记录对⼀个消息系统的来说,也是性能关键之⼀。

按照传统的消息系统(例如beanstalkd),消费者每成功消费一条消息后,会给服务器器broker返回⼀一个ACK,然后broker将对应的消息删除掉。这样其实会有一些问题:

1) ACK丢失会导致消息被处理多次

2) 性能会受影响,因为每条消息都需要维护多种状态

kafka对消费的position有着更好的处理方式: ⾸先,我们知道,kafka对每个topic可以分成多个partition,每个partition限制了最多只能有⼀个cosumer来处理(如果有多个consumer,那么其余的consumer将收不到数据),同时消费完的消息也不用删除,这就使得每个consumer在某个partition中的消费position是一个简单的整数,⽽不用维护各种 状态。此外,还有⼀个好处是,消费者可以⾃行调整它在某个partition的position,从⽽重复处理消息。

消息传递的语义

消息系统中消息传递的语义有三种级别:

1) At most once —— 最多⼀次:消息最多被传递一次,消息可能会丢失但从不会重复传递

2) At least once —— 最少⼀次:消息⾄少会被传递一次,消息不会丢失但可能会重复传递

3) Exactly once —— 刚好一次:消息不会丢失,并且仅仅会只传递一次

首先,我们来看看对于生产者:如何保证⼀条消息正确发送到broker呢? 在kafka中,生产者发布消息到broker后,有个“已提交(committed)”的概念:⼀旦消息已提交了,意味着存放该消息的partition的任一个broker(一个partition会复制到指定数量的broker中,以提⾼容灾能力)存活着,那该消息就不会丢失。 ⽣产者发布消息时,⼀个常见的问题是可能在发布过程中遇到了网络错误,⽣产者是⽆法判断是在committed之前还是之后的,这就可能造成多次发布。要解决这个问题,需要使得发布消息的操作具备幂等性——具体的做法可以是在producer端⽣成⼀个唯⼀消息编号,当且仅当消息提交成功后,broker记录唯⼀的消息编号,并向producer返回一个ACK,producer收到ACK后才确认消息已发布成功,否则重发消息。但在kafka0.8.2版本,这个发布幂等性的功能还没实现。 但很多实际应⽤场景中,并不需要上述的那么严格的保证。在kafka中,可以配置一个⽣产者发布消息后是否需要等待“消息已提交”。

其次,我们来分析对于消费者来说,消息传递的3种语义是如何处理的: 在kafka中,任一个partition的所有复制broker都有一个完全一样的log来记录某个consumer/consumer group在partition中的消费的位置。kafka可以通过配置实现At-most-once和At-least-once的语义,但Exactlty-once的语义则需要consumer的配合(使⽤更底层的consumer API)。

1) 如果消费者先读取消息,然后将position记录在log中,最后再处理理消息:那么有可能在处理理消息时消费者crash掉了了,这就是 At-most-once的情形;

2) 如果消费者先读取消息,然后处理消息,确认处理理成功后在记录position:那么这种情况下,当消息处理理失败时,会重新读取消息;也有可能在记录position时发⽣了错误,这就导致要重复处理理消息。这就At-least-once的情形;

3) ⽽需要达到Exactly-once的⽬标,单依赖kafka是⽆无法达成的,需要消费者⾃行存储消费的position,这样才可以保证消息不不会丢失并仅仅被处理了一次。

Replication复制

kafka把topic中每个partition的数据按照配置指定的数量复制到相应的broker中 (创建topic时有⼀个参数 --replication-factor),这样达到了故障转移的⽬的,使得即使有一台broker崩溃了也依然可以提供服务。

kafka中复制的单元是partition。在不发⽣故障的前提下,每个partition都有一个leader的节点和0个或多个的follower节点,这些节点的数量量通过参数--replication-factor来指定。leader节点负责处理所有的读写请求。一般的,partition的数量(包括不同的topic)会远远超过broker的数量,而partition的leader节点则均衡分布在每个broker上。例如有10个broker,20个topic,每个topic有5个partition,总共有100个partition,其中有20个leader的partition,这些partition都会均衡分布在10个broker中。follower中的日志⽂件跟leader中的会保持一致,logs包含了partition的消息以及consumer的消费offset。follower从leader中同步的过程,就跟follower是一个普通的kafka consumer类似。

判断⼀个partition节点是否还活着(alive),从两个⽅面来判断:

1) 节点需要维持与zookeeper的⼼跳会话

2) 节点中的消息同步不能落后leader太多(通过配置项来规定“落后太多”的具体含义)

如果partition节点满足以上两个条件,我们称该节点是“in sync”的。leader节点跟踪管理“in sync”的节点,当发现有某个节点崩溃、失去联系、落后过多时, 则leader节点则将该节点从in sync节点列表中删除。失去联系和落后过多的含义分别由配置项replica.lag.max.messages 和replica.lag.time.max.ms 来定义。 当一个节点重新工作并重新同步数据、赶上进度后,leader节点⼜重新将该节点加入in sync节点列表。

上⽂提到,kafka中的消息有一个“已提交(committed)”的概念,它的具体含义是:相应partition的所有的in sync节点都已经将该消息记录到各⾃的log中(确保已写入硬盘fsync)。只有“已提交”的消息,consumer才能读取到。只要有⼀个in sync的节点存活,那么所有已提交的消息就不会丢失。 当leader节点崩溃时,会⾃动从"in sync"节点列列表中选择一个节点作为新的leader,因此,kafka中的partition可以容忍【f-1】个replica崩溃(f是partition的replication的数量)。

还有一个问题需要考虑的是,当⼀个partition的所有节点都崩溃的时候,kafka应该怎么做呢?理理论上有两个选择

1) 等待某个in sync的节点恢复服务

2) 等待任一个节点(即使不是in sync)恢复服务,很明显,第⼀种选择可以保证数据的一致性,但服务不可用的时间会增长;第二种选择可以尽可能快地恢复服务,但数据的一致性不能保证。在kafka 0.8.2,可以通过topic级的配置unclean.leader.election.enable来配置选择第⼀或第⼆种, 默认是第⼆种。

基于kafka的消息队列的设计kafka的设计kafka的api消息队列服务

持久性和可用性的衡量

持久性指的是数据的一致性,即保证数据尽可能不会丢失。 可⽤性指的是服务的可⽤性,即保证服务尽可能可提供使用。

当生产者向kafka发布消息时,生产者可以通过生产者配置项request.required.acks来选择需要等待多少个ACK后才认为写请求已被完成。或者更明确的说,就是多少个broker(包括leader)“已提交”了消息并向leader返回ACK。

a. request.required.acks=0:消息发布到服务器端即可认为已成功发布,这是默认值

b. request.required.acks=1:消息发布到服务器端后,需要等待leader节点的ACK,以确保数据“已提交”。此时,只有当leader节点崩溃并且尚未复制到别的follower的数据才可能丢失。

c. request.required.acks=-1:这个就厉害了,需要等到所有in-sync的节点都返回ACK后,才认为消息发布成功。但当in-sync的节点数量量只有⼀个时,就会退 化到 b) 的情形。

此外,还有两个配置也是⽤在持久性和可⽤性之间取舍的:

1) unclean.leader.election.enable:在所有的broker都崩溃的时候,是否允许⼀个不在in-sync列列表的节点恢复时被选作leader节点。此配置,更倾向于可⽤性。

2) min.insync.replicas:指定了in-sync节点的最⼩值,当in-sync节点数量不够 时,partition则停⽌服务。此配置更倾向于持久性,而牺牲了可用性。

日志压紧(Log Compaction)日志compaction不是指compress log⽂件,而是说对于一些重复更新的key,只保留最新的值。

在kafka中,使⽤Log Compaction,可以保证在一个topic partition中,对于每个消息的key,总是保留最新的值。

kafka的api

生产者API(Producer API)kafka提供了了java版本的⽣产者API,使⽤者只需把相关jar包引⼊自己的工程即可。

消费者API(Consumer API)

kafka的消费者API有三种:

1. ⾼度抽象的消费者API:

这些API高度封装了一个consumer group连接kafka、顺序获取数据的过程,使用者很容易就能写出一个多线程的消费者程序。但有些地方需要注意的是:

1) 如果消费者线程的数量比partition的数量还多,有些线程将永远读取不到数据;

2) 如果partition的数量比消费者线程的数量要多,有的线程将会从多个partition中读取数据;

3) 如果⼀个消费者线程是从多个partition中读取数据的,那么读取的数据的顺序将无法保证:例如线程可能会从partition 10中读取5条数据,然后从partition 11中读取10条数据,然后⼜从partition 10中读取5条数据,紧接着依然从partition 10中读取5条数据。

4) 增加⼀条消费者线程/进程,会导致kafka重新规划负载均衡,从而会改变线程和partition之间的对应关系。

5) 当partition中没有可用数据的时候,线程会阻塞。

2. 底层的消费者API: ⼀般来说,⾼度封装好的消费者API可以满足⼤部分的需求。但当你的应用有一些特殊需求时,例如需要重复读取某条数据,重启consumer时需要从头读取消息等等,此时,需要使⽤底层的消费者API。

3. 这对Hadoop的消费者API: 这个有点意思:因为从kafka直接导数据到Hadoop是⼀一个很常见的场景,因此kafka还专门提供了相关的API,并已开源到github上了,可以参考。

消息队列服务

为什么需要消息服务

  • 解耦
  • ⼴播
  • 错峰&流控

常用 Message Queue 对比

  • 特性 ActiveMQ RabbitMQ RocketMQ Kafka
    单机吞吐量 万级,比 RocketMQ、Kafka 低一个数量级 同 ActiveMQ 10 万级,支撑高吞吐 10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景
    topic 数量对吞吐量的影响 topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topic topic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源
    时效性 ms 级 微秒级,这是 RabbitMQ 的一大特点,延迟最低 ms 级 延迟在 ms 级以内
    可用性 高,基于主从架构实现高可用 同 ActiveMQ 非常高,分布式架构 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
    消息可靠性 有较低的概率丢失数据 基本不丢 经过参数优化配置,可以做到 0 丢失 同 RocketMQ
    功能支持 MQ 领域的功能极其完备 基于 erlang 开发,并发能力很强,性能极好,延时很低 MQ 功能较为完善,还是分布式的,扩展性好 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用

消息队列设计

  • 核心依赖kafka
  • 负载均衡依赖于partition
  • ⾼可用依赖于replication和重新选取leader机制
  • 可靠性传输:
    • 消费端:至少一次,需要业务端保证幂等性
    • kafka端: 可参考上文中持久性和副本相关配置

消息顺序:

  1. 存入partition 的顺序,由上游服务控制。
  2. Partition 中的顺序取出,由kafka保证。
  3. 收到消息的顺序由ack机制保证。
  • 即MQ service发送消息之后,等待订阅者发送确认,收到确认之后再对 kafka 进行commit,读取下一条。

消息内容

  • event_name(事件名称)
  • event_data(事件内容)
  • event_time(事件时间)

消息积压思路:

  • 先修复 consumer 的问题,确保其恢复消费速度,然后将现有 consumer 都停掉。
  • 新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。
  • 然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。
  • 接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
  • 等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。

参考自: 

https://www.orchome.com/kafka/index