輕量級RPC設計與實現第二版

来源:https://www.cnblogs.com/maratong/archive/2020/02/19/12333748.html
-Advertisement-
Play Games

在上一個版本中利用netty實現了簡單的一對一的RPC,需要手動設置服務地址,限制性較大。 在本文中,利用zookeeper作為服務註冊中心,在服務端啟動時將本地的服務信息註冊到zookeeper中,當客戶端發起遠程服務調用時,先從zookeeper中獲取該服務的地址,然後根據獲得的這個地址來利用n ...


在上一個版本中利用netty實現了簡單的一對一的RPC,需要手動設置服務地址,限制性較大。
在本文中,利用zookeeper作為服務註冊中心,在服務端啟動時將本地的服務信息註冊到zookeeper中,當客戶端發起遠程服務調用時,先從zookeeper中獲取該服務的地址,然後根據獲得的這個地址來利用netty進行網路傳送。
在服務端和註冊中心之間需要建立監聽,當服務信息發生變化或網路連接等問題時需要對註冊中心的服務信息進行修改。在本文中創建了服務註冊監控中心,利用心跳機制來判斷與服務端是否有較穩定的連接,當出現網路不穩定時,則從註冊中心中刪除屬於該服務端的服務信息。在本項目中設定在5分鐘內3次以上沒有發送心跳包為不穩定狀態。

關於心跳機制,之前有一篇文章介紹過:Dubbo心跳機制

zookeeper註冊中心

zookeeper是hadoop中的一個重要組件,其主要是作為分散式協調服務
zookeeper採用節點樹的數據模型,類似linux文件系統。
每個節點稱做一個ZNode,每個ZNode都可以通過路徑唯一標識,同時每個節點還可以存儲少量數據。
本項目借鑒dubbo的註冊中心模型來設計本文的註冊中心。
總體上設計了四級節點,在一個節點是一個持久節點/register,表示是記錄註冊服務的區域。二級節點是服務介面名,三級節點是遠程服務ip地址,該節點是臨時節點,節點存儲的數據是具體的實現類名。
1.png
在客戶端會根據服務介面名在註冊中心進行查找,得到遠程服務ip地址,並根據節點中存儲的具體實現類名進行反射。

首先進行zookeeper初始化,利用了CuratorFramework有關類

private static void init() {
        RetryPolicy retryPolicy = new RetryNTimes(ZKConsts.RETRYTIME, ZKConsts.SLEEP_MS_BEWTEENR_RETRY);
        client = CuratorFrameworkFactory.builder().connectString(ZKConsts.ZK_SERVER_PATH)
                .sessionTimeoutMs(ZKConsts.SESSION_TIMEOUT_MS).retryPolicy(retryPolicy)
                .namespace(ZKConsts.WORK_SPACE).build();
        client.start();
    }

服務的註冊代碼

public static void register(URL url) {
        try {
            String interfaceName = url.getInterfaceName();
            String implClassName = url.getImplClassName();
            Stat stat = client.checkExists().forPath(getPath(interfaceName, url.toString()));
            if (stat != null) {
                System.out.println("該節點已存在!");
                client.delete().forPath(getPath(interfaceName, url.toString()));
            }
            client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL)
                    //許可權控制,任何連接的客戶端都可以操作該屬性znode
                    .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                    .forPath(getPath(interfaceName, url.toString()), implClassName.getBytes());
            System.out.println(getPath(interfaceName, url.toString()));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

根據服務介面名來獲取遠程服務連接地址

public static URL random(String interfaceName) {
        try {

            System.out.println("開始查找服務節點:" + getPath(interfaceName));
            List<String> urlList = client.getChildren().forPath("/" + interfaceName);
            System.out.println("結果:" + urlList);
            String serviceUrl = urlList.get(0);
            String[] urls = serviceUrl.split(":");
            String implClassName = get(interfaceName, serviceUrl);
            return new URL(urls[0], Integer.valueOf(urls[1]), interfaceName, implClassName);
        } catch (Exception e) {
            System.out.println(e);
            e.printStackTrace();
        }
        return null;
    }

註冊中心與服務端進行連接時需要判斷是否維持了穩定的連接,如果服務端出現宕機等情況時需要從註冊中心中刪除這些服務。
以前的一些處理機制,有session機制和wacher機制。
session機制
每個zookeeper註冊中心與服務端進行連接時會創建一個session,在設置的sessionTimeout內,服務端會與註冊中心進行心跳包的定時發送,從而感知每個客戶端是否宕機,如果創建某個臨時Znode節點對應的session銷毀時,相應的臨時節點也會被註冊中心刪除。
watcher機制
針對每個節點的操作,都有要給監督者進行watcher,當監控的某個節點發生了變化,則會觸發watcher事件。註冊中心的watcher是一次性的,觸發後會被銷毀。父節點,子節點增刪改都能夠觸發watcher。觸發銷毀後,下次需要監聽時還需要再註冊一次。
本文心跳機制
服務端定時向註冊中心發送本機地址,看作心跳數據包,而註冊中心監控則維持一個channelId和具體地址的map,並且通過IdleHandler監聽空閑事件,到達一定的空閑次數則認為不活躍,當不活躍時,zookeeper刪除對應的url節點。該版本實現了上面的內容,後續的步驟在以後的版本實現。
如果10s內沒有觸發讀,就會執行userEventTriggered方法。如果5分鐘中出現兩次不活躍次數,就認定該連接不穩定,註冊中心會移除屬於該服務端的服務。你也可以根據實際情況設定不穩定標準。

 service.scheduleAtFixedRate(() -> {
                if (future.channel().isActive()) {
                    int time = new Random().nextInt(5);
                    log.info("本次定時任務獲取的隨機數:{}", time);
                    if (time > 3) {
                        log.info("發送本地地址到註冊中心:{}", url);
                        future.channel().writeAndFlush(url);
                    }
                }
            }, 60, 60, TimeUnit.SECONDS);
 @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent state = (IdleStateEvent)evt;
            if (state.state().equals(IdleState.READER_IDLE)) {
                log.info("讀空閑");
            } else if (state.state().equals(IdleState.WRITER_IDLE)) {
                log.info("寫空閑");
            }
            //在一定時間內讀寫空閑才會關閉鏈接
            else if (state.state().equals(IdleState.ALL_IDLE)) {
                if (++inActiveCount == 1) {
                    start = System.currentTimeMillis();
                }
                int minute = (int)((System.currentTimeMillis() - start) / (60 * 1000)) + 1;
                log.info("第{}次讀寫都空閑,計時分鐘數{}", inActiveCount, minute);
                if (inActiveCount > 2 && minute < 5) {
                    log.info("移除不活躍ip");
                    removeAndClose(ctx);
                } else {
                    if (minute >= 5) {
                        log.info("新周期開始");
                        start = 0;
                        inActiveCount = 0;
                    }
                }
            }
        }
    }

具體實現代碼:RPC第二版


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 斜堆。 測試文件 main.cpp: #include <iostream> #include "SkewHeap.h" using std::cout; using std::endl; int main() { SkewHeap<int> lh(SkewHeap<int>::HeapType:: ...
  • 攔截器 攔截器分同步攔截器和非同步攔截器; HandlerInterceptor 方法和執行時機 可以看DispathcerServlet的原來確定它的三個方法的執行時機; AsynHandlerInterceptor 看註釋,主要用來清理在併發環境加清理ThreadLocal的數據; Respons ...
  • 8、JSP 8.1、什麼是JSP Java Server Pages : Java伺服器端頁面,也和Servlet一樣,用於動態Web技術! 最大的特點: 寫JSP就像在寫HTML 區別: HTML只給用戶提供靜態的數據 JSP頁面中可以嵌入JAVA代碼,為用戶提供動態數據; 8.2、JSP原理 思 ...
  • 類載入與實例化 基本步驟 類裝載分為以下 5 個步驟: 載入:根據查找路徑找到相應的 class 文件然後導入 檢查:檢查載入的 class 文件的正確性 準備:給類中的靜態變數分配記憶體空間 解析:虛擬機將常量池中的符號引用替換成直接引用的過程。符號引用理解為一個標示,而直接引用直接指向記憶體中的地址 ...
  • 左傾堆,用於堆的快速合併。 規則: ① 節點的鍵值小於或等於它的左右子節點的鍵值。 ② 節點的左孩子的NPL >= 右孩子的NPL。 ③ 節點的NPL = 它的右孩子的NPL + 1。 測試文件 main.cpp: #include <iostream> #include "LeftistHeap. ...
  • 在最近一段時間里,通過搜集有關資料加上自己的理解,設計了一款輕量級RPC,起了一個名字 lightWeightRPC 。它擁有一個RPC常見的基本功能。主要功能和特點如下: 利用Spring實現依賴註入與參數配置 利用Netty來實現客戶端與服務端的遠程通信 利用Hessian來實現序列化 設置Zo ...
  • 在本版本中引入了SPI機制,關於Java的SPI機制與Dubbo的SPI機制在以前的文章中介紹過。 傳送門: "Dubbo的SPI機制與JDK機制的不同及原理分析" 因為設計的RPC框架是基於Spring的,時常會遇到依賴註入問題。Spring中也有SPI機制,但是它有有個缺點,就是在利用SPI機制 ...
  • 在前兩個版本中,每次發起請求一次就新建一個netty的channel連接,如果在高併發情況下就會造成資源的浪費,這時實現 非同步請求 就十分重要,當有多個請求線程時,需要設計一個 線程池 來進行管理。除此之外,當前方法過於依賴註冊中心,在高併發情況下對註冊中心造成了壓力;另外如果註冊中心出現宕機等情況 ...
一周排行
    -Advertisement-
    Play Games
  • 前言 在我們開發過程中基本上不可或缺的用到一些敏感機密數據,比如SQL伺服器的連接串或者是OAuth2的Secret等,這些敏感數據在代碼中是不太安全的,我們不應該在源代碼中存儲密碼和其他的敏感數據,一種推薦的方式是通過Asp.Net Core的機密管理器。 機密管理器 在 ASP.NET Core ...
  • 新改進提供的Taurus Rpc 功能,可以簡化微服務間的調用,同時可以不用再手動輸出模塊名稱,或調用路徑,包括負載均衡,這一切,由框架實現並提供了。新的Taurus Rpc 功能,將使得服務間的調用,更加輕鬆、簡約、高效。 ...
  • 順序棧的介面程式 目錄順序棧的介面程式頭文件創建順序棧入棧出棧利用棧將10進位轉16進位數驗證 頭文件 #include <stdio.h> #include <stdbool.h> #include <stdlib.h> 創建順序棧 // 指的是順序棧中的元素的數據類型,用戶可以根據需要進行修改 ...
  • 前言 整理這個官方翻譯的系列,原因是網上大部分的 tomcat 版本比較舊,此版本為 v11 最新的版本。 開源項目 從零手寫實現 tomcat minicat 別稱【嗅虎】心有猛虎,輕嗅薔薇。 系列文章 web server apache tomcat11-01-官方文檔入門介紹 web serv ...
  • C總結與剖析:關鍵字篇 -- <<C語言深度解剖>> 目錄C總結與剖析:關鍵字篇 -- <<C語言深度解剖>>程式的本質:二進位文件變數1.變數:記憶體上的某個位置開闢的空間2.變數的初始化3.為什麼要有變數4.局部變數與全局變數5.變數的大小由類型決定6.任何一個變數,記憶體賦值都是從低地址開始往高地 ...
  • 如果讓你來做一個有狀態流式應用的故障恢復,你會如何來做呢? 單機和多機會遇到什麼不同的問題? Flink Checkpoint 是做什麼用的?原理是什麼? ...
  • C++ 多級繼承 多級繼承是一種面向對象編程(OOP)特性,允許一個類從多個基類繼承屬性和方法。它使代碼更易於組織和維護,並促進代碼重用。 多級繼承的語法 在 C++ 中,使用 : 符號來指定繼承關係。多級繼承的語法如下: class DerivedClass : public BaseClass1 ...
  • 前言 什麼是SpringCloud? Spring Cloud 是一系列框架的有序集合,它利用 Spring Boot 的開發便利性簡化了分散式系統的開發,比如服務註冊、服務發現、網關、路由、鏈路追蹤等。Spring Cloud 並不是重覆造輪子,而是將市面上開發得比較好的模塊集成進去,進行封裝,從 ...
  • class_template 類模板和函數模板的定義和使用類似,我們已經進行了介紹。有時,有兩個或多個類,其功能是相同的,僅僅是數據類型不同。類模板用於實現類所需數據的類型參數化 template<class NameType, class AgeType> class Person { publi ...
  • 目錄system v IPC簡介共用記憶體需要用到的函數介面shmget函數--獲取對象IDshmat函數--獲得映射空間shmctl函數--釋放資源共用記憶體實現思路註意 system v IPC簡介 消息隊列、共用記憶體和信號量統稱為system v IPC(進程間通信機制),V是羅馬數字5,是UNI ...