【詳細教程】Kafka應用場景、基礎組件、架構探索

来源:https://www.cnblogs.com/jiagooushi/archive/2022/08/11/16575970.html
-Advertisement-
Play Games

1、應用場景 1.1 kafka場景 ​ Kafka最初是由LinkedIn公司採用Scala語言開發,基於ZooKeeper,現在已經捐獻給了Apache基金會。目前Kafka已經定位為一個分散式流式處理平臺,它以 高吞吐、可持久化、可水平擴展、支持流處理等多種特性而被廣泛應用。 ​ Apache ...


file

1、應用場景

1.1 kafka場景

​ Kafka最初是由LinkedIn公司採用Scala語言開發,基於ZooKeeper,現在已經捐獻給了Apache基金會。目前Kafka已經定位為一個分散式流式處理平臺,它以 高吞吐、可持久化、可水平擴展、支持流處理等多種特性而被廣泛應用。

​ Apache Kafka能夠支撐海量數據的數據傳遞。在離線和實時的消息處理業務系統中,Kafka都有廣泛的應用。

(1)日誌收集:收集各種服務的log,通過kafka以統一介面服務的方式開放 給各種consumer,例如Hadoop、Hbase、Solr等;

(2)消息系統:解耦和生產者和消費者、緩存消息等;

(3)用戶活動跟蹤:Kafka經常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網頁、搜索、點 擊等活動,這些活動信息被各個伺服器發佈到kafka的topic中,然後訂閱者通過訂閱這些topic來做實時 的監控分析,或者裝載到Hadoop、數據倉庫中做離線分析和挖掘;

(4)運營指標:Kafka也經常用來記錄運營監控數據。包括收集各種分散式應用的數據,生產各種操作 的集中反饋,比如報警和報告;

(5)流式處理:比如spark streaming和storm;

1.2 kafka特性

kafka以高吞吐量著稱,主要有以下特性:

(1)高吞吐量、低延遲:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒;

(2)可擴展性:kafka集群支持熱擴展;

(3)持久性、可靠性:消息被持久化到本地磁碟,並且支持數據備份防止數據丟失;

(4)容錯性:允許集群中節點失敗(若副本數量為n,則允許n-1個節點失敗);

(5)高併發:支持數千個客戶端同時讀寫;

1.3 消息對比

  • 如果普通的業務消息解耦,消息傳輸,rabbitMq是首選,它足夠簡單,管理方便,性能夠用。
  • 如果在上述,日誌、消息收集、訪問記錄等高吞吐,實時性場景下,推薦kafka,它基於分散式,擴容便捷
  • 如果很重的業務,要做到極高的可靠性,考慮rocketMq,但是它太重。需要你有足夠的瞭解

1.4 大廠應用

  • 京東通過kafka搭建數據平臺,用於用戶購買、瀏覽等行為的分析。成功抗住6.18的流量洪峰
  • 阿裡借鑒kafka的理念,推出自己的rocketmq。在設計上參考了kafka的架構體系

2、基礎組件

2.1 角色

file

  • broker:節點,就是你看到的機器

  • provider:生產者,發消息的

  • consumer:消費者,讀消息的

  • zookeeper:信息中心,記錄kafka的各種信息的地方

  • controller:其中的一個broker,作為leader身份來負責管理整個集群。如果掛掉,藉助zk重新選主

2.2 邏輯組件

file

  • topic:主題,一個消息的通道,收發總得知道消息往哪投

  • partition:分區,每個主題可以有多個分區分擔數據的傳遞,多條路並行,吞吐量大

  • Replicas:副本,每個分區可以設置多個副本,副本之間數據一致。相當於備份,有備胎更可靠

  • leader & follower:主從,上面的這些副本里有1個身份為leader,其他的為follower。leader處理partition的所有讀寫請求

2.3 副本集合

  • AR:所有副本的統稱,AR=ISR+OSR

  • ISR:同步中的副本,可以參與leader選主。一旦落後太多(數量滯後和時間滯後兩個維度)會被踢到OSR。

  • OSR:踢出同步的副本,一直追趕leader,追上後會進入ISR

2.4 消息標記

file

  • offset:偏移量,消息消費到哪一條了?每個消費者都有自己的偏移量
  • HW:(high watermark):副本的高水印值,客戶端最多能消費到的位置,HW值為8,代表offset為[0,8]的9條消息都可以被消費到,它們是對消費者可見的,而[9,12]這4條消息由於未提交,對消費者是不可見的。
  • LEO:(log end offset):日誌末端位移,代表日誌文件中下一條待寫入消息的offset,這個offset上實際是沒有消息的。不管是leader副本還是follower副本,都有這個值。

那麼這三者有什麼關係呢?

比如在副本數等於3的情況下,消息發送到Leader A之後會更新LEO的值,Follower B和Follower C也會實時拉取Leader A中的消息來更新自己,HW就表示A、B、C三者同時達到的日誌位移,也就是A、B、C三者中LEO最小的那個值。由於B、C拉取A消息之間延時問題,所以HW一般會小於LEO,即LEO>=HW。

具體的同步原理,下麵章節會詳細講到

3.1 發展歷程

http://kafka.apache.org/downloads

file

3.1.1 版本命名

Kafka在1.0.0版本前的命名規則是4位,比如0.8.2.1,0.8是大版本號,2是小版本號,1表示打過1個補丁

現在的版本號命名規則是3位,格式是“大版本號”+“小版本號”+“修訂補丁數”,比如2.5.0,前面的2代表的是大版本號,中間的5代表的是小版本號,0表示沒有打過補丁

我們所看到的下載包,前面是scala編譯器的版本,後面才是真正的kafka版本。

3.1.2 演進歷史

0.7版本
只提供了最基礎的消息隊列功能。

0.8版本
引入了副本機制,至此Kafka成為了一個真正意義上完備的分散式高可靠消息隊列解決方案。

0.9版本
增加許可權和認證,使用Java重寫了新的consumer API,Kafka Connect功能;不建議使用consumer API;

0.10版本
引入Kafka Streams功能,正式升級成分散式流處理平臺;建議版本0.10.2.2;建議使用新版consumer API

0.11版本
producer API冪等,事務API,消息格式重構;建議版本0.11.0.3;謹慎對待消息格式變化

1.0和2.0版本
Kafka Streams改進;建議版本2.0;

3.2 集群搭建(助學)

1)原生啟動

kafka啟動需要zookeeper,第一步啟動zk:

docker run --name zookeeper-1 -d -p 2181 zookeeper:3.4.13

原生安裝:下載後解壓啟動即可 http://kafka.apache.org/downloads

bin/kafka-server-start.sh config/server.properties
#server.properties配置說明
#表示broker的編號,如果集群中有多個broker,則每個broker的編號需要設置的不同
broker.id=0 
#brokder對外提供的服務入口地址,預設9092
listeners=PLAINTEXT://:9092 
#設置存放消息日誌文件的地址
log.dirs=/tmp/kafka/log 
#Kafka所需Zookeeper集群地址,這裡是關鍵!加入同一個zk的kafka為同一集群
zookeeper.connect=zookeeper:2181 

2)推薦docker-compose 一鍵啟動

#參考資料中的kafka.yml
#註意hostname問題,ip地址:52.82.98.209,換成你自己伺服器的
#docker-compose -f kafka.yml up -d 啟動
version: '3'
services:
    zookeeper:
        image: zookeeper:3.4.13

    kafka-1:
        container_name: kafka-1
        image: wurstmeister/kafka:2.12-2.2.2
        ports:
            - 10903:9092
        environment:
            KAFKA_BROKER_ID: 1 
            HOST_IP: 52.82.98.209
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            #docker部署必須設置外部可訪問ip和埠,否則註冊進zk的地址將不可達造成外部無法連接
            KAFKA_ADVERTISED_HOST_NAME: 52.82.98.209
            KAFKA_ADVERTISED_PORT: 10903 
        volumes:
            - /etc/localtime:/etc/localtime
        depends_on:
            - zookeeper           
    kafka-2:
        container_name: kafka-2
        image: wurstmeister/kafka:2.12-2.2.2
        ports:
            - 10904:9092
        environment:
            KAFKA_BROKER_ID: 2 
            HOST_IP: 52.82.98.209
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            KAFKA_ADVERTISED_HOST_NAME: 52.82.98.209
            KAFKA_ADVERTISED_PORT: 10904 
        volumes:
            - /etc/localtime:/etc/localtime
        depends_on:
            - zookeeper 

3.3 組件探秘

命令行工具是管理kafka集群最直接的工具。官方自帶,不需要額外安裝。

3.2.1 主題創建

#進入容器
docker exec -it kafka-1 sh
#進入bin目錄
cd /opt/kafka/bin
#創建
kafka-topics.sh --zookeeper zookeeper:2181 --create --topic test --partitions 2 --replication-factor 1

3.2.2 查看主題

kafka-topics.sh --zookeeper zookeeper:2181 --list

3.2.3 主題詳情

kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic test

#分析輸出:
Topic:test	PartitionCount:2	ReplicationFactor:1	Configs:
	Topic: test	Partition: 0	Leader: 2	Replicas: 2	Isr: 2
	Topic: test	Partition: 1	Leader: 1	Replicas: 1	Isr: 1

3.2.4 消息收發

#使用docker連接任意集群中的一個容器
docker exec -it kafka-1 sh

#進入kafka的容器內目錄
cd /opt/kafka/bin

#客戶端監聽
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

#另起一個終端,驗證發送
./kafka-console-producer.sh --broker-list localhost:9092 --topic test

3.2.5 分組消費

#啟動兩個consumer時,如果不指定group信息,消息被廣播
#指定相同的group,讓多個消費者分工消費(畫圖:group原理)

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group aaa

#結果:在發送方,連續發送 1-4 ,4條消息,同一group下的兩台consumer交替消費,併發執行

註意!!!

這是在消費者和分區數相等(都是2)的情況下。
如果同一group下的 ( 消費者數量 > 分區數量 ) 那麼就會有消費者閑置。

驗證方式:

可以再多啟動幾個消費者試一試,會發現,超出2個的時候,有的始終不會消費到消息。
停掉可以消費到的,那麼閑置的會被激活,進入工作狀態

3.2.6 指定分區

#指定分區通過參數 --partition,註意!需要去掉上面的group
#指定分區的意義在於,保障消息傳輸的順序性(畫圖:kafka順序性原理)
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 1

#結果:發送1-4條消息,交替出現。說明消息被均分到各個分區中投遞


#預設的發送是沒有指定key的
#要指定分區發送,就需要定義key。那麼相同的key被路由到同一個分區
./kafka-console-producer.sh --broker-list kafka-1:9092 --topic test --property parse.key=true

#攜帶key再發送,註意key和value之間用tab分割
>1	1111
>1	2222
>2	3333
>2	4444

#查看consumer的接收情況
#結果:相同的key被同一個consumer消費掉

3.2.7 偏移量

#偏移量決定了消息從哪開始消費,支持:開頭,還是末尾

# earliest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
# latest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據
# none:topic各分區都存在已提交的offset時,從offset後開始消費;只要有一個分區不存在已提交的offset,則拋出異常

# 註意點!!!有提交偏移量的話,仍然以提交的為主,即便使用earliest,比提交點更早的也不會被提取

#--offset [earliest|latest(預設)] , 或者 --from-beginning
#新起一個終端,指定offset位置
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --offset earliest

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --from-beginning

#結果:之前發送的消息,從頭又消費了一遍!

3.4 zk探秘

前面說過,zk存儲了kafka集群的相關信息,本節來探索內部的秘密。

kafka的信息記錄在zk中,進入zk容器,查看相關節點和信息

docker exec -it kafka_zookeeper_1 sh

>./bin/zkCli.sh

>ls /

#結果:得到以下配置信息

file

3.4.1 broker信息

[zk: localhost:2181(CONNECTED) 0] ls /brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 1] ls /brokers/ids
[1, 2]

#機器broker信息
[zk: localhost:2181(CONNECTED) 4] get /brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://52.82.98.209:10903"],"jmx_port":-1,"host":"52.82.98.209","timestamp":"1609825245500","port":10903,"version":4}
cZxid = 0x27
ctime = Tue Jan 05 05:40:45 GMT 2021
mZxid = 0x27
mtime = Tue Jan 05 05:40:45 GMT 2021
pZxid = 0x27
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x105a2db626b0000
dataLength = 196
numChildren = 0

3.4.2 主題與分區

#分區節點路徑
[zk: localhost:2181(CONNECTED) 5] ls /brokers/topics
[test, __consumer_offsets]
[zk: localhost:2181(CONNECTED) 6] ls /brokers/topics/test
[partitions]
[zk: localhost:2181(CONNECTED) 7] ls /brokers/topics/test/partitions
[0, 1]
[zk: localhost:2181(CONNECTED) 8] ls /brokers/topics/test/partitions/0
[state]

#分區信息,leader所在的機器id,isr列表等
[zk: localhost:2181(CONNECTED) 18] get /brokers/topics/test/partitions/0/state
{"controller_epoch":1,"leader":1,"version":1,"leader_epoch":0,"isr":[1]}
cZxid = 0xb0
ctime = Tue Jan 05 05:56:06 GMT 2021
mZxid = 0xb0
mtime = Tue Jan 05 05:56:06 GMT 2021
pZxid = 0xb0
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 72
numChildren = 0

3.4.3 消費者與偏移量

[zk: localhost:2181(CONNECTED) 15] ls /consumers
[]
#空的???
#那麼,消費者以及它的偏移記在哪裡呢???

kafka 消費者記錄 group 的消費 偏移量 有兩種方式 :

1)kafka 自維護 (新)

2)zookpeer 維護 (舊) ,已經逐漸被廢棄

查看方式:

上面的消費用的是控制台工具,這個工具使用--bootstrap-server,不經過zk,也就不會記錄到/consumers下。

其消費者的offset會更新到一個kafka自帶的topic【__consumer_offsets】下麵

#先起一個消費端,指定group
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group aaa

#使用控制台工具查看消費者及偏移量情況
./kafka-consumer-groups.sh --bootstrap-server kafka-1:9092 --list
KMOffsetCache-44acff134cad
aaa

#查看偏移量詳情
./kafka-consumer-groups.sh --bootstrap-server kafka-1:9092 --describe --group aaa

當前與LEO保持一致,說明消息都完整的被消費過

file

停掉consumer後,往provider中再發幾條記錄,offset開始滯後:

file

重新啟動consumer,消費到最新的消息,同時再返回看偏移量,消息得到同步:

file

3.4.4 controller

#當前集群中的主控節點是誰
[zk: localhost:2181(CONNECTED) 17] get /controller
{"version":1,"brokerid":1,"timestamp":"1609825245694"}
cZxid = 0x2a
ctime = Tue Jan 05 05:40:45 GMT 2021
mZxid = 0x2a
mtime = Tue Jan 05 05:40:45 GMT 2021
pZxid = 0x2a
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x105a2db626b0000
dataLength = 54
numChildren = 0

3.5 km

3.5.1 啟動

kafka-manager是目前最受歡迎的kafka集群管理工具,最早由雅虎開源。提供可視化kafka集群操作

官網:https://github.com/yahoo/kafka-manager/releases

註意它的版本,docker社區的景象版本滯後於kafka,我們自己來打鏡像。

#Dockerfile
FROM daocloud.io/library/java:openjdk-8u40-jdk
ADD kafka-manager-2.0.0.2/ /opt/km2002/
CMD ["/opt/km2002/bin/kafka-manager","-Dconfig.file=/opt/km2002/conf/application.conf"]

#打包,註意將kafka-manager-2.0.0.2放到同一目錄
docker build -t km:2002 .
#啟動:在上面的yml里,services節點下加一段
#參考資料:km.yml
#執行: docker-compose -f km.yml up -d
		km:
        image: km:2002
        ports:
            - 10906:9000
        depends_on:
            - zookeeper
         

3.5.2 使用

使用km可以方便的查看以下信息:

  • cluster:創建集群,填寫zk地址,選中jmx,consumer信息等選項
  • brokers:列表,機器信息
  • topic:主題信息,主題內的分區信息。創建新的主題,增加分區
  • cosumers: 消費者信息,偏移量等

本文由傳智教育博學谷 - 狂野架構師教研團隊發佈
如果本文對您有幫助,歡迎關註和點贊;如果您有任何建議也可留言評論或私信,您的支持是我堅持創作的動力
轉載請註明出處!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 啥是函數式介面、它和JAVA中普通的介面有啥區別?函數式介面有啥用?如何在實際編碼中使用函數式介面?帶著這些問題,我們一起來認識下函數式介面的廬山真面目。 ...
  • 函數是基於功能或者邏輯進行聚合的可復用的代碼塊。將一些複雜的、冗長的代碼抽離封裝成多個代碼片段,即函數,有助於提高代碼邏輯的可讀性和可維護性。不同於Python,由於 Go lang是編譯型語言,編譯之後再運行,所以函數的定義順序無關痛癢。 函數聲明 在 Go lang里,函數聲明語法如下: fun ...
  • 《Python高手之路 第3版》|免費下載地址 作者簡介 · · · · · · Julien Danjou 具有12年從業經驗的自由軟體黑客。擁有多個開源社區的不同身份:Debian開發者、Freedesktop貢獻者、GNU Emacs提交者、awesome視窗管理器的創建者以及OpenStac ...
  • 《紅樓夢》作為我國四大名著之一,古典小說的巔峰之作,粉絲量極其龐大,而紅學也經久不衰。所以我們今天通過 Python 來探索下紅樓夢裡那千絲萬縷的人物關係,話不多說,開始整活! 一、準備工作 紅樓夢txt格式電子書一份 金陵十二釵+賈寶玉人物名稱列表 寶玉 nr 黛玉 nr 寶釵 nr 湘雲 nr ...
  • “什麼是IO的多路復用機制?” 這是一道年薪50W的面試題,很遺憾,99%的人都回答不出來。 大家好,我是Mic,一個工作了14年的Java程式員。 今天,給大家分享一道網路IO的面試題。 這道題目的文字回答已經整理到了15W字的面試文檔裡面,大家可以S我領取。 下麵看看高手的回答。 高手: IO多 ...
  • 多商戶商城系統,也稱為B2B2C(BBC)平臺電商模式多商家商城系統。可以快速幫助企業搭建類似拼多多/京東/天貓/淘寶的綜合商城。 多商戶商城系統支持商家入駐加盟,同時滿足平臺自營、旗艦店等多種經營方式。平臺可以通過收取商家入駐費,訂單交易服務費,提現手續費,簡訊通道費等多手段方式,實現整體盈利。 ...
  • 原文連接:https://www.zhoubotong.site/post/67.html Go 標準庫的net/url包提供的兩個函可以直接檢查URL合法性,不需要手動去正則匹配校驗。 下麵可以直接使用ParseRequestURI()函數解析URL,當然這個只會驗證url格式,至於功能變數名稱是否存在或 ...
  • Python帶我起飛——入門、進階、商業實戰_ 免費下載地址 內容簡介 · · · · · · 《Python帶我起飛——入門、進階、商業實戰》針對Python 3.5 以上版本,採用“理論+實踐”的形式編寫,通過大量的實例(共42 個),全面而深入地講解“Python 基礎語法”和“Python ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...