Kafka 2.3 Producer (0.9以後版本適用)

来源:https://www.cnblogs.com/tree1123/archive/2019/08/21/11386944.html
-Advertisement-
Play Games

kafka0.9版本以後用java重新編寫了producer,廢除了原來scala編寫的版本。 這裡直接使用最新2.3版本,0.9以後的版本都適用。 註意引用的包為:org.apache.kafka.clients.producer 0.11.0以後增加了事務,事務producer的示例代碼如下,需 ...


kafka0.9版本以後用java重新編寫了producer,廢除了原來scala編寫的版本。

這裡直接使用最新2.3版本,0.9以後的版本都適用。

註意引用的包為:org.apache.kafka.clients.producer

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemo {

    public static void main(String[] args) {

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        kafkaProducer.send(new ProducerRecord<>("topic", "value"));
        kafkaProducer.close();

    }
    
}

0.11.0以後增加了事務,事務producer的示例代碼如下,需要適用於0.11.0以後的版本:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class TransactionsProducerDemo {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "my-transactional-id");
        Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

        producer.initTransactions();

        try {
            producer.beginTransaction();
            for (int i = 0; i < 100; i++)
                producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // We can't recover from these exceptions, so our only option is to close the producer and exit.
            producer.close();
        } catch (KafkaException e) {
            // For all other exceptions, just abort the transaction and try again.
            producer.abortTransaction();
        }
        producer.close();

    }
    
}

更多實時計算,Kafka等相關技術博文,歡迎關註實時流式計算


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 公司在Azure的Iaas虛擬機上部署有好幾台MySQL資料庫,至於沒有選擇Azure Database for MySQL,是因為預算有限(錢不夠啊!說多了也是淚,坑的還是DBA自己)。選擇了Iaas的話,DBA就必須考慮離線備份(offline backup),以預防災難性故障出現。我們選擇將歷... ...
  • 分享一篇關於實時流式計算的經典文章,這篇文章名為Streaming 101: The world beyond batch 那麼流計算如何超越批處理呢? 從這幾個方面說明:實時流計算系統,數據處理模式,還有大數據的未來。 一、實時流式計算系統 實時流式計算的意義: 1、企業渴望獲得更及時的數據,實時 ...
  • 所需補丁及高版本opatch上傳後將p6880880_112000_Linux-x86-64.zip解壓覆蓋$ORACLE_HOME/OPatch目錄即可[oracle@localhost OPatch]$ ./opatch versionOPatch Version: 11.2.0.3.16OPa... ...
  • 1.Redo Log The redo log is a disk-based data structure used during crash recovery to correct data written by incomplete transactions. During normal op ...
  • 轉載、節選於 https://dev.mysql.com/doc/refman/8.0/en/innodb-doublewrite-buffer.html The doublewrite buffer is a storage area located in the system tablespac ...
  • 概念介紹 CDH概覽 CDH是Apache Hadoop和相關項目的最完整、最受測試和最流行的發行版。CDH提供Hadoop的核心元素 可伸縮存儲和分散式計算 以及基於web的用戶界面和重要的企業功能。CDH是Apache許可的開放源碼,是唯一提供統一批處理、互動式SQL和互動式搜索以及基於角色的訪 ...
  • 本數據源來自 https://www.kafan.cn/edu/922556.html 目的為了備忘 把原來的sql server 2005直接裝成了2012,然後在建立鏈接伺服器鏈接一臺sql server 2000的伺服器時,報錯信息大概是“SQL Server Native Client 11 ...
  • 如果你還不知道redis的基本命令與基本使用方法,請看 【redis】redis基礎命令學習集合 緩存 redis還有另外一個重要的應用領域——緩存 引用來自網友的圖解釋緩存在架構中的位置 預設情況下,我們的服務架構如下圖,客戶端請求service,然後service去讀取mysql資料庫 問題存在 ...
一周排行
    -Advertisement-
    Play Games
  • Dapr Outbox 是1.12中的功能。 本文只介紹Dapr Outbox 執行流程,Dapr Outbox基本用法請閱讀官方文檔 。本文中appID=order-processor,topic=orders 本文前提知識:熟悉Dapr狀態管理、Dapr發佈訂閱和Outbox 模式。 Outbo ...
  • 引言 在前幾章我們深度講解了單元測試和集成測試的基礎知識,這一章我們來講解一下代碼覆蓋率,代碼覆蓋率是單元測試運行的度量值,覆蓋率通常以百分比表示,用於衡量代碼被測試覆蓋的程度,幫助開發人員評估測試用例的質量和代碼的健壯性。常見的覆蓋率包括語句覆蓋率(Line Coverage)、分支覆蓋率(Bra ...
  • 前言 本文介紹瞭如何使用S7.NET庫實現對西門子PLC DB塊數據的讀寫,記錄了使用電腦模擬,模擬PLC,自至完成測試的詳細流程,並重點介紹了在這個過程中的易錯點,供參考。 用到的軟體: 1.Windows環境下鏈路層網路訪問的行業標準工具(WinPcap_4_1_3.exe)下載鏈接:http ...
  • 從依賴倒置原則(Dependency Inversion Principle, DIP)到控制反轉(Inversion of Control, IoC)再到依賴註入(Dependency Injection, DI)的演進過程,我們可以理解為一種逐步抽象和解耦的設計思想。這種思想在C#等面向對象的編 ...
  • 關於Python中的私有屬性和私有方法 Python對於類的成員沒有嚴格的訪問控制限制,這與其他面相對對象語言有區別。關於私有屬性和私有方法,有如下要點: 1、通常我們約定,兩個下劃線開頭的屬性是私有的(private)。其他為公共的(public); 2、類內部可以訪問私有屬性(方法); 3、類外 ...
  • C++ 訪問說明符 訪問說明符是 C++ 中控制類成員(屬性和方法)可訪問性的關鍵字。它們用於封裝類數據並保護其免受意外修改或濫用。 三種訪問說明符: public:允許從類外部的任何地方訪問成員。 private:僅允許在類內部訪問成員。 protected:允許在類內部及其派生類中訪問成員。 示 ...
  • 寫這個隨筆說一下C++的static_cast和dynamic_cast用在子類與父類的指針轉換時的一些事宜。首先,【static_cast,dynamic_cast】【父類指針,子類指針】,兩兩一組,共有4種組合:用 static_cast 父類轉子類、用 static_cast 子類轉父類、使用 ...
  • /******************************************************************************************************** * * * 設計雙向鏈表的介面 * * * * Copyright (c) 2023-2 ...
  • 相信接觸過spring做開發的小伙伴們一定使用過@ComponentScan註解 @ComponentScan("com.wangm.lifecycle") public class AppConfig { } @ComponentScan指定basePackage,將包下的類按照一定規則註冊成Be ...
  • 操作系統 :CentOS 7.6_x64 opensips版本: 2.4.9 python版本:2.7.5 python作為腳本語言,使用起來很方便,查了下opensips的文檔,支持使用python腳本寫邏輯代碼。今天整理下CentOS7環境下opensips2.4.9的python模塊筆記及使用 ...