ZooKeeper的三種典型應用場景

来源:https://www.cnblogs.com/jian0110/archive/2019/04/03/10650396.html
-Advertisement-
Play Games

引言 ZooKeeper是中典型的pub/sub模式的分散式數據管理與協調框架,開發人員可以使用它進行分散式數據的發佈與訂閱。另外,其豐富的數據節點類型可以交叉使用,配合Watcher事件通知機制,可以應用於分散式都會涉及的一些核心功能:數據發佈/訂閱、Master選舉、命名服務、分散式協調/通知、 ...


引言

  ZooKeeper是中典型的pub/sub模式的分散式數據管理與協調框架,開發人員可以使用它進行分散式數據的發佈與訂閱。另外,其豐富的數據節點類型可以交叉使用,配合Watcher事件通知機制,可以應用於分散式都會涉及的一些核心功能:數據發佈/訂閱、Master選舉、命名服務、分散式協調/通知、集群管理、分散式鎖、分散式隊列等。本博文主要介紹:發佈/訂閱、分散式鎖、Master選舉三種最常用的場景

  本文中的代碼示例均是由Curator客戶端編寫的,已經對ZooKeeper原生API做好很多封裝。參考資料《從Paxos到Zookeeper  分散式一致性原理與實踐》(有需要電子PDF的朋友,可以評論私信我)

 


 

一、數據發佈/訂閱

1、基本概念

(1)數據發佈/訂閱系統即所謂的配置中心,也就是發佈者將數據發佈到ZooKeeper的一個節點或者一系列節點上,提供訂閱者進行數據訂閱,從而實現動態更新數據的目的,實現配置信息的集中式管理和數據的動態更新。ZooKeeper採用的是推拉相結合的方式:客戶端向伺服器註冊自己需要關註的節點,一旦該節點的數據發生改變,那麼服務端就會向相應的客戶端發送Wacher事件通知,客戶端接收到消息通知後,需要主動到服務端獲取最新的數據。

(2)實際系統開發過程中:我們可以將初始化配置信息放到節點上集中管理,應用在啟動時都會主動到ZooKeeper服務端進行一次配置讀取,同時在指定節點註冊Watcher監聽,主要配置信息一旦變更,訂閱者就可以獲取讀取最新的配置信息。通常系統中需要使用一些通用的配置信息,比如機器列表信息、運行時的開關配置、資料庫配置信息等全局配置信息,這些都會有以下3點特性:

  1) 數據量通常比較小(通常是一些配置文件)

  2) 數據內容在運行時會經常發生動態變化(比如資料庫的臨時切換等)

  3) 集群中各機器共用,配置一致(比如資料庫配置共用)。

(3)利用的ZooKeeper特性是:ZooKeeper對任何節點(包括子節點)的變更,只要註冊Wacther事件(使用Curator等客戶端工具已經被封裝好)都可以被其它客戶端監聽

2、代碼示例

package com.lijian.zookeeper.demo;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

import java.util.concurrent.CountDownLatch;

public class ZooKeeper_Subsciption {
    private static final String ADDRESS = "xxx.xxx.xxx.xxx:2181";
    private static final int SESSION_TIMEOUT = 5000;
    private static final String PATH = "/configs";
    private static RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    private static String config = "jdbc_configuration";
    private static CountDownLatch countDownLatch = new CountDownLatch(4);

    public static void main(String[] args) throws Exception {
        // 訂閱該配置信息的集群節點(客戶端):sub1-sub3
        for (int i = 0; i < 3; i++) {
            CuratorFramework consumerClient = getClient();
            subscribe(consumerClient, "sub" + String.valueOf(i));
        }
        // 更改配置信息的集群節點(客戶端):pub
        CuratorFramework publisherClient = getClient();
        publish(publisherClient, "pub");

    }
    private static void init() throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(ADDRESS)
                .sessionTimeoutMs(SESSION_TIMEOUT)
                .retryPolicy(retryPolicy)
                .build();
        client.start();
        // 檢查節點是否存在,不存在則初始化創建
        if (client.checkExists().forPath(PATH) == null) {
            client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL)
                    .forPath(PATH, config.getBytes());
        }
    }


    /**
     * 創建客戶端並且初始化建立一個存儲配置數據的節點
     *
     * @return
     * @throws Exception
     */
    private static CuratorFramework getClient() throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(ADDRESS)
                .sessionTimeoutMs(SESSION_TIMEOUT)
                .retryPolicy(retryPolicy)
                .build();
        client.start();
        if (client.checkExists().forPath(PATH) == null) {
            client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL)
                    .forPath(PATH, config.getBytes());
        }
        return client;
    }

    /**
     * 集群中的某個節點機器更改了配置信息:即發佈了更新了數據
     *
     * @param client
     * @throws Exception
     */
    private static void publish(CuratorFramework client, String znode) throws Exception {

        System.out.println("節點[" + znode + "]更改了配置數據...");
        client.setData().forPath(PATH, "configuration".getBytes());
        countDownLatch.await();
    }

    /**
     * 集群中訂閱的節點客戶端(機器)獲得最新的配置數據
     *
     * @param client
     * @param znode
     * @throws Exception
     */
    private static void subscribe(CuratorFramework client, String znode) throws Exception {
        // NodeCache監聽ZooKeeper數據節點本身的變化
        final NodeCache cache = new NodeCache(client, PATH);
        // 設置為true:NodeCache在第一次啟動的時候就立刻從ZooKeeper上讀取節點數據並保存到Cache中
        cache.start(true);
        System.out.println("節點["+ znode +"]已訂閱當前配置數據:" + new String(cache.getCurrentData().getData()));
        // 節點監聽
        countDownLatch.countDown();
        cache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() {
                System.out.println("配置數據已發生改變, 節點[" + znode + "]讀取當前新配置數據: " + new String(cache.getCurrentData().getData()));
            }
        });
    }
}
View Code

運行結果:節點[pub]更改了配置數據為“configuration”,訂閱"/configs"節點的sub1-sub3觀測到配置被改變,立馬讀取當前最新的配置數據“configuration”

 

二、Master選舉

1、基本概念

  (1)在一些讀寫分離的應用場景中,客戶端寫請求往往是由Master處理的,而另一些場景中,Master則常常負責處理一些複雜的邏輯,並將處理結果同步給集群中其它系統單元。比如一個廣告投放系統後臺與ZooKeeper交互,廣告ID通常都是經過一系列海量數據處理中計算得到(非常消耗I/O和CPU資源的過程),那就可以只讓集群中一臺機器處理數據得到計算結果,之後就可以共用給整個集群中的其它所有客戶端機器。

  (2)利用ZooKeeper的特性:利用ZooKeeper的強一致性,即能夠很好地保證分散式高併發情況下節點的創建一定能夠保證全局唯一性,ZooKeeper將會保證客戶端無法重覆創建一個已經存在的數據節點,也就是說如果多個客戶端請求創建同一個節點,那麼最終一定只有一個客戶端請求能夠創建成功,這個客戶端就是Master,而其它客戶端註在該節點上註冊子節點Wacther,用於監控當前Master是否存活,如果當前Master掛了,那麼其餘客戶端立馬重新進行Master選舉。

  (3)競爭成為Master角色之後,創建的子節點都是臨時順序節點,比如:_c_862cf0ce-6712-4aef-a91d-fc4c1044d104-lock-0000000001,並且序號是遞增的。需要註意的是這裡有"lock"單詞,這說明ZooKeeper這一特性,也可以運用於分散式鎖。

   

 

2、代碼示例

 

package com.lijian.zookeeper.demo;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class ZooKeeper_Master {

    private static final String ADDRESS="xxx.xxx.xxx.xxx:2181";
    private static final int SESSION_TIMEOUT=5000;
    private static final String MASTER_PATH = "/master_path";
    private static final int CLIENT_COUNT = 5;

    private static RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);


    public static void main(String[] args) throws InterruptedException {

        ExecutorService service = Executors.newFixedThreadPool(CLIENT_COUNT);
        for (int i = 0; i < CLIENT_COUNT; i++) {
            final String index = String.valueOf(i);
            service.submit(() -> {
                masterSelect(index);
            });
        }
    }

    private static void  masterSelect(final String znode){
        // client成為master的次數統計
        AtomicInteger leaderCount = new AtomicInteger(1);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(ADDRESS)
                .sessionTimeoutMs(SESSION_TIMEOUT)
                .retryPolicy(retryPolicy)
                .build();
        client.start();
        // 一旦執行完takeLeadership,就會重新進行選舉
        LeaderSelector selector = new LeaderSelector(client, MASTER_PATH, new LeaderSelectorListenerAdapter() {
            @Override
            public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                System.out.println("節點["+ znode +"]成為master");
                System.out.println("節點["+ znode +"]已經成為master次數:"+ leaderCount.getAndIncrement());
                // 睡眠5s模擬成為master後完成任務
                Thread.sleep(5000);
                System.out.println("節點["+ znode +"]釋放master");
            }
        });
        // autoRequeue自動重新排隊:使得上一次選舉為master的節點還有可能再次成為master
        selector.autoRequeue();
        selector.start();
    }
}
View Code

運行結果:由於執行selector.autoRequeue()方法,被選舉為master後的節點可能會再次獲被選舉為master,所以會一直迴圈執行,以下只截圖部分。其中獲取成為master的次數充分表明瞭Master選舉的公平性。

 

 

 

三、分散式鎖

1、基本概念

  (1)對於排他鎖:ZooKeeper通過數據節點表示一個鎖,例如/exclusive_lock/lock節點就可以定義一個鎖,所有客戶端都會調用create()介面,試圖在/exclusive_lock下創建lock子節點,但是ZooKeeper的強一致性會保證所有客戶端最終只有一個客戶創建成功。也就可以認為獲得了鎖,其它線程Watcher監聽子節點變化(等待釋放鎖,競爭獲取資源)。

     對於共用鎖:ZooKeeper同樣可以通過數據節點表示一個鎖,類似於/shared_lock/[Hostname]-請求類型(讀/寫)-序號的臨時節點,比如/shared_lock/192.168.0.1-R-0000000000

2、代碼示例

Curator提供的有四種鎖,分別如下:

  (1)InterProcessMutex:分散式可重入排它鎖

  (2)InterProcessSemaphoreMutex:分散式排它鎖

  (3)InterProcessReadWriteLock:分散式讀寫鎖

  (4)InterProcessMultiLock:將多個鎖作為單個實體管理的容器

主要是以InterProcessMutex為例,編寫示例:

package com.lijian.zookeeper.demo;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ZooKeeper_Lock {
    private static final String ADDRESS = "xxx.xxx.xxx.xxx:2181";
    private static final int SESSION_TIMEOUT = 5000;
    private static final String LOCK_PATH = "/lock_path";
    private static final int CLIENT_COUNT = 10;

    private static RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    private static int resource = 0;

    public static void main(String[] args){
        ExecutorService service = Executors.newFixedThreadPool(CLIENT_COUNT);
        for (int i = 0; i < CLIENT_COUNT; i++) {
            final String index = String.valueOf(i);
            service.submit(() -> {
                distributedLock(index);
            });
        }
    }

    private static void distributedLock(final String znode) {
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(ADDRESS)
                .sessionTimeoutMs(SESSION_TIMEOUT)
                .retryPolicy(retryPolicy)
                .build();
        client.start();
        final InterProcessMutex lock = new InterProcessMutex(client, LOCK_PATH);
        try {
//            lock.acquire();
            System.out.println("客戶端節點[" + znode + "]獲取lock");
            System.out.println("客戶端節點[" + znode + "]讀取的資源為:" + String.valueOf(resource));
            resource ++;
//            lock.release();
            System.out.println("客戶端節點[" + znode + "]釋放lock");

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
View Code

 

運行結果:加鎖後可以從左圖看到讀取的都是最新的資源值。如果去掉鎖的話讀取的資源值不能保證是最新值看右圖

    

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 相對路徑推薦使用這個 絕對路徑推薦這個 封裝成jar時候區別: ...
  • JMS消息服務介紹和使用場景 什麼是JMS JMS : Java Message Service(Java消息服務),Java平臺中關於面向消息中間件的介面. 重點在於介面,介面就意味著與JDBC類似,僅僅有聲明,沒有實現,具體的實現交給廠商. 介面本身是一種與廠商無關的API. 使用場景 每一種技 ...
  • 什麼是this this是一個const指針,存的是 當前對象 的地址,指向 當前對象 ,通過this指針可以訪問類中的所有成員。 當前對象是指正在使用的對象,比如 ,`a`就是當前對象。 關於this 1. 每個對象都有this指針,通過this來訪問自己的地址。 2. 每個成員函數都有一個指針形 ...
  • 首先是拉去鏡像(或者直接創建容器自然會拉去) 創建容器 啟動 查看日誌 可以看到由於我的伺服器只有2G記憶體導致啟動因為記憶體不足無法啟動,修改啟動的記憶體大小 重新啟動ES 測試 ...
  • // 設置 restTemplate FormHttpMessageConverter 編碼方式 @Configurationpublic class RestTemplateConfig { @Bean RestTemplate restTemplate(){ SimpleClientHttpRe ...
  • 1.XML基礎: XML全稱為eXtensible Markup Language;即可擴展標記型語言,同HTML一樣使用標簽來操作。它的可擴展性體現在標簽可以由自己定義,可以是中文標簽。 XML用途: 同HTML一樣可用於顯示數據,但是不是XML的主要用途。XML我們多用來存儲數據。 應用: 1) ...
  • 再議Java中的static關鍵字 java中的static關鍵字在很久之前的一篇博文中已經講到過了,感興趣的朋友可以參考:《Java中的static關鍵字解析》。 今天我們再來談一談static關鍵字,這次我們側重講述static關鍵字的一些使用場景和方式,以下是本文目錄大綱: 一.static關 ...
  • 1. Hystrix概念設計 1.1. 大綱 1.2. 基本的容錯模式 1.3. 斷路器模式 1.4. 艙壁隔離模式 1.5. 容錯理念 1. 凡事依賴都可能失敗 2. 凡事資源都有限制 3. 網路並不可靠 4. 延遲是應用穩定性殺手 1.6. 彈性理念 1.7. 攜程案例(2015) 1.8. D ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...