參考博文:Kafka消費組(consumer group) 參考博文:kafka 1.0 中文文檔(九):操作 參考博文:kafka集群管理工具kafka-manager部署安裝 以下操作可以在mini01、mini02、mini03任意一臺操作即可。 1. kafka通過網頁管理 參考博文:kaf ...
參考博文:kafka 1.0 中文文檔(九):操作
參考博文:kafka集群管理工具kafka-manager部署安裝
以下操作可以在mini01、mini02、mini03任意一臺操作即可。
1. kafka通過網頁管理
參考博文:kafka集群管理工具kafka-manager部署安裝
2. 創建topic
1 # 參數說明 --replication-factor 2 表示有2個副本 2 # --partitions 4 表示有4個分區 3 [yun@mini01 ~]$ kafka-topics.sh --create --zookeeper mini01:2181 --replication-factor 2 --partitions 4 --topic test 4 Created topic "test". 5 [yun@mini01 ~]$ kafka-topics.sh --create --zookeeper mini01:2181 --replication-factor 3 --partitions 4 --topic zhang 6 Created topic "zhang". 7 [yun@mini01 ~]$ kafka-topics.sh --list --zookeeper mini01:2181 # 再次查看 8 zhang 9 test
2.1. 各主機信息查看
mini01
1 [yun@mini01 logs]$ pwd 2 /app/kafka/logs 3 [yun@mini01 logs]$ ll 4 total 160 5 ……………… 6 drwxrwxr-x 2 yun yun 141 Sep 15 18:53 test-1 7 drwxrwxr-x 2 yun yun 141 Sep 15 18:53 test-2 8 drwxrwxr-x 2 yun yun 141 Sep 15 18:53 test-3
mini02
1 [yun@mini02 logs]$ pwd 2 /app/kafka/logs 3 [yun@mini02 logs]$ ll 4 total 260 5 ……………… 6 drwxrwxr-x 2 yun yun 141 Sep 15 18:53 test-0 7 drwxrwxr-x 2 yun yun 141 Sep 15 18:53 test-2
mini03
1 [yun@mini03 logs]$ pwd 2 /app/kafka/logs 3 [yun@mini03 logs]$ ll 4 total 132 5 ……………… 6 drwxrwxr-x 2 yun yun 141 Sep 15 18:53 test-0 7 drwxrwxr-x 2 yun yun 141 Sep 15 18:53 test-1 8 drwxrwxr-x 2 yun yun 141 Sep 15 18:53 test-3
3. 修改topic
3.1. 增加分區數
註意:分區數不能減少
Kafka目前不支持減少主題的分區數量。
1 [yun@mini01 ~]$ kafka-topics.sh --list --zookeeper mini01:2181 2 __consumer_offsets 3 test 4 test01 5 test02 6 test03 7 test04 8 zhang 9 [yun@mini01 ~]$ kafka-topics.sh --describe --zookeeper mini01:2181 --topic test01 10 Topic:test01 PartitionCount:5 ReplicationFactor:1 Configs: 11 Topic: test01 Partition: 0 Leader: 0 Replicas: 0 Isr: 0 12 Topic: test01 Partition: 1 Leader: 1 Replicas: 1 Isr: 1 13 Topic: test01 Partition: 2 Leader: 2 Replicas: 2 Isr: 2 14 Topic: test01 Partition: 3 Leader: 0 Replicas: 0 Isr: 0 15 Topic: test01 Partition: 4 Leader: 1 Replicas: 1 Isr: 1 16 [yun@mini01 ~]$ kafka-topics.sh --alter --zookeeper mini01:2181 --partitions 2 --topic test01 # 失敗,分區數不能減少 17 WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected 18 Error while executing topic command : The number of partitions for a topic can only be increased. Topic test01 currently has 5 partitions, 2 would not be an increase. 19 [2018-09-16 09:12:40,034] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic test01 currently has 5 partitions, 2 would not be an increase. 20 (kafka.admin.TopicCommand$) 21 [yun@mini01 ~]$ kafka-topics.sh --alter --zookeeper mini01:2181 --partitions 7 --topic test01 # 增加分區數 22 WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected 23 Adding partitions succeeded! 24 [yun@mini01 ~]$ kafka-topics.sh --describe --zookeeper mini01:2181 --topic test01 25 Topic:test01 PartitionCount:7 ReplicationFactor:1 Configs: 26 Topic: test01 Partition: 0 Leader: 0 Replicas: 0 Isr: 0 27 Topic: test01 Partition: 1 Leader: 1 Replicas: 1 Isr: 1 28 Topic: test01 Partition: 2 Leader: 2 Replicas: 2 Isr: 2 29 Topic: test01 Partition: 3 Leader: 0 Replicas: 0 Isr: 0 30 Topic: test01 Partition: 4 Leader: 1 Replicas: 1 Isr: 1 31 Topic: test01 Partition: 5 Leader: 2 Replicas: 2 Isr: 2 32 Topic: test01 Partition: 6 Leader: 0 Replicas: 0 Isr: 0
4. 刪除topic
1 # server.properties中設置delete.topic.enable=true 【當前版本預設就是true】否則只是標記刪除或者直接重啟 2 [yun@mini01 ~]$ kafka-topics.sh --delete --zookeeper mini01:2181 --topic test 3 Topic test is marked for deletion. 4 Note: This will have no impact if delete.topic.enable is not set to true. 5 [yun@mini01 ~]$ kafka-topics.sh --list --zookeeper mini01:2181 # 再次查看 只有 zhang,則表示真的刪除了 6 zhang
5. 查看所有topic
1 [yun@mini01 ~]$ kafka-topics.sh --list --zookeeper mini01:2181 2 __consumer_offsets 3 test 4 zhang
6. 查看某個Topic的詳情
1 [yun@mini01 ~]$ kafka-topics.sh --describe --zookeeper mini01:2181 --topic zhang 2 Topic:zhang PartitionCount:4 ReplicationFactor:3 Configs: 3 Topic: zhang Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 4 Topic: zhang Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 5 Topic: zhang Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 6 Topic: zhang Partition: 3 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
7. 通過shell命令生產消息
7.1. 輸入單條數據
1 [yun@mini01 ~]$ kafka-console-producer.sh --broker-list mini01:9092 --topic zhang 2 >111 3 >222 4 >333 5 >444 6 >555 7 >666 8 >777 9 >888 10 >999
7.2. 批量導入數據
1 [yun@mini01 zhangliang]$ kafka-console-producer.sh --broker-list mini01:9092 --topic liang < 001.info
8. 通過shell命令消費消息
1 # --from-beginning 從最開始讀取 2 # kafka-console-consumer.sh --zookeeper mini01:2181 --from-beginning --topic zhang # 老版本 3 [yun@mini01 ~]$ kafka-console-consumer.sh --bootstrap-server mini01:9092 --from-beginning --topic zhang 4 111 5 555 6 999 7 333 8 777 9 444 10 888 11 222 12 666
9. 消費組消費
9.1. 創建topic
1 [yun@mini01 ~]$ kafka-topics.sh --create --zookeeper mini01:2181 --replication-factor 1 --partitions 4 --topic order 2 Created topic "order". 3 [yun@mini01 ~]$ kafka-topics.sh --list --zookeeper mini01:2181 # 查看所有topic列表 4 __consumer_offsets 5 order 6 test 7 zhang 8 [yun@mini01 ~]$ kafka-topics.sh --describe --zookeeper mini01:2181 --topic order # 查看topic詳情 9 Topic:order PartitionCount:4 ReplicationFactor:1 Configs: 10 Topic: order Partition: 0 Leader: 0 Replicas: 0 Isr: 0 11 Topic: order Partition: 1 Leader: 1 Replicas: 1 Isr: 1 12 Topic: order Partition: 2 Leader: 2 Replicas: 2 Isr: 2 13 Topic: order Partition: 3 Leader: 0 Replicas: 0 Isr: 0
9.2. 生產消息
1 [yun@mini01 ~]$ kafka-console-producer.sh --broker-list mini01:9092 --topic order 2 >111 3 >222 4 >333 5 >444 6 >555
9.3. 消費組消費消息
mini02機器上運行
1 # --group 指定組 2 [yun@mini02 ~]$ kafka-console-consumer.sh --bootstrap-server mini01:9092 --topic order --group order-group
mini03機器上運行
1 # --group 指定組 2 [yun@mini03 ~]$ kafka-console-consumer.sh --bootstrap-server mini01:9092 --topic order --group order-group 3 4 # 新開一個視窗執行 5 [yun@mini03 ~]$ kafka-console-consumer.sh --bootstrap-server mini01:9092 --topic order --group order-group
表示order-group消費組有3個消費者,消費topic order的信息。
9.4. 消費組消費位置信息查看
1 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group order-group # 查看消費情況 2 3 TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID 4 order 0 4 4 0 consumer-1-2e9805db-e021-4595-8c62-92f8691fbf20 /172.16.1.13 consumer-1 5 order 1 5 5 0 consumer-1-2e9805db-e021-4595-8c62-92f8691fbf20 /172.16.1.13 consumer-1 6 order 2 5 5 0 consumer-1-9e65dcfb-246f-4043-aaf7-3ee83532237f /172.16.1.13 consumer-1 7 order 3 4 4 0 consumer-1-ee17939d-1ffe-42c7-8261-b19be8acea43 /172.16.1.12 consumer-1 8 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group order-group --members --verbose 9 10 CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT 11 consumer-1-9e65dcfb-246f-4043-aaf7-3ee83532237f /172.16.1.13 consumer-1 1 order(2) 12 consumer-1-2e9805db-e021-4595-8c62-92f8691fbf20 /172.16.1.13 consumer-1 2 order(0,1) 13 consumer-1-ee17939d-1ffe-42c7-8261-b19be8acea43 /172.16.1.12 consumer-1 1 order(3)
10. 管理消費組
10.1. 查看所有消費組
1 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --list 2 console-consumer-26727 3 console-consumer-92984 4 console-consumer-60755 5 console-consumer-11661 6 console-consumer-31713 7 console-consumer-20244 8 console-consumer-65733
10.2. 查看消費組消費情況【消費位置】
1 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group console-consumer-26727 2 Consumer group 'console-consumer-26727' has no active members. 3 4 TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID 5 zhang 3 11 11 0 - - - 6 zhang 0 9 9 0 - - - 7 zhang 2 8 8 0 - - - 8 zhang 1 11 11 0 - - - 9 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group console-consumer-65733 10 11 TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID 12 zhang 0 11 11 0 consumer-1-17c812f0-116b-42a9-88d8-90d1a85949e1 /172.16.1.13 consumer-1 13 zhang 1 12 12 0 consumer-1-17c812f0-116b-42a9-88d8-90d1a85949e1 /172.16.1.13 consumer-1 14 zhang 2 10 10 0 consumer-1-17c812f0-116b-42a9-88d8-90d1a85949e1 /172.16.1.13 consumer-1 15 zhang 3 12 12 0 consumer-1-17c812f0-116b-42a9-88d8-90d1a85949e1 /172.16.1.13 consumer-1
--members
1 # --members 此選項提供使用者組中所有活動成員的列表。 2 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group console-consumer-65733 --members 3 4 CONSUMER-ID HOST CLIENT-ID #PARTITIONS 5 consumer-1-17c812f0-116b-42a9-88d8-90d1a85949e1 /172.16.1.13 consumer-1 4
--verbose
1 # --verbose 這個選項還提供了分配給每個成員的分區。 2 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group console-consumer-65733 --members --verbose 3 4 CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT 5 consumer-1-17c812f0-116b-42a9-88d8-90d1a85949e1 /172.16.1.13 consumer-1 4 zhang(0,1,2,3)
--state
1 # --state 這個選項提供了有用的組級信息。 2 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group console-consumer-65733 --state 3 4 COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS 5 mini01:9092 (0) range Stable 1
10.3. 刪除一個或多個用戶組
1 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --list 2 console-consumer-3826 3 console-consumer-92984 4 console-consumer-60755 5 console-consumer-11661 6 console-consumer-31713 7 console-consumer-20244 8 console-consumer-65733 9 # 刪除一個或多個組 10 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --delete --group console-consumer-11661 --group console-consumer-31713 11 Deletion of requested consumer groups ('console-consumer-31713', 'console-consumer-11661') was successful. 12 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --list 13 console-consumer-3826 14 console-consumer-92984 15 console-consumer-60755 16 console-consumer-20244 17 console-consumer-65733
11. 增加副本因數
1 [yun@mini01 kafka_20180916]$ kafka-topics.sh --create --zookeeper mini01:2181 --replication-factor 1 --partitions 4 --topic order 2 Created topic "order". 3 [yun@mini01 kafka_20180916]$ kafka-topics.sh --describe --zookeeper mini01:2181 --topic order 4 Topic:order PartitionCount:4 ReplicationFactor:1 Configs: 5 Topic: order Partition: 0 Leader: 0 Replicas: 0 Isr: 0 6 Topic: order Partition: 1 Leader: 1 Replicas: 1 Isr: 1 7 Topic: order Partition: 2 Leader: 2 Replicas: 2 Isr: 2 8 Topic: order Partition: 3 Leader: 0 Replicas: 0 Isr: 0
要求:topic order 的副本數由1變為2, 之前每個分區在哪台機器上在上面已給出。
說明:part 0分佈在群集0,1; part 1分佈在集群1,2;part 2 分佈在集群2,0;part 3分佈在集群0,1。
11.1. 創建一個重新調整計劃文件
1 [yun@mini01 kafka_20180916]$ cat increase-replication-factor.json 2 { 3 "version":1, 4 "partitions":[ 5 {"topic": "order","partition": 0,"replicas": [0,1]}, 6 {"topic": "order","partition": 1,"replicas": [1,2]}, 7 {"topic": "order","partition": 2,"replicas": [2,0]}, 8 {"topic": "order","partition": 3,"replicas": [0,1]} 9 ] 10 }
11.2. 語句執行
1 [yun@mini01 kafka_20180916]$ kafka-reassign-partitions.sh --zookeeper mini01:2181 --reassignment-json-file increase-replication-factor.json --execute 2 Current partition replica assignment 3 4 {"version":1,"partitions":[{"topic":"order","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"order","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"order","partition":3,"replicas":[0],"log_dirs":["any"]},{"topic":"order","partition":0,"replicas":[0],"log_dirs":["any"]}]} 5 6 Save this to use as the --reassignment-json-file option during rollback 7 Successfully started reassignment of partitions.
11.3. 查看是否執行成功
1 [yun@mini01 kafka_20180916]$ kafka-reassign-partitions.sh --zookeeper mini01:2181 --reassignment-json-file increase-replication-factor.json --verify 2 Status of partition reassignment: 3 Reassignment of partition order-0 completed successfully 4 Reassignment of partition order-1 completed successfully 5 Reassignment of partition order-2 completed successfully 6 Reassignment of partition order-3 completed successfully
11.4. 再次查看該topic詳情
1 [yun@mini01 kafka_20180916]$ kafka-topics.sh --describe --zookeeper mini01:2181 --topic order # 由下可見分配成功 2 Topic:order PartitionCount:4 ReplicationFactor:2 Configs: 3 Topic: order Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1 4 Topic: order Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 5 Topic: order Partition: 2 Leader: 2 Replicas: 2,0 Isr: 2,0 6 Topic: order Partition: 3 Leader: 0 Replicas: 0,1 Isr: 0,1
12. 創建partitions時在broker的分配機制
1 kafka-topics.sh --create --zookeeper mini01:2181 --replication-factor 1 --partitions 5 --topic test01 2 kafka-topics.sh --create --zookeeper mini01:2181 --replication-factor 1 --partitions 11 --topic test02
註意在各機器上partitions的分佈
1 mini01 2 test01-0 3 test01-3 4 test02-2 5 test02-5 6 test02-8 7 8 mini02 9 test01-1 10 test01-4 11 test02-0 12 test02-3 13 test02-6 14 test02-9 15 16 mini03 17 test01-2 18 test02-1 19 test02-10 20 test02-4 21 test02-7