針對kafka_2.13版本測試過程中的一些坑

来源:https://www.cnblogs.com/volatile0509/archive/2020/05/23/12944163.html
-Advertisement-
Play Games

聲明:這是在windows10上進行kafka_2.13demo搭建時的過程記錄,提供給同學們參考。 1.jdk先要裝一下。 2.先安裝zookeeper,這裡不贅述,貼一個鏈接 https://blog.csdn.net/ring300/article/details/80446918。記得測試一 ...


聲明:這是在windows10上進行kafka_2.13demo搭建時的過程記錄,提供給同學們參考。

1.jdk先要裝一下。

2.先安裝zookeeper,這裡不贅述,貼一個鏈接  https://blog.csdn.net/ring300/article/details/80446918。記得測試一下zookeeper是否正確安裝。

3.下載安裝kafka_2.13。在這裡下載https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/,並解壓到你覺得OK的目錄下。

 

 自己安裝的kafka最好檢查一下配置文件中的參數(server.properties)。1.zookeeper.connect=localhost:2181       2.log.dirs=D:\\kafka_2.13-2.5.0\\kafka-logs (後面的地址就是放置日誌的地方,可以自己先在目錄下新建,可以看上~上一張圖)。

 

4.開始啟動服務。

這裡需要說明一下,不想cmd到文件目錄下的話,請在需要打開運行視窗的地方按住 shift 然後右鍵 在彈出的視窗上選擇 在此處打開powershell 。

4.1先啟動 zookeeper,在安裝目錄下的bin里直接點擊zkserver.cmd 啟動比較省事

 

 

 4.2啟動kafka服務。

在kafka的安裝目錄下直接通過(shift 然後右鍵 在彈出的視窗上選擇 在此處打開powershell )打開powershell。然後輸入   

bin\windows\kafka-server-start.bat config\server.properties

     回車   就可以啟動服務。

 

4.3.創建一個topic   命名test(隨意點就行)(shift 然後右鍵 在彈出的視窗上選擇 在此處打開powershell )打開powershell,輸入下麵的命令 回車。

創建topic:      bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

創建好之後,查看現有的topic:    bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

 

 

 4.4生產消息和消費消息。

打開shell 後 輸入      bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test    生產消息

 

打開shell後輸入       bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning    消費消息

 

 

 (稍微等個幾秒鐘,有點慢)  

到此就已經結束了整個test的 測試工作,接下來我們用java代碼調一下這裡的服務。

pom :   "<dependencies></dependencies>"已經有了的話就去掉。

<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.0</version>
</dependency>
</dependencies>

1.生產

package com.test.kfserver;

import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class Producer {
public static String topic = "duanjt_test";//定義主題

public static void main(String[] args) throws InterruptedException {
Properties p = new Properties();
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");//kafka地址,多個地址用逗號分割
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p);

try {
while (true) {
String msg = "Hello," + new Random().nextInt(100);
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, msg);
kafkaProducer.send(record);
System.out.println("消息發送成功:" + msg);
Thread.sleep(500);
}
} finally {
kafkaProducer.close();
}

}
}

2. 消費
package com.test.kfserver;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class Consumer {

public static void main(String[] args) {
Properties p = new Properties();
p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
p.put(ConsumerConfig.GROUP_ID_CONFIG, "duanjt_test");

KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(p);
kafkaConsumer.subscribe(Collections.singletonList(Producer.topic));// 訂閱消息

while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format("topic:%s,offset:%d,消息:%s", //
record.topic(), record.offset(), record.value()));
}
}
}

}


到這裡我們的測試就算高一段落了,其實最新版的kafka里已經自帶了zk 但是,如果用自己的zk ,只要是新版的kafka 就會報錯

zookeeper is not a recognized option  

意思就是沒有zookeeper這個參數   

 

參考:https://www.cnblogs.com/duanjt/p/10132116.html


 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 狀態模式: 根據狀態決定動作 當動作一定但是狀態可擴展適合使用狀態模式 當動作變化不適合 當狀態不會擴展也沒有必要使用使用傳統的switch即可。 先看一個沒有使用狀態模式的例子: package com.srr.dp.state; /** * 我的女朋友有很多種狀態 * 當給我的女朋友添加新的狀態 ...
  • Spring Boot 教程 swagger ui 1. 什麼是Swagger? Swagger™的目標是為REST APIs 定義一個標準的,與語言無關的介面,使人和電腦在看不到源碼或者看不到文檔或者不能通過網路流量檢測的情況下能發現和理解各種服務的功能。當服務通過Swagger定義,消費者就能 ...
  • Spring Boot 教程 RESTful 1. RESTful風格 1.1 簡介與特點 RESTful是一種網路應用程式的設計風格和開發方式,基於 "HTTP" ,可以使用 "XML" 格式定義或 "JSON" 格式定義。RESTful適用於移動互聯網廠商作為業務使能介面的場景,實現第三方 "O ...
  • 眾所周知,升級某個庫(假設為 xxx),可以用 命令,或者簡寫成 。 如果有多個庫,可以依次寫在 xxx 後面,以空格間隔。那麼,如何簡單優雅地批量更新系統中全部已安裝的庫呢? 接下來我們直奔主題,帶大家學習幾種方法/騷操作吧! 方法一:pip list 結合 Linux 命令 命令可以查詢已安裝的 ...
  • Redis持久化過程一直是影響redis性能的常見因素,如何監控持久化以及如何優化持久化過程呢?下麵我們就一起來看看吧。 fork的監控及優化 不管是使用哪種持久化,RDB持久化或AOF重寫,主進程都會fork出一個子進程,在子進程里完成rdb文件的生成或aof的重寫。fork操作對於操作系統來說屬 ...
  • Spring MVC的執行流程 一、名詞解釋 1、前端控制器(DispatcherServlet) 接收請求,響應結果,相當於轉發器,中央處理器 2、處理器映射器(HandlerMapping) 根據請求的url查找Handler(處理器/Controller) 可以通過XML和註解方式實現映射。 ...
  • 這是一個群友分享出來的一次阿裡巴巴面試題 ,介紹一下,渣渣雙非本,想要隨緣求offer,他說他知道自己的菜,沒想到還面過了,所以我就找他要了這次的面試經歷,來告訴大家,不要覺得自己菜,就不敢去大廠面試,可能就是因為你的菜而收到offer了。 ...
  • 概念內部狀態、外部狀態、享元池角色 & UMLDemo: 編輯器圖片重用 - JavaReference概念享元模式(Flyweight Pattern),是以 共用 的方式,對 大量細粒度對象 重用,來減少記憶體的使用(避免大量重覆地創建、銷毀對象)。名稱中的Flyweight,是搏擊比賽中體重級別... ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...