一、kafka環境配置 1.jdk安裝 安裝文件:http://www.oracle.com/technetwork/java/javase/downloads/index.html 下載JDK安裝完成後需要添加以下的環境變數(右鍵點擊“我的電腦” -> "高級系統設置" -> "環境變數" ): ...
一、kafka環境配置
1.jdk安裝
安裝文件:http://www.oracle.com/technetwork/java/javase/downloads/index.html 下載JDK
安裝完成後需要添加以下的環境變數(右鍵點擊“我的電腦” -> "高級系統設置" -> "環境變數" ):
JAVA_HOME: C:\Program Files\Java\jdk-13.0.1(jdk的安裝路徑)
Path: 現有值後追加 "%JAVA_HOME%\bin"
2.zookeeper安裝
Kafka的運行依賴於Zookeeper,所以在運行Kafka之前我們需要安裝並運行Zookeeper
下載安裝文件: http://zookeeper.apache.org/releases.html
解壓文件 apache-zookeeper-3.5.6-bin.tar
打開zookeeper-3.5.6\conf,把zoo_sample.cfg重命名成zoo.cfg 從文本編輯器里打開zoo.cfg, 把dataDir的值改成“./apache-zookeeper-3.5.6/data” 添加如下系統變數:ZOOKEEPER_HOME: C:\Users\Yc\work\apache-zookeeper-3.5.6 (zookeeper目錄)
Path: 在現有的值後面添加 ";%ZOOKEEPER_HOME%\bin;"
運行Zookeeper: 打開cmd然後執行 zkserver
3.安裝並運行kafka
下載安裝文件: http://kafka.apache.org/downloads.html 解壓文件 打開kafka_2.12-2.3.0\config 從文本編輯器里打開 server.properties 修改:log.dirs=./logs listeners=PLAINTEXT://localhost:9092 打開cmd 執行命令:C:\Users\Yc>cd C:\Users\Yc\work\kafka_2.12-2.3.0(進入此目錄中) 再執行:.\bin\windows\kafka-server-start.bat .\config\server.properties
4.創建Topics
cmd執行命令:cd C:\Users\Yc\work\kafka_2.12-2.3.0\bin\windows
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
執行成功後出現 :Created topic test
5.生產者使用測試
打開cmd視窗執行命令:cd C:\Users\Yc\work\kafka_2.12-2.3.0\bin\windows
kafka-console-producer.bat --broker-list localhost:9092 --topic test
6.消費者使用測試
打開cmd視窗執行命令:cd C:\Users\Yc\work\kafka_2.12-2.3.0\bin\windows
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
7..net程式消費者簡單使用
引入Confluent.kafka包
public static void Main(string[] args) { //地址及埠號 var conf = new ProducerConfig { BootstrapServers = "localhost:9092" }; Action<DeliveryReport<Null, string>> handler = r => Console.WriteLine(!r.Error.IsError ? $"Delivered message to {r.TopicPartitionOffset}" : $"Delivery Error: {r.Error.Reason}"); using (var p = new ProducerBuilder<Null, string>(conf).Build()) { //for (int i = 0; i < 100; ++i) //{ p.Produce("test", new Message<Null, string> { Value = "messagehowsf"}, handler);//kafka協議數據發送 //} // wait for up to 10 seconds for any inflight messages to be delivered. p.Flush(TimeSpan.FromSeconds(10)); } }