前言:Zookeeper的監聽機制很多人都踩過坑,感覺實現了watcher 介面,後面節點的變化都會一一推送過來,然而並非如此。 Watch機制官方聲明:一個Watch事件是一個一次性的觸發器,當被設置了Watch的數據發生了改變的時候,則伺服器將這個改變發送給設置了Watch的客戶端,以便通知它們 ...
前言:Zookeeper的監聽機制很多人都踩過坑,感覺實現了watcher 介面,後面節點的變化都會一一推送過來,然而並非如此。
Watch機制官方聲明:一個Watch事件是一個一次性的觸發器,當被設置了Watch的數據發生了改變的時候,則伺服器將這個改變發送給設置了Watch的客戶端,以便通知它們。
Zookeeper機制的特點:
1.一次性觸發 數據發生改變時,一個watcher event會被髮送到client,但是client只會收到一次這樣的信息。
2.watcher event非同步發送 watcher 的通知事件從server發送到client是非同步的,這就存在一個問題,不同的客戶端和伺服器之間通過socket進行通信,由於網路延遲或其他因素導致客戶端在不通的時刻監聽到事件,由於Zookeeper本身提供了ordering guarantee,即客戶端監聽事件後,才會感知它所監視znode發生了變化。
3.數據監視 Zookeeper有數據監視和子數據監視 getdata() and exists() 設置數據監視,getchildren()設置了子節點監視
4.註冊watcher getData、exists、getChildren
5. 觸發watcher create、delete、setData
watcher裡面的相關事件 |
||
event For "/path" |
event For "/path/child" |
|
create("/path") |
EventType.NodeCreated |
NA |
delete("/path") |
EventType.NodeDeleted |
NA |
setData("/path") |
EventType.NodeDataChanged |
NA |
create("/path/child") |
EventType.NodeChildrenChanged |
EventType.NodeCreated |
delete("/path/child") |
EventType.NodeChildrenChanged |
EventType.NodeDeleted |
setData("/path/child") |
NA |
EventType.NodeDataChanged |
各種操作觸發的watcher事件 |
||||||||
"/path" |
"/path/child" |
|||||||
exists |
getData |
getChildren |
exists |
getData |
getChildren |
|||
create("/path") |
√ |
√ |
||||||
delete("/path") |
√ |
√ |
√ |
|||||
setData("/path") |
√ |
√ |
||||||
create("/path/child") |
√ |
√ |
√ |
|||||
delete("/path/child") |
√ |
√ |
√ |
√ |
||||
setData("/path/child") |
√ |
√ |
操作與watcher關係 |
|||||||
"/path" |
"/path/child" |
||||||
exists |
getData |
getChildren |
exists |
getData |
getChildren |
||
create("/path") |
√ |
√ |
|||||
delete("/path") |
√ |
√ |
√ |
||||
setData("/path") |
√ |
√ |
|||||
create("/path/child") |
√ |
√ |
√ |
||||
delete("/path/child") |
√ |
√ |
√ |
√ |
|||
setData("/path/child") |
√ |
√ |
各種watch測試
/** * watch test * */ public class App implements Watcher { private static CountDownLatch connectedSemaphore = new CountDownLatch(1); public static Stat stat = new Stat(); ZooKeeper zooKeeper; public static void main(String[] args) throws IOException, InterruptedException, KeeperException { String p = "/testaa"; ZooKeeper zooKeeper = new ZooKeeper("192.168.1.10:2181", 5000, new App()); connectedSemaphore.await(); //exists register watch zooKeeper.exists(p, true); String path = zooKeeper.create(p, "456".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //get register watch zooKeeper.getData(path, true, stat); zooKeeper.setData(path, "hhhh".getBytes(), -1); zooKeeper.exists(path, true); //exists register watch zooKeeper.delete(path, -1); } public void process(WatchedEvent event) { if (KeeperState.SyncConnected == event.getState()) { if (EventType.None == event.getType() && null == event.getPath()) { connectedSemaphore.countDown(); System.out.println("Zookeeper session established"); } else if (EventType.NodeCreated == event.getType()) { System.out.println("success create znode"); } else if (EventType.NodeDataChanged == event.getType()) { System.out.println("success change znode: " + event.getPath()); } else if (EventType.NodeDeleted == event.getType()) { System.out.println("success delete znode"); } else if (EventType.NodeChildrenChanged == event.getType()) { System.out.println("NodeChildrenChanged"); } } } }