Python Kafka客戶端confluent-kafka學習總結

来源:https://www.cnblogs.com/shouke/archive/2023/11/06/17814004.html
-Advertisement-
Play Games

實踐環境 Python 3.6.2 confluent-kafka 2.2.0 confluent-kafka簡介 Confluent在GitHub上開發和維護的confluent-kafka-python,Apache Kafka®的一個python客戶端,提供了一個與所有brokers>=v0. ...


實踐環境

Python 3.6.2

confluent-kafka 2.2.0

confluent-kafka簡介

Confluent在GitHub上開發和維護的confluent-kafka-python,Apache Kafka®的一個python客戶端,提供了一個與所有brokers>=v0.8的kafka 、Confluent Cloud和Confluent Platform相容的高階級生產者、消費者和AdminClient。

confluent-kafka安裝

pip install confluent-kafka

代碼實踐

Kafka生產者

from confluent_kafka import Producer
import socket


def acked(err, msg):
    if err is not None:
        print("Failed to deliver message: %s: %s" % msg.value(), str(err))
    else:
        print("Message produced: %s" % msg.value())

if __name__ == '__main__':
    topic_name = 'FREE_TOPIC_FOR_TEST'

    ### 初始化Producer (針對本地運行的Kafka,即不在Confluent雲平臺上運行的Kafka)
    conf = {'bootstrap.servers': '100.81.xxx.xx:9092,100.81.xxx.xx:9092',
            'client.id': socket.gethostname()}

    producer = Producer(conf)

    ### 非同步寫 kafka
    # 給kafka發送消息--非同步請求
    producer.produce(topic_name, key="key", value="value")

    # 添加回調函數
    producer.produce(topic_name, value="test value", callback=acked)
    # 最多等待事件1秒鐘。等待期間,如果消息被確認,即成功寫入kafka中,將調用回調 callback指定方法 acked
    producer.poll(1)


    ### 同步寫kafka
    producer.produce(topic_name, key="key", value="new msg")
    producer.flush()

說明:

produce方法

producer.produce(topic, key="key", value="value", callback=None) # 給kafka發送消息
  • topic kafka主題,如果主題不存在,則將自動創建
  • key 可選
  • value 需要發送的消息,可以為None
  • callback 回調函數。

product調用為非同步請求,所以調用後立即完成,且不會返回值。如果由於librdkafka的本地生產隊列已滿而導致消息無法入隊,則會引發KafkaException

如果要接收發送是否成功或失敗的通知,可以傳遞callback參數,該參數值可以是任何可調用的,例如lambda、函數、綁定方法或可調用對象。儘管produce()方法會立即將消息加入隊列以進行批處理、壓縮並傳輸到代理,但在調用poll()之前,不會傳播任何傳遞通知事件。

flush方法

flush()方法用於同步寫kafka。這通常是個壞主意,因為它有效地將吞吐量限制在broker往返時間內,但在某些情況下可能是合理的。

通常,應該在關閉生產者之前調用flush(),以確保所有未完成的/排隊的/in-flight的消息都被傳遞。

Kafka消費者

import time

from confluent_kafka import Consumer
from confluent_kafka import KafkaException, KafkaError

running = True

def msg_process(msg):
    value = msg.value()
    if value:
        value = value.decode('utf-8') # 假設消息可採用 utf-8解碼
        
    return {
        'topic': msg.topic(),
        'partition': msg.partition(),
        'offset': msg.offset(),
        'value': value
    }
       
def consume_loop(consumer, topics):
    global running
    try:
        consumer.subscribe(topics) # 訂閱主題
        while running:
            msg = consumer.poll(timeout=10.0)
            if msg is None:
                time.sleep(0.1)
                continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    print('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                elif msg.error():
                    raise KafkaException(msg.error())
            else:
                res = msg_process(msg)
                try:
                    result = '{' + '"topic": "{topic}", "partition": {partition}, "offset": {offset}, "value": {value}'.format(**res) + '}\n'
                except Exception:
                    result = '{' + '"topic": "{topic}", "partition": {partition}, "offset": {offset}, "value": "{value}"'.format(**res) + '}\n'
                print(result)

    finally:
        # 關閉消費者以提交最後的偏移量
        consumer.close()

if __name__ == '__main__':
    topic_name = 'FREE_TOPIC_FOR_TEST_1'

    # 初始化消費者
    conf = {'bootstrap.servers': '100.81.xxx.xx:9092,100.81.xxx.xx:9092',
            'group.id': 'custom_consumer',
            'enable.auto.commit': 'true',
            'auto.offset.reset': 'smallest',
            }

    consumer = Consumer(conf)
    consume_loop(consumer, [topic_name]) # 可以指定多個主題

說明:

初始化消費者配置字典說明

conf = {'bootstrap.servers': 'host1:9092,host2:9092',
        'group.id': 'foo',
        'enable.auto.commit': 'false',
        'auto.offset.reset': 'smallest'}

說明

  • group.id 屬性是必需的,設置當前消費者歸屬的消費組,可以是事先不存在的消費組。

  • auto.offset.reset 屬性指定針對當前消費組,在分區沒有提交偏移量或提交偏移量無效(可能是由於日誌截斷)的情況下,消費者應該從哪個偏移量開始讀取。可選值:

  • 'smallest' 如果針對當前消費組,分區未提交offset,則從頭開始消費,否則從已提交的offset 開始消費(即讀取上次提交offset之後生產的數據)。

  • 'largest' 如果針對當前消費組,分區未提交offset,則讀取新生產的數據(在啟動該消費者之後才生產的數據),不會讀取之前的數據,否則從已提交的offset 開始消費,同smallest

  • 'earliest''smallest'

  • 'latest''largest'

​ kafka-0.10.1.X 版本之前:auto.offset.reset 的值為smallestlargest (offest保存在zk中)

​ kafka-0.10.1.X版本之後:auto.offset.reset 的值更改為 earliest, latest (offest保存在kafka的一個特殊的topic名為:__consumer_offsets裡面)

  • enable.auto.commit 設置是否允許自動提交偏移量,預設為'true',即允許。

一個典型的Kafka消費者應用程式以迴圈消費為中心,該迴圈重覆調用poll方法來逐條檢索消費者在後臺高效預取的記錄。例中poll超時被硬編碼為1秒。如果在此期間沒有收到任何記錄,則Consumer.poll()將返回一個空記錄集。

註意,在使用完Consumer之後,應該始終調用Consumer.close(),以確保活動套接字處於關閉狀態,並清理內部狀態。此外,還將立即觸發組再均衡(group rebalance),以確保消費者擁有的任何分區都被重新分配給組中的另一個成員。如果未正確關閉,broker將僅在會話超時到期後才觸發再均衡。

同步提交

手動提交偏移量的最簡單、最可靠的方法是為Consumer.commit()調用設置asynchronous參數,與此同時設置構建消費者對象參數配置'enable.auto.commit''false'

MIN_COMMIT_COUNT = 10

def consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)

        msg_count = 0
        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    print('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                elif msg.error():
                    raise KafkaException(msg.error())
            else:
                msg_process(msg)
                msg_count += 1
                if msg_count % MIN_COMMIT_COUNT == 0:
                    consumer.commit(asynchronous=False)
    finally:
        # 關閉消費者以提交最後的偏移量
        consumer.close()

在本例中,每消費MIN_COMMIT_COUNT 消息都會觸發一次同步提交。asynchronous標誌控制此調用是否為非同步調用,預設為False,即同步 。您還可以在超時到期時觸發提交,以確保定期更新提交的位置。

消息投遞保證

在前面的示例中,由於提交在消息處理之後,所以獲得了“至少一次(at least once)”投遞。然而,通過更改提交偏移和處理消息的順序,可獲得“最多一次(at most once)”投遞,但必須小心提交失敗。

說明:

  • 最多一次(at most once):消息可能丟失也可能被處理,但最多只會處理一次。因為當提交offset後,處理消息過程中出錯導致消息處理失敗,或者消費者down掉,導致消息不被處理。
  • 至少一次(at least once):消息不會丟失,但可能被處理多次。先獲取消息,然後處理消息,最後提交offset,提交offset時,可能會因為網路超時,消費者down掉等,導致提交偏移量失敗的情況,所以,會導致重覆消費消息的情況,進而導致多次處理消息。
def consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)

        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    print('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                elif msg.error():
                    raise KafkaException(msg.error())
            else:
                consumer.commit(asynchronous=False)
                msg_process(msg)

    finally:
        # 關閉消費者以提交最後的偏移量
        consumer.close()

簡單起見,在本例中,在處理消息之前使用Consumer.commit()。在實踐中,對每條消息都進行提交會產生大量開銷。更好的方法是收集一批消息,執行同步提交,然後只有在提交成功的情況下才處理消息。

非同步提交

def consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)

        msg_count = 0
        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    print('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                elif msg.error():
                    raise KafkaException(msg.error())
            else:
                msg_process(msg)
                msg_count += 1
                if msg_count % MIN_COMMIT_COUNT == 0:
                    consumer.commit(asynchronous=True)
    finally:
        consumer.close()

本例在前面示例的基礎上,將commit()asynchronous 參數改成True,消費者將使用非同步提交發送請求並立即返回

API提供了一個callback,當提交成功或失敗時會調用該callback。 commit callback回調可以是任何可調用的,並且可以作為配置參數傳遞給消費者構造函數。

from confluent_kafka import Consumer

def commit_completed(err, partitions):
    if err:
        print(str(err))
    else:
        print("Committed partition offsets: " + str(partitions))

conf = {'bootstrap.servers': "host1:9092,host2:9092",
        'group.id': "foo",
        'auto.offset.reset': 'smallest',
        'on_commit': commit_completed}

consumer = Consumer(conf)

參考連接

https://docs.confluent.io/kafka-clients/python/current/overview.html#initialization

https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html

作者:授客
微信/QQ:1033553122
全國軟體測試QQ交流群:7156436

Git地址:https://gitee.com/ishouke
友情提示:限於時間倉促,文中可能存在錯誤,歡迎指正、評論!
作者五行缺錢,如果覺得文章對您有幫助,請掃描下邊的二維碼打賞作者,金額隨意,您的支持將是我繼續創作的源動力,打賞後如有任何疑問,請聯繫我!!!
           微信打賞                        支付寶打賞                  全國軟體測試交流QQ群  
              


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 在"zookeeper源碼(03)集群啟動流程"中介紹了leader選舉的入口,本文將詳細分析leader選舉組件和流程。 leader選舉流程(重要) quorumPeer的start階段使用startLeaderElection()方法啟動選舉 LOOKING狀態,投自己一票 createEle ...
  • 準備 我是小C同學編寫得一個java文件,如何實現我的功能呢?需要去JVM(Java Virtual Machine)這個地方旅行。 變身 我高高興興的來到JVM,想要開始JVM之旅,它確說:“現在的我還不能進去,需要做一次轉換,生成class文件才行”。為什麼這樣呢? JVM不能直接載入java文 ...
  • 配置文件yml # phantomjs的位置地址 phantomjs: binPath: windows: binPath-win linux: binPath-linux jsPath: windows: jsPath-win linux: jsPath-linux imagePath: wind ...
  • 定時任務簡介 定時任務是指按照預定的時間間隔或特定時間點自動執行的計劃任務或操作。這些任務通常用於自動化重覆性的工作,以減輕人工操作的負擔,提高效率。在電腦編程和應用程式開發中,定時任務是一種常見的編程模式,用於周期性地執行某些操作、處理數據或觸發事件。 以下是一些關於定時任務的重要概念: 時間間 ...
  • 插值運算是一種數據處理方法,主要用來填補數據之間的空白或缺失值。因為在實際應用中,數據往往不是完整的,而是存在著空白或缺失值,這些空白或缺失值可能是由於數據採集困難、數據丟失或數據處理錯誤等原因造成的。如果直接使用這些空白或缺失值進行分析和預測,將會對結果造成很大的影響。 插值運算可以用來填補這些空 ...
  • 是極致魅惑、灑脫自由的Java heap space?是知性柔情、溫婉大氣的GC overhead limit exceeded?是純真無邪、活潑可愛的Metaspace?如果以上不是你的菜,那還有……刁蠻任性,無跡可尋的CodeCache!性感火辣、心思細膩的Direct Memory高貴冷艷,獨... ...
  • JMM 請你談談對Volatile的理解 Volatile是java虛擬機提供的輕量級的同步機制 1、保證可見性 2、不保證原子性 3、禁止指令重排 什麼是JMM JVM->java虛擬機 JMM->java記憶體模型,不存在的東西,概念!約定 關於JMM的一些同步的約定: 線程解鎖前,必須把共用變數 ...
  • Go 介面-契約介紹 目錄Go 介面-契約介紹一、介面基本介紹1.1 介面類型介紹1.2 為什麼要使用介面1.3 面向介面編程1.4 介面的定義二、空介面2.1 空介面的定義2.2 空介面的應用2.2.1 空介面作為函數的參數2.2.2 空介面作為map的值2.3 介面類型變數2.4 類型斷言三、盡 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...