本文重點介紹kafka的兩類常見資料遷移方式:1、broker内部不同資料盤之間的分區資料遷移;2、不同broker之間的分區資料遷移。
歡迎大家前往雲+社群,擷取更多騰訊海量技術實踐幹貨哦~
作者:mikealzhou
一、broker 内部不同資料盤之間進行分區資料遷移
1.1 背景介紹
最近,騰訊雲的一個重要客戶發現kafka broker内部的topic分區資料存儲分布不均勻,導緻部分磁盤100%耗盡,而部分磁盤隻有40%的消耗量。
分析原因,發現存在部分topic的分區資料過于集中在某些磁盤導緻,比如,以下截圖顯示的/data5 資料盤。

根據分布式系統的特點,很容易想到采取資料遷移的辦法,對broker内部不同資料盤的分區資料進行遷移。在進行線上叢集資料遷移之前,為了保證生産叢集的資料完整和安全,必須先在測試叢集進行測試。
1.2 測試broker内部不同資料盤進行分區資料遷移
1.2.1 建立測試topic并驗證生産和消費正常
我們搭建的測試叢集,Kafka 有三個broker,hostname分别為:tbds-172-16-16-11,tbds-172-16-16-12,tbds-172-16-16-16。每個broker配置了兩塊資料盤,緩存資料分别存儲在 /data/kafka-logs/ 和 /data1/kafka-logs/。
首先建立測試topic:
./kafka-topics.sh --create --zookeeper tbds-172-16-16-11:2181 --replication-factor 2 --partitions 3 --topic test_topic
然後向topic生産發送500條資料,發送的時候也同時消費資料。然後檢視topic的分區資料情況:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
groupid1 test_topic 0 172 172 0 kafka-python-1.3.1_tbds-172-16-16-3/172.16.16.3
groupid1 test_topic 1 156 156 0 kafka-python-1.3.1_tbds-172-16-16-3/172.16.16.3
groupid1 test_topic 2 172 172 0 kafka-python-1.3.1_tbds-172-16-16-3/172.16.16.3
發現test_topic生産和消費資料都正常。
1.2.2 将分區資料在磁盤間進行遷移
現在登入tbds-172-16-16-12這台broker節點,将test_topic的分區資料目錄 /data1/kafka-logs/test_topic-0/ 移動到 /data/kafka-logs/ :
mv /data1/kafka-logs/test_topic-0/ /data/kafka-logs/
檢視 /data/kafka-logs/ 目錄下,分區test_topic-0 的資料:
1.2.3 再次對測試topic生産和消費資料
再次發送500條資料,同時消費資料。然後檢視資料情況:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
groupid1 test_topic 0 337 337 0 kafka-python-1.3.1_tbds-172-16-16-3/172.16.16.3
groupid1 test_topic 1 304 304 0 kafka-python-1.3.1_tbds-172-16-16-3/172.16.16.3
groupid1 test_topic 2 359 359 0 kafka-python-1.3.1_tbds-172-16-16-3/172.16.16.3
再次檢視tbds-172-16-16-12 這個broker節點的/data/kafka-logs/test_topic-0/ 分區目錄下的資料:
發現,從 /data1/kafka-logs/ 移動到 /data/kafka-logs/ 目錄下的分區資料目錄test_topic-0/(也就是編号為0的分區)緩存資料并沒有增加。
因為test_topic每個分區有2個replicas,是以,我找到編号為0的另外一個分區replica資料存儲在tbds-172-16-16-16這台broker節點。登入tbds-172-16-16-16這個broker節點,打開編号為0的分區緩存資料目錄,得到如下資訊:
發現,tbds-172-16-16-16這台broker節點的分區資料目錄test_topic-0/内緩存資料量是增加的,也就是緩存有再次生産發送的message資料。
由此可見,經過移動之後的tbds-172-16-16-12這台broker節點的編号為0的分區資料緩存目錄内,并沒有新增緩存資料。與之對應的,沒有做分區資料移動操作的 tbds-172-16-16-16這台broker 節點的編号為0的分區緩存資料目錄内新增再次發送的資料。
是不是意味着不能在broker的磁盤間移動分區資料呢?
1.2.4 調用重新開機大法:重新開機kafka
重新開機kafka叢集,重新開機完成後,發現tbds-172-16-16-12這台broker節點的編号為0的分區緩存資料目錄内的資料也增加到正常水準。
表明重新開機之後,broker的不同磁盤間遷移資料已經生效。
1.2.5 驗證磁盤間遷移分區資料生效
再次向test_topic發送500條資料,同時消費資料,然後檢視資料情況:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
groupid1 test_topic 0 521 521 0 kafka-python-1.3.1_tbds-172-16-16-3/172.16.16.3
groupid1 test_topic 1 468 468 0 kafka-python-1.3.1_tbds-172-16-16-3/172.16.16.3
groupid1 test_topic 2 511 511 0 kafka-python-1.3.1_tbds-172-16-16-3/172.16.16.3
檢視tbds-172-16-16-12 和 tbds-172-16-16-16 兩個broker節點的test_topic-0分區資料的緩存目錄:
發現兩個replicas完全一樣。
1.3 結論
Kafka broker 内部不同資料盤之間可以自由遷移分區資料目錄。遷移完成後,重新開機kafka即可生效。
二、不同broker之間傳輸分區資料
當對kafka叢集進行擴容之後,由于新擴容的broker沒有緩存資料,容易造成系統的資料分布不均勻。是以,需要将原來叢集broker的分區資料遷移到新擴容的broker節點。
不同broker之間傳輸分區資料,可以使用kafka自帶的kafka-reassign-partitions.sh腳本工具實作。
我們在kafka測試叢集原有的3台broker基礎上,擴容1台broker。
2.1 擷取test_topic的分區分布情況
執行指令:
./kafka-topics.sh --zookeeper 172.16.16.11:2181 --topic test_topic --describe
可以得到test_topic的3個分區(每個分區有2份replicas)在三個broker節點的分布情況:
Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test_topic Partition: 0 Leader: 1002 Replicas: 1002,1001 Isr: 1002,1001
Topic: test_topic Partition: 1 Leader: 1003 Replicas: 1003,1002 Isr: 1003,1002
Topic: test_topic Partition: 2 Leader: 1001 Replicas: 1001,1003 Isr: 1001,1003
2.2 擷取topic重新分區的配額檔案
編寫配置設定腳本:move_kafka_topic.json内容如下:
{"topics": [{"topic":"test_topic"}], "version": 1}
執行配置設定計劃生成腳本:
./kafka-reassign-partitions.sh --zookeeper tbds-172-16-16-11:2181 --topics-to-move-json-file /tmp/move_kafka_topic.json --broker-list "1001,1002,1003,1004" --generate
指令裡面的broker-list填寫kafka叢集4個broker的id。不同kafka叢集,因為部署方式不一樣,選擇的broker id也不一樣。我們的測試叢集broker id是1001,1002,1003,1004。讀者需要根據自己的kafka叢集設定的broker id填寫。
執行指令之後,得到以下結果:
Current partition replica assignment #目前分區的副本配置設定
{"version":1,"partitions":[{"topic":"test_topic","partition":0,"replicas":[1002,1001]},{"topic":"test_topic","partition":2,"replicas":[1001,1003]},{"topic":"test_topic","partition":1,"replicas":[1003,1002]}]}
Proposed partition reassignment configuration #建議的分區配置
{"version":1,"partitions":[{"topic":"test_topic","partition":0,"replicas":[1001,1002]},{"topic":"test_topic","partition":2,"replicas":[1003,1004]},{"topic":"test_topic","partition":1,"replicas":[1002,1003]}]}
Proposed partition reassignment configuration 後是根據指令行的指定的broker list生成的分區配置設定計劃json格式。将 Proposed partition reassignment configuration的配置複制儲存到一個檔案中 move_kafka_topic_result.json:
{"version":1,"partitions":[{"topic":"test_topic","partition":0,"replicas":[1001,1002]},{"topic":"test_topic","partition":2,"replicas":[1003,1004]},{"topic":"test_topic","partition":1,"replicas":[1002,1003]}]}
2.3 對topic分區資料進行重新分布
執行重新配置設定指令:
./kafka-reassign-partitions.sh --zookeeper tbds-172-16-16-11:2181 --reassignment-json-file /tmp/move_kafka_topic_result.json --execute
得到如下結果:
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test_topic","partition":0,"replicas":[1002,1001]},{"topic":"test_topic","partition":2,"replicas":[1001,1003]},{"topic":"test_topic","partition":1,"replicas":[1003,1002]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions {"version":1,"partitions":[{"topic":"test_topic","partition":0,"replicas":[1001,1002]},{"topic":"test_topic","partition":2,"replicas":[1003,1004]},{"topic":"test_topic","partition":1,"replicas":[1002,1003]}]}
從傳回結果來看,分區資料重新分布任務已經啟動成功。
2.4 檢視分區資料重新分布進度
檢查配置設定的狀态,執行指令:
./kafka-reassign-partitions.sh --zookeeper tbds-172-16-16-11:2181 --reassignment-json-file /tmp/move_kafka_topic_result.json --verify
得到結果:
Status of partition reassignment:
Reassignment of partition [test_topic,0] completed successfully
Reassignment of partition [test_topic,2] completed successfully
Reassignment of partition [test_topic,1] completed successfully
表明分區資料重新分步任務已經完成。
2.5 再次擷取test_topic的分區分布情況
再次檢視各個分區的分布情況,執行指令:
./kafka-topics.sh --zookeeper 172.16.16.11:2181 --topic test_topic --describe
得到傳回結果:
Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test_topic Partition: 0 Leader: 1002 Replicas: 1001,1002 Isr: 1002,1001
Topic: test_topic Partition: 1 Leader: 1003 Replicas: 1002,1003 Isr: 1003,1002
Topic: test_topic Partition: 2 Leader: 1003 Replicas: 1003,1004 Isr: 1003,1004
從結果看出,test_topic的分區資料已經由原來的3個broker,重新分布到4個broker。
三、測試結論
Ø Kafka broker 内部不同資料盤之間可以自由遷移分區資料目錄。遷移完成後,重新開機kafka即可生效;
Ø Kafka 不同broker之前可以遷移資料,使用kafka自帶的kafka-reassign-partitions.sh腳本工具實作。
四、修複客戶的kafka叢集故障
我們采用本文測試的方法,對該客戶的Kafka叢集進行broker節點内部不同磁盤間的資料遷移,對多個topic均進行了資料遷移,最終實作磁盤間的資料緩存分布均勻化。
同時,我們又對客戶的kafka叢集進行擴容,擴容之後采用本文描述的不同broker之間遷移分區資料方法,對多個topic均進行了資料遷移,保證新擴容節點也有緩存資料,原來的broker節點存儲壓力減小。
相關閱讀
kafka 營運總結
kafka 設計原理
kubernetes 中 kafka 和 zookeeper 有狀态叢集服務部署實踐 (一)
此文已由作者授權雲加社群釋出,轉載請注明原文出處
海量技術實踐經驗,盡在雲加社群!
https://cloud.tencent.com/developer