Hadoop學習(5)-zookeeper的安裝和命令行,java操作

来源:https://www.cnblogs.com/wpbing/archive/2019/08/06/11309761.html
-Advertisement-
Play Games

zookeeper是幹嘛的呢 Zookeeper的作用1.可以為客戶端管理少量的數據kvkey:是以路徑的形式表示的,那就意味著,各key之間有父子關係,比如/ 是頂層key用戶建的key只能在/ 下作為子節點,比如建一個key: /aa 這個key可以帶value數據也可以建一個key: /bb也 ...


zookeeper是幹嘛的呢

Zookeeper的作用
1.可以為客戶端管理少量的數據kv
key:是以路徑的形式表示的,那就意味著,各key之間有父子關係,比如
/ 是頂層key
用戶建的key只能在/ 下作為子節點,比如建一個key: /aa 這個key可以帶value數據
也可以建一個key: /bb
也可以建key: /aa/xx

 

 

2.可以為客戶端監聽指定數據節點的狀態,併在數據節點發生變化是,通知客戶端

 


Zookeeper 安裝步驟
把包上傳linux後解壓到apps/
[root@hdp-01 ~]# tar -zxvf zookeeper-3.4.6.tar.gz -C apps/
/root/apps/zookeeper-3.4.6/conf下該配置文件
[root@hdp-01 conf]# cp zoo_sample.cfg zoo.cfg
然後vim zoo.cfg
更改為
dataDir=/root/zkdata
最後添加
server.1=hdp-01:2888:3888
server.2=hdp-02:2888:3888
server.3=hdp-03:2888:3888
server.4=hdp-04:2888:3888
接著,在hdp-01上,新建數據目錄/root/zkdata,併在目錄重生成一個文件myid,內容為1
接著,在hdp-02上,新建數據目錄/root/zkdata,併在目錄重生成一個文件myid,內容為2
接著,在hdp-03上,新建數據目錄/root/zkdata,併在目錄重生成一個文件myid,內容為3
接著,在hdp-04上,新建數據目錄/root/zkdata,併在目錄重生成一個文件myid,內容為4
然後將zookeeper scp給其他機器
啟動
[root@hdp-01 ~]# /root/apps/zookeeper-3.4.6/bin/zkServer.sh start
查看狀態
[root@hdp-01 ~]# /root/apps/zookeeper-3.4.6/bin/zkServer.sh status

可以自己寫一個腳本進行啟動名字叫zkmanage.sh
用的時候後面跟上參數,傳入$1.
sh ./zkmanage.sh start
或者關閉的時候
sh ./zkmanager.sh stop
腳本代碼如下

複製代碼
#!/bin/bash
for host in hdp-01 hdp-02 hdp-03 hdp-04
do
echo "${host}:starting...."
ssh $host "/root/apps/zookeeper-3.4.6/bin/zkServer.sh $1"
done
sleep 2
for host in hdp-01 hdp-02 hdp-03 hdp-04
do
ssh $host "/root/apps/zookeeper-3.4.6/bin/zkServer.sh status"
done
複製代碼

註意一點,如果有的結點沒有啟動,一定要看一下是不是這幾台機器的時間是不是不對應,如果差別太大是啟動不起來的。f**k.

簡單補充一點就是,啟動之後,這幾台機器,有的當leader,有的當follower,只有一個leader,他們誰當leader是根據他們 '投票的形式'的決定的。

只有一個leader

 

 

zookeeper的命令行客戶端和java客戶端

命令行

在bin/zkCli.sh

這樣會連到本機localhost

指定連到哪一臺zookeeper

bin/zkcli.sh –server hdp-02:2181

 

兩個作用,管理數據和監聽

首先是管理數據

 

也可以自己建數據

[zk: hdp-03:2181(CONNECTED) 8] create /aa "hellozk"

created /aa

 

[zk: hdp-03:2181(CONNECTED) 9] ls /

[aa, root, hbase, zookeeper]

 

[zk: hdp-03:2181(CONNECTED) 10] get /aa

"hellozk"

cZxid = 0xc00000023

ctime = Mon Aug 05 14:41:52 CST 2019

mZxid = 0xc00000023

mtime = Mon Aug 05 14:41:52 CST 2019

pZxid = 0xc00000023

cversion = 0

dataVersion = 0

aclVersion = 0

ephemeralOwner = 0x0

dataLength = 9

numChildren = 0

 

 

 

修改數據

[zk: hdp-03:2181(CONNECTED) 11] set /aa hellospark

cZxid = 0xc00000023

ctime = Mon Aug 05 14:41:52 CST 2019

mZxid = 0xc00000024

mtime = Mon Aug 05 14:42:40 CST 2019

pZxid = 0xc00000023

cversion = 0

dataVersion = 1

aclVersion = 0

ephemeralOwner = 0x0

dataLength = 10

numChildren = 0

這個數據版本,你沒修改幾次就會變成幾

也可以在/aa下建立子目錄

如果有些命令忘了,可以輸入help查看幫助

 

刪除就是rmr

[zk: hdp-03:2181(CONNECTED) 13] rmr /aa

 

監聽

[zk: hdp-03:2181(CONNECTED) 17] create /aa iamfine

Created /aa

 

[zk: hdp-03:2181(CONNECTED) 18] get /aa watch

然後這時候如果改變了/aa 就讓他通知我

在另一臺機器上啟動一個zookeeper

 

[zk: hdp-03:2181(CONNECTED) 2] set /aa iamnotfine

此時就會有信息

 

但當你再改一次的話,這個連接就不會再提醒了,這個監聽只起一次作用。

 

數據類型分為好幾種

zookeeper中的znode有多種類型:

1、PERSISTENT  持久的:創建者就算跟集群斷開聯繫,該類節點也會持久存在與zk集群中

2、EPHEMERAL  短暫的:創建者一旦跟集群斷開聯繫,zk就會將這個節點刪除

3、SEQUENTIAL  帶序號的:這類節點,zk會自動拼接上一個序號,而且序號是遞增的

我們一般創建的都是持久的

create –e /bb xxx

這時候就是短暫的

create /cc yyyy

create –s /cc/c qq

然後他們就會自動的在這些子節點下帶上序號

 

java客戶端

 

 

 需要的jar包

 

import java.io.UnsupportedEncodingException;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.junit.Before;
import org.junit.Test;

public class ZookeeperClientDemo {
    ZooKeeper zk = null;
    @Before
    public void init()  throws Exception{
        // 構造一個連接zookeeper的客戶端對象
        zk = new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, null);
    }
    //
    @Test
    public void testCreate() throws Exception{

        // 參數1:要創建的節點路徑  參數2:數據  參數3:訪問許可權  參數4:節點類型
        String create = zk.create("/zkTest", "hello zk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println(create);
        
        zk.close();
        
    }
    
    //
    @Test
    public void testUpdate() throws Exception {
        
        // 參數1:節點路徑   參數2:數據    參數3:所要修改的版本,-1代表任何版本
        zk.setData("/zkTest", "我愛你".getBytes("UTF-8"), -1);
        
        zk.close();
        
    }
    
    //
    @Test    
    public void testGet() throws Exception {
        // 參數1:節點路徑    參數2:是否要監聽    參數3:所要獲取的數據的版本,null表示最新版本
        byte[] data = zk.getData("/zkTest", false, null);
        System.out.println(new String(data,"UTF-8"));
        
        zk.close();
    }
    
    
    //查子節點
    @Test    
    public void testListChildren() throws Exception {
        // 參數1:節點路徑    參數2:是否要監聽   
        // 註意:返回的結果中只有子節點名字,不帶全路徑
        List<String> children = zk.getChildren("/zkTest", false);
        
        for (String child : children) {
            System.out.println(child);
        }
        
        zk.close();
    }
    
    //
    @Test
    public void testRm() throws InterruptedException, KeeperException{
        
        zk.delete("/zkTest", -1);
        
        zk.close();
    }
    
    
    

}

java客戶端監聽節點是否發生了變化

import java.util.List;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.junit.Before;
import org.junit.Test;

public class ZookeeperWatchDemo {

    ZooKeeper zk = null;

    @Before
    public void init() throws Exception {
        // 構造一個連接zookeeper的客戶端對象
        zk = new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, new Watcher() {

            @Override
            public void process(WatchedEvent event) {
                //如果在連接,並且為該節點的數據變化了
                if (event.getState() == KeeperState.SyncConnected && event.getType() == EventType.NodeDataChanged) {
                    System.out.println(event.getPath()); // 收到的事件所發生的節點路徑
                    System.out.println(event.getType()); // 收到的事件的類型
                    System.out.println("數據變化了啊....."); // 收到事件後,我們的處理邏輯

                    try {
                        zk.getData("/mygirls", true, null);

                    } catch (KeeperException | InterruptedException e) {
                        e.printStackTrace();
                    }
                    //如果在連接,並且是位元組點變化了
                }else if(event.getState() == KeeperState.SyncConnected && event.getType() == EventType.NodeChildrenChanged){
                    
                    System.out.println("子節點變化了......");
                }

            }
        });
    }

    @Test
    public void testGetWatch() throws Exception {
        //此時監聽的邏輯就是new ZooKeeper時的watcher,這裡也可以自己寫一個watcher,
        //但如果自己寫的話,就會只運行一次了,不能重覆監聽
        byte[] data = zk.getData("/mygirls", true, null); // 監聽節點數據變化
        
        List<String> children = zk.getChildren("/mygirls", true); //監聽節點的子節點變化事件

        System.out.println(new String(data, "UTF-8"));
        //這時候啟動的監聽線程為一個守護線程,當主線程結束後,就會退出,所以這裡讓主線程睡眠時間,當主線程結束,他也就沒了
        //這個守護線程使我們在創建的zookeeper的時候就創建的,
        Thread.sleep(Long.MAX_VALUE);

    }

}

 

 監聽伺服器上下線

首先是一個伺服器的業務邏輯

 

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Date;

public class TimeQueryService extends Thread{
    
    int port = 0;
    public TimeQueryService(int port){
        
        this.port = port;
    }
    @Override
    public void run() {
        
        try {
            //javaSocket編程,創建一個指定的埠號接受數據
            ServerSocket ss = new ServerSocket(port);
            System.out.println("業務線程已綁定埠"+port+"準備接受消費端請求了.....");
            while(true){
                Socket sc = ss.accept();
                InputStream inputStream = sc.getInputStream();
                OutputStream outputStream = sc.getOutputStream();
                outputStream.write(new Date().toString().getBytes());
            }
            
        } catch (IOException e) {
            e.printStackTrace();
        }
        
        
    }
    

}

 

然後伺服器上線時,先向zookeeper註冊,等待消費者來訪問

package cn.edu360.zk.distributesystem;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class TimeQueryServer {
    ZooKeeper zk = null;
    
    // 構造zk客戶端連接
    public void connectZK() throws Exception{
        zk = new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181,hdp-04:2181", 2000, null);
    }
    // 註冊伺服器信息
    public void registerServerInfo(String hostname,String port) throws Exception{
        
        /**
         * 先判斷註冊節點的父節點是否存在,如果不存在,則創建
         */
        Stat stat = zk.exists("/servers", false);
        if(stat==null){
            zk.create("/servers", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        
        // 註冊伺服器數據到zk的約定註冊節點下
        String create = zk.create("/servers/server", (hostname+":"+port).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        
        System.out.println(hostname+" 伺服器向zk註冊信息成功,註冊的節點為:" + create);
        
    }

    public static void main(String[] args) throws Exception {
        
        TimeQueryServer timeQueryServer = new TimeQueryServer();
        
        // 構造zk客戶端連接
        timeQueryServer.connectZK();
        
        // 註冊伺服器信息
        timeQueryServer.registerServerInfo(args[0], args[1]);
        
        // 啟動業務線程開始處理業務
        new TimeQueryService(Integer.parseInt(args[1])).start();
        
    }
    

}

然後是消費者端的業務邏輯

先看一下zookeeper有哪些alive的伺服器,然後隨便挑一臺訪問

package cn.edu360.zk.distributesystem;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;

public class Consumer {

    // 定義一個list用於存放最新的線上伺服器列表
    private volatile ArrayList<String> onlineServers = new ArrayList<>();

    // 構造zk連接對象
    ZooKeeper zk = null;

    // 構造zk客戶端連接
    public void connectZK() throws Exception {

        zk = new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181,hdp-04:2181", 2000, new Watcher() {

            @Override
            public void process(WatchedEvent event) {
                if (event.getState() == KeeperState.SyncConnected && event.getType() == EventType.NodeChildrenChanged) {

                    try {
                        // 事件回調邏輯中,再次查詢zk上的線上伺服器節點即可,查詢邏輯中又再次註冊了子節點變化事件監聽
                        getOnlineServers();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }

                }

            }
        });

    }

    // 查詢線上伺服器列表
    public void getOnlineServers() throws Exception {

        List<String> children = zk.getChildren("/servers", true);
        ArrayList<String> servers = new ArrayList<>();

        for (String child : children) {
            byte[] data = zk.getData("/servers/" + child, false, null);

            String serverInfo = new String(data);

            servers.add(serverInfo);
        }

        onlineServers = servers;
        System.out.println("查詢了一次zk,當前線上的伺服器有:" + servers);

    }

    public void sendRequest() throws Exception {
        Random random = new Random();
        while (true) {
            try {
                // 挑選一臺當前線上的伺服器
                int nextInt = random.nextInt(onlineServers.size());
                String server = onlineServers.get(nextInt);
                String hostname = server.split(":")[0];
                int port = Integer.parseInt(server.split(":")[1]);

                System.out.println("本次請求挑選的伺服器為:" + server);

                Socket socket = new Socket(hostname, port);
                OutputStream out = socket.getOutputStream();
                InputStream in = socket.getInputStream();

                out.write("haha".getBytes());
                out.flush();

                byte[] buf = new byte[256];
                int read = in.read(buf);
                System.out.println("伺服器響應的時間為:" + new String(buf, 0, read));

                out.close();
                in.close();
                socket.close();

                Thread.sleep(2000);
            } catch (Exception e) {
                e.printStackTrace();
            }

        }

    }

    public static void main(String[] args) throws Exception {

        Consumer consumer = new Consumer();
        // 構造zk連接對象
        consumer.connectZK();

        // 查詢線上伺服器列表
        consumer.getOnlineServers();

        // 處理業務(向一臺伺服器發送時間查詢請求)
        consumer.sendRequest();

    }

}

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1. gpedit.msc 組策略 2. sndrec32 錄音機 3. Nslookup IP地址偵測器 4. explorer 打開資源管理器 5. logoff 註銷命令 6. tsshutdn 60秒倒計時關機命令 7. lusrmgr.msc 本機用戶和組 8. services.msc ...
  • 一、echo在屏幕上列印內容 echo [選項] [輸出內容] -e 支持轉義字元控制的字元轉換 輸出帶顏色的文本 二、第一個腳本 編寫腳本 註意: 運行腳本 兩種方式 (1)賦予執行許可權,直接運行 (2)通過bash調用執行腳本 三、bash的基本功能 (1)命令別名 顯示已有的別名 alisa ...
  • vi -- 終端中的編輯器 visual interface ssh-- secure shell vim vi improved 打開和新建文件 vi 文件名 #如果文件已經存在,會直接打開文件 #如果文件不存在,會新建一個文件 打開文件並定位行 vi 文件 游標定位在最開頭 vi 文件 + 游標 ...
  • [TOC] 手動編譯PHP開發環境 這是一篇來自深夜加班的手稿 問題復盤 你有沒有遇到過這樣的情況,部署了集成環境,每次添加擴展的時候,總是需要找一堆的配置文件的位置(其實很多人都能熟練使用集成環境) 你有沒有遇到過這樣的情況,去面試,面試官問你: 有沒有自己手動編譯過環境? 你卻回答 我一般都使用 ...
  • 1. 遷移背景和限制條件 隨著功能的迭代或者數據表中數據量的增加,將現有數據進行遷移已是工作中經常遇到的事情。通常我們在平時遷移數據數據的時候,只需要用mysqldump、mysqlimport指令就能完成遷移功能,但在實際工作中,作為開發者的我們往往沒有這麼大的許可權(例如寫許可權)來操作線上數據,只 ...
  • 目前最流行的大數據查詢引擎非hive莫屬,它是基於MR的類SQL查詢工具,會把輸入的查詢SQL解釋為MapReduce,能極大的降低使用大數據查詢的門檻, 讓一般的業務人員也可以直接對大數據進行查詢。但因其基於MR,運行速度是一個弊端,通常運行一個查詢需等待很久才會有結果。對於此情況,創造了hive ...
  • 一、創建表 create table 表裡包含什麼類型的數據 表的名稱是什麼 主鍵 列的名稱是什麼 每一列的數據類型是什麼 每一列的長度是多少 表裡哪些列可以是空的 語法: create table table_name (field1 data_type [not null], field2 da ...
  • 1. MySQL多表查詢 1.1 外鍵約束 為了消除多張表查詢出現的笛卡爾積的現象,MySQL在建表併進行多表之間的關鍵查詢可以使用外鍵關聯查詢。 外鍵:從表1(sub)的某列引用(ref)另外一個表2(main)的某列的值,把表1的這列叫做表2這列的外鍵。 1.2 外鍵的設置使用 比如上述最簡單的 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...