Kafka的介面回調 +自定義分區、攔截器

来源:https://www.cnblogs.com/HelloBigTable/archive/2019/02/28/10453884.html
-Advertisement-
Play Games

一、介面回調+自定義分區 1.介面回調:在使用消費者的send方法時添加Callback回調 註意:在自定義分區後,你的消費者會收不到消息,因為消費者預設接收的分區為0。 二、攔截器 1)創建生產者類; 2)創建自定義攔截器類實現ProducerInterceptor介面,重寫抽象方法; 3)在業務 ...


一、介面回調+自定義分區

  1.介面回調:在使用消費者的send方法時添加Callback回調

 

producer.send(new ProducerRecord<String, String>("xinnian", "20" + i + "年新年好!"), new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (recordMetadata!=null){
System.out.println(recordMetadata.topic()+"-----"+recordMetadata.offset()+"-----"+recordMetadata.partition());
}
}
 2.自定義分區:定義類實現Patitioner介面,實現介面的方法:
   設置configure、分區邏輯partition(return 1;)、釋放資源close、在生產者的配置過程中添加入分區屬性。
 在定義生產者屬性時添加分區的屬性即可
/**
 * @author: PrincessHug
 * @date: 2019/2/28, 16:24
 * @Blog: https://www.cnblogs.com/HelloBigTable/
 */
public class PartitionDemo implements Partitioner {
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        return 1;
    }

    public void close() {

    }

    public void configure(Map<String, ?> map) {

    }
}

public class ProducerDemo {
    public static void main(String[] args) {
        Properties prop = new Properties();

        //參數配置
        //kafka節點的地址
        prop.put("bootstrap.servers", "192.168.126.128:9092");
        //發送消息是否等待應答
        prop.put("acks", "all");
        //配置發送消息失敗重試
        prop.put("retries", "0");
        //配置批量處理消息大小
        prop.put("batch.size", "10241");
        //配置批量處理數據延遲
        prop.put("linger.ms","5");
        //配置記憶體緩衝大小
        prop.put("buffer.memory", "12341235");
        //消息在發送前必須序列化
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("partitioner.class", "PartitionDemo");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);

        for (int i=10;i<100;i++){
            producer.send(new ProducerRecord<String, String>("xinnian", "20" + i + "年新年好!"), new Callback() {
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (recordMetadata!=null){
                        System.out.println(recordMetadata.topic()+"-----"+recordMetadata.offset()+"-----"+recordMetadata.partition());
                    }
                }
            });
        }
        producer.close();
    }
}

  註意:在自定義分區後,你的消費者會收不到消息,因為消費者預設接收的分區為0。

 

二、攔截器

  1)創建生產者類;
     2)創建自定義攔截器類實現ProducerInterceptor介面,重寫抽象方法;
     3)在業務邏輯方法ProducerRecord方法中,修改返回值,
        return new ProducerRecord<String,String>(
        record.topic(),
        record.partiiton(),
        record.key(),
        System.currentTimeMillis() + "-" + record.value() + "-" + record.topic());
     4)在生產者類中將自定義攔截器生效
       prop.put(ProducerConfig.INTERCEPTOR_CLASSEA_CONFIG,"com.wyh.com.wyh.kafka.interceptor.TimeInterceptor");
     5)運行生產者main方法,或者在linux端用shell測試。

/**
 * @author: PrincessHug
 * @date: 2019/2/28, 20:59
 * @Blog: https://www.cnblogs.com/HelloBigTable/
 */
public class TimeInterceptor implements ProducerInterceptor<String, String> {

    //業務邏輯
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        return new ProducerRecord<String,String>(
                producerRecord.topic(),
                producerRecord.partition(),
                producerRecord.key(),
                System.currentTimeMillis()+"--"+producerRecord.value()
        );
    }

    //發送失敗調用
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {

    }

    //釋放資源
    public void close() {

    }

    //獲取配置信息
    public void configure(Map<String, ?> map) {

    }
}

public class ItctorProducer {
    public static void main(String[] args) {
        //配置生產者屬性
        Properties prop = new Properties();
        //kafka節點的地址
        prop.put("bootstrap.servers", "192.168.126.128:9092");
        //發送消息是否等待應答
        prop.put("acks", "all");
        //配置發送消息失敗重試
        prop.put("retries", "0");
        //配置批量處理消息大小
        prop.put("batch.size", "1024");
        //配置批量處理數據延遲
        prop.put("linger.ms","5");
        //配置記憶體緩衝大小
        prop.put("buffer.memory", "12341235");
        //消息在發送前必須序列化
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //添加攔截器
        ArrayList<String> inList = new ArrayList<String>();
        inList.add("interceptor.TimeInterceptor");
        prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,inList);

        //實例化producer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);

        //發送消息
        for (int i=0;i<99;i++){
            producer.send(new ProducerRecord<String, String>("xinnian","You are genius!"+i));
        }

        //釋放資源
        producer.close();
        
    }
}

 


  

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 在WSUS下從Win10 1803更新到1809的問題解決。 ...
  • 決心書 我叫劉騰達,來自河北省衡水,大學實習期,學的網路專業,有接觸過Liunx課程,但畢業後工作了一段時間後還是選擇走培訓加深自己的專業知識,藉此進入互聯網行業,經過多方打聽和瞭解,來到了口碑很好的老男孩,在這裡希望自己能完成一個質的蛻變,達到自己的目標,希望這五個月老師能嚴格監督並督促我完成自己 ...
  • 我們一般平時安裝完WarIII後運行時的解析度預設是800*600,導致有黑邊的存在。所以我寫了一個bat腳本來自定義WarIII的運行解析度。需要以管理員身份運行。 下載鏈接: 鏈接:https://pan.baidu.com/s/1v-ZWjkhkVQTaXunEJs8ThQ 提取碼:m17p ...
  • 一 Ceph文件系統 1.1 概述 Ceph 對象網關是一個構建在 librados 之上的對象存儲介面,它為應用程式訪問Ceph 存儲集群提供了一個 RESTful 風格的網關 。 Ceph 對象存儲支持 2 種介面: 相容S3: 提供了對象存儲介面,相容亞馬遜S3 RESTful介面的一個大子集 ...
  • 1.打開cmd命令行,查看當前配置 輸入 npm config ls 先看一下當前npm的配置環境,由於我已經修改過,所以可以看到修改後的路徑 2.修改路徑 這裡需要修改兩個路徑,module路徑和cache路徑 module對應prefix cache對應cache 首先在別的盤新建兩個目錄 D: ...
  • 進入系統安裝的第一個界面,開始系統的安裝操作。每一步的操作,左下角都會提示操作方式!! 1.選擇系統語言-English 2.選擇操作-Install Ubuntu Server 3.選擇安裝過程和系統的預設語言-English 4.選擇區域-other 5.選擇亞洲-Asia 6.選擇國家-Chi ...
  • 1、ubuntu網卡配置 1、查看網卡名稱 2、進行編輯網卡配置文件 更改網卡配置文件添加內容修改內容如下:下麵的enp0s3需要改成自己的網卡名稱。ip地址,掩碼網關自己合理配置就行了。記得wq!保存退出。 3、重啟網卡 4、ping百度測試 2、ubuntu安裝ssh服務 1、查看是否開啟ssh ...
  • 在實際的資料庫Sqlserver的運維的過程中,很多時候我們需要做到數據的備份操作,可以做到定時備份,也可以進行手動資料庫備份。在實際的過程中,有時候因業務需要備份出完整資料庫,而有時候又因為實際業務只需要影響到一張表或者幾張表,備份整個資料庫未必是最優的方案,此時可採用生成腳本或者Select I ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...