kafka+zookeeper集群搭建

来源:https://www.cnblogs.com/JeremyWYL/archive/2018/01/03/8183103.html
-Advertisement-
Play Games

上次介紹了ES集群搭建的方法,希望能幫助大家,這兒我再接著介紹kafka集群,接著上次搭建的效果。 首先我們來簡單瞭解下什麼是kafka和zookeeper? Apache kafka 是一個分散式的基於push-subscribe的消息系統,它具備快速、可擴展、可持久化的特點。它現在是Apache ...


上次介紹了ES集群搭建的方法,希望能幫助大家,這兒我再接著介紹kafka集群,接著上次搭建的效果。

首先我們來簡單瞭解下什麼是kafka和zookeeper?

Apache kafka 是一個分散式的基於push-subscribe的消息系統,它具備快速、可擴展、可持久化的特點。它現在是Apache旗下的一個開源系統,作為hadoop生態系統的一部分,被各種商業公司廣泛應用。它的最大的特性就是可以實時的處理大量數據以滿足各種需求場景:比如基於hadoop的批處理系統、低延遲的實時系統、storm/spark流式處理引擎。

特點:

  • 高吞吐量、低延遲:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒
  • 可擴展性:kafka集群支持熱擴展
  • 持久性、可靠性:消息被持久化到本地磁碟,並且支持數據備份防止數據丟失
  • 容錯性:允許集群中節點失敗(若副本數量為n,則允許n-1個節點失敗)
  • 高併發:支持數千個客戶端同時讀寫

ZooKeeper是一個分散式的,開放源碼的分散式應用程式協調服務,它包含一個簡單的原語集,分散式應用程式可以基於它實現同步服務,配置維護和命名服務等。

集群角色:

  • Leader伺服器是整個zookeeper集群工作機制中的核心
  • Follower伺服器是zookeeper集群狀態的跟隨者
  • Observer 伺服器充當一個觀察者的角色

接下來就直接進去正題,如何正確的搭建kafka和zookeeper集群。

 一、zookeeper集群配置

1、修改主機名

kafka1.example.com --> 172.16.81.131
kafka2.example.com --> 172.16.81.132

2、修改hosts文件

[root@kafka1 opt]# cat /etc/hosts
  127.0.0.1   kafka1.example.com localhost localhost.localdomain localhost4 localhost4.localdomain4
  ::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
[root@kafka2 opt]# cat /etc/hosts
  127.0.0.1   kafka2.example.com localhost localhost.localdomain localhost4 localhost4.localdomain4
  ::1         localhost localhost.localdomain localhost6 localhost6.localdomain6

3、安裝jdk

cd /opt
jdk-8u131-linux-x64.tar.gz
tar -zxvf jdk-8u131-linux-x64.tar.gz
mv jdk-1.8.0_131 /usr/local/

4、配置jdk環境變數

[root@kafka1 opt]# tail -10 /etc/profile
 #JAVA環境變數
 export JAVA_HOME=/usr/local/jdk1.8.0_131
 export JAVA_BIN=$JAVA_HOME/bin
 export JAVA_LIB=$JAVA_HOME/lib
 export CLASSPATH=.:$JAVA_LIB/tools.jar:$JAVA_LIB/dt.jar
 export PATH=$JAVA_BIN:$PATH
 export _JAVA_SR_SIGNUM=12
 #zookeeper環境變數
 export ZOOKEEPER_HOME=/opt/zookeeper/
 export PATH=$ZOOKEEPER_HOME/bin:$PATH
 export PATH
[root@kafka2 opt]# tail -10 /etc/profile
 #JAVA環境變數
 export JAVA_HOME=/usr/local/jdk1.8.0_131
 export JAVA_BIN=$JAVA_HOME/bin
 export JAVA_LIB=$JAVA_HOME/lib
 export CLASSPATH=.:$JAVA_LIB/tools.jar:$JAVA_LIB/dt.jar
 export PATH=$JAVA_BIN:$PATH
 export _JAVA_SR_SIGNUM=12
 #zookeeper環境變數
 export ZOOKEEPER_HOME=/opt/zookeeper/
 export PATH=$ZOOKEEPER_HOME/bin:$PATH
 export PATH
 #應用環境變數
 source /etc/profile

5、下載軟體包   

zookeeper-3.4.10.tar.gz

  #解壓
  tar -zxvf zookeeper-3.4.10.tar.gz
  mv zookeeper-3.4.10 zookeeper
  cd /opt/zookeeper/config/
  cp zoo_sample.cfg zoo.cfg

6、編輯zookeeper配置文件

[root@kafka1 opt]# cat /opt/zookeeper/conf/zoo.cfg | grep -v '^#' | grep -v '^$'
 tickTime=2000
 initLimit=20
 syncLimit=10
 dataDir=/opt/data/zookeeper/data
 datalogDir=/opt/data/zookeeper/logs
 clientPort=2181
 server.1=172.16.81.131:2888:3888	
 server.2=172.16.81.132:2888:3888
[root@kafka2 opt]# cat /opt/zookeeper/conf/zoo.cfg | grep -v '^#' | grep -v '^$'
 tickTime=2000
 initLimit=20
 syncLimit=10
 dataDir=/opt/data/zookeeper/data
 datalogDir=/opt/data/zookeeper/logs
 clientPort=2181
 server.1=172.16.81.131:2888:3888
 server.2=172.16.81.132:2888:3888

#註意:在zookeeper配置文件中或者後面不能跟註釋文字,不然會報錯!
#說明:
tickTime: 這個時間是作為 Zookeeper 伺服器之間或客戶端與伺服器之間維持心跳的時間間隔,也就是每個 tickTime 時間就會發送一個心跳。
2888埠:表示的是這個伺服器與集群中的 Leader 伺服器交換信息的埠;
3888埠:表示的是萬一集群中的 Leader 伺服器掛了,需要一個埠來重新進行選舉,選出一個新的 Leader,而這個埠就是用來執行選舉時伺服器相互通信的埠

7、分別在kafka1和kafka2伺服器上創建datadir目錄

mkdir -p /opt/kafka/data
mkdir -p /opt/kafka/data/zookeeper

8、分別寫入id

[root@kafka1 opt]# echo "1" > /opt/kafka/data/zookeeper/myid 
[root@kafka2 ~]# echo "2" > /opt/kafka/data/zookeeper/myid
#註意ID不能一樣

9、啟動zookeeper集群

cd /opt/zookeeper/
bin/zkServer.sh start

10、啟動效果

[rootkafka1 ~]#   netstat -nlpt | grep -E "2181|2888|3888"
	tcp        0      0 :::2181                     :::*                        LISTEN      33644/java
	tcp        0      0 ::ffff:10.1.1.247:3888      :::*                        LISTEN      33644/java
[root@kafka2 ~]#   netstat -nlpt | grep -E "2181|2888|3888"
	tcp        0      0 :::2181                     :::*                        LISTEN      35016/java
	tcp        0      0 ::ffff:10.1.1.248:2888      :::*                        LISTEN      35016/java #哪台是leader,那麼他就擁有2888埠
	tcp        0      0 ::ffff:10.1.1.248:3888      :::*                        LISTEN      35016/java	  

二、kafka集群搭建

 1、配置文件

[root@kafka1 opt]# cat /opt/kafka/config/server.properties | grep -v '^#'|grep -v '^$'
	broker.id=1
	listeners=PLAINTEXT://172.16.81.131:9092
	num.network.threads=3
	num.io.threads=8
	socket.send.buffer.bytes=102400
	socket.receive.buffer.bytes=102400
	socket.request.max.bytes=104857600
	log.dirs=/opt/kafka/data/kafka-logs
	num.partitions=10
	num.recovery.threads.per.data.dir=1
	offsets.topic.replication.factor=1
	transaction.state.log.replication.factor=1
	transaction.state.log.min.isr=1
	log.retention.hours=168
	log.segment.bytes=1073741824
	log.retention.check.interval.ms=300000
	zookeeper.connect=172.16.81.131:2181,172.16.81.132:2181
	zookeeper.connection.timeout.ms=6000
	group.initial.rebalance.delay.ms=0
[root@kafka2 ~]# cat /opt/kafka/config/server.properties | grep -v '^#'|grep -v '^$'
	broker.id=2
	listeners=PLAINTEXT://172.16.81.132:9092
	num.network.threads=3
	num.io.threads=8
	socket.send.buffer.bytes=102400
	socket.receive.buffer.bytes=102400
	socket.request.max.bytes=104857600
	log.dirs=/opt/kafka/data/kafka-logs
	num.partitions=10
	num.recovery.threads.per.data.dir=1
	offsets.topic.replication.factor=1
	transaction.state.log.replication.factor=1
	transaction.state.log.min.isr=1
	log.retention.hours=168
	log.segment.bytes=1073741824
	log.retention.check.interval.ms=300000
	zookeeper.connect=172.16.81.131:2181,172.16.81.132:2181
	zookeeper.connection.timeout.ms=6000
	group.initial.rebalance.delay.ms=0
	#註意:broker.id不能相同

2、啟動kafka集群

/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties &

3、啟動效果

[root@kafka1 opt]# netstat -lntp
	Active Internet connections (only servers)
	Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name          
	tcp6       0      0 :::47457                :::*                    LISTEN      6582/java           
	tcp6       0      0 172.16.81.131:9092      :::*                    LISTEN      9260/java           
	tcp6       0      0 :::2181                 :::*                    LISTEN      6582/java           
	tcp6       0      0 :::33230                :::*                    LISTEN      9260/java           
	tcp6       0      0 172.16.81.131:3888      :::*                    LISTEN      6582/java           
[root@kafka2 ~]# netstat -lntp
	Active Internet connections (only servers)
	Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name          
	tcp6       0      0 172.16.81.132:9092      :::*                    LISTEN      9395/java           
	tcp6       0      0 :::42884                :::*                    LISTEN      6779/java           
	tcp6       0      0 :::2181                 :::*                    LISTEN      6779/java           
	tcp6       0      0 172.16.81.132:2888      :::*                    LISTEN      6779/java           
	tcp6       0      0 172.16.81.132:3888      :::*                    LISTEN      6779/java                 
	tcp6       0      0 :::38557                :::*                    LISTEN      9395/java

4、測試zookeeper和kafka是否正常

(1)建立一個主題
[root@kafka2 ~]# /opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic summer
	Created topic "summer".
	#註意:factor大小不能超過broker數,否則報錯,當前集群broker值值為2
(2)查看有哪些主題已經創建
[root@kafka2 ~]# /opt/kafka/bin/kafka-topics.sh --list --zookeeper 172.16.81.132:2181
	summer
[root@kafka1 ~]# /opt/kafka/bin/kafka-topics.sh --list --zookeeper 172.16.81.131:2181
	summer
(3)查看topic的詳情
[root@kafka2 ~]# /opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic summer
	Topic:summer	PartitionCount:1	ReplicationFactor:2	Configs:
	Topic: summer	Partition: 0	Leader: 2	Replicas: 2,1	Isr: 2,1
	#主題名稱:summer
	#Partition:只有一個,從0開始
	#leader :id為2的broker
	#Replicas 副本存在於broker id為2,1的上面
	#Isr:活躍狀態的broker
(4)發送消息,這裡使用的是生產者角色
[root@kafka2 ~]# /bin/bash /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092  --topic summer
	>Hello,wangyanlin
	>I am from china.
	>
	>
	>;
	>^C[root@kafka2 ~]# 
(5)接收消息,這裡使用的是消費者角色
[root@kafka2 ~]# /opt/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic summer --from-beginning
	Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
	Hello,wangyanlin
	I am from china.


	;

	^CProcessed a total of 5 messages
[root@kafka1 kafka]# /opt/kafka/bin/kafka-console-consumer.sh --zookeeper 172.16.81.132:2181 --topic summer --from-beginning
	Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
	Hello,wangyanlin
	I am from china.


	;
	^CProcessed a total of 5 messages
(6)刪除消費主題
	/opt/kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic summer
	開啟conf裡面的 delete.topic.enable改成true
#測試正常!!完成!  

測試kafka集群能正常接收消費信息和消費信息!! 

後續將發佈配置logstash日誌收集和過濾,還有kibana圖形化展示。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1. 前言 IValueConverter是用於數據綁定的強大的武器,它用於Value在Binding Source和Binding Target之間的轉換。本文將介紹IValueConverter的用法及一些常用的實現。 2. 為什麼要使用IValueConverter 假設有如下的類TestRe ...
  • 以下涉及到的所有資源都在這裡: 鏈接:https://pan.baidu.com/s/1eSctT5K 密碼:174s *我的VS2010的安裝位置:D:\Program Files (x86)\Microsoft Visual Studio 10.0 1.關於Glut的配置 1.1. 下載GLUT ...
  • Linux的root密碼破解不像Windows的密碼破解,windows的登錄密碼破解需要介入工具進行破解。Centos6和centos7的密碼方法也是不一樣的,具體如下: 首先是Centos 6的Root密碼破解 開機按esc 按 e 鍵進入編輯模式 選擇Kernel /vmlinz-2.6.32 ...
  • 上章分析了uboot啟動流程後,接下來便來配置新的單板,實現nor、nand啟動 1.首先在uboot里新建單板2440 1.1將2410的單板文件夾拷貝成2440: 然後將smdk2440下的smdk2410.c改為smdk2440.c,以及修改更改好的Makefile 1.2 將2410的頭文件 ...
  • 這是我在項目實戰中的個人總結,寫的倉促,有些東西也不一定准確,有些是自己推斷的,還希望各位多多指教,多多評論。 關於QCombox如果不需要自定義,其實寫UI是很簡單的。 創建實例:QComboBox* m_pMicrophoneCombox = new QComboBox; 我是用的QSS去的寫樣 ...
  • 安裝apache 1.安裝yum -y install httpd 2.設置apache服務開機啟動systemctl enable httpd.service 3.開啟apache服務systemctl start httpd.service 使用公網訪問能看到apache就說明安裝成功;如果未成 ...
  • 1、顯示硬碟及所屬分區情況。在終端視窗中輸入如下命令 可以看到要掛在的2T磁碟 2、對硬碟進行分區。在終端視窗中輸入如下命令: 如下圖所示:在Command (m for help)提示符後面輸入m顯示一個幫助菜單。 在Command (m for help)提示符後面輸入n,執行 add a ne ...
  • 在向伺服器拷貝文件的時候卡死,直接任務管理器結束應用程式,但是隨之引發一個問題,就是之後不能從本地向伺服器拷貝文件了,只能伺服器自己複製粘貼。 解決辦法重啟rdpclip.exe,先在任務管理器中結束rdpclip.exe 進程,然後重新運行(開始->運行->rdpclip.exe ),解決問題。 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...