聲明:這是在windows10上進行kafka_2.13demo搭建時的過程記錄,提供給同學們參考。 1.jdk先要裝一下。 2.先安裝zookeeper,這裡不贅述,貼一個鏈接 https://blog.csdn.net/ring300/article/details/80446918。記得測試一 ...
聲明:這是在windows10上進行kafka_2.13demo搭建時的過程記錄,提供給同學們參考。
1.jdk先要裝一下。
2.先安裝zookeeper,這裡不贅述,貼一個鏈接 https://blog.csdn.net/ring300/article/details/80446918。記得測試一下zookeeper是否正確安裝。
3.下載安裝kafka_2.13。在這裡下載https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/,並解壓到你覺得OK的目錄下。
自己安裝的kafka最好檢查一下配置文件中的參數(server.properties)。1.zookeeper.connect=localhost:2181 2.log.dirs=D:\\kafka_2.13-2.5.0\\kafka-logs (後面的地址就是放置日誌的地方,可以自己先在目錄下新建,可以看上~上一張圖)。
4.開始啟動服務。
這裡需要說明一下,不想cmd到文件目錄下的話,請在需要打開運行視窗的地方按住 shift 然後右鍵 在彈出的視窗上選擇 在此處打開powershell 。
4.1先啟動 zookeeper,在安裝目錄下的bin里直接點擊zkserver.cmd 啟動比較省事
4.2啟動kafka服務。
在kafka的安裝目錄下直接通過(shift 然後右鍵 在彈出的視窗上選擇 在此處打開powershell )打開powershell。然後輸入
bin\windows\kafka-server-start.bat config\server.properties
回車 就可以啟動服務。
4.3.創建一個topic 命名test(隨意點就行)(shift 然後右鍵 在彈出的視窗上選擇 在此處打開powershell )打開powershell,輸入下麵的命令 回車。
創建topic: bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
創建好之後,查看現有的topic: bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
4.4生產消息和消費消息。
打開shell 後 輸入 bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test 生產消息
打開shell後輸入 bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning 消費消息
(稍微等個幾秒鐘,有點慢)
到此就已經結束了整個test的 測試工作,接下來我們用java代碼調一下這裡的服務。
pom : "<dependencies></dependencies>"已經有了的話就去掉。
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.0</version>
</dependency>
</dependencies>
1.生產
package com.test.kfserver;
import java.util.Properties;
import java.util.Random;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
public class Producer {
public static String topic = "duanjt_test";//定義主題
public static void main(String[] args) throws InterruptedException {
Properties p = new Properties();
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");//kafka地址,多個地址用逗號分割
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p);
try {
while (true) {
String msg = "Hello," + new Random().nextInt(100);
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, msg);
kafkaProducer.send(record);
System.out.println("消息發送成功:" + msg);
Thread.sleep(500);
}
} finally {
kafkaProducer.close();
}
}
}
2. 消費
package com.test.kfserver;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
public class Consumer {
public static void main(String[] args) {
Properties p = new Properties();
p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
p.put(ConsumerConfig.GROUP_ID_CONFIG, "duanjt_test");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(p);
kafkaConsumer.subscribe(Collections.singletonList(Producer.topic));// 訂閱消息
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format("topic:%s,offset:%d,消息:%s", //
record.topic(), record.offset(), record.value()));
}
}
}
}
到這裡我們的測試就算高一段落了,其實最新版的kafka里已經自帶了zk 但是,如果用自己的zk ,只要是新版的kafka 就會報錯
zookeeper is not a recognized option
意思就是沒有zookeeper
這個參數
參考:https://www.cnblogs.com/duanjt/p/10132116.html