參考博文:kafka 配置文件參數詳解 參考博文:Kafka【第一篇】Kafka集群搭建 參考博文:如何為Kafka集群選擇合適的Partitions數量 參考博文:Kafka Server.properties 參考博文:kafka常用配置【重要】 參考博文:kafka常用配置 1. 主機規劃 主 ...
參考博文:kafka 配置文件參數詳解
參考博文:Kafka【第一篇】Kafka集群搭建
參考博文:如何為Kafka集群選擇合適的Partitions數量
參考博文:kafka常用配置【重要】
參考博文:kafka常用配置
1. 主機規劃
主機名稱 |
IP信息 |
內網IP |
操作系統 |
安裝軟體 |
備註:運行程式 |
mini01 |
10.0.0.11 |
172.16.1.11 |
CentOS 7.4 |
jdk、zookeeper、kafka |
QuorumPeerMain、Kafka |
mini02 |
10.0.0.12 |
172.16.1.12 |
CentOS 7.4 |
jdk、zookeeper、kafka |
QuorumPeerMain、Kafka |
mini03 |
10.0.0.13 |
172.16.1.13 |
CentOS 7.4 |
jdk、zookeeper、kafka |
QuorumPeerMain、Kafka |
mini04 |
10.0.0.14 |
172.16.1.14 |
CentOS 7.4 |
jdk、zookeeper |
QuorumPeerMain |
mini05 |
10.0.0.15 |
172.16.1.15 |
CentOS 7.4 |
jdk、zookeeper |
QuorumPeerMain |
其中zookeeper的安裝可參見:Hbase-2.0.0_01_安裝部署
添加hosts信息,保證每台Linux都可以相互ping通
1 [root@mini01 ~]# cat /etc/hosts 2 127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4 3 ::1 localhost localhost.localdomain localhost6 localhost6.localdomain6 4 5 172.16.1.11 mini01 6 172.16.1.12 mini02 7 172.16.1.13 mini03 8 172.16.1.14 mini04 9 172.16.1.15 mini05
windows的hosts文件也追加如下信息
1 c:\windows\system32\drivers\etc 2 ########################################## 追加信息如下: 3 10.0.0.11 mini01 4 10.0.0.12 mini02 5 10.0.0.13 mini03 6 10.0.0.14 mini04 7 10.0.0.15 mini05
2. Kafka的部署與配置修改
2.1. 軟體部署
1 [yun@mini01 software]$ pwd 2 /app/software 3 [yun@mini01 software]$ tar xf kafka_2.11-2.0.0.tgz 4 [yun@mini01 software]$ mv kafka_2.11-2.0.0 /app/ 5 [yun@mini01 software]$ cd /app/ 6 [yun@mini01 ~]$ ln -s kafka_2.11-2.0.0/ kafka 7 [yun@mini01 ~]$ ll -d kafka* 8 lrwxrwxrwx 1 yun yun 17 Sep 15 11:46 kafka -> kafka_2.11-2.0.0/ 9 drwxr-xr-x 6 yun yun 89 Jul 24 22:19 kafka_2.11-2.0.0
2.2. 環境變數
需要使用root許可權
1 [root@mini01 ~]# tail /etc/profile 2 done 3 4 unset i 5 unset -f pathmunge 6 7 # kafka 8 export KAFKA_HOME=/app/kafka 9 export PATH=$KAFKA_HOME/bin:$PATH 10 11 [root@mini01 profile.d]# logout 12 [yun@mini01 hbase]$ source /etc/profile # 使用yun用戶,並重新載入環境變數
2.3. 配置修改
1 [yun@mini01 config]$ pwd 2 /app/kafka/config 3 [yun@mini01 config]$ vim server.properties 4 ############################# Server Basics ############################# 5 # 每一個broker在集群中的唯一標示★★★ 6 # 比如mini01 為0 mini02 為1 mini03 為2 7 broker.id=0 8 9 ############################# Socket Server Settings ############################# 10 # The address the socket server listens on. It will get the value returned from 11 # java.net.InetAddress.getCanonicalHostName() if not configured. 12 # FORMAT: 13 # listeners = listener_name://host_name:port 14 # EXAMPLE: 15 # listeners = PLAINTEXT://your.host.name:9092 16 # 啟動kafka服務監聽的ip和埠,預設為java.net.InetAddress.getCanonicalHostName()獲取的ip 17 #listeners=PLAINTEXT://:9092 18 19 # broker通知到producers和consumers的主機地址和埠號 20 # 如果未設置,使用listeners的配置。否則,使用java.net.InetAddress.getCanonicalHostName()返回的值 21 # 對於ipv4,基本就是localhost了 127.0.0.1 最後就是訪問失敗 22 #advertised.listeners=PLAINTEXT://your.host.name:9092 23 # 在不同的機器,名稱不同 如mini01、mini02、mini03★★★ 24 advertised.listeners=PLAINTEXT://mini01:9092 25 26 # broker 處理消息的最大線程數,一般情況下不需要去修改 27 num.network.threads=3 28 29 # broker處理磁碟IO 的線程數 ,數值應該大於你的硬碟數 30 num.io.threads=8 31 32 # 發送緩衝區buffer大小,數據不是一下子就發送的,先回存儲到緩衝區了到達一定的大小後在發送,能提高性能 33 socket.send.buffer.bytes=102400 34 35 # kafka接收緩衝區大小,當數據到達一定大小後在序列化到磁碟 36 socket.receive.buffer.bytes=102400 37 38 # 這個參數是向kafka請求消息或者向kafka發送消息的請求的最大數,這個值不能超過java的堆棧大小 39 socket.request.max.bytes=104857600 40 41 ############################# Log Basics ############################# 42 # kafka數據的存放地址,多個地址的話用逗號分割 /tmp/kafka-logs-1,/tmp/kafka-logs-2 43 log.dirs=/app/kafka/logs 44 45 # 預設的分區數,一個topic預設1個分區數 46 num.partitions=1 47 48 # 每個數據目錄在啟動時用於日誌恢復和關閉時用於刷新的線程數。 49 num.recovery.threads.per.data.dir=1 50 51 ############################# Internal Topic Settings ############################# 52 # 組元數據內部主題“__consumer_offset”和“__transaction_state”的複製因數 53 # 對於開發測試之外的任何內容,建議使用大於1的值來確保可用性,比如3。 54 offsets.topic.replication.factor=3 55 transaction.state.log.replication.factor=3 56 transaction.state.log.min.isr=3 57 58 ############################# Log Flush Policy ############################# 59 # 在持久化到磁碟前message最大接收條數 60 #log.flush.interval.messages=10000 61 log.flush.interval.messages=10000 62 63 # 持久化的最大時間間隔 64 #log.flush.interval.ms=1000 65 log.flush.interval.ms=3000 66 67 ############################# Log Retention Policy ############################# 68 # 預設消息的最大持久化時間,168小時,7天 69 # segment 文件保留的最長時間,超時則被刪除 70 log.retention.hours=168 71 72 # 當分片的大小超過該值時,就會被刪除。該功能不依賴於log.retention.hours。為 -1沒有大小限制 73 #log.retention.bytes=1073741824 74 log.retention.bytes=-1 75 76 # 滾動生成新的segment文件的最大時間 77 log.roll.hours=168 78 79 # 單個分片的上限,達到該大小後會生成新的日誌分片 1G 80 log.segment.bytes=1073741824 81 82 # 日誌分片的檢測時間間隔,每隔該時間會根據log保留策略決定是否刪除log分片 83 log.retention.check.interval.ms=300000 84 85 # 預設為true 啟用日誌清理器進程在伺服器上運行 86 log.cleaner.enable=true 87 88 # 預設為true 【當前版本】 啟用刪除主題。 如果此配置已關閉,則通過管理工具刪除主題將不起作用 89 delete.topic.enable=true 90 ############################# Zookeeper ############################# 91 # Zookeeper connection string (see zookeeper docs for details). 92 # This is a comma separated host:port pairs, each corresponding to a zk 93 # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". 94 # You can also append an optional chroot string to the urls to specify the 95 # root directory for all kafka znodes. 96 zookeeper.connect=mini01:2181,mini02:2181,mini03:2181,mini04:2181,mini05:2181 97 98 # Timeout in ms for connecting to zookeeper 99 zookeeper.connection.timeout.ms=6000 100 101 102 ############################# Group Coordinator Settings ############################# 103 # 在開發測試環境下該值設置為0,保證啟動後馬上可以使用。但在生產環境下,預設值3秒更適合 104 group.initial.rebalance.delay.ms=3000
2.4. 其他配置修改
1 [yun@mini01 config]$ pwd 2 /app/kafka/config 3 [yun@mini01 config]$ vim producer.properties 4 ……………… 5 # 用於建立到Kafka集群的初始連接的主機/埠對列表。客戶機將使用所有伺服器,而不管這裡為引導綁定指定了哪些伺服器——此列表隻影響用於發現完整伺服器集的初始主機。 6 # 由於這些伺服器僅用於初始連接,以發現完整的集群成員關係(可能會動態更改),因此這個列表不需要包含完整的伺服器集(但是,如果伺服器宕機,您可能需要多個伺服器)。 7 bootstrap.servers=mini01:9092,mini02:9092,mini03:9092 8 ……………… 9 [yun@mini01 config]$ vim consumer.properties 10 ……………… 11 bootstrap.servers=mini01:9092,mini02:9092,mini03:9092 12 ………………
2.5. 啟動/停止kafka
1 [yun@mini01 ~]$ kafka-server-start.sh -daemon /app/kafka/config/server.properties # -daemon 可選參數,後臺運行 2 # 如果是後臺運行,則控制台沒有下麵日誌信息 3 [2018-09-15 18:38:38,700] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$) 4 [2018-09-15 18:38:39,564] INFO starting (kafka.server.KafkaServer) 5 …………………… 6 [yun@mini01 ~]$ kafka-server-stop.sh # 停止kafka
2.6. zookeeper命令行查看
1 [zk: localhost:2181(CONNECTED) 10] ls / 2 [cluster, controller, brokers, zookeeper, admin, isr_change_notification, log_dir_event_notification, controller_epoch, zhang01, consumers, latest_producer_id_block, config, hbase] 3 [zk: localhost:2181(CONNECTED) 11] ls /brokers 4 [ids, topics, seqid] 5 [zk: localhost:2181(CONNECTED) 12] ls /brokers/ids 6 [0, 1, 2]