基于Python結合pykafka實作kafka生産及消費速率&主題分區偏移實時監控
By: 授客 QQ:1033553122
1.測試環境
python 3.4
zookeeper-3.4.13.tar.gz
下載下傳位址1:
http://zookeeper.apache.org/releases.html#download
https://www.apache.org/dyn/closer.cgi/zookeeper/
https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/
下載下傳位址2:
https://pan.baidu.com/s/1dnBgHvySE9pVRZXJVmezyQ
kafka_2.12-2.1.0.tgz
http://kafka.apache.org/downloads.html
https://pan.baidu.com/s/1VnHkJgy4iQ73j5rLbEL0jw
pykafka-2.8.0.tar.gz
https://pypi.org/project/pykafka/
https://files.pythonhosted.org/packages/55/4b/4828ec5ed766cca0c27de234688122494c5762965e70deeb88b84f5d8d98/pykafka-2.8.0.tar.gz
2.實作功能
實時采集Kafka生産者主題生産速率,主題消費速率,主題分區偏移,消費組消費速率,支援同時對多個來自不同叢集的主題進行實時采集,支援同時對多個消費組實時采集
3.使用前提
1、“主題消費速率”&“消費組消費速率” 統計 依賴“消費組”,是以要統計消費速率,必須存在消費組才能統計;
2、“主題消費速率”&“消費組消費速率” 統計 依賴消費者自動、手動送出“offset”,是以是以要統計消費速率,必須確定消費者消費時,會送出消息的offset
3、Kafka版本大于等于0.10.1.1
4.使用方法
influxDB主機配置
KafkaMonitor\conf\influxDB.conf
[INFLUXDB]
influxdb_host = 10.203.25.106
influxdb_port = 8086
brokers叢集配置
KafkaMonitor\conf\brokers.conf
[CLUSTER1]
broker1 = 127.0.0.1:9092
[bus]
#broker1 =10.202.xxx.xx:9096,10.202.xx.xx:9096,10.202.xxx.x:9096
格式說明:
[叢集名稱]
自定義brokers辨別 = broker ip:port配置(如果有多個broker,用英文逗号分隔)
如果不想對指定叢集進行監控(不監控該叢集的主題生産、消費速率,主題分區偏移,消費組消費速率),用 # 号注釋掉 該叢集的“自定義brokers辨別” 所在行即可,如上
topics主題配置
topic1 = MY_TOPIC1
topic1=NEXT_MARM_CORE_REPORT
#topic2=NEXT_MARM_CORE_EVENT
自定義topic 辨別 = topic名稱
如果不想對指定主題進行監控(不監控該主題的生産、消費速率,主題分區偏移,該主題相關消費組消費速率),用 # 号注釋掉 該叢集的“自定義 topic辨別” 所在行即可,如上
注意:每個叢集名稱下的 自定義 topic 辨別不能重複
consumer_groups消費組配置
KafkaMonitor\conf\consumer_groups.conf
groupID1 = MY_TOPIC1|MY_GROUP1:5000
#groupID1=NEXT_MARM_CORE_EVENT|NEXT_MARM_CORE_TASK
groupID2=NEXT_MARM_CORE_REPORT|NEXT_MARM_CORE_REPORT,NEXT_MARM_CORE_REPORTTAG
自定義consumer_groups 辨別 = 主題名稱|消費該主題的消費組名稱[:送出msg offset的時間間隔(機關為 毫秒)](如果有多個消費組,彼此之間用逗号分隔)
注意:
1、如果有為消費組設定送出msg offset的時間間隔,并且該時間間隔大于統一設定的資料采集頻率,那麼該消費組的資料采集頻率将自動調整為對應的 送出msg offset的時間間隔/1000 + 1
2、主題消費速率的統計依賴消費該主題的所有消費組的資料資訊,是以,同一個主題,不要配置在多個“自定義consumer_groups 辨別”配置值中
3、主題消費速率資料采集頻率取最大值 max(統一設定的資料采集頻率,max(消費該主題的消費組送出msg offset的時間間隔/1000 + 1))
如果不想對指定消費組進行監控(不監控該消費組消費速率,消費組關聯的主題消費速率),用 # 号注釋掉 該叢集的“自定義consumer_groups 辨別” 所在行即可,如上,,或者把對應消費組及其送出msg offset的時間間隔資訊删除即可。
運作程式
python main.py 采集頻率(機關 秒) 采集時長
eg:
每5秒采集一次,總共采集120秒
python main.py 5 120

如果(根據配置自動調整後的)采集頻率時間間隔大于單次程式采樣耗時,則處理完成後立即進行下一次采樣,忽略采樣頻率設定,實際采集時長變長,但是采集次數不變 int(采集時長/采樣頻率)
grafana圖表配置
資料源配置
說明:Database db_+brokers.conf中配置的叢集名稱
Dashboard變量配置
Dashboard Pannel主要配置項
效果展示
參考連結:
https://pykafka.readthedocs.io/en/latest/index.html
源碼下載下傳位址:
https://gitee.com/ishouke/KafkaMonitor
作者:授客
QQ:1033553122
全國軟體測試QQ交流群:7156436
Git位址:https://gitee.com/ishouke
友情提示:限于時間倉促,文中可能存在錯誤,歡迎指正、評論!
作者五行缺錢,如果覺得文章對您有幫助,請掃描下邊的二維碼打賞作者,金額随意,您的支援将是我繼續創作的源動力,打賞後如有任何疑問,請聯系我!!!
微信打賞
支付寶打賞 全國軟體測試交流QQ群