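This is an original article; please credit the source when reposting. Thank you.
Using Curator
1. Adding the JARs. The demo uses version 2.6.0. For a non-Maven project, download the JARs and add them to the project; for a Maven project, declare these dependencies: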
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>2.6.0</version>
    </dependency>
2. RetryPolicy: the retry mechanism
- ExponentialBackoffRetry: the wait before each retry grows with the retry count, starting from baseSleepTimeMs
  - ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)
  - ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
  - baseSleepTimeMs: base wait between retries, in milliseconds
  - maxRetries: maximum number of retries
  - maxSleepMs: upper bound on the wait before any single retry, in milliseconds
- RetryNTimes
  - RetryNTimes(int n, int sleepMsBetweenRetries)
  - n: number of retries
  - sleepMsBetweenRetries: wait between retries, in milliseconds
- RetryUntilElapsed
  - RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries)
  - maxElapsedTimeMs: total time allowed for retrying, in milliseconds
  - sleepMsBetweenRetries: wait between retries, in milliseconds
- Other policies: BoundedExponentialBackoffRetry, RetryOneTime, SleepingRetry
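To make the difference between a growing wait and a fixed wait concrete, here is a minimal sketch in plain Java that models the wait times without using Curator. It assumes ExponentialBackoffRetry multiplies baseSleepTimeMs by a random factor below 2^(retryCount+1) and caps the result at maxSleepMs; that is my reading of the Curator source, not a documented guarantee. The class and method names (RetryWaitSketch, exponentialUpperBound, fixedTotalWait) are made up for illustration.

```java
final class RetryWaitSketch {

    /**
     * Largest wait (ms) the assumed exponential backoff model can produce
     * before retry number retryCount (0-based).
     * Assumed model: wait = baseSleepTimeMs * max(1, r), with r drawn at random
     * from [0, 2^(retryCount+1)), and the result capped at maxSleepMs.
     */
    static long exponentialUpperBound(int baseSleepTimeMs, int retryCount, int maxSleepMs) {
        long maxMultiplier = (1L << (retryCount + 1)) - 1; // largest value the random draw can take
        long wait = baseSleepTimeMs * Math.max(1L, maxMultiplier);
        return Math.min(wait, maxSleepMs);
    }

    /** Fixed-interval model (as in RetryNTimes): total wait if all n retries are used. */
    static long fixedTotalWait(int n, int sleepMsBetweenRetries) {
        return (long) n * sleepMsBetweenRetries;
    }

    public static void main(String[] args) {
        for (int retry = 0; retry < 3; retry++) {
            System.out.println("retry " + retry + ": at most "
                    + exponentialUpperBound(1000, retry, Integer.MAX_VALUE) + " ms");
        }
        System.out.println("RetryNTimes(5, 1000): at most "
                + fixedTotalWait(5, 1000) + " ms of waiting in total");
    }
}
```

Under this model the worst-case wait roughly doubles with every retry, which is why the three-argument constructor with maxSleepMs is useful when maxRetries is large.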
3. Creating a ZooKeeper connection
- Traditional style (factory method)
Example:
    CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("192.168.117.128:2181", 5000, 5000, retryPolicy);
API:
    newClient(java.lang.String connectString, org.apache.curator.RetryPolicy retryPolicy)
    newClient(java.lang.String connectString, int sessionTimeoutMs, int connectionTimeoutMs, org.apache.curator.RetryPolicy retryPolicy)
  - connectString: ZooKeeper server address
  - retryPolicy: the retry policy to use
  - sessionTimeoutMs: session timeout, in milliseconds
  - connectionTimeoutMs: connection timeout, in milliseconds
- Fluent (builder) style
    curatorFramework = CuratorFrameworkFactory.builder()
            .connectString("192.168.117.128:2181")
            // .authorization(...) sets access credentials, in the same way as with the native API
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(5000)
            .retryPolicy(retryPolicy)
            .build();
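- Full example
    // Imports: org.apache.curator.RetryPolicy, org.apache.curator.framework.CuratorFramework,
    // org.apache.curator.framework.CuratorFrameworkFactory, org.apache.curator.retry.*
    private CuratorFramework curatorFramework; // shared by all the examples that follow

    public void createSession() {
        // RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); // base wait, max retries (wait grows with each retry)
        // RetryPolicy retryPolicy = new RetryNTimes(5, 1000);             // number of retries, wait between retries
        RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);       // total retry time, wait between retries
        // curatorFramework = CuratorFrameworkFactory.newClient("192.168.117.128:2181", 5000, 5000, retryPolicy);
        curatorFramework = CuratorFrameworkFactory.builder()
                .connectString("192.168.117.128:2181")
                // .authorization(...) sets access credentials, in the same way as with the native API
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();
        curatorFramework.start(); // the client is unusable until start() has been called
    }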
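4. Creating a node
    // Import: org.apache.zookeeper.CreateMode
    public void createNode() throws Exception {
        createSession();
        String path = curatorFramework.create()
                .creatingParentsIfNeeded()        // create missing parent nodes automatically
                // .withACL(...) sets permissions; ACLs are built the same way as with the native API
                .withMode(CreateMode.PERSISTENT)  // node type
                .forPath("/note_curator/02", "02".getBytes());
        System.out.println("path:" + path);
    }
For node types and permission settings, see section 2.1, "Using the native ZooKeeper API".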
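5. Deleting a node
    public void del() throws Exception {
        createSession();
        curatorFramework.delete()
                .guaranteed()                 // guaranteed delete: on failure, Curator keeps retrying in the background until it succeeds
                .deletingChildrenIfNeeded()   // delete every node under this one first, then the node itself
                .forPath("/note_curator");
    }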
6. Listing child nodes
    // Import: java.util.List
    public void getChildren() throws Exception {
        createSession();
        List<String> children = curatorFramework.getChildren().forPath("/note_curator");
        System.out.println(children);
    }
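7. Reading node data
    // Import: org.apache.zookeeper.data.Stat
    public void getData() throws Exception {
        createSession();
        Stat stat = new Stat(); // filled in with the node's metadata (version, timestamps, ...)
        byte[] u = curatorFramework.getData().storingStatIn(stat).forPath("/note_curator");
        System.out.println(new String(u));
        System.out.println(stat);
    }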
8. Setting node data
    public void setData() throws Exception {
        createSession();
        curatorFramework.setData()
                // .withVersion(1) writes only if the node is at this version (optimistic locking)
                .forPath("/note_curator/01", "shengke0815".getBytes());
    }
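The optimistic-locking idea behind withVersion can be illustrated without a ZooKeeper server. The sketch below is a tiny in-memory model, not Curator or ZooKeeper code: a write succeeds only if the caller's expected version matches the node's current version, and each successful write bumps the version. In ZooKeeper a mismatch surfaces as KeeperException.BadVersionException and a version of -1 skips the check; here an IllegalStateException stands in for the former. The class name VersionedNodeSketch and its methods are hypothetical.

```java
import java.nio.charset.StandardCharsets;

final class VersionedNodeSketch {
    private byte[] data = new byte[0];
    private int version = 0;

    /**
     * Versioned write: expectedVersion must equal the current version,
     * or be -1 to skip the check. Returns the new version.
     */
    synchronized int setData(byte[] newData, int expectedVersion) {
        if (expectedVersion != -1 && expectedVersion != version) {
            throw new IllegalStateException(
                    "bad version: expected " + expectedVersion + " but node is at " + version);
        }
        data = newData.clone();
        version++;
        return version;
    }

    synchronized int getVersion() {
        return version;
    }

    synchronized byte[] getData() {
        return data.clone();
    }

    static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        VersionedNodeSketch node = new VersionedNodeSketch();
        int seenByA = node.getVersion();            // client A reads the node at version 0
        node.setData(bytes("from B"), seenByA);     // client B writes first; version becomes 1
        try {
            node.setData(bytes("from A"), seenByA); // A still holds version 0, so the write is rejected
        } catch (IllegalStateException e) {
            System.out.println("stale write rejected: " + e.getMessage());
        }
    }
}
```

A client whose write is rejected would typically re-read the node, reapply its change to the fresh data, and try again with the new version.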
9. Checking whether a node exists
    public void exists() throws Exception {
        createSession();
        Stat s = curatorFramework.checkExists().forPath("/note_curator");
        System.out.println(s); // prints null if the node does not exist
    }
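10. Setting node data with a callback
    // Imports: java.util.concurrent.ExecutorService, java.util.concurrent.Executors,
    // org.apache.curator.framework.api.BackgroundCallback, org.apache.curator.framework.api.CuratorEvent
    ExecutorService executorService = Executors.newFixedThreadPool(5); // thread pool that runs the callback

    @Test
    public void setDataAsync() throws Exception {
        createSession();
        curatorFramework.setData().inBackground(new BackgroundCallback() { // invoked once the write has completed
            @Override
            public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                System.out.println(curatorFramework.getZookeeperClient());
                System.out.println(curatorEvent.getResultCode());
                System.out.println(curatorEvent.getPath());
                System.out.println(curatorEvent.getContext()); // the context object passed in below
            }
        }, "shangxiawen", executorService).forPath("/note_curator", "sksujer0815".getBytes());
        Thread.sleep(Integer.MAX_VALUE); // keep the test thread alive so the callback can run
    }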
API:
    inBackground(org.apache.curator.framework.api.BackgroundCallback backgroundCallback, java.lang.Object o, java.util.concurrent.Executor executor);
  - backgroundCallback: your BackgroundCallback implementation
  - o: context object; the callback can retrieve it with curatorEvent.getContext()
  - executor: thread pool on which the callback runs
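The setDataAsync example keeps its thread alive with Thread.sleep(Integer.MAX_VALUE), which works for a demo but never returns. One alternative is to have the callback count down a CountDownLatch and wait on it with a timeout. The sketch below shows the pattern in plain Java; the task submitted to the executor merely stands in for Curator invoking processResult, and the names CallbackLatchSketch and runAndWait are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class CallbackLatchSketch {

    /**
     * Runs a stand-in "callback" on the executor and waits for it, up to timeoutMs.
     * Returns the context the callback received, or null if it did not run in time.
     */
    static Object runAndWait(ExecutorService executor, Object context, long timeoutMs)
            throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<Object> seen = new AtomicReference<>();

        // Stand-in for the background operation completing and invoking the callback.
        executor.execute(() -> {
            seen.set(context);  // with Curator this would come from curatorEvent.getContext()
            done.countDown();   // wake up the waiting thread
        });

        boolean finished = done.await(timeoutMs, TimeUnit.MILLISECONDS);
        return finished ? seen.get() : null;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        try {
            System.out.println("callback saw context: " + runAndWait(executor, "shangxiawen", 2000));
        } finally {
            executor.shutdown(); // lets the JVM exit once the callback has run
        }
    }
}
```

In the Curator version, the latch would be counted down at the end of processResult and awaited where the example currently sleeps.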
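11. Listening for changes to a node
    // Imports: org.apache.curator.framework.recipes.cache.NodeCache, NodeCacheListener, ChildData
    public void nodeListen() throws Exception {
        createSession();
        final NodeCache cache = new NodeCache(curatorFramework, "/note_curator");
        cache.start();
        cache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                ChildData current = cache.getCurrentData();
                if (current == null) { // the node has been deleted, so there is no data to print
                    System.out.println("node deleted");
                    return;
                }
                System.out.println(new String(current.getData()));
                System.out.println(current.getPath());
            }
        });
        Thread.sleep(Integer.MAX_VALUE); // keep the thread alive so events can be received
    }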
12. Listening for changes to a node's children
    // Imports: org.apache.curator.framework.recipes.cache.PathChildrenCache,
    // PathChildrenCacheListener, PathChildrenCacheEvent
    public void nodeChildrenListen() throws Exception {
        createSession();
        // Third argument (cacheData = true): cache each child's data as well as its path
        final PathChildrenCache cache = new PathChildrenCache(curatorFramework, "/note_curator", true);
        cache.start();
        cache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
                switch (event.getType()) {
                    case CHILD_ADDED:
                        System.out.println("add children");
                        System.out.println(new String(event.getData().getData()));
                        System.out.println(event.getData().getPath());
                        break;
                    case CHILD_REMOVED:
                        System.out.println("remove children");
                        System.out.println(new String(event.getData().getData()));
                        System.out.println(event.getData().getPath());
                        break;
                    case CHILD_UPDATED:
                        System.out.println("update children");
                        System.out.println(new String(event.getData().getData()));
                        System.out.println(event.getData().getPath());
                        break;
                    default:
                        // other event types (such as connection state changes) are ignored here
                        break;
                }
            }
        });
        Thread.sleep(Integer.MAX_VALUE); // keep the thread alive so events can be received
    }