1、應用場景 1.1 kafka場景 Kafka最初是由LinkedIn公司採用Scala語言開發,基於ZooKeeper,現在已經捐獻給了Apache基金會。目前Kafka已經定位為一個分散式流式處理平臺,它以 高吞吐、可持久化、可水平擴展、支持流處理等多種特性而被廣泛應用。 Apache ...
1、應用場景
1.1 kafka場景
Kafka最初是由LinkedIn公司採用Scala語言開發,基於ZooKeeper,現在已經捐獻給了Apache基金會。目前Kafka已經定位為一個分散式流式處理平臺,它以 高吞吐、可持久化、可水平擴展、支持流處理等多種特性而被廣泛應用。
Apache Kafka能夠支撐海量數據的數據傳遞。在離線和實時的消息處理業務系統中,Kafka都有廣泛的應用。
(1)日誌收集:收集各種服務的log,通過kafka以統一介面服務的方式開放 給各種consumer,例如Hadoop、Hbase、Solr等;
(2)消息系統:解耦和生產者和消費者、緩存消息等;
(3)用戶活動跟蹤:Kafka經常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網頁、搜索、點 擊等活動,這些活動信息被各個伺服器發佈到kafka的topic中,然後訂閱者通過訂閱這些topic來做實時 的監控分析,或者裝載到Hadoop、數據倉庫中做離線分析和挖掘;
(4)運營指標:Kafka也經常用來記錄運營監控數據。包括收集各種分散式應用的數據,生產各種操作 的集中反饋,比如報警和報告;
(5)流式處理:比如spark streaming和storm;
1.2 kafka特性
kafka以高吞吐量著稱,主要有以下特性:
(1)高吞吐量、低延遲:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒;
(2)可擴展性:kafka集群支持熱擴展;
(3)持久性、可靠性:消息被持久化到本地磁碟,並且支持數據備份防止數據丟失;
(4)容錯性:允許集群中節點失敗(若副本數量為n,則允許n-1個節點失敗);
(5)高併發:支持數千個客戶端同時讀寫;
1.3 消息對比
- 如果普通的業務消息解耦,消息傳輸,rabbitMq是首選,它足夠簡單,管理方便,性能夠用。
- 如果在上述,日誌、消息收集、訪問記錄等高吞吐,實時性場景下,推薦kafka,它基於分散式,擴容便捷
- 如果很重的業務,要做到極高的可靠性,考慮rocketMq,但是它太重。需要你有足夠的瞭解
1.4 大廠應用
- 京東通過kafka搭建數據平臺,用於用戶購買、瀏覽等行為的分析。成功抗住6.18的流量洪峰
- 阿裡借鑒kafka的理念,推出自己的rocketmq。在設計上參考了kafka的架構體系
2、基礎組件
2.1 角色
-
broker:節點,就是你看到的機器
-
provider:生產者,發消息的
-
consumer:消費者,讀消息的
-
zookeeper:信息中心,記錄kafka的各種信息的地方
-
controller:其中的一個broker,作為leader身份來負責管理整個集群。如果掛掉,藉助zk重新選主
2.2 邏輯組件
-
topic:主題,一個消息的通道,收發總得知道消息往哪投
-
partition:分區,每個主題可以有多個分區分擔數據的傳遞,多條路並行,吞吐量大
-
Replicas:副本,每個分區可以設置多個副本,副本之間數據一致。相當於備份,有備胎更可靠
-
leader & follower:主從,上面的這些副本里有1個身份為leader,其他的為follower。leader處理partition的所有讀寫請求
2.3 副本集合
-
AR:所有副本的統稱,AR=ISR+OSR
-
ISR:同步中的副本,可以參與leader選主。一旦落後太多(數量滯後和時間滯後兩個維度)會被踢到OSR。
-
OSR:踢出同步的副本,一直追趕leader,追上後會進入ISR
2.4 消息標記
- offset:偏移量,消息消費到哪一條了?每個消費者都有自己的偏移量
- HW:(high watermark):副本的高水印值,客戶端最多能消費到的位置,HW值為8,代表offset為[0,8]的9條消息都可以被消費到,它們是對消費者可見的,而[9,12]這4條消息由於未提交,對消費者是不可見的。
- LEO:(log end offset):日誌末端位移,代表日誌文件中下一條待寫入消息的offset,這個offset上實際是沒有消息的。不管是leader副本還是follower副本,都有這個值。
那麼這三者有什麼關係呢?
比如在副本數等於3的情況下,消息發送到Leader A之後會更新LEO的值,Follower B和Follower C也會實時拉取Leader A中的消息來更新自己,HW就表示A、B、C三者同時達到的日誌位移,也就是A、B、C三者中LEO最小的那個值。由於B、C拉取A消息之間延時問題,所以HW一般會小於LEO,即LEO>=HW。
具體的同步原理,下麵章節會詳細講到
3.1 發展歷程
http://kafka.apache.org/downloads
3.1.1 版本命名
Kafka在1.0.0版本前的命名規則是4位,比如0.8.2.1,0.8是大版本號,2是小版本號,1表示打過1個補丁
現在的版本號命名規則是3位,格式是“大版本號”+“小版本號”+“修訂補丁數”,比如2.5.0,前面的2代表的是大版本號,中間的5代表的是小版本號,0表示沒有打過補丁
我們所看到的下載包,前面是scala編譯器的版本,後面才是真正的kafka版本。
3.1.2 演進歷史
0.7版本
只提供了最基礎的消息隊列功能。
0.8版本
引入了副本機制,至此Kafka成為了一個真正意義上完備的分散式高可靠消息隊列解決方案。
0.9版本
增加許可權和認證,使用Java重寫了新的consumer API,Kafka Connect功能;不建議使用consumer API;
0.10版本
引入Kafka Streams功能,正式升級成分散式流處理平臺;建議版本0.10.2.2;建議使用新版consumer API
0.11版本
producer API冪等,事務API,消息格式重構;建議版本0.11.0.3;謹慎對待消息格式變化
1.0和2.0版本
Kafka Streams改進;建議版本2.0;
3.2 集群搭建(助學)
1)原生啟動
kafka啟動需要zookeeper,第一步啟動zk:
docker run --name zookeeper-1 -d -p 2181 zookeeper:3.4.13
原生安裝:下載後解壓啟動即可 http://kafka.apache.org/downloads
bin/kafka-server-start.sh config/server.properties
#server.properties配置說明
#表示broker的編號,如果集群中有多個broker,則每個broker的編號需要設置的不同
broker.id=0
#brokder對外提供的服務入口地址,預設9092
listeners=PLAINTEXT://:9092
#設置存放消息日誌文件的地址
log.dirs=/tmp/kafka/log
#Kafka所需Zookeeper集群地址,這裡是關鍵!加入同一個zk的kafka為同一集群
zookeeper.connect=zookeeper:2181
2)推薦docker-compose 一鍵啟動
#參考資料中的kafka.yml
#註意hostname問題,ip地址:52.82.98.209,換成你自己伺服器的
#docker-compose -f kafka.yml up -d 啟動
version: '3'
services:
zookeeper:
image: zookeeper:3.4.13
kafka-1:
container_name: kafka-1
image: wurstmeister/kafka:2.12-2.2.2
ports:
- 10903:9092
environment:
KAFKA_BROKER_ID: 1
HOST_IP: 52.82.98.209
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
#docker部署必須設置外部可訪問ip和埠,否則註冊進zk的地址將不可達造成外部無法連接
KAFKA_ADVERTISED_HOST_NAME: 52.82.98.209
KAFKA_ADVERTISED_PORT: 10903
volumes:
- /etc/localtime:/etc/localtime
depends_on:
- zookeeper
kafka-2:
container_name: kafka-2
image: wurstmeister/kafka:2.12-2.2.2
ports:
- 10904:9092
environment:
KAFKA_BROKER_ID: 2
HOST_IP: 52.82.98.209
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_HOST_NAME: 52.82.98.209
KAFKA_ADVERTISED_PORT: 10904
volumes:
- /etc/localtime:/etc/localtime
depends_on:
- zookeeper
3.3 組件探秘
命令行工具是管理kafka集群最直接的工具。官方自帶,不需要額外安裝。
3.2.1 主題創建
#進入容器
docker exec -it kafka-1 sh
#進入bin目錄
cd /opt/kafka/bin
#創建
kafka-topics.sh --zookeeper zookeeper:2181 --create --topic test --partitions 2 --replication-factor 1
3.2.2 查看主題
kafka-topics.sh --zookeeper zookeeper:2181 --list
3.2.3 主題詳情
kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic test
#分析輸出:
Topic:test PartitionCount:2 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: test Partition: 1 Leader: 1 Replicas: 1 Isr: 1
3.2.4 消息收發
#使用docker連接任意集群中的一個容器
docker exec -it kafka-1 sh
#進入kafka的容器內目錄
cd /opt/kafka/bin
#客戶端監聽
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
#另起一個終端,驗證發送
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
3.2.5 分組消費
#啟動兩個consumer時,如果不指定group信息,消息被廣播
#指定相同的group,讓多個消費者分工消費(畫圖:group原理)
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group aaa
#結果:在發送方,連續發送 1-4 ,4條消息,同一group下的兩台consumer交替消費,併發執行
註意!!!
這是在消費者和分區數相等(都是2)的情況下。
如果同一group下的 ( 消費者數量 > 分區數量 ) 那麼就會有消費者閑置。
驗證方式:
可以再多啟動幾個消費者試一試,會發現,超出2個的時候,有的始終不會消費到消息。
停掉可以消費到的,那麼閑置的會被激活,進入工作狀態
3.2.6 指定分區
#指定分區通過參數 --partition,註意!需要去掉上面的group
#指定分區的意義在於,保障消息傳輸的順序性(畫圖:kafka順序性原理)
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 1
#結果:發送1-4條消息,交替出現。說明消息被均分到各個分區中投遞
#預設的發送是沒有指定key的
#要指定分區發送,就需要定義key。那麼相同的key被路由到同一個分區
./kafka-console-producer.sh --broker-list kafka-1:9092 --topic test --property parse.key=true
#攜帶key再發送,註意key和value之間用tab分割
>1 1111
>1 2222
>2 3333
>2 4444
#查看consumer的接收情況
#結果:相同的key被同一個consumer消費掉
3.2.7 偏移量
#偏移量決定了消息從哪開始消費,支持:開頭,還是末尾
# earliest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
# latest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據
# none:topic各分區都存在已提交的offset時,從offset後開始消費;只要有一個分區不存在已提交的offset,則拋出異常
# 註意點!!!有提交偏移量的話,仍然以提交的為主,即便使用earliest,比提交點更早的也不會被提取
#--offset [earliest|latest(預設)] , 或者 --from-beginning
#新起一個終端,指定offset位置
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --offset earliest
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --from-beginning
#結果:之前發送的消息,從頭又消費了一遍!
3.4 zk探秘
前面說過,zk存儲了kafka集群的相關信息,本節來探索內部的秘密。
kafka的信息記錄在zk中,進入zk容器,查看相關節點和信息
docker exec -it kafka_zookeeper_1 sh
>./bin/zkCli.sh
>ls /
#結果:得到以下配置信息
3.4.1 broker信息
[zk: localhost:2181(CONNECTED) 0] ls /brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 1] ls /brokers/ids
[1, 2]
#機器broker信息
[zk: localhost:2181(CONNECTED) 4] get /brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://52.82.98.209:10903"],"jmx_port":-1,"host":"52.82.98.209","timestamp":"1609825245500","port":10903,"version":4}
cZxid = 0x27
ctime = Tue Jan 05 05:40:45 GMT 2021
mZxid = 0x27
mtime = Tue Jan 05 05:40:45 GMT 2021
pZxid = 0x27
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x105a2db626b0000
dataLength = 196
numChildren = 0
3.4.2 主題與分區
#分區節點路徑
[zk: localhost:2181(CONNECTED) 5] ls /brokers/topics
[test, __consumer_offsets]
[zk: localhost:2181(CONNECTED) 6] ls /brokers/topics/test
[partitions]
[zk: localhost:2181(CONNECTED) 7] ls /brokers/topics/test/partitions
[0, 1]
[zk: localhost:2181(CONNECTED) 8] ls /brokers/topics/test/partitions/0
[state]
#分區信息,leader所在的機器id,isr列表等
[zk: localhost:2181(CONNECTED) 18] get /brokers/topics/test/partitions/0/state
{"controller_epoch":1,"leader":1,"version":1,"leader_epoch":0,"isr":[1]}
cZxid = 0xb0
ctime = Tue Jan 05 05:56:06 GMT 2021
mZxid = 0xb0
mtime = Tue Jan 05 05:56:06 GMT 2021
pZxid = 0xb0
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 72
numChildren = 0
3.4.3 消費者與偏移量
[zk: localhost:2181(CONNECTED) 15] ls /consumers
[]
#空的???
#那麼,消費者以及它的偏移記在哪裡呢???
kafka 消費者記錄 group 的消費 偏移量 有兩種方式 :
1)kafka 自維護 (新)
2)zookpeer 維護 (舊) ,已經逐漸被廢棄
查看方式:
上面的消費用的是控制台工具,這個工具使用--bootstrap-server,不經過zk,也就不會記錄到/consumers下。
其消費者的offset會更新到一個kafka自帶的topic【__consumer_offsets】下麵
#先起一個消費端,指定group
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group aaa
#使用控制台工具查看消費者及偏移量情況
./kafka-consumer-groups.sh --bootstrap-server kafka-1:9092 --list
KMOffsetCache-44acff134cad
aaa
#查看偏移量詳情
./kafka-consumer-groups.sh --bootstrap-server kafka-1:9092 --describe --group aaa
當前與LEO保持一致,說明消息都完整的被消費過
停掉consumer後,往provider中再發幾條記錄,offset開始滯後:
重新啟動consumer,消費到最新的消息,同時再返回看偏移量,消息得到同步:
3.4.4 controller
#當前集群中的主控節點是誰
[zk: localhost:2181(CONNECTED) 17] get /controller
{"version":1,"brokerid":1,"timestamp":"1609825245694"}
cZxid = 0x2a
ctime = Tue Jan 05 05:40:45 GMT 2021
mZxid = 0x2a
mtime = Tue Jan 05 05:40:45 GMT 2021
pZxid = 0x2a
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x105a2db626b0000
dataLength = 54
numChildren = 0
3.5 km
3.5.1 啟動
kafka-manager是目前最受歡迎的kafka集群管理工具,最早由雅虎開源。提供可視化kafka集群操作
官網:https://github.com/yahoo/kafka-manager/releases
註意它的版本,docker社區的景象版本滯後於kafka,我們自己來打鏡像。
#Dockerfile
FROM daocloud.io/library/java:openjdk-8u40-jdk
ADD kafka-manager-2.0.0.2/ /opt/km2002/
CMD ["/opt/km2002/bin/kafka-manager","-Dconfig.file=/opt/km2002/conf/application.conf"]
#打包,註意將kafka-manager-2.0.0.2放到同一目錄
docker build -t km:2002 .
#啟動:在上面的yml里,services節點下加一段
#參考資料:km.yml
#執行: docker-compose -f km.yml up -d
km:
image: km:2002
ports:
- 10906:9000
depends_on:
- zookeeper
3.5.2 使用
使用km可以方便的查看以下信息:
- cluster:創建集群,填寫zk地址,選中jmx,consumer信息等選項
- brokers:列表,機器信息
- topic:主題信息,主題內的分區信息。創建新的主題,增加分區
- cosumers: 消費者信息,偏移量等
本文由傳智教育博學谷 - 狂野架構師教研團隊發佈
如果本文對您有幫助,歡迎關註和點贊;如果您有任何建議也可留言評論或私信,您的支持是我堅持創作的動力
轉載請註明出處!