天天看點

Kafka跨叢集遷移方案MirrorMaker原理、使用以及性能調優實踐

序言

Kakfa MirrorMaker是Kafka 官方提供的跨資料中心的流資料同步方案。其實作原理,其實就是通過從Source Cluster消費消息然後将消息生産到Target Cluster,即普通的消息生産和消費。使用者隻要通過簡單的consumer配置和producer配置,然後啟動Mirror,就可以實作準實時的資料同步。

1. Kafka MirrorMaker基本特性

Kafka Mirror的基本特性有:

在Target Cluster沒有對應的Topic的時候,Kafka MirrorMaker會自動為我們在Target Cluster上建立一個一模一樣(Topic Name、分區數量、副本數量)一模一樣的topic。如果Target Cluster存在相同的Topic則不進行建立,并且,MirrorMaker運作Source Cluster和Target Cluster的Topic的分區數量和副本數量不同。

同我們使用Kafka API建立KafkaConsumer一樣,Kafka MirrorMaker允許我們指定多個Topic。比如,TopicA|TopicB|TopicC。在這裡,|其實是正則比對符,MirrorMaker也相容使用逗号進行分隔。

多線程支援。MirrorMaker會在每一個線程上建立一個Consumer對象,如果性能允許,建議多建立一些線程

多程序任意橫向擴充,前提是這些程序的consumerGroup相同。無論是多程序還是多線程,都是由Kafka ConsumerGroup的設計帶來的任意橫向擴充性,具體的分區分派,即具體的TopicPartition會分派給Group中的哪個Topic負責,是Kafka自動完成的,Consumer無需關心。

我們使用Kafka MirrorMaker完成遠端的AWS(Source Cluster)上的Kafka資訊同步到公司的計算叢集(Target Cluster)。由于我們的大資料叢集隻有一個統一的出口IP,外網通路我們的内網伺服器必須通過nginx轉發,是以為了降低複雜度,決定使用“拉”模式而不是“推”模式,即,Kafka MirrorMaker部署在我們内網叢集(Target Cluster),它負責從遠端的Source Cluster(AWS)的Kafka 上拉取資料,然後生産到本地的Kafka。

Kafka MirrorMaker的官方文檔一直沒有更新,是以新版Kafka為MirrorMaker增加的一些參數、特性等在文檔上往往找不到,需要看Kafka MirrorMaker的源碼。Kafka MirrorMaker的主類位于kafka.tools.MirrorMaker,尤其是一些參數的解析邏輯和主要的執行流程,會比較有助于我們了解、調試和優化Kafka MirrorMaker。

這是我啟動Kakfa MirrorMaker 的指令:

nohup ./bin/kafka-mirror-maker.sh --new.consumer --consumer.config config/mirror-consumer.properties --num.streams 40 --producer.config config/mirror-producer.properties --whitelist 'ABTestMsg|AppColdStartMsg|BackPayMsg|WebMsg|GoldOpenMsg|BoCaiMsg' &

mirror-consumer.properties配置檔案如下:

#新版consumer擯棄了對zookeeper的依賴,使用bootstrap.servers告訴consumer kafka server的位置

bootstrap.servers=ip-188-33-33-31.eu-central-1.compute.internal:9092,ip-188-33-33-32.eu-central-1.compute.internal:9092,ip-188-33-33-33.eu-central-1.compute.internal:9092

#如果使用舊版Consumer,則使用zookeeper.connect

#zookeeper.connect=ip-188-33-33-31.eu-central-1.compute.internal:2181,ip-188-33-33-32.eu-central-1.compute.internal:2181,ip-188-33-33-33.eu-central-1.compute.internal:2181

1.compute.internal:2181

#change the default 40000 to 50000

request.timeout.ms=50000

#hange default heartbeat interval from 3000 to 15000

heartbeat.interval.ms=30000

#change default session timeout from 30000 to 40000

session.timeout.ms=40000

#consumer group id

group.id=africaBetMirrorGroupTest

partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor

#restrict the max poll records from 2147483647 to 200000

max.poll.records=20000

#set receive buffer from default 64kB to 512kb

receive.buffer.bytes=524288

#set max amount of data per partition to override default 1048576

max.partition.fetch.bytes=5248576

#consumer timeout

#consumer.timeout.ms=5000

mirror-producer.properties的配置檔案如下:

bootstrap.servers=10.120.241.146:9092,10.120.241.82:9092,10.120.241.110:9092

# name of the partitioner class for partitioning events; default partition spreads data randomly

#partitioner.class=

# specifies whether the messages are sent asynchronously (async) or synchronously (sync)

producer.type=sync

# specify the compression codec for all data generated: none, gzip, snappy, lz4.

# the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, lz4, respectively

compression.codec=none

# message encoder

serializer.class=kafka.serializer.DefaultEncoder

同時,我使用kafka-consumer-groups.sh循環監控消費延遲:

bin/kafka-consumer-groups.sh --bootstrap-server ip-188-33-33-31.eu-central-1.compute.internal:9092,ip-188-33-33-32.eu-central-1.compute.internal:9092,ip-188-33-33-33.eu-central-1.compute.internal:9092 --describe --group africaBetMirrorGroupTest --new-consumer

當我們使用new KafkaConsumer進行消息消費,要想通過kafka-consumer-groups.sh擷取整個group的offset、lag延遲資訊,也必須加上–new-consumer,告知kafka-consumer-groups.sh,這個group的消費者使用的是new kafka consumer,即group中所有consumer的資訊儲存在了Kafka上的一個名字叫做__consumer_offsets的特殊topic上,而不是儲存在zookeeper上。我在使用kafka-consumer-groups.sh的時候就不知道還需要添加--new-consumer,結果我啟動了MirrorMaker以後,感覺消息在消費,但是就是在zookeeper的/consumer/ids/上找不到group的任何資訊。後來在stack overflow上問了别人才知道。

3. 負載不均衡原因診斷以及問題解決

在我的另外一篇部落格《Kafka為Consumer分派分區:RangeAssignor和RoundRobinAssignor》中,介紹了Kafka内置的分區分派政策:RangeAssignor和RoundRobinAssignor。由于RangeAssignor是早期版本的Kafka的唯一的分區分派政策,是以,預設不配置的情況下,Kafka使用RangeAssignor進行分區分派,但是,在MirrorMaker的使用場景下,RoundRobinAssignor更有利于均勻的分區分派。甚至在KAFKA-3831中有人建議直接将MirrorMaker的預設分區分派政策改為RoundRobinAssignor。那麼,它們到底有什麼差別呢?我們先來看兩種政策下的分區分派結果。在我的實驗場景下,有6個topic:ABTestMsg|AppColdStartMsg|BackPayMsg|WebMsg|GoldOpenMsg|BoCaiMsg,每個topic有兩個分區。由于MirrorMaker所在的伺服器性能良好,我設定--num.streams 40,即單台MirrorMaker會用40個線程,建立40個獨立的Consumer進行消息消費,兩個MirrorMaker加起來80個線程,80個并行Consumer。由于總共隻有6 * 2=12個TopicPartition,是以最多也隻有12個Consumer會被分派到分區,其餘Consumer空閑。

我們來看基于RangeAssignor分派政策,運作kafka-consumer-groups.sh觀察到的分區分派的結果:

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID

ABTestMsg 0 780000 820038 49938 africaBetMirrorGroupTest-4-cf330e66-1319-4925-9605-46545df13453/114.113.198.126 africaBetMirrorGroupTest-0

ABTestMsg 1 774988 820038 55000 africaBetMirrorGroupTest-19-c77523e7-7b87-472b-9a26-cd902888944d/114.113.198.126 africaBetMirrorGroupTest-1

AppColdStartMsg 0 774000 820039 55938 africaBetMirrorGroupTest-19-674d8ad4-39d2-40cc-ae97-f4be9c1bb154/114.113.198.126 africaBetMirrorGroupTest-0

AppColdStartMsg 1 774100 820045 56038 africaBetMirrorGroupTest-15-91c67bf8-0c1c-42ac-97f0-5369794c2d1b/114.113.198.126 africaBetMirrorGroupTest-1

BackPayMsg 0 780000 820038 49938 africaBetMirrorGroupTest-4-cf330e66-1319-4925-9605-46545df13453/114.113.198.126 africaBetMirrorGroupTest-0

BackPayMsg 1 774988 820038 55000 africaBetMirrorGroupTest-19-c77523e7-7b87-472b-9a26-cd902888944d/114.113.198.126 africaBetMirrorGroupTest-1

WebMsg 0 774000 820039 55938 africaBetMirrorGroupTest-19-674d8ad4-39d2-40cc-ae97-f4be9c1bb154/114.113.198.126 africaBetMirrorGroupTest-0

WebMsg 1 774100 820045 56038 africaBetMirrorGroupTest-15-91c67bf8-0c1c-42ac-97f0-5369794c2d1b/114.113.198.126 africaBetMirrorGroupTest-1

GoldOpenMsg 0 780000 820038 49938 africaBetMirrorGroupTest-4-cf330e66-1319-4925-9605-46545df13453/114.113.198.126 africaBetMirrorGroupTest-0

GoldOpenMsg 1 774988 820038 55000 africaBetMirrorGroupTest-19-c77523e7-7b87-472b-9a26-cd902888944d/114.113.198.126 africaBetMirrorGroupTest-1

BoCaiMsg 0 774000 820039 55938 africaBetMirrorGroupTest-19-674d8ad4-39d2-40cc-ae97-f4be9c1bb154/114.113.198.126 africaBetMirrorGroupTest-0

BoCaiMsg 1 774100 820045 56038 africaBetMirrorGroupTest-15-91c67bf8-0c1c-42ac-97f0-5369794c2d1b/114.113.198.126 africaBetMirrorGroupTest-1

- - - - - africaBetMirrorGroupTest-6-ae373364-2ae2-42b8-8a74-683557e315bf/114.113.198.126 africaBetMirrorGroupTest-6

- - - - - africaBetMirrorGroupTest-9-0e346b46-1a2c-46a2-a2da-d977402f5c5d/114.113.198.126 africaBetMirrorGroupTest-9

- - - - - africaBetMirrorGroupTest-7-f0ae9f31-33e6-4ddd-beac-236fb7cf20d5/114.113.198.126 africaBetMirrorGroupTest-7

- - - - - africaBetMirrorGroupTest-7-e2a9e905-57c1-40a6-a7f3-4aefd4f1a30a/114.113.198.126 africaBetMirrorGroupTest-7

- - - - - africaBetMirrorGroupTest-8-480a2ef5-907c-48ed-be1f-33450903ec72/114.113.198.126 africaBetMirrorGroupTest-8

- - - - - africaBetMirrorGroupTest-8-4206bc08-58a5-488a-b756-672fb4eee6e0/114.113.198.126 africaBetMirrorGroupTest-8

.....後續更多空閑consumer省略不顯示

當沒有在mirror-consumer.properties 中配置分區分派政策,即使用預設的RangeAssignor的時候,我們發現,盡管我們每一個MirrorMaker有40個Consumer,整個Group中有80個Consumer,但是,一共6 * 2 = 12個TopicPartition竟然全部聚集在2-3個Consumer上,顯然,這完全浪費了并行特性,被配置設定到一個consumer上的多個TopicPartition隻能串行消費。

是以,通過partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor顯式指定分區分派政策為RoundRobinAssignor,重新開機MirrorMaker,重新通過kafka-consumer-groups.sh 指令觀察分區分派和消費延遲結果:

ABTestMsg 0 819079 820038 959 africaBetMirrorGroupTest-4-cf330e66-1319-4925-9605-46545df13453/114.113.198.126 africaBetMirrorGroupTest-1

ABTestMsg 1 818373 820038 1665 africaBetMirrorGroupTest-19-c77523e7-7b87-472b-9a26-cd902888944d/114.113.198.126 africaBetMirrorGroupTest-5

AppColdStartMsg 0 818700 818907 1338 africaBetMirrorGroupTest-19-674d8ad4-39d2-40cc-ae97-f4be9c1bb154/114.113.198.126 africaBetMirrorGroupTest-20

AppColdStartMsg 1 818901 820045 1132 africaBetMirrorGroupTest-15-91c67bf8-0c1c-42ac-97f0-5369794c2d1b/114.113.198.126 africaBetMirrorGroupTest-18

BackPayMsg 0 819032 820038 959 africaBetMirrorGroupTest-4-cf330e66-1319-4925-9605-46545df13453/114.113.198.126 africaBetMirrorGroupTest-5

BackPayMsg 1 818343 820038 1638 africaBetMirrorGroupTest-19-c77523e7-7b87-472b-9a26-cd902888944d/114.113.198.126 africaBetMirrorGroupTest-8

WebMsg 0 818710 818907 1328 africaBetMirrorGroupTest-19-674d8ad4-39d2-40cc-ae97-f4be9c1bb154/114.113.198.126 africaBetMirrorGroupTest-7

WebMsg 1 818921 820045 1134 africaBetMirrorGroupTest-15-91c67bf8-0c1c-42ac-97f0-5369794c2d1b/114.113.198.126 africaBetMirrorGroupTest-9

GoldOpenMsg 0 819032 820038 959 africaBetMirrorGroupTest-4-cf330e66-1319-4925-9605-46545df13453/114.113.198.126 africaBetMirrorGroupTest-12

GoldOpenMsg 1 818343 820038 1638 africaBetMirrorGroupTest-19-c77523e7-7b87-472b-9a26-cd902888944d/114.113.198.126 africaBetMirrorGroupTest-14

BoCaiMsg 0 818710 818907 1322 africaBetMirrorGroupTest-19-674d8ad4-39d2-40cc-ae97-f4be9c1bb154/114.113.198.126 africaBetMirrorGroupTest-14

BoCaiMsg 1 818921 820045 1189 africaBetMirrorGroupTest-15-91c67bf8-0c1c-42ac-97f0-5369794c2d1b/114.113.198.126 africaBetMirrorGroupTest-117

對比RangeAssingor,消息延遲明顯減輕,而且,12個TopicPartition被均勻配置設定到了不同的consumer上,即單個Consumer隻負責一個TopicPartition的消息消費,不同的TopicPartition之間實作了完全并行。

之是以出現以上不同,原因在于兩個分區分派方式的政策不同:

RangeAssingor:先對所有Consumer進行排序,然後對Topic逐個進行分區分派。用以上Topic作為例子:

對所有的Consumer進行排序,排序後的結果為Consumer-0,Consumer-1,Consumer-2 ....Consumer-79

對ABTestMsg進行分區分派:

ABTestMsg-0配置設定給Consumer-0

ABTestMsg-1配置設定各Consumer-1

對AppColdStartMsg進行分區分派:

AppColdStartMsg-0配置設定各Consumer-0

AppColdStartMsg-1配置設定各Consumer-1

#後續TopicParition的分派以此類推

可見,RangeAssingor 會導緻多個TopicPartition被分派在少量分區上面。

- RoundRobinAssignor:與RangeAssignor最大的差別,是不再逐個Topic進行分區分派,而是先将Group中的所有TopicPartition平鋪展開,再一次性對他們進行一輪分區分派。

将Group中的所有TopicPartition展開,展開結果為:

ABTestMsg-0,ABTestMsg-1,AppColdStartMsg-0,AppColdStartMsg-1,BackPayMsg-0,BackPayMsg-1,WebMsg-0,WebMsg-1,GoldOpenMsg-0,GoldOpenMsg-1,BoCaiMsg-0,BoCaiMsg-1

對所有的Consumer進行排序,排序後的結果為Consumer-0,Consumer-1,Consumer-2 ,Consumer-79。

開始講平鋪的TopicPartition進行分區分派

ABTestMsg-1配置設定給Consumer-1

AppColdStartMsg-0配置設定給Consumer-2

AppColdStartMsg-1配置設定給Consumer-3

BackPayMsg-0配置設定給Consumer-4

BackPayMsg-1配置設定給Consumer-5

由此可見,RoundRobinAssignor平鋪式的分區分派算法是讓我們的Kafka MirrorMaker能夠無重疊地将TopicParition分派給Consumer的原因。

4. 本身網絡帶寬限制問題

網絡帶寬本身也會限制Kafka Mirror的吞吐量。進行壓測的時候,我分别在我們的線上環境和測試環境分别運作Kafka MirrorMaker,均選擇兩台伺服器運作MirrorMaker,但是線上環境是實體機環境,單台機器通過SCP方式拷貝Source Cluster上的大檔案,平均吞吐量是600KB-1.5MB之間,但是測試環境的機器是同一個host主機上的多台虛拟機,SCP吞吐量是100KB以下。經過測試,測試環境消息積壓會逐漸增多,線上環境持續積壓,但是積壓一直保持穩定。這種穩定積壓是由于每次poll()的間隙新産生的消息量,屬于正常現象。

5. 适當配置單次poll的消息總量和單次poll()的消息大小

通過Kafka MirrorMaker運作時指定的consumer配置檔案(在我的環境中為$MIRROR_HOME/config/mirror-consumer.properties)來配置consumer。其中,通過以下配置,可以控制單次poll()的消息體量(數量和總體大小)

max.poll.records:單次poll()操作最多消費的消息總量,這裡說的poll是單個consumer而言的。無論過大過小,都會發生問題:

如果設定得過小,則消息傳輸率降低,大量的頭資訊會占用較大的網絡帶寬;-

如果設定得過大,則會産生一個非常難以判斷原因同時又會影響整個group中所有消息的消費的重要問題:rebalance。看過kafka代碼的話可以看到,每次poll()請求都會順帶向遠端server發送心跳資訊,遠端GroupCoordinator會根據這個心跳資訊判斷consumer的活性。如果超過指定時間(heartbeat.interval.ms)沒有收到對應Consumer的心跳,則GroupCoordinator會判定這個Server已經挂掉,是以将這個Consumer負責的partition分派給其它Consumer,即觸發rebalance。rebalance操作的影響範圍是整個Group,即Group中所有的Consumer全部暫停消費直到Rebalance完成。而且,TopicPartition越長,這個過程會越長。其實,一個正常消費的環境,應該是任何時候都不應該發生rebalance的(一個新的Consumer的正常加入也會引起Rebalance,這種情況除外)。雖然Kafka本身是非常穩定的,但是還是應該盡量避免rebalance的發生。在某些極端情況下觸發一些bug,rebalance可能永遠停不下來了。。。如果單次max.poll.records消費太多消息,這些消息produce到Target Cluster的時間可能會較長,進而可能觸發Rebalance。

6. 惡劣網絡環境下增加逾時時間配置

在不穩定的網絡環境下,應該增加部分逾時時間配置,如request.timeout.ms或者session.timeout.ms,一方面可以避免頻繁的逾時導緻大量不必要的重試操作,同時,通過增加如上文所講,通過增加heartbeat.interval.ms時間,可以避免不必要的rebalance操作。當然,在網絡環境良好的情況下,上述配置可以适當減小以增加Kafka Server對MirrorMaker出現異常情況下的更加及時的響應。

總之,Kafka MirrorMaker作為跨資料中心的Kafka資料同步方案,絕對無法允許資料丢失以及資料的傳輸速度低于生産速度導緻資料越積累越多。是以,唯有進行充分的壓測和精準的性能調優,才能綜合網絡環境、伺服器性能,将Kafka MirrorMaker的性能發揮到最大。