基於Python結合pykafka實現kafka生產及消費速率&主題分區偏移實時監控 By: 授客 QQ:1033553122 1.測試環境 python 3.4 zookeeper-3.4.13.tar.gz 下載地址1: http://zookeeper.apache.org/releases. ...
基於Python結合pykafka實現kafka生產及消費速率&主題分區偏移實時監控
By: 授客 QQ:1033553122
1.測試環境
python 3.4
zookeeper-3.4.13.tar.gz
下載地址1:
http://zookeeper.apache.org/releases.html#download
https://www.apache.org/dyn/closer.cgi/zookeeper/
https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/
下載地址2:
https://pan.baidu.com/s/1dnBgHvySE9pVRZXJVmezyQ
kafka_2.12-2.1.0.tgz
下載地址1:
http://kafka.apache.org/downloads.html
下載地址2:
https://pan.baidu.com/s/1VnHkJgy4iQ73j5rLbEL0jw
pykafka-2.8.0.tar.gz
下載地址1:
https://pypi.org/project/pykafka/
2.實現功能
實時採集Kafka生產者主題生產速率,主題消費速率,主題分區偏移,消費組消費速率,支持同時對多個來自不同集群的主題進行實時採集,支持同時對多個消費組實時採集
3.使用前提
1、“主題消費速率”&“消費組消費速率” 統計 依賴“消費組”,所以要統計消費速率,必須存在消費組才能統計;
2、“主題消費速率”&“消費組消費速率” 統計 依賴消費者自動、手動提交“offset”,所以所以要統計消費速率,必須確保消費者消費時,會提交消息的offset
3、Kafka版本大於等於0.10.1.1
4.使用方法
influxDB主機配置
KafkaMonitor\conf\influxDB.conf
[INFLUXDB]
influxdb_host = 10.203.25.106
influxdb_port = 8086
brokers集群配置
KafkaMonitor\conf\brokers.conf
[CLUSTER1]
broker1 = 127.0.0.1:9092
[bus]
#broker1 =10.202.xxx.xx:9096,10.202.xx.xx:9096,10.202.xxx.x:9096
格式說明:
[集群名稱]
自定義brokers標識 = broker ip:port配置(如果有多個broker,用英文逗號分隔)
如果不想對指定集群進行監控(不監控該集群的主題生產、消費速率,主題分區偏移,消費組消費速率),用 # 號註釋掉 該集群的“自定義brokers標識” 所在行即可,如上
topics主題配置
KafkaMonitor\conf\brokers.conf
[CLUSTER1]
topic1 = MY_TOPIC1
[bus]
topic1=NEXT_MARM_CORE_REPORT
#topic2=NEXT_MARM_CORE_EVENT
格式說明:
[集群名稱]
自定義topic 標識 = topic名稱
如果不想對指定主題進行監控(不監控該主題的生產、消費速率,主題分區偏移,該主題相關消費組消費速率),用 # 號註釋掉 該集群的“自定義 topic標識” 所在行即可,如上
註意:每個集群名稱下的 自定義 topic 標識不能重覆
consumer_groups消費組配置
KafkaMonitor\conf\consumer_groups.conf
[CLUSTER1]
groupID1 = MY_TOPIC1|MY_GROUP1:5000
[bus]
#groupID1=NEXT_MARM_CORE_EVENT|NEXT_MARM_CORE_TASK
groupID2=NEXT_MARM_CORE_REPORT|NEXT_MARM_CORE_REPORT,NEXT_MARM_CORE_REPORTTAG
格式說明:
[集群名稱]
自定義consumer_groups 標識 = 主題名稱|消費該主題的消費組名稱[:提交msg offset的時間間隔(單位為 毫秒)](如果有多個消費組,彼此之間用逗號分隔)
註意:
1、如果有為消費組設置提交msg offset的時間間隔,並且該時間間隔大於統一設置的數據採集頻率,那麼該消費組的數據採集頻率將自動調整為對應的 提交msg offset的時間間隔/1000 + 1
2、主題消費速率的統計依賴消費該主題的所有消費組的數據信息,所以,同一個主題,不要配置在多個“自定義consumer_groups 標識”配置值中
3、主題消費速率數據採集頻率取最大值 max(統一設置的數據採集頻率,max(消費該主題的消費組提交msg offset的時間間隔/1000 + 1))
如果不想對指定消費組進行監控(不監控該消費組消費速率,消費組關聯的主題消費速率),用 # 號註釋掉 該集群的“自定義consumer_groups 標識” 所在行即可,如上,,或者把對應消費組及其提交msg offset的時間間隔信息刪除即可。
運行程式
python main.py 採集頻率(單位 秒) 採集時長
eg:
每5秒採集一次,總共採集120秒
python main.py 5 120
註意:
如果(根據配置自動調整後的)採集頻率時間間隔大於單次程式採樣耗時,則處理完成後立即進行下一次採樣,忽略採樣頻率設置,實際採集時長變長,但是採集次數不變 int(採集時長/採樣頻率)
grafana圖表配置
數據源配置
說明:Database db_+brokers.conf中配置的集群名稱
Dashboard變數配置
Dashboard Pannel主要配置項
效果展示
參考鏈接:
https://pykafka.readthedocs.io/en/latest/index.html
源碼下載地址:
https://gitee.com/ishouke/KafkaMonitor