# java操作zookeeper 1. 創建一個maven項目在pom文件里引入如下依賴: ~~~XML junit junit 4.10 test org.apache.curator curator-framework 4.0.0 org.apache.curator curator-reci ...
java操作zookeeper
- 創建一個maven項目在pom文件里引入如下依賴:
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
<!-- curator-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
<!-- 日誌 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
</dependencies>
- 創建一個測試類進行相關操作的測試
- 連接客戶端
@Before
public void testConnect(){
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(3000, 10);
client = CuratorFrameworkFactory.builder() // 使用工廠類來建造客戶端的實例對象
.connectString("192.168.223.131:2181") // 指定連接zookeeper的伺服器地址
.retryPolicy(retryPolicy) // 指定重試策略
.namespace("test") // 指定命名空間
.build(); // 建造客戶端實例對象
client.start(); // 啟動客戶端
}
- 關閉客戶端
@After
public void testClose(){
// 關閉客戶端
if (client != null){
client.close();
}
}
- 創建節點
@Test
public void testCreateNode() throws Exception {
// 如果沒有指定命名空間,那麼節點的完整路徑為 /node2,如果指定了命名空間,那麼節點的完整路徑為 /test/node2
// 如果沒有數據,那麼節點的數據為當前客戶端的ip地址
// 如果沒有指定節點類型,那麼節點類型為持久節點
// CreateMode.EPHEMERAL 臨時節點
// creatingParentsIfNeeded() 如果父節點不存在,那麼自動創建父節點
String path = client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath("/node3/min", "node3min".getBytes());
System.out.println(path);
// 讓線程阻塞,不讓程式結束,這樣可以在zookeeper中看到創建的臨時節點,因為臨時節點的生命周期是和客戶端綁定的
Thread.sleep(100000);
}
- 獲取節點數據
@Test
public void testGetData() throws Exception {
byte[] bytes = client.getData().forPath("/node2");
System.out.println(new String(bytes));
}
- 查詢子節點
@Test
public void testGetChildren() throws Exception {
List<String> childrenList = client.getChildren().forPath("/");
for (String child : childrenList) {
System.out.println(child);
}
}
- 查詢節點狀態信息
@Test
public void testGetStat() throws Exception {
// Stat類用於存儲節點狀態信息
Stat stat = new Stat();
// storingStatIn(stat) 將節點狀態信息存儲到stat對象中
byte[] data = client.getData().storingStatIn(stat).forPath("/node2");
System.out.println(new String(data));
System.out.println(stat);
}
- 更新節點數據
@Test
public void testSetData() throws Exception {
client.setData().forPath("/node2/min1", "minqiliang".getBytes());
}
- 更新節點數據,帶版本號
@Test
public void testSetDataWithVersion() throws Exception {
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/node3");
System.out.println(stat.getVersion());
client.setData().withVersion(stat.getVersion()).forPath("/node3", "minqiliang".getBytes());
}
- 刪除節點
@Test
public void testDeleteNode() throws Exception {
// deletingChildrenIfNeeded() 如果存在子節點,那麼先刪除子節點,再刪除父節點
client.delete().deletingChildrenIfNeeded().forPath("/node2");
}
- 刪除節點,必須成功
@Test
public void testDeleteNodeWithVersion() throws Exception {
// guaranteed() 如果刪除失敗,那麼會在後臺一直嘗試刪除,直到刪除成功為止
client.delete().guaranteed().forPath("/node3");
}
- 刪除節點,回調函數
@Test
public void testDeleteNodeWithCallback() throws Exception {
// inBackground() 指定回調函數
client.delete().guaranteed().inBackground((client, event) -> System.out.println(event)).forPath("/node3");
Thread.sleep(100000);
}
- 監聽節點的創建、修改、刪除
@Test
public void testNodeCache() throws Exception {
// 創建一個nodeCache對象
NodeCache nodeCache = new NodeCache(client, "/node3");
// 註冊監聽器
nodeCache.getListenable().addListener(() -> {
System.out.println("節點數據發生變化");
byte[] bytes = nodeCache.getCurrentData().getData();
System.out.println(new String(bytes));
});
// 啟動監聽器
nodeCache.start(true);
while (true){
}
}
- 監聽子節點的創建、修改、刪除
@Test
public void testpathChildrenCache() throws Exception {
// 創建一個nodeCache對象
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/node3",true);
// 註冊監聽器
pathChildrenCache.getListenable().addListener((client,event) -> {
System.out.println("節點數據發生變化");
System.out.println(event);
PathChildrenCacheEvent.Type type = event.getType();
if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
System.out.println("子節點數據發生變化");
byte[] data = event.getData().getData();
System.out.println(new String(data));
}
});
// 啟動監聽器
pathChildrenCache.start(true);
while (true){
}
}
- 樹形監聽器
@Test
public void testTreeCache() throws Exception {
// 創建一個nodeCache對象
TreeCache treeCache = new TreeCache(client, "/node3");
// 註冊監聽器
treeCache.getListenable().addListener((client,event) -> {
System.out.println("節點數據發生變化");
System.out.println(event);
TreeCacheEvent.Type type = event.getType();
if (type.equals(TreeCacheEvent.Type.NODE_UPDATED)){
System.out.println("子節點數據發生變化");
byte[] data = event.getData().getData();
System.out.println(new String(data));
}
});
// 啟動監聽器
treeCache.start();
while (true){
}
}
- 分散式鎖
InterProcessMutex lock = new InterProcessMutex(client, "/lock");
// 獲取鎖
try {
// 獲取鎖
boolean acquire = lock.acquire(3, TimeUnit.SECONDS);
}catch (Exception e) {
e.printStackTrace();
}finally {
// 釋放鎖
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}