輕量級RPC設計與實現第一版

来源:https://www.cnblogs.com/maratong/archive/2020/02/19/12333741.html
-Advertisement-
Play Games

什麼是RPC RPC (Remote Procedure Call Protocol), 遠程過程調用,通俗的解釋就是:客戶端在不知道調用細節的情況下,調用存在於遠程電腦上的某個對象,就像調用本地應用程式中的對象一樣,不需要瞭解底層網路技術的協議。 簡單的整體工作流程 請求端發送一個調用的數據包, ...


什麼是RPC

RPC (Remote Procedure Call Protocol), 遠程過程調用,通俗的解釋就是:客戶端在不知道調用細節的情況下,調用存在於遠程電腦上的某個對象,就像調用本地應用程式中的對象一樣,不需要瞭解底層網路技術的協議。

簡單的整體工作流程
請求端發送一個調用的數據包,該包中包含有調用標識,參數等協議要求的參數。當響應端接收到這個數據包,對應的程式被調起,然後返回結果數據包,返回的數據包含了和請求的數據包中同樣的請求標識,結果等。

性能影響因素

  1. 利用的網路協議。可以使用應用層協議,例如HTTP或者HTTP/2協議;也可以利用傳輸層協議,例如TCP協議,但是主流的RPC還沒有採用UDP傳輸協議。
  2. 消息封裝格式。選擇或設計一種協議來封裝信息進行組裝發送。比如,dubbo中消息體數據包含dubbo版本號、介面名稱、介面版本、方法名稱、參數類型列表、參數、附加信息等。
  3. 序列化。信息在網路傳輸中要以二進位格式進行傳輸。序列化和反序列化,是對象到而二進位數據的轉換。常見的序列化方法有JSON、Hessian、Protostuff等。
  4. 網路IO模型。可以採用非阻塞式同步IO,也可以在伺服器上實現對多路IO模型的支持。
  5. 線程管理方式。在高併發請求下,可以使用單個線程運行服務的具體實現,但是會出現請求阻塞等待現象。也可以為每一個RPC具體服務的實現開啟一個獨立的線程運行,最大線程數有限制,可以使用線程池來管理多個線程的分配和調度。

第一版RPC

第一個版本簡單實現了RPC的最基本功能,即服務信息的發送與接收序列化方式動態代理等。
項目利用Springboot來實現依賴註入與參數配置,使用netty實現NIO方式的數據傳輸,使用Hessian來實現對象序列化。
動態代理
這裡要提到代理模式,它的特征是代理類與委托類有同樣的介面,代理類主要負責為委托類預處理消息、過濾消息、把消息轉發給委托類,以及事後處理消息等。代理類與委托類之間通常會存在關聯關係。
根據創建代理類的時間點,又可以分為靜態代理和動態代理。
在以往的靜態代理中需要手動為每一個目標編寫對應的代理類。如果系統已經有了成百上千個類,工作量太大了。
靜態代理由程式員創建或特定工具自動生成源代碼,也就是在編譯時就已經將介面與被代理類,代理類等確定下來。在程式運行之前,代理類的.class文件就已經生成。
代理類在程式運行時創建的代理方式被稱為代理模式。在靜態代理中,代理類是自己定義好的,在運行之前就已經編譯完成了。而在動態代理中,可以很方便地對代理類的函數進行統一的處理,而不用修改每個代理類中的方法。可以通過InvocationHandler介面來實現。

客戶端的動態代理

public class ProxyFactory {
    public static <T> T create(Class<T> interfaceClass) throws Exception {
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, new LwRpcClientDynamicProxy<T>(interfaceClass));
    }
}
@Slf4j
public class LwRpcClientDynamicProxy<T> implements InvocationHandler {
    private Class<T> clazz;
    public LwRpcClientDynamicProxy(Class<T> clazz) throws Exception {
        this.clazz = clazz;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        LwRequest lwRequest = new LwRequest();
        String requestId = UUID.randomUUID().toString();
        String className = method.getDeclaringClass().getName();
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();

        lwRequest.setRequestId(requestId);
        lwRequest.setClassName(className);
        lwRequest.setMethodName(methodName);
        lwRequest.setParameterTypes(parameterTypes);
        lwRequest.setParameters(args);
        NettyClient nettyClient = new NettyClient("127.0.0.1", 8888);
        log.info("開始連接伺服器端:{}", new Date());
        LwResponse send = nettyClient.send(lwRequest);
        log.info("請求後返回的結果:{}", send.getResult());
        return send.getResult();
    }
}

在服務端會利用在客戶端獲取到的類名。參數等信息利用反射機制進行調用。

Class<?>[] parameterTypes = request.getParameterTypes();
        Object[] paramethers = request.getParameters();
        // 使用CGLIB 反射
        FastClass fastClass = FastClass.create(serviceClass);
        FastMethod fastMethod = fastClass.getMethod(methodName, parameterTypes);
        return fastMethod.invoke(serviceBean, paramethers);

Netty客戶端

@Slf4j
public class NettyClient  {
    private String host;
    private Integer port;
    private LwResponse response;
    private EventLoopGroup group;
    private ChannelFuture future = null;
    private Object obj = new Object();
    private NettyClientHandler nettyClientHandler;
    public NettyClient(String host, Integer port) {
        this.host = host;
        this.port = port;
    }


    public LwResponse send(LwRequest request) throws Exception{
        nettyClientHandler = new NettyClientHandler(request);
        group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 4));
                        pipeline.addLast(new LwRpcEncoder(LwRequest.class, new HessianSerializer()));
                        pipeline.addLast(new LwRpcDecoder(LwResponse.class, new HessianSerializer()));
                        pipeline.addLast(nettyClientHandler);
                    }
                });
        future = bootstrap.connect(host, port).sync();
        nettyClientHandler.getCountDownLatch().await();
        this.response = nettyClientHandler.getLwResponse();
        return this.response;
    }

    @PreDestroy
    public void close() {
        group.shutdownGracefully();
        future.channel().closeFuture().syncUninterruptibly();
    }

}
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    private final CountDownLatch countDownLatch = new CountDownLatch(1);
    private LwResponse response = null;
    private LwRequest request;

    public NettyClientHandler(LwRequest request) {
        this.request = request;
    }


    public CountDownLatch getCountDownLatch() {
        return countDownLatch;
    }

    public LwResponse getLwResponse() {
        return this.response;
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        log.info("客戶端向客戶端發送消息");
        ctx.writeAndFlush(request);
        log.info("客戶端請求成功");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        LwResponse lwResponse = (LwResponse) msg;
        log.info("收到服務端的信息:{}", lwResponse.getResult());
        this.response = lwResponse;
        this.countDownLatch.countDown();
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,
                                Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

}

在客戶端發送服務信息時,用LwQuest類進行封裝,返回的結果用LwResponse進行封裝,當客戶端讀取到伺服器端返回的響應時,在NettyClientHandler中進行處理,並利用CountDownLatch進行線程的阻塞和運行。
Netty服務端

@Component
@Slf4j
public class NettyServer {
    private EventLoopGroup boss = null;
    private EventLoopGroup worker = null;
    @Autowired
    private ServerHandler serverHandler;
    @Value("${server.address}")
    private String address;
    public void start() throws Exception {
        log.info("成功");
        boss = new NioEventLoopGroup();
        worker = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 4));
                            pipeline.addLast(new LwRpcEncoder(LwResponse.class, new HessianSerializer()));
                            pipeline.addLast(new LwRpcDecoder(LwRequest.class, new HessianSerializer()));
                            pipeline.addLast(serverHandler);
                        }
                    });
            String[] strs = address.split(":");
            String addr = strs[0];
            int port = Integer.valueOf(strs[1]);
            ChannelFuture future = serverBootstrap.bind(addr, port).sync();
            future.channel().closeFuture().sync();
        } finally {
            worker.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }

    @PreDestroy
    public void destory() throws InterruptedException {
        boss.shutdownGracefully().sync();
        worker.shutdownGracefully().sync();
        log.info("關閉netty");
    }
}
@Component
@Slf4j
@ChannelHandler.Sharable
public class ServerHandler extends SimpleChannelInboundHandler<LwRequest> implements ApplicationContextAware {
    private ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, LwRequest msg) throws Exception {
        LwResponse lwResponse = new LwResponse();
        lwResponse.setRequestId(msg.getRequestId());
        log.info("從客戶端接收到請求信息:{}", msg);
        try {
            Object result = handler(msg);
            lwResponse.setResult(result);
        } catch (Throwable throwable) {
            lwResponse.setCause(throwable);
            throwable.printStackTrace();

        }
        channelHandlerContext.writeAndFlush(lwResponse);
    }

    private Object handler(LwRequest request) throws ClassNotFoundException, InvocationTargetException {

        Class<?> clazz = Class.forName(request.getClassName());
        Object serviceBean = applicationContext.getBean(clazz);
        Class<?> serviceClass = serviceBean.getClass();
        String methodName = request.getMethodName();
        log.info("獲取到的服務類:{}", serviceBean);
        Class<?>[] parameterTypes = request.getParameterTypes();
        Object[] paramethers = request.getParameters();
        // 使用CGLIB 反射
        FastClass fastClass = FastClass.create(serviceClass);
        FastMethod fastMethod = fastClass.getMethod(methodName, parameterTypes);
        return fastMethod.invoke(serviceBean, paramethers);
    }
}

在Netty服務端中,會利用`serverHandler來處理從客戶端中接收的信息,並利用反射的思想調用本地的方法,並將處理的結構封裝在LwResponse中。

LwRequestLwRespnse要想在網路中進行傳輸,需要轉化為二進位轉換。具體方法如下:

public class HessianSerializer implements Serializer {
    @Override
    public byte[] serialize(Object object) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        Hessian2Output output = new Hessian2Output(byteArrayOutputStream);
        output.writeObject(object);
        output.flush();
        return byteArrayOutputStream.toByteArray();
    }

    public <T> T deserialize(Class<T> clazz, byte[] bytes) throws IOException {
        Hessian2Input input = new Hessian2Input(new ByteArrayInputStream(bytes));
        return (T) input.readObject(clazz);
    }
}
public class LwRpcDecoder extends ByteToMessageDecoder {

    private Class<?> clazz;
    private Serializer serializer;

    public LwRpcDecoder(Class<?> clazz, Serializer serializer) {
        this.clazz = clazz;
        this.serializer = serializer;
    }


    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        if (byteBuf.readableBytes() < 4)
            return;
        byteBuf.markReaderIndex();
        int dataLength = byteBuf.readInt();
        if (dataLength < 0) {
            channelHandlerContext.close();
        }
        if (byteBuf.readableBytes() < dataLength) {
            byteBuf.resetReaderIndex();
        }
        byte[] data = new byte[dataLength];
        byteBuf.readBytes(data);

        Object obj = serializer.deserialize(clazz, data);
        list.add(obj);
    }
}
public class LwRpcEncoder extends MessageToByteEncoder<Object> {
    private Class<?> clazz;
    private Serializer serializer;

    public LwRpcEncoder(Class<?> clazz, Serializer serializer) {
        this.clazz = clazz;
        this.serializer = serializer;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
        if (clazz.isInstance(in)) {
            byte[] data = serializer.serialize(in);
            out.writeInt(data.length);
            out.writeBytes(data);
        }

    }

}

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 攔截器 攔截器分同步攔截器和非同步攔截器; HandlerInterceptor 方法和執行時機 可以看DispathcerServlet的原來確定它的三個方法的執行時機; AsynHandlerInterceptor 看註釋,主要用來清理在併發環境加清理ThreadLocal的數據; Respons ...
  • 8、JSP 8.1、什麼是JSP Java Server Pages : Java伺服器端頁面,也和Servlet一樣,用於動態Web技術! 最大的特點: 寫JSP就像在寫HTML 區別: HTML只給用戶提供靜態的數據 JSP頁面中可以嵌入JAVA代碼,為用戶提供動態數據; 8.2、JSP原理 思 ...
  • 類載入與實例化 基本步驟 類裝載分為以下 5 個步驟: 載入:根據查找路徑找到相應的 class 文件然後導入 檢查:檢查載入的 class 文件的正確性 準備:給類中的靜態變數分配記憶體空間 解析:虛擬機將常量池中的符號引用替換成直接引用的過程。符號引用理解為一個標示,而直接引用直接指向記憶體中的地址 ...
  • 左傾堆,用於堆的快速合併。 規則: ① 節點的鍵值小於或等於它的左右子節點的鍵值。 ② 節點的左孩子的NPL >= 右孩子的NPL。 ③ 節點的NPL = 它的右孩子的NPL + 1。 測試文件 main.cpp: #include <iostream> #include "LeftistHeap. ...
  • 在最近一段時間里,通過搜集有關資料加上自己的理解,設計了一款輕量級RPC,起了一個名字 lightWeightRPC 。它擁有一個RPC常見的基本功能。主要功能和特點如下: 利用Spring實現依賴註入與參數配置 利用Netty來實現客戶端與服務端的遠程通信 利用Hessian來實現序列化 設置Zo ...
  • 在本版本中引入了SPI機制,關於Java的SPI機制與Dubbo的SPI機制在以前的文章中介紹過。 傳送門: "Dubbo的SPI機制與JDK機制的不同及原理分析" 因為設計的RPC框架是基於Spring的,時常會遇到依賴註入問題。Spring中也有SPI機制,但是它有有個缺點,就是在利用SPI機制 ...
  • 在前兩個版本中,每次發起請求一次就新建一個netty的channel連接,如果在高併發情況下就會造成資源的浪費,這時實現 非同步請求 就十分重要,當有多個請求線程時,需要設計一個 線程池 來進行管理。除此之外,當前方法過於依賴註冊中心,在高併發情況下對註冊中心造成了壓力;另外如果註冊中心出現宕機等情況 ...
  • 在上一個版本中利用netty實現了簡單的一對一的RPC,需要手動設置服務地址,限制性較大。 在本文中,利用zookeeper作為服務註冊中心,在服務端啟動時將本地的服務信息註冊到zookeeper中,當客戶端發起遠程服務調用時,先從zookeeper中獲取該服務的地址,然後根據獲得的這個地址來利用n ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...