kafka的安裝和基本操作

来源:https://www.cnblogs.com/paopaoT/archive/2023/06/07/17464032.html
-Advertisement-
Play Games

# 基本概念 ## 簡介 Kafka 最初是由 LinkedIn 即領英公司基於 Scala 和 Java 語言開發的分散式消息發佈-訂閱系統,現已捐獻給Apache 軟體基金會。其具有高吞吐、低延遲的特性,許多大數據實時流式處理系統比如 Storm、Spark、Flink等都能很好地與之集成。 總 ...


基本概念

簡介

Kafka 最初是由 LinkedIn 即領英公司基於 Scala 和 Java 語言開發的分散式消息發佈-訂閱系統,現已捐獻給Apache 軟體基金會。其具有高吞吐、低延遲的特性,許多大數據實時流式處理系統比如 Storm、Spark、Flink等都能很好地與之集成。

總的來講,Kafka 通常具有 3 重角色:

  • 存儲系統:通常消息隊列會把消息持久化到磁碟,防止消息丟失,保證消息可靠性。Kafka 的消息持久化機制和多副本機制使其能夠作為通用數據存儲系統來使用。
  • 消息系統:Kafka 和傳統的消息隊列比如 RabbitMQ、RocketMQ、ActiveMQ 類似,支持流量削峰、服務解耦、非同步通信等核心功能。 ==》 先進先出 ==》 只針對分區,不是全局的
  • 流處理平臺:Kafka 不僅能夠與大多數流式計算框架完美整合,並且自身也提供了一個完整的流式處理庫,即 Kafka Streaming。Kafka Streaming 提供了類似 Flink 中的視窗、聚合、變換、連接等功能。

一句話概括:Kafka 是一個分散式的基於發佈/訂閱模式的消息中間件,在業界主要應用於大數據實時流式計算領域,起解耦合和削峰填谷的作用。

特點

  • 高吞吐量、低延遲:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒,每個topic可以分多個partition, 由多個consumer group 對partition進行consume操作。
  • 可擴展性:kafka集群支持熱擴展
  • 持久性、可靠性:消息被持久化到本地磁碟,並且支持數據備份防止數據丟失
  • 容錯性:允許集群中有節點失敗(若副本數量為n,則允許n-1個節點失敗)
  • 高併發:支持數千個客戶端同時讀寫

Kafka在各種應用場景中,起到的作用可以歸納為這麼幾個術語:削峰填谷,解耦!
在大數據流式計算領域中,kafka主要作為計算系統的前置緩存和輸出結果緩存;

安裝部署

kafka基於Zookeeper, 因此需要先安裝Zookeeper, 詳見https://www.cnblogs.com/paopaoT/p/17461562.html

  1. 上傳安裝包
  2. 解壓
tar -zxvf kafka_2.11-2.2.2.tgz tar  -C /opt/apps/
  1. 修改配置文件
# 進入配置文件目錄
cd kafka_2.12-2.3.1/config
# 編輯配置文件
vi server.properties
# 為依次增長的:0、1、2、3、4,集群中唯一 id
broker.id=0
# 數據存儲的⽬錄 
log.dirs=/opt/data/kafka
# 底層存儲的數據(日誌)留存時長(預設7天)
log.retention.hours=168
# 底層存儲的數據(日誌)留存量(預設1G)
log.retention.bytes=1073741824
# 指定zk集群地址
zookeeper.connect=linux01:2181,linux02:2181,linux03:2181
  1. 環境變數
vi /etc/profile
export KAFKA_HOME=/opt/apps/kafka_2.11-2.2.2
export PATH=$PATH:$KAFKA_HOME/bin

source /etc/profile
  1. 分發安裝包
for  i  in {2..3}
do 
scp  -r  kafka_2.11-2.2.2  linux0$i:$PWD 
done

# 安裝包分發後,記得修改config/server.properties中的 配置參數: broker.id
# 註意:還需要分發環境變數
  1. 啟停集群(在各個節點上啟動)
bin/kafka-server-start.sh -daemon /opt/apps/kafka_2.11-2.2.2/config/server.properties

# 停止集群
bin/kafka-server-stop.sh
  1. 一鍵啟停腳本:
#!/bin/bash

case $1 in
"start"){
        for i in linux01 linux02 linux03
        do
        echo ---------- kafka $i 啟動 ------------
                ssh $i "source /etc/profile;  /opt/app/kafka2.4.1/bin/kafka-server-start.sh -daemon /opt/app/kafka2.4.1/config/server.properties"
        done
};;
"stop"){
        for i in linux01 linux02 linux03
        do
        echo ---------- kafka $i 停止 ------------
                ssh $i "source /etc/profile;  /opt/app/kafka2.4.1/bin/kafka-server-stop.sh "
        done
};;
esac

基本操作

概述

Kafka 中提供了許多命令行工具(位於$KAFKA_HOME/bin 目錄下)用於管理集群的變更。

腳本 作用
kafka-console-producer.sh 生產消息
kafka-topics.sh 管理主題
kafka-server-stop.sh 關閉Kafka服務
kafka-server-start.sh 啟動Kafka服務
kafka-configs.sh 配置管理
kafka-consumer-perf-test.sh 測試消費性能
kafka-producer-perf-test.sh 測試生產性能
kafka-dump-log.sh 查看數據日誌內容
kafka-preferred-replica-election.sh 優先副本的選舉
kafka-reassign-partitions.sh 分區重分配

管理操作:kafka-topics

創建topic

--bootstrap-server 和 --zookeeper一樣的效果 ,新版本建議使用 --bootstrap-server

kafka-topics.sh   --bootstrap-server  linux01:9092,linux02:9092,linux03:9092    --create --topic test01  --partitions 3  --replication-factor  3

參數解釋:
--replication-factor  副本數量
--partitions 分區數量 
--topic topic名稱
# 本方式,副本的存儲位置是系統自動決定的


# 手動指定分配方案:分區數,副本數,存儲位置
kafka-topics.sh --create --topic tpc-1  --zookeeper linux01:2181 --replica-assignment 0:1:3,1:2:6

該topic,將有如下partition:(2個分區 3個副本)
partition0 ,所在節點: broker0、broker1、broker3
partition1 ,所在節點: broker1、broker2、broker6

# 查看topic的狀態信息
kafka-topics.sh --describe --topic tpc-1 --zookeeper linux01:2181
Topic: tpc-1    PartitionCount: 2       ReplicationFactor: 3    Configs: 
        Topic: tpc-1    Partition: 0    Leader: 0       Replicas: 0,1,3 Isr: 0,1
        Topic: tpc-1    Partition: 1    Leader: 1       Replicas: 1,2,6 Isr: 1,2

查看topic列表

kafka-topics.sh --bootstrap-server linux01:9092,linux02:9092,linux03:9092 --list

kafka-topics.sh --list --zookeeper linux01:2181
__consumer_offsets
tpc-1

查看topic狀態信息

kafka-topics.sh --describe --zookeeper linux01:2181  --topic test
Topic: test     PartitionCount: 3       ReplicationFactor: 3    Configs: 
        Topic: test     Partition: 0    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
        Topic: test     Partition: 1    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2
        Topic: test     Partition: 2    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0


# topic的分區數量,以及每個分區的副本數量,以及每個副本所在的broker節點,以及每個分區的leader副本所在broker節點,以及每個分區的ISR副本列表;
# ISR: in  sync  replica ,同步副同步本(當然也包含leader自身,replica.lag.time.max.ms =30000)
# OSR:out  of  sync replicas 失去同步的副本(該副本上次請求leader同步數據距現在的時間間隔超出配置閾值)

# ISR同步副本列表
# ISR概念:(同步副本)。每個分區的leader會維護一個ISR列表,ISR列表裡面就是follower副本的Borker編號,只有跟得上Leader的 follower副本才能加入到 ISR裡面
# 這個是通過replica.lag.time.max.ms =30000(預設值)參數配置的,只有ISR里的成員才有被選為 leader 的可能。

踢出ISR和重新加入ISR的條件:

  • 踢出ISR的條件: 由replica.lag.time.max.ms =30000決定,如上圖;
  • 重新加入ISR的條件: OSR副本的LEO(log end offset)追上leader的LEO;

刪除topic

bin/kafka-topics.sh --zookeeper linux01:2181 --delete --topic test
# 刪除topic,server.properties中需要一個參數處於啟用狀態: delete.topic.enable = true(預設是true)

# 使用 kafka-topics .sh 腳本刪除主題的行為本質上只是在 ZooKeeper 中的 /admin/delete_topics 路徑下建一個與待刪除主題同名的節點,以標記該主題為待刪除的狀態。然後由 Kafka控制器非同步完成。

增加分區數

kafka-topics.sh --zookeeper linux01:2181 --alter --topic paopao --partitions 3

# Kafka只支持增加分區,不支持減少分區
# 原因是:減少分區,代價太大(數據的轉移,日誌段拼接合併)
# 如果真的需要實現此功能,則完全可以重新創建一個分區數較小的主題,然後將現有主題中的消息按照既定的邏輯複製過去;

動態配置topic參數(不常用)

# 通過管理命令,可以為已創建的topic增加、修改、刪除topic level參數
# 添加/修改  指定topic的配置參數:

kafka-topics.sh  --zookeeper linux01:2181  --alter  --topic tpc2 --config compression.type=gzip
# --config compression.type=gzip  修改或添加參數配置
# --add-config compression.type=gzip  添加參數配置
# --delete-config compression.type  刪除配置參數

生產者:kafka-console-producer

kafka-console-producer.sh --broker-list linux01:9092 --topic test01
>a
>b
>c
>hello 
>hi
>hadoop
>hive

順序輪詢(老版本)

順序分配,消息是均勻的分配給每個 partition,即每個分區存儲一次消息,輪詢策略是 Kafka Producer 提供的預設策略,如果你不使用指定的輪詢策略的話,Kafka 預設會使用順序輪訓策略的方式。

隨機分配

實現隨機分配的代碼只需要兩行,如下

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size()); 

消費者:kafka-console-consumer

消費者在消費的時候,需要指定要訂閱的主題,還可以指定消費的起始偏移量

起始偏移量的指定策略有3中:

  • earliest 起始點
  • latest 最新
  • 指定的offset( 分區號:偏移量) ==》 必須的告訴他是哪個topic 的哪個分區的哪個offset
  • 從之前所記錄的偏移量開始消費

在命令行中,可以指定從什麼地方開始消費

  1. 加上參數 --from-beginning 指定從最前面開始消費
  2. 如果不加--from-beginning 就需要分情況討論了,如果之前記錄過消費的位置,那麼就從之前消費的位置開始消費,如果說之前沒有記錄過之前消費的偏移量,那麼就從最新的位置開始消費

kafka的topic中的消息,是有序號的(序號叫消息偏移量),而且消息的偏移量是在各個partition中獨立維護的,在各個分區內,都是從0開始遞增編號!

# 消費消息
kafka-console-consumer.sh --bootstrap-server linux01:9092 --topic test01 --from-beginning
hive
hello
hadoop

# 指定從最前面開始消費
kafka-console-consumer.sh --bootstrap-server linux01:9092 --topic paopao --from-beginning
hadoop
list
hello
kafka

# 不指定他消費的位置的時候,就是從最新的地方開始消費
kafka-console-consumer.sh --bootstrap-server linux01:9092 --topic paopao

# 指定要消費的分區,和要消費的起始offset
# 從指定的offset(需要指定偏移量和分區號)
kafka-console-consumer.sh --bootstrap-server linux01:9092 --topic paopao --offset 2 --partition 0
yy
abc
3333
2222

消費組

  • 消費組是kafka為了提高消費並行度的一種機制!
  • 在kafka的底層邏輯中,任何一個消費者都有自己所屬的組(如果沒有指定,系統會自己給你分配一個組id)
  • 組和組之間,沒有任何關係,大家都可以消費到目標topic的所有數據
  • 但是組內的各個消費者,就只能讀到自己所分配到的partitions
  • KAFKA中的消費組,可以動態增減消費者,而且消費組中的消費者數量發生任意變動,都會重新分配分區消費任務(消費者組在均衡策略)

如何讓多個消費者組成一個組: 就是讓這些消費者的groupId相同即可!

消費位移的記錄

kafka的消費者,可以記錄自己所消費到的消息偏移量,記錄的這個偏移量就叫(消費位移);

記錄這個消費到的位置,作用就在於消費者重啟後可以接續上一次消費到位置來繼續往後面消費;

消費位移,是組內共用的!!!消費位置記錄在一個內置的topic中 ,預設是5s提交一次位移更新。
參數:auto.commit.interval.ms 預設是5s記錄一次

#  可以使用特定的工具類 解析內置記錄偏移量的topic
kafka-console-consumer.sh --bootstrap-server linux01:9092 --from-beginning --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
# 通過指定formatter工具類,來對__consumer_offsets主題中的數據進行解析;

[g01,linux01,0]::OffsetAndMetadata(offset=14, leaderEpoch=Optional[0], metadata=, commitTimestamp=1659889851318, expireTimestamp=None)
[g01,linux01,2]::OffsetAndMetadata(offset=17, leaderEpoch=Optional[0], metadata=, commitTimestamp=1659889856319, expireTimestamp=None)
[g01,linux01,1]::OffsetAndMetadata(offset=13, leaderEpoch=Optional[0], metadata=, commitTimestamp=1659889856319, expireTimestamp=None)
[g01,linux01,0]::OffsetAndMetadata(offset=14, leaderEpoch=Optional[0], metadata=, commitTimestamp=1659889856319, expireTimestamp=None)

# 如果需要獲取某個特定 consumer-group的消費偏移量信息,則需要計算該消費組的偏移量記錄所在分區: Math.abs(groupID.hashCode()) % numPartitions(50)
# 根據組id的hash取值%50 確定具體是將這個組具體每個分區消費到了哪裡
# __consumer_offsets的分區數為:50

配置管理 kafka-config

kafka-configs.sh 腳本是專門用來進行動態參數配置操作的,這裡的操作是運行狀態修改原有的配置,如此可以達到動態變更的目的;一般情況下不會進行動態修改 。
動態配置的參數,會被存儲在zookeeper上,因而是持久生效的
可用參數的查閱地址: https://kafka.apache.org/documentation/#configuration

# kafka-configs.sh 腳本包含:變更alter、查看describe 這兩種指令類型;
# kafka-configs. sh 支持主題、 broker 、用戶和客戶端這4個類型的配置。
# kafka-configs.sh 腳本使用 entity-type 參數來指定操作配置的類型,並且使 entity-name參數來指定操作配置的名稱。
# 比如查看topic的配置可以按如下方式執行:
kafka-configs.sh --zookeeper linux01:2181  --describe  --entity-type topics  --entity-name paopao

# 查看broker的動態配置可以按如下方式執行:
kafka-configs.sh  --describe --entity-type brokers --entity-name 0 --zookeeper linux01:2181

entity-type和entity-name的對應關係

image

# 示例:添加topic級別參數
kafka-configs.sh --zookeeper linux01:2181 --alter --entity-type topics --entity-name paopao --add-config cleanup.policy=compact,max.message.bytes=10000

# 示例:添加broker參數
kafka-configs.sh  --entity-type brokers --entity-name 0 --alter --add-config log.flush.interval.ms=1000 --bootstrap-server linux01:9092,linux02:9092,linux03:9092

動態配置topic參數

通過管理命令,可以為已創建的topic增加、修改、刪除topic level參數
添加/修改 指定topic的配置參數:

kafka-topics.sh  --topic paopao --alter  --config compression.type=gzip --zookeeper linux01:2181

# 如果利用 kafka-configs.sh 腳本來對topic、producer、consumer、broker等進行參數動態
# 添加、修改配置參數
kafka-configs.sh --zookeeper linux01:2181 --entity-type topics --entity-name paopao --alter --add-config compression.type=gzip

# 刪除配置參數
kafka-configs.sh --zookeeper linux01:2181 --entity-type topics --entity-name paopao --alter --delete-config compression.type

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • ## 前言 使用 C# 作為開發語言已經 15 個年頭了,受惠於 C# 的不斷更新,伴隨著大量的新特性與大量語法糖,讓我更加容易寫出簡潔、高效的代碼。日常中大量特性早已信手拈來,當然從未嘗試過的特性更是難以盡數,但是每每回憶代碼中的特性究竟是哪個版本引入的,卻頗為含糊。索性簡單整理記錄下來,用以備忘 ...
  • 大家好,我是god23bin。歡迎來到《**一分鐘學一個 Linux 命令**》系列,今天需要你花兩分鐘時間來學習下,因為今天要講的是兩個命令,`mv` 和 `cp` 命令。 ...
  • # CentOS7 本地光碟鏡像rpm包 ## 一、前言 > rpm包的下載方式 > > - 通過本地光碟鏡像下載rpm,centos7.iso鏡像文件,內置了絕大多數軟體的rpm包(本文章即演示如何配置本地rpm) > > - 線上下載rpm包,有很多軟體的官網,以及第三方軟體倉庫,會提供下載功能 ...
  • 基本語法格式: Location block 的基本語法形式是: location [=|~|~*|^~|@] pattern { ... } [=|~|~*|^~|@] 被稱作 location modifier ,這會定義 Nginx 如何去匹配其後的 pattern ,以及該 pattern ...
  • MCU:STM32F429ZIT6 開發環境:STM32CubeMX+MDK5 外購了一個SPI介面的SD Card模塊,想要實現SD卡存儲數據的功能。 首先需要打開STM32CubeMX工具。輸入開發板MCU對應型號,找到開發板對應封裝的MCU型號,雙擊打開(圖中第三)。 此時,雙擊完後會關閉此界 ...
  • MCU:STM32F103VET6 開發環境:STM32CubeMX+MDK5 實現USB的虛擬串口不需要去理解USB的底層驅動,只需要STM32CubeMX去配置生成工程即可。在野火的指南者中,是沒有這一類的視頻和示例的,博主使用這款開發板實現USB虛擬串口。 首先需要打開STM32CubeMX工 ...
  • ## 前言 本篇文章主要介紹的關於本人從剛工作到現在使用Sql一些使用方法和經驗,從最基本的SQL函數使用,到一些場景的業務場景SQL編寫。 ## SQL基礎函數使用 ### 1.欄位轉換 CASE WHEN 意義: If(a==b) a=c; 用法: 1, CASE 欄位 WHEN 欄位結果1 T ...
  • [TOC] ## 概述 Hive查看執行計劃的命令中還有兩個不怎麼常用但很重要的命令,接下來詳細介紹一下。 有一個問題:**如何在hiveSQL執行之前就探查到這段邏輯的血緣依賴關係?** hive血緣是很多生產級數倉必須要提供的功能,大多數解決方案都是**使用hive hooks的方法通過SQL執 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...