Rpc-實現Zookeeper註冊中心

来源:https://www.cnblogs.com/zko0/archive/2023/02/17/17130553.html
-Advertisement-
Play Games

1.前言 本文章是筆主在聲哥的手寫RPC框架的學習下,對註冊中心的一個拓展。因為聲哥某些部分沒有保留拓展性,所以本文章的項目與聲哥的工程有部分區別,核心內容在Curator的註冊發現與註銷,思想看準即可。 本文章Git倉庫:zko0/zko0-rpc 聲哥的RPC項目寫的確實很詳細,跟學一遍受益匪淺 ...


1.前言

本文章是筆主在聲哥的手寫RPC框架的學習下,對註冊中心的一個拓展。因為聲哥某些部分沒有保留拓展性,所以本文章的項目與聲哥的工程有部分區別,核心內容在Curator的註冊發現與註銷,思想看準即可。

本文章Git倉庫:zko0/zko0-rpc

聲哥的RPC項目寫的確實很詳細,跟學一遍受益匪淺:

何人聽我楚狂聲的博客

在聲哥的項目里使用Nacos作為了服務註冊中心。本人拓展添加了ZooKeeper實現服務註冊。

Nacos的服務註冊和發現,設計的不是非常好,每次服務的發現都需要去註冊中心拉取。本人實現ZooKeeper註冊中心時,參考了Dubbo的設計原理,結合本人自身想法,添加了本地緩存:

  • Client發現服務後緩存在本地,維護一個服務——實例列表
  • 當監聽到註冊中心的服務列表發生了變化,Client更新本地列表
  • 當註冊中心宕機,Client能夠依靠本地的服務列表繼續提供服務

問題:

  1. 實現服務註冊的本地緩存,還需要實現註冊中心的監聽,當註冊中心的服務發生更改時能夠實現動態更新。或者用輪訓的方式,定時更新,不過這種方式的服務實時性較差
  2. 當Server宕機,非臨時節點註冊容易出現服務殘留無法清除的問題。所以我建議全部使用臨時節點去註冊。

2.內容

zookeeper需要簡單學一下,知識內容非常簡單,搭建也很簡單,在此跳過。

如果你感興趣,可以參考我的ZooKeeper的文章:Zookeeper學習筆記 - zko0

①添加依賴

Curator:(簡化ZooKeeper客戶端使用)(Netfix研發,捐給Apache,是Apache頂級項目)

這裡排除slf4j依賴,因為筆主使用的slf4j存在衝突

<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.2.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>

②代碼編寫

1.首先創建一個連接類:

@Slf4j
public class ZookeeperUtil {

    //內部化構造方法
    private ZookeeperUtil(){
    }

    private static final String SERVER_HOSTNAME= RegisterCenterConfig.getHostName();

    private static final Integer SERVER_PORT=RegisterCenterConfig.getServerPort();

    private static CuratorFramework zookeeperClient;

    public static CuratorFramework getZookeeperClient(){
        if (zookeeperClient==null){
            synchronized (ZookeeperUtil.class){
                if (zookeeperClient==null){
                    RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);
                    zookeeperClient = CuratorFrameworkFactory.builder()
                            .connectString(SERVER_HOSTNAME+":"+SERVER_PORT)
                            .retryPolicy(retryPolic)
                            //  zookeeper根目錄為/serviceRegister,不為/
                            .namespace("serviceRegister")
                            .build();
                    zookeeperClient.start();
                }
            }
        }
        return zookeeperClient;
    }

    public static String getServerHostname(){
        return SERVER_HOSTNAME;
    }

    public static Integer getServerPort(){
        return SERVER_PORT;
    }

}

其中HOST,PORT信息我保存在regiserCenter.properties配置文件夾中,使用類讀取:

public class RpcConfig {


    //註冊中心類型
    private static String registerCenterType;

    //序列化類型
    private static String serializerType;

    //負載均衡類型
    private static String loadBalanceType;

    //配置Nacos地址
    private static String registerCenterHost;

    private static Integer registerCenterPort;


    private static boolean zookeeperDestoryIsEphemeral;

    private static String serverHostName;

    private static Integer serverPort;

    static {
        ResourceBundle bundle = ResourceBundle.getBundle("rpc");
        registerCenterType=bundle.getString("registerCenter.type");
        loadBalanceType=bundle.getString("loadBalance.type");
        registerCenterHost=bundle.getString("registerCenter.host");
        registerCenterPort = Integer.parseInt(bundle.getString("registerCenter.port"));
        try {
            zookeeperDestoryIsEphemeral="true".equals(bundle.getString("registerCenter.destory.isEphemeral"));
        } catch (Exception e) {
            zookeeperDestoryIsEphemeral=false;
        }
        serializerType=bundle.getString("serializer.type");
        serverHostName=bundle.getString("server.hostName");
        serverPort=Integer.parseInt(bundle.getString("server.port"));
    }

    public static String getRegisterCenterType() {
        return registerCenterType;
    }

    public static String getSerializerType() {
        return serializerType;
    }

    public static String getLoadBalanceType() {
        return loadBalanceType;
    }

    public static String getRegisterCenterHost() {
        return registerCenterHost;
    }

    public static Integer getRegisterCenterPort() {
        return registerCenterPort;
    }


    public static String getServerHostName() {
        return serverHostName;
    }

    public static Integer getServerPort() {
        return serverPort;
    }

    public static boolean isZookeeperDestoryIsEphemeral() {
        return zookeeperDestoryIsEphemeral;
    }

}

下麵的代碼我和聲哥有些不同,我將服務註冊,註銷方法放在ServerUtils中,服務發現方法放在ClientUtils中:

服務的高一致性存在兩種做法:

  • 因為ZooKeeper存在臨時節點,註冊中心可以實現Client(RPC的Server)斷開,註冊服務信息的自動丟失
  • 不設置為臨時節點,手動的服務註冊清除

我這裡兩種都實現了,雖然做兩種方式不同但是功能相同的代碼放在一起看起來很奇怪,這裡只是做演示。選擇其中一種即可。(我建議使用臨時節點,當Server宕機,殘留的服務信息也能及時清除)

註冊實現原理圖:

626723792516afae5530e634e691794

介面:

public interface ServiceDiscovery {
    InetSocketAddress searchService(String serviceName);

    void cleanLoaclCache(String serviceName);
}
public interface ServiceRegistry {
    //服務註冊
    void register(String serviceName, InetSocketAddress inetAddress);

    void cleanRegistry();
}

ZooKeeper介面實現:

public class ZookeeperServiceDiscovery implements ServiceDiscovery{

    private final LoadBalancer loadBalancer;

    public ZookeeperServiceDiscovery(LoadBalancer loadBalancer) {
        this.loadBalancer = loadBalancer;
    }
    
    @Override
    public InetSocketAddress searchService(String serviceName) {
        return ZookeeperClientUtils.searchService(serviceName,loadBalancer);
    }

    @Override
    public void cleanLoaclCache(String serviceName) {
        ZookeeperClientUtils.cleanLocalCache(serviceName);
    }
}
public class ZookeeperServiceRegistry implements ServiceRegistry{
    @Override
    public void register(String serviceName, InetSocketAddress inetAddress) {
        ZookeeperServerUitls.register(serviceName,inetAddress);
    }

    @Override
    public void cleanRegistry() {
        ZookeeperServerUitls.cleanRegistry();
    }
}

Factory工廠:

public class ServiceFactory {

    private static String center = RpcConfig.getRegisterCenterType();
    private static String lb= RpcConfig.getLoadBalanceType();

    private static  ServiceRegistry registry;

    private static  ServiceDiscovery discovery;

    private static Object registerLock=new Object();

    private static Object discoveryLock=new Object();

    public static ServiceDiscovery getServiceDiscovery(){
        if (discovery==null){
            synchronized (discoveryLock){
                if (discovery==null){
                    if ("nacos".equalsIgnoreCase(center)){
                        discovery= new NacosServiceDiscovery(LoadBalancerFactory.getLoadBalancer(lb));
                    }else if ("zookeeper".equalsIgnoreCase(center)){
                        discovery= new ZookeeperServiceDiscovery(LoadBalancerFactory.getLoadBalancer(lb));
                    }
                }
            }
        }
        return discovery;
    }

    public static ServiceRegistry getServiceRegistry(){
        if (registry==null){
            synchronized (registerLock){
                if (registry==null){
                    if ("nacos".equalsIgnoreCase(center)){
                        registry=  new NacosServiceRegistry();
                    }else if ("zookeeper".equalsIgnoreCase(center)){
                        registry= new ZookeeperServiceRegistry();
                    }
                }
            }
        }
        return registry;
    }

}

使用Gson序列化InetSocketAddress存在問題,編寫Util:

public class InetSocketAddressSerializerUtil {
    public static String getJsonByInetSockerAddress(InetSocketAddress address){
        HashMap<String, String> map = new HashMap<>();
        map.put("host",address.getHostName());
        map.put("port",address.getPort()+"");
        return new Gson().toJson(map);
    }

    public static InetSocketAddress getInetSocketAddressByJson(String json){
        HashMap<String,String> hashMap = new Gson().fromJson(json, HashMap.class);
        String host = hashMap.get("host");
        Integer port=Integer.parseInt(hashMap.get("port"));
        return new InetSocketAddress(host,port);
    }

}

上面主要是註冊,發現的邏輯,我把主要方法寫在了Utils中:

@Slf4j
public class ZookeeperServerUitls {

    private static CuratorFramework client = ZookeeperUtil.getZookeeperClient();

    private static final Set<String> instances=new ConcurrentHashSet<>();

    public static void register(String serviceName, InetSocketAddress inetSocketAddress){

        serviceName=ZookeeperUtil.serviceName2Path(serviceName);;
        String uuid = UUID.randomUUID().toString();
        serviceName=serviceName+"/"+uuid;
        String json = InetSocketAddressSerializerUtil.getJsonByInetSockerAddress(inetSocketAddress);
        try {
            if (RpcConfig.isZookeeperDestoryIsEphemeral()){
                //會話結束節點,創建消失
                client.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.EPHEMERAL)
                        .forPath(serviceName,json.getBytes());
            } else {
                client.create()
                        .creatingParentsIfNeeded()
                        .forPath(serviceName,json.getBytes());
            }
        }
            catch (Exception e) {
            log.error("服務註冊失敗");
            throw  new RpcException(RpcError.REGISTER_SERVICE_FAILED);
        }
        //放入map
        instances.add(serviceName);
    }

    public static void cleanRegistry(){
        log.info("註銷所有註冊的服務");
        //如果自動銷毀,不需要清除
        if (RpcConfig.isZookeeperDestoryIsEphemeral()) return;
        if (ZookeeperUtil.getServerHostname()!=null&&ZookeeperUtil.getServerPort()!=null&&!instances.isEmpty()){
            for (String path:instances) {
                try {
                    client.delete().forPath(path);
                } catch (Exception e) {
                    log.error("服務註銷失敗");
                    throw new RpcException(RpcError.DESTORY_REGISTER_FALL);
                }
            }
        }
    }
}
@Slf4j
public class ZookeeperClientUtils {

    private static CuratorFramework client = ZookeeperUtil.getZookeeperClient();

    private static final Map<String,  List<InetSocketAddress>> instances=new ConcurrentHashMap<>();

    public static InetSocketAddress searchService(String serviceName, LoadBalancer loadBalancer) {
        InetSocketAddress address;
        //本地緩存查詢
        if (instances.containsKey(serviceName)){
            List<InetSocketAddress> addressList = instances.get(serviceName);
            if (!addressList.isEmpty()){
                //使用lb進行負載均衡
                return loadBalancer.select(addressList);
            }
        }
        try {
            String path = ZookeeperUtil.serviceName2Path(serviceName);
            //獲取路徑下所有的實現
            List<String> instancePaths = client.getChildren().forPath(path);
            List<InetSocketAddress> addressList = new ArrayList<>();
            for (String instancePath : instancePaths) {
                byte[] bytes = client.getData().forPath(path+"/"+instancePath);
                String json = new String(bytes);
                InetSocketAddress instance = InetSocketAddressSerializerUtil.getInetSocketAddressByJson(json);
                addressList.add(instance);
            }
            addLocalCache(serviceName,addressList);
            return loadBalancer.select(addressList);
        } catch (Exception e) {
            log.error("服務獲取失敗====>{}",e);
            throw new RpcException(RpcError.SERVICE_NONE_INSTANCE);
        }
    }

    public static void cleanLocalCache(String serviceName){
        log.info("服務調用失敗,清除本地緩存,重新獲取實例===>{}",serviceName);
        instances.remove(serviceName);
    }

    public static void addLocalCache(String serviceName,List<InetSocketAddress> addressList){
        //直接替換原本的緩存
        instances.put(serviceName,addressList);
    }
}

③配置文件

rpc.properties放在resources下

#nacos    zookeeper
#registerCenter.type=nacos
registerCenter.type=zookeeper

#registerCenter.host=127.0.0.1
registerCenter.host=101.43.244.40

#zookeeper port default 2181
#registerCenter.port=9000
registerCenter.port=2181

registerCenter.destory.isEphemeral=false

#??random?roundRobin
loadBalance.type=random

#kryo json jdk
serializer.type=kryo

server.hostName=127.0.0.1
server.port=9999

④更多

聲哥的代碼我做了很多修改,如果上述代碼和你參考的項目代碼出入比較大,可以查看本文章的工程閱讀。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 談到java中的併發,我們就避不開線程之間的同步和協作問題,談到線程同步和協作我們就不能不談談jdk中提供的AbstractQueuedSynchronizer(翻譯過來就是抽象的隊列同步器)機制; (一)、AQS中的state和Node含義: AQS中提供了一個int volatile state ...
  • 題目來源:https://www.acwing.com/problem/content/description/789/ 題目描述 給定你一個長度為 n 的整數數列。 請你使用歸併排序對這個數列按照從小到大進行排序。 並將排好序的數列按順序輸出。 輸入格式 輸入共兩行,第一行包含整數 n。 第二行包 ...
  • SpringMVC文件上傳 1.基本介紹 SpringMVC 為文件上傳提供了直接的支持,這種支持是通過即插即用的 MultipartResolver 實現的。spring 用 Jacarta Commons FileUpload 技術實現了一個 MultipartResolver 的實現類:Com ...
  • 對於廣大書蟲而言,沒有小說看是最痛苦的,你身邊有這樣的人嗎? 今天咱們分享一個小說下載器代碼,打包成exe後,發給你的小伙伴也能直接使用… 思路流程 什麼是爬蟲? 按照一定的規則, 去採集互聯網上面數據 爬蟲可以做什麼? 採集數據: 定製化採集數據 自動化腳本:自動點贊/評論/刷票/商品搶購腳本/自 ...
  • 系列內容 elasticsearch 概述 elasticsearch 安裝 elasticsearch 查詢 客戶端api使用 1. elasticsearch 概述 1.1 簡介 官網: https://www.elastic.co/ Elasticsearch (簡稱ES)是一個分散式、RES ...
  • 1 TreeMap基本介紹 Java TreeMap實現了SortedMap介面,也就是說會按照key的大小順序對Map中的元素進行排序 key大小的評判可以通過其本身的自然順序(natural ordering),也可以通過構造時傳入的比較器(Comparator)。 TreeMap底層通過紅黑樹 ...
  • 關於靜態代碼塊和匿名代碼塊以及結構體在程式運行過程中的調用順序實驗(續) 之前發過一篇博客講述自己對靜態代碼塊、匿名代碼塊以及結構體在程式中運行結果的小實驗。本次再接觸到abstract抽象類後,覺得在做一個實驗,看看抽象類方法繼承中三個模塊的調用順序。所編寫的代碼如下: Application類( ...
  • 特別:下文的“容量”、“數組長度”,“capacity” 都是指底層數組長度,即 table.length 1 一般數據結構及特點 數組:占用連續記憶體的數據結構,查找容易[O(1)],插入困難[O(n)] 鏈表:由一組指向(單向或者雙向)的節點連接的數據結構,記憶體不連續,查找困難,但插入刪除容易 哈 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...