針對kafka_2.13版本測試過程中的一些坑

来源:https://www.cnblogs.com/volatile0509/archive/2020/05/23/12944163.html
-Advertisement-
Play Games

聲明:這是在windows10上進行kafka_2.13demo搭建時的過程記錄,提供給同學們參考。 1.jdk先要裝一下。 2.先安裝zookeeper,這裡不贅述,貼一個鏈接 https://blog.csdn.net/ring300/article/details/80446918。記得測試一 ...


聲明:這是在windows10上進行kafka_2.13demo搭建時的過程記錄,提供給同學們參考。

1.jdk先要裝一下。

2.先安裝zookeeper,這裡不贅述,貼一個鏈接  https://blog.csdn.net/ring300/article/details/80446918。記得測試一下zookeeper是否正確安裝。

3.下載安裝kafka_2.13。在這裡下載https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/,並解壓到你覺得OK的目錄下。

 

 自己安裝的kafka最好檢查一下配置文件中的參數(server.properties)。1.zookeeper.connect=localhost:2181       2.log.dirs=D:\\kafka_2.13-2.5.0\\kafka-logs (後面的地址就是放置日誌的地方,可以自己先在目錄下新建,可以看上~上一張圖)。

 

4.開始啟動服務。

這裡需要說明一下,不想cmd到文件目錄下的話,請在需要打開運行視窗的地方按住 shift 然後右鍵 在彈出的視窗上選擇 在此處打開powershell 。

4.1先啟動 zookeeper,在安裝目錄下的bin里直接點擊zkserver.cmd 啟動比較省事

 

 

 4.2啟動kafka服務。

在kafka的安裝目錄下直接通過(shift 然後右鍵 在彈出的視窗上選擇 在此處打開powershell )打開powershell。然後輸入   

bin\windows\kafka-server-start.bat config\server.properties

     回車   就可以啟動服務。

 

4.3.創建一個topic   命名test(隨意點就行)(shift 然後右鍵 在彈出的視窗上選擇 在此處打開powershell )打開powershell,輸入下麵的命令 回車。

創建topic:      bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

創建好之後,查看現有的topic:    bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

 

 

 4.4生產消息和消費消息。

打開shell 後 輸入      bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test    生產消息

 

打開shell後輸入       bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning    消費消息

 

 

 (稍微等個幾秒鐘,有點慢)  

到此就已經結束了整個test的 測試工作,接下來我們用java代碼調一下這裡的服務。

pom :   "<dependencies></dependencies>"已經有了的話就去掉。

<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.0</version>
</dependency>
</dependencies>

1.生產

package com.test.kfserver;

import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class Producer {
public static String topic = "duanjt_test";//定義主題

public static void main(String[] args) throws InterruptedException {
Properties p = new Properties();
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");//kafka地址,多個地址用逗號分割
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p);

try {
while (true) {
String msg = "Hello," + new Random().nextInt(100);
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, msg);
kafkaProducer.send(record);
System.out.println("消息發送成功:" + msg);
Thread.sleep(500);
}
} finally {
kafkaProducer.close();
}

}
}

2. 消費
package com.test.kfserver;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class Consumer {

public static void main(String[] args) {
Properties p = new Properties();
p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
p.put(ConsumerConfig.GROUP_ID_CONFIG, "duanjt_test");

KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(p);
kafkaConsumer.subscribe(Collections.singletonList(Producer.topic));// 訂閱消息

while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format("topic:%s,offset:%d,消息:%s", //
record.topic(), record.offset(), record.value()));
}
}
}

}


到這裡我們的測試就算高一段落了,其實最新版的kafka里已經自帶了zk 但是,如果用自己的zk ,只要是新版的kafka 就會報錯

zookeeper is not a recognized option  

意思就是沒有zookeeper這個參數   

 

參考:https://www.cnblogs.com/duanjt/p/10132116.html


 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 狀態模式: 根據狀態決定動作 當動作一定但是狀態可擴展適合使用狀態模式 當動作變化不適合 當狀態不會擴展也沒有必要使用使用傳統的switch即可。 先看一個沒有使用狀態模式的例子: package com.srr.dp.state; /** * 我的女朋友有很多種狀態 * 當給我的女朋友添加新的狀態 ...
  • Spring Boot 教程 swagger ui 1. 什麼是Swagger? Swagger™的目標是為REST APIs 定義一個標準的,與語言無關的介面,使人和電腦在看不到源碼或者看不到文檔或者不能通過網路流量檢測的情況下能發現和理解各種服務的功能。當服務通過Swagger定義,消費者就能 ...
  • Spring Boot 教程 RESTful 1. RESTful風格 1.1 簡介與特點 RESTful是一種網路應用程式的設計風格和開發方式,基於 "HTTP" ,可以使用 "XML" 格式定義或 "JSON" 格式定義。RESTful適用於移動互聯網廠商作為業務使能介面的場景,實現第三方 "O ...
  • 眾所周知,升級某個庫(假設為 xxx),可以用 命令,或者簡寫成 。 如果有多個庫,可以依次寫在 xxx 後面,以空格間隔。那麼,如何簡單優雅地批量更新系統中全部已安裝的庫呢? 接下來我們直奔主題,帶大家學習幾種方法/騷操作吧! 方法一:pip list 結合 Linux 命令 命令可以查詢已安裝的 ...
  • Redis持久化過程一直是影響redis性能的常見因素,如何監控持久化以及如何優化持久化過程呢?下麵我們就一起來看看吧。 fork的監控及優化 不管是使用哪種持久化,RDB持久化或AOF重寫,主進程都會fork出一個子進程,在子進程里完成rdb文件的生成或aof的重寫。fork操作對於操作系統來說屬 ...
  • Spring MVC的執行流程 一、名詞解釋 1、前端控制器(DispatcherServlet) 接收請求,響應結果,相當於轉發器,中央處理器 2、處理器映射器(HandlerMapping) 根據請求的url查找Handler(處理器/Controller) 可以通過XML和註解方式實現映射。 ...
  • 這是一個群友分享出來的一次阿裡巴巴面試題 ,介紹一下,渣渣雙非本,想要隨緣求offer,他說他知道自己的菜,沒想到還面過了,所以我就找他要了這次的面試經歷,來告訴大家,不要覺得自己菜,就不敢去大廠面試,可能就是因為你的菜而收到offer了。 ...
  • 概念內部狀態、外部狀態、享元池角色 & UMLDemo: 編輯器圖片重用 - JavaReference概念享元模式(Flyweight Pattern),是以 共用 的方式,對 大量細粒度對象 重用,來減少記憶體的使用(避免大量重覆地創建、銷毀對象)。名稱中的Flyweight,是搏擊比賽中體重級別... ...
一周排行
    -Advertisement-
    Play Games
  • 概述:在C#中,++i和i++都是自增運算符,其中++i先增加值再返回,而i++先返回值再增加。應用場景根據需求選擇,首碼適合先增後用,尾碼適合先用後增。詳細示例提供清晰的代碼演示這兩者的操作時機和實際應用。 在C#中,++i 和 i++ 都是自增運算符,但它們在操作上有細微的差異,主要體現在操作的 ...
  • 上次發佈了:Taurus.MVC 性能壓力測試(ap 壓測 和 linux 下wrk 壓測):.NET Core 版本,今天計劃準備壓測一下 .NET 版本,來測試並記錄一下 Taurus.MVC 框架在 .NET 版本的性能,以便後續持續優化改進。 為了方便對比,本文章的電腦環境和測試思路,儘量和... ...
  • .NET WebAPI作為一種構建RESTful服務的強大工具,為開發者提供了便捷的方式來定義、處理HTTP請求並返迴響應。在設計API介面時,正確地接收和解析客戶端發送的數據至關重要。.NET WebAPI提供了一系列特性,如[FromRoute]、[FromQuery]和[FromBody],用 ...
  • 原因:我之所以想做這個項目,是因為在之前查找關於C#/WPF相關資料時,我發現講解圖像濾鏡的資源非常稀缺。此外,我註意到許多現有的開源庫主要基於CPU進行圖像渲染。這種方式在處理大量圖像時,會導致CPU的渲染負擔過重。因此,我將在下文中介紹如何通過GPU渲染來有效實現圖像的各種濾鏡效果。 生成的效果 ...
  • 引言 上一章我們介紹了在xUnit單元測試中用xUnit.DependencyInject來使用依賴註入,上一章我們的Sample.Repository倉儲層有一個批量註入的介面沒有做單元測試,今天用這個示例來演示一下如何用Bogus創建模擬數據 ,和 EFCore 的種子數據生成 Bogus 的優 ...
  • 一、前言 在自己的項目中,涉及到實時心率曲線的繪製,項目上的曲線繪製,一般很難找到能直接用的第三方庫,而且有些還是定製化的功能,所以還是自己繪製比較方便。很多人一聽到自己畫就害怕,感覺很難,今天就分享一個完整的實時心率數據繪製心率曲線圖的例子;之前的博客也分享給DrawingVisual繪製曲線的方 ...
  • 如果你在自定義的 Main 方法中直接使用 App 類並啟動應用程式,但發現 App.xaml 中定義的資源沒有被正確載入,那麼問題可能在於如何正確配置 App.xaml 與你的 App 類的交互。 確保 App.xaml 文件中的 x:Class 屬性正確指向你的 App 類。這樣,當你創建 Ap ...
  • 一:背景 1. 講故事 上個月有個朋友在微信上找到我,說他們的軟體在客戶那邊隔幾天就要崩潰一次,一直都沒有找到原因,讓我幫忙看下怎麼回事,確實工控類的軟體環境複雜難搞,朋友手上有一個崩潰的dump,剛好丟給我來分析一下。 二:WinDbg分析 1. 程式為什麼會崩潰 windbg 有一個厲害之處在於 ...
  • 前言 .NET生態中有許多依賴註入容器。在大多數情況下,微軟提供的內置容器在易用性和性能方面都非常優秀。外加ASP.NET Core預設使用內置容器,使用很方便。 但是筆者在使用中一直有一個頭疼的問題:服務工廠無法提供請求的服務類型相關的信息。這在一般情況下並沒有影響,但是內置容器支持註冊開放泛型服 ...
  • 一、前言 在項目開發過程中,DataGrid是經常使用到的一個數據展示控制項,而通常表格的最後一列是作為操作列存在,比如會有編輯、刪除等功能按鈕。但WPF的原始DataGrid中,預設只支持固定左側列,這跟大家習慣性操作列放最後不符,今天就來介紹一種簡單的方式實現固定右側列。(這裡的實現方式參考的大佬 ...