Kafka系列2:深入理解Kafka生產者

来源:https://www.cnblogs.com/mcbye/archive/2020/02/16/kafka-producer-in-detail.html
-Advertisement-
Play Games

生產者是如何生產消息 如何創建生產者 發送消息到Kafka 生產者配置 分區 ...


Kafka系列2:深入理解Kafka消費者

上篇聊了Kafka概況,包含了Kafka的基本概念、設計原理,以及設計核心。本篇單獨聊聊Kafka的生產者,包括如下內容:

  • 生產者是如何生產消息
  • 如何創建生產者
  • 發送消息到Kafka
  • 生產者配置
  • 分區

生產者是如何生產消息的

首先來看一下Kafka生產者組件圖

(生產者組件圖。圖片來源:《Kafka權威指南》)

第一步,Kafka 會將發送消息包裝為 ProducerRecord 對象, ProducerRecord 對象包含了目標主題和要發送的內容,同時還可以指定鍵和分區。在發送 ProducerRecord 對象前,生產者會先把鍵和值對象序列化成位元組數組,這樣它們才能夠在網路上傳輸。第二步,數據被傳給分區器。如果之前已經在 ProducerRecord 對象里指定了分區,那麼分區器就不會再做任何事情。如果沒有指定分區 ,那麼分區器會根據 ProducerRecord 對象的鍵來選擇一個分區,緊接著,這條記錄被添加到一個記錄批次里,這個批次里的所有消息會被髮送到相同的主題和分區上。有一個獨立的線程負責把這些記錄批次發送到相應的 broker 上。伺服器在收到這些消息時會返回一個響應。如果消息成功寫入 Kafka,就返回一個 RecordMetaData 對象,它包含了主題和分區信息,以及記錄在分區里的偏移量。如果寫入失敗,則會返回一個錯誤。生產者在收到錯誤之後會嘗試重新發送消息,如果達到指定的重試次數後還沒有成功,則直接拋出異常,不再重試。

如何創建生產者

屬性設置

在創建生產者對象的時候,要設置一些屬性,有三個屬性是必選的:

  • bootstrap.servers:指定Broker的地址清單,地址格式為host:port。清單里不需要包含所有的Broker地址,生產者會從給定的Broker里查找到其他Broker的信息;不過建議至少要提供兩個Broker的信息保證容錯。

  • key.serializer:指定鍵的序列化器。Broker希望接收到的消息的鍵和值都是位元組數組。這個屬性必須被設置為一個實現了org.apache.kafka.common.serialization.Serializer介面的類,生產者會使用這個類把鍵對象序列化成位元組數組。Kafka客戶端預設提供了ByteArraySerializer、StringSerializer和IntegerSerializer,因此一般不需要實現自定義的序列化器。需要註意的是,key.serializer屬性是必須設置的,即使只發送值內容。

  • value.serializer:指定值的序列化器。如果鍵和值都是字元串,可以使用與key.serializer一樣的序列化器,否則需要使用不同的序列化器。

項目依賴

以maven項目為例,要使用Kafka客戶端,需要引入kafka-clients依賴:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.0</version>
</dependency>

樣例

一個簡單的創建Kafka生產者的代碼樣例如下:

        Properties props = new Properties();
        props.put("bootstrap.servers", "producer1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        /*創建生產者*/
        Producer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "k" + i, "v" + i);
            /* 發送消息*/
            producer.send(record);
        }
        /*關閉生產者*/
        producer.close();

這個樣例中只配置了必須的這三個屬性,其他都使用了預設的配置。

發送消息Kafka

實例化生產者對象後,接下來就可以開始發送消息了。發送消息主要有三種方式:

  • 發送並忘記(fire-and-forget):把消息發送給伺服器,但並不關心消息是否正常到達,也就是上面樣例中的方式。大多數情況下,消息會正常到達,這可以由Kafka的高可用性和自動重發機制來保證。不過有時候也會丟失消息。
  • 同步發送:使用send()方法發送消息,它會返回一個Future對象,調用get()方法進行等待,我們就可以知道消息是否發送成功。
  • 非同步發送:調用send()方法時,同時指定一個回調函數,伺服器在返迴響應時調用該函數。

發送並忘記

這是最簡單的消息發送方式,只發送不管發送結果,代碼樣例如下:

ProducerRecord<String, String> record = new ProducerRecord<>("Topic", "k", "v"); // 1
try {
    producer.send(record); // 2
} catch (Exception e) {
    e.printStackTrace(); // 3
}

這段代碼要註意幾點:

  1. 生產者的send()方法將ProducerRecord對象作為參數,樣例里用到的ProducerRecord構造函數需要目標主題的名字和要發送的鍵和值對象,它們都是字元串。鍵和值對象的類型都必須與序列化器和生產者對象相匹配。
  2. 使用生產者的send()方法發送ProducerRecord對象。消息會先被放進緩衝區,然後使用單獨的線程發送到伺服器端。send()方法會返回一個包含RecordMetadata的Future對象,不過此處不關註返回了什麼。
  3. 發送消息時,生產者可能會出現一些執行異常,序列化消息失敗異常、緩衝區超出異常、超時異常,或者發送線程被中斷異常。

同步發送

在上一種發送方式中已經解釋過同步發送和只發送的區別,以下是最簡單的同步發送方式的代碼樣例,對比可以看到區別:

ProducerRecord<String, String> record = new ProducerRecord<>("Topic", "k", "v"); 
try {
    producer.send(record).get; 
} catch (Exception e) {
    e.printStackTrace(); 
}

可以看到,二者的區別就在於是否接收發送結果。同步發送會接收send()方法的返回值,即一個Future對象,通過調用Future對象的get()方法來等待Kafka響應。如果伺服器返回錯誤,則get()方法就會拋出異常。如果沒有發生錯誤,我們會得到一個RecordMetadata對象,可以用它來獲取消息的偏移量。

非同步發送消息

對於吞吐量要求比較高的應用來說,又要同時保證服務的可靠性,發送並忘記方式可靠性較低,但同步發送方式又會降低吞吐量,這就需要非同步發送消息的方式了。大多數時候,生產者並不需要等待響應,只需要在遇到消息發送失敗時,拋出異常、記錄錯誤日誌,或者把消息寫入“錯誤日誌”文件便於以後分析。代碼樣例如下:

ProducerRecord<String, String> record = new ProducerRecord<>("Topic", "k", "v");
// 非同步發送消息,並監聽回調
producer.send(record, new Callback() { // 1
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) { // 2
        if (exception != null) {
            // 進行異常處理
        } else {
            System.out.printf("topic=%s, partition=%d, offset=%s \n", metadata.topic(), metadata.partition(), metadata.offset());
        }
    }
});
  1. 從上面代碼可以看到,為了使用回調,只需要實現一個org.apache.kafka.clients.producer.Callback介面即可,這個介面只有一個onComplete方法。
  2. 如果Kafka返回錯誤,onComplete方法會拋出一個非空異常。在調用send()方法的時候會傳入這個callback對象,根據發送的結果決定調用異常處理方法還是發送結果處理方法。

生產者配置

在創建生產者的時候,介紹了三個必須的屬性,本節再一一介紹下其他的生產者屬性:

acks

acks 參數指定了必須要有多少個分區副本收到消息,生產者才會認為消息寫入是成功的:

  • acks=0 : 消息發送出去就認為已經成功了,不會等待任何來自伺服器的響應;
  • acks=1 : 只要集群的首領節點收到消息,生產者就會收到一個來自伺服器成功響應;
  • acks=all :只有當所有參與複製的節點全部收到消息時,生產者才會收到一個來自伺服器的成功響應。

buffer.memory

該參數用來設置生產者記憶體緩衝區的大小生產者用它緩衝要發送到伺服器的消息。如果程式發送消息的速度超過了發送到伺服器的速度,會導致生產者緩衝區空間不足,這時候調用send()方法要麼被阻塞,要麼拋出異常。

compression.type

預設情況下,發送的消息不會被壓縮。它指定了消息被髮送給broker之前使用哪一種壓縮演算法進行壓縮,可選值有 snappy(占用CPU少,關註性能和網路帶寬時選用),gzip(占用CPU多,更高壓縮比,網路帶寬有限時選用),lz4。

retries

指定了生產者放消息發生錯誤後,消息重發的次數。如果達到設定值,生產者就會放棄重試並返回錯誤。

batch.size

當有多個消息需要被髮送到同一個分區時,生產者會把它們放在同一個批次里。該參數指定了一個批次可以使用的記憶體大小,按照位元組數計算。

linger.ms

該參數制定了生產者在發送批次之前等待更多消息加入批次的時間。KafkaProducer會在批次填滿或linger.ms達到上限時把批次發送出去。

client.id

客戶端 id,伺服器用來識別消息的來源。

max.in.flight.requests.per.connection

指定了生產者在收到伺服器響應之前可以發送多少個消息。它的值越高,就會占用越多的記憶體,不過也會提升吞吐量,把它設置為 1 可以保證消息是按照發送的順序寫入伺服器,即使發生了重試。

timeout.ms、request.timeout.ms和metadata.fetch.timeout.ms

  • timeout.ms 指定了 borker 等待同步副本返回消息的確認時間;
  • request.timeout.ms 指定了生產者在發送數據時等待伺服器返迴響應的時間;
  • metadata.fetch.timeout.ms 指定了生產者在獲取元數據(比如分區首領是誰)時等待伺服器返迴響應的時間。

max.block.ms

該參數指定了在調用send()方法或使用partitionsFor()方法獲取元數據時生產者的阻塞時間。當生產者的發送緩衝區已滿,或者沒有可用的元數據時,這些方法會阻塞。在阻塞時間達到 max.block.ms 時,生產者會拋出超時異常。

max.request.size

該參數用於控制生產者發送的請求大小。它可以指發送的單個消息的最大值,也可以指單個請求里所有消息總的大小。例如,假設這個值為 1000K ,那麼可以發送的單個最大消息為 1000K ,或者生產者可以在單個請求里發送一個批次,該批次包含了 1000 個消息,每個消息大小為 1K。

receive.buffer.bytes和send.buffer.byte

這兩個參數分別指定 TCP socket 接收和發送數據包緩衝區的大小,-1 代表使用操作系統的預設值。

分區

分區器

上面在說明生產者發送消息方式的時候有如下一行代碼:

ProducerRecord<String, String> record = new ProducerRecord<>("Topic", "k", "v"); 

這裡指定了Kafka消息的目標主題、鍵和值。ProducerRecord對象包含了主題、鍵和值。鍵的作用是:

  • 作為消息的附加信息;
  • 用來決定消息被寫到主題的哪個分區,擁有相同鍵的消息將被寫到同一個分區。

鍵可以設置為預設的null,是不是null的區別在於:

  • 如果鍵為null,那麼分區器使用輪詢演算法將消息均衡地分佈到各個分區上;
  • 如果鍵不為null,那麼 分區器 會使用內置的散列演算法對鍵進行散列,然後分佈到各個分區上。

要註意的是,只有在不改變分區主題分區數量的情況下,鍵與分區之間的映射才能保持不變。

順序保證

Kafka可以保證同一個分區里的消息是有序的。考慮一種情況,如果retries為非零整數,同時max.in.flight.requests.per.connection為比1大的數如果某些場景要求消息是有序的,也即生產者在收到伺服器響應之前可以發送多個消息,且失敗會重試。那麼如果第一個批次消息寫入失敗,而第二個成功,Broker會重試寫入第一個批次,如果此時第一個批次寫入成功,那麼兩個批次的順序就反過來了。也即,要保證消息是有序的,消息是否寫入成功也是很關鍵的。那麼如何做呢?在對消息的順序要嚴格要求的情況下,可以將retries設置為大於0,max.in.flight.requests.per.connection設為1,這樣在生產者嘗試發送第一批消息時,就不會有其他的消息發送給Broker。當然這回嚴重影響生產者的吞吐量。

 

關註我的公眾號,獲取更多關於面試、技術的文章及福利資源。

Dali王的技術博客公眾號

【參考資料】《Kafka 權威指南》


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Java13核心類 沒有特殊說明,我的所有學習筆記都是從 廖老師 那裡摘抄過來的,侵刪 引言 兜兜轉轉到了大四,學過了C,C++,C ,Java,Python,學一門丟一門,到了最後還是要把Java撿起來。所以奉勸大家,面向對象還是要掌握一門,雖然Python好寫舒服,但是畢竟不能完全面向對象,也沒 ...
  • 我們在這篇文章將要學習最有用的 "數據結構" 之一— 哈希表 ,哈希表的英文叫 Hash Table,也可以稱為 散列表 或者 Hash 表 。 哈希表用的是 數組支持按照下標隨機訪問數據的特性 ,所以哈希表其實就是數組的一種擴展,由數組演化而來。可以說,如果沒有數組,就沒有散列表。 哈希表存儲的是 ...
  • 一、Collections的常用方法介紹 1.承接上次連載,先介紹幾個簡單的常用方法 package com.bjpowernode.java_learning; import java.util.*; public class D84_1_CommonMethodOfCollection { pu ...
  • **UML**(Unified Modeling Language) 統一建模語言,又稱標準建模語言。是用來對軟體密集系統進行可視化建模的一種語言。UML的定義包括UML語義和UML表示法兩個元素。UML是在開發階段,說明、可視化、構建和書寫一個面向對象軟體密集系統的製品的開放方法。最佳的應用是工程... ...
  • 開發環境: Windows操作系統開發工具: MyEclipse/Eclipse+Jdk+mysql資料庫運行效果圖: 源碼及原文鏈接:https://javadao.xyz/forum.php?mod=viewthread&tid=36 ...
  • 資料庫設置 在上一章節中學習瞭如何創建Django項目,在Django項目中創建web應用,以及如何在Django主程式的URL中引用web應用中的URL。下麵來瞭解如何在Django中使用資料庫。Django中想要使用資料庫, 首先要瞭解mysite/mysite/settings.py中關於數據 ...
  • 需求 maven依賴 列印sql 配置要點: 1. 驅動配置 application.properties 2. psy配置 aop列印持久層執行時間 使用aop實現; 啟用aop註解: 小結 來個效果截圖: 通過本片文章,你可以學會: 1. 給代碼添加aop切麵,增加日誌或者列印出方法執行總耗時; ...
  • 一、問題剖析 看到這個問題,我想吹水兩句再做推薦。一般發出這個疑問都處在初學編程階段,編程語言都是相通的,只要你領悟了一門語言的“任督二脈”,以後你學哪一門語言都會輕易上手。學語言嘛,當你工作一兩年了,你還真會覺得像當初老師說的那樣,語言只是工具罷了。工作期間,可能要你接觸到其它語言,而且要你能快速 ...
一周排行
    -Advertisement-
    Play Games
  • 前言 在我們開發過程中基本上不可或缺的用到一些敏感機密數據,比如SQL伺服器的連接串或者是OAuth2的Secret等,這些敏感數據在代碼中是不太安全的,我們不應該在源代碼中存儲密碼和其他的敏感數據,一種推薦的方式是通過Asp.Net Core的機密管理器。 機密管理器 在 ASP.NET Core ...
  • 新改進提供的Taurus Rpc 功能,可以簡化微服務間的調用,同時可以不用再手動輸出模塊名稱,或調用路徑,包括負載均衡,這一切,由框架實現並提供了。新的Taurus Rpc 功能,將使得服務間的調用,更加輕鬆、簡約、高效。 ...
  • 順序棧的介面程式 目錄順序棧的介面程式頭文件創建順序棧入棧出棧利用棧將10進位轉16進位數驗證 頭文件 #include <stdio.h> #include <stdbool.h> #include <stdlib.h> 創建順序棧 // 指的是順序棧中的元素的數據類型,用戶可以根據需要進行修改 ...
  • 前言 整理這個官方翻譯的系列,原因是網上大部分的 tomcat 版本比較舊,此版本為 v11 最新的版本。 開源項目 從零手寫實現 tomcat minicat 別稱【嗅虎】心有猛虎,輕嗅薔薇。 系列文章 web server apache tomcat11-01-官方文檔入門介紹 web serv ...
  • C總結與剖析:關鍵字篇 -- <<C語言深度解剖>> 目錄C總結與剖析:關鍵字篇 -- <<C語言深度解剖>>程式的本質:二進位文件變數1.變數:記憶體上的某個位置開闢的空間2.變數的初始化3.為什麼要有變數4.局部變數與全局變數5.變數的大小由類型決定6.任何一個變數,記憶體賦值都是從低地址開始往高地 ...
  • 如果讓你來做一個有狀態流式應用的故障恢復,你會如何來做呢? 單機和多機會遇到什麼不同的問題? Flink Checkpoint 是做什麼用的?原理是什麼? ...
  • C++ 多級繼承 多級繼承是一種面向對象編程(OOP)特性,允許一個類從多個基類繼承屬性和方法。它使代碼更易於組織和維護,並促進代碼重用。 多級繼承的語法 在 C++ 中,使用 : 符號來指定繼承關係。多級繼承的語法如下: class DerivedClass : public BaseClass1 ...
  • 前言 什麼是SpringCloud? Spring Cloud 是一系列框架的有序集合,它利用 Spring Boot 的開發便利性簡化了分散式系統的開發,比如服務註冊、服務發現、網關、路由、鏈路追蹤等。Spring Cloud 並不是重覆造輪子,而是將市面上開發得比較好的模塊集成進去,進行封裝,從 ...
  • class_template 類模板和函數模板的定義和使用類似,我們已經進行了介紹。有時,有兩個或多個類,其功能是相同的,僅僅是數據類型不同。類模板用於實現類所需數據的類型參數化 template<class NameType, class AgeType> class Person { publi ...
  • 目錄system v IPC簡介共用記憶體需要用到的函數介面shmget函數--獲取對象IDshmat函數--獲得映射空間shmctl函數--釋放資源共用記憶體實現思路註意 system v IPC簡介 消息隊列、共用記憶體和信號量統稱為system v IPC(進程間通信機制),V是羅馬數字5,是UNI ...