大數據Hadoop之——Kafka 圖形化工具 EFAK(EFAK環境部署)

来源:https://www.cnblogs.com/liugp/archive/2022/05/26/16307589.html
-Advertisement-
Play Games

一、概述 EFAK(Eagle For Apache Kafka,以前稱為 Kafka Eagle)是一款由國內公司開源的Kafka集群監控系統,可以用來監視kafka集群的broker狀態、Topic信息、IO、記憶體、consumer線程、偏移量等信息,併進行可視化圖表展示。獨特的KQL還可以通過 ...


目錄

一、概述

EFAK(Eagle For Apache Kafka,以前稱為 Kafka Eagle)是一款由國內公司開源的Kafka集群監控系統,可以用來監視kafka集群的broker狀態、Topic信息、IO、記憶體、consumer線程、偏移量等信息,併進行可視化圖表展示。獨特的KQL還可以通過SQL線上查詢kafka中的數據。

源碼: https://github.com/smartloli/kafka-eagle/
下載: http://download.kafka-eagle.org/
官方文檔:https://www.kafka-eagle.org/articles/docs/documentation.html

二、EFAK架構

EFAK分散式模式部署,這裡以5個節點為例子(1個Master和4個Slave),各個節點的角色如下如所示:

三、EFAK數據採集原理

對於 Kafka,我們可以收集以下數據

  • Kafka broker常用機器載入信息:記憶體、cpu、IP、版本等。
  • 服務監控數據:TPS、QPS、RT等
  • 應用程式監控:組、消費者、生產者、主題等。

因為EFAK是kafka的監控系統,所以前提是需先安裝Kafka和Zookeeper。

四、安裝Kafka

kafka官網文檔:https://kafka.apache.org/documentation/

1)Kafka下載

$ cd /opt/bigdata/hadoop/software
$ wget https://dlcdn.apache.org/kafka/3.1.1/kafka_2.13-3.1.1.tgz
$ tar -xf kafka_2.13-3.1.1.tgz -C /opt/bigdata/hadoop/server/

2)配置環境變數

$ vi /etc/profile
export KAFKA_HOME=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1
export PATH=$PATH:$KAFKA_HOME/bin

$ source /etc/profile

3)創建logs目錄

$ mkdir $KAFKA_HOME/logs

4)修改kafka配置

$ cd $KAFKA_HOME
# 查看現有配置,去掉空行和註釋
$ cat config/server.properties |grep -v '^$\|^#'
$ cat > $KAFKA_HOME/config/server.properties <<EOF
#broker的全局唯一編號,不能重覆
broker.id=0

#刪除topic功能使能
delete.topic.enable=true
#處理網路請求的線程數量
num.network.threads=3
#用來處理磁碟IO的現成數量
num.io.threads=8
#發送套接字的緩衝區大小
socket.send.buffer.bytes=102400
#接收套接字的緩衝區大小
socket.receive.buffer.bytes=102400
#請求套接字的緩衝區大小
socket.request.max.bytes=104857600
#kafka數據的存儲位置
log.dirs=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1/logs
#topic在當前broker上的分區個數
num.partitions=1
#用來恢復和清理data下數據的線程數量
num.recovery.threads.per.data.dir=1
#segment文件保留的最長時間,超時將被刪除
log.retention.hours=168
#配置連接Zookeeper集群地址
zookeeper.connect=hadoop-node1:12181,hadoop-node2:12181,hadoop-node3:12181
#zookeeper連接超時時間
zookeeper.connection.timeout.ms=60000
EOF

5)修改zookeeper配置

# 創建zookeeper data和logs目錄
$ mkdir $KAFKA_HOME/zookeeper_data $KAFKA_HOME/zookeeper_logs 
$ vi $KAFKA_HOME/config/zookeeper.properties
# 配置主要修改如下:
#數據目錄
dataDir=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1/zookeeper_data
#日誌目錄
dataLogDir=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1/zookeeper_logs
#心跳間隔時間,zookeeper中使用的基本時間單位,毫秒值。每隔2秒發送一個心跳,session時間tickTime*2
tickTime=2000
#leader與客戶端連接超時時間。表示5個心跳間隔
initLimit=5
#Leader與Follower之間的超時時間,表示2個心跳間隔
syncLimit=2
#客戶端連接埠,預設埠2181
clientPort=12181
# zookeeper集群配置項,server.1,server.2,server.3是zk集群節點;hadoop-node1,hadoop-node2,hadoop-node3是主機名稱;2888是主從通信埠;3888用來選舉leader
server.1=hadoop-node1:2888:3888
server.2=hadoop-node2:2888:3888
server.3=hadoop-node3:2888:3888

6)配置Zookeeper myid

# 在hadoop-node1配置如下:
$ echo 1 > $KAFKA_HOME/zookeeper_data/myid
# 在hadoop-node2配置如下:
$ echo 2 > $KAFKA_HOME/zookeeper_data/myid
# 在hadoop-node3配置如下:
$ echo 3 > $KAFKA_HOME/zookeeper_data/myid

7)開啟Kafka JMX監控

# 在kafka-server-start.sh文件中添加export JMX_PORT="9988",埠自定義就行
$ vi $KAFKA_HOME/bin/kafka-server-start.sh

重啟kafka

$ $KAFKA_HOME/bin/kafka-server-stop.sh ; $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

【問題】如遇以下報錯:

ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) kafka.common.InconsistentClusterIdException: The Cluster ID ELIH-KKbRP-NnPnHt4z-lA doesn't match stored clusterId Some(2HC_x7bTR_u2bCxrqw0Otw) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong. at kafka.server.KafkaServer.startup(KafkaServer.scala:228) at kafka.Kafka$.main(Kafka.scala:109) at kafka.Kafka.main(Kafka.scala)

【解決】在server.properties找到log.dirs配置的路徑。將該路徑下的meta.properties文件刪除,或者編輯meta.properties文件修改裡面的cluster.id即可。

8)將kafka目錄推送到其它節點

$ scp -r $KAFKA_HOME hadoop-node2:/opt/bigdata/hadoop/server/
$ scp -r $KAFKA_HOME hadoop-node3:/opt/bigdata/hadoop/server/
# 在hadoop-node2和hadoop-node3節點上設置環境變數
$ vi /etc/profile
export KAFKA_HOME=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1
export PATH=$PATH:$KAFKA_HOME/bin

$ source /etc/profile

# 修改advertised.listeners,值改成對應的hostname或者ip

【溫馨提示】修改hadoop-node2上server.properties文件的broker.id,設置為1和2,只要不重覆就行,advertised.listeners地址改成對應機器IP。

9)啟動服務

啟動zookeeper集群之後再啟動kafka集群

$ cd $KAFKA_HOME
# -daemon後臺啟動
$ ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
# 預設埠2181,可以在配置自定義,這裡修改為12181埠
$ netstat -tnlp|grep 12181

啟動Kafka

$ cd $KAFKA_HOME
# 預設埠9092,這裡修改成了19092,可以修改listeners和advertised.listeners
$ ./bin/kafka-server-start.sh -daemon ./config/server.properties
$ netstat -tnlp|grep 9092
$ jps

五、安裝EFAK

1)下載EFAK

$ cd /opt/bigdata/hadoop/software
$ wget https://github.com/smartloli/kafka-eagle-bin/archive/v2.1.0.tar.gz
$ tar -xf kafka-eagle-bin-2.1.0.tar.gz -C /opt/bigdata/hadoop/server/
$ cd /opt/bigdata/hadoop/server/
$ tar -xf 

2)創建資料庫

$ mysql -uroot -p 
123456

create database ke;

2)設置環境變數

$ vi /etc/profile
export KE_HOME=/opt/bigdata/hadoop/server/kafka-eagle-bin-2.1.0/efak-web-2.1.0
export PATH=$PATH:$KE_HOME/bin

$ source /etc/profile

3)配置

這裡設置hadoop-node1為master節點,其它兩個節點為slave節點,修改參數

$ vi $KE_HOME/conf/system-config.properties
# Multi zookeeper&kafka cluster list -- The client connection address of the Zookeeper cluster is set here
efak.zk.cluster.alias=cluster1
cluster1.zk.list=hadoop-node1:12181,hadoop-node2:12181,hadoop-node3:12181

######################################
# kafka jmx 地址,預設Apache發佈的Kafka基本是這個預設值,
# 對於一些公有雲Kafka廠商,它們會修改這個值,
# 比如會將jmxrmi修改為kafka或者是其它的值,
# 若是選擇的公有雲廠商的Kafka,可以根據實際的值來設置該屬性
######################################
cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi

# Zkcli limit -- Zookeeper cluster allows the number of clients to connect to
# If you enable distributed mode, you can set value to 4 or 8
kafka.zk.limit.size=16

# EFAK webui port -- WebConsole port access address
efak.webui.port=8048

######################################
# EFAK enable distributed,啟用分散式部署
######################################
efak.distributed.enable=true

# 設置節點類型slave or master
# master worknode set status to master, other node set status to slave
efak.cluster.mode.status=master
# deploy efak server address
efak.worknode.master.host=hadoop-node1
efak.worknode.port=8085

# Kafka offset storage -- Offset stored in a Kafka cluster, if stored in the zookeeper, you can not use this option
cluster1.efak.offset.storage=kafka

# Whether the Kafka performance monitoring diagram is enabled
efak.metrics.charts=true

# EFAK keeps data for 30 days by default
efak.metrics.retain=15

# If offset is out of range occurs, enable this property -- Only suitable for kafka sql
efak.sql.fix.error=false
efak.sql.topic.records.max=5000

# Delete kafka topic token -- Set to delete the topic token, so that administrators can have the right to delete
efak.topic.token=keadmin


# 關閉自帶的sqlite資料庫,使用外部的mysql資料庫
# Default use sqlite to store data
# efak.driver=org.sqlite.JDBC
# It is important to note that the '/hadoop/kafka-eagle/db' path must be exist.
# efak.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
# efak.username=root
# efak.password=smartloli

# 配置外部資料庫
# (Optional) set mysql address
efak.driver=com.mysql.jdbc.Driver
efak.url=jdbc:mysql://hadoop-node1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=123456

4)調整啟動參數

EFAK預設啟動記憶體大小為2G,考慮到伺服器情況可以將其調小

## 在 efak 安裝目錄執行
$ vi $KE_HOME/bin/ke.sh
## 將 KE_JAVA_OPTS 最大最小容量調小,例如:
export KE_JAVA_OPTS="-server -Xmx512m -Xms512m -XX:MaxGCPauseMillis=20 -XX:+UseG1GC -XX:MetaspaceSize=128m -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"

6)修改 works配置

預設是localhost

$  cat >$KE_HOME/conf/works<<EOF
hadoop-node2
hadoop-node3
EOF

7)將EFAK推送其它節點

# hadoop-node1推送
$ scp -r /opt/bigdata/hadoop/server/kafka-eagle-bin-2.1.0 hadoop-node2:/opt/bigdata/hadoop/server/
$ scp -r /opt/bigdata/hadoop/server/kafka-eagle-bin-2.1.0 hadoop-node3:/opt/bigdata/hadoop/server/
# 在hadoop-node2和hadoop-node3配置環境變數修改節點類型為slave

8)啟動

上面配置文件配置的是分散式部署,當然也可以單機跑,但是不建議單機跑

# 在master節點上執行
$ cd $KE_HOME/bin
$ chmod +x ke.sh 
# 單機版啟動
$ ke.sh start
# 集群方式啟動
$ ke.sh cluster start
$ ke.sh cluster restart

web UI:http://192.168.0.113:8048/
賬號密碼:admin/123456


六、簡單使用

1)Kafka CLI簡單使用

【增】添加topic

# 創建topic,1副本,1分區,設置數據過期時間72小時(-1表示不過期),單位ms,72*3600*1000=259200000
$ kafka-topics.sh --create --topic test002 --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092  --partitions 1 --replication-factor 1 --config retention.ms=259200000

【查】

# 查看topic列表
$ kafka-topics.sh --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092 --list
# 查看topic列表詳情
$ kafka-topics.sh --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092 --describe
# 指定topic
$ kafka-topics.sh --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092 --describe --topic test002
# 查看消費者組
$ kafka-consumer-groups.sh --bootstrap-server hadoop-node1:9092 --list
$ kafka-consumer-groups.sh --bootstrap-server hadoop-node1:9092 --describe  --group test002

【改】這裡主要是修改最常用的三個參數:分區、副本,過期時間

# 修改分區,擴分區,不能減少分區
$ kafka-topics.sh --alter --bootstrap-server hadoop-node1:9092 --topic test002 --partitions 2
# 修改過期時間,下麵兩行都可以
$ kafka-configs.sh --bootstrap-server hadoop-node1:9092 --alter --topic test002 --add-config retention.ms=86400000
$ kafka-configs.sh --bootstrap-server hadoop-node1:9092 --alter --entity-name test002 --entity-type topics --add-config retention.ms=86400000

# 修改副本數,將副本數修改成3
$ cat >1.json<<EOF
{"version":1,
"partitions":[
{"topic":"test002","partition":0,"replicas":[0,1,2]},
{"topic":"test002","partition":1,"replicas":[1,2,0]},
{"topic":"test002","partition":2,"replicas":[2,0,1]}
]}
EOF
$ kafka-topics.sh --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092 --describe --topic test002

【刪】

$ kafka-topics.sh --delete --topic test002 --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092

【生成者】

$ kafka-console-producer.sh --broker-list hadoop-node1:9092 --topic test002
{"id":"1","name":"n1","age":"20"}
{"id":"2","name":"n2","age":"21"}
{"id":"3","name":"n3","age":"22"}

【消費者】

# 從頭開始消費
$ kafka-console-consumer.sh --bootstrap-server hadoop-node1:9092 --topic test002 --from-beginning
# 指定從分區的某個位置開始消費,這裡只指定了一個分區,可以多寫幾行或者遍歷對應的所有分區
$ kafka-console-consumer.sh --bootstrap-server hadoop-node1:9092 --topic test002 --partition 0 --offset 100

【消費組】

$ kafka-console-consumer.sh --bootstrap-server hadoop-node1:9092 --topic test002 --group test002

【查看數據積壓】

$ kafka-consumer-groups.sh --bootstrap-server hadoop-node1:9092 --describe --group test002

2)EFAK常用命令

$KE_HOME/bin/ke.sh啟動腳本中包含以下命令:

命令 描述
ke.sh start 啟動 EFAK 伺服器。
ke.sh status 查看 EFAK 運行狀態。
ke.sh stop 停止 EFAK 伺服器。
ke.sh restart 重新啟動 EFAK 伺服器。
ke.sh stats 查看 linux 操作系統中的 EFAK 句柄數。
ke.sh find [ClassName] 在 jar 中找到類名的位置。
ke.sh gc 查看 EFAK 進程 gc。
ke.sh version 查看 EFAK 版本。
ke.sh jdk 查看 EFAK 安裝的 jdk 詳細信息。
ke.sh sdate 查看 EFAK 啟動日期。
ke.sh cluster start 查看 EFAK 集群分散式啟動。
ke.sh cluster status 查看 EFAK 集群分散式狀態。
ke.sh cluster stop 查看 EFAK 集群分散式停止。
ke.sh cluster restart 查看 EFAK 集群分散式重啟。

EFAK環境部署,和kafka的一些簡單操作就到這裡了,後續會分享KSQL和其它更詳細的頁面化操作,請小伙伴耐心等待~


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 在基於SqlSugar的開發框架中,我們設計了一些系統服務層的基類,在基類中會有很多涉及到相關的數據處理操作的,如果需要跟蹤具體是那個用戶進行操作的,那麼就需要獲得當前用戶的身份信息,包括在Web API的控制器中也是一樣,需要獲得對應的用戶身份信息,才能進行相關的身份鑒別和處理操作。本篇隨筆介紹基... ...
  • 在前面隨筆,我們介紹過這個基於SqlSugar的開發框架,我們區分Interface、Modal、Service三個目錄來放置不同的內容,其中Modal是SqlSugar的映射實體,Interface是定義訪問介面,Service是提供具體的數據操作實現。在Service層中,往往除了本身的一些增刪... ...
  • 許可權術語 Subject:用戶,用戶組 Action:對Object的操作,如增刪改查等 Object:許可權作用的對象,也可以理解為資源 Effect:規則的作用,如允許,拒絕 Condition:生效條件 Permission:允許(拒絕)用戶(用戶組)在條件允許下對對象(資源)的動作 Role: ...
  • 1.什麼是quota 簡單的說就是限制用戶對磁碟空間的使用量。 因為Linux是多用戶多任務的操作系統,許多人共用磁碟空間,為了合理的分配磁碟空間,於是就有了quota的出現。 2.quota的用途 顯示磁碟使用情況和配額 3.quota的一般作用對象 (1)針對WWW server (2)針對ma ...
  • 1、引言 最近在查一個bug,查到最後發現是數組越界導致的。數組只有30個位元組,代碼卻向這個數組填充了35個數據,這個bug還是偶現的,查到它確實廢了一番功夫。我就突然想到:C語言為什麼不檢查數組下標呢???先來個demo驗證下 #include<stdio.h> #include<stdlib.h ...
  • 一 、通過雲開發平臺快速創建初始化應用 1.創建相關應用模版請參考鏈接:基於 vue.js 的 SSR 技術—Nuxt.js // 註意在後面提示中,上移下移,按空格選中 Element 2.完成創建後就可以在github中查看到新增的Nuxt倉庫 二 、 本地編寫 流程圖、拓撲圖項目 1.將應用模 ...
  • 思路: 1、執行df -h 找到 帶mnt的行。將結果存入一個文件中。 system("df -h |grep mnt >./extendevinfo.txt"); 也可以直接popen用管道打開,感覺效率可能會更高一些。 2、解析文件中最後/mnt/XXX部分即為掛載路徑。(具體看自己內核掛載路徑 ...
  • 為什麼要使用Nuxt.js Nuxt 基於一個強大的模塊化架構。你可以從 50 多個模塊中進行選擇,讓你的開發變得更快、更簡單。對 PWA 的支持、添加谷歌分析到你的網頁或生成網站地圖,這些功能都無需重新發明輪子來獲得。 Nuxt.js 預設會優化你的應用程式。我們儘可能地利用 Vue.js 和 N ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...