簡單RPC框架-基於Consul的服務註冊與發現

来源:http://www.cnblogs.com/ASPNET2008/archive/2017/05/22/6892137.html
-Advertisement-
Play Games

一般我們常見的RPC框架都包含如下三個部分: 註冊中心,用於服務端註冊遠程服務以及客戶端發現服務 服務端,對外提供後臺服務,將自己的服務信息註冊到註冊中心 客戶端,從註冊中心獲取遠程服務的註冊信息,然後進行遠程過程調用 上面提到的註冊中心其實屬於服務治理,即使沒有註冊中心,RPC的功能也是完整的。之 ...


一般我們常見的RPC框架都包含如下三個部分:

  • 註冊中心,用於服務端註冊遠程服務以及客戶端發現服務
  • 服務端,對外提供後臺服務,將自己的服務信息註冊到註冊中心
  • 客戶端,從註冊中心獲取遠程服務的註冊信息,然後進行遠程過程調用
    上面提到的註冊中心其實屬於服務治理,即使沒有註冊中心,RPC的功能也是完整的。之前我大多接觸的是基於zookeeper的註冊中心,這裡基於consul來實現註冊中心的基本功能。

Consul的一些特點:

  • Raft相比Paxos直接

此外不多描述,還沒研究raft

  • 支持數據中心,可以用來解決單點故障之類的問題
  • 集成相比zookeeper更加簡單(代碼量少,邏輯清晰簡單)
  • 支持健康檢查,支持http以及tcp
  • 自帶UI管理功能,不需要額外第三方支持。(zookeeper需要單獨部署zkui之類的第三方工具)
  • 支持key/value存儲

啟動consul之後訪問管理頁面

RPC集成

提取出服務註冊與服務發現兩個介面,然後使用Consul實現,這裡主要通過consul-client來實現(也可以是consul-api),需要在pom中引入:

<dependency>
	<groupId>com.orbitz.consul</groupId>
	<artifactId>consul-client</artifactId>
	<version>0.14.1</version>
</dependency>

服務註冊

  • RegistryService
    提供服務的註冊與刪除功能
public interface RegistryService {
    void register(RpcURL url);
    void unregister(RpcURL url);
}
  • AbstractConsulService
    consul的基類,用於構建Consl對象,服務於服務端以及客戶端。
public class AbstractConsulService {
    private static final Logger logger = LoggerFactory.getLogger(AbstractConsulService.class);

    protected final static String CONSUL_NAME="consul_node_jim";
    protected final static String CONSUL_ID="consul_node_id";
    protected final static String CONSUL_TAGS="v3";
    protected final static String CONSUL_HEALTH_INTERVAL="1s";

    protected Consul buildConsul(String registryHost, int registryPort){
        return Consul.builder().withHostAndPort(HostAndPort.fromString(registryHost+":"+registryPort)).build();
    }
}
  • ConsulRegistryService
    服務註冊實現類,在註冊服務的同時,指定了健康檢查。

服務的刪除暫時未實現

public class ConsulRegistryService extends AbstractConsulService implements RegistryService {

    private final static int CONSUL_CONNECT_PERIOD=1*1000;

    @Override
    public void register(RpcURL url) {
        Consul consul = this.buildConsul(url.getRegistryHost(),url.getRegistryPort());
        AgentClient agent = consul.agentClient();

        ImmutableRegCheck check = ImmutableRegCheck.builder().tcp(url.getHost()+":"+url.getPort()).interval(CONSUL_HEALTH_INTERVAL).build();
        ImmutableRegistration.Builder builder = ImmutableRegistration.builder();
        builder.id(CONSUL_ID).name(CONSUL_NAME).addTags(CONSUL_TAGS).address(url.getHost()).port(url.getPort()).addChecks(check);

        agent.register(builder.build());

    }

    @Override
    public void unregister(RpcURL url) {

    }

}

由於我實現的RPC是基於TCP的,所以服務註冊的健康檢查也指定為TCP,consul會按指定的IP以及埠建立連接以此判斷服務的健康狀態。如果是http,則需要調用http方法,同時指定健康檢查地址。

ImmutableRegCheck check = ImmutableRegCheck.builder().tcp(url.getHost()+":"+url.getPort()).interval(CONSUL_HEALTH_INTERVAL).build();

後臺的監控信息如下:

雖然只是指定了TCP,可能出於某種機制後臺依然會發起HTTP的健康檢查請求,上圖第一條請求日誌。

服務發現

  • DiscoveryService
    獲取所有註冊的有效的服務信息。
public interface DiscoveryService {

    List<RpcURL> getUrls(String registryHost, int registryPort);
}
  • ConsulDiscoveryService
    首先是獲取有效的服務列表:
List<RpcURL> urls= Lists.newArrayList();
Consul consul = this.buildConsul(registryHost,registryPort);
HealthClient client = consul.healthClient();
String name = CONSUL_NAME;
ConsulResponse object= client.getAllServiceInstances(name);
List<ImmutableServiceHealth> serviceHealths=(List<ImmutableServiceHealth>)object.getResponse();
for(ImmutableServiceHealth serviceHealth:serviceHealths){
    RpcURL url=new RpcURL();
    url.setHost(serviceHealth.getService().getAddress());
    url.setPort(serviceHealth.getService().getPort());
    urls.add(url);
}

服務更新監聽,當可用服務列表發現變化時需要通知調用端。

try {
    ServiceHealthCache serviceHealthCache = ServiceHealthCache.newCache(client, name);
    serviceHealthCache.addListener(new ConsulCache.Listener<ServiceHealthKey, ServiceHealth>() {
        @Override
        public void notify(Map<ServiceHealthKey, ServiceHealth> map) {
            logger.info("serviceHealthCache.addListener notify");
            RpcClientInvokerCache.clear();

        }
    });
    serviceHealthCache.start();
} catch (Exception e) {
    logger.info("serviceHealthCache.start error:",e);
}

由於之前對客戶端的Invoker有緩存,所以當服務列表有變化時需要對緩存信息進行更新。

這裡簡單的直接對緩存做清除處理,其實好一點的方法應該只對有變化的做處理。

  • RpcClientInvokerCache
    對客戶端實例化後的Invoker的緩存類
public class RpcClientInvokerCache {

    private static CopyOnWriteArrayList<RpcClientInvoker> connectedHandlers = new CopyOnWriteArrayList<>();

    public static CopyOnWriteArrayList<RpcClientInvoker> getConnectedHandlersClone(){
        return (CopyOnWriteArrayList<RpcClientInvoker>) RpcClientInvokerCache.getConnectedHandlers().clone();
    }

    public static void addHandler(RpcClientInvoker handler) {
        CopyOnWriteArrayList<RpcClientInvoker> newHandlers = getConnectedHandlersClone();
        newHandlers.add(handler);
        connectedHandlers=newHandlers;
    }

    public static CopyOnWriteArrayList<RpcClientInvoker> getConnectedHandlers(){
        return connectedHandlers;
    }

    public static RpcClientInvoker get(int i){
        return connectedHandlers.get(i);
    }

    public static int size(){
        return connectedHandlers.size();
    }

    public static void clear(){
        CopyOnWriteArrayList<RpcClientInvoker> newHandlers = getConnectedHandlersClone();
        newHandlers.clear();
        connectedHandlers=newHandlers;
    }
}
  • 負載均衡
    當同一個介面有多個服務同時提供服務時,客戶端需要有一定的負載均衡機制去決策將客戶端的請求分配給哪一臺伺服器,這裡實現一個簡易的輪詢實現方式。請求次數累加,累加的值與服務列表的大小做取模操作。

代碼中取服務列表的方法有小問題,未按介面信息取,後續再完成

public class RoundRobinLoadbalanceService implements LoadbalanceService {

    private AtomicInteger roundRobin = new AtomicInteger(0);
    private static final int MAX_VALUE=1000;
    private static final int MIN_VALUE=1;

    private AtomicInteger getRoundRobinValue(){
        if(this.roundRobin.getAndAdd(1)>MAX_VALUE){
            this.roundRobin.set(MIN_VALUE);
        }
        return this.roundRobin;
    }

    @Override
    public int index(int size) {
        return  (this.getRoundRobinValue().get() + size) % size;
    }
}

待完善的功能

  • 代碼中取服務列表的方法有小問題,未按介面信息取
  • 註冊中心的可用服務地址信息變化時,需要優化為按需更新
  • 註冊中心的服務刪除未實現

源碼地址

https://github.com/jiangmin168168/jim-framework


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 簡介 RabbitMQ是用erlang開發的,集群非常方便,因為erlang天生就是一門分散式語言,但其本身並不支持負載均衡。 Rabbit模式大概分為以下三種:單一模式、普通模式、鏡像模式 RabbitMQ工作模式 單一模式 (最簡單的情況,非集群模式) 普通模式 (預設的集群模式) 流程 對於Q ...
  • 原創聲明:本文為本人原創作品,絕非他處摘取,轉載請聯繫博主 相信大家在各大網站都會遇到,登錄時,在登錄框出現下次免登陸/一個月免登陸的類似選項,本次博文就是講解如何實現,在這記錄一下,也算是做個備忘錄合集,如果文中有錯,歡迎大家指出 為啥說自登陸一次呢,因為當訪問某個頁面時,如果第一次自動登錄失敗時 ...
  • 根據 autowire 的配置選擇裝配策略 byName 選擇和屬性名 name 一致的 bean 進行裝配; byType 根據類型選擇,如果對應的類型匹配到多個bean,則會報錯,如下配置: 報錯: 還能配置在 beans 標簽下,設置整個配置文件的裝配策略,裡面的值也是那幾個配置。 代碼鏈接: ...
  • 回顧大二的數據結構知識。從數組開始。實現了一個可自動擴充容量的泛型數組。 頭文件:Array.h 實現:Array.cpp ...
  • 1.什麼是JSTL JSP標準標簽庫(JSTL)是一個JSP標簽集合,它封裝了JSP應用的通用核心功能。 JSTL支持通用的、結構化的任務,比如迭代,條件判斷,XML文檔操作,國際化標簽,SQL標簽。 除了這些,它還提供了一個框架來使用集成JSTL的自定義標簽。 根據JSTL標簽所提供的功能,可以將 ...
  • 捲積運算,兩個輸入張量(輸入數據和捲積核)進行捲積,輸出代表來自每個輸入的信息張量。tf.nn.conv2d完成捲積運算。捲積核(kernel),權值、濾波器、捲積矩陣或模版,filter。權值訓練習得。捲積核(filter參數)權值數量決定需要學習捲積核數量。通道,電腦器視覺,描述輸出向量。RG ...
  • 1.什麼叫activeMQ? ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息匯流排。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出台已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演著特殊的地位。 2 ...
  • logging模塊 re正則表達式 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...