1.Kafka相關知識 Broker:即Kafka的伺服器,用戶存儲消息,Kafa集群中的一臺或多台伺服器統稱為broker。 Message消息:是通信的基本單位,每個 producer 可以向一個 topic(主題)發佈一些消息。 Kafka中的Message是以topic為基本單位組織的,不同 ...
1.Kafka相關知識
- Broker:即Kafka的伺服器,用戶存儲消息,Kafa集群中的一臺或多台伺服器統稱為broker。
- Message消息:是通信的基本單位,每個 producer 可以向一個 topic(主題)發佈一些消息。
-
- Kafka中的Message是以topic為基本單位組織的,不同的topic之間是相互獨立的。每個topic又可以分成幾個不同的partition(每個topic有幾個partition是在創建topic時指定的),每個partition存儲一部分Message。
- partition中的每條Message包含了以下三個屬性:Kafka基於文件存儲.通過分區,可以將日誌內容分散到多個server上,來避免文件尺寸達到單機磁碟的上限,每個partiton都會被當前server(kafka實例)保存。可以將一個topic切分多任意多個partitions,來消息保存/消費的效率。
- offset:消息唯一標識:對應類型:long
- MessageSize 對應類型:int32
- data 是message的具體內容。
- 越多的partitions意味著可以容納更多的consumer,有效提升併發消費的能力。
- Message:在Broker中通Log追加的方式進行持久化存儲。併進行分區(patitions)。
-
- 一個Topic可以認為是一類消息,每個topic將被分成多partition(區),每個partition在存儲層面是append log文件。任何發佈到此partition的消息都會被直接追加到log文件的尾部,每條消息在文件中的位置稱為offset(偏移量),partition是以文件的形式存儲在文件系統中。
- Logs文件根據broker中的配置要求,保留一定時間後刪除來釋放磁碟空間。
-
- Topic物理上的分組,一個 topic可以分為多個 partition,每個 partition 是一個有序的隊列。partition中的每條消息都會被分配一個有序的 id(offset)。
- 為實現稀疏存儲,我們通過給文件建索引,每隔一定位元組的數據建立一條索引
- 為了減少磁碟寫入的次數,broker會將消息暫時buffer起來,當消息的個數(或尺寸)達到一定閥值時,再flush到磁碟,這樣減少了磁碟IO調用的次數。
- Broker沒有副本機制,一旦broker宕機,該broker的消息將都不可用。Message消息是有多份的。
- consumer:消息和數據消費者,訂閱topics並處理其發佈的消息的過程叫做consumers。
-
- 在 kafka中,我們可以認為一個group是一個訂閱者,一個Topic中的每個partions,只會被一個訂閱者中的一個consumer消費,不過一個 consumer可以消費多個partitions中的消息(消費者數據小於Partions 的數量時)。註意:kafka的設計原理決定,對於一個topic,同一個group中不能有多於partitions個數的consumer同時消費,否則將意味著某些consumer將無法得到消息。
- 一個partition中的消息只會被group中的一個consumer消息。每個group中consumer消息消費互相獨立。
- 無狀態導致消息的刪除成為難題(可能刪除的消息正在被訂閱),kafka採用基於時間的SLA(服務水平保證),消息保存一定時間(通常為7天)後會被刪除。
- 消息訂閱者可以rewind back到任意位置重新進行消費,當訂閱者故障時,可以選擇最小的offset(id)進行重新讀取消費消息。
2.kafka操作
2.1.查看有哪些主題:
kafka-topics.sh --list --zookeeper 192.168.0.201:12181
2.2.查看topic的詳細信息
kafka-topics.sh -zookeeper 127.0.0.1:2181 -describe -topic testKJ1
2.3.為topic增加副本
kafka-reassign-partitions.sh -zookeeper 127.0.0.1:2181 -reassignment-json-file json/partitions-to-move.json -execute
2.4.創建topic
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testKJ1
2.5為topic增加partition
bin/kafka-topics.sh –zookeeper 127.0.0.1:2181 –alter –partitions 20 –topic testKJ1
2.6kafka生產者客戶端命令
kafka-console-producer.sh --broker-list localhost:9092 --topic testKJ1
2.7kafka消費者客戶端命令
kafka-console-consumer.sh -zookeeper localhost:2181 --from-beginning --topic testKJ1
2.8kafka服務啟動
kafka-server-start.sh -daemon ../config/server.properties
3..net core操作
producer端,引入Confluent.Kafka
Install-Package Confluent.Kafka -Version 1.0-beta2
using Confluent.Kafka; using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; namespace KafkaTest { class Program { static void Main(string[] args) { Test().Wait(); } static async Task Test() { var conf = new ProducerConfig { BootstrapServers = "39.**.**.**:9092" }; Action<DeliveryReportResult<Null, string>> handler = r => Console.WriteLine(!r.Error.IsError ? $"Delivered message to {r.TopicPartitionOffset}" : $"Delivery Error: {r.Error.Reason}"); using (var p = new Producer<Null, string>(conf)) { for (int i = 0; i < 100000; ++i) { p.BeginProduce("my-topic", new Message<Null, string> { Value = i.ToString() }, handler); } // wait for up to 10 seconds for any inflight messages to be delivered. p.Flush(TimeSpan.FromSeconds(10)); } } } }
consumer端,引入Confluent.Kafka
Install-Package Confluent.Kafka -Version 1.0-beta2
using Confluent.Kafka; using System; using System.Linq; using System.Text; namespace KafkaClient { class Program { static void Main(string[] args) { var conf = new ConsumerConfig { GroupId = "test-consumer-group4", BootstrapServers = "39.**.**.**:9092", // Note: The AutoOffsetReset property determines the start offset in the event // there are not yet any committed offsets for the consumer group for the // topic/partitions of interest. By default, offsets are committed // automatically, so in this example, consumption will only start from the // earliest message in the topic 'my-topic' the first time you run the program. AutoOffsetReset = AutoOffsetResetType.Earliest }; using (var c = new Consumer<Ignore, string>(conf)) { c.Subscribe("my-topic"); bool consuming = true; // The client will automatically recover from non-fatal errors. You typically // don't need to take any action unless an error is marked as fatal. c.OnError += (_, e) => consuming = !e.IsFatal; while (consuming) { try { var cr = c.Consume(); Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'."); } catch (ConsumeException e) { Console.WriteLine($"Error occured: {e.Error.Reason}"); } } // Ensure the consumer leaves the group cleanly and final offsets are committed. c.Close(); } } } }