一站式Kafka平臺解決方案——KafkaCenter

来源:https://www.cnblogs.com/tree1123/archive/2020/05/18/12908653.html
-Advertisement-
Play Games

KafkaCenter是什麼 KafkaCenter是一個針對Kafka的一站式,解決方案。用於Kafka集群的維護與管理,生產者和消費者的監控,以及Kafka部分生態組件的使用。 對於Kafka的平臺化,一直缺少一個成熟的解決方案,之前比較流行的kafka監控方案,如kafka manager提供 ...


KafkaCenter是什麼

KafkaCenter是一個針對Kafka的一站式,解決方案。用於Kafka集群的維護與管理,生產者和消費者的監控,以及Kafka部分生態組件的使用。

對於Kafka的平臺化,一直缺少一個成熟的解決方案,之前比較流行的kafka監控方案,如kafka-manager提供了集群管理與topic管理等等功能。但是對於生產者、消費者的監控,以及Kafka的新生態,如Connect,KSQL還缺少響應的支持。Confluent Control Center功能要完整一些,但卻是非開源收費的。

對於Kafka的使用,一直都是一個讓人頭疼的問題,由於實時系統的強運維特性,我們不得不投入大量的時間用於集群的維護,kafka的運維,比如:

  • 人工創建topic,特別費力
  • 相關kafka運維,監控孤島化
  • 現有消費監控工具監控不准確
  • 無法拿到Kafka 集群的summay信息
  • 無法快速知曉集群健康狀態
  • 無法知曉業務對team kafka使用情況
  • kafka管理,監控工具稀少,沒有一個好的工具我們直接可以使用
  • 無法快速查詢topic消息

功能模塊介紹

  • Home-> 查看平臺管理的Kafka Cluster集群信息及監控信息
  • Topic-> 用戶可以在此模塊查看自己的Topic,發起申請新建Topic,同時可以對Topic進行生產消費測試。
  • Monitor-> 用戶可以在此模塊中可以查看Topic的生產以及消費情況,同時可以針對消費延遲情況設置預警信息。
  • Connect-> 實現用戶快速創建自己的Connect Job,並對自己的Connect進行維護。
  • KSQL-> 實現用戶快速創建自己的KSQL Job,並對自己的Job進行維護。
  • Approve-> 此模塊主要用於當普通用戶申請創建Topic,管理員進行審批操作。
  • Setting-> 此模塊主要功能為管理員維護User、Team以及kafka cluster信息
  • Kafka Manager-> 此模塊用於管理員對集群的正常維護操作。

系統截圖:

安裝與入門

安裝需要依賴 mysql es email server

組件 是否必須 功能
mysql 必須 配置信息存在mysql
elasticsearch(7.0+) 可選 各種監控信息的存儲
email server 可選 Apply, approval, warning e-mail alert

1、初始化

在MySQL中執行sql建表

-- Dumping database structure for kafka_center
CREATE DATABASE IF NOT EXISTS `kafka_center` /*!40100 DEFAULT CHARACTER SET utf8 COLLATE utf8_bin */;
USE `kafka_center`;


-- Dumping structure for table kafka_center.alert_group
CREATE TABLE IF NOT EXISTS `alert_group` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `cluster_id` int(11) NOT NULL,
  `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `consummer_group` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `consummer_api` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `threshold` int(11) DEFAULT NULL,
  `dispause` int(11) DEFAULT NULL,
  `mail_to` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '',
  `webhook` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '',
  `create_date` datetime DEFAULT NULL,
  `owner_id` int(11) DEFAULT NULL,
  `team_id` int(11) DEFAULT NULL,
  `disable_alerta` tinyint(1) DEFAULT 0,
  `enable` tinyint(1) NOT NULL DEFAULT 1,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.cluster_info
CREATE TABLE IF NOT EXISTS `cluster_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) COLLATE utf8_bin NOT NULL,
  `zk_address` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `broker` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `create_time` datetime DEFAULT NULL,
  `comments` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `enable` int(11) DEFAULT NULL,
  `broker_size` int(4) DEFAULT 0,
  `kafka_version` varchar(10) COLLATE utf8_bin DEFAULT '',
  `location` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `graf_addr` varchar(255) COLLATE utf8_bin DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.ksql_info
CREATE TABLE IF NOT EXISTS `ksql_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `cluster_id` int(11) DEFAULT NULL,
  `cluster_name` varchar(255) DEFAULT NULL,
  `ksql_url` varchar(255) DEFAULT NULL,
  `ksql_serverId` varchar(255) DEFAULT NULL,
  `version` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.task_info
CREATE TABLE IF NOT EXISTS `task_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `cluster_ids` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `location` varchar(20) COLLATE utf8_bin NOT NULL DEFAULT '',
  `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `partition` int(11) DEFAULT NULL,
  `replication` int(11) DEFAULT NULL,
  `message_rate` int(50) DEFAULT NULL,
  `ttl` int(11) DEFAULT NULL,
  `owner_id` int(11) DEFAULT NULL,
  `team_id` int(11) DEFAULT NULL,
  `comments` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '',
  `create_time` datetime DEFAULT NULL,
  `approved` int(11) DEFAULT NULL,
  `approved_id` int(11) DEFAULT NULL,
  `approved_time` datetime DEFAULT NULL,
  `approval_opinions` varchar(1000) COLLATE utf8_bin DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.team_info
CREATE TABLE IF NOT EXISTS `team_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `own` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.topic_collection
CREATE TABLE IF NOT EXISTS `topic_collection` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `cluster_id` int(11) NOT NULL,
  `user_id` int(11) NOT NULL,
  `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `type` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.topic_info
CREATE TABLE IF NOT EXISTS `topic_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `cluster_id` int(11) NOT NULL,
  `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `partition` int(11) DEFAULT NULL,
  `replication` int(11) DEFAULT NULL,
  `ttl` bigint(11) DEFAULT NULL,
  `config` varchar(512) COLLATE utf8_bin DEFAULT NULL,
  `owner_id` int(11) DEFAULT NULL,
  `team_id` int(11) DEFAULT NULL,
  `comments` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '',
  `create_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.user_info
CREATE TABLE IF NOT EXISTS `user_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `real_name` varchar(255) COLLATE utf8_bin DEFAULT '',
  `email` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `role` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '100',
  `create_time` datetime DEFAULT NULL,
  `password` varchar(255) COLLATE utf8_bin DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.user_team
CREATE TABLE IF NOT EXISTS `user_team` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` int(11) DEFAULT NULL,
  `team_id` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

2、配置

相關配置位於application.properties

可對埠 日誌等信息做一些修改

server.port=8080
debug=false
# 設置session timeout為6小時
server.servlet.session.timeout=21600
spring.security.user.name=admin
spring.security.user.password=admin
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/kafka_center?useUnicode=true&characterEncoding=utf-8
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.hikari.minimum-idle=5
spring.datasource.hikari.maximum-pool-size=15
spring.datasource.hikari.pool-name=KafkaCenterHikariCP
spring.datasource.hikari.max-lifetime =30000
spring.datasource.hikari.connection-test-query=SELECT 1
management.health.defaults.enabled=false

public.url=http://localhost:8080
connect.url=http://localhost:8000/#/
system.topic.ttl.h=16

monitor.enable=true
monitor.collect.period.minutes=5
monitor.elasticsearch.hosts=localhost:9200
monitor.elasticsearch.index=kafka_center_monitor
#是否啟用收集線程指定集群收集
monitor.collector.include.enable=false
#收集線程指定location,必須屬於remote.locations之中
monitor.collector.include.location=dev
collect.topic.enable=true
collect.topic.period.minutes=10
# remote的功能是為了提高lag查詢和收集,解決跨location網路延遲問題
remote.query.enable=false
remote.hosts=gqc@localhost2:8080
remote.locations=dev,gqc
#發送consumer group的lag發送給alert service
alert.enable=false
alert.dispause=2
alert.service=
alert.threshold=1000
alter.env=other
#是否開啟郵件功能,true:啟用,false:禁用
mail.enable=false
spring.mail.host=
[email protected]
# oauth2
generic.enabled=false
generic.name=oauth2 Login
generic.auth_url=
generic.token_url=
generic.redirect_utl=
generic.api_url=
generic.client_id=
generic.client_secret=
generic.scopes=

3、運行

推薦使用docker

docker run -d -p 8080:8080 --name KafkaCenter -v ${PWD}/application.properties:/opt/app/kafka-center/config/application.properties xaecbd/kafka-center:2.1.0

不用docker

$ git clone https://github.com/xaecbd/KafkaCenter.git
$ cd KafkaCenter
$ mvn clean package -Dmaven.test.skip=true
$ cd KafkaCenter\KafkaCenter-Core\target
$ java -jar KafkaCenter-Core-2.1.0-SNAPSHOT.jar

4、查看

訪問http://localhost:8080 管理員用戶與密碼預設:admin / admin

功能介紹

Topics

用戶可以在此模塊完成Topic查看,已經申請新建Topic,同時可以對Topic進行生產消費測試。

Monitor

用戶可以在此模塊中可以查看Topic的生成以及消費情況,同時可以針對消費延遲情況設置預警信息。

Alerts

此模塊用於維護預警信息。用戶可以看到自己所有預警信息,管理員可以看到所有人的預警信息。

Kafka Connect

實現用戶快速創建自己的Connect Job,並對自己的Connect進行維護。

KSQL

實現用戶快速創建自己的KSQL Job,並對自己的Job進行維護。

Approve

此模塊主要用於當普通用戶申請創建Topic 或者Job時,管理員進行審批操作。

Setting

此模塊主要功能為管理員維護User、Team以及kafka cluster信息

Cluster Manager

此模塊用於管理員對集群的正常維護操作。

Home

這裡是一些基本的統計信息

My Favorite

集群與topic列表

Topic

這裡是一些topic的管理功能

Topic List

操作範圍:

用戶所屬Team的所有Topic

  • Topic -> Topic List -> Detail 查看Topic的詳細信息
  • Topic -> Topic List -> Mock 對Topic進行生產測試

申請創建topic

Important: admin不能申請task,普通用戶必須先讓管理員新建team後,將用戶加入指定team後,才可以申請task。

操作範圍:

用戶所屬Team的所有Task

  • Topic -> My Task -> Detail 查看申請的Task信息

  • Topic -> My Task -> Delete 刪除被拒絕或待審批的Task

  • Topic -> My Task -> Edit 修改被拒絕的Task

  • Topic -> My Task -> Create Topic Task 創建Task

    • 按照表單各欄位要求填寫信息
    • 點擊確認,提交申請

    審批結果:

    • 審批通過:Topic將會被創建在管理員指定的集群
    • 審批拒絕:用戶收到郵件,返回到My Task,點擊對應Task後面的Edit,針對審批意見進行修改

Topic命名規則:

只能包含:數字、大小寫字母、下劃線、中劃線、點;長度大於等於3小於等於100。

不推薦:下劃線開頭;

可對所有Topic進行消費測試

Monitor

監控模塊

生產者監控

消費者監控

消息積壓

報警功能

Connect

這裡是一些Connect的操作

KSQL

可以進行KQL的查詢操作

Approve

這裡主要是管理員做一些審核操作

  • Approve->check 審批用戶的Task
  • 根據用戶選擇的location指定cluster
  • 檢查用戶設置的partition和replication大小是否合理,如不合理做出調整
  • 檢查其他欄位是否合理,如需要拒絕該申請,點擊Reject並填寫意見。

Kafka Manager
Topic管理

Cluster管理

broker管理

group管理

Setting

這些主要是用戶的一些設置

KafkaCenter還是一個非常不錯的kafka管理工具,可以滿足大部分需求。
更多實時數據分析相關博文與科技資訊,歡迎關註 “實時流式計算”


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Redis伺服器是典型的一對多伺服器程式:一個伺服器可以與多個客戶端建立網路連接,每個客戶端可以向伺服器發送命令請求,而伺服器則接收並處理客戶端發送的命令請求,並向客戶端返回命令回覆。 通過使用由I/O多路復用技術實現的文件事件處理器,Redis伺服器使用單線程單進程的方式處理命令請求,並於多個客戶 ...
  • 本文來源於微信公眾號【胖滾豬學編程】、轉載請註明出處 在漫畫併發編程系統博文中,我們講了N篇關於鎖的知識,確實,鎖是解決併發問題的萬能鑰匙,可是併發問題只有鎖能解決嗎?今天要出場一個大BOSS:CAS無鎖演算法,可謂是併發編程核心中的核心! 溫故 首先我們再回顧一下原子性問題的原因,參考 "【漫畫】J ...
  • 今天在聽陳華軍老師的課時;感觸頗多。其中講到“不同執行計劃的選擇(子查詢)”這一欄。我們在平時工作也經常要用到子查詢。有哪些思路來優化這種子查詢呢? 例如我們今天實驗的表結構 表T1 有10000條記錄;併在id欄位創建btree索引 表T2 有1000條記錄 postgres=# create t ...
  • 新公司使用的是寶塔來部署項目,war包。在部署運行時遇到了SQLException: Access denied for user 'xxx@xxxxx' (using password:yes) 重裝mysql無果之後。就只能慢慢解決,分享下我的解決過程。 # 一. 錯誤原因 正常來講,使用nav ...
  • 1、基本概念 數據讀寫性能主要是IO次數,單次從磁碟讀取單位是頁,即便只讀取一行記錄,從磁碟中也是會讀取一頁的()單頁讀取代價高,一般都會進行預讀) (1)扇區是磁碟的最小存儲單元 (2)塊是文件系統的最小存儲單元,比如你保存一個記事本,即使只輸入一個字元,也要占用4KB的存儲,這就是最小存儲的意思 ...
  • "TOC" 實驗9:存儲過程實驗 自擬題目完成8個存儲過程的編寫及調試,熟練掌握存儲過程的使用。也可採用下圖中作業上的題目。 SQL語句代碼 實驗10:觸發器實驗 自擬題目完成5個觸發器的編寫及調試,熟練掌握觸發器的使用。也可採用下圖中作業上的題目。 SQL語句代碼 ...
  • 資料庫實驗系列之2資料庫上的基本操作實驗(針對錶、視圖及不同許可權用戶的增、刪、改、查及資料庫的備份和恢復) ...
  • 資料庫實驗系列之1資料庫及資料庫中表等資料庫對象的建立實驗(包括關係圖、完整性、許可權控制、視圖、索引等內容) ...
一周排行
    -Advertisement-
    Play Games
  • 概述:在C#中,++i和i++都是自增運算符,其中++i先增加值再返回,而i++先返回值再增加。應用場景根據需求選擇,首碼適合先增後用,尾碼適合先用後增。詳細示例提供清晰的代碼演示這兩者的操作時機和實際應用。 在C#中,++i 和 i++ 都是自增運算符,但它們在操作上有細微的差異,主要體現在操作的 ...
  • 上次發佈了:Taurus.MVC 性能壓力測試(ap 壓測 和 linux 下wrk 壓測):.NET Core 版本,今天計劃準備壓測一下 .NET 版本,來測試並記錄一下 Taurus.MVC 框架在 .NET 版本的性能,以便後續持續優化改進。 為了方便對比,本文章的電腦環境和測試思路,儘量和... ...
  • .NET WebAPI作為一種構建RESTful服務的強大工具,為開發者提供了便捷的方式來定義、處理HTTP請求並返迴響應。在設計API介面時,正確地接收和解析客戶端發送的數據至關重要。.NET WebAPI提供了一系列特性,如[FromRoute]、[FromQuery]和[FromBody],用 ...
  • 原因:我之所以想做這個項目,是因為在之前查找關於C#/WPF相關資料時,我發現講解圖像濾鏡的資源非常稀缺。此外,我註意到許多現有的開源庫主要基於CPU進行圖像渲染。這種方式在處理大量圖像時,會導致CPU的渲染負擔過重。因此,我將在下文中介紹如何通過GPU渲染來有效實現圖像的各種濾鏡效果。 生成的效果 ...
  • 引言 上一章我們介紹了在xUnit單元測試中用xUnit.DependencyInject來使用依賴註入,上一章我們的Sample.Repository倉儲層有一個批量註入的介面沒有做單元測試,今天用這個示例來演示一下如何用Bogus創建模擬數據 ,和 EFCore 的種子數據生成 Bogus 的優 ...
  • 一、前言 在自己的項目中,涉及到實時心率曲線的繪製,項目上的曲線繪製,一般很難找到能直接用的第三方庫,而且有些還是定製化的功能,所以還是自己繪製比較方便。很多人一聽到自己畫就害怕,感覺很難,今天就分享一個完整的實時心率數據繪製心率曲線圖的例子;之前的博客也分享給DrawingVisual繪製曲線的方 ...
  • 如果你在自定義的 Main 方法中直接使用 App 類並啟動應用程式,但發現 App.xaml 中定義的資源沒有被正確載入,那麼問題可能在於如何正確配置 App.xaml 與你的 App 類的交互。 確保 App.xaml 文件中的 x:Class 屬性正確指向你的 App 類。這樣,當你創建 Ap ...
  • 一:背景 1. 講故事 上個月有個朋友在微信上找到我,說他們的軟體在客戶那邊隔幾天就要崩潰一次,一直都沒有找到原因,讓我幫忙看下怎麼回事,確實工控類的軟體環境複雜難搞,朋友手上有一個崩潰的dump,剛好丟給我來分析一下。 二:WinDbg分析 1. 程式為什麼會崩潰 windbg 有一個厲害之處在於 ...
  • 前言 .NET生態中有許多依賴註入容器。在大多數情況下,微軟提供的內置容器在易用性和性能方面都非常優秀。外加ASP.NET Core預設使用內置容器,使用很方便。 但是筆者在使用中一直有一個頭疼的問題:服務工廠無法提供請求的服務類型相關的信息。這在一般情況下並沒有影響,但是內置容器支持註冊開放泛型服 ...
  • 一、前言 在項目開發過程中,DataGrid是經常使用到的一個數據展示控制項,而通常表格的最後一列是作為操作列存在,比如會有編輯、刪除等功能按鈕。但WPF的原始DataGrid中,預設只支持固定左側列,這跟大家習慣性操作列放最後不符,今天就來介紹一種簡單的方式實現固定右側列。(這裡的實現方式參考的大佬 ...