談談如何使用Netty開發實現高性能的RPC伺服器

来源:http://www.cnblogs.com/jietang/archive/2016/06/25/5615681.html
-Advertisement-
Play Games

RPC(Remote Procedure Call Protocol)遠程過程調用協議,它是一種通過網路,從遠程電腦程式上請求服務,而不必瞭解底層網路技術的協議。說的再直白一點,就是客戶端在不必知道調用細節的前提之下,調用遠程電腦上運行的某個對象,使用起來就像調用本地的對象一樣。目前典型的RPC ...


  RPC(Remote Procedure Call Protocol)遠程過程調用協議,它是一種通過網路,從遠程電腦程式上請求服務,而不必瞭解底層網路技術的協議。說的再直白一點,就是客戶端在不必知道調用細節的前提之下,調用遠程電腦上運行的某個對象,使用起來就像調用本地的對象一樣。目前典型的RPC實現框架有:Thrift(facebook開源)、Dubbo(alibaba開源)等等。RPC框架針對網路協議、網路I/O模型的封裝是透明的,對於調用的客戶端而言,它就認為自己在調用本地的一個對象。至於傳輸層上,運用的是TCP協議、UDP協議、亦或是HTTP協議,一概不關心。從網路I/O模型上來看,是基於select、poll、epoll方式、還是IOCP(I/O Completion Port)方式承載實現的,對於調用者而言也不用關心。

  目前,主流的RPC框架都支持跨語言調用,即有所謂的IDL(介面定義語言),其實,這個並不是RPC所必須要求的。如果你的RPC框架沒有跨語言的要求,IDL就可以不用包括了。

  最後,值得一提的是,衡量一個RPC框架性能的好壞與否,RPC的網路I/O模型的選擇,至關重要。在此基礎上,設計出來的RPC伺服器,可以考慮支持阻塞式同步IO、非阻塞式同步IO、當然還有所謂的多路復用IO模型、非同步IO模型。支持不同的網路IO模型,在高併發的狀態下,處理性能上會有很大的差別。還有一個衡量的標準,就是選擇的傳輸協議。是基於TCP協議、還是HTTP協議、還是UDP協議?對性能也有一定的影響。但是從我目前瞭解的情況來看,大多數RPC開源實現框架都是基於TCP、或者HTTP的,目測沒有採用UDP協議做為主要的傳輸協議的。

  明白了RPC的使用原理和性能要求。現在,我們能不能撇開那些RPC開源框架,自己動手開發一個高性能的RPC伺服器呢?我想,還是可以的。現在本人就使用Java,基於Netty,開發實現一個高性能的RPC伺服器。

  如何實現、基於什麼原理?併發處理性能如何?請繼續接著看下文。

  我們有的時候,為了提高單個節點的通信吞吐量,提高通信性能。如果是基於Java後端的,一般首選的是NIO框架(No-block IO)。但是問題也來了,Java的NIO掌握起來要相當的技術功底,和足夠的技術積累,使用起來才能得心應手。一般的開發人員,如果要使用NIO開發一個後端的TCP/HTTP伺服器,附帶考慮TCP粘包、網路通信異常、消息鏈接處理等等網路通信細節,開發門檻太高,所以比較明智的選擇是,採用業界主流的NIO框架進行伺服器後端開發。主流的NIO框架主要有Netty、Mina。它們主要都是基於TCP通信,非阻塞的IO、靈活的IO線程池而設計的,應對高併發請求也是綽綽有餘。隨著Netty、Mina這樣優秀的NIO框架,設計上日趨完善,Java後端高性能伺服器開發,在技術上提供了有力的支持保障,從而打破了C++在伺服器後端,一統天下的局面。因為在此之前,Java的NIO一直受人詬病,讓人敬而遠之!

  既然,這個RPC伺服器是基於Netty的,那就在說說Netty吧。實際上Netty是對JAVA NIO框架的再次封裝,它的開源網址是http://netty.io/,本文中使用的Netty版本是:4.0版本,可以通過http://dl.bintray.com/netty/downloads/netty-4.0.37.Final.tar.bz2,進行下載使用。那也許你會問,如何使用Netty進行RPC伺服器的開發呢?實際不難,下麵我就簡單的說明一下技術原理:

  1、定義RPC請求消息、應答消息結構,裡面要包括RPC的介面定義模塊、包括遠程調用的類名、方法名稱、參數結構、參數值等信息。

  2、服務端初始化的時候通過容器載入RPC介面定義和RPC介面實現類對象的映射關係,然後等待客戶端發起調用請求。

  3、客戶端發起的RPC消息裡面包含,遠程調用的類名、方法名稱、參數結構、參數值等信息,通過網路,以位元組流的方式送給RPC服務端,RPC服務端接收到位元組流的請求之後,去對應的容器裡面,查找客戶端介面映射的具體實現對象。

  4、RPC服務端找到實現對象的參數信息,通過反射機制創建該對象的實例,並返回調用處理結果,最後封裝成RPC應答消息通知到客戶端。

  5、客戶端通過網路,收到位元組流形式的RPC應答消息,進行拆包、解析之後,顯示遠程調用結果。

  上面說的是很簡單,但是實現的時候,我們還要考慮如下的問題:

  1、RPC伺服器的傳輸層是基於TCP協議的,出現粘包咋辦?這樣客戶端的請求,服務端不是會解析失敗?好在Netty裡面已經提供瞭解決TCP粘包問題的解碼器:LengthFieldBasedFrameDecoder,可以靠它輕鬆搞定TCP粘包問題。

  2、Netty服務端的線程模型是單線程、多線程(一個線程負責客戶端連接,連接成功之後,丟給後端IO的線程池處理)、還是主從模式(客戶端連接、後端IO處理都是基於線程池的實現)。當然在這裡,我出於性能考慮,使用了Netty主從線程池模型。

  3、Netty的IO處理線程池,如果遇到非常耗時的業務,出現阻塞了咋辦?這樣不是很容易把後端的NIO線程給掛死、阻塞?本文的處理方式是,對於複雜的後端業務,分派到專門的業務線程池裡面,進行非同步回調處理。

  4、RPC消息的傳輸是通過位元組流在NIO的通道(Channel)之間傳輸,那具體如何實現呢?本文,是通過基於Java原生對象序列化機制的編碼、解碼器(ObjectEncoder、ObjectDecoder)進行實現的。當然出於性能考慮,這個可能不是最優的方案。更優的方案是把消息的編碼、解碼器,搞成可以配置實現的。具體比如可以通過:protobuf、JBoss Marshalling方式進行解碼和編碼,以提高網路消息的傳輸效率。

  5、RPC伺服器要考慮多線程、高併發的使用場景,所以線程安全是必須的。此外儘量不要使用synchronized進行加鎖,改用輕量級的ReentrantLock方式進行代碼塊的條件加鎖。比如本文中的RPC消息處理回調,就有這方面的使用。

  6、RPC服務端的服務介面對象和服務介面實現對象要能輕易的配置,輕鬆進行載入、卸載。在這裡,本文是通過Spring容器進行統一的對象管理。

  綜上所述,本文設計的RPC伺服器調用的流程圖如下所示:

     

  客戶端併發發起RPC調用請求,然後RPC服務端使用Netty連接器,分派出N個NIO連接線程,這個時候Netty連接器的任務結束。然後NIO連接線程是統一放到Netty NIO處理線程池進行管理,這個線程池裡面會對具體的RPC請求連接進行消息編碼、消息解碼、消息處理等等一系列操作。最後進行消息處理(Handler)的時候,處於性能考慮,這裡的設計是,直接把複雜的消息處理過程,丟給專門的RPC業務處理線程池集中處理,然後Handler對應的NIO線程就立即返回、不會阻塞。這個時候RPC調用結束,客戶端會非同步等待服務端消息的處理結果,本文是通過消息回調機制實現(MessageCallBack)。

  再來說一說Netty對於RPC消息的解碼、編碼、處理對應的模塊和流程,具體如下圖所示:

   

  從上圖可以看出客戶端、服務端對RPC消息編碼、解碼、處理調用的模塊以及調用順序了。Netty就是把這樣一個一個的處理器串在一起,形成一個責任鏈,統一進行調用。

  說了這麼多,現在先簡單看下,我設計實現的NettyRPC的代碼目錄層級結構:

     

  其中newlandframework.netty.rpc.core包是NettyRPC的核心實現。newlandframework.netty.rpc.model包裡面,則封裝了RPC消息請求、應答報文結構,以及RPC服務介面與實現綁定關係的容器定義。newlandframework.netty.rpc.config裡面定義了NettyRPC的服務端文件配置屬性。

  下麵先來看下newlandframework.netty.rpc.model包中定義的內容。具體是RPC消息請求、應答消息的結構定義:

  RPC請求消息結構

/**
 * @filename:MessageRequest.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:rpc服務請求結構
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.model;

import java.io.Serializable;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;

public class MessageRequest implements Serializable {

    private String messageId;
    private String className;
    private String methodName;
    private Class<?>[] typeParameters;
    private Object[] parametersVal;

    public String getMessageId() {
        return messageId;
    }

    public void setMessageId(String messageId) {
        this.messageId = messageId;
    }

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Class<?>[] gettypeParameters() {
        return typeParameters;
    }

    public void settypeParameters(Class<?>[] typeParameters) {
        this.typeParameters = typeParameters;
    }

    public Object[] getParameters() {
        return parametersVal;
    }

    public void setParameters(Object[] parametersVal) {
        this.parametersVal = parametersVal;
    }

    public String toString() {
        return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
                .append("messageId", messageId).append("className", className)
                .append("methodName", methodName).toString();
    }
}

  RPC應答消息結構

/**
 * @filename:MessageResponse.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:rpc服務應答結構
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.model;

import java.io.Serializable;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;

public class MessageResponse implements Serializable {

    private String messageId;
    private String error;
    private Object resultDesc;

    public String getMessageId() {
        return messageId;
    }

    public void setMessageId(String messageId) {
        this.messageId = messageId;
    }

    public String getError() {
        return error;
    }

    public void setError(String error) {
        this.error = error;
    }

    public Object getResult() {
        return resultDesc;
    }

    public void setResult(Object resultDesc) {
        this.resultDesc = resultDesc;
    }

    public String toString() {
        return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
                .append("messageId", messageId).append("error", error).toString();
    }
}

  RPC服務介面定義、服務介面實現綁定關係容器定義,提供給spring作為容器使用。

/**
 * @filename:MessageKeyVal.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:rpc服務映射容器
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.model;

import java.util.Map;

public class MessageKeyVal {

    private Map<String, Object> messageKeyVal;

    public void setMessageKeyVal(Map<String, Object> messageKeyVal) {
        this.messageKeyVal = messageKeyVal;
    }

    public Map<String, Object> getMessageKeyVal() {
        return messageKeyVal;
    }
}

  好了,定義好核心模型結構之後,現在再向大家展示一下NettyRPC核心包:newlandframework.netty.rpc.core的關鍵部分實現代碼,首先是業務線程池相關類的實現代碼,具體如下:

  線程工廠定義實現

/**
 * @filename:NamedThreadFactory.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:線程工廠
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.core;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {

    private static final AtomicInteger threadNumber = new AtomicInteger(1);

    private final AtomicInteger mThreadNum = new AtomicInteger(1);

    private final String prefix;

    private final boolean daemoThread;

    private final ThreadGroup threadGroup;

    public NamedThreadFactory() {
        this("rpcserver-threadpool-" + threadNumber.getAndIncrement(), false);
    }

    public NamedThreadFactory(String prefix) {
        this(prefix, false);
    }

    public NamedThreadFactory(String prefix, boolean daemo) {
        this.prefix = prefix + "-thread-";
        daemoThread = daemo;
        SecurityManager s = System.getSecurityManager();
        threadGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
    }

    public Thread newThread(Runnable runnable) {
        String name = prefix + mThreadNum.getAndIncrement();
        Thread ret = new Thread(threadGroup, runnable, name, 0);
        ret.setDaemon(daemoThread);
        return ret;
    }

    public ThreadGroup getThreadGroup() {
        return threadGroup;
    }
}

  業務線程池定義實現

/**
 * @filename:RpcThreadPool.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:rpc線程池封裝
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.core;

import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RpcThreadPool {

    //獨立出線程池主要是為了應對複雜耗I/O操作的業務,不阻塞netty的handler線程而引入
    //當然如果業務足夠簡單,把處理邏輯寫入netty的handler(ChannelInboundHandlerAdapter)也未嘗不可
    public static Executor getExecutor(int threads, int queues) {
        String name = "RpcThreadPool";
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>()
                        : (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name));
    }
}
/**
 * @filename:AbortPolicyWithReport.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:線程池異常策略
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.core;

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;

public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {

    private final String threadName;

    public AbortPolicyWithReport(String threadName) {
        this.threadName = threadName;
    }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        String msg = String.format("RpcServer["
                + " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d),"
                + " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s)]",
                threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
                e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating());
        System.out.println(msg);
        throw new RejectedExecutionException(msg);
    }
}

  RPC調用客戶端定義實現

/**
 * @filename:MessageSendExecutor.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Rpc客戶端執行模塊
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.core;

import java.lang.reflect.Proxy;

public class MessageSendExecutor {

    private RpcServerLoader loader = RpcServerLoader.getInstance();

    public MessageSendExecutor(String serverAddress) {
        loader.load(serverAddress);
    }

    public void stop() {
        loader.unLoad();
    }

    public static <T> T execute(Class<T> rpcInterface) {
        return (T) Proxy.newProxyInstance(
                rpcInterface.getClassLoader(),
                new Class<?>[]{rpcInterface},
                new MessageSendProxy<T>(rpcInterface)
        );
    }
}

  這裡的RPC客戶端實際上,是動態代理了MessageSendProxy,當然這裡是應用了,JDK原生的動態代理實現,你還可以改成CGLIB(Code Generation Library)方式。不過本人測試了一下CGLIB方式,在高併發的情況下麵會出現空指針異常,但是同樣的情況,JDK原生的動態代理卻沒有問題。併發程度不高的情況下麵,兩種代理方式都運行正常。後續再深入研究看看吧!廢話不說了,現在給出MessageSendProxy的實現方式

/**
 * @filename:MessageSendProxy.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Rpc客戶端消息處理
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.core;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.UUID;
import newlandframework.netty.rpc.model.MessageRequest;

public class MessageSendProxy<T> implements InvocationHandler {

    private Class<T> cls;

    public MessageSendProxy(Class<T> cls) {
        this.cls = cls;
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        MessageRequest request = new MessageRequest();
        request.setMessageId(UUID.randomUUID().toString());
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.settypeParameters(method.getParameterTypes());
        request.setParameters(args);

        MessageSendHandler handler = RpcServerLoader.getInstance().getMessageSendHandler();
        MessageCallBack callBack = handler.sendRequest(request);
        return callBack.start();
    }
}

  進一步發現MessageSendProxy其實是把消息發送給RpcServerLoader模塊,它的代碼如下:

/**
 * @filename:RpcServerLoader.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:rpc伺服器配置載入
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.core;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.net.InetSocketAddress;
import java.util.concurrent.ThreadPoolExecutor;

public class RpcServerLoader {

    private volatile static RpcServerLoader rpcServerLoader;
    private final static String DELIMITER = ":";

    //方法返回到Java虛擬機的可用的處理器數量
    private final static int parallel = Runtime.getRuntime().availableProcessors() * 2;
    //netty nio線程池
    private EventLoopGroup eventLoopGroup = new NioEventLoopGroup(parallel);
    private static ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) RpcThreadPool.getExecutor(16, -1);
    private MessageSendHandler messageSendHandler = null;

    private RpcServerLoader() {
    }

    //併發雙重鎖定
    public static RpcServerLoader getInstance() {
        if (rpcServerLoader == null) {
            synchronized (RpcServerLoader.class) {
                if (rpcServerLoader == null) {
                    rpcServerLoader = new RpcServerLoader();
                }
            }
        }
        return rpcServerLoader;
    }

    public void load(String serverAddress) {
        String[] ipAddr = serverAddress.split(RpcServerLoader.DELIMITER);
        if (ipAddr.length == 2) {
            String host = ipAddr[0];
            int port = Integer.parseInt(ipAddr[1]);
            final InetSocketAddress remoteAddr = new InetSocketAddress(host, port);

            threadPoolExecutor.submit(new MessageSendInitializeTask(eventLoopGroup, remoteAddr, this));
        }
    }

    public void setMessageSendHandler(MessageSendHandler messageInHandler) {
        this.messageSendHandler = messageInHandler;
    }

    public MessageSendHandler getMessageSendHandler() {
        return messageSendHandler;
    }

    public void unLoad() {
        messageSendHandler.close();
        threadPoolExecutor.shutdown();
        eventLoopGroup.shutdownGracefully();
    }
}

  好了,現在一次性給出RPC客戶端消息編碼、解碼、處理的模塊實現代碼。

/**
 * @filename:MessageSendInitializeTask.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Rpc客戶端線程任務處理
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.core;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;

public class MessageSendInitializeTask implements Runnable {

    private EventLoopGroup eventLoopGroup = null;
    private InetSocketAddress serverAddress = null;
    private RpcServerLoader loader = null;

    MessageSendInitializeTask(EventLoopGroup eventLoopGroup, InetSocketAddress serverAddress, RpcServerLoader loader) {
        this.eventLoopGroup = eventLoopGroup;
        this.serverAddress = serverAddress;
        this.loader = loader;
    }

    public void run() {
        Bootstrap b = new Bootstrap();
        b.group(eventLoopGroup)
                .channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true);
        b.handler(new MessageSendChannelInitializer());

        ChannelFuture channelFuture = b.connect(serverAddress);
        channelFuture.addListener(new ChannelFutureListener() {
            public void operationComplete(final ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    MessageSendHandler handler = channelFuture.channel().pipeline().get(MessageSendHandler.class);
                    MessageSendInitializeTask.this.loader.setMessageSendHandler(handler);
                }
            }
        });
    }
}
/**
 * @filename:MessageSendChannelInitializer.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Rpc客戶端管道初始化
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

public class MessageSendChannelInitializer extends ChannelInitializer<SocketChannel> {

    //ObjectDecoder 底層預設繼承半包解碼器LengthFieldBasedFrameDecoder處理粘包問題的時候,
    //消息頭開始即為長度欄位,占據4個位元組。這裡出於保持相容的考慮
    final public static int MESSAGE_LENGTH = 4;

    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        //ObjectDecoder的基類半包解碼器LengthFieldBasedFrameDecoder的報文格式保持相容。因為底層的父類LengthFieldBasedFrameDecoder
        //的初始化參數即為super(maxObjectSize, 0, 4, 0, 4);
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, MessageSendChannelInitializer.MESSAGE_LENGTH, 0, MessageSendChannelInitializer.MESSAGE_LENGTH));
        //利用LengthFieldPrepender回填補充ObjectDecoder消息報文頭
        pipeline.addLast(new LengthFieldPrepender(MessageSendChannelInitializer.MESSAGE_LENGTH));
        pipeline.addLast(new ObjectEncoder());
        //考慮到併發性能,採用weakCachingConcurrentResolver緩存策略。一般情況使用:cacheDisabled即可
        pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
        pipeline.addLast(new MessageSendHandler());
    }
}
/**
 * @filename:MessageSendHandler.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Rpc客戶端處理模塊
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.core;

import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.net.SocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;

public class MessageSendHandler extends ChannelInboundHandlerAdapter {

    private ConcurrentHashMap<String, MessageCallBack> mapCallBack = new ConcurrentHashMap<String, MessageCallBack>();

    private volatile Channel channel;
    private SocketAddress remoteAddr;

    public Channel getChannel() {
        return channel;
    }

    public SocketAddress getRemoteAddr() {
        return remoteAddr;
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.remoteAddr = this.channel.remoteAddress();
    }

    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        super.channelRegistered(ctx);
        this.channel = ctx.channel();
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        MessageResponse response = (MessageResponse) msg;
        String messageId = response.getMessageId();
        MessageCallBack callBack = mapCallBack.get(messageId);
        if (callBack != null) {
            mapCallBack.remove(messageId);
            callBack.over(response);
        }
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    public void close() {
        channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    public MessageCallBack sendRequest(MessageRequest request) {
        MessageCallBack callBack = new MessageCallBack(request);
        mapCallBack.put(request.getMessageId(), callBack);
        channel.writeAndFlush(request);
        return callBack;
    }
}

  最後給出RPC服務端的實現。首先是通過spring自動載入RPC服務介面、介面實現容器綁定載入,初始化Netty主/從線程池等操作,具體是通過MessageRecvExecutor模塊實現的,現在給出實現代碼:

/**
 * @filename:MessageRecvExecutor.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Rpc伺服器執行模塊
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.core;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.logging.Level;
import newlandframework.netty.rpc.model.MessageKeyVal;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

public class MessageRecvExecutor implements ApplicationContextAware, InitializingBean {

    private String serverAddress;
    private final static String DELIMITER = ":";

    private Map<String, Object> handlerMap = new ConcurrentHashMap<String, Object>();

    private static ThreadPoolExecutor threadPoolExecutor;

    public MessageRecvExecutor(String serverAddress) {
        this.serverAddress = serverAddress;
    }

    public static void submit(Runnable task) {
        if (threadPoolExecutor == null) {
            synchronized (MessageRecvExecutor.class) {
                if (threadPoolExecutor == null) {
                    threadPoolExecutor = (ThreadPoolExecutor) RpcThreadPool.getExecutor(16, -1);
                }
            }
        }
        threadPoolExecutor.submit(task);
    }

    public void setApplicationContext(ApplicationContext ctx) throws BeansException {
        try {
            MessageKeyVal keyVal = (MessageKeyVal) ctx.getBean(Class.forName("newlandframework.netty.rpc.model.MessageKeyVal"));
            Map<String, Object> rpcServiceObject = keyVal.getMessageKeyVal();

            Set s = rpcServiceObject.entrySet();
            Iterator<Map.Entry<String, Object>> it = s.iterator();
            Map.Entry<String, Object> entry;

            while (it.hasNext()) {
                entry = it.next();
                handlerMap.put(entry.getKey(), entry.getValue());
            }
        } catch (ClassNotFoundException ex) {
            java.util.logging.Logger.getLogger(MessageRecvExecutor.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    public void afterPropertiesSet() throws Exception {
        //netty的線程池模型設置成主從線程池模式,這樣可以應對高併發請求
        //當然netty還支持單線程、多線程網路IO模型,可以根據業務需求靈活配置
        ThreadFactory threadRpcFactory = new NamedThreadFactory("NettyRPC ThreadFactory");
        
        //方法返回到Java虛擬機的可用的處理器數量
        int parallel = Runtime.getRuntime().availableProcessors() * 2;
    
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup(parallel,threadRpcFactory,SelectorProvider.provider());
        
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
                    .childHandler(new MessageRecvChannelInitializer(handlerMap))
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            String[] ipAddr = serverAddress.split(MessageRecvExecutor.DELIMITER);

            if (ipAddr.length == 2) {
                String host = ipAddr[0];
                int port = Integer.parseInt(ipAddr[1]);
                ChannelFuture future = bootstrap.bind(host, port).sync();
                System.out.printf("[author tangjie] Netty RPC Server start success ip:%s port:%d\n", host, port);
                future.channel().closeFuture().sync();
            } else {
                System.out.printf("[author tangjie] Netty RPC Server start fail!\n");
            }
        } finally {
            worker.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }
}

  最後還是老規矩,給出RPC服務端消息編碼、解碼、處理的核心模塊代碼實現,具體如下:

/**
 * @filename:MessageRecvChannelInitializer.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Rpc服務端管道初始化
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;

              
您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一、什麼是字典? 字典是Python語言中唯一的映射類型。 映射類型對象里哈希值(鍵,key)和指向的對象(值,value)是一對多的的關係,通常被認為是可變的哈希表。 字典對象是可變的,它是一個容器類型,能存儲任意個數的Python對象,其中也可包括其他容器類型。 字典類型與序列類型的區別: 1. ...
  • (期末考試快到了,所以比較粗糙,請各位讀者理解。。) 一、 概念 DBSCAN是一種產生劃分聚類的基於密度的聚類演算法,簇的個數由演算法自動地確定。低密度區域中的點被視為雜訊而忽略,因此DBSCAN不產生完全聚類。 二、 偽代碼 1 將所有點標記為核心點、邊界點和雜訊點。 2 刪除雜訊點。 3 為距離在 ...
  • 編寫一個簡單的留言簿,實現添加留言和顯示留言內容的功能 運行: ...
  • Akka可能很多人都沒有用過,也不知道是什麼,但如果說起Scala或Spark就有很多人都聽說過或使用過 ,這裡簡單說下三者的關係Akka是使用Scala開發的,Spark中使用了Akka作為其消息的通信工具;這篇文章主要 說說Akka的一些特性,做個簡要的介紹; 要說Akka首先要從 併發 開始說 ...
  • 大數據已經成為一種發展趨勢,得到越來越多的公司參與。最近從事大數據系統設計開發和推薦引擎方面的工作,分幾篇文章兩個系列把自己的心得記錄一下, 和大家分享一下大數據方面的經驗。 整個平臺包括監控系統、日誌分析系統、推薦系統。按數據業務步驟劃分,分為數據採集、清洗、存儲、分析和服務。整個數據流程如下圖: ...
  • 1.意圖 使多個對象都有機會處理請求,從而避免請求的發送者和接收者之間的耦合關係。將這些對象連成一條鏈,並沿著這條鏈傳遞該請求,直到有一個對象處理它為止。 2.動機 給多個對象處理一個請求的機會,從而解耦發送者和接收者。 3.適用性 有多個的對象可以處理一個請求,哪個對象處理該請求運行時刻自動確定。 ...
  • 第一次完整看一遍(JavaScript設計模式)該模式的介紹,感覺這不就是繼承而已嗎,只不過可能是部分繼承。 混入(Mixin)模式 定義: Mixin是可以輕鬆被一個子類或一組子類繼承功能的類,目的是函數復用。繼承Mixin是擴展功能的方式,另外也可能從多個Mixin類進行繼承。 繼承方式: 這個 ...
  • 最近公司作為眾多外部廠商之一,需要依托一個大型平臺系統( 這裡簡稱為Big-S) 給特定用戶提供一些服務。 作為外部廠商開發的 Web 應用(這裡簡稱 Small-S),需要提取 Big-S 中的基礎數據,包括用戶、組織結構、代碼表......部分欄位到本地數據表中。 融合 Small-S 自己特點 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...