引言 上一篇文章瞭解了kafka的重要組件zookeeper,用來保存broker、consumer等相關信息,做到平滑擴展。這篇文章就實際操作部署下kafka,用幾個簡單的例子加深對kafka的理解,學會基本使用kafka。 環境搭建 我將會在本地部署一個三台機器的zookeeper集群,和一個2 ...
引言
上一篇文章瞭解了kafka的重要組件zookeeper,用來保存broker、consumer等相關信息,做到平滑擴展。這篇文章就實際操作部署下kafka,用幾個簡單的例子加深對kafka的理解,學會基本使用kafka。
環境搭建
我將會在本地部署一個三台機器的zookeeper集群,和一個2台機器的kafka集群。
zookeeper集群
zookeeper的搭建可以看我的上一篇文章分散式系統中zookeeper實現配置管理+集群管理,按照步驟,一步步可以很容易的搭建3太伺服器的zookeeper集群。跟之前一樣,我還是在本地的3個埠搭建了3台伺服器,地址如下所示:
192.168.0.105:2181
192.168.0.105:2182
192.168.0.105:2183
這三台伺服器一會兒會在kafka配置中用到。
kafka集群
第一步. 下載kafka
到kafka官網下載apache kafka,解壓到/path/to/kafka
目錄。
第二步. 修改配置文件
複製/path/to/kafka/config/server.properties
,到/path/to/kafka/config/server-1.properties
和/path/to/kafka/config/server-2.properties
配置文件中修改的差異內容如下所示:
server-1.properties
:
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-1
zookeeper.connect=192.168.0.105:2181,192.168.0.105:2182,192.168.0.105:2183
server-2.properties
:
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs-2
zookeeper.connect=192.168.0.105:2181,192.168.0.105:2182,192.168.0.105:2183
其中broker.id
是broker的唯一標示,集群中的broker標識必須唯一。
listeners
是broker監聽的地址和埠,advertised.listeners
用於和producer、consumer交互,後者未配置會預設使用前者,listeners的完整格式是listeners = listener_name://host_name:port
,其中PLAINTEXT
是協議,還有一種是SSL
,具體還沒太搞明白(TODO)。
log.dirs
是日誌數據的存放目錄,也就是producer產生的數據存放的目錄。
zookeeper.connect
配置是zookeeper的集群,broker啟動之後將信息註冊到zookeeper集群中。
第三步. 啟動伺服器
cd /path/to/kafka
bin/kafka-server-start.sh -daemon config/server-1.properties
bin/kafka-server-start.sh -daemon config/server-2.properties
使用jps
命令可以看見2個kafka進程,證明啟動成功了。
第四步. 創建topic
創建topic一般使用kafka自帶的腳本創建:
bin/kafka-topics.sh --create --zookeeper 192.168.0.105:2181,192.168.0.105:2182,192.168.0.105:2183 --replication-factor 2 --partitions 10 --topic user-event
其中--zookeeper
就是後面就是我們上面配置的zookeeper集群,--replication-factor
代表每個分區在集群中複製的份數,後面的值要小於kafka集群中伺服器數量,--partitions
表示創建主題的分區數量,一般分區越大,性能越好,--topic
後邊兒就是創建主題的名字,運行成功之後會看到Created topic "user-event".
字樣,表示創建成功,會在kafka配置的日誌目錄下創建主題信息,比如下麵的:
ll /tmp/kafka-logs-1
drwxr-xr-x 7 ritoyan wheel 224 6 3 21:21 clock-tick-0
drwxr-xr-x 7 ritoyan wheel 224 6 3 21:21 clock-tick-2
drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-0
drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-1
drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-2
drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-3
drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-4
drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-5
drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-6
drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-7
drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-8
drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-9
ll /tmp/kafka-logs-2
drwxr-xr-x 7 ritoyan wheel 224 6 3 21:21 clock-tick-1
drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-0
drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-1
drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-2
drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-3
drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-4
drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-5
drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-6
drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-7
drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-8
drwxr-xr-x 6 ritoyan wheel 192 6 3 21:26 user-event-9
可以看到兩個broker中都創建了主題user-event
的10個分區。可能也有人要問了,clock-tick
這個主題怎麼在broker1中有2個分區,broker2中有1個分區,這個是我之前創建的一個分區,用了下麵的命令bin/kafka-topics.sh --create --zookeeper 192.168.0.105:2181,192.168.0.105:2182,192.168.0.105:2183 --replication-factor 1 --partitions 3 --topic clock-tick
,只有一份日誌記錄,3個分區,分區會均勻的分佈在所有broker上。
至此kafka環境配置好了,西面我們看看如何使用。
基本使用
安裝kafka-python
,用來操作kafka,pip3 install kafka-python
,這裡是他的文檔,文檔寫的不錯,簡潔易懂kafka-python
producer 向broker發送消息
bootstrap_servers
是kafka集群地址信息,下麵事項主題user-event
發送一條消息,send
發送消息是非同步的,會馬上返回,因此我們要通過阻塞的方式等待消息發送成功(或者flush()
也可以,flush會阻塞知道所有log都發送成功),否則消息可能會發送失敗,但也不會有提示,關於上面這個可以通過刪除send之後的語句試試,會發現broker不會收到消息,然後在send後加上time.sleep(10)
之後,會看到broker收到消息。
from kafka import KafkaProducer
from kafka.errors import KafkaError
producer = KafkaProducer(
bootstrap_servers=[
"localhost:9093",
"localhost:9094"
]
)
future = producer.send("user-event", b'I am rito yan')
try:
record_metadata = future.get(timeout=10)
print_r(record_metadata)
except KafkaError as e:
print(e)
阻塞等待發送成功之後,會看到返回插入記錄的信息:
RecordMetadata(topic='user-event', partition=7, topic_partition=TopicPartition(topic='user-event', partition=7), offset=1, timestamp=1528034253757, checksum=None, serialized_key_size=-1, serialized_value_size=13)
,裡面包括了插入log的主題、分區等信息。
格式化發送的信息
創建producer的時候可以通過value_serializer
指定格式化函數,比如我們數據是個dict,可以指定格式化函數,將dict轉化為byte:
import json
producer = KafkaProducer(
bootstrap_servers=[
"localhost:9093",
"localhost:9094"
],
value_serializer=lambda m: json.dumps(m).encode('ascii')
)
future = producer.send("user-event", {
"name": "燕睿濤",
"age": 26,
"friends": [
"ritoyan",
"luluyrt"
]
})
這樣就可以將格式化之後的信息發送給broker,不用每次發送的時候都自己格式化,真是不要太好用。
consumer 消費數據
創建一個consumer,其中group_id
是分組,broker中的每一個數據只能被consumer組中的一個consumer消費。
from kafka import KafkaConsumer
consumer = KafkaConsumer(
"user-event",
group_id = "user-event-test",
bootstrap_servers = [
"localhost:9093",
"localhost:9094"
]
)
for message in consumer:
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
啟動之後,進程會一直阻塞在哪裡,等broker中有消息的時候就會去消費,啟動多個進程,只要保證group_id一致,就可以保證消息只被組內的一個consumer消費,上面的程式會輸出:
user-event:8:2: key=None value=b'{"name": "\\u71d5\\u777f\\u6d9b", "age": 26, "friends": ["ritoyan", "luluyrt"]}'
同樣,進入的時候有value_serializer
,出來的時候對應的也有value_deserializer
,消費者可以配置value_deserializer
來格式化內容,跟producer對應起來
consumer = KafkaConsumer(
"user-event",
group_id = "user-event-test",
bootstrap_servers = [
"localhost:9093",
"localhost:9094"
],
value_deserializer=lambda m: json.loads(m.decode('ascii'))
)
輸出內容user-event:8:3: key=None value={'name': '燕睿濤', 'age': 26, 'friends': ['ritoyan', 'luluyrt']}
kafka其他命令
查看分組
我們的consumer可能有很多分組,可以通過西面的命令查看分組信息:
cd /path/to/kafka
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9093,localhost:9094 --list
可以看到我使用中的分組有4個,分別如下所示
clock-tick-test3
user-event-test
clock-tick-test2
clock-tick-test
查看特定分組信息
可以通過bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9093 --group user-event-test --describe
,查看分組user-event-test
的信息,可以看到西面的信息,包含消費的主題、分區信息,以及consumer在分區中的offset和分區的總offset。(為了格式化顯示,刪了部分列的部分字母)
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
user-event 3 0 0 0 kafka-python-154b2 /127.0.0.1 kafka-python
user-event 0 0 0 0 kafka-python-154b2 /127.0.0.1 kafka-python
user-event 1 1 1 0 kafka-python-154b2 /127.0.0.1 kafka-python
user-event 2 1 1 0 kafka-python-154b2 /127.0.0.1 kafka-python
user-event 4 0 0 0 kafka-python-154b2 /127.0.0.1 kafka-python
user-event 9 1 1 0 kafka-python-78517 /127.0.0.1 kafka-python
user-event 8 4 4 0 kafka-python-78517 /127.0.0.1 kafka-python
user-event 7 2 2 0 kafka-python-78517 /127.0.0.1 kafka-python
user-event 6 1 1 0 kafka-python-78517 /127.0.0.1 kafka-python
user-event 5 0 0 0 kafka-python-78517 /127.0.0.1 kafka-python
結語
至此,kafka的基本使用算是掌握了,以後要是有機會在項目中實踐就好了,在實際工程中的各種問題可以更加深刻的理解其中的原理。