天天看点

Kafka 和 EMS 消息批量 ack 的实现

我们现在用 kafka 和 ems 两种方式来接收外部消息,之前没接收一条消息就 ack, 系统当前消息量大概接近亿级每天,集中在工作时间的八到十个小时。这意味着每个消息都 ack 会消耗大量网络资源,拖慢消息处理速度。因此决定用批量 ack 来降低网络消耗。

实现过程中碰到一个问题,假设没10条 ack 一次,那如果有37条数据,意味着前10条可以成功 ack,而后 7 条由于没有凑够 batchsize 有可能会一直不 ack。

对于 kafkaconsumer 来说,consumer 调用 poll 方法主动从服务器获取消息,这个方法可以接受 timeout 参数。这时即使没有达到 batch size,依然有机会在 timeout 时 act.

而对于 EMS, 就有点麻烦。

EMS client 有两种消息接收方式。一种是 client 主动调用consume 方法从服务器上获取消息。这种方式和 kafka 一样,可以接受timeout 参数,因此不是问题。

问题在于另一种方式,onMessage()它是事件驱动的,不接受 timeout 参数。这样当最后7条消息收到以后它会一直等待后面的消息到来。 这时客户端已经接收并处理完前7条,但还没有机会 ack, 如果客户端这时挂掉,那么这7条处理过的消息再也没机会被 ack。 这会导致消息重复处理。解决办法是开一个守护线程来定时做 ack,由于 EMS ack 依赖于具体消息,因此每次收到一条消息都要 cache 下来,并且几下最后接收到的时间,然后用这最后一条消息来做 ack。

Kafka 和 EMS 一样都支持最后一条消息 ack 时,前面的消息就自动 ack。