# 基本概念 ## 簡介 Kafka 最初是由 LinkedIn 即領英公司基於 Scala 和 Java 語言開發的分散式消息發佈-訂閱系統,現已捐獻給Apache 軟體基金會。其具有高吞吐、低延遲的特性,許多大數據實時流式處理系統比如 Storm、Spark、Flink等都能很好地與之集成。 總 ...
基本概念
簡介
Kafka 最初是由 LinkedIn 即領英公司基於 Scala 和 Java 語言開發的分散式消息發佈-訂閱系統,現已捐獻給Apache 軟體基金會。其具有高吞吐、低延遲的特性,許多大數據實時流式處理系統比如 Storm、Spark、Flink等都能很好地與之集成。
總的來講,Kafka 通常具有 3 重角色:
- 存儲系統:通常消息隊列會把消息持久化到磁碟,防止消息丟失,保證消息可靠性。Kafka 的消息持久化機制和多副本機制使其能夠作為通用數據存儲系統來使用。
- 消息系統:Kafka 和傳統的消息隊列比如 RabbitMQ、RocketMQ、ActiveMQ 類似,支持流量削峰、服務解耦、非同步通信等核心功能。 ==》 先進先出 ==》 只針對分區,不是全局的
- 流處理平臺:Kafka 不僅能夠與大多數流式計算框架完美整合,並且自身也提供了一個完整的流式處理庫,即 Kafka Streaming。Kafka Streaming 提供了類似 Flink 中的視窗、聚合、變換、連接等功能。
一句話概括:Kafka 是一個分散式的基於發佈/訂閱模式的消息中間件,在業界主要應用於大數據實時流式計算領域,起解耦合和削峰填谷的作用。
特點
- 高吞吐量、低延遲:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒,每個topic可以分多個partition, 由多個consumer group 對partition進行consume操作。
- 可擴展性:kafka集群支持熱擴展
- 持久性、可靠性:消息被持久化到本地磁碟,並且支持數據備份防止數據丟失
- 容錯性:允許集群中有節點失敗(若副本數量為n,則允許n-1個節點失敗)
- 高併發:支持數千個客戶端同時讀寫
Kafka在各種應用場景中,起到的作用可以歸納為這麼幾個術語:削峰填谷,解耦!
在大數據流式計算領域中,kafka主要作為計算系統的前置緩存和輸出結果緩存;
安裝部署
kafka基於Zookeeper, 因此需要先安裝Zookeeper, 詳見https://www.cnblogs.com/paopaoT/p/17461562.html
- 上傳安裝包
- 解壓
tar -zxvf kafka_2.11-2.2.2.tgz tar -C /opt/apps/
- 修改配置文件
# 進入配置文件目錄
cd kafka_2.12-2.3.1/config
# 編輯配置文件
vi server.properties
# 為依次增長的:0、1、2、3、4,集群中唯一 id
broker.id=0
# 數據存儲的⽬錄
log.dirs=/opt/data/kafka
# 底層存儲的數據(日誌)留存時長(預設7天)
log.retention.hours=168
# 底層存儲的數據(日誌)留存量(預設1G)
log.retention.bytes=1073741824
# 指定zk集群地址
zookeeper.connect=linux01:2181,linux02:2181,linux03:2181
- 環境變數
vi /etc/profile
export KAFKA_HOME=/opt/apps/kafka_2.11-2.2.2
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
- 分發安裝包
for i in {2..3}
do
scp -r kafka_2.11-2.2.2 linux0$i:$PWD
done
# 安裝包分發後,記得修改config/server.properties中的 配置參數: broker.id
# 註意:還需要分發環境變數
- 啟停集群(在各個節點上啟動)
bin/kafka-server-start.sh -daemon /opt/apps/kafka_2.11-2.2.2/config/server.properties
# 停止集群
bin/kafka-server-stop.sh
- 一鍵啟停腳本:
#!/bin/bash
case $1 in
"start"){
for i in linux01 linux02 linux03
do
echo ---------- kafka $i 啟動 ------------
ssh $i "source /etc/profile; /opt/app/kafka2.4.1/bin/kafka-server-start.sh -daemon /opt/app/kafka2.4.1/config/server.properties"
done
};;
"stop"){
for i in linux01 linux02 linux03
do
echo ---------- kafka $i 停止 ------------
ssh $i "source /etc/profile; /opt/app/kafka2.4.1/bin/kafka-server-stop.sh "
done
};;
esac
基本操作
概述
Kafka 中提供了許多命令行工具(位於$KAFKA_HOME/bin 目錄下)用於管理集群的變更。
腳本 | 作用 |
---|---|
kafka-console-producer.sh | 生產消息 |
kafka-topics.sh | 管理主題 |
kafka-server-stop.sh | 關閉Kafka服務 |
kafka-server-start.sh | 啟動Kafka服務 |
kafka-configs.sh | 配置管理 |
kafka-consumer-perf-test.sh | 測試消費性能 |
kafka-producer-perf-test.sh | 測試生產性能 |
kafka-dump-log.sh | 查看數據日誌內容 |
kafka-preferred-replica-election.sh | 優先副本的選舉 |
kafka-reassign-partitions.sh | 分區重分配 |
管理操作:kafka-topics
創建topic
--bootstrap-server 和 --zookeeper一樣的效果 ,新版本建議使用 --bootstrap-server
kafka-topics.sh --bootstrap-server linux01:9092,linux02:9092,linux03:9092 --create --topic test01 --partitions 3 --replication-factor 3
參數解釋:
--replication-factor 副本數量
--partitions 分區數量
--topic topic名稱
# 本方式,副本的存儲位置是系統自動決定的
# 手動指定分配方案:分區數,副本數,存儲位置
kafka-topics.sh --create --topic tpc-1 --zookeeper linux01:2181 --replica-assignment 0:1:3,1:2:6
該topic,將有如下partition:(2個分區 3個副本)
partition0 ,所在節點: broker0、broker1、broker3
partition1 ,所在節點: broker1、broker2、broker6
# 查看topic的狀態信息
kafka-topics.sh --describe --topic tpc-1 --zookeeper linux01:2181
Topic: tpc-1 PartitionCount: 2 ReplicationFactor: 3 Configs:
Topic: tpc-1 Partition: 0 Leader: 0 Replicas: 0,1,3 Isr: 0,1
Topic: tpc-1 Partition: 1 Leader: 1 Replicas: 1,2,6 Isr: 1,2
查看topic列表
kafka-topics.sh --bootstrap-server linux01:9092,linux02:9092,linux03:9092 --list
kafka-topics.sh --list --zookeeper linux01:2181
__consumer_offsets
tpc-1
查看topic狀態信息
kafka-topics.sh --describe --zookeeper linux01:2181 --topic test
Topic: test PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: test Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: test Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: test Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
# topic的分區數量,以及每個分區的副本數量,以及每個副本所在的broker節點,以及每個分區的leader副本所在broker節點,以及每個分區的ISR副本列表;
# ISR: in sync replica ,同步副同步本(當然也包含leader自身,replica.lag.time.max.ms =30000)
# OSR:out of sync replicas 失去同步的副本(該副本上次請求leader同步數據距現在的時間間隔超出配置閾值)
# ISR同步副本列表
# ISR概念:(同步副本)。每個分區的leader會維護一個ISR列表,ISR列表裡面就是follower副本的Borker編號,只有跟得上Leader的 follower副本才能加入到 ISR裡面
# 這個是通過replica.lag.time.max.ms =30000(預設值)參數配置的,只有ISR里的成員才有被選為 leader 的可能。
踢出ISR和重新加入ISR的條件:
- 踢出ISR的條件: 由replica.lag.time.max.ms =30000決定,如上圖;
- 重新加入ISR的條件: OSR副本的LEO(log end offset)追上leader的LEO;
刪除topic
bin/kafka-topics.sh --zookeeper linux01:2181 --delete --topic test
# 刪除topic,server.properties中需要一個參數處於啟用狀態: delete.topic.enable = true(預設是true)
# 使用 kafka-topics .sh 腳本刪除主題的行為本質上只是在 ZooKeeper 中的 /admin/delete_topics 路徑下建一個與待刪除主題同名的節點,以標記該主題為待刪除的狀態。然後由 Kafka控制器非同步完成。
增加分區數
kafka-topics.sh --zookeeper linux01:2181 --alter --topic paopao --partitions 3
# Kafka只支持增加分區,不支持減少分區
# 原因是:減少分區,代價太大(數據的轉移,日誌段拼接合併)
# 如果真的需要實現此功能,則完全可以重新創建一個分區數較小的主題,然後將現有主題中的消息按照既定的邏輯複製過去;
動態配置topic參數(不常用)
# 通過管理命令,可以為已創建的topic增加、修改、刪除topic level參數
# 添加/修改 指定topic的配置參數:
kafka-topics.sh --zookeeper linux01:2181 --alter --topic tpc2 --config compression.type=gzip
# --config compression.type=gzip 修改或添加參數配置
# --add-config compression.type=gzip 添加參數配置
# --delete-config compression.type 刪除配置參數
生產者:kafka-console-producer
kafka-console-producer.sh --broker-list linux01:9092 --topic test01
>a
>b
>c
>hello
>hi
>hadoop
>hive
順序輪詢(老版本)
順序分配,消息是均勻的分配給每個 partition,即每個分區存儲一次消息,輪詢策略是 Kafka Producer 提供的預設策略,如果你不使用指定的輪詢策略的話,Kafka 預設會使用順序輪訓策略的方式。
隨機分配
實現隨機分配的代碼只需要兩行,如下
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());
消費者:kafka-console-consumer
消費者在消費的時候,需要指定要訂閱的主題,還可以指定消費的起始偏移量
起始偏移量的指定策略有3中:
- earliest 起始點
- latest 最新
- 指定的offset( 分區號:偏移量) ==》 必須的告訴他是哪個topic 的哪個分區的哪個offset
- 從之前所記錄的偏移量開始消費
在命令行中,可以指定從什麼地方開始消費
- 加上參數 --from-beginning 指定從最前面開始消費
- 如果不加--from-beginning 就需要分情況討論了,如果之前記錄過消費的位置,那麼就從之前消費的位置開始消費,如果說之前沒有記錄過之前消費的偏移量,那麼就從最新的位置開始消費
kafka的topic中的消息,是有序號的(序號叫消息偏移量),而且消息的偏移量是在各個partition中獨立維護的,在各個分區內,都是從0開始遞增編號!
# 消費消息
kafka-console-consumer.sh --bootstrap-server linux01:9092 --topic test01 --from-beginning
hive
hello
hadoop
# 指定從最前面開始消費
kafka-console-consumer.sh --bootstrap-server linux01:9092 --topic paopao --from-beginning
hadoop
list
hello
kafka
# 不指定他消費的位置的時候,就是從最新的地方開始消費
kafka-console-consumer.sh --bootstrap-server linux01:9092 --topic paopao
# 指定要消費的分區,和要消費的起始offset
# 從指定的offset(需要指定偏移量和分區號)
kafka-console-consumer.sh --bootstrap-server linux01:9092 --topic paopao --offset 2 --partition 0
yy
abc
3333
2222
消費組
- 消費組是kafka為了提高消費並行度的一種機制!
- 在kafka的底層邏輯中,任何一個消費者都有自己所屬的組(如果沒有指定,系統會自己給你分配一個組id)
- 組和組之間,沒有任何關係,大家都可以消費到目標topic的所有數據
- 但是組內的各個消費者,就只能讀到自己所分配到的partitions
- KAFKA中的消費組,可以動態增減消費者,而且消費組中的消費者數量發生任意變動,都會重新分配分區消費任務(消費者組在均衡策略)
如何讓多個消費者組成一個組: 就是讓這些消費者的groupId相同即可!
消費位移的記錄
kafka的消費者,可以記錄自己所消費到的消息偏移量,記錄的這個偏移量就叫(消費位移);
記錄這個消費到的位置,作用就在於消費者重啟後可以接續上一次消費到位置來繼續往後面消費;
消費位移,是組內共用的!!!消費位置記錄在一個內置的topic中 ,預設是5s提交一次位移更新。
參數:auto.commit.interval.ms 預設是5s記錄一次
# 可以使用特定的工具類 解析內置記錄偏移量的topic
kafka-console-consumer.sh --bootstrap-server linux01:9092 --from-beginning --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
# 通過指定formatter工具類,來對__consumer_offsets主題中的數據進行解析;
[g01,linux01,0]::OffsetAndMetadata(offset=14, leaderEpoch=Optional[0], metadata=, commitTimestamp=1659889851318, expireTimestamp=None)
[g01,linux01,2]::OffsetAndMetadata(offset=17, leaderEpoch=Optional[0], metadata=, commitTimestamp=1659889856319, expireTimestamp=None)
[g01,linux01,1]::OffsetAndMetadata(offset=13, leaderEpoch=Optional[0], metadata=, commitTimestamp=1659889856319, expireTimestamp=None)
[g01,linux01,0]::OffsetAndMetadata(offset=14, leaderEpoch=Optional[0], metadata=, commitTimestamp=1659889856319, expireTimestamp=None)
# 如果需要獲取某個特定 consumer-group的消費偏移量信息,則需要計算該消費組的偏移量記錄所在分區: Math.abs(groupID.hashCode()) % numPartitions(50)
# 根據組id的hash取值%50 確定具體是將這個組具體每個分區消費到了哪裡
# __consumer_offsets的分區數為:50
配置管理 kafka-config
kafka-configs.sh 腳本是專門用來進行動態參數配置操作的,這裡的操作是運行狀態修改原有的配置,如此可以達到動態變更的目的;一般情況下不會進行動態修改 。
動態配置的參數,會被存儲在zookeeper上,因而是持久生效的
可用參數的查閱地址: https://kafka.apache.org/documentation/#configuration
# kafka-configs.sh 腳本包含:變更alter、查看describe 這兩種指令類型;
# kafka-configs. sh 支持主題、 broker 、用戶和客戶端這4個類型的配置。
# kafka-configs.sh 腳本使用 entity-type 參數來指定操作配置的類型,並且使 entity-name參數來指定操作配置的名稱。
# 比如查看topic的配置可以按如下方式執行:
kafka-configs.sh --zookeeper linux01:2181 --describe --entity-type topics --entity-name paopao
# 查看broker的動態配置可以按如下方式執行:
kafka-configs.sh --describe --entity-type brokers --entity-name 0 --zookeeper linux01:2181
entity-type和entity-name的對應關係
# 示例:添加topic級別參數
kafka-configs.sh --zookeeper linux01:2181 --alter --entity-type topics --entity-name paopao --add-config cleanup.policy=compact,max.message.bytes=10000
# 示例:添加broker參數
kafka-configs.sh --entity-type brokers --entity-name 0 --alter --add-config log.flush.interval.ms=1000 --bootstrap-server linux01:9092,linux02:9092,linux03:9092
動態配置topic參數
通過管理命令,可以為已創建的topic增加、修改、刪除topic level參數
添加/修改 指定topic的配置參數:
kafka-topics.sh --topic paopao --alter --config compression.type=gzip --zookeeper linux01:2181
# 如果利用 kafka-configs.sh 腳本來對topic、producer、consumer、broker等進行參數動態
# 添加、修改配置參數
kafka-configs.sh --zookeeper linux01:2181 --entity-type topics --entity-name paopao --alter --add-config compression.type=gzip
# 刪除配置參數
kafka-configs.sh --zookeeper linux01:2181 --entity-type topics --entity-name paopao --alter --delete-config compression.type