實踐環境 Python 3.6.2 confluent-kafka 2.2.0 confluent-kafka簡介 Confluent在GitHub上開發和維護的confluent-kafka-python,Apache Kafka®的一個python客戶端,提供了一個與所有brokers>=v0. ...
實踐環境
Python 3.6.2
confluent-kafka 2.2.0
confluent-kafka簡介
Confluent在GitHub上開發和維護的confluent-kafka-python,Apache Kafka®的一個python客戶端,提供了一個與所有brokers>=v0.8
的kafka 、Confluent Cloud和Confluent Platform相容的高階級生產者、消費者和AdminClient。
confluent-kafka安裝
pip install confluent-kafka
代碼實踐
Kafka生產者
from confluent_kafka import Producer
import socket
def acked(err, msg):
if err is not None:
print("Failed to deliver message: %s: %s" % msg.value(), str(err))
else:
print("Message produced: %s" % msg.value())
if __name__ == '__main__':
topic_name = 'FREE_TOPIC_FOR_TEST'
### 初始化Producer (針對本地運行的Kafka,即不在Confluent雲平臺上運行的Kafka)
conf = {'bootstrap.servers': '100.81.xxx.xx:9092,100.81.xxx.xx:9092',
'client.id': socket.gethostname()}
producer = Producer(conf)
### 非同步寫 kafka
# 給kafka發送消息--非同步請求
producer.produce(topic_name, key="key", value="value")
# 添加回調函數
producer.produce(topic_name, value="test value", callback=acked)
# 最多等待事件1秒鐘。等待期間,如果消息被確認,即成功寫入kafka中,將調用回調 callback指定方法 acked
producer.poll(1)
### 同步寫kafka
producer.produce(topic_name, key="key", value="new msg")
producer.flush()
說明:
produce方法
producer.produce(topic, key="key", value="value", callback=None) # 給kafka發送消息
topic
kafka主題,如果主題不存在,則將自動創建key
可選value
需要發送的消息,可以為None
callback
回調函數。
product調用為非同步請求,所以調用後立即完成,且不會返回值。如果由於librdkafka的本地生產隊列已滿而導致消息無法入隊,則會引發KafkaException
。
如果要接收發送是否成功或失敗的通知,可以傳遞callback
參數,該參數值可以是任何可調用的,例如lambda、函數、綁定方法或可調用對象。儘管produce()
方法會立即將消息加入隊列以進行批處理、壓縮並傳輸到代理,但在調用poll()
之前,不會傳播任何傳遞通知事件。
flush方法
flush()
方法用於同步寫kafka。這通常是個壞主意,因為它有效地將吞吐量限制在broker往返時間內,但在某些情況下可能是合理的。
通常,應該在關閉生產者之前調用flush(),以確保所有未完成的/排隊的/in-flight的消息都被傳遞。
Kafka消費者
import time
from confluent_kafka import Consumer
from confluent_kafka import KafkaException, KafkaError
running = True
def msg_process(msg):
value = msg.value()
if value:
value = value.decode('utf-8') # 假設消息可採用 utf-8解碼
return {
'topic': msg.topic(),
'partition': msg.partition(),
'offset': msg.offset(),
'value': value
}
def consume_loop(consumer, topics):
global running
try:
consumer.subscribe(topics) # 訂閱主題
while running:
msg = consumer.poll(timeout=10.0)
if msg is None:
time.sleep(0.1)
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
# End of partition event
print('%% %s [%d] reached end at offset %d\n' %
(msg.topic(), msg.partition(), msg.offset()))
elif msg.error():
raise KafkaException(msg.error())
else:
res = msg_process(msg)
try:
result = '{' + '"topic": "{topic}", "partition": {partition}, "offset": {offset}, "value": {value}'.format(**res) + '}\n'
except Exception:
result = '{' + '"topic": "{topic}", "partition": {partition}, "offset": {offset}, "value": "{value}"'.format(**res) + '}\n'
print(result)
finally:
# 關閉消費者以提交最後的偏移量
consumer.close()
if __name__ == '__main__':
topic_name = 'FREE_TOPIC_FOR_TEST_1'
# 初始化消費者
conf = {'bootstrap.servers': '100.81.xxx.xx:9092,100.81.xxx.xx:9092',
'group.id': 'custom_consumer',
'enable.auto.commit': 'true',
'auto.offset.reset': 'smallest',
}
consumer = Consumer(conf)
consume_loop(consumer, [topic_name]) # 可以指定多個主題
說明:
初始化消費者配置字典說明
conf = {'bootstrap.servers': 'host1:9092,host2:9092',
'group.id': 'foo',
'enable.auto.commit': 'false',
'auto.offset.reset': 'smallest'}
說明:
-
group.id
屬性是必需的,設置當前消費者歸屬的消費組,可以是事先不存在的消費組。 -
auto.offset.reset
屬性指定針對當前消費組,在分區沒有提交偏移量或提交偏移量無效(可能是由於日誌截斷)的情況下,消費者應該從哪個偏移量開始讀取。可選值: -
'smallest'
如果針對當前消費組,分區未提交offset
,則從頭開始消費,否則從已提交的offset
開始消費(即讀取上次提交offset
之後生產的數據)。 -
'largest'
如果針對當前消費組,分區未提交offset
,則讀取新生產的數據(在啟動該消費者之後才生產的數據),不會讀取之前的數據,否則從已提交的offset
開始消費,同smallest
-
'earliest'
同'smallest'
-
'latest'
同'largest'
kafka-0.10.1.X 版本之前:auto.offset.reset
的值為smallest
和largest
(offest
保存在zk中)
kafka-0.10.1.X版本之後:auto.offset.reset
的值更改為 earliest
, latest
(offest
保存在kafka的一個特殊的topic名為:__consumer_offsets
裡面)
enable.auto.commit
設置是否允許自動提交偏移量,預設為'true'
,即允許。
一個典型的Kafka消費者應用程式以迴圈消費為中心,該迴圈重覆調用poll
方法來逐條檢索消費者在後臺高效預取的記錄。例中poll
超時被硬編碼為1秒。如果在此期間沒有收到任何記錄,則Consumer.poll()
將返回一個空記錄集。
註意,在使用完Consumer
之後,應該始終調用Consumer.close()
,以確保活動套接字處於關閉狀態,並清理內部狀態。此外,還將立即觸發組再均衡(group rebalance),以確保消費者擁有的任何分區都被重新分配給組中的另一個成員。如果未正確關閉,broker將僅在會話超時到期後才觸發再均衡。
同步提交
手動提交偏移量的最簡單、最可靠的方法是為Consumer.commit()
調用設置asynchronous
參數,與此同時設置構建消費者對象參數配置'enable.auto.commit'
為'false'
。
MIN_COMMIT_COUNT = 10
def consume_loop(consumer, topics):
try:
consumer.subscribe(topics)
msg_count = 0
while running:
msg = consumer.poll(timeout=1.0)
if msg is None: continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
# End of partition event
print('%% %s [%d] reached end at offset %d\n' %
(msg.topic(), msg.partition(), msg.offset()))
elif msg.error():
raise KafkaException(msg.error())
else:
msg_process(msg)
msg_count += 1
if msg_count % MIN_COMMIT_COUNT == 0:
consumer.commit(asynchronous=False)
finally:
# 關閉消費者以提交最後的偏移量
consumer.close()
在本例中,每消費MIN_COMMIT_COUNT
消息都會觸發一次同步提交。asynchronous
標誌控制此調用是否為非同步調用,預設為False
,即同步 。您還可以在超時到期時觸發提交,以確保定期更新提交的位置。
消息投遞保證
在前面的示例中,由於提交在消息處理之後,所以獲得了“至少一次(at least once
)”投遞。然而,通過更改提交偏移和處理消息的順序,可獲得“最多一次(at most once
)”投遞,但必須小心提交失敗。
說明:
- 最多一次(
at most once
):消息可能丟失也可能被處理,但最多只會處理一次。因為當提交offset
後,處理消息過程中出錯導致消息處理失敗,或者消費者down掉,導致消息不被處理。 - 至少一次(
at least once
):消息不會丟失,但可能被處理多次。先獲取消息,然後處理消息,最後提交offset
,提交offset
時,可能會因為網路超時,消費者down掉等,導致提交偏移量失敗的情況,所以,會導致重覆消費消息的情況,進而導致多次處理消息。
def consume_loop(consumer, topics):
try:
consumer.subscribe(topics)
while running:
msg = consumer.poll(timeout=1.0)
if msg is None: continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
# End of partition event
print('%% %s [%d] reached end at offset %d\n' %
(msg.topic(), msg.partition(), msg.offset()))
elif msg.error():
raise KafkaException(msg.error())
else:
consumer.commit(asynchronous=False)
msg_process(msg)
finally:
# 關閉消費者以提交最後的偏移量
consumer.close()
簡單起見,在本例中,在處理消息之前使用Consumer.commit()
。在實踐中,對每條消息都進行提交會產生大量開銷。更好的方法是收集一批消息,執行同步提交,然後只有在提交成功的情況下才處理消息。
非同步提交
def consume_loop(consumer, topics):
try:
consumer.subscribe(topics)
msg_count = 0
while running:
msg = consumer.poll(timeout=1.0)
if msg is None: continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
# End of partition event
print('%% %s [%d] reached end at offset %d\n' %
(msg.topic(), msg.partition(), msg.offset()))
elif msg.error():
raise KafkaException(msg.error())
else:
msg_process(msg)
msg_count += 1
if msg_count % MIN_COMMIT_COUNT == 0:
consumer.commit(asynchronous=True)
finally:
consumer.close()
本例在前面示例的基礎上,將commit()
的asynchronous
參數改成True
,消費者將使用非同步提交發送請求並立即返回
API提供了一個callback,當提交成功或失敗時會調用該callback。 commit callback回調可以是任何可調用的,並且可以作為配置參數傳遞給消費者構造函數。
from confluent_kafka import Consumer
def commit_completed(err, partitions):
if err:
print(str(err))
else:
print("Committed partition offsets: " + str(partitions))
conf = {'bootstrap.servers': "host1:9092,host2:9092",
'group.id': "foo",
'auto.offset.reset': 'smallest',
'on_commit': commit_completed}
consumer = Consumer(conf)
參考連接
https://docs.confluent.io/kafka-clients/python/current/overview.html#initialization
https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html
作者:授客
微信/QQ:1033553122
全國軟體測試QQ交流群:7156436
Git地址:https://gitee.com/ishouke
友情提示:限於時間倉促,文中可能存在錯誤,歡迎指正、評論!
作者五行缺錢,如果覺得文章對您有幫助,請掃描下邊的二維碼打賞作者,金額隨意,您的支持將是我繼續創作的源動力,打賞後如有任何疑問,請聯繫我!!!
微信打賞
支付寶打賞 全國軟體測試交流QQ群