一站式Kafka平臺解決方案——KafkaCenter

来源:https://www.cnblogs.com/tree1123/archive/2020/05/18/12908653.html
-Advertisement-
Play Games

KafkaCenter是什麼 KafkaCenter是一個針對Kafka的一站式,解決方案。用於Kafka集群的維護與管理,生產者和消費者的監控,以及Kafka部分生態組件的使用。 對於Kafka的平臺化,一直缺少一個成熟的解決方案,之前比較流行的kafka監控方案,如kafka manager提供 ...


KafkaCenter是什麼

KafkaCenter是一個針對Kafka的一站式,解決方案。用於Kafka集群的維護與管理,生產者和消費者的監控,以及Kafka部分生態組件的使用。

對於Kafka的平臺化,一直缺少一個成熟的解決方案,之前比較流行的kafka監控方案,如kafka-manager提供了集群管理與topic管理等等功能。但是對於生產者、消費者的監控,以及Kafka的新生態,如Connect,KSQL還缺少響應的支持。Confluent Control Center功能要完整一些,但卻是非開源收費的。

對於Kafka的使用,一直都是一個讓人頭疼的問題,由於實時系統的強運維特性,我們不得不投入大量的時間用於集群的維護,kafka的運維,比如:

  • 人工創建topic,特別費力
  • 相關kafka運維,監控孤島化
  • 現有消費監控工具監控不准確
  • 無法拿到Kafka 集群的summay信息
  • 無法快速知曉集群健康狀態
  • 無法知曉業務對team kafka使用情況
  • kafka管理,監控工具稀少,沒有一個好的工具我們直接可以使用
  • 無法快速查詢topic消息

功能模塊介紹

  • Home-> 查看平臺管理的Kafka Cluster集群信息及監控信息
  • Topic-> 用戶可以在此模塊查看自己的Topic,發起申請新建Topic,同時可以對Topic進行生產消費測試。
  • Monitor-> 用戶可以在此模塊中可以查看Topic的生產以及消費情況,同時可以針對消費延遲情況設置預警信息。
  • Connect-> 實現用戶快速創建自己的Connect Job,並對自己的Connect進行維護。
  • KSQL-> 實現用戶快速創建自己的KSQL Job,並對自己的Job進行維護。
  • Approve-> 此模塊主要用於當普通用戶申請創建Topic,管理員進行審批操作。
  • Setting-> 此模塊主要功能為管理員維護User、Team以及kafka cluster信息
  • Kafka Manager-> 此模塊用於管理員對集群的正常維護操作。

系統截圖:

安裝與入門

安裝需要依賴 mysql es email server

組件 是否必須 功能
mysql 必須 配置信息存在mysql
elasticsearch(7.0+) 可選 各種監控信息的存儲
email server 可選 Apply, approval, warning e-mail alert

1、初始化

在MySQL中執行sql建表

-- Dumping database structure for kafka_center
CREATE DATABASE IF NOT EXISTS `kafka_center` /*!40100 DEFAULT CHARACTER SET utf8 COLLATE utf8_bin */;
USE `kafka_center`;


-- Dumping structure for table kafka_center.alert_group
CREATE TABLE IF NOT EXISTS `alert_group` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `cluster_id` int(11) NOT NULL,
  `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `consummer_group` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `consummer_api` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `threshold` int(11) DEFAULT NULL,
  `dispause` int(11) DEFAULT NULL,
  `mail_to` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '',
  `webhook` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '',
  `create_date` datetime DEFAULT NULL,
  `owner_id` int(11) DEFAULT NULL,
  `team_id` int(11) DEFAULT NULL,
  `disable_alerta` tinyint(1) DEFAULT 0,
  `enable` tinyint(1) NOT NULL DEFAULT 1,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.cluster_info
CREATE TABLE IF NOT EXISTS `cluster_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) COLLATE utf8_bin NOT NULL,
  `zk_address` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `broker` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `create_time` datetime DEFAULT NULL,
  `comments` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `enable` int(11) DEFAULT NULL,
  `broker_size` int(4) DEFAULT 0,
  `kafka_version` varchar(10) COLLATE utf8_bin DEFAULT '',
  `location` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `graf_addr` varchar(255) COLLATE utf8_bin DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.ksql_info
CREATE TABLE IF NOT EXISTS `ksql_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `cluster_id` int(11) DEFAULT NULL,
  `cluster_name` varchar(255) DEFAULT NULL,
  `ksql_url` varchar(255) DEFAULT NULL,
  `ksql_serverId` varchar(255) DEFAULT NULL,
  `version` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.task_info
CREATE TABLE IF NOT EXISTS `task_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `cluster_ids` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `location` varchar(20) COLLATE utf8_bin NOT NULL DEFAULT '',
  `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `partition` int(11) DEFAULT NULL,
  `replication` int(11) DEFAULT NULL,
  `message_rate` int(50) DEFAULT NULL,
  `ttl` int(11) DEFAULT NULL,
  `owner_id` int(11) DEFAULT NULL,
  `team_id` int(11) DEFAULT NULL,
  `comments` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '',
  `create_time` datetime DEFAULT NULL,
  `approved` int(11) DEFAULT NULL,
  `approved_id` int(11) DEFAULT NULL,
  `approved_time` datetime DEFAULT NULL,
  `approval_opinions` varchar(1000) COLLATE utf8_bin DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.team_info
CREATE TABLE IF NOT EXISTS `team_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `own` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.topic_collection
CREATE TABLE IF NOT EXISTS `topic_collection` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `cluster_id` int(11) NOT NULL,
  `user_id` int(11) NOT NULL,
  `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `type` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.topic_info
CREATE TABLE IF NOT EXISTS `topic_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `cluster_id` int(11) NOT NULL,
  `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `partition` int(11) DEFAULT NULL,
  `replication` int(11) DEFAULT NULL,
  `ttl` bigint(11) DEFAULT NULL,
  `config` varchar(512) COLLATE utf8_bin DEFAULT NULL,
  `owner_id` int(11) DEFAULT NULL,
  `team_id` int(11) DEFAULT NULL,
  `comments` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '',
  `create_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.user_info
CREATE TABLE IF NOT EXISTS `user_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `real_name` varchar(255) COLLATE utf8_bin DEFAULT '',
  `email` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
  `role` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '100',
  `create_time` datetime DEFAULT NULL,
  `password` varchar(255) COLLATE utf8_bin DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.user_team
CREATE TABLE IF NOT EXISTS `user_team` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` int(11) DEFAULT NULL,
  `team_id` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

2、配置

相關配置位於application.properties

可對埠 日誌等信息做一些修改

server.port=8080
debug=false
# 設置session timeout為6小時
server.servlet.session.timeout=21600
spring.security.user.name=admin
spring.security.user.password=admin
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/kafka_center?useUnicode=true&characterEncoding=utf-8
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.hikari.minimum-idle=5
spring.datasource.hikari.maximum-pool-size=15
spring.datasource.hikari.pool-name=KafkaCenterHikariCP
spring.datasource.hikari.max-lifetime =30000
spring.datasource.hikari.connection-test-query=SELECT 1
management.health.defaults.enabled=false

public.url=http://localhost:8080
connect.url=http://localhost:8000/#/
system.topic.ttl.h=16

monitor.enable=true
monitor.collect.period.minutes=5
monitor.elasticsearch.hosts=localhost:9200
monitor.elasticsearch.index=kafka_center_monitor
#是否啟用收集線程指定集群收集
monitor.collector.include.enable=false
#收集線程指定location,必須屬於remote.locations之中
monitor.collector.include.location=dev
collect.topic.enable=true
collect.topic.period.minutes=10
# remote的功能是為了提高lag查詢和收集,解決跨location網路延遲問題
remote.query.enable=false
remote.hosts=gqc@localhost2:8080
remote.locations=dev,gqc
#發送consumer group的lag發送給alert service
alert.enable=false
alert.dispause=2
alert.service=
alert.threshold=1000
alter.env=other
#是否開啟郵件功能,true:啟用,false:禁用
mail.enable=false
spring.mail.host=
[email protected]
# oauth2
generic.enabled=false
generic.name=oauth2 Login
generic.auth_url=
generic.token_url=
generic.redirect_utl=
generic.api_url=
generic.client_id=
generic.client_secret=
generic.scopes=

3、運行

推薦使用docker

docker run -d -p 8080:8080 --name KafkaCenter -v ${PWD}/application.properties:/opt/app/kafka-center/config/application.properties xaecbd/kafka-center:2.1.0

不用docker

$ git clone https://github.com/xaecbd/KafkaCenter.git
$ cd KafkaCenter
$ mvn clean package -Dmaven.test.skip=true
$ cd KafkaCenter\KafkaCenter-Core\target
$ java -jar KafkaCenter-Core-2.1.0-SNAPSHOT.jar

4、查看

訪問http://localhost:8080 管理員用戶與密碼預設:admin / admin

功能介紹

Topics

用戶可以在此模塊完成Topic查看,已經申請新建Topic,同時可以對Topic進行生產消費測試。

Monitor

用戶可以在此模塊中可以查看Topic的生成以及消費情況,同時可以針對消費延遲情況設置預警信息。

Alerts

此模塊用於維護預警信息。用戶可以看到自己所有預警信息,管理員可以看到所有人的預警信息。

Kafka Connect

實現用戶快速創建自己的Connect Job,並對自己的Connect進行維護。

KSQL

實現用戶快速創建自己的KSQL Job,並對自己的Job進行維護。

Approve

此模塊主要用於當普通用戶申請創建Topic 或者Job時,管理員進行審批操作。

Setting

此模塊主要功能為管理員維護User、Team以及kafka cluster信息

Cluster Manager

此模塊用於管理員對集群的正常維護操作。

Home

這裡是一些基本的統計信息

My Favorite

集群與topic列表

Topic

這裡是一些topic的管理功能

Topic List

操作範圍:

用戶所屬Team的所有Topic

  • Topic -> Topic List -> Detail 查看Topic的詳細信息
  • Topic -> Topic List -> Mock 對Topic進行生產測試

申請創建topic

Important: admin不能申請task,普通用戶必須先讓管理員新建team後,將用戶加入指定team後,才可以申請task。

操作範圍:

用戶所屬Team的所有Task

  • Topic -> My Task -> Detail 查看申請的Task信息

  • Topic -> My Task -> Delete 刪除被拒絕或待審批的Task

  • Topic -> My Task -> Edit 修改被拒絕的Task

  • Topic -> My Task -> Create Topic Task 創建Task

    • 按照表單各欄位要求填寫信息
    • 點擊確認,提交申請

    審批結果:

    • 審批通過:Topic將會被創建在管理員指定的集群
    • 審批拒絕:用戶收到郵件,返回到My Task,點擊對應Task後面的Edit,針對審批意見進行修改

Topic命名規則:

只能包含:數字、大小寫字母、下劃線、中劃線、點;長度大於等於3小於等於100。

不推薦:下劃線開頭;

可對所有Topic進行消費測試

Monitor

監控模塊

生產者監控

消費者監控

消息積壓

報警功能

Connect

這裡是一些Connect的操作

KSQL

可以進行KQL的查詢操作

Approve

這裡主要是管理員做一些審核操作

  • Approve->check 審批用戶的Task
  • 根據用戶選擇的location指定cluster
  • 檢查用戶設置的partition和replication大小是否合理,如不合理做出調整
  • 檢查其他欄位是否合理,如需要拒絕該申請,點擊Reject並填寫意見。

Kafka Manager
Topic管理

Cluster管理

broker管理

group管理

Setting

這些主要是用戶的一些設置

KafkaCenter還是一個非常不錯的kafka管理工具,可以滿足大部分需求。
更多實時數據分析相關博文與科技資訊,歡迎關註 “實時流式計算”


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Redis伺服器是典型的一對多伺服器程式:一個伺服器可以與多個客戶端建立網路連接,每個客戶端可以向伺服器發送命令請求,而伺服器則接收並處理客戶端發送的命令請求,並向客戶端返回命令回覆。 通過使用由I/O多路復用技術實現的文件事件處理器,Redis伺服器使用單線程單進程的方式處理命令請求,並於多個客戶 ...
  • 本文來源於微信公眾號【胖滾豬學編程】、轉載請註明出處 在漫畫併發編程系統博文中,我們講了N篇關於鎖的知識,確實,鎖是解決併發問題的萬能鑰匙,可是併發問題只有鎖能解決嗎?今天要出場一個大BOSS:CAS無鎖演算法,可謂是併發編程核心中的核心! 溫故 首先我們再回顧一下原子性問題的原因,參考 "【漫畫】J ...
  • 今天在聽陳華軍老師的課時;感觸頗多。其中講到“不同執行計劃的選擇(子查詢)”這一欄。我們在平時工作也經常要用到子查詢。有哪些思路來優化這種子查詢呢? 例如我們今天實驗的表結構 表T1 有10000條記錄;併在id欄位創建btree索引 表T2 有1000條記錄 postgres=# create t ...
  • 新公司使用的是寶塔來部署項目,war包。在部署運行時遇到了SQLException: Access denied for user 'xxx@xxxxx' (using password:yes) 重裝mysql無果之後。就只能慢慢解決,分享下我的解決過程。 # 一. 錯誤原因 正常來講,使用nav ...
  • 1、基本概念 數據讀寫性能主要是IO次數,單次從磁碟讀取單位是頁,即便只讀取一行記錄,從磁碟中也是會讀取一頁的()單頁讀取代價高,一般都會進行預讀) (1)扇區是磁碟的最小存儲單元 (2)塊是文件系統的最小存儲單元,比如你保存一個記事本,即使只輸入一個字元,也要占用4KB的存儲,這就是最小存儲的意思 ...
  • "TOC" 實驗9:存儲過程實驗 自擬題目完成8個存儲過程的編寫及調試,熟練掌握存儲過程的使用。也可採用下圖中作業上的題目。 SQL語句代碼 實驗10:觸發器實驗 自擬題目完成5個觸發器的編寫及調試,熟練掌握觸發器的使用。也可採用下圖中作業上的題目。 SQL語句代碼 ...
  • 資料庫實驗系列之2資料庫上的基本操作實驗(針對錶、視圖及不同許可權用戶的增、刪、改、查及資料庫的備份和恢復) ...
  • 資料庫實驗系列之1資料庫及資料庫中表等資料庫對象的建立實驗(包括關係圖、完整性、許可權控制、視圖、索引等內容) ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...