分散式系統中zookeeper實現配置管理+集群管理

来源:https://www.cnblogs.com/iforever/archive/2018/05/27/9095095.html
-Advertisement-
Play Games

引言 之前就瞭解過kafka,看的似懂非懂,最近項目組中引入了 "kafka" ,剛好接著這個機會再次學習下。 Kafka在很多公司被用作分散式高性能消息隊列,kafka之前我只用過redis的list來做簡單的隊列處理,也還算好用,可能數據量比較小,也是單機運行,未出現過問題,用作輕量級消息隊列還 ...


引言

之前就瞭解過kafka,看的似懂非懂,最近項目組中引入了kafka,剛好接著這個機會再次學習下。

Kafka在很多公司被用作分散式高性能消息隊列,kafka之前我只用過redis的list來做簡單的隊列處理,也還算好用,可能數據量比較小,也是單機運行,未出現過問題,用作輕量級消息隊列還是比較好用的。而redis的作者antirez,設計redis的初衷並不是用來做消息隊列,但用它做消息隊列的人貌似還挺多,以至於後來antirez後來新開了個項目disque,專門用來做消息隊列,但這個不是本文的重點。

在瞭解kafka的時候,發現他與zookeeper綁定的比較緊密,為了更好的理解kafka,我必須先將zookeeper搞明白。

ZooKeeper是一種分散式協調服務,用於管理大型主機。在分散式環境中協調和管理服務是一個複雜的過程。ZooKeeper通過其簡單的架構和API解決了這個問題。 ZooKeeper允許開發人員專註於核心應用程式邏輯,而不必擔心應用程式的分散式特性。

這是從互聯網上引用的一段話,分散式應用不同於單機引用,維護起來非常複雜,現在的分散式系統大部分已經離不開zookeeper(或者類似的解決方案)了,zookeeper簡化了分散式應用的管理和部署,本文就通過實例來探討學習下zookeeper。

實例

本人也是持著學習的態度來寫本篇文章的,後文的實例都未在生產環境中使用過,都是學習之後的實踐整理,偏向於應用,對其中的演算法原理並未深究。有瑕疵遺漏的地方還望斧正。

配置管理

假如,我們線上有個伺服器集群,成百上千台伺服器,如果更新代碼的時候怎麼更新呢,一臺台機器去更新?就算是強大的麒麟臂爬也要累折了o(╯□╰)o,今天我們就試試用zookeeper來給伺服器集群部署代碼。

原理

zookeeper提供了節點watch的功能,zookeeper的client(對外提供服務的server)監控zookeeper上的節點(znode),當節點變動的時候,client會收到變動事件和變動後的內容,基於zookeeper的這個特性,我們可以給伺服器集群中的所有機器(client)都註冊watch事件,監控特定znode,節點中存儲部署代碼的配置信息,需要更新代碼的時候,修改znode中的值,伺服器集群中的每一臺server都會收到代碼更新事件,然後觸發調用,更新目標代碼。也可以很容易的橫向擴展,可以隨意的增刪機器,機器啟動的時候註冊監控節點事件即可。

我的機器數量有限,在本地模擬zookeeper集群和伺服器集群,原理都是一樣的,可能具體實施的時候有些小異。

在本機通過3個埠模擬zookeeper集群,多個目錄模擬伺服器集群。

zookeeper配置

本文只是模擬,為了方便,所有的節點全在一臺機器上,效果是類似的。

創建/path/to/zookeeper/conf/zoo1.cfg/path/to/zookeeper/conf/zoo2.cfg/path/to/zookeeper/conf/zoo3.cfg三個文件,配置分別如下:

zoo1.cfg

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/tmp/zk1/data
dataLogDir=/tmp/zk1/log
clientPort=2181
server.1=localhost:2888:3888
server.2=localhost:2899:3899
server.3=localhost:2877:3877

zoo2.cfg

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/tmp/zk2/data
dataLogDir=/tmp/zk2/log
clientPort=2182
server.1=localhost:2888:3888
server.2=localhost:2899:3899
server.3=localhost:2877:3877

zoo3.cfg

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/tmp/zk3/data
dataLogDir=/tmp/zk3/log
clientPort=2183
server.1=localhost:2888:3888
server.2=localhost:2899:3899
server.3=localhost:2877:3877

配置文件中dataDirdataLogDirclientPort這三個配置是有差別的。

分別在3個節點對應的dataDir中建立myid文件,裡面輸入伺服器標識號

echo 1 > /tmp/zk1/data/myid
echo 2 > /tmp/zk2/data/myid
echo 3 > /tmp/zk3/data/myid

啟動三個節點

bin/zkServer.sh start conf/zoo1.cfg
bin/zkServer.sh start conf/zoo2.cfg
bin/zkServer.sh start conf/zoo3.cfg

查看三個節點,可以看到1、3號接節點是follower節點,2號節點是leader節點

zookeeper bin/zkServer.sh status conf/zoo3.cfg
ZooKeeper JMX enabled by default
Using config: conf/zoo3.cfg
Mode: follower
➜  zookeeper bin/zkServer.sh status conf/zoo2.cfg
ZooKeeper JMX enabled by default
Using config: conf/zoo2.cfg
Mode: leader
➜  zookeeper bin/zkServer.sh status conf/zoo1.cfg
ZooKeeper JMX enabled by default
Using config: conf/zoo1.cfg
Mode: follower

客戶端代碼模擬

from kazoo.client import KazooClient
import time
import json
import subprocess
import os


zk = KazooClient(hosts="10.222.76.148:2181, 10.222.76.148:2182, 10.222.76.148:2183")

zk.start()

FILE_DIR = os.path.split(os.path.realpath(__file__))[0]

'''切換到指定文件夾,不存在的話創建並切換'''


def go_dir(dir_name):
    if os.path.exists(dir_name):
        pass
    else:
        os.makedirs(dir_name)
    os.chdir(dir_name)


'''從git獲取代碼'''


def handle_watch(data):
    try:
        info = json.loads(data)
        if not isinstance(info, dict):
            raise Exception("節點數據不是json穿")
        if not "relativePath" in info:
            raise Exception("節點json缺少[relativePath]欄位")
        if not "url" in info:
            raise Exception("節點json缺少[url]欄位")
        if not "commitId" in info:
            raise Exception("節點json缺少[commitId]欄位")

        chdir = os.path.join(FILE_DIR, info["relativePath"])
        go_dir(chdir)

        print("開始執行git clone ...")
        res = subprocess.call(['git', 'status'])

        if 0 == res:
            res = subprocess.call(['git', 'pull'])
        else:
            res = subprocess.call(['git', 'clone', info["url"], '.'])

        if 0 != res:
            raise Exception("clone/pull代碼失敗")

        commitId = subprocess.check_output(["git", "rev-parse", "HEAD"])
        commitId = commitId.decode()
        commitId = commitId.strip()
        if commitId != info["commitId"]:
            raise Exception("正確版本Id[%s],當前版本Id[%s]" % (commitId, info["commitId"]))

    except Exception as e:
        print(e)
        print("更新失敗")
        return 1
    else:
        print("正確版本Id[%s],當前版本Id[%s]" % (commitId, info["commitId"]))
        print("更新成功")
        return 0
    finally:
        pass


@zk.DataWatch("/app/business/config")
def watch_node(data, stat):
    if data:
        data = data.decode("utf-8")
        handle_watch(data)
    else:
        print("數據為空")


while True:
    time.sleep(100)
    print('tick')

新建2個文件夾模擬server集群,複製client.py到每個伺服器中

mkdir /tmp/server1
mkdir /tmp/server2

分別運行伺服器上監控zookeeper節點變動的代碼:

python3 /tmp/server1/client.py
python3 /tmp/server2/client.py

啟動之後,像znode節點/app/business/config中寫入信息:

from kazoo.client import KazooClient
import json
zk = KazooClient(hosts="192.168.0.105:2181, 192.168.0.105:2182, 192.168.0.105:2183")
zk.start()
znode = {
  "url": "https://github.com/aizuyan/daemon.git",
  "commitId": "d5f5f144c66f0a36d452e9e13067b21d3d89b743",
  "relativePath": "daemon"
}
znode = json.dumps(znode)
znode = bytes(znode, encoding="utf-8")
zk.set("/app/business/config", znode);

寫完之後,會看到上面兩個模擬的伺服器會馬上收到信息:

開始執行git clone ...
On branch master
Your branch is up-to-date with 'origin/master'.

nothing to commit, working tree clean
Already up-to-date.
正確版本Id[aea4096f490ff9556124fa5059ca702cc2acdf0e],當前版本Id[aea4096f490ff9556124fa5059ca702cc2acdf0e]
更新成功

開始執行git clone ...
On branch master
Your branch is up-to-date with 'origin/master'.

nothing to commit, working tree clean
Already up-to-date.
正確版本Id[aea4096f490ff9556124fa5059ca702cc2acdf0e],當前版本Id[aea4096f490ff9556124fa5059ca702cc2acdf0e]
更新成功


配合上git的hook機制,可以做一個完整的系統,當代碼有更新的時候更新保存代碼信息znode上的數據,zookeeper push到所有watch這個節點的伺服器,伺服器更新代碼,所有伺服器完成一次更新操作。

服務發現

原理


註冊一個持久節點/service/business/what,他下麵的每個子節點都是一個可用服務,保存了服務的地址埠等信息,服務調用者通過zookeeper獲取/service/business/what所有子節點信息來得到可用的服務。下麵的節點都是臨時節點,伺服器啟動的時候會過來註冊一個臨時節點,伺服器掛掉之後或主動關閉之後,臨時節點會自動移除,這樣就可以保證使用者獲取的what服務都是可用的,而且可以動態的擴容縮容。

我在本地通過docker來模擬伺服器集群,集群中的所有nginx都通過各自的80埠對外提供服務。通過python-nmap定時掃描埠占用情況,如果是open狀態則可對外提供服務,如果是closed狀態,則停止對外提供服務。如果由於網路抖動刪除了臨時節點,網路恢復之後,會重新掃描到自身服務可用,然後創建臨時節點。

監控服務

容器中啟動一個nginx,通過一個進程監控nginx綁定的埠,當埠對外提供服務時,我就認為服務可用,當埠停止對外提供服務時,我就認為服務不可用,相應的刪除或者創建臨時節點,代碼如下所示:

from kazoo.client import KazooClient
import time
import nmap
import os
import json

ZNODE_BASE_PATH="/service/business/what/"
zk = KazooClient(
    hosts="192.168.0.105:2181, 192.168.0.105:2182, 192.168.0.105:2183"
)
zk.start()
znode = ZNODE_BASE_PATH+"/s"+os.environ["PORT"]

def get_server_info():
    server_info = (os.environ["URL"], os.environ["PORT"])
    return server_info

def is_port_run(ip, port):
    nm = nmap.PortScanner()
    info = nm.scan(ip, port)
    state = info['scan'][ip]['tcp'][int(port)]['state']
    ret = False
    if state == "open":
        ret = True
    return ret

server_info = get_server_info()
server_info = json.dumps(server_info).encode("utf-8")

while True:
    time.sleep(2)
    is_alive = is_port_run("127.0.0.1", "80")
    if is_alive:
        if not zk.exists(znode):
            zk.create(znode, server_info, ephemeral=True, makepath=True)
    else:
        if zk.exists(znode):
            zk.delete(znode)

docker配置

每個伺服器綁定的埠信息通過docker運行的時候傳入參數決定,這樣就可以通過同一個鏡像方便的創建多個容器實例了,方便快捷,下麵是dockerfile:

FROM python:latest
MAINTAINER Liam Yan

# 擴充源
RUN grep '^deb ' /etc/apt/sources.list | sed 's/^deb/deb-src/g' > /etc/apt/sources.list.d/deb-src.list
RUN apt-get update -y

RUN apt-get install nginx -y
RUN mkdir /usr/share/nginx/logs
RUN apt-get install nmap -y
RUN pip3 install python-nmap
RUN pip3 install kazoo
ADD nginx.conf /etc/nginx/nginx.conf
ADD is_alive.py /usr/local/is_alive.py
ADD run.sh /usr/local/run.sh


EXPOSE 80

CMD ["/bin/bash", "/usr/local/run.sh"]

其中nginx.conf是容器中的nginx配置文件,最簡單的就可以,只要可以驗證該伺服器是否可用即可,但一定要註意,要在nginx配置文件中加入daemon off;,不然docker可能會啟動之後馬上退出。is_alive.py就是上面的用來檢測容器中的服務是否可用。run.sh內容如下,啟動一個後臺監控進程之後,再啟動nginx。

nohup python3 /usr/local/is_alive.py &
nginx

創建鏡像並運行

通過dockerfile創建鏡像docker build --rm -t zookeeper_test .,創建成功之後運行5個伺服器:

docker run -e "URL=127.0.0.1" -e "PORT=9099" --name yrt5 -p 9099:80 -d nzookeeper_test
docker run -e "URL=127.0.0.1" -e "PORT=9098" --name yrt4 -p 9098:80 -d nzookeeper_test
docker run -e "URL=127.0.0.1" -e "PORT=9097" --name yrt3 -p 9097:80 -d nzookeeper_test
docker run -e "URL=127.0.0.1" -e "PORT=9096" --name yrt2 -p 9096:80 -d nzookeeper_test
docker run -e "URL=127.0.0.1" -e "PORT=9095" --name yrt1 -p 9095:80 -d nzookeeper_test

啟動之後運行docker ps -a,可以看到,埠可以隨便取,只要別衝突就行,

➜  zookeeper git:(master) docker ps -a
CONTAINER ID        IMAGE                COMMAND                  CREATED                  STATUS              PORTS                  NAMES
5ae23ae351ed        nginx_python_alive   "/bin/bash /usr/loca…"   Less than a second ago   Up 2 seconds        0.0.0.0:9096->80/tcp   yrt5
e4a961e7853e        nginx_python_alive   "/bin/bash /usr/loca…"   44 seconds ago           Up 49 seconds       0.0.0.0:9095->80/tcp   yrt4
f96650b188be        nginx_python_alive   "/bin/bash /usr/loca…"   35 minutes ago           Up 35 minutes       0.0.0.0:9099->80/tcp   yrt3
084f71db25f2        nginx_python_alive   "/bin/bash /usr/loca…"   35 minutes ago           Up 35 minutes       0.0.0.0:9090->80/tcp   yrt2
159199bee2ed        nginx_python_alive   "/bin/bash /usr/loca…"   36 minutes ago           Up 36 minutes       0.0.0.0:8080->80/tcp   yrt1

對外提供服務

通過讀取/service/business/what節點下的所有子節點就可以獲取到所有的可用服務,代碼如下:

from kazoo.client import KazooClient
import json

def get_servers():
  zk = KazooClient(hosts="192.168.0.105:2181, 192.168.0.105:2182, 192.168.0.105:2183")
  zk.start()
  ZNODE = "/service/business/what"
  children = zk.get_children(ZNODE)
  servers = []

  for child in children:
    child_znode = ZNODE + "/" + child
    child_server_info, stat = zk.get(child_znode)
    child_server_info = child_server_info.decode()
    child_server_info = json.loads(child_server_info)
    servers.append(child_server_info[0] + ":" + child_server_info[1])

  return servers

運行之後得到可用服務列表['127.0.0.1:9096', '127.0.0.1:9095', '127.0.0.1:8080', '127.0.0.1:9099', '127.0.0.1:9090'],使用者只需要隨機選擇一個使用就可以了。

除此之外,還可以在從zookeeper獲取可用服務列表的時候加一層緩存,提高性能,額外一個進程watch/service/business/what的子節點變動,當有子節點變動的時候,刪除緩存,這樣就可以做到緩存中的內容'時時'和zookeeper中保持一致了

在kafka中的作用

至此大概對zookeeper在實際應用中的作用有了大概瞭解,這對我理解他在kafka中的作用有很大的幫助。在kafka中,zookeeper負責的是存儲kafka中的元數據信息,隊列的數據是不會存儲到zookeeper的,kafka是分散式的,zookeeper協調broker、producer、consumer之間的關係,當有新的角色加入的時候,更新zookeeper中的數據,其他角色就可以得到通知,並作出相應的調整,不需要停機更新配置,做到動態擴容。下圖來自互聯網,比較清晰的展示了zookeeper中存儲的kafka元信息數據。

zookeeper在kafka中充當的更像是分散式服務中配置中心的角色,所有配置信息、公共信息都丟到這裡來了,此為吾之愚見,望斧正。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • static和class的使用 static 使用 在非class的類型(包括enum和struct)中,一般使用static來描述類型作用域。在這個類型中,我們可以在類型範圍中聲明並使用存儲屬性,計算屬性和方法。 1 //other 2 struct Point { 3 let x: Double ...
  • 一、前言 博主也是vue道路上的行者,道行不深,希望自己的東西能對大家有所幫助。這篇博客針對 瞭解過vue基礎,但是沒有做過vue項目的童鞋。如果想看基礎指令,可以看我之前的一篇博客,請點擊 跳轉, 不過我還是建議看文檔比較好。os: Vue文檔是非常詳細的 二、準備 做vue單頁應用都需要會什麼? ...
  • 最近項目回歸使用jquery,頁面渲染全是使用jquery做的,所以做的時候也遇到了許多以前沒有見過的問題,如這次操作【radio】控制項的"checked"屬性時有遇到問題, $("...").attr("checked",false);無法起到作用,上網查了下使用prop()完美的解決了該問題,特 ...
  • 內容:回調函數;阻塞/同步、非阻塞、和非同步區別;阻塞和非阻塞代碼實例 Node.js 回調函數Node.js 非同步編程的直接體現就是回調。非同步編程依托於回調來實現,但不能說使用了回調後程式就非同步化了。回調函數在完成任務後就會被調用,Node 使用了大量的回調函數,Node 所有 API 都支持回調函 ...
  • 使用背景: 為什麼使用: 定義: 為什麼選擇兩根連詞線(--)表示變數? 用法: 作用域: 使用:root 作用域來定義全局變數: 如果想讓某個變數只在部分元素/組件下可見,只需要在特定的元素下定義該變數: 媒體查詢也可以提供作用域: 下麵一個例子來展示偽類下的作用域(例如,:hover): ...
  • 最近兄弟團隊讓我去幫忙優化兩個頁面,前端用的react全家桶,後端用的python,上一次寫react代碼都過去一年了,順著以前的的學習思路,再捋順一下react的要點 組件的生命周期就是Reac的工作過程,就好比人有生老病死,自然界有日月更替,每個組件在網頁中也會有被創建、更新和刪除,如同有聲明的 ...
  • 可以利用js中函數的閉包進行封裝 通常我們可以用下麵這種方法進行一個封裝,這樣在外部引入我們寫的這個js文件後,就可以直接使用export.getUserId()這種形式去調用該函數 上面寫法等價於下麵這一種,下麵可能更易於理解,但都差不多,這樣就進行了封裝然後在其他地方就可以通過window的全局 ...
  • 工廠方法模式 簡單工廠類 簡單工廠模式屬於創建型模式,又稱靜態工廠方法(Static factory method)模式。其是由一個工廠對象決定創建出哪一種產品類的實例,可理解為不同工廠模式的一個特殊實現。 上述代碼對於修改開放了,違反了開放封閉原則。故而引出工廠方法模式,去解決這樣的矛盾。 GOF ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...