一、什麼是Kafka? 數據工程中最具挑戰性的部分之一是如何從不同點收集和傳輸大量數據到分散式系統進行處理和分析。需要通過消息隊列正確地分離大量數據,因為如果一部分數據無法傳送,則可以在系統恢復時傳輸和分析其他數據。有兩種消息排隊,對於上述目的,它們都是可靠的和非同步的。點對點(Point to po ...
一、什麼是Kafka?
數據工程中最具挑戰性的部分之一是如何從不同點收集和傳輸大量數據到分散式系統進行處理和分析。需要通過消息隊列正確地分離大量數據,因為如果一部分數據無法傳送,則可以在系統恢復時傳輸和分析其他數據。有兩種消息排隊,對於上述目的,它們都是可靠的和非同步的。點對點(Point to point)和發佈者——訂閱者(publisher-subscriber)。下圖展示了一個典型的消息系統,其中:消息的生產者負責產生消息;消息的消費者負責處理消息。
Kafka是一個分散式發佈——訂閱消息傳遞系統。Kafka快速、可擴展且耐用。它保留主題中的消息源。生產者將數據寫入主題,消費者從主題中讀取數據。
Zookeeper需要覆蓋Kafka生態系統,因此有必要下載它,更改其屬性並最終設置環境。在運行Zookeeper之後,應該下載Kafka,然後開發人員可以藉助一些指令創建代理,集群和主題。
二、消息隊列的分類
- 點對點(Queue)
在點對點或一對一中,有一個發件人和正在監聽發件人的多個消費者。當一個消費者從隊列收到消息時,該特定消息將從隊列中消失,而其他消費者無法獲得該消息。
- 發佈和訂閱系統(Topic)
在發佈者——訂閱者中,發佈者向同時收聽發佈者的多個消費者或訂閱者發送消息,並且每個訂閱者可以獲得相同的消息。數據應通過數據管道傳輸,數據管道負責整合來自數據源的數據。
三、Kafka的體系架構
- 主題和發佈者
有一個發佈者發送消息。消息根據主題進行分類,每個主題都有一個或多個分區,並有自己的偏移地址。例如,如果我們為一個主題分配複製因數= 2,那麼Kafka將為每個分區創建兩個相同的副本併在群集中找到它。
- 集群和Brokers
Kafka集群包括代理——伺服器或節點,每個代理可以位於不同的機器中,並允許訂戶選擇消息。因此,複製就像備份分區一樣,這意味著Kafka是持久的,這有助於容錯。
- Zookeeper
Kafka集群不保留其自身生態系統的元數據,因為它是無狀態的。因此,Kafka依賴於Zookeeper來跟蹤元數據。Zookeeper應該首先啟動。實際上,Zookeeper是brokers和consumers之間的介面,它的存在是容錯的必要條件。Kafka代理負責負載平衡,假設該主題有一個主題和多個分區,每個分區都有一個領導者,定期確認其與Zookeeper的偏移量。因此,如果一個節點或代理失敗,Kafka可以從Zookeeper請求的最後一個偏移地址繼續操作,因此Zookeeper在崩潰情況下在Kafka恢復中起著至關重要的作用。
四、Kafka單機單Broker的部署
- 部署ZooKeeper
配置/root/training/zookeeper-3.4.6/conf/zoo.cfg文件 dataDir=/root/training/zookeeper-3.4.6/tmp server.1=hadoop112:2888:3888 在/root/training/zookeeper-3.4.6/tmp目錄下創建一個myid的空文件 echo 1 > /root/training/zookeeper-3.4.6/tmp/myid 啟動ZooKeeper zkServer.sh start 查看ZooKeeper的狀態 zkServer.sh status
由於我們部署的是單節點的ZooKeeper,所以ZooKeeper的狀態將是Standalone。
- 部署Kafka
修改server.conf文件 broker.id=0 port=9092 log.dirs=/root/training/kafka_2.11-2.4.0/logs/broker0 zookeeper.connect=localhost:2181 啟動Kafka bin/kafka-server-start.sh config/server.properties &
- 使用JPS查看後臺進程
五、測試Kafka
創建Topic bin/kafka-topics.sh --create --zookeeper bigdata111:2181 --replication-factor 1 --partitions 3 --topic mytopic1 發送消息 bin/kafka-console-producer.sh --broker-list bigdata111:9092 --topic mytopic1 接收消息 消息消費 bin/kafka-console-consumer.sh --bootstrap-server bigdata111:9092 --topic mytopic1 從開始位置消費 bin/kafka-console-consumer.sh --bootstrap-server bigdata111:9092 --from-beginning --topic topicName 顯示key消費 bin/kafka-console-consumer.sh --bootstrap-server bigdata111:9092 --property print.key=true --topic mytopic1