基於Netty打造RPC伺服器設計經驗談

来源:http://www.cnblogs.com/jietang/archive/2016/10/20/5983038.html
-Advertisement-
Play Games

自從在園子里,發表了兩篇如何基於Netty構建RPC伺服器的文章:談談如何使用Netty開發實現高性能的RPC伺服器、Netty實現高性能RPC伺服器優化篇之消息序列化 之後,收到了很多同行、園友們熱情的反饋和若幹個優化建議,於是利用閑暇時間,打算對原來NettyRPC中不合理的模塊進行重構,並且增 ...


  自從在園子里,發表了兩篇如何基於Netty構建RPC伺服器的文章:談談如何使用Netty開發實現高性能的RPC伺服器Netty實現高性能RPC伺服器優化篇之消息序列化 之後,收到了很多同行、園友們熱情的反饋和若幹個優化建議,於是利用閑暇時間,打算對原來NettyRPC中不合理的模塊進行重構,並且增強了一些特性,主要的優化點如下:

  1. 在原來編碼解碼器:JDK原生的對象序列化方式、kryo、hessian,新增了:protostuff。
  2. 優化了NettyRPC服務端的線程池模型,支持LinkedBlockingQueue、ArrayBlockingQueue、SynchronousQueue,並擴展了多個線程池任務處理策略。
  3. RPC服務啟動、註冊、卸載支持,通過Spring中自定義的nettyrpc標簽進行統一管理。

  現在重點整理一下重構思路、經驗,記錄下來。對應源代碼代碼,大家可以查看我的開源github:https://github.com/tang-jie/NettyRPC 項目中的NettyRPC 2.0目錄。

  在最早的NettyRPC消息編解碼插件中,我使用的是:JDK原生的對象序列化(ObjectOutputStream/ObjectInputStream)、Kryo、Hessian這三種方式,後續有園友向我提議,可以引入Protostuff序列化方式。經過查閱網路的相關資料,Protostuff基於Google protobuf,但是提供了更多的功能和更簡易的用法。原生的protobuff是需要數據結構的預編譯過程,需要編寫.proto格式的配置文件,再通過protobuf提供的工具翻譯成目標語言代碼,而Protostuff則省略了這個預編譯的過程。以下是Java主流序列化框架的性能測試結果(圖片來自網路):

  

  可以發現,Protostuff序列化確實是一種很高效的序列化框架,相比起其他主流的序列化、反序列化框架,其序列化性能可見一斑。如果用它來進行RPC消息的編碼、解碼工作,再合適不過了。現在貼出具體的Protostuff序列化編解碼器的實現代碼。

  首先是定義Schema,這個是因為Protostuff-Runtime實現了無需預編譯對java bean進行protobuf序列化/反序列化的能力。我們可以把運行時的Schema緩存起來,提高序列化性能。具體實現類SchemaCache代碼如下:

package com.newlandframework.rpc.serialize.protostuff;

import com.dyuproject.protostuff.Schema;
import com.dyuproject.protostuff.runtime.RuntimeSchema;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

/**
 * @author tangjie<https://github.com/tang-jie>
 * @filename:SchemaCache.java
 * @description:SchemaCache功能模塊
 * @blogs http://www.cnblogs.com/jietang/
 * @since 2016/10/7
 */
public class SchemaCache {
    private static class SchemaCacheHolder {
        private static SchemaCache cache = new SchemaCache();
    }

    public static SchemaCache getInstance() {
        return SchemaCacheHolder.cache;
    }

    private Cache<Class<?>, Schema<?>> cache = CacheBuilder.newBuilder()
            .maximumSize(1024).expireAfterWrite(1, TimeUnit.HOURS)
            .build();

    private Schema<?> get(final Class<?> cls, Cache<Class<?>, Schema<?>> cache) {
        try {
            return cache.get(cls, new Callable<RuntimeSchema<?>>() {
                public RuntimeSchema<?> call() throws Exception {
                    return RuntimeSchema.createFrom(cls);
                }
            });
        } catch (ExecutionException e) {
            return null;
        }
    }

    public Schema<?> get(final Class<?> cls) {
        return get(cls, cache);
    }
}

  然後定義真正的Protostuff序列化、反序列化類,它實現了RpcSerialize介面的方法:

package com.newlandframework.rpc.serialize.protostuff;

import com.dyuproject.protostuff.LinkedBuffer;
import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.Schema;

import java.io.InputStream;
import java.io.OutputStream;

import com.newlandframework.rpc.model.MessageRequest;
import com.newlandframework.rpc.model.MessageResponse;
import com.newlandframework.rpc.serialize.RpcSerialize;

import org.objenesis.Objenesis;
import org.objenesis.ObjenesisStd;

/**
 * @author tangjie<https://github.com/tang-jie>
 * @filename:ProtostuffSerialize.java
 * @description:ProtostuffSerialize功能模塊
 * @blogs http://www.cnblogs.com/jietang/
 * @since 2016/10/7
 */
public class ProtostuffSerialize implements RpcSerialize {
    private static SchemaCache cachedSchema = SchemaCache.getInstance();
    private static Objenesis objenesis = new ObjenesisStd(true);
    private boolean rpcDirect = false;

    public boolean isRpcDirect() {
        return rpcDirect;
    }

    public void setRpcDirect(boolean rpcDirect) {
        this.rpcDirect = rpcDirect;
    }

    private static <T> Schema<T> getSchema(Class<T> cls) {
        return (Schema<T>) cachedSchema.get(cls);
    }

    public Object deserialize(InputStream input) {
        try {
            Class cls = isRpcDirect() ? MessageRequest.class : MessageResponse.class;
            Object message = (Object) objenesis.newInstance(cls);
            Schema<Object> schema = getSchema(cls);
            ProtostuffIOUtil.mergeFrom(input, message, schema);
            return message;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    public void serialize(OutputStream output, Object object) {
        Class cls = (Class) object.getClass();
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        try {
            Schema schema = getSchema(cls);
            ProtostuffIOUtil.writeTo(output, object, schema, buffer);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        } finally {
            buffer.clear();
        }
    }
}

  同樣為了提高Protostuff序列化/反序列化類的利用效率,我們可以對其進行池化處理,而不要頻繁的創建、銷毀對象。現在給出Protostuff池化處理類:ProtostuffSerializeFactory、ProtostuffSerializePool的實現代碼:

package com.newlandframework.rpc.serialize.protostuff;

import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;

/**
 * @author tangjie<https://github.com/tang-jie>
 * @filename:ProtostuffSerializeFactory.java
 * @description:ProtostuffSerializeFactory功能模塊
 * @blogs http://www.cnblogs.com/jietang/
 * @since 2016/10/7
 */
public class ProtostuffSerializeFactory extends BasePooledObjectFactory<ProtostuffSerialize> {

    public ProtostuffSerialize create() throws Exception {
        return createProtostuff();
    }

    public PooledObject<ProtostuffSerialize> wrap(ProtostuffSerialize hessian) {
        return new DefaultPooledObject<ProtostuffSerialize>(hessian);
    }

    private ProtostuffSerialize createProtostuff() {
        return new ProtostuffSerialize();
    }
}
package com.newlandframework.rpc.serialize.protostuff;

import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

/**
 * @author tangjie<https://github.com/tang-jie>
 * @filename:ProtostuffSerializePool.java
 * @description:ProtostuffSerializePool功能模塊
 * @blogs http://www.cnblogs.com/jietang/
 * @since 2016/10/7
 */
public class ProtostuffSerializePool {

    private GenericObjectPool<ProtostuffSerialize> ProtostuffPool;
    volatile private static ProtostuffSerializePool poolFactory = null;

    private ProtostuffSerializePool() {
        ProtostuffPool = new GenericObjectPool<ProtostuffSerialize>(new ProtostuffSerializeFactory());
    }

    public static ProtostuffSerializePool getProtostuffPoolInstance() {
        if (poolFactory == null) {
            synchronized (ProtostuffSerializePool.class) {
                if (poolFactory == null) {
                    poolFactory = new ProtostuffSerializePool();
                }
            }
        }
        return poolFactory;
    }

    public ProtostuffSerializePool(final int maxTotal, final int minIdle, final long maxWaitMillis, final long minEvictableIdleTimeMillis) {
        ProtostuffPool = new GenericObjectPool<ProtostuffSerialize>(new ProtostuffSerializeFactory());

        GenericObjectPoolConfig config = new GenericObjectPoolConfig();

        config.setMaxTotal(maxTotal);
        config.setMinIdle(minIdle);
        config.setMaxWaitMillis(maxWaitMillis);
        config.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);

        ProtostuffPool.setConfig(config);
    }

    public ProtostuffSerialize borrow() {
        try {
            return getProtostuffPool().borrowObject();
        } catch (final Exception ex) {
            ex.printStackTrace();
            return null;
        }
    }

    public void restore(final ProtostuffSerialize object) {
        getProtostuffPool().returnObject(object);
    }

    public GenericObjectPool<ProtostuffSerialize> getProtostuffPool() {
        return ProtostuffPool;
    }
}

  現在有了Protostuff池化處理類,我們就通過它來實現NettyRPC的編碼、解碼介面,達到對RPC消息編碼、解碼的目的。首先是Protostuff方式實現的RPC解碼器代碼:

package com.newlandframework.rpc.serialize.protostuff;

import com.newlandframework.rpc.serialize.MessageCodecUtil;
import com.newlandframework.rpc.serialize.MessageDecoder;

/**
 * @author tangjie<https://github.com/tang-jie>
 * @filename:ProtostuffDecoder.java
 * @description:ProtostuffDecoder功能模塊
 * @blogs http://www.cnblogs.com/jietang/
 * @since 2016/10/7
 */
public class ProtostuffDecoder extends MessageDecoder {

    public ProtostuffDecoder(MessageCodecUtil util) {
        super(util);
    }
}

  然後是Protostuff方式實現的RPC編碼器代碼:

package com.newlandframework.rpc.serialize.protostuff;

import com.newlandframework.rpc.serialize.MessageCodecUtil;
import com.newlandframework.rpc.serialize.MessageEncoder;

/**
 * @author tangjie<https://github.com/tang-jie>
 * @filename:ProtostuffEncoder.java
 * @description:ProtostuffEncoder功能模塊
 * @blogs http://www.cnblogs.com/jietang/
 * @since 2016/10/7
 */
public class ProtostuffEncoder extends MessageEncoder {

    public ProtostuffEncoder(MessageCodecUtil util) {
        super(util);
    }
}

  最後重構出Protostuff方式的RPC編碼、解碼器工具類ProtostuffCodecUtil的實現代碼:

package com.newlandframework.rpc.serialize.protostuff;

import com.google.common.io.Closer;
import com.newlandframework.rpc.serialize.MessageCodecUtil;
import io.netty.buffer.ByteBuf;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

/**
 * @author tangjie<https://github.com/tang-jie>
 * @filename:ProtostuffCodecUtil.java
 * @description:ProtostuffCodecUtil功能模塊
 * @blogs http://www.cnblogs.com/jietang/
 * @since 2016/10/7
 */
public class ProtostuffCodecUtil implements MessageCodecUtil {
    private static Closer closer = Closer.create();
    private ProtostuffSerializePool pool = ProtostuffSerializePool.getProtostuffPoolInstance();
    private boolean rpcDirect = false;

    public boolean isRpcDirect() {
        return rpcDirect;
    }

    public void setRpcDirect(boolean rpcDirect) {
        this.rpcDirect = rpcDirect;
    }

    public void encode(final ByteBuf out, final Object message) throws IOException {
        try {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            closer.register(byteArrayOutputStream);
            ProtostuffSerialize protostuffSerialization = pool.borrow();
            protostuffSerialization.serialize(byteArrayOutputStream, message);
            byte[] body = byteArrayOutputStream.toByteArray();
            int dataLength = body.length;
            out.writeInt(dataLength);
            out.writeBytes(body);
            pool.restore(protostuffSerialization);
        } finally {
            closer.close();
        }
    }

    public Object decode(byte[] body) throws IOException {
        try {
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body);
            closer.register(byteArrayInputStream);
            ProtostuffSerialize protostuffSerialization = pool.borrow();
            protostuffSerialization.setRpcDirect(rpcDirect);
            Object obj = protostuffSerialization.deserialize(byteArrayInputStream);
            pool.restore(protostuffSerialization);
            return obj;
        } finally {
            closer.close();
        }
    }
}

  這樣就使得NettyRPC的消息序列化又多了一種方式,進一步增強了其RPC消息網路傳輸的能力。

  其次是優化了NettyRPC服務端的線程模型,使得RPC消息處理線程池對任務的隊列容器的支持更加多樣。具體RPC非同步處理線程池RpcThreadPool的代碼如下:

package com.newlandframework.rpc.parallel;

import com.newlandframework.rpc.core.RpcSystemConfig;
import com.newlandframework.rpc.parallel.policy.AbortPolicy;
import com.newlandframework.rpc.parallel.policy.BlockingPolicy;
import com.newlandframework.rpc.parallel.policy.CallerRunsPolicy;
import com.newlandframework.rpc.parallel.policy.DiscardedPolicy;
import com.newlandframework.rpc.parallel.policy.RejectedPolicy;
import com.newlandframework.rpc.parallel.policy.RejectedPolicyType;

import java.util.concurrent.Executor;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.RejectedExecutionHandler;

/**
 * @author tangjie<https://github.com/tang-jie>
 * @filename:RpcThreadPool.java
 * @description:RpcThreadPool功能模塊
 * @blogs http://www.cnblogs.com/jietang/
 * @since 2016/10/7
 */
public class RpcThreadPool {

    private static RejectedExecutionHandler createPolicy() {
        RejectedPolicyType rejectedPolicyType = RejectedPolicyType.fromString(System.getProperty(RpcSystemConfig.SystemPropertyThreadPoolRejectedPolicyAttr, "AbortPolicy"));

        switch (rejectedPolicyType) {
            case BLOCKING_POLICY:
                return new BlockingPolicy();
            case CALLER_RUNS_POLICY:
                return new CallerRunsPolicy();
            case ABORT_POLICY:
                return new AbortPolicy();
            case REJECTED_POLICY:
                return new RejectedPolicy();
            case DISCARDED_POLICY:
                return new DiscardedPolicy();
        }

        return null;
    }

    private static BlockingQueue<Runnable> createBlockingQueue(int queues) {
        BlockingQueueType queueType = BlockingQueueType.fromString(System.getProperty(RpcSystemConfig.SystemPropertyThreadPoolQueueNameAttr, "LinkedBlockingQueue"));

        switch (queueType) {
            case LINKED_BLOCKING_QUEUE:
                return new LinkedBlockingQueue<Runnable>();
            case ARRAY_BLOCKING_QUEUE:
                return new ArrayBlockingQueue<Runnable>(RpcSystemConfig.PARALLEL * queues);
            case SYNCHRONOUS_QUEUE:
                return new SynchronousQueue<Runnable>();
        }

        return null;
    }

    public static Executor getExecutor(int threads, int queues) {
        String name = "RpcThreadPool";
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                createBlockingQueue(queues),
                new NamedThreadFactory(name, true), createPolicy());
    }
}

  其中創建線程池方法getExecutor是依賴JDK自帶的線程ThreadPoolExecutor的實現,參考JDK的幫助文檔,可以發現其中的一種ThreadPoolExecutor構造方法重載實現的版本:

  參數的具體含義如下:

  • corePoolSize是線程池保留大小。
  • maximumPoolSize是線程池最大線程大小。
  • keepAliveTime是指空閑(idle)線程結束的超時時間。
  • unit用來指定keepAliveTime對應的時間單位,諸如:毫秒、秒、分鐘、小時、天 等等。
  • workQueue用來存放待處理的任務隊列。
  • handler用來具體指定,當任務隊列填滿、並且線程池最大線程大小也達到的情形下,線程池的一些應對措施策略。

  NettyRPC的線程池支持的任務隊列類型主要有以下三種:

  1. LinkedBlockingQueue:採用鏈表方式實現的無界任務隊列,當然你可以額外指定其容量,使其有界。
  2. ArrayBlockingQueue:有界的的數組任務隊列。
  3. SynchronousQueue:任務隊列的容量固定為1,當客戶端提交執行任務過來的時候,有進行阻塞。直到有個處理線程取走這個待執行的任務,否則會一直阻塞下去。

  NettyRPC的線程池模型,當遇到線程池也無法處理的情形的時候,具體的應對措施策略主要有:

  1. AbortPolicy:直接拒絕執行,拋出rejectedExecution異常。
  2. DiscardedPolicy:從任務隊列的頭部開始直接丟棄一半的隊列元素,為任務隊列“減負”。
  3. CallerRunsPolicy:不拋棄任務,也不拋出異常,而是調用者自己來運行。這個是主要是因為過多的並行請求會加劇系統的負載,線程之間調度操作系統會頻繁的進行上下文切換。當遇到線程池滿的情況,與其頻繁的切換、中斷。不如把並行的請求,全部串列化處理,保證儘量少的處理延時,大概是我能想到的Doug Lea的設計初衷吧。

  經過詳細的介紹了線程池參數的具體內容之後,下麵我就詳細說一下,NettyRPC的線程池RpcThreadPool的工作流程:

  

  1. NettyRPC的線程池收到RPC數據處理請求之後,判斷當前活動的線程數小於線程池設置的corePoolSize的大小的時候,會繼續生成執行任務。
  2. 而當達到corePoolSize的大小的時候的時候,這個時候,線程池會把待執行的任務放入任務隊列之中。
  3. 當任務隊列也被存滿了之後,如果當前活動的線程個數還是小於線程池中maximumPoolSize參數的設置,線程池還會繼續分配出任務線程進行救急處理,並且會立馬執行。
  4. 如果達到線程池中maximumPoolSize參數的設置的線程上限,線程池分派出來的救火隊也無法處理的時候,線程池就會調用拒絕自保策略RejectedExecutionHandler進行處理。

  NettyRPC中預設的線程池設置是把corePoolSize、maximumPoolSize都設置成16,任務隊列設置成無界鏈表構成的阻塞隊列。在應用中要根據實際的壓力、吞吐量對NettyRPC的線程池參數進行合理的規劃。目前NettyRPC暴露了一個JMX介面,JMX是“Java管理擴展的(Java Management Extensions)”的縮寫,是一種類似J2EE的規範,這樣就可以靈活的擴展系統的監控、管理功能。實時監控RPC伺服器線程池任務的執行情況,具體JMX監控度量線程池關鍵指標代碼實現如下:

package com.newlandframework.rpc.parallel.jmx;

import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;

/**
 * @author tangjie<https://github.com/tang-jie>
 * @filename:ThreadPoolStatus.java
 * @description:ThreadPoolStatus功能模塊
 * @blogs http://www.cnblogs.com/jietang/
 * @since 2016/10/13
 */

@ManagedResource
public class ThreadPoolStatus {
    private int poolSize;
    private int activeCount;
    private int corePoolSize;
    private int maximumPoolSize;
    private int largestPoolSize;
    private long taskCount;
    private long completedTaskCount;

    @ManagedOperation
    public int getPoolSize() {
        return poolSize;
    }

    @ManagedOperation
    public void setPoolSize(int poolSize) {
        this.poolSize = poolSize;
    }

    @ManagedOperation
    public int getActiveCount() {
        return activeCount;
    }

    @ManagedOperation
    public void setActiveCount(int activeCount) {
        this.activeCount = activeCount;
    }

    @ManagedOperation
    public int getCorePoolSize() {
        return corePoolSize;
    }

    @ManagedOperation
    public void setCorePoolSize(int corePoolSize) {
        this.corePoolSize = corePoolSize;
    }

    @ManagedOperation
    public int getMaximumPoolSize() {
        return maximumPoolSize;
    }

    @ManagedOperation
    public void setMaximumPoolSize(int maximumPoolSize) {
        this.maximumPoolSize = maximumPoolSize;
    }

    @ManagedOperation
    public int getLargestPoolSize() {
        return largestPoolSize;
    }

    @ManagedOperation
    public void setLargestPoolSize(int largestPoolSize) {
        this.largestPoolSize = largestPoolSize;
    }

    @ManagedOperation
    public long getTaskCount() {
        return taskCount;
    }

    @ManagedOperation
    public void setTaskCount(long taskCount) {
        this.taskCount = taskCount;
    }

    @ManagedOperation
    public long getCompletedTaskCount() {
        return completedTaskCount;
    }

    @ManagedOperation
    public void setCompletedTaskCount(long completedTaskCount) {
        this.completedTaskCount = completedTaskCount;
    }
}

  線程池狀態監控類:ThreadPoolStatus,具體監控的指標如下:

  • poolSize:池中的當前線程數
  • activeCount:主動執行任務的近似線程數
  • corePoolSize:核心線程數
  • maximumPoolSize:允許的最大線程數
  • largestPoolSize:歷史最大的線程數
  • taskCount:曾計劃執行的近似任務總數
  • completedTaskCount:已完成執行的近似任務總數

  其中corePoolSize、maximumPoolSize具體含義上文已經詳細講述,這裡就不具體展開。

  NettyRPC線程池監控JMX介面:ThreadPoolMonitorProvider,JMX通過JNDI-RMI的方式進行遠程連接通訊,具體實現方式如下:

package com.newlandframework.rpc.parallel.jmx;

import com.newlandframework.rpc.netty.MessageRecvExecutor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.EnableMBeanExport;
import org.springframework.jmx.support.ConnectorServerFactoryBean;
import org.springframework.jmx.support.MBeanServerConnectionFactoryBean;
import org.springframework.jmx.support.MBeanServerFactoryBean;
import org.springframework.remoting.rmi.RmiRegistryFactoryBean;
import org.apache.commons.lang3.StringUtils;

import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.ReflectionException;
import javax.management.MBeanException;
import javax.management.InstanceNotFoundException;
import java.io.IOException;

/**
 * @author tangjie<https://github.com/tang-jie>
 * @filename:ThreadPoolMonitorProvider.java
 * @description:ThreadPoolMonitorProvider功能模塊
 * @blogs http://www.cnblogs.com/jietang/
 * @since 2016/10/13
 */

@Configuration
@EnableMBeanExport
@ComponentScan("com.newlandframework.rpc.parallel.jmx")
public class ThreadPoolMonitorProvider {
    public final static String DELIMITER = ":";
    public static String url;
    public static String jmxPoolSizeMethod = "setPoolSize";
    public static String jmxActiveCountMethod = "setActiveCount";
    public static String jmxCorePoolSizeMethod = "setCorePoolSize";
    public static String jmxMaximumPoolSizeMethod = "setMaximumPoolSize";
    public static String jmxLargestPoolSizeMethod = "setLargestPoolSize";
    public static String jmxTaskCountMethod = "setTaskCount";
    public static String jmxCompletedTaskCountMethod = "setCompletedTaskCount";

    @Bean
    public ThreadPoolStatus threadPoolStatus() {
        return new ThreadPoolStatus();
    }

    @Bean
    public MBeanServerFactoryBean mbeanServer() {
        return new MBeanServerFactoryBean();
    }

    @Bean
    public RmiRegistryFactoryBean registry() {
        return new RmiRegistryFactoryBean();
    }

    @Bean
    @DependsOn("registry")
    public ConnectorServerFactoryBean connectorServer() throws MalformedObjectNameException {
        MessageRecvExecutor ref = MessageRecvExecutor.getInstance();
        String ipAddr = StringUtils.isNotEmpty(ref.getServerAddress()) ? StringUtils.substringBeforeLast(ref.getServerAddress(), DELIMITER) : "localhost";
        url = "service:jmx:rmi://" + ipAddr + "/jndi/rmi://" + ipAddr + ":1099/nettyrpcstatus";
        System.out.println("NettyRPC JMX MonitorURL : [" + url + "]");
        ConnectorServerFactoryBean connectorServerFactoryBean = new ConnectorServerFactoryBean();
        connectorServerFactoryBean.setObjectName("connector:name=rmi");
        connectorServerFactoryBean.setServiceUrl(url);
        return connectorServerFactoryBean;
    }

    public static void monitor(ThreadPoolStatus status) throws IOException, MalformedObjectNameException, ReflectionException, MBeanException, InstanceNotFoundException {
        MBeanServerConnectionFactoryBean mBeanServerConnectionFactoryBean = new MBeanServerConnectionFactoryBean();
        mBeanServerConnectionFactoryBean.setServiceUrl(url);
        mBeanServerConnectionFactoryBean.afterPropertiesSet();
        MBeanServerConnection connection = mBeanServerConnectionFactoryBean.getObject();
        ObjectName objectName = new ObjectName("com.newlandframework.rpc.parallel.jmx:name=threadPoolStatus,type=ThreadPoolStatus");

        connection.invoke(objectName, jmxPoolSizeMethod, new Object[]{status.getPoolSize()}, new String[]{int.class.getName()});
        connection.invoke(objectName, jmxActiveCountMethod, new Object[]{status.getActiveCount()}, new String[]{int.class.getName()});
        connection.invoke(objectName, jmxCorePoolSizeMethod, new Object[]{status.getCorePoolSize()}, new String[]{int.class.getName()});
        connection.invoke(objectName, jmxMaximumPoolSizeMethod, new Object[]{status.getMaximumPoolSize()}, new String[]{int.class.getName()});
        connection.invoke(objectName, jmxLargestPoolSizeMethod, new Object[]{status.getLargestPoolSize()}, new String[]{int.class.getName()});
        connection.invoke(objectName, jmxTaskCountMethod, new Object[]{status.getTaskCount()}, new String[]{long.class.getName()});
        connection.invoke(objectName, jmxCompletedTaskCountMethod, new Object[]{status.getCompletedTaskCount()}, new String[]{long.class.getName()});
    }
}

  NettyRPC伺服器啟動成功之後,就可以通過JMX介面進行監控:可以打開jconsole,然後輸入URL:service:jmx:rmi://127.0.0.1/jndi/rmi://127.0.0.1:1099/nettyrpcstatus,用戶名、密碼預設為空,點擊連接按鈕。

    

  當有客戶端進行RPC請求的時候,通過JMX可以看到如下的監控界面:

     

  這個時候點擊NettyRPC線程池各個監控指標的按鈕,就可以直觀的看到NettyRPC實際運行中,線程池的主要參數指標的實時監控。比如點擊:getCompletedTaskCount,想查看一下目前已經完成的線程任務總數指標。具體情況如下圖所示:

     

  可以看到,目前已經處理了40280筆RPC請求。這樣,我們就可以準實時監控NettyRPC線程池參數設置、容量規劃是否合理,以便及時作出調整,合理的最大程度利用軟硬體資源。

  最後經過重構之後,NettyRPC服務端的Spring配置(NettyRPC/NettyRPC 2.0/main/resources/rpc-invoke-config-server.xml)如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:nettyrpc="http://www.newlandframework.com/nettyrpc" xsi:schemaLocation="
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
    http://www.newlandframework.com/nettyrpc http://www.newlandframework.com/nettyrpc/nettyrpc.xsd">
    <!--載入rpc伺服器的ip地址、埠信息-->
    <context:property-placeholder location="classpath:rpc-server.properties"/>
    <!--定義rpc服務介面-->
    <nettyrpc:service id="demoAddService" interfaceName="com.newlandframework.rpc.services.AddCalculate"
                      ref="calcAddService"></nettyrpc:service>
    <nettyrpc:service id="demoMultiService" interfaceName="com.newlandframework.rpc.services.MultiCalculate"
                      ref="calcMultiService"></nettyrpc:service>
    <!--註冊rpc伺服器,並通過protocol指定序列化協議-->                  
    <nettyrpc:registry id="rpcRegistry" ipAddr="${rpc.server.addr}" protocol="PROTOSTUFFSERIALIZE"></nettyrpc:registry>
    <!--rpc服務實現類聲明-->
    <bean id="calcAddService" class="com.newlandframework.rpc.services.impl.AddCalculateImpl"></bean>
    <bean id="calcMultiService" class="com.newlandframework.rpc.services.impl.MultiCalculateImpl"></bean>
</beans>

  通過nettyrpc:service標簽定義rpc伺服器支持的服務介面,這裡的樣例聲明瞭當前的rpc伺服器提供了加法計算、乘法計算兩種服務給客戶端進行調用。具體通過Spring自定義標簽的實現,大家可以自行參考github:NettyRPC/NettyRPC 2.0/main/java/com/newlandframework/rpc/spring(路徑/包)中的實現代碼,代碼比較多得利用到了Spring框架的特性,希望大

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 引入別人的項目發現利用HibernateTemplate的load的方法報錯了。錯誤提示為: The method load(Class, Serializable) in the type HibernateTemplate is not applicable for the arguments ...
  • Spring AOP應用:xml配置及註解實現。 動態代理:jdk、cglib、javassist 緩存應用:高速緩存提供程式ehcache,頁面緩存,session緩存 項目地址:https://github.com/windwant/spring-aop-test ...
  • 記得當初自己剛開始學習Java的時候,對Java的IO流這一塊特別不明白,所以寫了這篇隨筆希望能對剛開始學習Java的人有所幫助,也方便以後自己查詢。Java的IO流分為字元流(Reader,Writer)和位元組流(InputStream,OutputStream),位元組流顧名思義位元組流就是將文件的 ...
  • 英文文檔: compile(source, filename, mode, flags=0, dont_inherit=False, optimize=-1) ...
  • 測試環境:php5.3.29 unix時間戳(從Unix 紀元(January 1 1970 00:00:00 GMT)到給定時間的秒數。)。以下簡稱時間戳。 返回某一時間的時間戳。 time(); //獲取當前本機時間的時間戳。 mktime(時,分,秒,月,日,年); //從右向左可以省略,省略 ...
  • Go的 switch 非常靈活,表達式不必是常量或整數,執行的過程從上至下,直到找到匹配項,不要break; switch 後面的表達式甚至不是必需的 利子: RadioButton為單選按鈕,可以分組, radiobuttongroup和radiobuttongroupbox ...
  • 通用精準化推薦平臺 平臺結構 以下為推薦流程可視化系統設計圖 以下為推薦結果可追溯系統設計圖 通過推薦流程可視化系統以及推薦結果可追溯系統,我們可以解決原有推薦系統架構的問題 推薦流程可視化系統 --------------------------... ...
  • 這兩個星期開始系統地學習設計模式相關的知識,對每一個原則或者設計模式主要從下麵幾點分析學習: - 定義:簡單地描述其作用 - 解決問題:說明該原則或設計模式解決什麼限制條件下的問題。 - 結構圖:繪製相關例子的UML機構圖。 - 代碼示例:通過一個例子解釋該模式的實現方法。 - 優缺點:該模式的局限... ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...