Python 使用python-kafka類庫開發kafka生產者&消費者&客戶端

来源:https://www.cnblogs.com/shouke/archive/2019/03/02/10463377.html
-Advertisement-
Play Games

使用python-kafka類庫開發kafka生產者&消費者&客戶端 By: 授客 QQ:1033553122 1.測試環境 python 3.4 zookeeper-3.4.13.tar.gz 下載地址1: http://zookeeper.apache.org/releases.html#dow ...


使用python-kafka類庫開發kafka生產者&消費者&客戶端

  By: 授客 QQ:1033553122

 

 

 

1.測試環境

python 3.4

 

zookeeper-3.4.13.tar.gz

下載地址1:

http://zookeeper.apache.org/releases.html#download

https://www.apache.org/dyn/closer.cgi/zookeeper/

https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/

 

下載地址2:

https://pan.baidu.com/s/1dnBgHvySE9pVRZXJVmezyQ

 

kafka_2.12-2.1.0.tgz

下載地址1:

http://kafka.apache.org/downloads.html

下載地址2:

https://pan.baidu.com/s/1VnHkJgy4iQ73j5rLbEL0jw

 

pip-18.1.tar.gz

下載地址:https://pan.baidu.com/s/1VpYk8JvMuztzbvEF8mQoRw

說明:實踐中發現,pip版本比較舊的話,沒法安裝whl文件

 

kafka_python-1.4.4-py2.py3-none-any.whl

下載地址1:

https://pypi.org/project/kafka-python/#files

https://files.pythonhosted.org/packages/5f/89/f13d9b1f32cc37168788215a7ad1e4c133915f6853660a447660393b577d/kafka_python-1.4.4-py2.py3-none-any.whl

 

下載地址2:

https://pan.baidu.com/s/10XtLXESp64NtwA73RbryVg

 

 

python_snappy-0.5.3-cp34-cp34m-win_amd64.whl

下載地址1:

https://www.lfd.uci.edu/~gohlke/pythonlibs/

 

下載地址2:

https://pan.baidu.com/s/10XtLXESp64NtwA73RbryVg

 

 

說明:

kafka-python支持gzip壓縮/解壓縮。如果要消費lz4方式壓縮的消息,則需要安裝python-lz4,如果要支持snappy方式壓縮/解壓縮則需要安裝,否則可能會報錯:kafka.errors.UnsupportedCodecError: UnsupportedCodecError: Libraries for snappy compression codec not found.

構建生產者對象時,可通過compression_type 參數指定由對應生產者生產的消息數據的壓縮方式,或者在producer.properties配置中配置compression.type參數。

 

參考鏈接:

https://pypi.org/project/kafka-python/#description

https://kafka-python.readthedocs.io/en/master/install.html#optional-snappy-install

 

2.代碼實踐

生產者

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

 

from kafka import KafkaProducer

import json

 

producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])

 

 

for i in range(0, 100):

    producer.send('MY_TOPIC1', value=b'lai zi shouke de msg', key=None, headers=None, partition=None, timestamp_ms=None)

 

# Block直到單條消息發送完或者超時

future = producer.send('MY_TOPIC1', value=b'another msg',key=b'othermsg')

result = future.get(timeout=60)

print(result)

 

# Block直到所有阻塞的消息發送到網路

# 註意: 該操作不保證傳輸或者消息發送成功,僅在配置了linger_ms的情況下有用。(It is really only useful if you configure internal batching using linger_ms

 

 

# 序列化json數據

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))

producer.send('MY_TOPIC1', {'shouke':'kafka'})

 

# 序列化字元串key

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', key_serializer=str.encode)

producer.send('MY_TOPIC1', b'shouke', key='strKey')

 

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092',compression_type='gzip')

for i in range(2):

    producer.send('MY_TOPIC1', ('msg %d' % i).encode('utf-8'))

 

# 消息記錄攜帶header

producer.send('MY_TOPIC1', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64'),])

 

# 獲取性能數據(註意,實踐發現分區較多的情況下,該操作比較耗時

metrics = producer.metrics()

print(metrics)

 

producer.flush()

 

實踐中遇到錯誤: kafka.errors.NoBrokersAvailable: NoBrokersAvailable,解決方案如下:

進入到配置目錄(config),編輯server.properties文件,

查找並設置listener,配置監聽埠,格式:listeners = listener_name://host_name:port,供kafka客戶端連接用的ip和埠,例中配置如下:

listeners=PLAINTEXT://127.0.0.1:9092

 

 

API及常用參數說明:

class kafka.KafkaProducer(**configs)

bootstrap_servers –'host[:port]'字元串,或者由'host[:port]'組成的字元串,形如['10.202.24.5:9096', '10.202.24.6:9096', '10.202.24.7:9096']),其中,host為broker(Broker:緩存代理,Kafka集群中的單台伺服器)地址,預設值為 localhost, port預設值為9092,這裡可以不用填寫所有broker的host和port,但必須保證至少有一個broker)

 

key_serializer (可調用對象) –用於轉換用戶提供的key值為位元組,必須返回位元組數據。 如果為None,則等同調用f(key)。 預設值: None.

 

value_serializer(可調用對象) – 用於轉換用戶提供的value消息值為位元組,必須返回位元組數據。 如果為None,則等同調用f(value)。 預設值: None.

 

send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)

topic(str) – 設置消息將要發佈到的主題,即消息所屬主題

 

value(可選) – 消息內容,必須為位元組數據,或者通過value_serializer序列化後的位元組數據。如果為None,則key必填,消息等同於“刪除”。( If value is None, key is required and message acts as a ‘delete’)

 

partition (int, 可選) – 指定分區。如果未設置,則使用配置的partitioner

 

key (可選) – 和消息對應的key,可用於決定消息發送到哪個分區。如果平partition為None,則相同key的消息會被髮布到相同分區(但是如果key為None,則隨機選取分區)(If partition is None (and producer’s partitioner config is left as default), then messages with the same key will be delivered to the same partition (but if key is None, partition is chosen randomly)). 必須為位元組數據或者通過配置的key_serializer序列化後的位元組數據.

 

headers (可選) – 設置消息header,header-value鍵值對錶示的list。list項為元組:格式 (str_header,bytes_value)

 

timestamp_ms (int, 可選) –毫秒數 (從1970 1月1日 UTC算起) ,作為消息時間戳。預設為當前時間

 

函數返回FutureRecordMetadata類型的RecordMetadata數據

 

flush(timeout=None)

發送所有可以立即獲取的緩衝消息(即時linger_ms大於0),線程block直到這些記錄發送完成。當一個線程等待flush調用完成而block時,其它線程可以繼續發送消息。

 

註意:flush調用不保證記錄發送成功

 

metrics(raw=False)

獲取生產者性能指標。

 

參考API:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

 

註:生產者代碼是線程安全的,支持多線程,而消費者則不然

 

消費者

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

from kafka import KafkaConsumer

from kafka import TopicPartition

import json

 


consumer = KafkaConsumer('MY_TOPIC1',
                         bootstrap_servers=['127.0.0.1:9092'],
                         #auto_offset_reset='',
                         auto_offset_reset='latest',# 消費kafka中最近的數據,如果設置為earliest則消費最早的數據,不管這些數據是否消費
                         enable_auto_commit=True, # 自動提交消費者的offset
                         auto_commit_interval_ms=3000, ## 自動提交消費者offset的時間間隔
                         group_id='MY_GROUP1',

                         consumer_timeout_ms= 10000, # 如果10秒內kafka中沒有可供消費的數據,自動退出
                         client_id='consumer-python3'
                         )

 

for msg in consumer:

    print (msg)

    print('topic: ', msg.topic)

    print('partition: ', msg.partition)

    print('key: ', msg.key, 'value: ', msg.value)

    print('offset:', msg.offset)

    print('headers:', msg.headers)

 

# Get consumer metrics

metrics = consumer.metrics()

print(metrics)

 

運行效果

 

 

 

通過assign、subscribe兩者之一為消費者設置消費的主題

consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'],

                         auto_offset_reset='latest',

                         enable_auto_commit=True, # 自動提交消費數據的offset

                         consumer_timeout_ms= 10000, # 如果1秒內kafka中沒有可供消費的數據,自動退出

                         value_deserializer=lambda m: json.loads(m.decode('ascii')), #消費json 格式的消息

                         client_id='consumer-python3'

                         )

 

 

# consumer.assign([TopicPartition('MY_TOPIC1', 0)])

# msg = next(consumer)

# print(msg)

 

consumer.subscribe('MY_TOPIC1')

for msg in consumer:

    print (msg)

 

 

API及常用參數說明:

class kafka.KafkaConsumer(*topics, **configs)

*topics (str) – 可選,設置需要訂閱的topic,如果未設置,需要在消費記錄前調用subscribe或者assign。

 

client_id (str) – 客戶端名稱,預設值: ‘kafka-python-{version}’

 

group_id (str or None) – 消費組名稱。如果為None,則通過group coordinator auto-partition分區分配,offset提交被禁用。預設為None

 

auto_offset_reset (str) – 重置offset策略: 'earliest'將移動到最老的可用消息, 'latest'將移動到最近消息。 設置為其它任何值將拋出異常。預設值:'latest'。

 

enable_auto_commit (bool) –  如果為True,將自動定時提交消費者offset。預設為True。

 

auto_commit_interval_ms (int) – 自動提交offset之間的間隔毫秒數。如果enable_auto_commit 為true,預設值為: 5000。

 

value_deserializer(可調用對象) - 攜帶原始消息value並返回反序列化後的value

 

subscribe(topics=(), pattern=None, listener=None)

訂閱需要的主題

topics (list) – 需要訂閱的主題列表

pattern (str) – 用於匹配可用主題的模式,即正則表達式。註意:必須提供topics、pattern兩者參數之一,但不能同時提供兩者。

 

metrics(raw=False)

獲取消費者性能指標。

 

參考API:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

 

客戶端

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

from kafka.client import KafkaClient

 

client = KafkaClient(bootstrap_servers=['127.0.0.1:9092'], request_timeout_ms=3000)

 

# 獲取所有broker

brokers = client.cluster.brokers()

for broker in brokers:

    print('broker: ', broker)

    print('broker nodeId: ', broker.nodeId)

 

# 獲取主題的所有分區

topic = 'MY_TOPIC1'

partitions = client.cluster.available_partitions_for_topic(topic)

print(partitions)

 

partition_dict = {}

partition_dict[topic] = [partition for partition in partitions]

print(partition_dict)

 

 

運行結果:

broker:  BrokerMetadata(nodeId=0, host='127.0.0.1', port=9092, rack=None)

broker nodeId:  0

{0}

{'MY_TOPIC1': [0]}

 

API及常用參數說明:

class kafka.client.KafkaClient(**configs)

bootstrap_servers –'host[:port]'字元串,或者由'host[:port]'組成的字元串,形如['10.202.24.5:9096', '10.202.24.6:9096', '10.202.24.7:9096']),其中,host為broker(Broker:緩存代理,Kafka集群中的單台伺服器)地址,預設值為 localhost, port預設值為9092,這裡可以不用填寫所有broker的host和port,但必須保證至少有一個broker)

 

client_id (str) – 客戶端名稱,預設值: ‘kafka-python-{version}’

 

request_timeout_ms (int) – 客戶端請求超時時間,單位毫秒。預設值: 30000.

 

參考API: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaClient.html

 

brokers()

獲取所有broker元數據

 

available_partitions_for_topic(topic)

返回主題的所有分區

 

 

參考API: https://kafka-python.readthedocs.io/en/master/apidoc/ClusterMetadata.html

 

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 當都為正數時,即1+2+3+...+99,如上,很簡單; 其實,計算正負相間的式子也很簡單,只需要加上一個標記正負號的變數乘到計數器上即可。 用一個布爾型變數來記錄執行加法還是減法,也能達到同樣的效果(這裡額外增加一個要求,就是剔除某個數後,保持正負相間的累加) 這樣,得到的就是1-2+3-4... ...
  • 今日不寫日感,直接扔上今日興趣點: 新研究稱火星曾經有一個巨大的地下水系統 鏈接:https://mbd.baidu.com/newspage/data/landingsuper?context=%7B"nid"%3A"news_6959868648919860397"%7D&n_type=0&p_ ...
  • 1、下載JDK "官網" 打開後,直接下載最新版本。 選擇dmg文件下載 2、開始安裝,一直下一步。 3、打開終端,查詢安裝路徑: ,複製備用。 4、配置Java的環境變數 1)打開終端,到主目錄 2)查看是否有.bash_profile文件 3)如果沒有新建一個 4)有了這個文件以後,進行編輯 , ...
  • 基於Python結合pykafka實現kafka生產及消費速率&主題分區偏移實時監控 By: 授客 QQ:1033553122 1.測試環境 python 3.4 zookeeper-3.4.13.tar.gz 下載地址1: http://zookeeper.apache.org/releases. ...
  • 我的其他隨筆: 來一局緊張刺激的吃雞——淺談裝飾者模式 一起去開心的購物吧——淺談觀察者模式 記一場精彩的籃球比賽——淺談策略模式 大家好,前幾日連夜更了幾篇Java設計模式的小隨筆,從觀看量來說,我還是很高興的,有很多的朋友通過看了博文,也許接觸了新的知識,也許理解了自己之前沒弄懂的東西,也許只是 ...
  • 最近偶然到博客園看了一下,距離上次的博客已經過去很多天了,閱讀量卻少得可憐,對於博客園小白來說感覺不是很友好(主要是心理不平衡),而且有些博客被其他網站不帶出處的裝載了,它的閱讀量卻很多。於是靈光一閃,決定寫個程式增加一下閱讀量。(僅用於學術交流,實際上我就試了一下,沒有真正刷過) 一、原理 一般來 ...
  • 先看一段java代碼,func返回值為int: 正確的返回結果是,func返回1。 原因:如果finally中沒有return語句,但是改變了要返回的值,這裡有點類似與引用傳遞和值傳遞的區別,分以下兩種情況,: 1)如果return的數據是基本數據類型或文本字元串,則在finally中對該基本數據的 ...
  • 準備 安裝 "vscode" ,可直接下載deb包進行安裝,完成後安裝C/C++ for Visual Studio Code插件,安裝後重啟(最新1.3版本以後不需要重啟)。 生成目錄和文件 新建文件夾【test】,並新建文件helloworld.cpp文件,文件中內容如下, include in ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...