Netty實現高性能RPC伺服器優化篇之消息序列化

来源:http://www.cnblogs.com/jietang/archive/2016/07/16/5675171.html
-Advertisement-
Play Games

在本人寫的前一篇文章中,談及有關如何利用Netty開發實現,高性能RPC伺服器的一些設計思路、設計原理,以及具體的實現方案(具體參見:談談如何使用Netty開發實現高性能的RPC伺服器)。在文章的最後提及到,其實基於該方案設計的RPC伺服器的處理性能,還有優化的餘地。於是利用周末的時間,在原來Net ...


  在本人寫的前一篇文章中,談及有關如何利用Netty開發實現,高性能RPC伺服器的一些設計思路、設計原理,以及具體的實現方案(具體參見:談談如何使用Netty開發實現高性能的RPC伺服器)。在文章的最後提及到,其實基於該方案設計的RPC伺服器的處理性能,還有優化的餘地。於是利用周末的時間,在原來NettyRPC框架的基礎上,加以優化重構,本次主要優化改造點如下:

  1、NettyRPC中對RPC消息進行編碼、解碼採用的是Netty自帶的ObjectEncoder、ObjectDecoder(對象編碼、解碼器),該編碼、解碼器基於的是Java的原生序列化機制,從已有的文章以及測試數據來看,Java的原生序列化性能效率不高,而且產生的序列化二進位碼流太大,故本次在優化中,引入RPC消息序列化協議的概念。所謂消息序列化協議,就是針對RPC消息的序列化、反序列化過程進行特殊的定製,引入第三方編解碼框架。本次引入的第三方編解碼框架有Kryo、Hessian。這裡,不得不再次提及一下,對象序列化、反序列化的概念,在RPC的遠程服務調用過程中,需要把消息對象通過網路傳輸,這個就要用到序列化將對象轉變成位元組流,到達另外一端之後,再反序列化回來變成消息對象。

  2、引入Google Guava併發編程框架對NettyRPC的NIO線程池、業務線程池進行重新梳理封裝。

  3、利用第三方編解碼框架(Kryo、Hessian)的時候,考慮到高併發的場景下,頻繁的創建、銷毀序列化對象,會非常消耗JVM的記憶體資源,影響整個RPC伺服器的處理性能,因此引入對象池化(Object Pooling)技術。眾所周知,創建新對象並初始化,可能會消耗很多的時間。當需要產生大量對象的時候,可能會對性能造成一定的影響。為瞭解決這個問題,除了提升硬體條件之外,對象池化技術就是這方面的銀彈,而Apache Commons Pool框架就是對象池化技術的一個很好的實現(開源項目路徑:http://commons.apache.org/proper/commons-pool/download_pool.cgi)。本文中的Hessian池化工作,主要是基於Apache Commons Pool框架,進行封裝處理。

  本文將著重,從上面的三個方面,對重構優化之後的NettyRPC伺服器的實現思路、實現方式進行重點講解。首先請大家簡單看下,本次優化之後的NettyRPC伺服器支持的序列化協議,如下圖所示:

  

  可以很清楚的看到,優化之後的NettyRPC可以支持Kryo、Hessian、Java本地序列化三種消息序列化方式。其中Java本地序列化方式,相信大家應該很熟悉了,再次不在重覆講述。現在我們重點講述一下,另外兩種序列化方式:

  1、Kryo序列化。它是針對Java,而定製實現的高效對象序列化框架,相比Java本地原生序列化方式,Kryo在處理性能上、碼流大小上等等方面有很大的優化改進。目前已知的很多著名開源項目,都引入採用了該序列化方式。比如alibaba開源的dubbo RPC等等。本文中採用的Kryo的預設版本是基於:kryo-3.0.3。它的下載鏈接是:https://github.com/EsotericSoftware/kryo/releases/tag/kryo-parent-3.0.3。為什麼採用這個版本?主要原因我上面也說明瞭,出於應對高併發場景下,頻繁地創建、銷毀序列化對象,會非常消耗JVM的記憶體資源、以及時間。Kryo的這個發行版本中,集成引入了序列化對象池功能模塊(KryoFactory、KryoPool),這樣我們就不必再利用Apache Commons Pool對其進行二次封裝。

  2、Hessian序列化。Hessian本身是一種序列化協議,它比Java原生的序列化、反序列化速度更快、序列化出來的數據也更小。它是採用二進位格式進行數據傳輸,而且,目前支持多種語言格式。本文中採用的是:hessian-4.0.37 版本,它的下載鏈接是:http://hessian.caucho.com/#Java

  接下來,先來看下優化之後的NettyRPC的消息協議編解碼包(newlandframework.netty.rpc.serialize.support、newlandframework.netty.rpc.serialize.support.kryo、newlandframework.netty.rpc.serialize.support.hessian)的結構,如下圖所示:

     

  其中RPC請求消息結構代碼如下:

/**
 * @filename:MessageRequest.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:rpc服務請求結構
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.model;

import java.io.Serializable;
import org.apache.commons.lang.builder.ReflectionToStringBuilder;

public class MessageRequest implements Serializable {

    private String messageId;
    private String className;
    private String methodName;
    private Class<?>[] typeParameters;
    private Object[] parametersVal;

    public String getMessageId() {
        return messageId;
    }

    public void setMessageId(String messageId) {
        this.messageId = messageId;
    }

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Class<?>[] getTypeParameters() {
        return typeParameters;
    }

    public void setTypeParameters(Class<?>[] typeParameters) {
        this.typeParameters = typeParameters;
    }

    public Object[] getParameters() {
        return parametersVal;
    }

    public void setParameters(Object[] parametersVal) {
        this.parametersVal = parametersVal;
    }

    public String toString() {
        return ReflectionToStringBuilder.toStringExclude(this, new String[]{"typeParameters", "parametersVal"});
    }
}

  RPC應答消息結構,如下所示:

/**
 * @filename:MessageResponse.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:rpc服務應答結構
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.model;

import java.io.Serializable;
import org.apache.commons.lang.builder.ReflectionToStringBuilder;

public class MessageResponse implements Serializable {

    private String messageId;
    private String error;
    private Object resultDesc;

    public String getMessageId() {
        return messageId;
    }

    public void setMessageId(String messageId) {
        this.messageId = messageId;
    }

    public String getError() {
        return error;
    }

    public void setError(String error) {
        this.error = error;
    }

    public Object getResult() {
        return resultDesc;
    }

    public void setResult(Object resultDesc) {
        this.resultDesc = resultDesc;
    }

    public String toString() {
        return ReflectionToStringBuilder.toString(this);
    }
}

  現在,我們就來對上述的RPC請求消息、應答消息進行編解碼框架的設計。由於NettyRPC中的協議類型,目前已經支持Kryo序列化、Hessian序列化、Java原生本地序列化方式。考慮到可擴展性,故要抽象出RPC消息序列化,協議類型對象(RpcSerializeProtocol),它的代碼實現如下所示:

/**
 * @filename:RpcSerializeProtocol.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:RPC消息序序列化協議類型
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support;

import org.apache.commons.lang.builder.ReflectionToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;

public enum RpcSerializeProtocol {

    //目前由於沒有引入跨語言RPC通信機制,暫時採用支持同構語言Java序列化/反序列化機制的第三方插件
    //NettyRPC目前已知的序列化插件有:Java原生序列化、Kryo、Hessian
    JDKSERIALIZE("jdknative"), KRYOSERIALIZE("kryo"), HESSIANSERIALIZE("hessian");

    private String serializeProtocol;

    private RpcSerializeProtocol(String serializeProtocol) {
        this.serializeProtocol = serializeProtocol;
    }

    public String toString() {
        ReflectionToStringBuilder.setDefaultStyle(ToStringStyle.SHORT_PREFIX_STYLE);
        return ReflectionToStringBuilder.toString(this);
    }

    public String getProtocol() {
        return serializeProtocol;
    }
}

  針對不同編解碼序列化的框架(這裡主要是指Kryo、Hessian),再抽象、萃取出一個RPC消息序列化/反序列化介面(RpcSerialize)、RPC消息編解碼介面(MessageCodecUtil)。

/**
 * @filename:RpcSerialize.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:RPC消息序列化/反序列化介面定義
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public interface RpcSerialize {

    void serialize(OutputStream output, Object object) throws IOException;

    Object deserialize(InputStream input) throws IOException;
}
/**
 * @filename:MessageCodecUtil.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:RPC消息編解碼介面
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support;

import io.netty.buffer.ByteBuf;
import java.io.IOException;

public interface MessageCodecUtil {

    //RPC消息報文頭長度4個位元組
    final public static int MESSAGE_LENGTH = 4;

    public void encode(final ByteBuf out, final Object message) throws IOException;

    public Object decode(byte[] body) throws IOException;
}

  最後我們的NettyRPC框架要能自由地支配、定製Netty的RPC服務端、客戶端,採用何種序列化來進行RPC消息對象的網路傳輸。因此,要再抽象一個RPC消息序列化協議選擇器介面(RpcSerializeFrame),對應的實現如下:

/**
 * @filename:RpcSerializeFrame.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:RPC消息序序列化協議選擇器介面
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support;

import io.netty.channel.ChannelPipeline;

public interface RpcSerializeFrame {

    public void select(RpcSerializeProtocol protocol, ChannelPipeline pipeline);
}

  現在有了上面定義的一系列的介面,現在就可以定製實現,基於Kryo、Hessian方式的RPC消息序列化、反序列化模塊了。先來看下整體的類圖結構:

  首先是RPC消息的編碼器MessageEncoder,它繼承自Netty的MessageToByteEncoder編碼器。主要是把RPC消息對象編碼成二進位流的格式,對應實現如下:

/**
 * @filename:MessageEncoder.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:RPC消息編碼介面
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class MessageEncoder extends MessageToByteEncoder<Object> {

    private MessageCodecUtil util = null;

    public MessageEncoder(final MessageCodecUtil util) {
        this.util = util;
    }

    protected void encode(final ChannelHandlerContext ctx, final Object msg, final ByteBuf out) throws Exception {
        util.encode(out, msg);
    }
}

  接下來是RPC消息的解碼器MessageDecoder,它繼承自Netty的ByteToMessageDecoder。主要針對二進位流反序列化成消息對象。當然了,在之前的一篇文章中我曾經提到,NettyRPC是基於TCP協議的,TCP在傳輸數據的過程中會出現所謂的“粘包”現象,所以我們的MessageDecoder要對RPC消息體的長度進行校驗,如果不滿足RPC消息報文頭中指定的消息體長度,我們直接重置一下ByteBuf讀索引的位置,具體可以參考如下的代碼方式,進行RPC消息協議的解析:

/**
 * @filename:MessageDecoder.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:RPC消息解碼介面
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.io.IOException;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

public class MessageDecoder extends ByteToMessageDecoder {

    final public static int MESSAGE_LENGTH = MessageCodecUtil.MESSAGE_LENGTH;
    private MessageCodecUtil util = null;

    public MessageDecoder(final MessageCodecUtil util) {
        this.util = util;
    }

    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        //出現粘包導致消息頭長度不對,直接返回
        if (in.readableBytes() < MessageDecoder.MESSAGE_LENGTH) {
            return;
        }

        in.markReaderIndex();
        //讀取消息的內容長度
        int messageLength = in.readInt();
        
        if (messageLength < 0) {
            ctx.close();
        }

        //讀到的消息長度和報文頭的已知長度不匹配。那就重置一下ByteBuf讀索引的位置
        if (in.readableBytes() < messageLength) {
            in.resetReaderIndex();
            return;
        } else {
            byte[] messageBody = new byte[messageLength];
            in.readBytes(messageBody);

            try {
                Object obj = util.decode(messageBody);
                out.add(obj);
            } catch (IOException ex) {
                Logger.getLogger(MessageDecoder.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }
}

  現在,我們進一步實現,利用Kryo序列化方式,對RPC消息進行編解碼的模塊。首先是要實現NettyRPC消息序列化介面(RpcSerialize)的方法。

/**
 * @filename:KryoSerialize.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Kryo序列化/反序列化實現
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support.kryo;

import newlandframework.netty.rpc.serialize.support.RpcSerialize;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.pool.KryoPool;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class KryoSerialize implements RpcSerialize {

    private KryoPool pool = null;

    public KryoSerialize(final KryoPool pool) {
        this.pool = pool;
    }

    public void serialize(OutputStream output, Object object) throws IOException {
        Kryo kryo = pool.borrow();
        Output out = new Output(output);
        kryo.writeClassAndObject(out, object);
        out.close();
        pool.release(kryo);
    }

    public Object deserialize(InputStream input) throws IOException {
        Kryo kryo = pool.borrow();
        Input in = new Input(input);
        Object result = kryo.readClassAndObject(in);
        in.close();
        pool.release(kryo);
        return result;
    }
}

   接著利用Kryo庫裡面的對象池,對RPC消息對象進行編解碼。首先是Kryo對象池工廠(KryoPoolFactory),這個也是我為什麼選擇kryo-3.0.3版本的原因了。代碼如下:

/**
 * @filename:KryoPoolFactory.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Kryo對象池工廠
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support.kryo;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.pool.KryoFactory;
import com.esotericsoftware.kryo.pool.KryoPool;
import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;
import org.objenesis.strategy.StdInstantiatorStrategy;

public class KryoPoolFactory {

    private static KryoPoolFactory poolFactory = null;

    private KryoFactory factory = new KryoFactory() {
        public Kryo create() {
            Kryo kryo = new Kryo();
            kryo.setReferences(false);
            //把已知的結構註冊到Kryo註冊器裡面,提高序列化/反序列化效率
            kryo.register(MessageRequest.class);
            kryo.register(MessageResponse.class);
            kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
            return kryo;
        }
    };

    private KryoPool pool = new KryoPool.Builder(factory).build();

    private KryoPoolFactory() {
    }

    public static KryoPool getKryoPoolInstance() {
        if (poolFactory == null) {
            synchronized (KryoPoolFactory.class) {
                if (poolFactory == null) {
                    poolFactory = new KryoPoolFactory();
                }
            }
        }
        return poolFactory.getPool();
    }

    public KryoPool getPool() {
        return pool;
    }
}

  Kryo對RPC消息進行編碼、解碼的工具類KryoCodecUtil,實現了RPC消息編解碼介面(MessageCodecUtil),具體實現方式如下:

/**
 * @filename:KryoCodecUtil.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Kryo編解碼工具類
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support.kryo;

import com.esotericsoftware.kryo.pool.KryoPool;
import io.netty.buffer.ByteBuf;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;
import com.google.common.io.Closer;

public class KryoCodecUtil implements MessageCodecUtil {

    private KryoPool pool;
    private static Closer closer = Closer.create();

    public KryoCodecUtil(KryoPool pool) {
        this.pool = pool;
    }

    public void encode(final ByteBuf out, final Object message) throws IOException {
        try {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            closer.register(byteArrayOutputStream);
            KryoSerialize kryoSerialization = new KryoSerialize(pool);
            kryoSerialization.serialize(byteArrayOutputStream, message);
            byte[] body = byteArrayOutputStream.toByteArray();
            int dataLength = body.length;
            out.writeInt(dataLength);
            out.writeBytes(body);
        } finally {
            closer.close();
        }
    }

    public Object decode(byte[] body) throws IOException {
        try {
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body);
            closer.register(byteArrayInputStream);
            KryoSerialize kryoSerialization = new KryoSerialize(pool);
            Object obj = kryoSerialization.deserialize(byteArrayInputStream);
            return obj;
        } finally {
            closer.close();
        }
    }
}

  最後是,Kryo自己的編碼器、解碼器,其實只要調用Kryo編解碼工具類(KryoCodecUtil)裡面的encode、decode方法就可以了。現在貼出具體的代碼:

/**
 * @filename:KryoDecoder.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Kryo解碼器
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support.kryo;

import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;
import newlandframework.netty.rpc.serialize.support.MessageDecoder;

public class KryoDecoder extends MessageDecoder {

    public KryoDecoder(MessageCodecUtil util) {
        super(util);
    }
}
/**
 * @filename:KryoEncoder.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Kryo編碼器
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support.kryo;

import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;
import newlandframework.netty.rpc.serialize.support.MessageEncoder;

public class KryoEncoder extends MessageEncoder {

    public KryoEncoder(MessageCodecUtil util) {
        super(util);
    }
}

  最後,我們再來實現一下,利用Hessian實現RPC消息的編碼、解碼器代碼模塊。首先還是Hessian序列化/反序列化實現(HessianSerialize),它同樣實現了RPC消息序列化/反序列化介面(RpcSerialize),對應的代碼如下:

/**
 * @filename:HessianSerialize.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Hessian序列化/反序列化實現
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support.hessian;

import com.caucho.hessian.io.Hessian2Input;
import com.caucho.hessian.io.Hessian2Output;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import newlandframework.netty.rpc.serialize.support.RpcSerialize;

public class HessianSerialize implements RpcSerialize {

    public void serialize(OutputStream output, Object object) {
        Hessian2Output ho = new Hessian2Output(output);
        try {
            ho.startMessage();
            ho.writeObject(object);
            ho.completeMessage();
            ho.close();
            output.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public Object deserialize(InputStream input) {
        Object result = null;
        try {
            Hessian2Input hi = new Hessian2Input(input);
            hi.startMessage();
            result = hi.readObject();
            hi.completeMessage();
            hi.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return result;
    }
}

  現在利用對象池(Object Pooling)技術,對Hessian序列化/反序列化類(HessianSerialize)進行池化處理,對應的代碼如下:

/**
 * @filename:HessianSerializeFactory.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Hessian序列化/反序列化對象工廠池
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support.hessian;

import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;

public class HessianSerializeFactory extends BasePooledObjectFactory<HessianSerialize> {

    public HessianSerialize create() throws Exception {
        return createHessian();
    }

    public PooledObject<HessianSerialize> wrap(HessianSerialize hessian) {
        return new DefaultPooledObject<HessianSerialize>(hessian);
    }

    private HessianSerialize createHessian() {
        return new HessianSerialize();
    }
}
/**
 * @filename:HessianSerializePool.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Hessian序列化/反序列化池
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support.hessian;

import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

public class HessianSerializePool {

    //Netty採用Hessian序列化/反序列化的時候,為了避免重覆產生對象,提高JVM記憶體利用率,故引入對象池技術,經過測試
    //遇到高併發序列化/反序列化的場景的時候,序列化效率明顯提升不少。
    private GenericObjectPool<HessianSerialize> hessianPool;
    private static HessianSerializePool poolFactory = null;

    private HessianSerializePool() {
        hessianPool = new GenericObjectPool<HessianSerialize>(new HessianSerializeFactory());
    }

    public static HessianSerializePool getHessianPoolInstance() {
        if (poolFactory == null) {
            synchronized (HessianSerializePool.class) {
                if (poolFactory == null) {
                    poolFactory = new HessianSerializePool();
                }
            }
        }
        return poolFactory;
    }

    //預留介面,後續可以通過Spring Property Placeholder依賴註入
    public HessianSerializePool(final int maxTotal, final int minIdle, final long maxWaitMillis, final long minEvictableIdleTimeMillis) {
        hessianPool = new GenericObjectPool<HessianSerialize>(new HessianSerializeFactory());
        GenericObjectPoolConfig config = new GenericObjectPoolConfig();
        //最大池對象總數
        config.setMaxTotal(maxTotal);
        //最小空閑數
        config.setMinIdle(minIdle);
        //最大等待時間, 預設的值為-1,表示無限等待
        config.setMaxWaitMillis(maxWaitMillis);
        //退出連接的最小空閑時間 預設1800000毫秒
        config.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
        hessianPool.setConfig(config);
    }

    public HessianSerialize borrow() {
        try {
            return getHessianPool().borrowObject();
        } catch (final Exception ex) {
            ex.printStackTrace();
            return null;
        }
    }

    public void restore(final HessianSerialize object) {
        getHessianPool().returnObject(object);
    }

    public GenericObjectPool<HessianSerialize> getHessianPool() {
        return hessianPool;
    }
}

  Hessian序列化對象經過池化處理之後,我們通過Hessian編解碼工具類,來“借用”Hessian序列化對象(HessianSerialize),當然了,你借出來之後,一定要還回去嘛。Hessian編解碼工具類的實現方式如下:

/**
 * @filename:HessianCodecUtil.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Hessian編解碼工具類
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support.hessian;

import com.google.common.io.Closer;
import io.netty.buffer.ByteBuf;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;

public class HessianCodecUtil implements MessageCodecUtil {

    HessianSerializePool pool = HessianSerializePool.getHessianPoolInstance();
    private static Closer closer = Closer.create();

    public HessianCodecUtil() {

    }

    public void encode(final ByteBuf out, final Object message) throws IOException {
        try {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            closer.register(byteArrayOutputStream);
            HessianSerialize hessianSerialization = pool.borrow();
            hessianSerialization.serialize(byteArrayOutputStream, message);
            byte[] body = byteArrayOutputStream.toByteArray();
            int dataLength = body.length;
            out.writeInt(dataLength);
            out.writeBytes(body);
            pool.restore(hessianSerialization);
        } finally {
            closer.close();
        }
    }

    public Object decode(byte[] body) throws IOException {
        try {
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body);
            closer.register(byteArrayInputStream);
            HessianSerialize hessianSerialization = pool.borrow();
            Object object = hessianSerialization.deserialize(byteArrayInputStream);
            pool.restore(hessianSerialization);
            return object;
        } finally {
            closer.close();
        }
    }
}

  最後Hessian對RPC消息的編碼器、解碼器參考實現代碼如下所示:

/**
 * @filename:HessianDecoder.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Hessian解碼器
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support.hessian;

import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;
import newlandframework.netty.rpc.serialize.support.MessageDecoder;

public class HessianDecoder extends MessageDecoder {

    public HessianDecoder(MessageCodecUtil util) {
        super(util);
    }
}
/**
 * @filename:HessianEncoder.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Hessian編碼器
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support.hessian;

import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;
import newlandframework.netty.rpc.serialize.support.MessageEncoder;

public class HessianEncoder extends MessageEncoder {

    public HessianEncoder(MessageCodecUtil util) {
        super(util);
    }
}

  到目前為止,NettyRPC所針對的Kryo、Hessian序列化協議模塊,已經設計實現完畢,現在我們就要把這個協議,嵌入NettyRPC的核心模塊包(newlandframework.netty.rpc.core),下麵只給出優化調整之後的代碼,其它代碼模塊的內容,可以參考我上一篇的文章:談談如何使用Netty開發實現高性能的RPC伺服器。好了,我們先來看下,NettyRPC核心模塊包(newlandframework.netty.rpc.core)的層次結構:

     

  先來看下,NettyRPC服務端的實現部分。首先是,Rpc服務端管道初始化(MessageRecvChannelInitializer),跟上一版本對比,主要引入了序列化消息對象(RpcSerializeProtocol),具體實現代碼如下:

/**
 * @filename:MessageRecvChannelInitializer.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Rpc服務端管道初始化
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
	   

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Java中ScheduleThreadPoolExecutor主要用於執行延遲任務或者按照一定的頻率執行任務。其中scheduleAtFixedRate函數是按照一定頻率執行任務,scheduleWithFixedDelay可以根據延遲一定時間再執行任務。本文將參考ScheduleThreadPoo ...
  • 首先可以去http://www.oracle.com/technetwork/java/javase/downloads下載jdk安裝包。目前jdk已經更新到了8u91/8u92的版本,不過此處我用的jdk版本仍然是jdk8u51,下麵介紹其安裝步驟與環境變數的配置 雙擊應用程式jdk-8u51-w ...
  • 在最近的一次大數據技術討論會上,有一家公司的技術高管談到松耦合和緊耦合的性能表現的話題。正好Laxcus大數據管理系統的設計,從0.x、1.x到2.x版本,也經歷了從緊耦合到松耦合的發展過程。做為親歷者,對這兩種架構的設計和運行效果,我們有清楚的瞭解和認識。下麵就說一說這件事。寫此博文,也希望給做系... ...
  • ...
  • Reactor 模式簡單實現 在網上有部分文章在描述Netty時,會提到Reactor。這個Reactor到底是什麼呢?為了搞清楚Reactor到底是什麼鬼,我寫了一個簡單的Demo,來幫助大家理解他。 網上是這麼描述Reactor的: The Reactor design pattern hand ...
  • 一、概述 觀察者模式定義了對象之間的一對多依賴,這樣一來,當一個對象改變狀態時,它的所有依賴者都會收到通知並自動更新。觀察者模式有時成為發佈/訂閱模式,就是讓多個對象在一個對象的狀態改變時被通知到。 二、解決問題 當一個系統有多個類協同工作,如果在一個類中需要知道另外一個類的實現細節才能讓系統運轉, ...
  • 優秀程式設計的Kiss原則(keep it simple,stupid) 優秀程式設計的Kiss原則(keep it simple,stupid) 良好的編程原則與良好的設計工程原則密切相關。本文總結的這些設計原則,幫助開發者更有效率的編寫代碼,並幫助成為一名優秀的程式員。 1.避免重覆原則(DRY ...
  • 面向對象 問題 面向對象的好處 學一門技術是否需要趣味性、通俗性的指導 面向對象 什麼是對象 世間任何事物都可以定義為對象。 什麼是類 類就是把對象的公共屬性和方法抽離出來形成集合的抽象,也就是說,類是一種抽象,用來描述對象特征的抽象。 什麼是實例 實例就是對象,就好比我們,就是人這種抽象的一個實例 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...