ZooKeeper實現生產-消費者隊列

来源:https://www.cnblogs.com/codestory/archive/2019/08/17/11368667.html
-Advertisement-
Play Games

使用ZooKeeper實現一個生產-消費者隊列,可用於多節點的分散式數據結構。生產者創建一個數據對象,並放到隊列中;消費者從隊列中取出一個數據對象併進行處理。 ...


【歡迎關註公眾號:程式猿講故事 (codestory),及時接收最新文章】

生產-消費者隊列,用於多節點的分散式數據結構,生產和消費數據。生產者創建一個數據對象,並放到隊列中;消費者從隊列中取出一個數據對象併進行處理。在ZooKeeper中,隊列可以使用一個容器節點下創建多個子節點來實現;創建子節點時,CreateMode使用 PERSISTENT_SEQUENTIAL,ZooKeeper會自動在節點名稱後面添加唯一序列號。EPHEMERAL_SEQUENTIAL也有同樣的特點,區別在於會話結束後是否會自動刪除。

敲小黑板:*_SEQUENTIAL是ZooKeeper的一個很重要的特性,分散式鎖、選舉制度都依靠這個特性實現的。

1      對前續代碼的重構

之前的文章,我們已經用實現了Watcher和Barrier,創建ZooKeeper連接的代碼已經複製了一遍。後續還需要類似的工作,因此先對原有代碼做一下重構,讓代碼味道乾凈一點。

 

以下是 process(WatchedEvent)的代碼

final public void process(WatchedEvent event) {

  if (Event.EventType.None.equals(event.getType())) {

    // 連接狀態發生變化

    if (Event.KeeperState.SyncConnected.equals(event.getState())) {

      // 連接建立成功

      connectedSemaphore.countDown();

    }

  } else if (Event.EventType.NodeCreated.equals(event.getType())) {

    processNodeCreated(event);

  } else if (Event.EventType.NodeDeleted.equals(event.getType())) {

    processNodeDeleted(event);

  } else if (Event.EventType.NodeDataChanged.equals(event.getType())) {

    processNodeDataChanged(event);

  } else if (Event.EventType.NodeChildrenChanged.equals(event.getType())) {

    processNodeChildrenChanged(event);

  }

}

 

以ZooKeeperBarrier為例,看看重構之後的構造函數和監聽Event的代碼 

ZooKeeperBarrier(String address, String tableSerial, int tableCapacity, String customerName)

    throws IOException {

  super(address);

  this.tableSerial = createRootNode(tableSerial);

  this.tableCapacity = tableCapacity;

  this.customerName = customerName;

}

protected void processNodeChildrenChanged(WatchedEvent event) {

  log.info("{} 接收到了通知 : {}", customerName, event.getType());

  // 子節點有變化

  synchronized (mutex) {

    mutex.notify();

  }

}

 

2 隊列的生產者

生產者的關鍵代碼

String elementName = queueName + "/element";
ArrayList<ACL> ids = ZooDefs.Ids.OPEN_ACL_UNSAFE;
CreateMode createMode = CreateMode.PERSISTENT_SEQUENTIAL;
getZooKeeper().create(elementName, value, ids, createMode);

 

註意,重點是PERSISTENT_SEQUENTIAL,PERSISTENT是表示永久存儲直到有命令刪除,SEQUENTIAL表示自動在後面添加自增的唯一序列號。這樣,儘管elementName都一樣,但實際生成的zNode名字在 “element”後面會添加格式為%010d的10個數字,如0000000001。如一個完整的zNode名可能為/queue/element0000000021。

3 隊列的消費者

消費者嘗試從子節點列表獲取zNode名最小的一個子節點,如果隊列為空則等待NodeChildrenChanged事件。關鍵代碼

/** 隊列的同步信號 */

private static Integer queueMutex = Integer.valueOf(1);

 

@Override

protected void processNodeChildrenChanged(WatchedEvent event) {

  synchronized (queueMutex) {

    queueMutex.notify();

  }

}

 

/**

 * 從隊列中刪除第一個對象

 *

 * @return

 * @throws KeeperException

 * @throws InterruptedException

 */

int consume() throws KeeperException, InterruptedException {

  while (true) {

    synchronized (queueMutex) {

      List<String> list = getZooKeeper().getChildren(queueName, true);

      if (list.size() == 0) {

        queueMutex.wait();

      } else {

        // 獲取第一個子節點的名稱

        String firstNodeName = getFirstElementName(list);

        // 刪除節點,並返回節點的值

        return deleteNodeAndReturnValue(firstNodeName);

      }

    }

  }

}

 

4 測試日誌

把測試結果放源碼前面,免得大家被無聊的代碼晃暈。

測試代碼創建了兩個線程,一個線程是生產者,按隨機間隔往隊列中添加對象;一個線程是消費者,隨機間隔嘗試從隊列中取出第一個,如果當時隊列為空,會等到直到新的數據。

兩個進程都加上隨機間隔,是為了模擬生產可能比消費更快的情況。以下是測試日誌,為了更突出,生產和消費的日誌我增加了不同的文字樣式。

49:47.866 [INFO] ZooKeeperQueueTest.testQueue(29) 開始ZooKeeper隊列測試,本次將測試 10 個數據

49:48.076 [DEBUG] ZooKeeperQueue.log(201)

+ Profiler [tech.codestory.zookeeper.queue.ZooKeeperQueue 連接到ZooKeeper]

|-- elapsed time                   [開始鏈接]   119.863 milliseconds.

|-- elapsed time           [等待連接成功的Event]    40.039 milliseconds.

|-- Total        [tech.codestory.zookeeper.queue.ZooKeeperQueue 連接到ZooKeeper]   159.911 milliseconds.

 

49:48.082 [DEBUG] ZooKeeperQueue.log(201)

+ Profiler [tech.codestory.zookeeper.queue.ZooKeeperQueue 連接到ZooKeeper]

|-- elapsed time                   [開始鏈接]   103.795 milliseconds.

|-- elapsed time           [等待連接成功的Event]    65.899 milliseconds.

|-- Total        [tech.codestory.zookeeper.queue.ZooKeeperQueue 連接到ZooKeeper]   170.263 milliseconds.

 

49:48.102 [INFO] ZooKeeperQueueTest.run(51) 生產對象 : 1 , 然後等待 1700 毫秒

49:48.134 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 1 , 然後等待 4000 毫秒

49:49.814 [INFO] ZooKeeperQueueTest.run(51) 生產對象 : 2 , 然後等待 900 毫秒

49:50.717 [INFO] ZooKeeperQueueTest.run(51) 生產對象 : 3 , 然後等待 1300 毫秒

49:52.020 [INFO] ZooKeeperQueueTest.run(51) 生產對象 : 4 , 然後等待 3700 毫秒

49:52.139 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 2 , 然後等待 2800 毫秒

49:54.947 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 3 , 然後等待 4500 毫秒

49:55.724 [INFO] ZooKeeperQueueTest.run(51) 生產對象 : 5 , 然後等待 3500 毫秒

49:59.228 [INFO] ZooKeeperQueueTest.run(51) 生產對象 : 6 , 然後等待 4200 毫秒

49:59.454 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 4 , 然後等待 2400 毫秒

50:01.870 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 5 , 然後等待 4900 毫秒

50:03.435 [INFO] ZooKeeperQueueTest.run(51) 生產對象 : 7 , 然後等待 4500 毫秒

50:06.776 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 6 , 然後等待 3600 毫秒

50:07.938 [INFO] ZooKeeperQueueTest.run(51) 生產對象 : 8 , 然後等待 1900 毫秒

50:09.846 [INFO] ZooKeeperQueueTest.run(51) 生產對象 : 9 , 然後等待 3200 毫秒

50:10.388 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 7 , 然後等待 2900 毫秒

50:13.051 [INFO] ZooKeeperQueueTest.run(51) 生產對象 : 10 , 然後等待 4900 毫秒

50:13.294 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 8 , 然後等待 300 毫秒

50:13.600 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 9 , 然後等待 4800 毫秒

50:18.407 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 10 , 然後等待 2400 毫秒

 

5 完整源碼

5.1 ZooKeeperBase.java

package tech.codestory.zookeeper;

 

import java.io.IOException;

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.*;

import org.apache.zookeeper.data.Stat;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.slf4j.profiler.Profiler;

 

/**

 * 為 ZooKeeper測試代碼創建一個基類,封裝建立連接的過程

 *

 * @author code story

 * @date 2019/8/16

 */

public class ZooKeeperBase implements Watcher {

  /** 日誌,不使用 @Slf4j ,是要使用子類的log */

  Logger log = null;

 

  /** 等待連接建立成功的信號 */

  private CountDownLatch connectedSemaphore = new CountDownLatch(1);

  /** ZooKeeper 客戶端 */

  private ZooKeeper zooKeeper = null;

  /** 避免重覆根節點 */

  static Integer rootNodeInitial = Integer.valueOf(1);

 

  /** 構造函數 */

  public ZooKeeperBase(String address) throws IOException {

    log = LoggerFactory.getLogger(getClass());

 

    Profiler profiler = new Profiler(this.getClass().getName() + " 連接到ZooKeeper");

    profiler.start("開始鏈接");

    zooKeeper = new ZooKeeper(address, 3000, this);

    try {

      profiler.start("等待連接成功的Event");

      connectedSemaphore.await();

    } catch (InterruptedException e) {

      log.error("InterruptedException", e);

    }

    profiler.stop();

    profiler.setLogger(log);

    profiler.log();

  }

 

  /**

   * 創建測試需要的根節點

   *

   * @param rootNodeName

   * @return

   */

  public String createRootNode(String rootNodeName) {

    synchronized (rootNodeInitial) {

      // 創建 tableSerial 的zNode

      try {

        Stat existsStat = getZooKeeper().exists(rootNodeName, false);

        if (existsStat == null) {

          rootNodeName = getZooKeeper().create(rootNodeName, new byte[0],

              ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        }

      } catch (KeeperException e) {

        log.error("KeeperException", e);

      } catch (InterruptedException e) {

        log.error("InterruptedException", e);

      }

    }

    return rootNodeName;

  }

 

  /** 讀取ZooKeeper對象,供子類調用 */

  protected ZooKeeper getZooKeeper() {

    return zooKeeper;

  }

 

  @Override

  final public void process(WatchedEvent event) {

    if (Event.EventType.None.equals(event.getType())) {

      // 連接狀態發生變化

      if (Event.KeeperState.SyncConnected.equals(event.getState())) {

        // 連接建立成功

        connectedSemaphore.countDown();

      }

    } else if (Event.EventType.NodeCreated.equals(event.getType())) {

      processNodeCreated(event);

    } else if (Event.EventType.NodeDeleted.equals(event.getType())) {

      processNodeDeleted(event);

    } else if (Event.EventType.NodeDataChanged.equals(event.getType())) {

      processNodeDataChanged(event);

    } else if (Event.EventType.NodeChildrenChanged.equals(event.getType())) {

      processNodeChildrenChanged(event);

    }

  }

 

  /**

   * 處理事件: NodeCreated

   *

   * @param event

   */

  protected void processNodeCreated(WatchedEvent event) {}

 

  /**

   * 處理事件: NodeDeleted

   *

   * @param event

   */

  protected void processNodeDeleted(WatchedEvent event) {}

 

  /**

   * 處理事件: NodeDataChanged

   *

   * @param event

   */

  protected void processNodeDataChanged(WatchedEvent event) {}

 

  /**

   * 處理事件: NodeChildrenChanged

   *

   * @param event

   */

  protected void processNodeChildrenChanged(WatchedEvent event) {}

}

  

5.2 ZooKeeperQueue.java

package tech.codestory.zookeeper.queue;

 

import java.io.IOException;

import java.nio.ByteBuffer;

import java.util.ArrayList;

import java.util.List;

import org.apache.zookeeper.CreateMode;

import org.apache.zookeeper.KeeperException;

import org.apache.zookeeper.WatchedEvent;

import org.apache.zookeeper.ZooDefs;

import org.apache.zookeeper.data.ACL;

import org.apache.zookeeper.data.Stat;

import lombok.extern.slf4j.Slf4j;

import tech.codestory.zookeeper.ZooKeeperBase;

 

/**

 * ZooKeeper實現Queue

 *

 * @author code story

 * @date 2019/8/16

 */

@Slf4j

public class ZooKeeperQueue extends ZooKeeperBase {

  /** 隊列名稱 */

  private String queueName;

 

  /** 隊列的同步信號 */

  private static Integer queueMutex = Integer.valueOf(1);

 

  /**

   * 構造函數

   *

   * @param address

   * @param queueName

   * @throws IOException

   */

  public ZooKeeperQueue(String address, String queueName) throws IOException {

    super(address);

 

    this.queueName = createRootNode(queueName);

  }

 

  @Override

  protected void processNodeChildrenChanged(WatchedEvent event) {

    synchronized (queueMutex) {

      queueMutex.notify();

    }

  }

 

  /**

   * 將對象添加到隊列中

   *

   * @param i

   * @return

   */

  boolean produce(int i) throws KeeperException, InterruptedException {

    ByteBuffer b = ByteBuffer.allocate(4);

    byte[] value;

 

    // Add child with value i

    b.putInt(i);

    value = b.array();

    String elementName = queueName + "/element";

    ArrayList<ACL> ids = ZooDefs.Ids.OPEN_ACL_UNSAFE;

    CreateMode createMode = CreateMode.PERSISTENT_SEQUENTIAL;

    getZooKeeper().create(elementName, value, ids, createMode);

 

    return true;

  }

 

  /**

   * 從隊列中刪除第一個對象

   *

   * @return

   * @throws KeeperException

   * @throws InterruptedException

   */

  int consume() throws KeeperException, InterruptedException {

    while (true) {

      synchronized (queueMutex) {

        List<String> list = getZooKeeper().getChildren(queueName, true);

        if (list.size() == 0) {

          queueMutex.wait();

        } else {

          // 獲取第一個子節點的名稱

          String firstNodeName = getFirstElementName(list);

          // 刪除節點,並返回節點的值

          return deleteNodeAndReturnValue(firstNodeName);

        }

      }

    }

  }

 

  /**

   * 獲取第一個子節點的名稱

   *

   * @param list

   * @return

   */

  private String getFirstElementName(List<String> list) {

    Integer min = Integer.MAX_VALUE;

    String minNode = null;

    for (String s : list) {

      Integer tempValue = Integer.valueOf(s.substring(7));

      if (tempValue < min) {

        min = tempValue;

        minNode = s;

      }

    }

    return minNode;

  }

 

  /**

   * 刪除節點,並返回節點的值

   *

   * @param minNode

   * @return

   * @throws KeeperException

   * @throws InterruptedException

   */

  private int deleteNodeAndReturnValue(String minNode)

      throws KeeperException, InterruptedException {

    String fullNodeName = queueName + "/" + minNode;

    Stat stat = new Stat();

    byte[] b = getZooKeeper().getData(fullNodeName, false, stat);

    getZooKeeper().delete(fullNodeName, stat.getVersion());

    ByteBuffer buffer = ByteBuffer.wrap(b);

    return buffer.getInt();

  }

}

 

5.3 ZooKeeperQueueTest.java

package tech.codestory.zookeeper.queue;

 

import java.io.IOException;

import java.security.SecureRandom;

import java.util.Random;

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.KeeperException;

import org.testng.annotations.Test;

import lombok.extern.slf4j.Slf4j;

 

/**

 * ZooKeeperQueue測試

 *

 * @author code story

 * @date 2019/8/16

 */

@Slf4j

public class ZooKeeperQueueTest {

  final String address = "192.168.5.128:2181";

  final String queueName = "/queue";

  final Random random = new SecureRandom();

  // 隨機生成10-20之間的個數

  final int count = 10 + random.nextInt(10);

  /** 等待生產者和消費者線程都結束 */

  private CountDownLatch connectedSemaphore = new CountDownLatch(2);

 

  @Test

  public void testQueue() {

    log.info("開始ZooKeeper隊列測試,本次將測試 {} 個數據", count);

    new QueueProducer().start();

    new QueueConsumer().start();

    try {

      connectedSemaphore.await();

    } catch (InterruptedException e) {

      log.error("InterruptedException", e);

    }

  }

 

  /**

   * 隊列的生產者

   */

  class QueueProducer extends Thread {

    @Override

    public void run() {

      try {

        ZooKeeperQueue queue = new ZooKeeperQueue(address, queueName);

        for (int i = 0; i < count; i++) {

          int elementValue = i + 1;

 

          long waitTime = random.nextInt(50) * 100;

          log.info("生產對象 : {} , 然後等待 {} 毫秒", elementValue, waitTime);

          queue.produce(elementValue);

          Thread.sleep(waitTime);

        }

      } catch (IOException e) {

        log.error("IOException", e);

      } catch (InterruptedException e) {

        log.error("InterruptedException", e);

      } catch (KeeperException e) {

        log.error("KeeperException", e);

      }

      connectedSemaphore.countDown();

    }

  }

 

  /**

   * 隊列的消費者

   */

  class QueueConsumer extends Thread {

    @Override

    public void run() {

      try {

        ZooKeeperQueue queue = new ZooKeeperQueue(address, queueName);

 

        for (int i = 0; i < count; i++) {

          try {

            int elementValue = queue.consume();

 

            long waitTime = random.nextInt(50) * 100;

            log.info("消費對象: {} , 然後等待 {} 毫秒", elementValue, waitTime);

            Thread.sleep(waitTime);

          } catch (KeeperException e) {

            i--;

            log.error("KeeperException", e);

          } catch (InterruptedException e) {

            log.error("InterruptedException", e);

          }

        }

        connectedSemaphore.countDown();

      } catch (IOException e) {

        log.error("IOException", e);

      }

    }

  }

}

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • vue - Error: Can't resolve '@/assets/img/github.svg (vue-cli3.0,無法解析.svg圖片,已解決) ...
  • HTML顏色代碼是由16進位的三對數字分別表示紅、綠、藍(#RRGGBB)三種基本色。以紅顏色為例,紅色的代碼是 #FF0000, 代碼組成為’255′ 紅, ‘0’ 綠, 和 ‘0’ 藍。這些顏色可以用於裝飾Web頁面的背景,文字和表格等。 ...
  • JSP 中EL表達式用法詳解 EL 全名為Expression Language EL 語法很簡單,它最大的特點就是使用上很方便。接下來介紹EL主要的語法結構: ${sessionScope.user.sex} 所有EL都是以${為起始、以}為結尾的。上述EL範例的意思是:從Session的範圍中, ...
  • 數組Array是JavaScript中最常用的類型之一,同一數組中可以保存任意類型的數據,並且它的長度是動態的,會隨著數組中數據的加減自動變化。每個數組都有一個表示其長度(數組元素的個數)的length屬性。並且數組元素的索引(下標)是從0開始的,所以數組最後一個元素的索引永遠等於length – ...
  • 控制台提示“Invalid string length”,瀏覽器直接卡掉,是為什麼呢? 答:因為在寫嵌套迴圈時,定義的變數重名了,內層和外層用了同一個i變數。 -THE END- ...
  • 偽類和偽元素 有時候,你需要選擇本身沒有標簽,但是仍然易於識別的網頁部位,比如段落首行或滑鼠滑過的連接。CSS為他們提供一些選擇器:偽類和偽元素。 常用的一些偽類選擇器: 表示訪問過的鏈接 瀏覽器是通過歷史記錄來判斷一個鏈接是否訪問過, *由於涉及到用戶的隱私問題,所以使用visited偽類只能設置 ...
  • 概述: Bootstrap 是最受歡迎的 HTML、CSS 和 JS 框架,用於開發響應式佈局、移動設備優先的 WEB 項目。 作用: 開發響應式的頁面 響應式:就是一個網站能夠相容多個終端 節約開發成本,提高開發效率 入門: 下載BootStrap www.bootcss.com 官網地址 模版 ... ...
  • 單例模式 定義 保證一個類僅有一個實例,並提供一個訪問它的全局訪問點。 通常我們可以讓一個全局變數使得一個對象被訪問,但它不能防止你實例化多個對象。一個最好的辦法就是,讓類自身負責保存它的唯一實例。這個類可以保證沒有其他實例可以創建,並且它可以提供一個訪問該實例的方法。 UML圖 方式一:單線程下的 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...