使用ZooKeeper實現一個生產-消費者隊列,可用於多節點的分散式數據結構。生產者創建一個數據對象,並放到隊列中;消費者從隊列中取出一個數據對象併進行處理。 ...
【歡迎關註公眾號:程式猿講故事 (codestory),及時接收最新文章】
生產-消費者隊列,用於多節點的分散式數據結構,生產和消費數據。生產者創建一個數據對象,並放到隊列中;消費者從隊列中取出一個數據對象併進行處理。在ZooKeeper中,隊列可以使用一個容器節點下創建多個子節點來實現;創建子節點時,CreateMode使用 PERSISTENT_SEQUENTIAL,ZooKeeper會自動在節點名稱後面添加唯一序列號。EPHEMERAL_SEQUENTIAL也有同樣的特點,區別在於會話結束後是否會自動刪除。
敲小黑板:*_SEQUENTIAL是ZooKeeper的一個很重要的特性,分散式鎖、選舉制度都依靠這個特性實現的。
1 對前續代碼的重構
之前的文章,我們已經用實現了Watcher和Barrier,創建ZooKeeper連接的代碼已經複製了一遍。後續還需要類似的工作,因此先對原有代碼做一下重構,讓代碼味道乾凈一點。
以下是 process(WatchedEvent)的代碼
final public void process(WatchedEvent event) { if (Event.EventType.None.equals(event.getType())) { // 連接狀態發生變化 if (Event.KeeperState.SyncConnected.equals(event.getState())) { // 連接建立成功 connectedSemaphore.countDown(); } } else if (Event.EventType.NodeCreated.equals(event.getType())) { processNodeCreated(event); } else if (Event.EventType.NodeDeleted.equals(event.getType())) { processNodeDeleted(event); } else if (Event.EventType.NodeDataChanged.equals(event.getType())) { processNodeDataChanged(event); } else if (Event.EventType.NodeChildrenChanged.equals(event.getType())) { processNodeChildrenChanged(event); } }
以ZooKeeperBarrier為例,看看重構之後的構造函數和監聽Event的代碼
ZooKeeperBarrier(String address, String tableSerial, int tableCapacity, String customerName) throws IOException { super(address); this.tableSerial = createRootNode(tableSerial); this.tableCapacity = tableCapacity; this.customerName = customerName; } protected void processNodeChildrenChanged(WatchedEvent event) { log.info("{} 接收到了通知 : {}", customerName, event.getType()); // 子節點有變化 synchronized (mutex) { mutex.notify(); } }
2 隊列的生產者
生產者的關鍵代碼
String elementName = queueName + "/element"; ArrayList<ACL> ids = ZooDefs.Ids.OPEN_ACL_UNSAFE; CreateMode createMode = CreateMode.PERSISTENT_SEQUENTIAL; getZooKeeper().create(elementName, value, ids, createMode);
註意,重點是PERSISTENT_SEQUENTIAL,PERSISTENT是表示永久存儲直到有命令刪除,SEQUENTIAL表示自動在後面添加自增的唯一序列號。這樣,儘管elementName都一樣,但實際生成的zNode名字在 “element”後面會添加格式為%010d的10個數字,如0000000001。如一個完整的zNode名可能為/queue/element0000000021。
3 隊列的消費者
消費者嘗試從子節點列表獲取zNode名最小的一個子節點,如果隊列為空則等待NodeChildrenChanged事件。關鍵代碼
/** 隊列的同步信號 */ private static Integer queueMutex = Integer.valueOf(1); @Override protected void processNodeChildrenChanged(WatchedEvent event) { synchronized (queueMutex) { queueMutex.notify(); } } /** * 從隊列中刪除第一個對象 * * @return * @throws KeeperException * @throws InterruptedException */ int consume() throws KeeperException, InterruptedException { while (true) { synchronized (queueMutex) { List<String> list = getZooKeeper().getChildren(queueName, true); if (list.size() == 0) { queueMutex.wait(); } else { // 獲取第一個子節點的名稱 String firstNodeName = getFirstElementName(list); // 刪除節點,並返回節點的值 return deleteNodeAndReturnValue(firstNodeName); } } } }
4 測試日誌
把測試結果放源碼前面,免得大家被無聊的代碼晃暈。
測試代碼創建了兩個線程,一個線程是生產者,按隨機間隔往隊列中添加對象;一個線程是消費者,隨機間隔嘗試從隊列中取出第一個,如果當時隊列為空,會等到直到新的數據。
兩個進程都加上隨機間隔,是為了模擬生產可能比消費更快的情況。以下是測試日誌,為了更突出,生產和消費的日誌我增加了不同的文字樣式。
49:47.866 [INFO] ZooKeeperQueueTest.testQueue(29) 開始ZooKeeper隊列測試,本次將測試 10 個數據 49:48.076 [DEBUG] ZooKeeperQueue.log(201) + Profiler [tech.codestory.zookeeper.queue.ZooKeeperQueue 連接到ZooKeeper] |-- elapsed time [開始鏈接] 119.863 milliseconds. |-- elapsed time [等待連接成功的Event] 40.039 milliseconds. |-- Total [tech.codestory.zookeeper.queue.ZooKeeperQueue 連接到ZooKeeper] 159.911 milliseconds. 49:48.082 [DEBUG] ZooKeeperQueue.log(201) + Profiler [tech.codestory.zookeeper.queue.ZooKeeperQueue 連接到ZooKeeper] |-- elapsed time [開始鏈接] 103.795 milliseconds. |-- elapsed time [等待連接成功的Event] 65.899 milliseconds. |-- Total [tech.codestory.zookeeper.queue.ZooKeeperQueue 連接到ZooKeeper] 170.263 milliseconds. 49:48.102 [INFO] ZooKeeperQueueTest.run(51) 生產對象 : 1 , 然後等待 1700 毫秒 49:48.134 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 1 , 然後等待 4000 毫秒 49:49.814 [INFO] ZooKeeperQueueTest.run(51) 生產對象 : 2 , 然後等待 900 毫秒 49:50.717 [INFO] ZooKeeperQueueTest.run(51) 生產對象 : 3 , 然後等待 1300 毫秒 49:52.020 [INFO] ZooKeeperQueueTest.run(51) 生產對象 : 4 , 然後等待 3700 毫秒 49:52.139 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 2 , 然後等待 2800 毫秒 49:54.947 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 3 , 然後等待 4500 毫秒 49:55.724 [INFO] ZooKeeperQueueTest.run(51) 生產對象 : 5 , 然後等待 3500 毫秒 49:59.228 [INFO] ZooKeeperQueueTest.run(51) 生產對象 : 6 , 然後等待 4200 毫秒 49:59.454 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 4 , 然後等待 2400 毫秒 50:01.870 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 5 , 然後等待 4900 毫秒 50:03.435 [INFO] ZooKeeperQueueTest.run(51) 生產對象 : 7 , 然後等待 4500 毫秒 50:06.776 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 6 , 然後等待 3600 毫秒 50:07.938 [INFO] ZooKeeperQueueTest.run(51) 生產對象 : 8 , 然後等待 1900 毫秒 50:09.846 [INFO] ZooKeeperQueueTest.run(51) 生產對象 : 9 , 然後等待 3200 毫秒 50:10.388 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 7 , 然後等待 2900 毫秒 50:13.051 [INFO] ZooKeeperQueueTest.run(51) 生產對象 : 10 , 然後等待 4900 毫秒 50:13.294 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 8 , 然後等待 300 毫秒 50:13.600 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 9 , 然後等待 4800 毫秒 50:18.407 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 10 , 然後等待 2400 毫秒
5 完整源碼
5.1 ZooKeeperBase.java
package tech.codestory.zookeeper; import java.io.IOException; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.profiler.Profiler; /** * 為 ZooKeeper測試代碼創建一個基類,封裝建立連接的過程 * * @author code story * @date 2019/8/16 */ public class ZooKeeperBase implements Watcher { /** 日誌,不使用 @Slf4j ,是要使用子類的log */ Logger log = null; /** 等待連接建立成功的信號 */ private CountDownLatch connectedSemaphore = new CountDownLatch(1); /** ZooKeeper 客戶端 */ private ZooKeeper zooKeeper = null; /** 避免重覆根節點 */ static Integer rootNodeInitial = Integer.valueOf(1); /** 構造函數 */ public ZooKeeperBase(String address) throws IOException { log = LoggerFactory.getLogger(getClass()); Profiler profiler = new Profiler(this.getClass().getName() + " 連接到ZooKeeper"); profiler.start("開始鏈接"); zooKeeper = new ZooKeeper(address, 3000, this); try { profiler.start("等待連接成功的Event"); connectedSemaphore.await(); } catch (InterruptedException e) { log.error("InterruptedException", e); } profiler.stop(); profiler.setLogger(log); profiler.log(); } /** * 創建測試需要的根節點 * * @param rootNodeName * @return */ public String createRootNode(String rootNodeName) { synchronized (rootNodeInitial) { // 創建 tableSerial 的zNode try { Stat existsStat = getZooKeeper().exists(rootNodeName, false); if (existsStat == null) { rootNodeName = getZooKeeper().create(rootNodeName, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException e) { log.error("KeeperException", e); } catch (InterruptedException e) { log.error("InterruptedException", e); } } return rootNodeName; } /** 讀取ZooKeeper對象,供子類調用 */ protected ZooKeeper getZooKeeper() { return zooKeeper; } @Override final public void process(WatchedEvent event) { if (Event.EventType.None.equals(event.getType())) { // 連接狀態發生變化 if (Event.KeeperState.SyncConnected.equals(event.getState())) { // 連接建立成功 connectedSemaphore.countDown(); } } else if (Event.EventType.NodeCreated.equals(event.getType())) { processNodeCreated(event); } else if (Event.EventType.NodeDeleted.equals(event.getType())) { processNodeDeleted(event); } else if (Event.EventType.NodeDataChanged.equals(event.getType())) { processNodeDataChanged(event); } else if (Event.EventType.NodeChildrenChanged.equals(event.getType())) { processNodeChildrenChanged(event); } } /** * 處理事件: NodeCreated * * @param event */ protected void processNodeCreated(WatchedEvent event) {} /** * 處理事件: NodeDeleted * * @param event */ protected void processNodeDeleted(WatchedEvent event) {} /** * 處理事件: NodeDataChanged * * @param event */ protected void processNodeDataChanged(WatchedEvent event) {} /** * 處理事件: NodeChildrenChanged * * @param event */ protected void processNodeChildrenChanged(WatchedEvent event) {} }
5.2 ZooKeeperQueue.java
package tech.codestory.zookeeper.queue; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; import lombok.extern.slf4j.Slf4j; import tech.codestory.zookeeper.ZooKeeperBase; /** * ZooKeeper實現Queue * * @author code story * @date 2019/8/16 */ @Slf4j public class ZooKeeperQueue extends ZooKeeperBase { /** 隊列名稱 */ private String queueName; /** 隊列的同步信號 */ private static Integer queueMutex = Integer.valueOf(1); /** * 構造函數 * * @param address * @param queueName * @throws IOException */ public ZooKeeperQueue(String address, String queueName) throws IOException { super(address); this.queueName = createRootNode(queueName); } @Override protected void processNodeChildrenChanged(WatchedEvent event) { synchronized (queueMutex) { queueMutex.notify(); } } /** * 將對象添加到隊列中 * * @param i * @return */ boolean produce(int i) throws KeeperException, InterruptedException { ByteBuffer b = ByteBuffer.allocate(4); byte[] value; // Add child with value i b.putInt(i); value = b.array(); String elementName = queueName + "/element"; ArrayList<ACL> ids = ZooDefs.Ids.OPEN_ACL_UNSAFE; CreateMode createMode = CreateMode.PERSISTENT_SEQUENTIAL; getZooKeeper().create(elementName, value, ids, createMode); return true; } /** * 從隊列中刪除第一個對象 * * @return * @throws KeeperException * @throws InterruptedException */ int consume() throws KeeperException, InterruptedException { while (true) { synchronized (queueMutex) { List<String> list = getZooKeeper().getChildren(queueName, true); if (list.size() == 0) { queueMutex.wait(); } else { // 獲取第一個子節點的名稱 String firstNodeName = getFirstElementName(list); // 刪除節點,並返回節點的值 return deleteNodeAndReturnValue(firstNodeName); } } } } /** * 獲取第一個子節點的名稱 * * @param list * @return */ private String getFirstElementName(List<String> list) { Integer min = Integer.MAX_VALUE; String minNode = null; for (String s : list) { Integer tempValue = Integer.valueOf(s.substring(7)); if (tempValue < min) { min = tempValue; minNode = s; } } return minNode; } /** * 刪除節點,並返回節點的值 * * @param minNode * @return * @throws KeeperException * @throws InterruptedException */ private int deleteNodeAndReturnValue(String minNode) throws KeeperException, InterruptedException { String fullNodeName = queueName + "/" + minNode; Stat stat = new Stat(); byte[] b = getZooKeeper().getData(fullNodeName, false, stat); getZooKeeper().delete(fullNodeName, stat.getVersion()); ByteBuffer buffer = ByteBuffer.wrap(b); return buffer.getInt(); } }
5.3 ZooKeeperQueueTest.java
package tech.codestory.zookeeper.queue; import java.io.IOException; import java.security.SecureRandom; import java.util.Random; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.KeeperException; import org.testng.annotations.Test; import lombok.extern.slf4j.Slf4j; /** * ZooKeeperQueue測試 * * @author code story * @date 2019/8/16 */ @Slf4j public class ZooKeeperQueueTest { final String address = "192.168.5.128:2181"; final String queueName = "/queue"; final Random random = new SecureRandom(); // 隨機生成10-20之間的個數 final int count = 10 + random.nextInt(10); /** 等待生產者和消費者線程都結束 */ private CountDownLatch connectedSemaphore = new CountDownLatch(2); @Test public void testQueue() { log.info("開始ZooKeeper隊列測試,本次將測試 {} 個數據", count); new QueueProducer().start(); new QueueConsumer().start(); try { connectedSemaphore.await(); } catch (InterruptedException e) { log.error("InterruptedException", e); } } /** * 隊列的生產者 */ class QueueProducer extends Thread { @Override public void run() { try { ZooKeeperQueue queue = new ZooKeeperQueue(address, queueName); for (int i = 0; i < count; i++) { int elementValue = i + 1; long waitTime = random.nextInt(50) * 100; log.info("生產對象 : {} , 然後等待 {} 毫秒", elementValue, waitTime); queue.produce(elementValue); Thread.sleep(waitTime); } } catch (IOException e) { log.error("IOException", e); } catch (InterruptedException e) { log.error("InterruptedException", e); } catch (KeeperException e) { log.error("KeeperException", e); } connectedSemaphore.countDown(); } } /** * 隊列的消費者 */ class QueueConsumer extends Thread { @Override public void run() { try { ZooKeeperQueue queue = new ZooKeeperQueue(address, queueName); for (int i = 0; i < count; i++) { try { int elementValue = queue.consume(); long waitTime = random.nextInt(50) * 100; log.info("消費對象: {} , 然後等待 {} 毫秒", elementValue, waitTime); Thread.sleep(waitTime); } catch (KeeperException e) { i--; log.error("KeeperException", e); } catch (InterruptedException e) { log.error("InterruptedException", e); } } connectedSemaphore.countDown(); } catch (IOException e) { log.error("IOException", e); } } } }