輕量級RPC設計與實現第一版

来源:https://www.cnblogs.com/maratong/archive/2020/02/19/12333741.html
-Advertisement-
Play Games

什麼是RPC RPC (Remote Procedure Call Protocol), 遠程過程調用,通俗的解釋就是:客戶端在不知道調用細節的情況下,調用存在於遠程電腦上的某個對象,就像調用本地應用程式中的對象一樣,不需要瞭解底層網路技術的協議。 簡單的整體工作流程 請求端發送一個調用的數據包, ...


什麼是RPC

RPC (Remote Procedure Call Protocol), 遠程過程調用,通俗的解釋就是:客戶端在不知道調用細節的情況下,調用存在於遠程電腦上的某個對象,就像調用本地應用程式中的對象一樣,不需要瞭解底層網路技術的協議。

簡單的整體工作流程
請求端發送一個調用的數據包,該包中包含有調用標識,參數等協議要求的參數。當響應端接收到這個數據包,對應的程式被調起,然後返回結果數據包,返回的數據包含了和請求的數據包中同樣的請求標識,結果等。

性能影響因素

  1. 利用的網路協議。可以使用應用層協議,例如HTTP或者HTTP/2協議;也可以利用傳輸層協議,例如TCP協議,但是主流的RPC還沒有採用UDP傳輸協議。
  2. 消息封裝格式。選擇或設計一種協議來封裝信息進行組裝發送。比如,dubbo中消息體數據包含dubbo版本號、介面名稱、介面版本、方法名稱、參數類型列表、參數、附加信息等。
  3. 序列化。信息在網路傳輸中要以二進位格式進行傳輸。序列化和反序列化,是對象到而二進位數據的轉換。常見的序列化方法有JSON、Hessian、Protostuff等。
  4. 網路IO模型。可以採用非阻塞式同步IO,也可以在伺服器上實現對多路IO模型的支持。
  5. 線程管理方式。在高併發請求下,可以使用單個線程運行服務的具體實現,但是會出現請求阻塞等待現象。也可以為每一個RPC具體服務的實現開啟一個獨立的線程運行,最大線程數有限制,可以使用線程池來管理多個線程的分配和調度。

第一版RPC

第一個版本簡單實現了RPC的最基本功能,即服務信息的發送與接收序列化方式動態代理等。
項目利用Springboot來實現依賴註入與參數配置,使用netty實現NIO方式的數據傳輸,使用Hessian來實現對象序列化。
動態代理
這裡要提到代理模式,它的特征是代理類與委托類有同樣的介面,代理類主要負責為委托類預處理消息、過濾消息、把消息轉發給委托類,以及事後處理消息等。代理類與委托類之間通常會存在關聯關係。
根據創建代理類的時間點,又可以分為靜態代理和動態代理。
在以往的靜態代理中需要手動為每一個目標編寫對應的代理類。如果系統已經有了成百上千個類,工作量太大了。
靜態代理由程式員創建或特定工具自動生成源代碼,也就是在編譯時就已經將介面與被代理類,代理類等確定下來。在程式運行之前,代理類的.class文件就已經生成。
代理類在程式運行時創建的代理方式被稱為代理模式。在靜態代理中,代理類是自己定義好的,在運行之前就已經編譯完成了。而在動態代理中,可以很方便地對代理類的函數進行統一的處理,而不用修改每個代理類中的方法。可以通過InvocationHandler介面來實現。

客戶端的動態代理

public class ProxyFactory {
    public static <T> T create(Class<T> interfaceClass) throws Exception {
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, new LwRpcClientDynamicProxy<T>(interfaceClass));
    }
}
@Slf4j
public class LwRpcClientDynamicProxy<T> implements InvocationHandler {
    private Class<T> clazz;
    public LwRpcClientDynamicProxy(Class<T> clazz) throws Exception {
        this.clazz = clazz;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        LwRequest lwRequest = new LwRequest();
        String requestId = UUID.randomUUID().toString();
        String className = method.getDeclaringClass().getName();
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();

        lwRequest.setRequestId(requestId);
        lwRequest.setClassName(className);
        lwRequest.setMethodName(methodName);
        lwRequest.setParameterTypes(parameterTypes);
        lwRequest.setParameters(args);
        NettyClient nettyClient = new NettyClient("127.0.0.1", 8888);
        log.info("開始連接伺服器端:{}", new Date());
        LwResponse send = nettyClient.send(lwRequest);
        log.info("請求後返回的結果:{}", send.getResult());
        return send.getResult();
    }
}

在服務端會利用在客戶端獲取到的類名。參數等信息利用反射機制進行調用。

Class<?>[] parameterTypes = request.getParameterTypes();
        Object[] paramethers = request.getParameters();
        // 使用CGLIB 反射
        FastClass fastClass = FastClass.create(serviceClass);
        FastMethod fastMethod = fastClass.getMethod(methodName, parameterTypes);
        return fastMethod.invoke(serviceBean, paramethers);

Netty客戶端

@Slf4j
public class NettyClient  {
    private String host;
    private Integer port;
    private LwResponse response;
    private EventLoopGroup group;
    private ChannelFuture future = null;
    private Object obj = new Object();
    private NettyClientHandler nettyClientHandler;
    public NettyClient(String host, Integer port) {
        this.host = host;
        this.port = port;
    }


    public LwResponse send(LwRequest request) throws Exception{
        nettyClientHandler = new NettyClientHandler(request);
        group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 4));
                        pipeline.addLast(new LwRpcEncoder(LwRequest.class, new HessianSerializer()));
                        pipeline.addLast(new LwRpcDecoder(LwResponse.class, new HessianSerializer()));
                        pipeline.addLast(nettyClientHandler);
                    }
                });
        future = bootstrap.connect(host, port).sync();
        nettyClientHandler.getCountDownLatch().await();
        this.response = nettyClientHandler.getLwResponse();
        return this.response;
    }

    @PreDestroy
    public void close() {
        group.shutdownGracefully();
        future.channel().closeFuture().syncUninterruptibly();
    }

}
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    private final CountDownLatch countDownLatch = new CountDownLatch(1);
    private LwResponse response = null;
    private LwRequest request;

    public NettyClientHandler(LwRequest request) {
        this.request = request;
    }


    public CountDownLatch getCountDownLatch() {
        return countDownLatch;
    }

    public LwResponse getLwResponse() {
        return this.response;
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        log.info("客戶端向客戶端發送消息");
        ctx.writeAndFlush(request);
        log.info("客戶端請求成功");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        LwResponse lwResponse = (LwResponse) msg;
        log.info("收到服務端的信息:{}", lwResponse.getResult());
        this.response = lwResponse;
        this.countDownLatch.countDown();
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,
                                Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

}

在客戶端發送服務信息時,用LwQuest類進行封裝,返回的結果用LwResponse進行封裝,當客戶端讀取到伺服器端返回的響應時,在NettyClientHandler中進行處理,並利用CountDownLatch進行線程的阻塞和運行。
Netty服務端

@Component
@Slf4j
public class NettyServer {
    private EventLoopGroup boss = null;
    private EventLoopGroup worker = null;
    @Autowired
    private ServerHandler serverHandler;
    @Value("${server.address}")
    private String address;
    public void start() throws Exception {
        log.info("成功");
        boss = new NioEventLoopGroup();
        worker = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 4));
                            pipeline.addLast(new LwRpcEncoder(LwResponse.class, new HessianSerializer()));
                            pipeline.addLast(new LwRpcDecoder(LwRequest.class, new HessianSerializer()));
                            pipeline.addLast(serverHandler);
                        }
                    });
            String[] strs = address.split(":");
            String addr = strs[0];
            int port = Integer.valueOf(strs[1]);
            ChannelFuture future = serverBootstrap.bind(addr, port).sync();
            future.channel().closeFuture().sync();
        } finally {
            worker.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }

    @PreDestroy
    public void destory() throws InterruptedException {
        boss.shutdownGracefully().sync();
        worker.shutdownGracefully().sync();
        log.info("關閉netty");
    }
}
@Component
@Slf4j
@ChannelHandler.Sharable
public class ServerHandler extends SimpleChannelInboundHandler<LwRequest> implements ApplicationContextAware {
    private ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, LwRequest msg) throws Exception {
        LwResponse lwResponse = new LwResponse();
        lwResponse.setRequestId(msg.getRequestId());
        log.info("從客戶端接收到請求信息:{}", msg);
        try {
            Object result = handler(msg);
            lwResponse.setResult(result);
        } catch (Throwable throwable) {
            lwResponse.setCause(throwable);
            throwable.printStackTrace();

        }
        channelHandlerContext.writeAndFlush(lwResponse);
    }

    private Object handler(LwRequest request) throws ClassNotFoundException, InvocationTargetException {

        Class<?> clazz = Class.forName(request.getClassName());
        Object serviceBean = applicationContext.getBean(clazz);
        Class<?> serviceClass = serviceBean.getClass();
        String methodName = request.getMethodName();
        log.info("獲取到的服務類:{}", serviceBean);
        Class<?>[] parameterTypes = request.getParameterTypes();
        Object[] paramethers = request.getParameters();
        // 使用CGLIB 反射
        FastClass fastClass = FastClass.create(serviceClass);
        FastMethod fastMethod = fastClass.getMethod(methodName, parameterTypes);
        return fastMethod.invoke(serviceBean, paramethers);
    }
}

在Netty服務端中,會利用`serverHandler來處理從客戶端中接收的信息,並利用反射的思想調用本地的方法,並將處理的結構封裝在LwResponse中。

LwRequestLwRespnse要想在網路中進行傳輸,需要轉化為二進位轉換。具體方法如下:

public class HessianSerializer implements Serializer {
    @Override
    public byte[] serialize(Object object) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        Hessian2Output output = new Hessian2Output(byteArrayOutputStream);
        output.writeObject(object);
        output.flush();
        return byteArrayOutputStream.toByteArray();
    }

    public <T> T deserialize(Class<T> clazz, byte[] bytes) throws IOException {
        Hessian2Input input = new Hessian2Input(new ByteArrayInputStream(bytes));
        return (T) input.readObject(clazz);
    }
}
public class LwRpcDecoder extends ByteToMessageDecoder {

    private Class<?> clazz;
    private Serializer serializer;

    public LwRpcDecoder(Class<?> clazz, Serializer serializer) {
        this.clazz = clazz;
        this.serializer = serializer;
    }


    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        if (byteBuf.readableBytes() < 4)
            return;
        byteBuf.markReaderIndex();
        int dataLength = byteBuf.readInt();
        if (dataLength < 0) {
            channelHandlerContext.close();
        }
        if (byteBuf.readableBytes() < dataLength) {
            byteBuf.resetReaderIndex();
        }
        byte[] data = new byte[dataLength];
        byteBuf.readBytes(data);

        Object obj = serializer.deserialize(clazz, data);
        list.add(obj);
    }
}
public class LwRpcEncoder extends MessageToByteEncoder<Object> {
    private Class<?> clazz;
    private Serializer serializer;

    public LwRpcEncoder(Class<?> clazz, Serializer serializer) {
        this.clazz = clazz;
        this.serializer = serializer;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
        if (clazz.isInstance(in)) {
            byte[] data = serializer.serialize(in);
            out.writeInt(data.length);
            out.writeBytes(data);
        }

    }

}

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 攔截器 攔截器分同步攔截器和非同步攔截器; HandlerInterceptor 方法和執行時機 可以看DispathcerServlet的原來確定它的三個方法的執行時機; AsynHandlerInterceptor 看註釋,主要用來清理在併發環境加清理ThreadLocal的數據; Respons ...
  • 8、JSP 8.1、什麼是JSP Java Server Pages : Java伺服器端頁面,也和Servlet一樣,用於動態Web技術! 最大的特點: 寫JSP就像在寫HTML 區別: HTML只給用戶提供靜態的數據 JSP頁面中可以嵌入JAVA代碼,為用戶提供動態數據; 8.2、JSP原理 思 ...
  • 類載入與實例化 基本步驟 類裝載分為以下 5 個步驟: 載入:根據查找路徑找到相應的 class 文件然後導入 檢查:檢查載入的 class 文件的正確性 準備:給類中的靜態變數分配記憶體空間 解析:虛擬機將常量池中的符號引用替換成直接引用的過程。符號引用理解為一個標示,而直接引用直接指向記憶體中的地址 ...
  • 左傾堆,用於堆的快速合併。 規則: ① 節點的鍵值小於或等於它的左右子節點的鍵值。 ② 節點的左孩子的NPL >= 右孩子的NPL。 ③ 節點的NPL = 它的右孩子的NPL + 1。 測試文件 main.cpp: #include <iostream> #include "LeftistHeap. ...
  • 在最近一段時間里,通過搜集有關資料加上自己的理解,設計了一款輕量級RPC,起了一個名字 lightWeightRPC 。它擁有一個RPC常見的基本功能。主要功能和特點如下: 利用Spring實現依賴註入與參數配置 利用Netty來實現客戶端與服務端的遠程通信 利用Hessian來實現序列化 設置Zo ...
  • 在本版本中引入了SPI機制,關於Java的SPI機制與Dubbo的SPI機制在以前的文章中介紹過。 傳送門: "Dubbo的SPI機制與JDK機制的不同及原理分析" 因為設計的RPC框架是基於Spring的,時常會遇到依賴註入問題。Spring中也有SPI機制,但是它有有個缺點,就是在利用SPI機制 ...
  • 在前兩個版本中,每次發起請求一次就新建一個netty的channel連接,如果在高併發情況下就會造成資源的浪費,這時實現 非同步請求 就十分重要,當有多個請求線程時,需要設計一個 線程池 來進行管理。除此之外,當前方法過於依賴註冊中心,在高併發情況下對註冊中心造成了壓力;另外如果註冊中心出現宕機等情況 ...
  • 在上一個版本中利用netty實現了簡單的一對一的RPC,需要手動設置服務地址,限制性較大。 在本文中,利用zookeeper作為服務註冊中心,在服務端啟動時將本地的服務信息註冊到zookeeper中,當客戶端發起遠程服務調用時,先從zookeeper中獲取該服務的地址,然後根據獲得的這個地址來利用n ...
一周排行
    -Advertisement-
    Play Games
  • 前言 在我們開發過程中基本上不可或缺的用到一些敏感機密數據,比如SQL伺服器的連接串或者是OAuth2的Secret等,這些敏感數據在代碼中是不太安全的,我們不應該在源代碼中存儲密碼和其他的敏感數據,一種推薦的方式是通過Asp.Net Core的機密管理器。 機密管理器 在 ASP.NET Core ...
  • 新改進提供的Taurus Rpc 功能,可以簡化微服務間的調用,同時可以不用再手動輸出模塊名稱,或調用路徑,包括負載均衡,這一切,由框架實現並提供了。新的Taurus Rpc 功能,將使得服務間的調用,更加輕鬆、簡約、高效。 ...
  • 順序棧的介面程式 目錄順序棧的介面程式頭文件創建順序棧入棧出棧利用棧將10進位轉16進位數驗證 頭文件 #include <stdio.h> #include <stdbool.h> #include <stdlib.h> 創建順序棧 // 指的是順序棧中的元素的數據類型,用戶可以根據需要進行修改 ...
  • 前言 整理這個官方翻譯的系列,原因是網上大部分的 tomcat 版本比較舊,此版本為 v11 最新的版本。 開源項目 從零手寫實現 tomcat minicat 別稱【嗅虎】心有猛虎,輕嗅薔薇。 系列文章 web server apache tomcat11-01-官方文檔入門介紹 web serv ...
  • C總結與剖析:關鍵字篇 -- <<C語言深度解剖>> 目錄C總結與剖析:關鍵字篇 -- <<C語言深度解剖>>程式的本質:二進位文件變數1.變數:記憶體上的某個位置開闢的空間2.變數的初始化3.為什麼要有變數4.局部變數與全局變數5.變數的大小由類型決定6.任何一個變數,記憶體賦值都是從低地址開始往高地 ...
  • 如果讓你來做一個有狀態流式應用的故障恢復,你會如何來做呢? 單機和多機會遇到什麼不同的問題? Flink Checkpoint 是做什麼用的?原理是什麼? ...
  • C++ 多級繼承 多級繼承是一種面向對象編程(OOP)特性,允許一個類從多個基類繼承屬性和方法。它使代碼更易於組織和維護,並促進代碼重用。 多級繼承的語法 在 C++ 中,使用 : 符號來指定繼承關係。多級繼承的語法如下: class DerivedClass : public BaseClass1 ...
  • 前言 什麼是SpringCloud? Spring Cloud 是一系列框架的有序集合,它利用 Spring Boot 的開發便利性簡化了分散式系統的開發,比如服務註冊、服務發現、網關、路由、鏈路追蹤等。Spring Cloud 並不是重覆造輪子,而是將市面上開發得比較好的模塊集成進去,進行封裝,從 ...
  • class_template 類模板和函數模板的定義和使用類似,我們已經進行了介紹。有時,有兩個或多個類,其功能是相同的,僅僅是數據類型不同。類模板用於實現類所需數據的類型參數化 template<class NameType, class AgeType> class Person { publi ...
  • 目錄system v IPC簡介共用記憶體需要用到的函數介面shmget函數--獲取對象IDshmat函數--獲得映射空間shmctl函數--釋放資源共用記憶體實現思路註意 system v IPC簡介 消息隊列、共用記憶體和信號量統稱為system v IPC(進程間通信機制),V是羅馬數字5,是UNI ...