針對kafka_2.13版本測試過程中的一些坑

来源:https://www.cnblogs.com/volatile0509/archive/2020/05/23/12944163.html

聲明:這是在windows10上進行kafka_2.13demo搭建時的過程記錄,提供給同學們參考。 1.jdk先要裝一下。 2.先安裝zookeeper,這裡不贅述,貼一個鏈接 https://blog.csdn.net/ring300/article/details/80446918。記得測試一 ...


聲明:這是在windows10上進行kafka_2.13demo搭建時的過程記錄,提供給同學們參考。

1.jdk先要裝一下。

2.先安裝zookeeper,這裡不贅述,貼一個鏈接  https://blog.csdn.net/ring300/article/details/80446918。記得測試一下zookeeper是否正確安裝。

3.下載安裝kafka_2.13。在這裡下載https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/,並解壓到你覺得OK的目錄下。

 

 自己安裝的kafka最好檢查一下配置文件中的參數(server.properties)。1.zookeeper.connect=localhost:2181       2.log.dirs=D:\\kafka_2.13-2.5.0\\kafka-logs (後面的地址就是放置日誌的地方,可以自己先在目錄下新建,可以看上~上一張圖)。

 

4.開始啟動服務。

這裡需要說明一下,不想cmd到文件目錄下的話,請在需要打開運行視窗的地方按住 shift 然後右鍵 在彈出的視窗上選擇 在此處打開powershell 。

4.1先啟動 zookeeper,在安裝目錄下的bin里直接點擊zkserver.cmd 啟動比較省事

 

 

 4.2啟動kafka服務。

在kafka的安裝目錄下直接通過(shift 然後右鍵 在彈出的視窗上選擇 在此處打開powershell )打開powershell。然後輸入   

bin\windows\kafka-server-start.bat config\server.properties

     回車   就可以啟動服務。

 

4.3.創建一個topic   命名test(隨意點就行)(shift 然後右鍵 在彈出的視窗上選擇 在此處打開powershell )打開powershell,輸入下麵的命令 回車。

創建topic:      bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

創建好之後,查看現有的topic:    bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

 

 

 4.4生產消息和消費消息。

打開shell 後 輸入      bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test    生產消息

 

打開shell後輸入       bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning    消費消息

 

 

 (稍微等個幾秒鐘,有點慢)  

到此就已經結束了整個test的 測試工作,接下來我們用java代碼調一下這裡的服務。

pom :   "<dependencies></dependencies>"已經有了的話就去掉。

<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.0</version>
</dependency>
</dependencies>

1.生產

package com.test.kfserver;

import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class Producer {
public static String topic = "duanjt_test";//定義主題

public static void main(String[] args) throws InterruptedException {
Properties p = new Properties();
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");//kafka地址,多個地址用逗號分割
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p);

try {
while (true) {
String msg = "Hello," + new Random().nextInt(100);
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, msg);
kafkaProducer.send(record);
System.out.println("消息發送成功:" + msg);
Thread.sleep(500);
}
} finally {
kafkaProducer.close();
}

}
}

2. 消費
package com.test.kfserver;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class Consumer {

public static void main(String[] args) {
Properties p = new Properties();
p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
p.put(ConsumerConfig.GROUP_ID_CONFIG, "duanjt_test");

KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(p);
kafkaConsumer.subscribe(Collections.singletonList(Producer.topic));// 訂閱消息

while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format("topic:%s,offset:%d,消息:%s", //
record.topic(), record.offset(), record.value()));
}
}
}

}


到這裡我們的測試就算高一段落了,其實最新版的kafka里已經自帶了zk 但是,如果用自己的zk ,只要是新版的kafka 就會報錯

zookeeper is not a recognized option  

意思就是沒有zookeeper這個參數   

 

參考:https://www.cnblogs.com/duanjt/p/10132116.html


 


您的分享是我們最大的動力!

更多相關文章
  • 狀態模式: 根據狀態決定動作 當動作一定但是狀態可擴展適合使用狀態模式 當動作變化不適合 當狀態不會擴展也沒有必要使用使用傳統的switch即可。 先看一個沒有使用狀態模式的例子: package com.srr.dp.state; /** * 我的女朋友有很多種狀態 * 當給我的女朋友添加新的狀態 ...
  • Spring Boot 教程 swagger ui 1. 什麼是Swagger? Swagger™的目標是為REST APIs 定義一個標準的,與語言無關的介面,使人和電腦在看不到源碼或者看不到文檔或者不能通過網路流量檢測的情況下能發現和理解各種服務的功能。當服務通過Swagger定義,消費者就能 ...
  • Spring Boot 教程 RESTful 1. RESTful風格 1.1 簡介與特點 RESTful是一種網路應用程式的設計風格和開發方式,基於 "HTTP" ,可以使用 "XML" 格式定義或 "JSON" 格式定義。RESTful適用於移動互聯網廠商作為業務使能介面的場景,實現第三方 "O ...
  • 眾所周知,升級某個庫(假設為 xxx),可以用 命令,或者簡寫成 。 如果有多個庫,可以依次寫在 xxx 後面,以空格間隔。那麼,如何簡單優雅地批量更新系統中全部已安裝的庫呢? 接下來我們直奔主題,帶大家學習幾種方法/騷操作吧! 方法一:pip list 結合 Linux 命令 命令可以查詢已安裝的 ...
  • Redis持久化過程一直是影響redis性能的常見因素,如何監控持久化以及如何優化持久化過程呢?下麵我們就一起來看看吧。 fork的監控及優化 不管是使用哪種持久化,RDB持久化或AOF重寫,主進程都會fork出一個子進程,在子進程里完成rdb文件的生成或aof的重寫。fork操作對於操作系統來說屬 ...
  • Spring MVC的執行流程 一、名詞解釋 1、前端控制器(DispatcherServlet) 接收請求,響應結果,相當於轉發器,中央處理器 2、處理器映射器(HandlerMapping) 根據請求的url查找Handler(處理器/Controller) 可以通過XML和註解方式實現映射。 ...
  • 這是一個群友分享出來的一次阿裡巴巴面試題 ,介紹一下,渣渣雙非本,想要隨緣求offer,他說他知道自己的菜,沒想到還面過了,所以我就找他要了這次的面試經歷,來告訴大家,不要覺得自己菜,就不敢去大廠面試,可能就是因為你的菜而收到offer了。 ...
  • 概念內部狀態、外部狀態、享元池角色 & UMLDemo: 編輯器圖片重用 - JavaReference概念享元模式(Flyweight Pattern),是以 共用 的方式,對 大量細粒度對象 重用,來減少記憶體的使用(避免大量重覆地創建、銷毀對象)。名稱中的Flyweight,是搏擊比賽中體重級別... ...
一周排行
  • 一:背景 1. 講故事 曾今在項目中發現有同事自定義結構體的時候,居然沒有重寫Equals方法,比如下麵這段代碼: static void Main(string[] args) { var list = Enumerable.Range(0, 1000).Select(m => new Point ...
  • 最近一個朋友有個關於素數的小東西要寫一下,素數是什麼呢?除了1和他本身不能被其他數整除,那麼這個數就是素數,1除外哦。我們知道概念那就很簡單了,直接代碼擼起。 ...
  • 前言 在開發編程中,我們經常會遇到功能非常相似的功能模塊,只是他們的處理的數據不一樣,所以我們會分別採用多個方法來處理不同的數據類型。但是這個時候,我們就會想一個問題,有沒有辦法實現利用同一個方法來傳遞不同種類型的參數呢? 這個時候,泛型也就因運而生,專門來解決這個問題的。 泛型是在C 2.0就推出 ...
  • 本文章主要用於介紹在Asp.Net Mvc(C#)中使用Fleck製作一個Html5的即時聊天室,含有完整代碼和演示Demo。 ...
  • 出庫單的功能。能學習了出庫單管理之後,WMS的 主體功能算是完成了。當然一個成熟的WMS還包括了盤點,報表,策略規則,移庫功能及與其他系統(ERP、TMS等)的介面,實現無縫集成,打破信息孤島,讓數據實時、準確和同步。 ...
  • Data StructureThere're two types of variables in C#, reference type and value type.Enum:enum Color{Red=0,Green=1}//equals to enum Color{Red,//start fr... ...
  • 0. 前言 該項目使用Maven進行管理和構建,所以需要預先配置好Maven。嗯,在這個系列里就不做過多的介紹了。 1. 創建項目 先創建一個pom.xml 文件,添加以下內容: <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http: ...
  • API 概述 API(Application Programming Interface),應用程式編程介面。 Java API是一本程式員的 字典 ,是JDK中提供給我們使用的類的說明文檔。 這些類將底層的代碼實現封裝了起來,我們不需要關心這些類是如何實現的,只需要學習這些類如何使用即可。 所以我 ...
  • 女程式員是這麼徵婚的: SELECT * FROM 男人們 WHERE 未婚=true and 同性戀=false and 有房=true and 有車=true and 條件 in (帥氣,紳士,大度,氣質,智慧,溫柔,體貼,會浪漫,活潑,可愛,最好還能帶孩子) and 年齡 between(24 ...
  • 有很多剛學習軟體測試的小伙伴,都會在網路上找尋各種學習資料,去提升自己的專業技能水平。因此,我決定定期分享我整理收集的一些軟體測試的測試工具下載、面試寶典、視頻教學合集。都整理好了,有需要的可以關註我(獲取方式在文末) 軟體測試的學習,不止是基礎理論,還需要學習測試工具的用法,如介面工具Postma ...