zookeeper是幹嘛的呢 Zookeeper的作用1.可以為客戶端管理少量的數據kvkey:是以路徑的形式表示的,那就意味著,各key之間有父子關係,比如/ 是頂層key用戶建的key只能在/ 下作為子節點,比如建一個key: /aa 這個key可以帶value數據也可以建一個key: /bb也 ...
zookeeper是幹嘛的呢
Zookeeper的作用
1.可以為客戶端管理少量的數據kv
key:是以路徑的形式表示的,那就意味著,各key之間有父子關係,比如
/ 是頂層key
用戶建的key只能在/ 下作為子節點,比如建一個key: /aa 這個key可以帶value數據
也可以建一個key: /bb
也可以建key: /aa/xx
2.可以為客戶端監聽指定數據節點的狀態,併在數據節點發生變化是,通知客戶端
Zookeeper 安裝步驟
把包上傳linux後解壓到apps/
[root@hdp-01 ~]# tar -zxvf zookeeper-3.4.6.tar.gz -C apps/
/root/apps/zookeeper-3.4.6/conf下該配置文件
[root@hdp-01 conf]# cp zoo_sample.cfg zoo.cfg
然後vim zoo.cfg
更改為
dataDir=/root/zkdata
最後添加
server.1=hdp-01:2888:3888
server.2=hdp-02:2888:3888
server.3=hdp-03:2888:3888
server.4=hdp-04:2888:3888
接著,在hdp-01上,新建數據目錄/root/zkdata,併在目錄重生成一個文件myid,內容為1
接著,在hdp-02上,新建數據目錄/root/zkdata,併在目錄重生成一個文件myid,內容為2
接著,在hdp-03上,新建數據目錄/root/zkdata,併在目錄重生成一個文件myid,內容為3
接著,在hdp-04上,新建數據目錄/root/zkdata,併在目錄重生成一個文件myid,內容為4
然後將zookeeper scp給其他機器
啟動
[root@hdp-01 ~]# /root/apps/zookeeper-3.4.6/bin/zkServer.sh start
查看狀態
[root@hdp-01 ~]# /root/apps/zookeeper-3.4.6/bin/zkServer.sh status
可以自己寫一個腳本進行啟動名字叫zkmanage.sh
用的時候後面跟上參數,傳入$1.
sh ./zkmanage.sh start
或者關閉的時候
sh ./zkmanager.sh stop
腳本代碼如下
#!/bin/bash
for host in hdp-01 hdp-02 hdp-03 hdp-04
do
echo "${host}:starting...."
ssh $host "/root/apps/zookeeper-3.4.6/bin/zkServer.sh $1"
done
sleep 2
for host in hdp-01 hdp-02 hdp-03 hdp-04
do
ssh $host "/root/apps/zookeeper-3.4.6/bin/zkServer.sh status"
done
註意一點,如果有的結點沒有啟動,一定要看一下是不是這幾台機器的時間是不是不對應,如果差別太大是啟動不起來的。f**k.
簡單補充一點就是,啟動之後,這幾台機器,有的當leader,有的當follower,只有一個leader,他們誰當leader是根據他們 '投票的形式'的決定的。
只有一個leader
zookeeper的命令行客戶端和java客戶端
命令行
在bin/zkCli.sh
這樣會連到本機localhost
指定連到哪一臺zookeeper
bin/zkcli.sh –server hdp-02:2181
兩個作用,管理數據和監聽
首先是管理數據
也可以自己建數據
[zk: hdp-03:2181(CONNECTED) 8] create /aa "hellozk"
created /aa
[zk: hdp-03:2181(CONNECTED) 9] ls /
[aa, root, hbase, zookeeper]
[zk: hdp-03:2181(CONNECTED) 10] get /aa
"hellozk"
cZxid = 0xc00000023
ctime = Mon Aug 05 14:41:52 CST 2019
mZxid = 0xc00000023
mtime = Mon Aug 05 14:41:52 CST 2019
pZxid = 0xc00000023
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 9
numChildren = 0
修改數據
[zk: hdp-03:2181(CONNECTED) 11] set /aa hellospark
cZxid = 0xc00000023
ctime = Mon Aug 05 14:41:52 CST 2019
mZxid = 0xc00000024
mtime = Mon Aug 05 14:42:40 CST 2019
pZxid = 0xc00000023
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 10
numChildren = 0
這個數據版本,你沒修改幾次就會變成幾
也可以在/aa下建立子目錄
如果有些命令忘了,可以輸入help查看幫助
刪除就是rmr
[zk: hdp-03:2181(CONNECTED) 13] rmr /aa
監聽
[zk: hdp-03:2181(CONNECTED) 17] create /aa iamfine
Created /aa
[zk: hdp-03:2181(CONNECTED) 18] get /aa watch
然後這時候如果改變了/aa 就讓他通知我
在另一臺機器上啟動一個zookeeper
[zk: hdp-03:2181(CONNECTED) 2] set /aa iamnotfine
此時就會有信息
但當你再改一次的話,這個連接就不會再提醒了,這個監聽只起一次作用。
數據類型分為好幾種
zookeeper中的znode有多種類型:
1、PERSISTENT 持久的:創建者就算跟集群斷開聯繫,該類節點也會持久存在與zk集群中
2、EPHEMERAL 短暫的:創建者一旦跟集群斷開聯繫,zk就會將這個節點刪除
3、SEQUENTIAL 帶序號的:這類節點,zk會自動拼接上一個序號,而且序號是遞增的
我們一般創建的都是持久的
create –e /bb xxx
這時候就是短暫的
create /cc yyyy
create –s /cc/c qq
然後他們就會自動的在這些子節點下帶上序號
java客戶端
需要的jar包
import java.io.UnsupportedEncodingException;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.junit.Before;
import org.junit.Test;
public class ZookeeperClientDemo {
ZooKeeper zk = null;
@Before
public void init() throws Exception{
// 構造一個連接zookeeper的客戶端對象
zk = new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, null);
}
//增
@Test
public void testCreate() throws Exception{
// 參數1:要創建的節點路徑 參數2:數據 參數3:訪問許可權 參數4:節點類型
String create = zk.create("/zkTest", "hello zk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(create);
zk.close();
}
//改
@Test
public void testUpdate() throws Exception {
// 參數1:節點路徑 參數2:數據 參數3:所要修改的版本,-1代表任何版本
zk.setData("/zkTest", "我愛你".getBytes("UTF-8"), -1);
zk.close();
}
//查
@Test
public void testGet() throws Exception {
// 參數1:節點路徑 參數2:是否要監聽 參數3:所要獲取的數據的版本,null表示最新版本
byte[] data = zk.getData("/zkTest", false, null);
System.out.println(new String(data,"UTF-8"));
zk.close();
}
//查子節點
@Test
public void testListChildren() throws Exception {
// 參數1:節點路徑 參數2:是否要監聽
// 註意:返回的結果中只有子節點名字,不帶全路徑
List<String> children = zk.getChildren("/zkTest", false);
for (String child : children) {
System.out.println(child);
}
zk.close();
}
//刪
@Test
public void testRm() throws InterruptedException, KeeperException{
zk.delete("/zkTest", -1);
zk.close();
}
}
java客戶端監聽節點是否發生了變化
import java.util.List;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.junit.Before;
import org.junit.Test;
public class ZookeeperWatchDemo {
ZooKeeper zk = null;
@Before
public void init() throws Exception {
// 構造一個連接zookeeper的客戶端對象
zk = new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, new Watcher() {
@Override
public void process(WatchedEvent event) {
//如果在連接,並且為該節點的數據變化了
if (event.getState() == KeeperState.SyncConnected && event.getType() == EventType.NodeDataChanged) {
System.out.println(event.getPath()); // 收到的事件所發生的節點路徑
System.out.println(event.getType()); // 收到的事件的類型
System.out.println("數據變化了啊....."); // 收到事件後,我們的處理邏輯
try {
zk.getData("/mygirls", true, null);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
//如果在連接,並且是位元組點變化了
}else if(event.getState() == KeeperState.SyncConnected && event.getType() == EventType.NodeChildrenChanged){
System.out.println("子節點變化了......");
}
}
});
}
@Test
public void testGetWatch() throws Exception {
//此時監聽的邏輯就是new ZooKeeper時的watcher,這裡也可以自己寫一個watcher,
//但如果自己寫的話,就會只運行一次了,不能重覆監聽
byte[] data = zk.getData("/mygirls", true, null); // 監聽節點數據變化
List<String> children = zk.getChildren("/mygirls", true); //監聽節點的子節點變化事件
System.out.println(new String(data, "UTF-8"));
//這時候啟動的監聽線程為一個守護線程,當主線程結束後,就會退出,所以這裡讓主線程睡眠時間,當主線程結束,他也就沒了
//這個守護線程使我們在創建的zookeeper的時候就創建的,
Thread.sleep(Long.MAX_VALUE);
}
}
監聽伺服器上下線
首先是一個伺服器的業務邏輯
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Date;
public class TimeQueryService extends Thread{
int port = 0;
public TimeQueryService(int port){
this.port = port;
}
@Override
public void run() {
try {
//javaSocket編程,創建一個指定的埠號接受數據
ServerSocket ss = new ServerSocket(port);
System.out.println("業務線程已綁定埠"+port+"準備接受消費端請求了.....");
while(true){
Socket sc = ss.accept();
InputStream inputStream = sc.getInputStream();
OutputStream outputStream = sc.getOutputStream();
outputStream.write(new Date().toString().getBytes());
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
然後伺服器上線時,先向zookeeper註冊,等待消費者來訪問
package cn.edu360.zk.distributesystem;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
public class TimeQueryServer {
ZooKeeper zk = null;
// 構造zk客戶端連接
public void connectZK() throws Exception{
zk = new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181,hdp-04:2181", 2000, null);
}
// 註冊伺服器信息
public void registerServerInfo(String hostname,String port) throws Exception{
/**
* 先判斷註冊節點的父節點是否存在,如果不存在,則創建
*/
Stat stat = zk.exists("/servers", false);
if(stat==null){
zk.create("/servers", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
// 註冊伺服器數據到zk的約定註冊節點下
String create = zk.create("/servers/server", (hostname+":"+port).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostname+" 伺服器向zk註冊信息成功,註冊的節點為:" + create);
}
public static void main(String[] args) throws Exception {
TimeQueryServer timeQueryServer = new TimeQueryServer();
// 構造zk客戶端連接
timeQueryServer.connectZK();
// 註冊伺服器信息
timeQueryServer.registerServerInfo(args[0], args[1]);
// 啟動業務線程開始處理業務
new TimeQueryService(Integer.parseInt(args[1])).start();
}
}
然後是消費者端的業務邏輯
先看一下zookeeper有哪些alive的伺服器,然後隨便挑一臺訪問
package cn.edu360.zk.distributesystem;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
public class Consumer {
// 定義一個list用於存放最新的線上伺服器列表
private volatile ArrayList<String> onlineServers = new ArrayList<>();
// 構造zk連接對象
ZooKeeper zk = null;
// 構造zk客戶端連接
public void connectZK() throws Exception {
zk = new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181,hdp-04:2181", 2000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == KeeperState.SyncConnected && event.getType() == EventType.NodeChildrenChanged) {
try {
// 事件回調邏輯中,再次查詢zk上的線上伺服器節點即可,查詢邏輯中又再次註冊了子節點變化事件監聽
getOnlineServers();
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
}
// 查詢線上伺服器列表
public void getOnlineServers() throws Exception {
List<String> children = zk.getChildren("/servers", true);
ArrayList<String> servers = new ArrayList<>();
for (String child : children) {
byte[] data = zk.getData("/servers/" + child, false, null);
String serverInfo = new String(data);
servers.add(serverInfo);
}
onlineServers = servers;
System.out.println("查詢了一次zk,當前線上的伺服器有:" + servers);
}
public void sendRequest() throws Exception {
Random random = new Random();
while (true) {
try {
// 挑選一臺當前線上的伺服器
int nextInt = random.nextInt(onlineServers.size());
String server = onlineServers.get(nextInt);
String hostname = server.split(":")[0];
int port = Integer.parseInt(server.split(":")[1]);
System.out.println("本次請求挑選的伺服器為:" + server);
Socket socket = new Socket(hostname, port);
OutputStream out = socket.getOutputStream();
InputStream in = socket.getInputStream();
out.write("haha".getBytes());
out.flush();
byte[] buf = new byte[256];
int read = in.read(buf);
System.out.println("伺服器響應的時間為:" + new String(buf, 0, read));
out.close();
in.close();
socket.close();
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws Exception {
Consumer consumer = new Consumer();
// 構造zk連接對象
consumer.connectZK();
// 查詢線上伺服器列表
consumer.getOnlineServers();
// 處理業務(向一臺伺服器發送時間查詢請求)
consumer.sendRequest();
}
}