本文為原創文章,未經允許不得轉載 Zookeeper原生API使用 1、jar包引入,演示版本為3.4.6,非maven項目,可以下載jar包導入到項目中 2、創建zookeeper連接 ZooKeeper(java.lang.String connectString, int sessionTim ...
本文為原創文章,未經允許不得轉載
Zookeeper原生API使用
1、jar包引入,演示版本為3.4.6,非maven項目,可以下載jar包導入到項目中
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> </dependency>
2、創建zookeeper連接
ZooKeeper(java.lang.String connectString, int sessionTimeout, org.apache.zookeeper.Watcher watcher)
-
- connectString:zookeeper服務地址,例如“192.168.117.128:2181”
- sessionTimeout :超時時間,單位為毫秒
- watcher:實現org.apache.zookeeper.Watcher介面的實現類,需實現process(WatchedEvent watchedEvent) 方法
ZooKeeper zooKeeper = new ZooKeeper("192.168.117.128:2181",5000, new MyWatcher());
示例代碼:
import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import java.io.IOException; /** * * zookeeper連接 */ public class CreateSession implements Watcher{ private static ZooKeeper zooKeeper; @Override public void process(WatchedEvent watchedEvent) { if(watchedEvent.getState().equals(Event.KeeperState.SyncConnected)) { doBus(); } System.out.println("接收內容:"+watchedEvent.toString()); } private void doBus() { System.out.println("做業務!"); } public static void main(String[] args) { try { zooKeeper = new ZooKeeper("192.168.117.128:2181",5000, new CreateSession()); System.out.println(zooKeeper.getState()); Thread.sleep(5000); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
ps:此處沒有新創建新的java類實現Watcher,而是直接在本類中實現Watcher介面並重寫process方法
3、同步創建
create(java.lang.String path, byte[] data, java.util.List<org.apache.zookeeper.data.ACL> acl, org.apache.zookeeper.CreateMode createMode)
-
- path:創建節點路徑,需保證父節點已存在
- data:節點數據
- acl:許可權列表
- 提供預設的許可權OPEN_ACL_UNSAFE、CREATOR_ALL_ACL、READ_ACL_UNSAFE
- OPEN_ACL_UNSAFE:完全開放
- CREATOR_ALL_ACL:創建該znode的連接擁有所有許可權
- READ_ACL_UNSAFE:所有的客戶端都可讀
- 自定義許可權
ACL aclIp = new ACL(ZooDefs.Perms.READ,new Id("ip","127.0.0.1")); ACL aclDigest = new ACL(ZooDefs.Perms.READ| ZooDefs.Perms.WRITE, new Id("digest", DigestAuthenticationProvider.generateDigest("id:pass")));
- session設置許可權
zk.addAuthInfo("digest", "id:pass".getBytes());
- 提供預設的許可權OPEN_ACL_UNSAFE、CREATOR_ALL_ACL、READ_ACL_UNSAFE
- createMode:節點類型
- PERSISTENT:持久化節點
- PERSISTENT_SEQUENTIAL:持久化有序節點
- EPHEMERAL:臨時節點(連接斷開自動刪除)
- EPHEMERAL_SEQUENTIAL:臨時有序節點(連接斷開自動刪除)
示例代碼:
import org.apache.zookeeper.*; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Id; import org.apache.zookeeper.server.auth.DigestAuthenticationProvider; import java.io.IOException; import java.security.NoSuchAlgorithmException; /** * 創建節點(同步) * Created by scot on 2016/6/8. */ public class CreateSessionSync implements Watcher{ private static ZooKeeper zooKeeper; @Override public void process(WatchedEvent watchedEvent) { if(watchedEvent.getState().equals(Event.KeeperState.SyncConnected)) { doBus(); } System.out.println("接收內容:"+watchedEvent.toString()); } private void doBus() { try { if(null != zooKeeper.exists("/note_scot/note_scot_a",false)) { System.out.println("/note_scot/note_scot_a 節點已存在"); return; } String path = zooKeeper.create("/note_scot/note_scot_a","aa".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); /*許可權相關 try { ACL aclIp = new ACL(ZooDefs.Perms.READ,new Id("ip","127.0.0.1")); ACL aclDigest = new ACL(ZooDefs.Perms.READ| ZooDefs.Perms.WRITE, new Id("digest", DigestAuthenticationProvider.generateDigest("id:pass"))); zooKeeper.addAuthInfo("digest", "id:pass".getBytes()); } catch (NoSuchAlgorithmException e) { e.printStackTrace(); }*/ System.out.println("zookeeper return:" + path); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { try { zooKeeper = new ZooKeeper("192.168.117.128:2181",5000, new CreateSessionSync()); System.out.println(zooKeeper.getState()); Thread.sleep(5000); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
4、非同步創建
create(java.lang.String path, byte[] data, java.util.List<org.apache.zookeeper.data.ACL> acl,
org.apache.zookeeper.CreateMode createMode, org.apache.zookeeper.AsyncCallback.StringCallback cb, java.lang.Object ctx)
-
- StringCallback cb:回調介面,執行創建操作後,結果以及數據發送到此介面的實現類中
- Object ctx:自定義回調數據,在回調實現類可以獲取此數據
示例代碼:
import org.apache.zookeeper.*; import java.io.IOException; /** * 創建節點(非同步) * Created by scot on 2016/6/8. */ public class CreateSessionASync implements Watcher{ private static ZooKeeper zooKeeper; @Override public void process(WatchedEvent watchedEvent) { if(watchedEvent.getState().equals(Event.KeeperState.SyncConnected)) { doBus(); } System.out.println("接收內容:"+watchedEvent.toString()); } private void doBus() { zooKeeper.create("/note_scot/note_scot_b","aa".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL,new IStringCallBack(),"testAsync"); } public static void main(String[] args) { try { zooKeeper = new ZooKeeper("192.168.117.128:2181",5000, new CreateSessionASync()); System.out.println(zooKeeper.getState()); Thread.sleep(5000); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } static class IStringCallBack implements AsyncCallback.StringCallback { @Override public void processResult(int i, String s, Object o, String s2) { System.out.println("i="+i);//創建成功返回0 System.out.println("s="+s);//自定義節點名稱 System.out.println("o="+o);//自定義回調數據 System.out.println("s2="+s2);//最終節點名稱(順序節點最終名稱與自定義名稱不同) } } }
常用方法列表:
String create(final String path, byte data[], List acl, CreateMode createMode) |
參數: 路徑、 znode內容,ACL(訪問控制列表)、 znode創建類型; 用途:創建znode節點 |
void delete(final String path, int version) |
參數: 路徑、版本號;如果版本號與znode的版本號不一致,將無法刪除,是一種樂觀加鎖機制;如果將版本號設置為-1,不會去檢測版本,直接刪除; 用途:刪除節點 |
Stat exists(final String path, Watcher watcher) |
參數: 路徑、Watcher(監視器);當這個znode節點被改變時,將會觸發當前Watcher 用途:判斷znode節點是否存在 |
Stat exists(String path, boolean watch) |
參數: 路徑、並設置是否監控這個目錄節點,這裡的 watcher 是在創建 ZooKeeper 實例時指定的 watcher; 判斷znode節點是否存在 |
Stat setData(final String path, byte data[], int version) |
參數: 路徑、數據、版本號;如果為-1,跳過版本檢查 用途:設置znode上的數據 |
byte[] getData(final String path, Watcher watcher, Stat stat) |
參數: 路徑、監視器、數據版本等信息 用途:獲取znode上的數據 |
List getChildren(final String path, Watcher watcher) |
參數: 路徑、監視器;該方法有多個重載 用途:獲取節點下的所有子節點 |