【詳細教程】Kafka應用場景、基礎組件、架構探索

来源:https://www.cnblogs.com/jiagooushi/archive/2022/08/11/16575970.html
-Advertisement-
Play Games

1、應用場景 1.1 kafka場景 ​ Kafka最初是由LinkedIn公司採用Scala語言開發,基於ZooKeeper,現在已經捐獻給了Apache基金會。目前Kafka已經定位為一個分散式流式處理平臺,它以 高吞吐、可持久化、可水平擴展、支持流處理等多種特性而被廣泛應用。 ​ Apache ...


file

1、應用場景

1.1 kafka場景

​ Kafka最初是由LinkedIn公司採用Scala語言開發,基於ZooKeeper,現在已經捐獻給了Apache基金會。目前Kafka已經定位為一個分散式流式處理平臺,它以 高吞吐、可持久化、可水平擴展、支持流處理等多種特性而被廣泛應用。

​ Apache Kafka能夠支撐海量數據的數據傳遞。在離線和實時的消息處理業務系統中,Kafka都有廣泛的應用。

(1)日誌收集:收集各種服務的log,通過kafka以統一介面服務的方式開放 給各種consumer,例如Hadoop、Hbase、Solr等;

(2)消息系統:解耦和生產者和消費者、緩存消息等;

(3)用戶活動跟蹤:Kafka經常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網頁、搜索、點 擊等活動,這些活動信息被各個伺服器發佈到kafka的topic中,然後訂閱者通過訂閱這些topic來做實時 的監控分析,或者裝載到Hadoop、數據倉庫中做離線分析和挖掘;

(4)運營指標:Kafka也經常用來記錄運營監控數據。包括收集各種分散式應用的數據,生產各種操作 的集中反饋,比如報警和報告;

(5)流式處理:比如spark streaming和storm;

1.2 kafka特性

kafka以高吞吐量著稱,主要有以下特性:

(1)高吞吐量、低延遲:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒;

(2)可擴展性:kafka集群支持熱擴展;

(3)持久性、可靠性:消息被持久化到本地磁碟,並且支持數據備份防止數據丟失;

(4)容錯性:允許集群中節點失敗(若副本數量為n,則允許n-1個節點失敗);

(5)高併發:支持數千個客戶端同時讀寫;

1.3 消息對比

  • 如果普通的業務消息解耦,消息傳輸,rabbitMq是首選,它足夠簡單,管理方便,性能夠用。
  • 如果在上述,日誌、消息收集、訪問記錄等高吞吐,實時性場景下,推薦kafka,它基於分散式,擴容便捷
  • 如果很重的業務,要做到極高的可靠性,考慮rocketMq,但是它太重。需要你有足夠的瞭解

1.4 大廠應用

  • 京東通過kafka搭建數據平臺,用於用戶購買、瀏覽等行為的分析。成功抗住6.18的流量洪峰
  • 阿裡借鑒kafka的理念,推出自己的rocketmq。在設計上參考了kafka的架構體系

2、基礎組件

2.1 角色

file

  • broker:節點,就是你看到的機器

  • provider:生產者,發消息的

  • consumer:消費者,讀消息的

  • zookeeper:信息中心,記錄kafka的各種信息的地方

  • controller:其中的一個broker,作為leader身份來負責管理整個集群。如果掛掉,藉助zk重新選主

2.2 邏輯組件

file

  • topic:主題,一個消息的通道,收發總得知道消息往哪投

  • partition:分區,每個主題可以有多個分區分擔數據的傳遞,多條路並行,吞吐量大

  • Replicas:副本,每個分區可以設置多個副本,副本之間數據一致。相當於備份,有備胎更可靠

  • leader & follower:主從,上面的這些副本里有1個身份為leader,其他的為follower。leader處理partition的所有讀寫請求

2.3 副本集合

  • AR:所有副本的統稱,AR=ISR+OSR

  • ISR:同步中的副本,可以參與leader選主。一旦落後太多(數量滯後和時間滯後兩個維度)會被踢到OSR。

  • OSR:踢出同步的副本,一直追趕leader,追上後會進入ISR

2.4 消息標記

file

  • offset:偏移量,消息消費到哪一條了?每個消費者都有自己的偏移量
  • HW:(high watermark):副本的高水印值,客戶端最多能消費到的位置,HW值為8,代表offset為[0,8]的9條消息都可以被消費到,它們是對消費者可見的,而[9,12]這4條消息由於未提交,對消費者是不可見的。
  • LEO:(log end offset):日誌末端位移,代表日誌文件中下一條待寫入消息的offset,這個offset上實際是沒有消息的。不管是leader副本還是follower副本,都有這個值。

那麼這三者有什麼關係呢?

比如在副本數等於3的情況下,消息發送到Leader A之後會更新LEO的值,Follower B和Follower C也會實時拉取Leader A中的消息來更新自己,HW就表示A、B、C三者同時達到的日誌位移,也就是A、B、C三者中LEO最小的那個值。由於B、C拉取A消息之間延時問題,所以HW一般會小於LEO,即LEO>=HW。

具體的同步原理,下麵章節會詳細講到

3.1 發展歷程

http://kafka.apache.org/downloads

file

3.1.1 版本命名

Kafka在1.0.0版本前的命名規則是4位,比如0.8.2.1,0.8是大版本號,2是小版本號,1表示打過1個補丁

現在的版本號命名規則是3位,格式是“大版本號”+“小版本號”+“修訂補丁數”,比如2.5.0,前面的2代表的是大版本號,中間的5代表的是小版本號,0表示沒有打過補丁

我們所看到的下載包,前面是scala編譯器的版本,後面才是真正的kafka版本。

3.1.2 演進歷史

0.7版本
只提供了最基礎的消息隊列功能。

0.8版本
引入了副本機制,至此Kafka成為了一個真正意義上完備的分散式高可靠消息隊列解決方案。

0.9版本
增加許可權和認證,使用Java重寫了新的consumer API,Kafka Connect功能;不建議使用consumer API;

0.10版本
引入Kafka Streams功能,正式升級成分散式流處理平臺;建議版本0.10.2.2;建議使用新版consumer API

0.11版本
producer API冪等,事務API,消息格式重構;建議版本0.11.0.3;謹慎對待消息格式變化

1.0和2.0版本
Kafka Streams改進;建議版本2.0;

3.2 集群搭建(助學)

1)原生啟動

kafka啟動需要zookeeper,第一步啟動zk:

docker run --name zookeeper-1 -d -p 2181 zookeeper:3.4.13

原生安裝:下載後解壓啟動即可 http://kafka.apache.org/downloads

bin/kafka-server-start.sh config/server.properties
#server.properties配置說明
#表示broker的編號,如果集群中有多個broker,則每個broker的編號需要設置的不同
broker.id=0 
#brokder對外提供的服務入口地址,預設9092
listeners=PLAINTEXT://:9092 
#設置存放消息日誌文件的地址
log.dirs=/tmp/kafka/log 
#Kafka所需Zookeeper集群地址,這裡是關鍵!加入同一個zk的kafka為同一集群
zookeeper.connect=zookeeper:2181 

2)推薦docker-compose 一鍵啟動

#參考資料中的kafka.yml
#註意hostname問題,ip地址:52.82.98.209,換成你自己伺服器的
#docker-compose -f kafka.yml up -d 啟動
version: '3'
services:
    zookeeper:
        image: zookeeper:3.4.13

    kafka-1:
        container_name: kafka-1
        image: wurstmeister/kafka:2.12-2.2.2
        ports:
            - 10903:9092
        environment:
            KAFKA_BROKER_ID: 1 
            HOST_IP: 52.82.98.209
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            #docker部署必須設置外部可訪問ip和埠,否則註冊進zk的地址將不可達造成外部無法連接
            KAFKA_ADVERTISED_HOST_NAME: 52.82.98.209
            KAFKA_ADVERTISED_PORT: 10903 
        volumes:
            - /etc/localtime:/etc/localtime
        depends_on:
            - zookeeper           
    kafka-2:
        container_name: kafka-2
        image: wurstmeister/kafka:2.12-2.2.2
        ports:
            - 10904:9092
        environment:
            KAFKA_BROKER_ID: 2 
            HOST_IP: 52.82.98.209
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            KAFKA_ADVERTISED_HOST_NAME: 52.82.98.209
            KAFKA_ADVERTISED_PORT: 10904 
        volumes:
            - /etc/localtime:/etc/localtime
        depends_on:
            - zookeeper 

3.3 組件探秘

命令行工具是管理kafka集群最直接的工具。官方自帶,不需要額外安裝。

3.2.1 主題創建

#進入容器
docker exec -it kafka-1 sh
#進入bin目錄
cd /opt/kafka/bin
#創建
kafka-topics.sh --zookeeper zookeeper:2181 --create --topic test --partitions 2 --replication-factor 1

3.2.2 查看主題

kafka-topics.sh --zookeeper zookeeper:2181 --list

3.2.3 主題詳情

kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic test

#分析輸出:
Topic:test	PartitionCount:2	ReplicationFactor:1	Configs:
	Topic: test	Partition: 0	Leader: 2	Replicas: 2	Isr: 2
	Topic: test	Partition: 1	Leader: 1	Replicas: 1	Isr: 1

3.2.4 消息收發

#使用docker連接任意集群中的一個容器
docker exec -it kafka-1 sh

#進入kafka的容器內目錄
cd /opt/kafka/bin

#客戶端監聽
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

#另起一個終端,驗證發送
./kafka-console-producer.sh --broker-list localhost:9092 --topic test

3.2.5 分組消費

#啟動兩個consumer時,如果不指定group信息,消息被廣播
#指定相同的group,讓多個消費者分工消費(畫圖:group原理)

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group aaa

#結果:在發送方,連續發送 1-4 ,4條消息,同一group下的兩台consumer交替消費,併發執行

註意!!!

這是在消費者和分區數相等(都是2)的情況下。
如果同一group下的 ( 消費者數量 > 分區數量 ) 那麼就會有消費者閑置。

驗證方式:

可以再多啟動幾個消費者試一試,會發現,超出2個的時候,有的始終不會消費到消息。
停掉可以消費到的,那麼閑置的會被激活,進入工作狀態

3.2.6 指定分區

#指定分區通過參數 --partition,註意!需要去掉上面的group
#指定分區的意義在於,保障消息傳輸的順序性(畫圖:kafka順序性原理)
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 1

#結果:發送1-4條消息,交替出現。說明消息被均分到各個分區中投遞


#預設的發送是沒有指定key的
#要指定分區發送,就需要定義key。那麼相同的key被路由到同一個分區
./kafka-console-producer.sh --broker-list kafka-1:9092 --topic test --property parse.key=true

#攜帶key再發送,註意key和value之間用tab分割
>1	1111
>1	2222
>2	3333
>2	4444

#查看consumer的接收情況
#結果:相同的key被同一個consumer消費掉

3.2.7 偏移量

#偏移量決定了消息從哪開始消費,支持:開頭,還是末尾

# earliest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
# latest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據
# none:topic各分區都存在已提交的offset時,從offset後開始消費;只要有一個分區不存在已提交的offset,則拋出異常

# 註意點!!!有提交偏移量的話,仍然以提交的為主,即便使用earliest,比提交點更早的也不會被提取

#--offset [earliest|latest(預設)] , 或者 --from-beginning
#新起一個終端,指定offset位置
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --offset earliest

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --from-beginning

#結果:之前發送的消息,從頭又消費了一遍!

3.4 zk探秘

前面說過,zk存儲了kafka集群的相關信息,本節來探索內部的秘密。

kafka的信息記錄在zk中,進入zk容器,查看相關節點和信息

docker exec -it kafka_zookeeper_1 sh

>./bin/zkCli.sh

>ls /

#結果:得到以下配置信息

file

3.4.1 broker信息

[zk: localhost:2181(CONNECTED) 0] ls /brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 1] ls /brokers/ids
[1, 2]

#機器broker信息
[zk: localhost:2181(CONNECTED) 4] get /brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://52.82.98.209:10903"],"jmx_port":-1,"host":"52.82.98.209","timestamp":"1609825245500","port":10903,"version":4}
cZxid = 0x27
ctime = Tue Jan 05 05:40:45 GMT 2021
mZxid = 0x27
mtime = Tue Jan 05 05:40:45 GMT 2021
pZxid = 0x27
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x105a2db626b0000
dataLength = 196
numChildren = 0

3.4.2 主題與分區

#分區節點路徑
[zk: localhost:2181(CONNECTED) 5] ls /brokers/topics
[test, __consumer_offsets]
[zk: localhost:2181(CONNECTED) 6] ls /brokers/topics/test
[partitions]
[zk: localhost:2181(CONNECTED) 7] ls /brokers/topics/test/partitions
[0, 1]
[zk: localhost:2181(CONNECTED) 8] ls /brokers/topics/test/partitions/0
[state]

#分區信息,leader所在的機器id,isr列表等
[zk: localhost:2181(CONNECTED) 18] get /brokers/topics/test/partitions/0/state
{"controller_epoch":1,"leader":1,"version":1,"leader_epoch":0,"isr":[1]}
cZxid = 0xb0
ctime = Tue Jan 05 05:56:06 GMT 2021
mZxid = 0xb0
mtime = Tue Jan 05 05:56:06 GMT 2021
pZxid = 0xb0
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 72
numChildren = 0

3.4.3 消費者與偏移量

[zk: localhost:2181(CONNECTED) 15] ls /consumers
[]
#空的???
#那麼,消費者以及它的偏移記在哪裡呢???

kafka 消費者記錄 group 的消費 偏移量 有兩種方式 :

1)kafka 自維護 (新)

2)zookpeer 維護 (舊) ,已經逐漸被廢棄

查看方式:

上面的消費用的是控制台工具,這個工具使用--bootstrap-server,不經過zk,也就不會記錄到/consumers下。

其消費者的offset會更新到一個kafka自帶的topic【__consumer_offsets】下麵

#先起一個消費端,指定group
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group aaa

#使用控制台工具查看消費者及偏移量情況
./kafka-consumer-groups.sh --bootstrap-server kafka-1:9092 --list
KMOffsetCache-44acff134cad
aaa

#查看偏移量詳情
./kafka-consumer-groups.sh --bootstrap-server kafka-1:9092 --describe --group aaa

當前與LEO保持一致,說明消息都完整的被消費過

file

停掉consumer後,往provider中再發幾條記錄,offset開始滯後:

file

重新啟動consumer,消費到最新的消息,同時再返回看偏移量,消息得到同步:

file

3.4.4 controller

#當前集群中的主控節點是誰
[zk: localhost:2181(CONNECTED) 17] get /controller
{"version":1,"brokerid":1,"timestamp":"1609825245694"}
cZxid = 0x2a
ctime = Tue Jan 05 05:40:45 GMT 2021
mZxid = 0x2a
mtime = Tue Jan 05 05:40:45 GMT 2021
pZxid = 0x2a
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x105a2db626b0000
dataLength = 54
numChildren = 0

3.5 km

3.5.1 啟動

kafka-manager是目前最受歡迎的kafka集群管理工具,最早由雅虎開源。提供可視化kafka集群操作

官網:https://github.com/yahoo/kafka-manager/releases

註意它的版本,docker社區的景象版本滯後於kafka,我們自己來打鏡像。

#Dockerfile
FROM daocloud.io/library/java:openjdk-8u40-jdk
ADD kafka-manager-2.0.0.2/ /opt/km2002/
CMD ["/opt/km2002/bin/kafka-manager","-Dconfig.file=/opt/km2002/conf/application.conf"]

#打包,註意將kafka-manager-2.0.0.2放到同一目錄
docker build -t km:2002 .
#啟動:在上面的yml里,services節點下加一段
#參考資料:km.yml
#執行: docker-compose -f km.yml up -d
		km:
        image: km:2002
        ports:
            - 10906:9000
        depends_on:
            - zookeeper
         

3.5.2 使用

使用km可以方便的查看以下信息:

  • cluster:創建集群,填寫zk地址,選中jmx,consumer信息等選項
  • brokers:列表,機器信息
  • topic:主題信息,主題內的分區信息。創建新的主題,增加分區
  • cosumers: 消費者信息,偏移量等

本文由傳智教育博學谷 - 狂野架構師教研團隊發佈
如果本文對您有幫助,歡迎關註和點贊;如果您有任何建議也可留言評論或私信,您的支持是我堅持創作的動力
轉載請註明出處!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 啥是函數式介面、它和JAVA中普通的介面有啥區別?函數式介面有啥用?如何在實際編碼中使用函數式介面?帶著這些問題,我們一起來認識下函數式介面的廬山真面目。 ...
  • 函數是基於功能或者邏輯進行聚合的可復用的代碼塊。將一些複雜的、冗長的代碼抽離封裝成多個代碼片段,即函數,有助於提高代碼邏輯的可讀性和可維護性。不同於Python,由於 Go lang是編譯型語言,編譯之後再運行,所以函數的定義順序無關痛癢。 函數聲明 在 Go lang里,函數聲明語法如下: fun ...
  • 《Python高手之路 第3版》|免費下載地址 作者簡介 · · · · · · Julien Danjou 具有12年從業經驗的自由軟體黑客。擁有多個開源社區的不同身份:Debian開發者、Freedesktop貢獻者、GNU Emacs提交者、awesome視窗管理器的創建者以及OpenStac ...
  • 《紅樓夢》作為我國四大名著之一,古典小說的巔峰之作,粉絲量極其龐大,而紅學也經久不衰。所以我們今天通過 Python 來探索下紅樓夢裡那千絲萬縷的人物關係,話不多說,開始整活! 一、準備工作 紅樓夢txt格式電子書一份 金陵十二釵+賈寶玉人物名稱列表 寶玉 nr 黛玉 nr 寶釵 nr 湘雲 nr ...
  • “什麼是IO的多路復用機制?” 這是一道年薪50W的面試題,很遺憾,99%的人都回答不出來。 大家好,我是Mic,一個工作了14年的Java程式員。 今天,給大家分享一道網路IO的面試題。 這道題目的文字回答已經整理到了15W字的面試文檔裡面,大家可以S我領取。 下麵看看高手的回答。 高手: IO多 ...
  • 多商戶商城系統,也稱為B2B2C(BBC)平臺電商模式多商家商城系統。可以快速幫助企業搭建類似拼多多/京東/天貓/淘寶的綜合商城。 多商戶商城系統支持商家入駐加盟,同時滿足平臺自營、旗艦店等多種經營方式。平臺可以通過收取商家入駐費,訂單交易服務費,提現手續費,簡訊通道費等多手段方式,實現整體盈利。 ...
  • 原文連接:https://www.zhoubotong.site/post/67.html Go 標準庫的net/url包提供的兩個函可以直接檢查URL合法性,不需要手動去正則匹配校驗。 下麵可以直接使用ParseRequestURI()函數解析URL,當然這個只會驗證url格式,至於功能變數名稱是否存在或 ...
  • Python帶我起飛——入門、進階、商業實戰_ 免費下載地址 內容簡介 · · · · · · 《Python帶我起飛——入門、進階、商業實戰》針對Python 3.5 以上版本,採用“理論+實踐”的形式編寫,通過大量的實例(共42 個),全面而深入地講解“Python 基礎語法”和“Python ...
一周排行
    -Advertisement-
    Play Games
  • Timer是什麼 Timer 是一種用於創建定期粒度行為的機制。 與標準的 .NET System.Threading.Timer 類相似,Orleans 的 Timer 允許在一段時間後執行特定的操作,或者在特定的時間間隔內重覆執行操作。 它在分散式系統中具有重要作用,特別是在處理需要周期性執行的 ...
  • 前言 相信很多做WPF開發的小伙伴都遇到過表格類的需求,雖然現有的Grid控制項也能實現,但是使用起來的體驗感並不好,比如要實現一個Excel中的表格效果,估計你能想到的第一個方法就是套Border控制項,用這種方法你需要控制每個Border的邊框,並且在一堆Bordr中找到Grid.Row,Grid. ...
  • .NET C#程式啟動閃退,目錄導致的問題 這是第2次踩這個坑了,很小的編程細節,容易忽略,所以寫個博客,分享給大家。 1.第一次坑:是windows 系統把程式運行成服務,找不到配置文件,原因是以服務運行它的工作目錄是在C:\Windows\System32 2.本次坑:WPF桌面程式通過註冊表設 ...
  • 在分散式系統中,數據的持久化是至關重要的一環。 Orleans 7 引入了強大的持久化功能,使得在分散式環境下管理數據變得更加輕鬆和可靠。 本文將介紹什麼是 Orleans 7 的持久化,如何設置它以及相應的代碼示例。 什麼是 Orleans 7 的持久化? Orleans 7 的持久化是指將 Or ...
  • 前言 .NET Feature Management 是一個用於管理應用程式功能的庫,它可以幫助開發人員在應用程式中輕鬆地添加、移除和管理功能。使用 Feature Management,開發人員可以根據不同用戶、環境或其他條件來動態地控制應用程式中的功能。這使得開發人員可以更靈活地管理應用程式的功 ...
  • 在 WPF 應用程式中,拖放操作是實現用戶交互的重要組成部分。通過拖放操作,用戶可以輕鬆地將數據從一個位置移動到另一個位置,或者將控制項從一個容器移動到另一個容器。然而,WPF 中預設的拖放操作可能並不是那麼好用。為瞭解決這個問題,我們可以自定義一個 Panel 來實現更簡單的拖拽操作。 自定義 Pa ...
  • 在實際使用中,由於涉及到不同編程語言之間互相調用,導致C++ 中的OpenCV與C#中的OpenCvSharp 圖像數據在不同編程語言之間難以有效傳遞。在本文中我們將結合OpenCvSharp源碼實現原理,探究兩種數據之間的通信方式。 ...
  • 一、前言 這是一篇搭建許可權管理系統的系列文章。 隨著網路的發展,信息安全對應任何企業來說都越發的重要,而本系列文章將和大家一起一步一步搭建一個全新的許可權管理系統。 說明:由於搭建一個全新的項目過於繁瑣,所有作者將挑選核心代碼和核心思路進行分享。 二、技術選擇 三、開始設計 1、自主搭建vue前端和. ...
  • Csharper中的表達式樹 這節課來瞭解一下表示式樹是什麼? 在C#中,表達式樹是一種數據結構,它可以表示一些代碼塊,如Lambda表達式或查詢表達式。表達式樹使你能夠查看和操作數據,就像你可以查看和操作代碼一樣。它們通常用於創建動態查詢和解析表達式。 一、認識表達式樹 為什麼要這樣說?它和委托有 ...
  • 在使用Django等框架來操作MySQL時,實際上底層還是通過Python來操作的,首先需要安裝一個驅動程式,在Python3中,驅動程式有多種選擇,比如有pymysql以及mysqlclient等。使用pip命令安裝mysqlclient失敗應如何解決? 安裝的python版本說明 機器同時安裝了 ...