Last time I walked through how to set up an Elasticsearch cluster; I hope it was helpful. Building on that setup, this post continues with the Kafka cluster.
First, let's briefly look at what Kafka and ZooKeeper are.
Apache Kafka is a distributed publish-subscribe messaging system that is fast, scalable, and durable. It is an open-source Apache project and, as part of the Hadoop ecosystem, is widely used in industry. Its biggest strength is processing large volumes of data in real time to serve many kinds of workloads: Hadoop-based batch processing, low-latency real-time systems, and Storm/Spark stream-processing engines.
Key features:
- High throughput, low latency: Kafka can process hundreds of thousands of messages per second, with latency as low as a few milliseconds
- Scalability: a Kafka cluster supports online (hot) expansion
- Durability and reliability: messages are persisted to local disk, and replication protects against data loss
- Fault tolerance: the cluster tolerates node failures (with a replication factor of n, up to n-1 nodes may fail)
- High concurrency: thousands of clients can read and write at the same time
ZooKeeper is a distributed, open-source coordination service for distributed applications. It exposes a simple set of primitives on top of which distributed applications can implement synchronization, configuration maintenance, naming services, and more.
Cluster roles:
- The Leader server is the core of the ZooKeeper cluster's working mechanism
- Follower servers track the state of the ZooKeeper cluster
- Observer servers act purely as observers: they serve reads and receive updates but do not vote in elections (a configuration sketch follows below)
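For reference only (this two-node cluster does not use one), an Observer is declared by tagging its server line in every node's zoo.cfg and setting peerType in the observer's own config; the third host below is hypothetical:
server.3=172.16.81.133:2888:3888:observer    # in every node's zoo.cfg
peerType=observer                            # only in the observer's own zoo.cfg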
Now, straight to the main topic: how to set up the ZooKeeper and Kafka clusters correctly.
I. ZooKeeper cluster configuration
1. Set the hostnames
kafka1.example.com --> 172.16.81.131
kafka2.example.com --> 172.16.81.132
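One way to do this on a systemd-based system such as CentOS 7 (the distribution is an assumption):
[root@kafka1 ~]# hostnamectl set-hostname kafka1.example.com
[root@kafka2 ~]# hostnamectl set-hostname kafka2.example.com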
2. Edit the hosts file
[root@kafka1 opt]# cat /etc/hosts
127.0.0.1 kafka1.example.com localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
[root@kafka2 opt]# cat /etc/hosts
127.0.0.1 kafka2.example.com localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
3. Install the JDK
cd /opt
# package: jdk-8u131-linux-x64.tar.gz
tar -zxvf jdk-8u131-linux-x64.tar.gz
mv jdk1.8.0_131 /usr/local/
4. Configure the JDK environment variables
[root@kafka1 opt]# tail -10 /etc/profile
# JAVA environment variables
export JAVA_HOME=/usr/local/jdk1.8.0_131
export JAVA_BIN=$JAVA_HOME/bin
export JAVA_LIB=$JAVA_HOME/lib
export CLASSPATH=.:$JAVA_LIB/tools.jar:$JAVA_LIB/dt.jar
export PATH=$JAVA_BIN:$PATH
export _JAVA_SR_SIGNUM=12
# ZooKeeper environment variables
export ZOOKEEPER_HOME=/opt/zookeeper/
export PATH=$ZOOKEEPER_HOME/bin:$PATH
export PATH
[root@kafka2 opt]# tail -10 /etc/profile
# JAVA environment variables
export JAVA_HOME=/usr/local/jdk1.8.0_131
export JAVA_BIN=$JAVA_HOME/bin
export JAVA_LIB=$JAVA_HOME/lib
export CLASSPATH=.:$JAVA_LIB/tools.jar:$JAVA_LIB/dt.jar
export PATH=$JAVA_BIN:$PATH
export _JAVA_SR_SIGNUM=12
# ZooKeeper environment variables
export ZOOKEEPER_HOME=/opt/zookeeper/
export PATH=$ZOOKEEPER_HOME/bin:$PATH
export PATH
# apply the environment variables
source /etc/profile
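To confirm the JDK is picked up after sourcing the profile, run java -version on both nodes; it should report version 1.8.0_131:
[root@kafka1 opt]# java -version
[root@kafka2 opt]# java -version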
5. Download the ZooKeeper package
zookeeper-3.4.10.tar.gz
# extract
tar -zxvf zookeeper-3.4.10.tar.gz
mv zookeeper-3.4.10 zookeeper
cd /opt/zookeeper/conf/
cp zoo_sample.cfg zoo.cfg
6. Edit the ZooKeeper configuration file
[root@kafka1 opt]# cat /opt/zookeeper/conf/zoo.cfg | grep -v '^#' | grep -v '^$'
tickTime=2000
initLimit=20
syncLimit=10
dataDir=/opt/kafka/data/zookeeper
dataLogDir=/opt/kafka/data/zookeeper/logs
clientPort=2181
server.1=172.16.81.131:2888:3888
server.2=172.16.81.132:2888:3888
[root@kafka2 opt]# cat /opt/zookeeper/conf/zoo.cfg | grep -v '^#' | grep -v '^$'
tickTime=2000
initLimit=20
syncLimit=10
dataDir=/opt/kafka/data/zookeeper
dataLogDir=/opt/kafka/data/zookeeper/logs
clientPort=2181
server.1=172.16.81.131:2888:3888
server.2=172.16.81.132:2888:3888
# Note: do not put trailing comments after values in zoo.cfg, otherwise ZooKeeper will report an error!
# Explanation:
tickTime: the basic time unit used for heartbeats between ZooKeeper servers, and between clients and servers; a heartbeat is sent every tickTime.
Port 2888: the port a server uses to exchange information with the Leader of the cluster;
Port 3888: if the cluster's Leader fails, a new election is needed, and this is the port the servers use to talk to each other while electing a new Leader.
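initLimit and syncLimit are both measured in ticks, so with the values above: initLimit=20 gives a Follower 20 × 2000 ms = 40 s to connect to and sync with the Leader at startup, and syncLimit=10 allows a Follower to fall at most 10 × 2000 ms = 20 s behind the Leader before it is dropped from the quorum.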
7. Create the dataDir directory on both kafka1 and kafka2
mkdir -p /opt/kafka/data
mkdir -p /opt/kafka/data/zookeeper
8. Write each server's id
[root@kafka1 opt]# echo "1" > /opt/kafka/data/zookeeper/myid
[root@kafka2 ~]# echo "2" > /opt/kafka/data/zookeeper/myid
# Note: the ids must be different, each one matching that server's server.N entry in zoo.cfg; the myid file lives in the configured dataDir
9. Start the ZooKeeper cluster
# run on both kafka1 and kafka2
cd /opt/zookeeper/
bin/zkServer.sh start
10. Startup results
[root@kafka1 ~]# netstat -nlpt | grep -E "2181|2888|3888"
tcp        0      0 :::2181                     :::*      LISTEN      33644/java
tcp        0      0 ::ffff:10.1.1.247:3888      :::*      LISTEN      33644/java
[root@kafka2 ~]# netstat -nlpt | grep -E "2181|2888|3888"
tcp        0      0 :::2181                     :::*      LISTEN      35016/java
tcp        0      0 ::ffff:10.1.1.248:2888      :::*      LISTEN      35016/java   # whichever node is the Leader is the one listening on port 2888
tcp        0      0 ::ffff:10.1.1.248:3888      :::*      LISTEN      35016/java
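The role of each node can also be checked directly with the status command of the zkServer.sh script; its output ends with a "Mode: leader" or "Mode: follower" line:
/opt/zookeeper/bin/zkServer.sh status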
II. Kafka cluster setup
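The original notes jump straight to the configuration file, so Kafka is assumed to have already been unpacked to /opt/kafka on both nodes. A minimal sketch, assuming a 0.11+/1.x binary release (the archive name below is hypothetical; use the release actually downloaded):
cd /opt
tar -zxvf kafka_2.11-1.0.0.tgz
mv kafka_2.11-1.0.0 /opt/kafka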
1. Configuration file
[root@kafka1 opt]# cat /opt/kafka/config/server.properties | grep -v '^#' | grep -v '^$'
broker.id=1
listeners=PLAINTEXT://172.16.81.131:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka/data/kafka-logs
num.partitions=10
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.16.81.131:2181,172.16.81.132:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
[root@kafka2 ~]# cat /opt/kafka/config/server.properties | grep -v '^#' | grep -v '^$'
broker.id=2
listeners=PLAINTEXT://172.16.81.132:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka/data/kafka-logs
num.partitions=10
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.16.81.131:2181,172.16.81.132:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
# Note: broker.id must be different on each broker
2. Start the Kafka cluster
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties &
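Starting the broker with a trailing & leaves it tied to the current shell; the startup script also accepts a -daemon flag, which detaches it cleanly:
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties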
3. Startup results
[root@kafka1 opt]# netstat -lntp
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address   State    PID/Program name
tcp6       0      0 :::47457                :::*              LISTEN   6582/java
tcp6       0      0 172.16.81.131:9092      :::*              LISTEN   9260/java
tcp6       0      0 :::2181                 :::*              LISTEN   6582/java
tcp6       0      0 :::33230                :::*              LISTEN   9260/java
tcp6       0      0 172.16.81.131:3888      :::*              LISTEN   6582/java
[root@kafka2 ~]# netstat -lntp
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address   State    PID/Program name
tcp6       0      0 172.16.81.132:9092      :::*              LISTEN   9395/java
tcp6       0      0 :::42884                :::*              LISTEN   6779/java
tcp6       0      0 :::2181                 :::*              LISTEN   6779/java
tcp6       0      0 172.16.81.132:2888      :::*              LISTEN   6779/java
tcp6       0      0 172.16.81.132:3888      :::*              LISTEN   6779/java
tcp6       0      0 :::38557                :::*              LISTEN   9395/java
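Another quick check is to confirm both brokers have registered in ZooKeeper by listing their ids with the zookeeper-shell.sh tool shipped with Kafka; for this cluster it should print [1, 2]. If your version does not accept an inline command, start the shell interactively and type ls /brokers/ids:
/opt/kafka/bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids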
4. Test that ZooKeeper and Kafka are working
(1) Create a topic
[root@kafka2 ~]# /opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic summer
Created topic "summer".
# Note: the replication factor cannot exceed the number of brokers, otherwise the command fails; this cluster has 2 brokers
(2) List the topics that have been created
[root@kafka2 ~]# /opt/kafka/bin/kafka-topics.sh --list --zookeeper 172.16.81.132:2181
summer
[root@kafka1 ~]# /opt/kafka/bin/kafka-topics.sh --list --zookeeper 172.16.81.131:2181
summer
(3) Show the topic details
[root@kafka2 ~]# /opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic summer
Topic:summer    PartitionCount:1    ReplicationFactor:2    Configs:
    Topic: summer    Partition: 0    Leader: 2    Replicas: 2,1    Isr: 2,1
# Topic name: summer
# Partition: only one, numbered from 0
# Leader: the broker with id 2
# Replicas: the replicas live on the brokers with ids 2 and 1
# Isr: the replicas that are in sync (active)
(4) Send messages (acting as the producer)
[root@kafka2 ~]# /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic summer
>Hello,wangyanlin
>I am from china.
>
>
>;
>^C[root@kafka2 ~]#
(5) Receive messages (acting as the consumer)
[root@kafka2 ~]# /opt/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic summer --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
Hello,wangyanlin
I am from china.
;
^CProcessed a total of 5 messages
[root@kafka1 kafka]# /opt/kafka/bin/kafka-console-consumer.sh --zookeeper 172.16.81.132:2181 --topic summer --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
Hello,wangyanlin
I am from china.
;
^CProcessed a total of 5 messages
(6) Delete the topic
/opt/kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic summer
# The topic is only fully removed if delete.topic.enable=true is set in the server configuration
# Everything works as expected. Done!
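The console consumer output above warns that the ZooKeeper-based consumer is deprecated; the same check can be run through the new consumer API by pointing at a broker instead of ZooKeeper:
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 172.16.81.131:9092 --topic summer --from-beginning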
The Kafka cluster can produce and consume messages normally. Test passed!
A follow-up post will cover configuring Logstash for log collection and filtering, and Kibana for graphical display.