使用python-kafka類庫開發kafka生產者&消費者&客戶端 By: 授客 QQ:1033553122 1.測試環境 python 3.4 zookeeper-3.4.13.tar.gz 下載地址1: http://zookeeper.apache.org/releases.html#dow ...
使用python-kafka類庫開發kafka生產者&消費者&客戶端
By: 授客 QQ:1033553122
1.測試環境
python 3.4
zookeeper-3.4.13.tar.gz
下載地址1:
http://zookeeper.apache.org/releases.html#download
https://www.apache.org/dyn/closer.cgi/zookeeper/
https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/
下載地址2:
https://pan.baidu.com/s/1dnBgHvySE9pVRZXJVmezyQ
kafka_2.12-2.1.0.tgz
下載地址1:
http://kafka.apache.org/downloads.html
下載地址2:
https://pan.baidu.com/s/1VnHkJgy4iQ73j5rLbEL0jw
pip-18.1.tar.gz
下載地址:https://pan.baidu.com/s/1VpYk8JvMuztzbvEF8mQoRw
說明:實踐中發現,pip版本比較舊的話,沒法安裝whl文件
kafka_python-1.4.4-py2.py3-none-any.whl
下載地址1:
https://pypi.org/project/kafka-python/#files
下載地址2:
https://pan.baidu.com/s/10XtLXESp64NtwA73RbryVg
python_snappy-0.5.3-cp34-cp34m-win_amd64.whl
下載地址1:
https://www.lfd.uci.edu/~gohlke/pythonlibs/
下載地址2:
https://pan.baidu.com/s/10XtLXESp64NtwA73RbryVg
說明:
kafka-python支持gzip壓縮/解壓縮。如果要消費lz4方式壓縮的消息,則需要安裝python-lz4,如果要支持snappy方式壓縮/解壓縮則需要安裝,否則可能會報錯:kafka.errors.UnsupportedCodecError: UnsupportedCodecError: Libraries for snappy compression codec not found.
構建生產者對象時,可通過compression_type 參數指定由對應生產者生產的消息數據的壓縮方式,或者在producer.properties配置中配置compression.type參數。
參考鏈接:
https://pypi.org/project/kafka-python/#description
https://kafka-python.readthedocs.io/en/master/install.html#optional-snappy-install
2.代碼實踐
生產者
#-*- encoding:utf-8 -*-
__author__ = 'shouke'
from kafka import KafkaProducer
import json
producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
for i in range(0, 100):
producer.send('MY_TOPIC1', value=b'lai zi shouke de msg', key=None, headers=None, partition=None, timestamp_ms=None)
# Block直到單條消息發送完或者超時
future = producer.send('MY_TOPIC1', value=b'another msg',key=b'othermsg')
result = future.get(timeout=60)
print(result)
# Block直到所有阻塞的消息發送到網路
# 註意: 該操作不保證傳輸或者消息發送成功,僅在配置了linger_ms的情況下有用。(It is really only useful if you configure internal batching using linger_ms
# 序列化json數據
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send('MY_TOPIC1', {'shouke':'kafka'})
# 序列化字元串key
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', key_serializer=str.encode)
producer.send('MY_TOPIC1', b'shouke', key='strKey')
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092',compression_type='gzip')
for i in range(2):
producer.send('MY_TOPIC1', ('msg %d' % i).encode('utf-8'))
# 消息記錄攜帶header
producer.send('MY_TOPIC1', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64'),])
# 獲取性能數據(註意,實踐發現分區較多的情況下,該操作比較耗時
metrics = producer.metrics()
print(metrics)
producer.flush()
實踐中遇到錯誤: kafka.errors.NoBrokersAvailable: NoBrokersAvailable,解決方案如下:
進入到配置目錄(config),編輯server.properties文件,
查找並設置listener,配置監聽埠,格式:listeners = listener_name://host_name:port,供kafka客戶端連接用的ip和埠,例中配置如下:
listeners=PLAINTEXT://127.0.0.1:9092
API及常用參數說明:
class kafka.KafkaProducer(**configs)
bootstrap_servers –'host[:port]'字元串,或者由'host[:port]'組成的字元串,形如['10.202.24.5:9096', '10.202.24.6:9096', '10.202.24.7:9096']),其中,host為broker(Broker:緩存代理,Kafka集群中的單台伺服器)地址,預設值為 localhost, port預設值為9092,這裡可以不用填寫所有broker的host和port,但必須保證至少有一個broker)
key_serializer (可調用對象) –用於轉換用戶提供的key值為位元組,必須返回位元組數據。 如果為None,則等同調用f(key)。 預設值: None.
value_serializer(可調用對象) – 用於轉換用戶提供的value消息值為位元組,必須返回位元組數據。 如果為None,則等同調用f(value)。 預設值: None.
send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)
topic(str) – 設置消息將要發佈到的主題,即消息所屬主題
value(可選) – 消息內容,必須為位元組數據,或者通過value_serializer序列化後的位元組數據。如果為None,則key必填,消息等同於“刪除”。( If value is None, key is required and message acts as a ‘delete’)
partition (int, 可選) – 指定分區。如果未設置,則使用配置的partitioner
key (可選) – 和消息對應的key,可用於決定消息發送到哪個分區。如果平partition為None,則相同key的消息會被髮布到相同分區(但是如果key為None,則隨機選取分區)(If partition is None (and producer’s partitioner config is left as default), then messages with the same key will be delivered to the same partition (but if key is None, partition is chosen randomly)). 必須為位元組數據或者通過配置的key_serializer序列化後的位元組數據.
headers (可選) – 設置消息header,header-value鍵值對錶示的list。list項為元組:格式 (str_header,bytes_value)
timestamp_ms (int, 可選) –毫秒數 (從1970 1月1日 UTC算起) ,作為消息時間戳。預設為當前時間
函數返回FutureRecordMetadata類型的RecordMetadata數據
flush(timeout=None)
發送所有可以立即獲取的緩衝消息(即時linger_ms大於0),線程block直到這些記錄發送完成。當一個線程等待flush調用完成而block時,其它線程可以繼續發送消息。
註意:flush調用不保證記錄發送成功
metrics(raw=False)
獲取生產者性能指標。
參考API:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
註:生產者代碼是線程安全的,支持多線程,而消費者則不然
消費者
#-*- encoding:utf-8 -*-
__author__ = 'shouke'
from kafka import KafkaConsumer
from kafka import TopicPartition
import json
consumer = KafkaConsumer('MY_TOPIC1',
bootstrap_servers=['127.0.0.1:9092'],
#auto_offset_reset='',
auto_offset_reset='latest',# 消費kafka中最近的數據,如果設置為earliest則消費最早的數據,不管這些數據是否消費
enable_auto_commit=True, # 自動提交消費者的offset
auto_commit_interval_ms=3000, ## 自動提交消費者offset的時間間隔
group_id='MY_GROUP1',
consumer_timeout_ms= 10000, # 如果10秒內kafka中沒有可供消費的數據,自動退出
client_id='consumer-python3'
)
for msg in consumer:
print (msg)
print('topic: ', msg.topic)
print('partition: ', msg.partition)
print('key: ', msg.key, 'value: ', msg.value)
print('offset:', msg.offset)
print('headers:', msg.headers)
# Get consumer metrics
metrics = consumer.metrics()
print(metrics)
運行效果
通過assign、subscribe兩者之一為消費者設置消費的主題
consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'],
auto_offset_reset='latest',
enable_auto_commit=True, # 自動提交消費數據的offset
consumer_timeout_ms= 10000, # 如果1秒內kafka中沒有可供消費的數據,自動退出
value_deserializer=lambda m: json.loads(m.decode('ascii')), #消費json 格式的消息
client_id='consumer-python3'
)
# consumer.assign([TopicPartition('MY_TOPIC1', 0)])
# msg = next(consumer)
# print(msg)
consumer.subscribe('MY_TOPIC1')
for msg in consumer:
print (msg)
API及常用參數說明:
class kafka.KafkaConsumer(*topics, **configs)
*topics (str) – 可選,設置需要訂閱的topic,如果未設置,需要在消費記錄前調用subscribe或者assign。
client_id (str) – 客戶端名稱,預設值: ‘kafka-python-{version}’
group_id (str or None) – 消費組名稱。如果為None,則通過group coordinator auto-partition分區分配,offset提交被禁用。預設為None
auto_offset_reset (str) – 重置offset策略: 'earliest'將移動到最老的可用消息, 'latest'將移動到最近消息。 設置為其它任何值將拋出異常。預設值:'latest'。
enable_auto_commit (bool) – 如果為True,將自動定時提交消費者offset。預設為True。
auto_commit_interval_ms (int) – 自動提交offset之間的間隔毫秒數。如果enable_auto_commit 為true,預設值為: 5000。
value_deserializer(可調用對象) - 攜帶原始消息value並返回反序列化後的value
subscribe(topics=(), pattern=None, listener=None)
訂閱需要的主題
topics (list) – 需要訂閱的主題列表
pattern (str) – 用於匹配可用主題的模式,即正則表達式。註意:必須提供topics、pattern兩者參數之一,但不能同時提供兩者。
metrics(raw=False)
獲取消費者性能指標。
參考API:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
客戶端
#-*- encoding:utf-8 -*-
__author__ = 'shouke'
from kafka.client import KafkaClient
client = KafkaClient(bootstrap_servers=['127.0.0.1:9092'], request_timeout_ms=3000)
# 獲取所有broker
brokers = client.cluster.brokers()
for broker in brokers:
print('broker: ', broker)
print('broker nodeId: ', broker.nodeId)
# 獲取主題的所有分區
topic = 'MY_TOPIC1'
partitions = client.cluster.available_partitions_for_topic(topic)
print(partitions)
partition_dict = {}
partition_dict[topic] = [partition for partition in partitions]
print(partition_dict)
運行結果:
broker: BrokerMetadata(nodeId=0, host='127.0.0.1', port=9092, rack=None)
broker nodeId: 0
{0}
{'MY_TOPIC1': [0]}
API及常用參數說明:
class kafka.client.KafkaClient(**configs)
bootstrap_servers –'host[:port]'字元串,或者由'host[:port]'組成的字元串,形如['10.202.24.5:9096', '10.202.24.6:9096', '10.202.24.7:9096']),其中,host為broker(Broker:緩存代理,Kafka集群中的單台伺服器)地址,預設值為 localhost, port預設值為9092,這裡可以不用填寫所有broker的host和port,但必須保證至少有一個broker)
client_id (str) – 客戶端名稱,預設值: ‘kafka-python-{version}’
request_timeout_ms (int) – 客戶端請求超時時間,單位毫秒。預設值: 30000.
參考API: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaClient.html
brokers()
獲取所有broker元數據
available_partitions_for_topic(topic)
返回主題的所有分區
參考API: https://kafka-python.readthedocs.io/en/master/apidoc/ClusterMetadata.html