Kafka的介面回調 +自定義分區、攔截器

来源:https://www.cnblogs.com/HelloBigTable/archive/2019/02/28/10453884.html
-Advertisement-
Play Games

一、介面回調+自定義分區 1.介面回調:在使用消費者的send方法時添加Callback回調 註意:在自定義分區後,你的消費者會收不到消息,因為消費者預設接收的分區為0。 二、攔截器 1)創建生產者類; 2)創建自定義攔截器類實現ProducerInterceptor介面,重寫抽象方法; 3)在業務 ...


一、介面回調+自定義分區

  1.介面回調:在使用消費者的send方法時添加Callback回調

 

producer.send(new ProducerRecord<String, String>("xinnian", "20" + i + "年新年好!"), new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (recordMetadata!=null){
System.out.println(recordMetadata.topic()+"-----"+recordMetadata.offset()+"-----"+recordMetadata.partition());
}
}
 2.自定義分區:定義類實現Patitioner介面,實現介面的方法:
   設置configure、分區邏輯partition(return 1;)、釋放資源close、在生產者的配置過程中添加入分區屬性。
 在定義生產者屬性時添加分區的屬性即可
/**
 * @author: PrincessHug
 * @date: 2019/2/28, 16:24
 * @Blog: https://www.cnblogs.com/HelloBigTable/
 */
public class PartitionDemo implements Partitioner {
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        return 1;
    }

    public void close() {

    }

    public void configure(Map<String, ?> map) {

    }
}

public class ProducerDemo {
    public static void main(String[] args) {
        Properties prop = new Properties();

        //參數配置
        //kafka節點的地址
        prop.put("bootstrap.servers", "192.168.126.128:9092");
        //發送消息是否等待應答
        prop.put("acks", "all");
        //配置發送消息失敗重試
        prop.put("retries", "0");
        //配置批量處理消息大小
        prop.put("batch.size", "10241");
        //配置批量處理數據延遲
        prop.put("linger.ms","5");
        //配置記憶體緩衝大小
        prop.put("buffer.memory", "12341235");
        //消息在發送前必須序列化
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("partitioner.class", "PartitionDemo");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);

        for (int i=10;i<100;i++){
            producer.send(new ProducerRecord<String, String>("xinnian", "20" + i + "年新年好!"), new Callback() {
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (recordMetadata!=null){
                        System.out.println(recordMetadata.topic()+"-----"+recordMetadata.offset()+"-----"+recordMetadata.partition());
                    }
                }
            });
        }
        producer.close();
    }
}

  註意:在自定義分區後,你的消費者會收不到消息,因為消費者預設接收的分區為0。

 

二、攔截器

  1)創建生產者類;
     2)創建自定義攔截器類實現ProducerInterceptor介面,重寫抽象方法;
     3)在業務邏輯方法ProducerRecord方法中,修改返回值,
        return new ProducerRecord<String,String>(
        record.topic(),
        record.partiiton(),
        record.key(),
        System.currentTimeMillis() + "-" + record.value() + "-" + record.topic());
     4)在生產者類中將自定義攔截器生效
       prop.put(ProducerConfig.INTERCEPTOR_CLASSEA_CONFIG,"com.wyh.com.wyh.kafka.interceptor.TimeInterceptor");
     5)運行生產者main方法,或者在linux端用shell測試。

/**
 * @author: PrincessHug
 * @date: 2019/2/28, 20:59
 * @Blog: https://www.cnblogs.com/HelloBigTable/
 */
public class TimeInterceptor implements ProducerInterceptor<String, String> {

    //業務邏輯
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        return new ProducerRecord<String,String>(
                producerRecord.topic(),
                producerRecord.partition(),
                producerRecord.key(),
                System.currentTimeMillis()+"--"+producerRecord.value()
        );
    }

    //發送失敗調用
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {

    }

    //釋放資源
    public void close() {

    }

    //獲取配置信息
    public void configure(Map<String, ?> map) {

    }
}

public class ItctorProducer {
    public static void main(String[] args) {
        //配置生產者屬性
        Properties prop = new Properties();
        //kafka節點的地址
        prop.put("bootstrap.servers", "192.168.126.128:9092");
        //發送消息是否等待應答
        prop.put("acks", "all");
        //配置發送消息失敗重試
        prop.put("retries", "0");
        //配置批量處理消息大小
        prop.put("batch.size", "1024");
        //配置批量處理數據延遲
        prop.put("linger.ms","5");
        //配置記憶體緩衝大小
        prop.put("buffer.memory", "12341235");
        //消息在發送前必須序列化
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //添加攔截器
        ArrayList<String> inList = new ArrayList<String>();
        inList.add("interceptor.TimeInterceptor");
        prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,inList);

        //實例化producer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);

        //發送消息
        for (int i=0;i<99;i++){
            producer.send(new ProducerRecord<String, String>("xinnian","You are genius!"+i));
        }

        //釋放資源
        producer.close();
        
    }
}

 


  

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 在WSUS下從Win10 1803更新到1809的問題解決。 ...
  • 決心書 我叫劉騰達,來自河北省衡水,大學實習期,學的網路專業,有接觸過Liunx課程,但畢業後工作了一段時間後還是選擇走培訓加深自己的專業知識,藉此進入互聯網行業,經過多方打聽和瞭解,來到了口碑很好的老男孩,在這裡希望自己能完成一個質的蛻變,達到自己的目標,希望這五個月老師能嚴格監督並督促我完成自己 ...
  • 我們一般平時安裝完WarIII後運行時的解析度預設是800*600,導致有黑邊的存在。所以我寫了一個bat腳本來自定義WarIII的運行解析度。需要以管理員身份運行。 下載鏈接: 鏈接:https://pan.baidu.com/s/1v-ZWjkhkVQTaXunEJs8ThQ 提取碼:m17p ...
  • 一 Ceph文件系統 1.1 概述 Ceph 對象網關是一個構建在 librados 之上的對象存儲介面,它為應用程式訪問Ceph 存儲集群提供了一個 RESTful 風格的網關 。 Ceph 對象存儲支持 2 種介面: 相容S3: 提供了對象存儲介面,相容亞馬遜S3 RESTful介面的一個大子集 ...
  • 1.打開cmd命令行,查看當前配置 輸入 npm config ls 先看一下當前npm的配置環境,由於我已經修改過,所以可以看到修改後的路徑 2.修改路徑 這裡需要修改兩個路徑,module路徑和cache路徑 module對應prefix cache對應cache 首先在別的盤新建兩個目錄 D: ...
  • 進入系統安裝的第一個界面,開始系統的安裝操作。每一步的操作,左下角都會提示操作方式!! 1.選擇系統語言-English 2.選擇操作-Install Ubuntu Server 3.選擇安裝過程和系統的預設語言-English 4.選擇區域-other 5.選擇亞洲-Asia 6.選擇國家-Chi ...
  • 1、ubuntu網卡配置 1、查看網卡名稱 2、進行編輯網卡配置文件 更改網卡配置文件添加內容修改內容如下:下麵的enp0s3需要改成自己的網卡名稱。ip地址,掩碼網關自己合理配置就行了。記得wq!保存退出。 3、重啟網卡 4、ping百度測試 2、ubuntu安裝ssh服務 1、查看是否開啟ssh ...
  • 在實際的資料庫Sqlserver的運維的過程中,很多時候我們需要做到數據的備份操作,可以做到定時備份,也可以進行手動資料庫備份。在實際的過程中,有時候因業務需要備份出完整資料庫,而有時候又因為實際業務只需要影響到一張表或者幾張表,備份整個資料庫未必是最優的方案,此時可採用生成腳本或者Select I ...
一周排行
    -Advertisement-
    Play Games
  • 示例項目結構 在 Visual Studio 中創建一個 WinForms 應用程式後,項目結構如下所示: MyWinFormsApp/ │ ├───Properties/ │ └───Settings.settings │ ├───bin/ │ ├───Debug/ │ └───Release/ ...
  • [STAThread] 特性用於需要與 COM 組件交互的應用程式,尤其是依賴單線程模型(如 Windows Forms 應用程式)的組件。在 STA 模式下,線程擁有自己的消息迴圈,這對於處理用戶界面和某些 COM 組件是必要的。 [STAThread] static void Main(stri ...
  • 在WinForm中使用全局異常捕獲處理 在WinForm應用程式中,全局異常捕獲是確保程式穩定性的關鍵。通過在Program類的Main方法中設置全局異常處理,可以有效地捕獲並處理未預見的異常,從而避免程式崩潰。 註冊全局異常事件 [STAThread] static void Main() { / ...
  • 前言 給大家推薦一款開源的 Winform 控制項庫,可以幫助我們開發更加美觀、漂亮的 WinForm 界面。 項目介紹 SunnyUI.NET 是一個基於 .NET Framework 4.0+、.NET 6、.NET 7 和 .NET 8 的 WinForm 開源控制項庫,同時也提供了工具類庫、擴展 ...
  • 說明 該文章是屬於OverallAuth2.0系列文章,每周更新一篇該系列文章(從0到1完成系統開發)。 該系統文章,我會儘量說的非常詳細,做到不管新手、老手都能看懂。 說明:OverallAuth2.0 是一個簡單、易懂、功能強大的許可權+可視化流程管理系統。 有興趣的朋友,請關註我吧(*^▽^*) ...
  • 一、下載安裝 1.下載git 必須先下載並安裝git,再TortoiseGit下載安裝 git安裝參考教程:https://blog.csdn.net/mukes/article/details/115693833 2.TortoiseGit下載與安裝 TortoiseGit,Git客戶端,32/6 ...
  • 前言 在項目開發過程中,理解數據結構和演算法如同掌握蓋房子的秘訣。演算法不僅能幫助我們編寫高效、優質的代碼,還能解決項目中遇到的各種難題。 給大家推薦一個支持C#的開源免費、新手友好的數據結構與演算法入門教程:Hello演算法。 項目介紹 《Hello Algo》是一本開源免費、新手友好的數據結構與演算法入門 ...
  • 1.生成單個Proto.bat內容 @rem Copyright 2016, Google Inc. @rem All rights reserved. @rem @rem Redistribution and use in source and binary forms, with or with ...
  • 一:背景 1. 講故事 前段時間有位朋友找到我,說他的窗體程式在客戶這邊出現了卡死,讓我幫忙看下怎麼回事?dump也生成了,既然有dump了那就上 windbg 分析吧。 二:WinDbg 分析 1. 為什麼會卡死 窗體程式的卡死,入口門檻很低,後續往下分析就不一定了,不管怎麼說先用 !clrsta ...
  • 前言 人工智慧時代,人臉識別技術已成為安全驗證、身份識別和用戶交互的關鍵工具。 給大家推薦一款.NET 開源提供了強大的人臉識別 API,工具不僅易於集成,還具備高效處理能力。 本文將介紹一款如何利用這些API,為我們的項目添加智能識別的亮點。 項目介紹 GitHub 上擁有 1.2k 星標的 C# ...