kafka環境搭建和使用(python API)

来源:https://www.cnblogs.com/iforever/archive/2018/06/03/9130983.html
-Advertisement-
Play Games

引言 上一篇文章瞭解了kafka的重要組件zookeeper,用來保存broker、consumer等相關信息,做到平滑擴展。這篇文章就實際操作部署下kafka,用幾個簡單的例子加深對kafka的理解,學會基本使用kafka。 環境搭建 我將會在本地部署一個三台機器的zookeeper集群,和一個2 ...


引言

上一篇文章瞭解了kafka的重要組件zookeeper,用來保存broker、consumer等相關信息,做到平滑擴展。這篇文章就實際操作部署下kafka,用幾個簡單的例子加深對kafka的理解,學會基本使用kafka。

環境搭建

我將會在本地部署一個三台機器的zookeeper集群,和一個2台機器的kafka集群。

zookeeper集群

zookeeper的搭建可以看我的上一篇文章分散式系統中zookeeper實現配置管理+集群管理,按照步驟,一步步可以很容易的搭建3太伺服器的zookeeper集群。跟之前一樣,我還是在本地的3個埠搭建了3台伺服器,地址如下所示:

192.168.0.105:2181
192.168.0.105:2182
192.168.0.105:2183

這三台伺服器一會兒會在kafka配置中用到。

kafka集群

第一步. 下載kafka

到kafka官網下載apache kafka,解壓到/path/to/kafka目錄。

第二步. 修改配置文件
複製/path/to/kafka/config/server.properties,到/path/to/kafka/config/server-1.properties/path/to/kafka/config/server-2.properties

配置文件中修改的差異內容如下所示:
server-1.properties

broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-1
zookeeper.connect=192.168.0.105:2181,192.168.0.105:2182,192.168.0.105:2183

server-2.properties

broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs-2
zookeeper.connect=192.168.0.105:2181,192.168.0.105:2182,192.168.0.105:2183

其中broker.id是broker的唯一標示,集群中的broker標識必須唯一。
listeners是broker監聽的地址和埠,advertised.listeners用於和producer、consumer交互,後者未配置會預設使用前者,listeners的完整格式是listeners = listener_name://host_name:port,其中PLAINTEXT是協議,還有一種是SSL,具體還沒太搞明白(TODO)。
log.dirs是日誌數據的存放目錄,也就是producer產生的數據存放的目錄。
zookeeper.connect配置是zookeeper的集群,broker啟動之後將信息註冊到zookeeper集群中。

第三步. 啟動伺服器

cd /path/to/kafka
bin/kafka-server-start.sh -daemon config/server-1.properties
bin/kafka-server-start.sh -daemon config/server-2.properties

使用jps命令可以看見2個kafka進程,證明啟動成功了。

第四步. 創建topic
創建topic一般使用kafka自帶的腳本創建:

bin/kafka-topics.sh --create --zookeeper 192.168.0.105:2181,192.168.0.105:2182,192.168.0.105:2183 --replication-factor 2 --partitions 10 --topic user-event

其中--zookeeper就是後面就是我們上面配置的zookeeper集群,--replication-factor代表每個分區在集群中複製的份數,後面的值要小於kafka集群中伺服器數量,--partitions表示創建主題的分區數量,一般分區越大,性能越好,--topic後邊兒就是創建主題的名字,運行成功之後會看到Created topic "user-event".字樣,表示創建成功,會在kafka配置的日誌目錄下創建主題信息,比如下麵的:
ll /tmp/kafka-logs-1

drwxr-xr-x  7 ritoyan  wheel  224  6  3 21:21 clock-tick-0
drwxr-xr-x  7 ritoyan  wheel  224  6  3 21:21 clock-tick-2
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-0
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-1
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-2
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-3
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-4
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-5
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-6
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-7
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-8
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-9

ll /tmp/kafka-logs-2

drwxr-xr-x  7 ritoyan  wheel  224  6  3 21:21 clock-tick-1
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-0
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-1
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-2
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-3
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-4
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-5
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-6
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-7
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-8
drwxr-xr-x  6 ritoyan  wheel  192  6  3 21:26 user-event-9

可以看到兩個broker中都創建了主題user-event的10個分區。可能也有人要問了,clock-tick這個主題怎麼在broker1中有2個分區,broker2中有1個分區,這個是我之前創建的一個分區,用了下麵的命令bin/kafka-topics.sh --create --zookeeper 192.168.0.105:2181,192.168.0.105:2182,192.168.0.105:2183 --replication-factor 1 --partitions 3 --topic clock-tick,只有一份日誌記錄,3個分區,分區會均勻的分佈在所有broker上。

至此kafka環境配置好了,西面我們看看如何使用。

基本使用

安裝kafka-python,用來操作kafka,pip3 install kafka-python,這裡是他的文檔,文檔寫的不錯,簡潔易懂kafka-python

producer 向broker發送消息

bootstrap_servers是kafka集群地址信息,下麵事項主題user-event發送一條消息,send發送消息是非同步的,會馬上返回,因此我們要通過阻塞的方式等待消息發送成功(或者flush()也可以,flush會阻塞知道所有log都發送成功),否則消息可能會發送失敗,但也不會有提示,關於上面這個可以通過刪除send之後的語句試試,會發現broker不會收到消息,然後在send後加上time.sleep(10)之後,會看到broker收到消息。

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(
    bootstrap_servers=[
        "localhost:9093",
  "localhost:9094"
  ]
)

future = producer.send("user-event", b'I am rito yan')
try:
    record_metadata = future.get(timeout=10)
    print_r(record_metadata)
except KafkaError as e:
    print(e)

阻塞等待發送成功之後,會看到返回插入記錄的信息:
RecordMetadata(topic='user-event', partition=7, topic_partition=TopicPartition(topic='user-event', partition=7), offset=1, timestamp=1528034253757, checksum=None, serialized_key_size=-1, serialized_value_size=13),裡面包括了插入log的主題、分區等信息。

格式化發送的信息

創建producer的時候可以通過value_serializer指定格式化函數,比如我們數據是個dict,可以指定格式化函數,將dict轉化為byte:

import json

producer = KafkaProducer(
    bootstrap_servers=[
        "localhost:9093",
        "localhost:9094"
    ],
    value_serializer=lambda m: json.dumps(m).encode('ascii')
)

future = producer.send("user-event", {
    "name": "燕睿濤",
    "age": 26,
    "friends": [
        "ritoyan",
        "luluyrt"
    ]
})

這樣就可以將格式化之後的信息發送給broker,不用每次發送的時候都自己格式化,真是不要太好用。

consumer 消費數據

創建一個consumer,其中group_id是分組,broker中的每一個數據只能被consumer組中的一個consumer消費。

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "user-event",
    group_id = "user-event-test",
    bootstrap_servers = [
        "localhost:9093",
        "localhost:9094"
    ]
)
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

啟動之後,進程會一直阻塞在哪裡,等broker中有消息的時候就會去消費,啟動多個進程,只要保證group_id一致,就可以保證消息只被組內的一個consumer消費,上面的程式會輸出:

user-event:8:2: key=None value=b'{"name": "\\u71d5\\u777f\\u6d9b", "age": 26, "friends": ["ritoyan", "luluyrt"]}'

同樣,進入的時候有value_serializer,出來的時候對應的也有value_deserializer,消費者可以配置value_deserializer來格式化內容,跟producer對應起來

consumer = KafkaConsumer(
    "user-event",
  group_id = "user-event-test",
  bootstrap_servers = [
        "localhost:9093",
  "localhost:9094"
  ],
  value_deserializer=lambda m: json.loads(m.decode('ascii'))
)

輸出內容user-event:8:3: key=None value={'name': '燕睿濤', 'age': 26, 'friends': ['ritoyan', 'luluyrt']}

kafka其他命令

查看分組

我們的consumer可能有很多分組,可以通過西面的命令查看分組信息:

cd /path/to/kafka
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9093,localhost:9094 --list

可以看到我使用中的分組有4個,分別如下所示

clock-tick-test3
user-event-test
clock-tick-test2
clock-tick-test

查看特定分組信息

可以通過bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9093 --group user-event-test --describe,查看分組user-event-test的信息,可以看到西面的信息,包含消費的主題、分區信息,以及consumer在分區中的offset和分區的總offset。(為了格式化顯示,刪了部分列的部分字母)

TOPIC       PARTITION   CURRENT-OFFSET  LOG-END-OFFSET  LAG CONSUMER-ID HOST    CLIENT-ID
user-event  3   0   0   0   kafka-python-154b2 /127.0.0.1   kafka-python
user-event  0   0   0   0   kafka-python-154b2 /127.0.0.1   kafka-python
user-event  1   1   1   0   kafka-python-154b2 /127.0.0.1   kafka-python
user-event  2   1   1   0   kafka-python-154b2 /127.0.0.1   kafka-python
user-event  4   0   0   0   kafka-python-154b2 /127.0.0.1   kafka-python
user-event  9   1   1   0   kafka-python-78517 /127.0.0.1   kafka-python
user-event  8   4   4   0   kafka-python-78517 /127.0.0.1   kafka-python
user-event  7   2   2   0   kafka-python-78517 /127.0.0.1   kafka-python
user-event  6   1   1   0   kafka-python-78517 /127.0.0.1   kafka-python
user-event  5   0   0   0   kafka-python-78517 /127.0.0.1   kafka-python

結語

至此,kafka的基本使用算是掌握了,以後要是有機會在項目中實踐就好了,在實際工程中的各種問題可以更加深刻的理解其中的原理。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Q: 為什麼要引入鏈表的概念?它是解決什麼問題的? A: 數組作為數據存儲結構有一定的缺陷,在無序數組中,搜索是低效的;而在有序數組中,插入效率又很低;不管在哪一個數組中刪除效率都很低;況且一個數組創建後,它的大小是不可改變的。 A: 在本篇中,我們將學習一種新的數據結構 —— 鏈表,它可以解決上面 ...
  • django rest framework 中的序列化組件,可以說是其核心組件,也是我們平時使用最多的組件,它不僅僅有序列化功能,更提供了數據驗證的功能(與django中的form類似)。 便於展現的序列化操作,我們需要在model添加外鍵、多對多情況。以下是新的models(請刪除原有的資料庫,重 ...
  • 老樣子,拋出個問題,我們想要創建一個實例,但是由於某些原因想繞過__init__方法,用別的方式來進行創建。 舉個慄子 小賤賤反序列化數據,或者說實現一個類方法將其作為備選的構造函數,都屬於這種情況。舉個慄子: 採用下麵的方法可以不用調用__init__()創建一個Date實例: 但是註意,此時使用 ...
  • 一、文件的上傳和下載 1、文件上傳的原理分析 什麼是文件上傳? 要將客戶端(瀏覽器)數據存儲到伺服器端,而不將數據直接存儲到資料庫中,而是要將數據存儲到伺服器所在的磁碟上,這就要使用文件上傳。為什麼使用文件上傳? 通過文件上傳,可以將瀏覽器端的數據直接保存到伺服器端。不將數據保存到資料庫中,而是保存 ...
  • 在各種後臺系統中都會涉及到許可權的管控,從功能許可權的管控,到數據許可權的管控,都是為了讓系統的在使用的過程中更加的安全。功能許可權管控是對針對不同的角色可以進行不同的功能操作,而數據許可權管控是針對不同的角色可以查看不同的數據。這篇文章主要介紹 JeeSite 中對功能許可權的管控,也就是訪問控制許可權的使用, ...
  • sprign中的logging實現簡介 對於spring架構,Jakarta Commons Logging API (JCL)是強制依賴的。spring將JCL反編譯,並使得它們對類可見,從而擴展spring。程式員應該要意識到,所有版本的spring使用同一個logging庫:因此遷移是很容易的 ...
  • 今晚在Ubuntu環境上安裝composer後,想查看下是否安裝成功,使用composer v,結果提示:/usr/bin/env: php: 沒有那個文件或目錄 現說說我的解決辦法: 它提示的原因,主要是因為php的安裝文件不在/usr/local/bin下。解決辦法也很簡單,就是把php的可執行 ...
  • 原創 標題:激光樣式x星球的盛大節日為增加氣氛,用30台機光器一字排開,向太空中打出光柱。安裝調試的時候才發現,不知什麼原因,相鄰的兩台激光器不能同時打開!國王很想知道,在目前這種bug存在的情況下,一共能打出多少種激光效果?顯然,如果只有3台機器,一共可以成5種樣式,即:全都關上(sorry, 此 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...