Kafka系列2:深入理解Kafka生產者

来源:https://www.cnblogs.com/mcbye/archive/2020/02/16/kafka-producer-in-detail.html
-Advertisement-
Play Games

生產者是如何生產消息 如何創建生產者 發送消息到Kafka 生產者配置 分區 ...


Kafka系列2:深入理解Kafka消費者

上篇聊了Kafka概況,包含了Kafka的基本概念、設計原理,以及設計核心。本篇單獨聊聊Kafka的生產者,包括如下內容:

  • 生產者是如何生產消息
  • 如何創建生產者
  • 發送消息到Kafka
  • 生產者配置
  • 分區

生產者是如何生產消息的

首先來看一下Kafka生產者組件圖

(生產者組件圖。圖片來源:《Kafka權威指南》)

第一步,Kafka 會將發送消息包裝為 ProducerRecord 對象, ProducerRecord 對象包含了目標主題和要發送的內容,同時還可以指定鍵和分區。在發送 ProducerRecord 對象前,生產者會先把鍵和值對象序列化成位元組數組,這樣它們才能夠在網路上傳輸。第二步,數據被傳給分區器。如果之前已經在 ProducerRecord 對象里指定了分區,那麼分區器就不會再做任何事情。如果沒有指定分區 ,那麼分區器會根據 ProducerRecord 對象的鍵來選擇一個分區,緊接著,這條記錄被添加到一個記錄批次里,這個批次里的所有消息會被髮送到相同的主題和分區上。有一個獨立的線程負責把這些記錄批次發送到相應的 broker 上。伺服器在收到這些消息時會返回一個響應。如果消息成功寫入 Kafka,就返回一個 RecordMetaData 對象,它包含了主題和分區信息,以及記錄在分區里的偏移量。如果寫入失敗,則會返回一個錯誤。生產者在收到錯誤之後會嘗試重新發送消息,如果達到指定的重試次數後還沒有成功,則直接拋出異常,不再重試。

如何創建生產者

屬性設置

在創建生產者對象的時候,要設置一些屬性,有三個屬性是必選的:

  • bootstrap.servers:指定Broker的地址清單,地址格式為host:port。清單里不需要包含所有的Broker地址,生產者會從給定的Broker里查找到其他Broker的信息;不過建議至少要提供兩個Broker的信息保證容錯。

  • key.serializer:指定鍵的序列化器。Broker希望接收到的消息的鍵和值都是位元組數組。這個屬性必須被設置為一個實現了org.apache.kafka.common.serialization.Serializer介面的類,生產者會使用這個類把鍵對象序列化成位元組數組。Kafka客戶端預設提供了ByteArraySerializer、StringSerializer和IntegerSerializer,因此一般不需要實現自定義的序列化器。需要註意的是,key.serializer屬性是必須設置的,即使只發送值內容。

  • value.serializer:指定值的序列化器。如果鍵和值都是字元串,可以使用與key.serializer一樣的序列化器,否則需要使用不同的序列化器。

項目依賴

以maven項目為例,要使用Kafka客戶端,需要引入kafka-clients依賴:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.0</version>
</dependency>

樣例

一個簡單的創建Kafka生產者的代碼樣例如下:

        Properties props = new Properties();
        props.put("bootstrap.servers", "producer1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        /*創建生產者*/
        Producer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "k" + i, "v" + i);
            /* 發送消息*/
            producer.send(record);
        }
        /*關閉生產者*/
        producer.close();

這個樣例中只配置了必須的這三個屬性,其他都使用了預設的配置。

發送消息Kafka

實例化生產者對象後,接下來就可以開始發送消息了。發送消息主要有三種方式:

  • 發送並忘記(fire-and-forget):把消息發送給伺服器,但並不關心消息是否正常到達,也就是上面樣例中的方式。大多數情況下,消息會正常到達,這可以由Kafka的高可用性和自動重發機制來保證。不過有時候也會丟失消息。
  • 同步發送:使用send()方法發送消息,它會返回一個Future對象,調用get()方法進行等待,我們就可以知道消息是否發送成功。
  • 非同步發送:調用send()方法時,同時指定一個回調函數,伺服器在返迴響應時調用該函數。

發送並忘記

這是最簡單的消息發送方式,只發送不管發送結果,代碼樣例如下:

ProducerRecord<String, String> record = new ProducerRecord<>("Topic", "k", "v"); // 1
try {
    producer.send(record); // 2
} catch (Exception e) {
    e.printStackTrace(); // 3
}

這段代碼要註意幾點:

  1. 生產者的send()方法將ProducerRecord對象作為參數,樣例里用到的ProducerRecord構造函數需要目標主題的名字和要發送的鍵和值對象,它們都是字元串。鍵和值對象的類型都必須與序列化器和生產者對象相匹配。
  2. 使用生產者的send()方法發送ProducerRecord對象。消息會先被放進緩衝區,然後使用單獨的線程發送到伺服器端。send()方法會返回一個包含RecordMetadata的Future對象,不過此處不關註返回了什麼。
  3. 發送消息時,生產者可能會出現一些執行異常,序列化消息失敗異常、緩衝區超出異常、超時異常,或者發送線程被中斷異常。

同步發送

在上一種發送方式中已經解釋過同步發送和只發送的區別,以下是最簡單的同步發送方式的代碼樣例,對比可以看到區別:

ProducerRecord<String, String> record = new ProducerRecord<>("Topic", "k", "v"); 
try {
    producer.send(record).get; 
} catch (Exception e) {
    e.printStackTrace(); 
}

可以看到,二者的區別就在於是否接收發送結果。同步發送會接收send()方法的返回值,即一個Future對象,通過調用Future對象的get()方法來等待Kafka響應。如果伺服器返回錯誤,則get()方法就會拋出異常。如果沒有發生錯誤,我們會得到一個RecordMetadata對象,可以用它來獲取消息的偏移量。

非同步發送消息

對於吞吐量要求比較高的應用來說,又要同時保證服務的可靠性,發送並忘記方式可靠性較低,但同步發送方式又會降低吞吐量,這就需要非同步發送消息的方式了。大多數時候,生產者並不需要等待響應,只需要在遇到消息發送失敗時,拋出異常、記錄錯誤日誌,或者把消息寫入“錯誤日誌”文件便於以後分析。代碼樣例如下:

ProducerRecord<String, String> record = new ProducerRecord<>("Topic", "k", "v");
// 非同步發送消息,並監聽回調
producer.send(record, new Callback() { // 1
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) { // 2
        if (exception != null) {
            // 進行異常處理
        } else {
            System.out.printf("topic=%s, partition=%d, offset=%s \n", metadata.topic(), metadata.partition(), metadata.offset());
        }
    }
});
  1. 從上面代碼可以看到,為了使用回調,只需要實現一個org.apache.kafka.clients.producer.Callback介面即可,這個介面只有一個onComplete方法。
  2. 如果Kafka返回錯誤,onComplete方法會拋出一個非空異常。在調用send()方法的時候會傳入這個callback對象,根據發送的結果決定調用異常處理方法還是發送結果處理方法。

生產者配置

在創建生產者的時候,介紹了三個必須的屬性,本節再一一介紹下其他的生產者屬性:

acks

acks 參數指定了必須要有多少個分區副本收到消息,生產者才會認為消息寫入是成功的:

  • acks=0 : 消息發送出去就認為已經成功了,不會等待任何來自伺服器的響應;
  • acks=1 : 只要集群的首領節點收到消息,生產者就會收到一個來自伺服器成功響應;
  • acks=all :只有當所有參與複製的節點全部收到消息時,生產者才會收到一個來自伺服器的成功響應。

buffer.memory

該參數用來設置生產者記憶體緩衝區的大小生產者用它緩衝要發送到伺服器的消息。如果程式發送消息的速度超過了發送到伺服器的速度,會導致生產者緩衝區空間不足,這時候調用send()方法要麼被阻塞,要麼拋出異常。

compression.type

預設情況下,發送的消息不會被壓縮。它指定了消息被髮送給broker之前使用哪一種壓縮演算法進行壓縮,可選值有 snappy(占用CPU少,關註性能和網路帶寬時選用),gzip(占用CPU多,更高壓縮比,網路帶寬有限時選用),lz4。

retries

指定了生產者放消息發生錯誤後,消息重發的次數。如果達到設定值,生產者就會放棄重試並返回錯誤。

batch.size

當有多個消息需要被髮送到同一個分區時,生產者會把它們放在同一個批次里。該參數指定了一個批次可以使用的記憶體大小,按照位元組數計算。

linger.ms

該參數制定了生產者在發送批次之前等待更多消息加入批次的時間。KafkaProducer會在批次填滿或linger.ms達到上限時把批次發送出去。

client.id

客戶端 id,伺服器用來識別消息的來源。

max.in.flight.requests.per.connection

指定了生產者在收到伺服器響應之前可以發送多少個消息。它的值越高,就會占用越多的記憶體,不過也會提升吞吐量,把它設置為 1 可以保證消息是按照發送的順序寫入伺服器,即使發生了重試。

timeout.ms、request.timeout.ms和metadata.fetch.timeout.ms

  • timeout.ms 指定了 borker 等待同步副本返回消息的確認時間;
  • request.timeout.ms 指定了生產者在發送數據時等待伺服器返迴響應的時間;
  • metadata.fetch.timeout.ms 指定了生產者在獲取元數據(比如分區首領是誰)時等待伺服器返迴響應的時間。

max.block.ms

該參數指定了在調用send()方法或使用partitionsFor()方法獲取元數據時生產者的阻塞時間。當生產者的發送緩衝區已滿,或者沒有可用的元數據時,這些方法會阻塞。在阻塞時間達到 max.block.ms 時,生產者會拋出超時異常。

max.request.size

該參數用於控制生產者發送的請求大小。它可以指發送的單個消息的最大值,也可以指單個請求里所有消息總的大小。例如,假設這個值為 1000K ,那麼可以發送的單個最大消息為 1000K ,或者生產者可以在單個請求里發送一個批次,該批次包含了 1000 個消息,每個消息大小為 1K。

receive.buffer.bytes和send.buffer.byte

這兩個參數分別指定 TCP socket 接收和發送數據包緩衝區的大小,-1 代表使用操作系統的預設值。

分區

分區器

上面在說明生產者發送消息方式的時候有如下一行代碼:

ProducerRecord<String, String> record = new ProducerRecord<>("Topic", "k", "v"); 

這裡指定了Kafka消息的目標主題、鍵和值。ProducerRecord對象包含了主題、鍵和值。鍵的作用是:

  • 作為消息的附加信息;
  • 用來決定消息被寫到主題的哪個分區,擁有相同鍵的消息將被寫到同一個分區。

鍵可以設置為預設的null,是不是null的區別在於:

  • 如果鍵為null,那麼分區器使用輪詢演算法將消息均衡地分佈到各個分區上;
  • 如果鍵不為null,那麼 分區器 會使用內置的散列演算法對鍵進行散列,然後分佈到各個分區上。

要註意的是,只有在不改變分區主題分區數量的情況下,鍵與分區之間的映射才能保持不變。

順序保證

Kafka可以保證同一個分區里的消息是有序的。考慮一種情況,如果retries為非零整數,同時max.in.flight.requests.per.connection為比1大的數如果某些場景要求消息是有序的,也即生產者在收到伺服器響應之前可以發送多個消息,且失敗會重試。那麼如果第一個批次消息寫入失敗,而第二個成功,Broker會重試寫入第一個批次,如果此時第一個批次寫入成功,那麼兩個批次的順序就反過來了。也即,要保證消息是有序的,消息是否寫入成功也是很關鍵的。那麼如何做呢?在對消息的順序要嚴格要求的情況下,可以將retries設置為大於0,max.in.flight.requests.per.connection設為1,這樣在生產者嘗試發送第一批消息時,就不會有其他的消息發送給Broker。當然這回嚴重影響生產者的吞吐量。

 

關註我的公眾號,獲取更多關於面試、技術的文章及福利資源。

Dali王的技術博客公眾號

【參考資料】《Kafka 權威指南》


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Java13核心類 沒有特殊說明,我的所有學習筆記都是從 廖老師 那裡摘抄過來的,侵刪 引言 兜兜轉轉到了大四,學過了C,C++,C ,Java,Python,學一門丟一門,到了最後還是要把Java撿起來。所以奉勸大家,面向對象還是要掌握一門,雖然Python好寫舒服,但是畢竟不能完全面向對象,也沒 ...
  • 我們在這篇文章將要學習最有用的 "數據結構" 之一— 哈希表 ,哈希表的英文叫 Hash Table,也可以稱為 散列表 或者 Hash 表 。 哈希表用的是 數組支持按照下標隨機訪問數據的特性 ,所以哈希表其實就是數組的一種擴展,由數組演化而來。可以說,如果沒有數組,就沒有散列表。 哈希表存儲的是 ...
  • 一、Collections的常用方法介紹 1.承接上次連載,先介紹幾個簡單的常用方法 package com.bjpowernode.java_learning; import java.util.*; public class D84_1_CommonMethodOfCollection { pu ...
  • **UML**(Unified Modeling Language) 統一建模語言,又稱標準建模語言。是用來對軟體密集系統進行可視化建模的一種語言。UML的定義包括UML語義和UML表示法兩個元素。UML是在開發階段,說明、可視化、構建和書寫一個面向對象軟體密集系統的製品的開放方法。最佳的應用是工程... ...
  • 開發環境: Windows操作系統開發工具: MyEclipse/Eclipse+Jdk+mysql資料庫運行效果圖: 源碼及原文鏈接:https://javadao.xyz/forum.php?mod=viewthread&tid=36 ...
  • 資料庫設置 在上一章節中學習瞭如何創建Django項目,在Django項目中創建web應用,以及如何在Django主程式的URL中引用web應用中的URL。下麵來瞭解如何在Django中使用資料庫。Django中想要使用資料庫, 首先要瞭解mysite/mysite/settings.py中關於數據 ...
  • 需求 maven依賴 列印sql 配置要點: 1. 驅動配置 application.properties 2. psy配置 aop列印持久層執行時間 使用aop實現; 啟用aop註解: 小結 來個效果截圖: 通過本片文章,你可以學會: 1. 給代碼添加aop切麵,增加日誌或者列印出方法執行總耗時; ...
  • 一、問題剖析 看到這個問題,我想吹水兩句再做推薦。一般發出這個疑問都處在初學編程階段,編程語言都是相通的,只要你領悟了一門語言的“任督二脈”,以後你學哪一門語言都會輕易上手。學語言嘛,當你工作一兩年了,你還真會覺得像當初老師說的那樣,語言只是工具罷了。工作期間,可能要你接觸到其它語言,而且要你能快速 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...