【詳細教程】Kafka應用場景、基礎組件、架構探索

来源:https://www.cnblogs.com/jiagooushi/archive/2022/08/11/16575970.html
-Advertisement-
Play Games

1、應用場景 1.1 kafka場景 ​ Kafka最初是由LinkedIn公司採用Scala語言開發,基於ZooKeeper,現在已經捐獻給了Apache基金會。目前Kafka已經定位為一個分散式流式處理平臺,它以 高吞吐、可持久化、可水平擴展、支持流處理等多種特性而被廣泛應用。 ​ Apache ...


file

1、應用場景

1.1 kafka場景

​ Kafka最初是由LinkedIn公司採用Scala語言開發,基於ZooKeeper,現在已經捐獻給了Apache基金會。目前Kafka已經定位為一個分散式流式處理平臺,它以 高吞吐、可持久化、可水平擴展、支持流處理等多種特性而被廣泛應用。

​ Apache Kafka能夠支撐海量數據的數據傳遞。在離線和實時的消息處理業務系統中,Kafka都有廣泛的應用。

(1)日誌收集:收集各種服務的log,通過kafka以統一介面服務的方式開放 給各種consumer,例如Hadoop、Hbase、Solr等;

(2)消息系統:解耦和生產者和消費者、緩存消息等;

(3)用戶活動跟蹤:Kafka經常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網頁、搜索、點 擊等活動,這些活動信息被各個伺服器發佈到kafka的topic中,然後訂閱者通過訂閱這些topic來做實時 的監控分析,或者裝載到Hadoop、數據倉庫中做離線分析和挖掘;

(4)運營指標:Kafka也經常用來記錄運營監控數據。包括收集各種分散式應用的數據,生產各種操作 的集中反饋,比如報警和報告;

(5)流式處理:比如spark streaming和storm;

1.2 kafka特性

kafka以高吞吐量著稱,主要有以下特性:

(1)高吞吐量、低延遲:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒;

(2)可擴展性:kafka集群支持熱擴展;

(3)持久性、可靠性:消息被持久化到本地磁碟,並且支持數據備份防止數據丟失;

(4)容錯性:允許集群中節點失敗(若副本數量為n,則允許n-1個節點失敗);

(5)高併發:支持數千個客戶端同時讀寫;

1.3 消息對比

  • 如果普通的業務消息解耦,消息傳輸,rabbitMq是首選,它足夠簡單,管理方便,性能夠用。
  • 如果在上述,日誌、消息收集、訪問記錄等高吞吐,實時性場景下,推薦kafka,它基於分散式,擴容便捷
  • 如果很重的業務,要做到極高的可靠性,考慮rocketMq,但是它太重。需要你有足夠的瞭解

1.4 大廠應用

  • 京東通過kafka搭建數據平臺,用於用戶購買、瀏覽等行為的分析。成功抗住6.18的流量洪峰
  • 阿裡借鑒kafka的理念,推出自己的rocketmq。在設計上參考了kafka的架構體系

2、基礎組件

2.1 角色

file

  • broker:節點,就是你看到的機器

  • provider:生產者,發消息的

  • consumer:消費者,讀消息的

  • zookeeper:信息中心,記錄kafka的各種信息的地方

  • controller:其中的一個broker,作為leader身份來負責管理整個集群。如果掛掉,藉助zk重新選主

2.2 邏輯組件

file

  • topic:主題,一個消息的通道,收發總得知道消息往哪投

  • partition:分區,每個主題可以有多個分區分擔數據的傳遞,多條路並行,吞吐量大

  • Replicas:副本,每個分區可以設置多個副本,副本之間數據一致。相當於備份,有備胎更可靠

  • leader & follower:主從,上面的這些副本里有1個身份為leader,其他的為follower。leader處理partition的所有讀寫請求

2.3 副本集合

  • AR:所有副本的統稱,AR=ISR+OSR

  • ISR:同步中的副本,可以參與leader選主。一旦落後太多(數量滯後和時間滯後兩個維度)會被踢到OSR。

  • OSR:踢出同步的副本,一直追趕leader,追上後會進入ISR

2.4 消息標記

file

  • offset:偏移量,消息消費到哪一條了?每個消費者都有自己的偏移量
  • HW:(high watermark):副本的高水印值,客戶端最多能消費到的位置,HW值為8,代表offset為[0,8]的9條消息都可以被消費到,它們是對消費者可見的,而[9,12]這4條消息由於未提交,對消費者是不可見的。
  • LEO:(log end offset):日誌末端位移,代表日誌文件中下一條待寫入消息的offset,這個offset上實際是沒有消息的。不管是leader副本還是follower副本,都有這個值。

那麼這三者有什麼關係呢?

比如在副本數等於3的情況下,消息發送到Leader A之後會更新LEO的值,Follower B和Follower C也會實時拉取Leader A中的消息來更新自己,HW就表示A、B、C三者同時達到的日誌位移,也就是A、B、C三者中LEO最小的那個值。由於B、C拉取A消息之間延時問題,所以HW一般會小於LEO,即LEO>=HW。

具體的同步原理,下麵章節會詳細講到

3.1 發展歷程

http://kafka.apache.org/downloads

file

3.1.1 版本命名

Kafka在1.0.0版本前的命名規則是4位,比如0.8.2.1,0.8是大版本號,2是小版本號,1表示打過1個補丁

現在的版本號命名規則是3位,格式是“大版本號”+“小版本號”+“修訂補丁數”,比如2.5.0,前面的2代表的是大版本號,中間的5代表的是小版本號,0表示沒有打過補丁

我們所看到的下載包,前面是scala編譯器的版本,後面才是真正的kafka版本。

3.1.2 演進歷史

0.7版本
只提供了最基礎的消息隊列功能。

0.8版本
引入了副本機制,至此Kafka成為了一個真正意義上完備的分散式高可靠消息隊列解決方案。

0.9版本
增加許可權和認證,使用Java重寫了新的consumer API,Kafka Connect功能;不建議使用consumer API;

0.10版本
引入Kafka Streams功能,正式升級成分散式流處理平臺;建議版本0.10.2.2;建議使用新版consumer API

0.11版本
producer API冪等,事務API,消息格式重構;建議版本0.11.0.3;謹慎對待消息格式變化

1.0和2.0版本
Kafka Streams改進;建議版本2.0;

3.2 集群搭建(助學)

1)原生啟動

kafka啟動需要zookeeper,第一步啟動zk:

docker run --name zookeeper-1 -d -p 2181 zookeeper:3.4.13

原生安裝:下載後解壓啟動即可 http://kafka.apache.org/downloads

bin/kafka-server-start.sh config/server.properties
#server.properties配置說明
#表示broker的編號,如果集群中有多個broker,則每個broker的編號需要設置的不同
broker.id=0 
#brokder對外提供的服務入口地址,預設9092
listeners=PLAINTEXT://:9092 
#設置存放消息日誌文件的地址
log.dirs=/tmp/kafka/log 
#Kafka所需Zookeeper集群地址,這裡是關鍵!加入同一個zk的kafka為同一集群
zookeeper.connect=zookeeper:2181 

2)推薦docker-compose 一鍵啟動

#參考資料中的kafka.yml
#註意hostname問題,ip地址:52.82.98.209,換成你自己伺服器的
#docker-compose -f kafka.yml up -d 啟動
version: '3'
services:
    zookeeper:
        image: zookeeper:3.4.13

    kafka-1:
        container_name: kafka-1
        image: wurstmeister/kafka:2.12-2.2.2
        ports:
            - 10903:9092
        environment:
            KAFKA_BROKER_ID: 1 
            HOST_IP: 52.82.98.209
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            #docker部署必須設置外部可訪問ip和埠,否則註冊進zk的地址將不可達造成外部無法連接
            KAFKA_ADVERTISED_HOST_NAME: 52.82.98.209
            KAFKA_ADVERTISED_PORT: 10903 
        volumes:
            - /etc/localtime:/etc/localtime
        depends_on:
            - zookeeper           
    kafka-2:
        container_name: kafka-2
        image: wurstmeister/kafka:2.12-2.2.2
        ports:
            - 10904:9092
        environment:
            KAFKA_BROKER_ID: 2 
            HOST_IP: 52.82.98.209
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            KAFKA_ADVERTISED_HOST_NAME: 52.82.98.209
            KAFKA_ADVERTISED_PORT: 10904 
        volumes:
            - /etc/localtime:/etc/localtime
        depends_on:
            - zookeeper 

3.3 組件探秘

命令行工具是管理kafka集群最直接的工具。官方自帶,不需要額外安裝。

3.2.1 主題創建

#進入容器
docker exec -it kafka-1 sh
#進入bin目錄
cd /opt/kafka/bin
#創建
kafka-topics.sh --zookeeper zookeeper:2181 --create --topic test --partitions 2 --replication-factor 1

3.2.2 查看主題

kafka-topics.sh --zookeeper zookeeper:2181 --list

3.2.3 主題詳情

kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic test

#分析輸出:
Topic:test	PartitionCount:2	ReplicationFactor:1	Configs:
	Topic: test	Partition: 0	Leader: 2	Replicas: 2	Isr: 2
	Topic: test	Partition: 1	Leader: 1	Replicas: 1	Isr: 1

3.2.4 消息收發

#使用docker連接任意集群中的一個容器
docker exec -it kafka-1 sh

#進入kafka的容器內目錄
cd /opt/kafka/bin

#客戶端監聽
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

#另起一個終端,驗證發送
./kafka-console-producer.sh --broker-list localhost:9092 --topic test

3.2.5 分組消費

#啟動兩個consumer時,如果不指定group信息,消息被廣播
#指定相同的group,讓多個消費者分工消費(畫圖:group原理)

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group aaa

#結果:在發送方,連續發送 1-4 ,4條消息,同一group下的兩台consumer交替消費,併發執行

註意!!!

這是在消費者和分區數相等(都是2)的情況下。
如果同一group下的 ( 消費者數量 > 分區數量 ) 那麼就會有消費者閑置。

驗證方式:

可以再多啟動幾個消費者試一試,會發現,超出2個的時候,有的始終不會消費到消息。
停掉可以消費到的,那麼閑置的會被激活,進入工作狀態

3.2.6 指定分區

#指定分區通過參數 --partition,註意!需要去掉上面的group
#指定分區的意義在於,保障消息傳輸的順序性(畫圖:kafka順序性原理)
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 1

#結果:發送1-4條消息,交替出現。說明消息被均分到各個分區中投遞


#預設的發送是沒有指定key的
#要指定分區發送,就需要定義key。那麼相同的key被路由到同一個分區
./kafka-console-producer.sh --broker-list kafka-1:9092 --topic test --property parse.key=true

#攜帶key再發送,註意key和value之間用tab分割
>1	1111
>1	2222
>2	3333
>2	4444

#查看consumer的接收情況
#結果:相同的key被同一個consumer消費掉

3.2.7 偏移量

#偏移量決定了消息從哪開始消費,支持:開頭,還是末尾

# earliest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
# latest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據
# none:topic各分區都存在已提交的offset時,從offset後開始消費;只要有一個分區不存在已提交的offset,則拋出異常

# 註意點!!!有提交偏移量的話,仍然以提交的為主,即便使用earliest,比提交點更早的也不會被提取

#--offset [earliest|latest(預設)] , 或者 --from-beginning
#新起一個終端,指定offset位置
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --offset earliest

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --from-beginning

#結果:之前發送的消息,從頭又消費了一遍!

3.4 zk探秘

前面說過,zk存儲了kafka集群的相關信息,本節來探索內部的秘密。

kafka的信息記錄在zk中,進入zk容器,查看相關節點和信息

docker exec -it kafka_zookeeper_1 sh

>./bin/zkCli.sh

>ls /

#結果:得到以下配置信息

file

3.4.1 broker信息

[zk: localhost:2181(CONNECTED) 0] ls /brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 1] ls /brokers/ids
[1, 2]

#機器broker信息
[zk: localhost:2181(CONNECTED) 4] get /brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://52.82.98.209:10903"],"jmx_port":-1,"host":"52.82.98.209","timestamp":"1609825245500","port":10903,"version":4}
cZxid = 0x27
ctime = Tue Jan 05 05:40:45 GMT 2021
mZxid = 0x27
mtime = Tue Jan 05 05:40:45 GMT 2021
pZxid = 0x27
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x105a2db626b0000
dataLength = 196
numChildren = 0

3.4.2 主題與分區

#分區節點路徑
[zk: localhost:2181(CONNECTED) 5] ls /brokers/topics
[test, __consumer_offsets]
[zk: localhost:2181(CONNECTED) 6] ls /brokers/topics/test
[partitions]
[zk: localhost:2181(CONNECTED) 7] ls /brokers/topics/test/partitions
[0, 1]
[zk: localhost:2181(CONNECTED) 8] ls /brokers/topics/test/partitions/0
[state]

#分區信息,leader所在的機器id,isr列表等
[zk: localhost:2181(CONNECTED) 18] get /brokers/topics/test/partitions/0/state
{"controller_epoch":1,"leader":1,"version":1,"leader_epoch":0,"isr":[1]}
cZxid = 0xb0
ctime = Tue Jan 05 05:56:06 GMT 2021
mZxid = 0xb0
mtime = Tue Jan 05 05:56:06 GMT 2021
pZxid = 0xb0
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 72
numChildren = 0

3.4.3 消費者與偏移量

[zk: localhost:2181(CONNECTED) 15] ls /consumers
[]
#空的???
#那麼,消費者以及它的偏移記在哪裡呢???

kafka 消費者記錄 group 的消費 偏移量 有兩種方式 :

1)kafka 自維護 (新)

2)zookpeer 維護 (舊) ,已經逐漸被廢棄

查看方式:

上面的消費用的是控制台工具,這個工具使用--bootstrap-server,不經過zk,也就不會記錄到/consumers下。

其消費者的offset會更新到一個kafka自帶的topic【__consumer_offsets】下麵

#先起一個消費端,指定group
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group aaa

#使用控制台工具查看消費者及偏移量情況
./kafka-consumer-groups.sh --bootstrap-server kafka-1:9092 --list
KMOffsetCache-44acff134cad
aaa

#查看偏移量詳情
./kafka-consumer-groups.sh --bootstrap-server kafka-1:9092 --describe --group aaa

當前與LEO保持一致,說明消息都完整的被消費過

file

停掉consumer後,往provider中再發幾條記錄,offset開始滯後:

file

重新啟動consumer,消費到最新的消息,同時再返回看偏移量,消息得到同步:

file

3.4.4 controller

#當前集群中的主控節點是誰
[zk: localhost:2181(CONNECTED) 17] get /controller
{"version":1,"brokerid":1,"timestamp":"1609825245694"}
cZxid = 0x2a
ctime = Tue Jan 05 05:40:45 GMT 2021
mZxid = 0x2a
mtime = Tue Jan 05 05:40:45 GMT 2021
pZxid = 0x2a
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x105a2db626b0000
dataLength = 54
numChildren = 0

3.5 km

3.5.1 啟動

kafka-manager是目前最受歡迎的kafka集群管理工具,最早由雅虎開源。提供可視化kafka集群操作

官網:https://github.com/yahoo/kafka-manager/releases

註意它的版本,docker社區的景象版本滯後於kafka,我們自己來打鏡像。

#Dockerfile
FROM daocloud.io/library/java:openjdk-8u40-jdk
ADD kafka-manager-2.0.0.2/ /opt/km2002/
CMD ["/opt/km2002/bin/kafka-manager","-Dconfig.file=/opt/km2002/conf/application.conf"]

#打包,註意將kafka-manager-2.0.0.2放到同一目錄
docker build -t km:2002 .
#啟動:在上面的yml里,services節點下加一段
#參考資料:km.yml
#執行: docker-compose -f km.yml up -d
		km:
        image: km:2002
        ports:
            - 10906:9000
        depends_on:
            - zookeeper
         

3.5.2 使用

使用km可以方便的查看以下信息:

  • cluster:創建集群,填寫zk地址,選中jmx,consumer信息等選項
  • brokers:列表,機器信息
  • topic:主題信息,主題內的分區信息。創建新的主題,增加分區
  • cosumers: 消費者信息,偏移量等

本文由傳智教育博學谷 - 狂野架構師教研團隊發佈
如果本文對您有幫助,歡迎關註和點贊;如果您有任何建議也可留言評論或私信,您的支持是我堅持創作的動力
轉載請註明出處!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 啥是函數式介面、它和JAVA中普通的介面有啥區別?函數式介面有啥用?如何在實際編碼中使用函數式介面?帶著這些問題,我們一起來認識下函數式介面的廬山真面目。 ...
  • 函數是基於功能或者邏輯進行聚合的可復用的代碼塊。將一些複雜的、冗長的代碼抽離封裝成多個代碼片段,即函數,有助於提高代碼邏輯的可讀性和可維護性。不同於Python,由於 Go lang是編譯型語言,編譯之後再運行,所以函數的定義順序無關痛癢。 函數聲明 在 Go lang里,函數聲明語法如下: fun ...
  • 《Python高手之路 第3版》|免費下載地址 作者簡介 · · · · · · Julien Danjou 具有12年從業經驗的自由軟體黑客。擁有多個開源社區的不同身份:Debian開發者、Freedesktop貢獻者、GNU Emacs提交者、awesome視窗管理器的創建者以及OpenStac ...
  • 《紅樓夢》作為我國四大名著之一,古典小說的巔峰之作,粉絲量極其龐大,而紅學也經久不衰。所以我們今天通過 Python 來探索下紅樓夢裡那千絲萬縷的人物關係,話不多說,開始整活! 一、準備工作 紅樓夢txt格式電子書一份 金陵十二釵+賈寶玉人物名稱列表 寶玉 nr 黛玉 nr 寶釵 nr 湘雲 nr ...
  • “什麼是IO的多路復用機制?” 這是一道年薪50W的面試題,很遺憾,99%的人都回答不出來。 大家好,我是Mic,一個工作了14年的Java程式員。 今天,給大家分享一道網路IO的面試題。 這道題目的文字回答已經整理到了15W字的面試文檔裡面,大家可以S我領取。 下麵看看高手的回答。 高手: IO多 ...
  • 多商戶商城系統,也稱為B2B2C(BBC)平臺電商模式多商家商城系統。可以快速幫助企業搭建類似拼多多/京東/天貓/淘寶的綜合商城。 多商戶商城系統支持商家入駐加盟,同時滿足平臺自營、旗艦店等多種經營方式。平臺可以通過收取商家入駐費,訂單交易服務費,提現手續費,簡訊通道費等多手段方式,實現整體盈利。 ...
  • 原文連接:https://www.zhoubotong.site/post/67.html Go 標準庫的net/url包提供的兩個函可以直接檢查URL合法性,不需要手動去正則匹配校驗。 下麵可以直接使用ParseRequestURI()函數解析URL,當然這個只會驗證url格式,至於功能變數名稱是否存在或 ...
  • Python帶我起飛——入門、進階、商業實戰_ 免費下載地址 內容簡介 · · · · · · 《Python帶我起飛——入門、進階、商業實戰》針對Python 3.5 以上版本,採用“理論+實踐”的形式編寫,通過大量的實例(共42 個),全面而深入地講解“Python 基礎語法”和“Python ...
一周排行
    -Advertisement-
    Play Games
  • 一:背景 1.講故事 在分析的眾多dump中,經常會遇到各種奇葩的問題,僅通過dump這種快照形式還是有很多問題搞不定,而通過 perfview 這種粒度又太粗,很難找到問題之所在,真的很頭疼,比如本篇的 短命線程 問題,參考圖如下: 我們在 t2 時刻抓取的dump對查看 短命線程 毫無幫助,我根 ...
  • 在日常後端Api開發中,我們跟前端的溝通中,通常需要協商好入參的數據類型,和參數是通過什麼方式存在於請求中的,是表單(form)、請求體(body)、地址欄參數(query)、還是說通過請求頭(header)。 當協商好後,我們的介面又需要怎麼去接收這些數據呢?很多小伙伴可能上手就是直接寫一個實體, ...
  • 許多情況下我們需要用到攝像頭獲取圖像,進而處理圖像,這篇博文介紹利用pyqt5、OpenCV實現用電腦上連接的攝像頭拍照並保存照片。為了使用和後續開發方便,這裡利用pyqt5設計了個相機界面,後面將介紹如何實現,要點包括界面設計、邏輯實現及完整代碼。 ...
  • 思路分析 註冊頁面需要對用戶提交的數據進行校驗,並且需要對用戶輸入錯誤的地方進行提示! 所有我們需要使用forms組件搭建註冊頁面! 平時我們書寫form是組件的時候是在views.py裡面書寫的, 但是為了接耦合,我們需要將forms組件都單獨寫在一個地方,需要用的時候導入就行! 例如,在項目文件 ...
  • 思路分析 登錄頁面,我們還是採用ajax的方式提交用戶數據 唯一需要學習的是如何製作圖片驗證碼! 具體的登錄頁面效果圖如下: 如何製作圖片驗證碼 推導步驟1:在img標簽的src屬性里放上驗證碼的請求路徑 補充1.img的src屬性: 1.圖片路徑 2.url 3.圖片的二進位數據 補充2:字體樣式 ...
  • 哈嘍,兄弟們! 最近有許多小伙伴都在吐槽打工好難。 每天都是執行許多重覆的任務 例如閱讀新聞、發郵件、查看天氣、打開書簽、清理文件夾等等, 使用自動化腳本,就無需手動一次又一次地完成這些任務, 非常方便啊有木有?! 而在某種程度上,Python 就是自動化的代名詞。 今天就來和大家一起學習一下, 用 ...
  • 作者:IT王小二 博客:https://itwxe.com 前面小二介紹過使用Typora+PicGo+LskyPro打造舒適寫作環境,那時候需要使用水印功能,但是小二在升級LskyPro2.x版本發現有很多不如人意的東西,遂棄用LskyPro使用MinIO結合代碼實現自己需要的圖床功能,也適合以後 ...
  • OpenAI Gym是一款用於研發和比較強化學習演算法的工具包,本文主要介紹Gym模擬環境的功能和工具包的使用方法,並詳細介紹其中的經典控制問題中的倒立擺(CartPole-v0/1)問題。最後針對倒立擺問題如何建立控制模型並採用爬山演算法優化進行了介紹,並給出了相應的完整python代碼示例和解釋。要... ...
  • python爬蟲瀏覽器偽裝 #導入urllib.request模塊 import urllib.request #設置請求頭 headers=("User-Agent","Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, l ...
  • 前端代碼搭建 主要利用的是bootstrap3中js插件里的模態框版塊 <li><a href="" data-toggle="modal" data-target=".bs-example-modal-lg">修改密碼</a></li> <div class="modal fade bs-exam ...