一、介面回調+自定義分區 1.介面回調:在使用消費者的send方法時添加Callback回調 註意:在自定義分區後,你的消費者會收不到消息,因為消費者預設接收的分區為0。 二、攔截器 1)創建生產者類; 2)創建自定義攔截器類實現ProducerInterceptor介面,重寫抽象方法; 3)在業務 ...
一、介面回調+自定義分區
1.介面回調:在使用消費者的send方法時添加Callback回調
producer.send(new ProducerRecord<String, String>("xinnian", "20" + i + "年新年好!"), new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (recordMetadata!=null){
System.out.println(recordMetadata.topic()+"-----"+recordMetadata.offset()+"-----"+recordMetadata.partition());
}
}
2.自定義分區:定義類實現Patitioner介面,實現介面的方法:
設置configure、分區邏輯partition(return 1;)、釋放資源close、在生產者的配置過程中添加入分區屬性。
在定義生產者屬性時添加分區的屬性即可
/** * @author: PrincessHug * @date: 2019/2/28, 16:24 * @Blog: https://www.cnblogs.com/HelloBigTable/ */ public class PartitionDemo implements Partitioner { public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) { return 1; } public void close() { } public void configure(Map<String, ?> map) { } } public class ProducerDemo { public static void main(String[] args) { Properties prop = new Properties(); //參數配置 //kafka節點的地址 prop.put("bootstrap.servers", "192.168.126.128:9092"); //發送消息是否等待應答 prop.put("acks", "all"); //配置發送消息失敗重試 prop.put("retries", "0"); //配置批量處理消息大小 prop.put("batch.size", "10241"); //配置批量處理數據延遲 prop.put("linger.ms","5"); //配置記憶體緩衝大小 prop.put("buffer.memory", "12341235"); //消息在發送前必須序列化 prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); prop.put("partitioner.class", "PartitionDemo"); KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop); for (int i=10;i<100;i++){ producer.send(new ProducerRecord<String, String>("xinnian", "20" + i + "年新年好!"), new Callback() { public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (recordMetadata!=null){ System.out.println(recordMetadata.topic()+"-----"+recordMetadata.offset()+"-----"+recordMetadata.partition()); } } }); } producer.close(); } }
註意:在自定義分區後,你的消費者會收不到消息,因為消費者預設接收的分區為0。
二、攔截器
1)創建生產者類;
2)創建自定義攔截器類實現ProducerInterceptor介面,重寫抽象方法;
3)在業務邏輯方法ProducerRecord方法中,修改返回值,
return new ProducerRecord<String,String>(
record.topic(),
record.partiiton(),
record.key(),
System.currentTimeMillis() + "-" + record.value() + "-" + record.topic());
4)在生產者類中將自定義攔截器生效
prop.put(ProducerConfig.INTERCEPTOR_CLASSEA_CONFIG,"com.wyh.com.wyh.kafka.interceptor.TimeInterceptor");
5)運行生產者main方法,或者在linux端用shell測試。
/** * @author: PrincessHug * @date: 2019/2/28, 20:59 * @Blog: https://www.cnblogs.com/HelloBigTable/ */ public class TimeInterceptor implements ProducerInterceptor<String, String> { //業務邏輯 public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) { return new ProducerRecord<String,String>( producerRecord.topic(), producerRecord.partition(), producerRecord.key(), System.currentTimeMillis()+"--"+producerRecord.value() ); } //發送失敗調用 public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) { } //釋放資源 public void close() { } //獲取配置信息 public void configure(Map<String, ?> map) { } } public class ItctorProducer { public static void main(String[] args) { //配置生產者屬性 Properties prop = new Properties(); //kafka節點的地址 prop.put("bootstrap.servers", "192.168.126.128:9092"); //發送消息是否等待應答 prop.put("acks", "all"); //配置發送消息失敗重試 prop.put("retries", "0"); //配置批量處理消息大小 prop.put("batch.size", "1024"); //配置批量處理數據延遲 prop.put("linger.ms","5"); //配置記憶體緩衝大小 prop.put("buffer.memory", "12341235"); //消息在發送前必須序列化 prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //添加攔截器 ArrayList<String> inList = new ArrayList<String>(); inList.add("interceptor.TimeInterceptor"); prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,inList); //實例化producer KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop); //發送消息 for (int i=0;i<99;i++){ producer.send(new ProducerRecord<String, String>("xinnian","You are genius!"+i)); } //釋放資源 producer.close(); } }