Python 使用python-kafka類庫開發kafka生產者&消費者&客戶端

来源:https://www.cnblogs.com/shouke/archive/2019/03/02/10463377.html
-Advertisement-
Play Games

使用python-kafka類庫開發kafka生產者&消費者&客戶端 By: 授客 QQ:1033553122 1.測試環境 python 3.4 zookeeper-3.4.13.tar.gz 下載地址1: http://zookeeper.apache.org/releases.html#dow ...


使用python-kafka類庫開發kafka生產者&消費者&客戶端

  By: 授客 QQ:1033553122

 

 

 

1.測試環境

python 3.4

 

zookeeper-3.4.13.tar.gz

下載地址1:

http://zookeeper.apache.org/releases.html#download

https://www.apache.org/dyn/closer.cgi/zookeeper/

https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/

 

下載地址2:

https://pan.baidu.com/s/1dnBgHvySE9pVRZXJVmezyQ

 

kafka_2.12-2.1.0.tgz

下載地址1:

http://kafka.apache.org/downloads.html

下載地址2:

https://pan.baidu.com/s/1VnHkJgy4iQ73j5rLbEL0jw

 

pip-18.1.tar.gz

下載地址:https://pan.baidu.com/s/1VpYk8JvMuztzbvEF8mQoRw

說明:實踐中發現,pip版本比較舊的話,沒法安裝whl文件

 

kafka_python-1.4.4-py2.py3-none-any.whl

下載地址1:

https://pypi.org/project/kafka-python/#files

https://files.pythonhosted.org/packages/5f/89/f13d9b1f32cc37168788215a7ad1e4c133915f6853660a447660393b577d/kafka_python-1.4.4-py2.py3-none-any.whl

 

下載地址2:

https://pan.baidu.com/s/10XtLXESp64NtwA73RbryVg

 

 

python_snappy-0.5.3-cp34-cp34m-win_amd64.whl

下載地址1:

https://www.lfd.uci.edu/~gohlke/pythonlibs/

 

下載地址2:

https://pan.baidu.com/s/10XtLXESp64NtwA73RbryVg

 

 

說明:

kafka-python支持gzip壓縮/解壓縮。如果要消費lz4方式壓縮的消息,則需要安裝python-lz4,如果要支持snappy方式壓縮/解壓縮則需要安裝,否則可能會報錯:kafka.errors.UnsupportedCodecError: UnsupportedCodecError: Libraries for snappy compression codec not found.

構建生產者對象時,可通過compression_type 參數指定由對應生產者生產的消息數據的壓縮方式,或者在producer.properties配置中配置compression.type參數。

 

參考鏈接:

https://pypi.org/project/kafka-python/#description

https://kafka-python.readthedocs.io/en/master/install.html#optional-snappy-install

 

2.代碼實踐

生產者

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

 

from kafka import KafkaProducer

import json

 

producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])

 

 

for i in range(0, 100):

    producer.send('MY_TOPIC1', value=b'lai zi shouke de msg', key=None, headers=None, partition=None, timestamp_ms=None)

 

# Block直到單條消息發送完或者超時

future = producer.send('MY_TOPIC1', value=b'another msg',key=b'othermsg')

result = future.get(timeout=60)

print(result)

 

# Block直到所有阻塞的消息發送到網路

# 註意: 該操作不保證傳輸或者消息發送成功,僅在配置了linger_ms的情況下有用。(It is really only useful if you configure internal batching using linger_ms

 

 

# 序列化json數據

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))

producer.send('MY_TOPIC1', {'shouke':'kafka'})

 

# 序列化字元串key

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', key_serializer=str.encode)

producer.send('MY_TOPIC1', b'shouke', key='strKey')

 

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092',compression_type='gzip')

for i in range(2):

    producer.send('MY_TOPIC1', ('msg %d' % i).encode('utf-8'))

 

# 消息記錄攜帶header

producer.send('MY_TOPIC1', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64'),])

 

# 獲取性能數據(註意,實踐發現分區較多的情況下,該操作比較耗時

metrics = producer.metrics()

print(metrics)

 

producer.flush()

 

實踐中遇到錯誤: kafka.errors.NoBrokersAvailable: NoBrokersAvailable,解決方案如下:

進入到配置目錄(config),編輯server.properties文件,

查找並設置listener,配置監聽埠,格式:listeners = listener_name://host_name:port,供kafka客戶端連接用的ip和埠,例中配置如下:

listeners=PLAINTEXT://127.0.0.1:9092

 

 

API及常用參數說明:

class kafka.KafkaProducer(**configs)

bootstrap_servers –'host[:port]'字元串,或者由'host[:port]'組成的字元串,形如['10.202.24.5:9096', '10.202.24.6:9096', '10.202.24.7:9096']),其中,host為broker(Broker:緩存代理,Kafka集群中的單台伺服器)地址,預設值為 localhost, port預設值為9092,這裡可以不用填寫所有broker的host和port,但必須保證至少有一個broker)

 

key_serializer (可調用對象) –用於轉換用戶提供的key值為位元組,必須返回位元組數據。 如果為None,則等同調用f(key)。 預設值: None.

 

value_serializer(可調用對象) – 用於轉換用戶提供的value消息值為位元組,必須返回位元組數據。 如果為None,則等同調用f(value)。 預設值: None.

 

send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)

topic(str) – 設置消息將要發佈到的主題,即消息所屬主題

 

value(可選) – 消息內容,必須為位元組數據,或者通過value_serializer序列化後的位元組數據。如果為None,則key必填,消息等同於“刪除”。( If value is None, key is required and message acts as a ‘delete’)

 

partition (int, 可選) – 指定分區。如果未設置,則使用配置的partitioner

 

key (可選) – 和消息對應的key,可用於決定消息發送到哪個分區。如果平partition為None,則相同key的消息會被髮布到相同分區(但是如果key為None,則隨機選取分區)(If partition is None (and producer’s partitioner config is left as default), then messages with the same key will be delivered to the same partition (but if key is None, partition is chosen randomly)). 必須為位元組數據或者通過配置的key_serializer序列化後的位元組數據.

 

headers (可選) – 設置消息header,header-value鍵值對錶示的list。list項為元組:格式 (str_header,bytes_value)

 

timestamp_ms (int, 可選) –毫秒數 (從1970 1月1日 UTC算起) ,作為消息時間戳。預設為當前時間

 

函數返回FutureRecordMetadata類型的RecordMetadata數據

 

flush(timeout=None)

發送所有可以立即獲取的緩衝消息(即時linger_ms大於0),線程block直到這些記錄發送完成。當一個線程等待flush調用完成而block時,其它線程可以繼續發送消息。

 

註意:flush調用不保證記錄發送成功

 

metrics(raw=False)

獲取生產者性能指標。

 

參考API:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

 

註:生產者代碼是線程安全的,支持多線程,而消費者則不然

 

消費者

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

from kafka import KafkaConsumer

from kafka import TopicPartition

import json

 


consumer = KafkaConsumer('MY_TOPIC1',
                         bootstrap_servers=['127.0.0.1:9092'],
                         #auto_offset_reset='',
                         auto_offset_reset='latest',# 消費kafka中最近的數據,如果設置為earliest則消費最早的數據,不管這些數據是否消費
                         enable_auto_commit=True, # 自動提交消費者的offset
                         auto_commit_interval_ms=3000, ## 自動提交消費者offset的時間間隔
                         group_id='MY_GROUP1',

                         consumer_timeout_ms= 10000, # 如果10秒內kafka中沒有可供消費的數據,自動退出
                         client_id='consumer-python3'
                         )

 

for msg in consumer:

    print (msg)

    print('topic: ', msg.topic)

    print('partition: ', msg.partition)

    print('key: ', msg.key, 'value: ', msg.value)

    print('offset:', msg.offset)

    print('headers:', msg.headers)

 

# Get consumer metrics

metrics = consumer.metrics()

print(metrics)

 

運行效果

 

 

 

通過assign、subscribe兩者之一為消費者設置消費的主題

consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'],

                         auto_offset_reset='latest',

                         enable_auto_commit=True, # 自動提交消費數據的offset

                         consumer_timeout_ms= 10000, # 如果1秒內kafka中沒有可供消費的數據,自動退出

                         value_deserializer=lambda m: json.loads(m.decode('ascii')), #消費json 格式的消息

                         client_id='consumer-python3'

                         )

 

 

# consumer.assign([TopicPartition('MY_TOPIC1', 0)])

# msg = next(consumer)

# print(msg)

 

consumer.subscribe('MY_TOPIC1')

for msg in consumer:

    print (msg)

 

 

API及常用參數說明:

class kafka.KafkaConsumer(*topics, **configs)

*topics (str) – 可選,設置需要訂閱的topic,如果未設置,需要在消費記錄前調用subscribe或者assign。

 

client_id (str) – 客戶端名稱,預設值: ‘kafka-python-{version}’

 

group_id (str or None) – 消費組名稱。如果為None,則通過group coordinator auto-partition分區分配,offset提交被禁用。預設為None

 

auto_offset_reset (str) – 重置offset策略: 'earliest'將移動到最老的可用消息, 'latest'將移動到最近消息。 設置為其它任何值將拋出異常。預設值:'latest'。

 

enable_auto_commit (bool) –  如果為True,將自動定時提交消費者offset。預設為True。

 

auto_commit_interval_ms (int) – 自動提交offset之間的間隔毫秒數。如果enable_auto_commit 為true,預設值為: 5000。

 

value_deserializer(可調用對象) - 攜帶原始消息value並返回反序列化後的value

 

subscribe(topics=(), pattern=None, listener=None)

訂閱需要的主題

topics (list) – 需要訂閱的主題列表

pattern (str) – 用於匹配可用主題的模式,即正則表達式。註意:必須提供topics、pattern兩者參數之一,但不能同時提供兩者。

 

metrics(raw=False)

獲取消費者性能指標。

 

參考API:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

 

客戶端

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

from kafka.client import KafkaClient

 

client = KafkaClient(bootstrap_servers=['127.0.0.1:9092'], request_timeout_ms=3000)

 

# 獲取所有broker

brokers = client.cluster.brokers()

for broker in brokers:

    print('broker: ', broker)

    print('broker nodeId: ', broker.nodeId)

 

# 獲取主題的所有分區

topic = 'MY_TOPIC1'

partitions = client.cluster.available_partitions_for_topic(topic)

print(partitions)

 

partition_dict = {}

partition_dict[topic] = [partition for partition in partitions]

print(partition_dict)

 

 

運行結果:

broker:  BrokerMetadata(nodeId=0, host='127.0.0.1', port=9092, rack=None)

broker nodeId:  0

{0}

{'MY_TOPIC1': [0]}

 

API及常用參數說明:

class kafka.client.KafkaClient(**configs)

bootstrap_servers –'host[:port]'字元串,或者由'host[:port]'組成的字元串,形如['10.202.24.5:9096', '10.202.24.6:9096', '10.202.24.7:9096']),其中,host為broker(Broker:緩存代理,Kafka集群中的單台伺服器)地址,預設值為 localhost, port預設值為9092,這裡可以不用填寫所有broker的host和port,但必須保證至少有一個broker)

 

client_id (str) – 客戶端名稱,預設值: ‘kafka-python-{version}’

 

request_timeout_ms (int) – 客戶端請求超時時間,單位毫秒。預設值: 30000.

 

參考API: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaClient.html

 

brokers()

獲取所有broker元數據

 

available_partitions_for_topic(topic)

返回主題的所有分區

 

 

參考API: https://kafka-python.readthedocs.io/en/master/apidoc/ClusterMetadata.html

 

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 當都為正數時,即1+2+3+...+99,如上,很簡單; 其實,計算正負相間的式子也很簡單,只需要加上一個標記正負號的變數乘到計數器上即可。 用一個布爾型變數來記錄執行加法還是減法,也能達到同樣的效果(這裡額外增加一個要求,就是剔除某個數後,保持正負相間的累加) 這樣,得到的就是1-2+3-4... ...
  • 今日不寫日感,直接扔上今日興趣點: 新研究稱火星曾經有一個巨大的地下水系統 鏈接:https://mbd.baidu.com/newspage/data/landingsuper?context=%7B"nid"%3A"news_6959868648919860397"%7D&n_type=0&p_ ...
  • 1、下載JDK "官網" 打開後,直接下載最新版本。 選擇dmg文件下載 2、開始安裝,一直下一步。 3、打開終端,查詢安裝路徑: ,複製備用。 4、配置Java的環境變數 1)打開終端,到主目錄 2)查看是否有.bash_profile文件 3)如果沒有新建一個 4)有了這個文件以後,進行編輯 , ...
  • 基於Python結合pykafka實現kafka生產及消費速率&主題分區偏移實時監控 By: 授客 QQ:1033553122 1.測試環境 python 3.4 zookeeper-3.4.13.tar.gz 下載地址1: http://zookeeper.apache.org/releases. ...
  • 我的其他隨筆: 來一局緊張刺激的吃雞——淺談裝飾者模式 一起去開心的購物吧——淺談觀察者模式 記一場精彩的籃球比賽——淺談策略模式 大家好,前幾日連夜更了幾篇Java設計模式的小隨筆,從觀看量來說,我還是很高興的,有很多的朋友通過看了博文,也許接觸了新的知識,也許理解了自己之前沒弄懂的東西,也許只是 ...
  • 最近偶然到博客園看了一下,距離上次的博客已經過去很多天了,閱讀量卻少得可憐,對於博客園小白來說感覺不是很友好(主要是心理不平衡),而且有些博客被其他網站不帶出處的裝載了,它的閱讀量卻很多。於是靈光一閃,決定寫個程式增加一下閱讀量。(僅用於學術交流,實際上我就試了一下,沒有真正刷過) 一、原理 一般來 ...
  • 先看一段java代碼,func返回值為int: 正確的返回結果是,func返回1。 原因:如果finally中沒有return語句,但是改變了要返回的值,這裡有點類似與引用傳遞和值傳遞的區別,分以下兩種情況,: 1)如果return的數據是基本數據類型或文本字元串,則在finally中對該基本數據的 ...
  • 準備 安裝 "vscode" ,可直接下載deb包進行安裝,完成後安裝C/C++ for Visual Studio Code插件,安裝後重啟(最新1.3版本以後不需要重啟)。 生成目錄和文件 新建文件夾【test】,並新建文件helloworld.cpp文件,文件中內容如下, include in ...
一周排行
    -Advertisement-
    Play Games
  • 示例項目結構 在 Visual Studio 中創建一個 WinForms 應用程式後,項目結構如下所示: MyWinFormsApp/ │ ├───Properties/ │ └───Settings.settings │ ├───bin/ │ ├───Debug/ │ └───Release/ ...
  • [STAThread] 特性用於需要與 COM 組件交互的應用程式,尤其是依賴單線程模型(如 Windows Forms 應用程式)的組件。在 STA 模式下,線程擁有自己的消息迴圈,這對於處理用戶界面和某些 COM 組件是必要的。 [STAThread] static void Main(stri ...
  • 在WinForm中使用全局異常捕獲處理 在WinForm應用程式中,全局異常捕獲是確保程式穩定性的關鍵。通過在Program類的Main方法中設置全局異常處理,可以有效地捕獲並處理未預見的異常,從而避免程式崩潰。 註冊全局異常事件 [STAThread] static void Main() { / ...
  • 前言 給大家推薦一款開源的 Winform 控制項庫,可以幫助我們開發更加美觀、漂亮的 WinForm 界面。 項目介紹 SunnyUI.NET 是一個基於 .NET Framework 4.0+、.NET 6、.NET 7 和 .NET 8 的 WinForm 開源控制項庫,同時也提供了工具類庫、擴展 ...
  • 說明 該文章是屬於OverallAuth2.0系列文章,每周更新一篇該系列文章(從0到1完成系統開發)。 該系統文章,我會儘量說的非常詳細,做到不管新手、老手都能看懂。 說明:OverallAuth2.0 是一個簡單、易懂、功能強大的許可權+可視化流程管理系統。 有興趣的朋友,請關註我吧(*^▽^*) ...
  • 一、下載安裝 1.下載git 必須先下載並安裝git,再TortoiseGit下載安裝 git安裝參考教程:https://blog.csdn.net/mukes/article/details/115693833 2.TortoiseGit下載與安裝 TortoiseGit,Git客戶端,32/6 ...
  • 前言 在項目開發過程中,理解數據結構和演算法如同掌握蓋房子的秘訣。演算法不僅能幫助我們編寫高效、優質的代碼,還能解決項目中遇到的各種難題。 給大家推薦一個支持C#的開源免費、新手友好的數據結構與演算法入門教程:Hello演算法。 項目介紹 《Hello Algo》是一本開源免費、新手友好的數據結構與演算法入門 ...
  • 1.生成單個Proto.bat內容 @rem Copyright 2016, Google Inc. @rem All rights reserved. @rem @rem Redistribution and use in source and binary forms, with or with ...
  • 一:背景 1. 講故事 前段時間有位朋友找到我,說他的窗體程式在客戶這邊出現了卡死,讓我幫忙看下怎麼回事?dump也生成了,既然有dump了那就上 windbg 分析吧。 二:WinDbg 分析 1. 為什麼會卡死 窗體程式的卡死,入口門檻很低,後續往下分析就不一定了,不管怎麼說先用 !clrsta ...
  • 前言 人工智慧時代,人臉識別技術已成為安全驗證、身份識別和用戶交互的關鍵工具。 給大家推薦一款.NET 開源提供了強大的人臉識別 API,工具不僅易於集成,還具備高效處理能力。 本文將介紹一款如何利用這些API,為我們的項目添加智能識別的亮點。 項目介紹 GitHub 上擁有 1.2k 星標的 C# ...