Kafka學習(四)-------- Kafka核心之Producer

来源:https://www.cnblogs.com/tree1123/archive/2019/08/06/11309558.html
-Advertisement-
Play Games

通過https://www.cnblogs.com/tree1123/p/11243668.html 已經對consumer有了一定的瞭解。producer比consumer要簡單一些。 一、舊版本producer 0.9.0.0版本以前,是由scala編寫的舊版本producer。 入口類:kaf ...


通過https://www.cnblogs.com/tree1123/p/11243668.html 已經對consumer有了一定的瞭解。producer比consumer要簡單一些。

一、舊版本producer

0.9.0.0版本以前,是由scala編寫的舊版本producer。

入口類:kafka.producer.Producer

代碼示例:

Properties properties = new Properties();
        properties.put("metadata.broker.list", "kafka01:9092,kafka02:9092");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("request.requird.acks", "1");
        ProducerConfig config = new ProducerConfig(properties);
        Producer<String, String> producer = new Producer<String, String>(config);
        KeyedMessage<String,String> msg = new KeyedMessage<String,String>("topic","hello");
        Producer.send(msg);

舊版本是同步機制,等待響應。吞吐性很差。在0.9.0.0版本以後,正式下架了。

舊版本的方法:

send   發送
close   關閉
sync   非同步發送  有丟失消息的可能性

二、新版本producer

舊版本producer由scala編寫,0.9.0.0版本以後,新版本producer由java編寫。

新版本主要入口類是:org.apache.kafka.clients.producer.KafkaProducer

常用方法:

send  實現消息發送主邏輯
close  關閉producer   
metrics  獲取producer的實時監控指標數據 比如發送消息的速率

Kafka producer要比consumer設計簡單一些,主要就是向某個topic的某個分區發送一條消息。partitioner決定向哪個分區發送消息。用戶指定key,預設的分區器會根據key的哈希值來選擇分區,如果沒有指定key就以輪詢的方式選擇分區。也可以自定義分區策略。

確定分區後,producer尋找到分區的leader,也就是該leader所在的broker,然後發送消息,leader會進行副本同步ISR。

producer會啟兩個線程,主線程封裝ProducerRecord類,序列化後發給partitioner,然後發送到記憶體緩衝區。

另一個I/O線程,提取消息分batch統一發送給對應的broker。

示例代碼:

Properties properties = new Properties();
        properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        for (int i = 1; i <= 600; i++) {
            kafkaProducer.send(new ProducerRecord<String, String>("z_test_20190430", "testkafka0613"+i));
            System.out.println("testkafka"+i);
        }
        kafkaProducer.close();

1、構造Properties對象,bootstrap.servers key.serializer value.serializer是必須指定的。

2、使用Properties構造KafkaProducer對象。

3、構造ProducerRecord 指定topic 分區 key value。

4、KafkaProducer的send方法發送。

5、關閉KafkaProducer。

Properties主要參數:

bootstrap.servers 和consumer一樣,指定部分broker即可。而且broker端如果沒有配ip地址,要寫成主機名。

key.serializer value.serializer 序列化參數 一定要全類名 沒有key也必須設置。

acks 三個值

​ 0: producer完全不管broker的處理結果 回調也就沒有用了 並不能保證消息成功發送 但是這種吞吐量最高

​ all或者-1: leader broker會等消息寫入 並且ISR都寫入後 才會響應,這種只要ISR有副本存活就肯定不會丟失,但吞 吐量最低。

​ 1: 預設的值 leader broker自己寫入後就響應,不會等待ISR其他的副本寫入,只要leader broker存活就不會丟失,即保證了不丟失,也保證了吞吐量。

buffer.memory 緩衝區大小 位元組 預設是33554432 就是發送消息的記憶體緩衝區大小 過小的話會影響吞吐量

compression.type 設置是否壓縮消息 預設值是none 壓縮後可以降低IO開銷提高吞吐,但是會增大CPU開銷。

​ 支持三種: GZIP Snappy LZ4 性能 LZ4 > Snappy > GZIP

retries 發送消息重試的次數 預設0 不重試 重試可能造成重覆發送 可能造成亂序

​ retry.backoff.ms 設置重試間隔 預設100毫秒

batch.size 調優重要的參數 batch小 吞吐量也會小 batch大 記憶體壓力會大 預設值是16384 16KB

linger.ms 發送延時 預設是0 0的話不用等batch滿就發送 延時的話可以提高吞吐 看具體情況進行調整

max.request.size producer能夠發送最大消息的大小 預設1048576位元組 如果消息很大 需要修改它

request.timeout.ms 發送請求後broker在規定時間返回 預設30秒 超過就是超時了。

Send方法

fire and forget 就是上邊的示例

Properties properties = new Properties();
        properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        for (int i = 1; i <= 600; i++) {
            kafkaProducer.send(new ProducerRecord<String, String>("z_test_20190430", "testkafka0613"+i));
            System.out.println("testkafka"+i);
        }
        kafkaProducer.close();

非同步回調 不阻塞

Properties properties = new Properties();
        properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        for (int i = 1; i <= 600; i++) {
            kafkaProducer.send(new ProducerRecord<String, String>("z_test_20190430", "testkafka0613"+i),new Callback(){
              public void onCompletion(RecordMetadata metadata, Exception e) {
                         if(e != null) {
                            e.printStackTrace();
                         } else {
                            System.out.println("The offset of the record we just sent is: " +       metadata.offset());
                         }
                     }           
            });
            System.out.println("testkafka"+i);
        }
        kafkaProducer.close();

同步發送 無限等待返回

producer.send(record).get()

重試機制

如果需要自定義重試機制,就要在回調里對不同異常區別對待,常見的幾種如下:

可重試異常

LeaderNotAvailableException :分區的Leader副本不可用,這可能是換屆選舉導致的瞬時的異常,重試幾次就可以恢復
NotControllerException:Controller主要是用來選擇分區副本和每一個分區leader的副本信息,主要負責統一管理分區信息等,也可能是選舉所致。

NetWorkerException :瞬時網路故障異常所致。

不可重試異常

SerializationException:序列化失敗異常

RecordToolLargeException:消息尺寸過大導致。

示例代碼:

 producer.send(myRecord,
                   new Callback() {
                       public void onCompletion(RecordMetadata metadata, Exception e) {
                           if(e ==null){
                               //正常處理邏輯
                               System.out.println("The offset of the record we just sent is: " + metadata.offset()); 
                               
                           }else{
                                   
                                 if(e instanceof RetriableException) {
                                    //處理可重試異常
                                    ......
                                 } else {
                                    //處理不可重試異常
                                    ......
                                 }
                           }
                       }
                   });
分區機制

partitioner決定向哪個分區發送消息。用戶指定key,預設的分區器會根據key的哈希值來選擇分區,如果沒有指定key就以輪詢的方式選擇分區。也可以自定義分區策略。

對於有key的消息,java版本的producer自帶的partitioner會根據murmur2演算法計算消息key的哈希值。然後對總分區數求模得到消息要被髮送到的目標分區號。

自定義分區策略:

創建一個類,實現org.apache.kafka.clients.producer.Partitioner介面

主要分區邏輯在Partitioner.partition中實現:通過topic key value 一同確定分區

在構造KafkaProducer得Properties中設置partitioner.class 為自定義類 註意是全類名

序列化機制

常用的serializer

ByteArraySerializer.class

ByteBufferSerializer.class

BytesSerializer.class

DoubleSerializer.class

IntegerSerializer.class

LongSerializer.class

StringSerializer.class

但是其他一些複雜的就需要自定義序列化:

1、定義數據格式

2、創建自定義序列化類,實現org.apache.kafka.common.serialization.Serializer介面

3、在KafkaProducer的Properties中設置key.serializer value.serializer為自定義類

以上均為單線程的情況,但producer是線程安全的,單線程適合分區較少的情況,分區較多可以多線程但對記憶體損耗較大。

我的博客即將搬運同步至騰訊雲+社區,邀請大家一同入駐:https://cloud.tencent.com/developer/support-plan


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一、echo在屏幕上列印內容 echo [選項] [輸出內容] -e 支持轉義字元控制的字元轉換 輸出帶顏色的文本 二、第一個腳本 編寫腳本 註意: 運行腳本 兩種方式 (1)賦予執行許可權,直接運行 (2)通過bash調用執行腳本 三、bash的基本功能 (1)命令別名 顯示已有的別名 alisa ...
  • vi -- 終端中的編輯器 visual interface ssh-- secure shell vim vi improved 打開和新建文件 vi 文件名 #如果文件已經存在,會直接打開文件 #如果文件不存在,會新建一個文件 打開文件並定位行 vi 文件 游標定位在最開頭 vi 文件 + 游標 ...
  • [TOC] 手動編譯PHP開發環境 這是一篇來自深夜加班的手稿 問題復盤 你有沒有遇到過這樣的情況,部署了集成環境,每次添加擴展的時候,總是需要找一堆的配置文件的位置(其實很多人都能熟練使用集成環境) 你有沒有遇到過這樣的情況,去面試,面試官問你: 有沒有自己手動編譯過環境? 你卻回答 我一般都使用 ...
  • 1. 遷移背景和限制條件 隨著功能的迭代或者數據表中數據量的增加,將現有數據進行遷移已是工作中經常遇到的事情。通常我們在平時遷移數據數據的時候,只需要用mysqldump、mysqlimport指令就能完成遷移功能,但在實際工作中,作為開發者的我們往往沒有這麼大的許可權(例如寫許可權)來操作線上數據,只 ...
  • 目前最流行的大數據查詢引擎非hive莫屬,它是基於MR的類SQL查詢工具,會把輸入的查詢SQL解釋為MapReduce,能極大的降低使用大數據查詢的門檻, 讓一般的業務人員也可以直接對大數據進行查詢。但因其基於MR,運行速度是一個弊端,通常運行一個查詢需等待很久才會有結果。對於此情況,創造了hive ...
  • 一、創建表 create table 表裡包含什麼類型的數據 表的名稱是什麼 主鍵 列的名稱是什麼 每一列的數據類型是什麼 每一列的長度是多少 表裡哪些列可以是空的 語法: create table table_name (field1 data_type [not null], field2 da ...
  • 1. MySQL多表查詢 1.1 外鍵約束 為了消除多張表查詢出現的笛卡爾積的現象,MySQL在建表併進行多表之間的關鍵查詢可以使用外鍵關聯查詢。 外鍵:從表1(sub)的某列引用(ref)另外一個表2(main)的某列的值,把表1的這列叫做表2這列的外鍵。 1.2 外鍵的設置使用 比如上述最簡單的 ...
  • zookeeper是幹嘛的呢 Zookeeper的作用1.可以為客戶端管理少量的數據kvkey:是以路徑的形式表示的,那就意味著,各key之間有父子關係,比如/ 是頂層key用戶建的key只能在/ 下作為子節點,比如建一個key: /aa 這個key可以帶value數據也可以建一個key: /bb也 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...