【深入淺出 Yarn 架構與實現】2-2 Yarn 基礎庫 - 底層通信庫 RPC

来源:https://www.cnblogs.com/shuofxz/archive/2022/11/09/16874715.html
-Advertisement-
Play Games

RPC(Remote Procedure Call) 是 Hadoop 服務通信的關鍵庫,支撐上層分散式環境下複雜的進程間(Inter-Process Communication, IPC)通信邏輯,是分散式系統的基礎。允許運行於一臺電腦上的程式像調用本地方法一樣,調用另一臺電腦的子程式。由於 ...


RPC(Remote Procedure Call) 是 Hadoop 服務通信的關鍵庫,支撐上層分散式環境下複雜的進程間(Inter-Process Communication, IPC)通信邏輯,是分散式系統的基礎。允許運行於一臺電腦上的程式像調用本地方法一樣,調用另一臺電腦的子程式。
由於 RPC 服務整體知識較多,本節僅針對對 Yarn RPC 進行簡略介紹,詳細內容會後續開專欄介紹。

一、RPC 通信模型介紹

為什麼會有 RPC 框架?
在分散式或微服務情境下,會有大量的服務間交互,如果用傳統的 HTTP 協議埠來通信,需要耗費大量時間處理網路數據交換上,還要考慮編解碼等問題。如下圖所示。
image.png

  • 客戶端通過 RPC 框架的動態代理得到一個代理類實例,稱為 Stub(樁)
  • 客戶端調用介面方法(實際是 Stub 對應的方法),Stub 會構造一個請求,包括函數名和參數
  • 服務端收到這個請求後,先將服務名(函數)解析出來,查找是否有對應的服務提供者
  • 服務端找到對應的實現類後,會傳入參數調用
  • 服務端 RPC 框架得到返回結果後,再進行封裝返回給客戶端
  • 客戶端的 Stub 收到返回值後,進行解析,返回給調用者,完成 RPC 調用。

二、Hadoop RPC 介紹

一)簡介

Hadoop RPC 是 Hadoop 自己實現的一個 RPC 框架,主要有以下幾個特點:

  • 透明性:像調用本地方法一樣調用遠程方法。
  • 高性能:Hadoop 各個系統均採用 Master/Slave 結構,Master 是一個 RPC Server 用於處理各個 Slave 節點發送的請求,需要有高性能。
  • 可控性:由於 JDK 中的 RPC 框架 RMI 重量級過大,且封裝度太高,不方便控制和修改。因此實現了自己的 RPC 框架,以保證輕量級、高性能、可控性。

框架原理和整體執行流程與第一節介紹的 RPC 框架一致,感興趣可深入源碼進行瞭解。

二)總體架構

Hadoop RPC 架構底層依靠 Java 的 nio、反射、動態代理等功能實現「客戶端 - 伺服器(C/S)」通信模型。
上層封裝供程式調用的 RPC 介面。
image.png

三、案例 demo

下麵兩個案例的 demo 已上傳至 github。有幫助的話點個⭐️。
https://github.com/Simon-Ace/hadoop_rpc_demo

一)RPC Writable 案例實現

1、新建一個 maven 工程,添加依賴

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.8.5</version>
</dependency>

2、定義 RPC 協議

public interface BusinessProtocol {
    void mkdir(String path);
    String getName(String name);
    long versionID = 345043000L;
}

3、定義協議實現

public class BusinessIMPL implements BusinessProtocol {
    @Override
    public void mkdir(String path) {
        System.out.println("成功創建了文件夾 :" + path);
    }

    @Override
    public String getName(String name) {
        System.out.println("成功打了招呼: hello :" + name);
        return "bigdata";
    }
}

4、通過 Hadoop RPC 構建一個 RPC 服務端

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

import java.io.IOException;

public class MyServer {
    public static void main(String[] args) {
        try {
            // 構建一個 RPC server 端,提供了一個 BussinessProtocol 協議的 BusinessIMPL 服務實現
            RPC.Server server = new RPC.Builder(new Configuration())
                    .setProtocol(BusinessProtocol.class)
                    .setInstance(new BusinessIMPL())
                    .setBindAddress("localhost")
                    .setPort(6789)
                    .build();

            server.start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

5、構建一個 RPC 客戶端

import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.conf.Configuration;

import java.io.IOException;
import java.net.InetSocketAddress;

public class MyClient {
    public static void main(String[] args) {
        try {
        	// 獲取代理類實例,也就是 Stub
            BusinessProtocol proxy = RPC.getProxy(BusinessProtocol.class, BusinessProtocol.versionID,
                    new InetSocketAddress("localhost", 6789), new Configuration());

            // 通過 Stub 發送請求,實際使用就像調用本地方法一樣
            proxy.mkdir("/tmp/ABC");
            String res = proxy.getName("Simon");
            System.out.println("從 RPC 服務端接收到的返回值:" + res);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

6、測試,先啟動服務端,再啟動客戶端
服務端輸出

成功創建了文件夾 :/tmp/ABC
成功打了招呼: hello :Simon

客戶端輸出

從 RPC 服務端接收到的返回值:bigdata

二)RPC Protobuf 案例實現

項目結構如下
image.png

對 proto 文件格式不熟悉的同學,參考上一篇文章《2-1 Yarn 基礎庫概述》

MyResourceTrackerMessage.proto 定義數據格式

syntax = "proto3";
option java_package = "com.shuofxz.protobuf_rpc.proto";
option java_outer_classname = "MyResourceTrackerMessageProto";
option java_generic_services = true;
option java_generate_equals_and_hash = true;

message MyRegisterNodeManagerRequestProto {
    string hostname = 1;
    int32 cpu = 2;
    int32 memory = 3;
}

message MyRegisterNodeManagerResponseProto {
    string flag = 1;
}

MyResourceTracker.proto 定義 rpc 介面

syntax = "proto3";

import "com/shuofxz/protobuf_rpc/proto/MyResourceTrackerMessage.proto";
option java_package = "com.shuofxz.protobuf_rpc.proto";
option java_outer_classname = "MyResourceTrackerProto";
option java_generic_services = true;
option java_generate_equals_and_hash = true;

service MyResourceTrackerService {
    rpc registerNodeManager(MyRegisterNodeManagerRequestProto) returns (MyRegisterNodeManagerResponseProto);
}

2、對 proto 文件編譯,生成 java 類

# 在項目根目錄執行,路徑按照自己的進行修改
protoc -I=src/main/java --java_out=src/main/java src/main/java/com/shuofxz/protobuf_rpc/proto/MyResource.proto

protoc -I=src/main/java --java_out=src/main/java src/main/java/com/shuofxz/protobuf_rpc/proto/MyResourceTracker.proto

3、定義調用方法介面 MyResourceTracker

import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto;
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto;

public interface MyResourceTracker {
    MyRegisterNodeManagerResponseProto registerNodeManager(MyRegisterNodeManagerRequestProto request) throws Exception;
}

4、對調用方法介面的實現(服務端)

import com.shuofxz.protobuf_rpc.interf.MyResourceTracker;
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto;

public class MyResourceTrackerImpl implements MyResourceTracker {
    @Override
    public MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto registerNodeManager(
            MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto request) {

        // 輸出註冊的消息
        String hostname = request.getHostname();
        int cpu = request.getCpu();
        int memory = request.getMemory();
        System.out.println("NodeManager 的註冊消息: hostname = " + hostname + ", cpu = " + cpu + ", memory = " + memory);

        // 省略處理邏輯
        // 構建一個響應對象,用於返回
        MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto.Builder builder =
                MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto.newBuilder();
        // 直接返回 True
        builder.setFlag("true");
        MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto response = builder.build();
        return response;
    }
}

5、編寫 proto 的協議介面

import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerProto;
import org.apache.hadoop.ipc.ProtocolInfo;

@ProtocolInfo(protocolName = "com.shuofxz.blablabla", protocolVersion = 1)
public interface MyResourceTrackerPB extends MyResourceTrackerProto.MyResourceTrackerService.BlockingInterface {
}

6、編寫 proto 的協議介面實現(服務端)

import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import com.shuofxz.protobuf_rpc.interf.MyResourceTracker;
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto;
import com.shuofxz.protobuf_rpc.interf.MyResourceTrackerPB;

public class MyResourceTrackerServerSidePB implements MyResourceTrackerPB {
    final private MyResourceTracker server;

    public MyResourceTrackerServerSidePB(MyResourceTracker server) {
        this.server = server;
    }

    @Override
    public MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto registerNodeManager(
            RpcController controller, MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto request) throws ServiceException {
        try {
            return server.registerNodeManager(request);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}

7、RPC Server 的實現

import com.shuofxz.protobuf_rpc.interf.MyResourceTrackerPB;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerProto;

import java.io.IOException;

public class ProtobufRpcServer {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();

        RPC.setProtocolEngine(conf, MyResourceTrackerPB.class, ProtobufRpcEngine.class);

        // 構建 Rpc Server
        RPC.Server server = new RPC.Builder(conf)
                .setProtocol(MyResourceTrackerPB.class)
                .setInstance(MyResourceTrackerProto.MyResourceTrackerService
                        .newReflectiveBlockingService(new MyResourceTrackerServerSidePB(new MyResourceTrackerImpl())))
                .setBindAddress("localhost")
                .setPort(9998)
                .setNumHandlers(1)
                .setVerbose(true)
                .build();

        // Rpc Server 啟動
        server.start();
    }
}

8、RPC Client 的實現

import com.google.protobuf.ServiceException;
import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import com.shuofxz.protobuf_rpc.interf.MyResourceTrackerPB;

import java.io.IOException;
import java.net.InetSocketAddress;

public class ProtobufRpcClient {
    public static void main(String[] args) throws IOException {
        // 設置 RPC 引擎為 ProtobufRpcEngine
        Configuration conf = new Configuration();
        String hostname = "localhost";
        int port = 9998;
        RPC.setProtocolEngine(conf, MyResourceTrackerPB.class, ProtobufRpcEngine.class);

        // 獲取代理
        MyResourceTrackerPB protocolProxy = RPC
                .getProxy(MyResourceTrackerPB.class, 1, new InetSocketAddress(hostname, port), conf);

        // 構建請求對象
        MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto.Builder builder =
                MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto.newBuilder();
        MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto bigdata02 =
                builder.setHostname("bigdata02").setCpu(64).setMemory(128).build();

        // 發送 RPC 請求,獲取響應
        MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto response = null;
        try {
            response = protocolProxy.registerNodeManager(null, bigdata02);
        } catch (ServiceException e) {
            e.printStackTrace();
        }

        // 處理響應
        String flag = response.getFlag();
        System.out.println("最終註冊結果: flag = " + flag);
    }
}

9、測試
先啟動服務端,在啟動客戶端。

四、總結

本節介紹了 Hadoop 底層通信庫 RPC。首先介紹了 RPC 的框架和原理,之後對 Hadoop 自己實現的 RPC 進行了介紹,並給出了兩個 demo 實踐。
強烈建議瞭解基礎知識後,跟著 demo 實現一個案例出來,可以更好的幫助你理解。
文中 Demo:https://github.com/Simon-Ace/hadoop_rpc_demo


參考文章:
YARN-RPC網路通信架構設計
YARN-高併發RPC源碼實現
Hadoop3.2.1 【 HDFS 】源碼分析 : RPC原理 [八] Client端實現&源碼
Hadoop RPC機制詳解
Hadoop2源碼分析-RPC探索實戰
《Hadoop 技術內幕 - 深入解析 Yarn 結構設計與實現原理》3.3 節


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 簡述 將各個功能拆分後分別封裝(各功能解耦),需要時可自由組合(包括執行順序) 話不多說,看個優化案例吧。 優化案例 最初版 以下是模擬客戶端想服務端發送請求的業務流程。 客戶端調用代碼如下。 // 客戶端 public class Client { public static void main( ...
  • 前言 大家早好、午好、晚好吖~ 這不光棍節快到了,表弟準備寫一封情書給他的女神,想在光棍節之前脫單。 為了提高成功率,於是跑來找我給他參謀參謀,本來我是不想理他的。 不過誰讓他是我表弟呢(請我洗jio),於是教給他程式員的終極浪漫絕招 先假裝給女神拍照,然後再把情書寫到她的照片上列印出來送給她,嘿嘿 ...
  • HTTP協議 1.什麼是HTTP協議? 超文本傳輸協議(HTTP,HyperText Transfer Protocol)是互聯網上應用廣泛的一種網路協議。是工作在tcp/ip協議基礎上的,所有的www文件都遵守這個標準 http1.0 短連接 http1.1 長連接 HTTP是TCP/IP協議的一 ...
  • “心有所向,日復一日,必有精進” 前言: 想必大家看完我之前寫的搭建redis伺服器,大家都已經把redis搭建起來了吧~如果沒有搭建起來的小可愛請移步這裡哦~[從0到1搭建redis6](https://www.cnblogs.com/qsmm/p/16871488.html "從0到1搭建red ...
  • 文章有點長,我決定用半個小時來和你分享~😂 廢話不多說,上代碼。。。 基於Seata 1.5.2,項目中用 seata-spring-boot-starter 1. SeataDataSourceAutoConfiguration SeataDataSourceAutoConfiguration ...
  • 1、開發文檔 微信開發文檔:https://pay.weixin.qq.com/wiki/doc/api/jsapi.php?chapter=9_1 安全規範:https://pay.weixin.qq.com/wiki/doc/api/jsapi.php?chapter=4_3 1、簽名演算法 (簽 ...
  • 簡介: 命令模式,又稱之為動作模式或者事務模式,屬於行為型的設計模式。 將不同的請求封裝成不同的請求對象,以便使用不同的請求; 角色都會用飯館來舉例子: 命令下達者:顧客 命令接受者:服務員 命令本身: 菜單 命令執行者:廚師 適用場景: Laravel的事件調度機制有用到了命令模式。 想要解耦服務 ...
  • Java 基礎一 【註釋】 comment 對代碼進行解釋說明1.Java規範的註釋有3種單行註釋://多行註釋:/* */文檔註釋(java特有)2.單行註釋和多行註釋的作用:對所寫的程式進行解釋說明,增強可讀性。方便自己,方便別人。可以調試所寫的代碼3.特點單行註釋和多行註釋,註釋了的內容不參與 ...
一周排行
    -Advertisement-
    Play Games
  • 1、預覽地址:http://139.155.137.144:9012 2、qq群:801913255 一、前言 隨著網路的發展,企業對於信息系統數據的保密工作愈發重視,不同身份、角色對於數據的訪問許可權都應該大相徑庭。 列如 1、不同登錄人員對一個數據列表的可見度是不一樣的,如數據列、數據行、數據按鈕 ...
  • 前言 上一篇文章寫瞭如何使用RabbitMQ做個簡單的發送郵件項目,然後評論也是比較多,也是準備去學習一下如何確保RabbitMQ的消息可靠性,但是由於時間原因,先來說說設計模式中的簡單工廠模式吧! 在瞭解簡單工廠模式之前,我們要知道C#是一款面向對象的高級程式語言。它有3大特性,封裝、繼承、多態。 ...
  • Nodify學習 一:介紹與使用 - 可樂_加冰 - 博客園 (cnblogs.com) Nodify學習 二:添加節點 - 可樂_加冰 - 博客園 (cnblogs.com) 介紹 Nodify是一個WPF基於節點的編輯器控制項,其中包含一系列節點、連接和連接器組件,旨在簡化構建基於節點的工具的過程 ...
  • 創建一個webapi項目做測試使用。 創建新控制器,搭建一個基礎框架,包括獲取當天日期、wiki的請求地址等 創建一個Http請求幫助類以及方法,用於獲取指定URL的信息 使用http請求訪問指定url,先運行一下,看看返回的內容。內容如圖右邊所示,實際上是一個Json數據。我們主要解析 大事記 部 ...
  • 最近在不少自媒體上看到有關.NET與C#的資訊與評價,感覺大家對.NET與C#還是不太瞭解,尤其是對2016年6月發佈的跨平臺.NET Core 1.0,更是知之甚少。在考慮一番之後,還是決定寫點東西總結一下,也回顧一下.NET的發展歷史。 首先,你沒看錯,.NET是跨平臺的,可以在Windows、 ...
  • Nodify學習 一:介紹與使用 - 可樂_加冰 - 博客園 (cnblogs.com) Nodify學習 二:添加節點 - 可樂_加冰 - 博客園 (cnblogs.com) 添加節點(nodes) 通過上一篇我們已經創建好了編輯器實例現在我們為編輯器添加一個節點 添加model和viewmode ...
  • 前言 資料庫併發,數據審計和軟刪除一直是數據持久化方面的經典問題。早些時候,這些工作需要手寫複雜的SQL或者通過存儲過程和觸發器實現。手寫複雜SQL對軟體可維護性構成了相當大的挑戰,隨著SQL字數的變多,用到的嵌套和複雜語法增加,可讀性和可維護性的難度是幾何級暴漲。因此如何在實現功能的同時控制這些S ...
  • 類型檢查和轉換:當你需要檢查對象是否為特定類型,並且希望在同一時間內將其轉換為那個類型時,模式匹配提供了一種更簡潔的方式來完成這一任務,避免了使用傳統的as和is操作符後還需要進行額外的null檢查。 複雜條件邏輯:在處理複雜的條件邏輯時,特別是涉及到多個條件和類型的情況下,使用模式匹配可以使代碼更 ...
  • 在日常開發中,我們經常需要和文件打交道,特別是桌面開發,有時候就會需要載入大批量的文件,而且可能還會存在部分文件缺失的情況,那麼如何才能快速的判斷文件是否存在呢?如果處理不當的,且文件數量比較多的時候,可能會造成卡頓等情況,進而影響程式的使用體驗。今天就以一個簡單的小例子,簡述兩種不同的判斷文件是否... ...
  • 前言 資料庫併發,數據審計和軟刪除一直是數據持久化方面的經典問題。早些時候,這些工作需要手寫複雜的SQL或者通過存儲過程和觸發器實現。手寫複雜SQL對軟體可維護性構成了相當大的挑戰,隨著SQL字數的變多,用到的嵌套和複雜語法增加,可讀性和可維護性的難度是幾何級暴漲。因此如何在實現功能的同時控制這些S ...