在本人寫的前一篇文章中,談及有關如何利用Netty開發實現,高性能RPC伺服器的一些設計思路、設計原理,以及具體的實現方案(具體參見:談談如何使用Netty開發實現高性能的RPC伺服器)。在文章的最後提及到,其實基於該方案設計的RPC伺服器的處理性能,還有優化的餘地。於是利用周末的時間,在原來Net ...
在本人寫的前一篇文章中,談及有關如何利用Netty開發實現,高性能RPC伺服器的一些設計思路、設計原理,以及具體的實現方案(具體參見:談談如何使用Netty開發實現高性能的RPC伺服器)。在文章的最後提及到,其實基於該方案設計的RPC伺服器的處理性能,還有優化的餘地。於是利用周末的時間,在原來NettyRPC框架的基礎上,加以優化重構,本次主要優化改造點如下:
1、NettyRPC中對RPC消息進行編碼、解碼採用的是Netty自帶的ObjectEncoder、ObjectDecoder(對象編碼、解碼器),該編碼、解碼器基於的是Java的原生序列化機制,從已有的文章以及測試數據來看,Java的原生序列化性能效率不高,而且產生的序列化二進位碼流太大,故本次在優化中,引入RPC消息序列化協議的概念。所謂消息序列化協議,就是針對RPC消息的序列化、反序列化過程進行特殊的定製,引入第三方編解碼框架。本次引入的第三方編解碼框架有Kryo、Hessian。這裡,不得不再次提及一下,對象序列化、反序列化的概念,在RPC的遠程服務調用過程中,需要把消息對象通過網路傳輸,這個就要用到序列化將對象轉變成位元組流,到達另外一端之後,再反序列化回來變成消息對象。
2、引入Google Guava併發編程框架對NettyRPC的NIO線程池、業務線程池進行重新梳理封裝。
3、利用第三方編解碼框架(Kryo、Hessian)的時候,考慮到高併發的場景下,頻繁的創建、銷毀序列化對象,會非常消耗JVM的記憶體資源,影響整個RPC伺服器的處理性能,因此引入對象池化(Object Pooling)技術。眾所周知,創建新對象並初始化,可能會消耗很多的時間。當需要產生大量對象的時候,可能會對性能造成一定的影響。為瞭解決這個問題,除了提升硬體條件之外,對象池化技術就是這方面的銀彈,而Apache Commons Pool框架就是對象池化技術的一個很好的實現(開源項目路徑:http://commons.apache.org/proper/commons-pool/download_pool.cgi)。本文中的Hessian池化工作,主要是基於Apache Commons Pool框架,進行封裝處理。
本文將著重,從上面的三個方面,對重構優化之後的NettyRPC伺服器的實現思路、實現方式進行重點講解。首先請大家簡單看下,本次優化之後的NettyRPC伺服器支持的序列化協議,如下圖所示:
可以很清楚的看到,優化之後的NettyRPC可以支持Kryo、Hessian、Java本地序列化三種消息序列化方式。其中Java本地序列化方式,相信大家應該很熟悉了,再次不在重覆講述。現在我們重點講述一下,另外兩種序列化方式:
1、Kryo序列化。它是針對Java,而定製實現的高效對象序列化框架,相比Java本地原生序列化方式,Kryo在處理性能上、碼流大小上等等方面有很大的優化改進。目前已知的很多著名開源項目,都引入採用了該序列化方式。比如alibaba開源的dubbo RPC等等。本文中採用的Kryo的預設版本是基於:kryo-3.0.3。它的下載鏈接是:https://github.com/EsotericSoftware/kryo/releases/tag/kryo-parent-3.0.3。為什麼採用這個版本?主要原因我上面也說明瞭,出於應對高併發場景下,頻繁地創建、銷毀序列化對象,會非常消耗JVM的記憶體資源、以及時間。Kryo的這個發行版本中,集成引入了序列化對象池功能模塊(KryoFactory、KryoPool),這樣我們就不必再利用Apache Commons Pool對其進行二次封裝。
2、Hessian序列化。Hessian本身是一種序列化協議,它比Java原生的序列化、反序列化速度更快、序列化出來的數據也更小。它是採用二進位格式進行數據傳輸,而且,目前支持多種語言格式。本文中採用的是:hessian-4.0.37 版本,它的下載鏈接是:http://hessian.caucho.com/#Java。
接下來,先來看下優化之後的NettyRPC的消息協議編解碼包(newlandframework.netty.rpc.serialize.support、newlandframework.netty.rpc.serialize.support.kryo、newlandframework.netty.rpc.serialize.support.hessian)的結構,如下圖所示:
其中RPC請求消息結構代碼如下:
/** * @filename:MessageRequest.java * * Newland Co. Ltd. All rights reserved. * * @Description:rpc服務請求結構 * @author tangjie * @version 1.0 * */ package newlandframework.netty.rpc.model; import java.io.Serializable; import org.apache.commons.lang.builder.ReflectionToStringBuilder; public class MessageRequest implements Serializable { private String messageId; private String className; private String methodName; private Class<?>[] typeParameters; private Object[] parametersVal; public String getMessageId() { return messageId; } public void setMessageId(String messageId) { this.messageId = messageId; } public String getClassName() { return className; } public void setClassName(String className) { this.className = className; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public Class<?>[] getTypeParameters() { return typeParameters; } public void setTypeParameters(Class<?>[] typeParameters) { this.typeParameters = typeParameters; } public Object[] getParameters() { return parametersVal; } public void setParameters(Object[] parametersVal) { this.parametersVal = parametersVal; } public String toString() { return ReflectionToStringBuilder.toStringExclude(this, new String[]{"typeParameters", "parametersVal"}); } }
RPC應答消息結構,如下所示:
/** * @filename:MessageResponse.java * * Newland Co. Ltd. All rights reserved. * * @Description:rpc服務應答結構 * @author tangjie * @version 1.0 * */ package newlandframework.netty.rpc.model; import java.io.Serializable; import org.apache.commons.lang.builder.ReflectionToStringBuilder; public class MessageResponse implements Serializable { private String messageId; private String error; private Object resultDesc; public String getMessageId() { return messageId; } public void setMessageId(String messageId) { this.messageId = messageId; } public String getError() { return error; } public void setError(String error) { this.error = error; } public Object getResult() { return resultDesc; } public void setResult(Object resultDesc) { this.resultDesc = resultDesc; } public String toString() { return ReflectionToStringBuilder.toString(this); } }
現在,我們就來對上述的RPC請求消息、應答消息進行編解碼框架的設計。由於NettyRPC中的協議類型,目前已經支持Kryo序列化、Hessian序列化、Java原生本地序列化方式。考慮到可擴展性,故要抽象出RPC消息序列化,協議類型對象(RpcSerializeProtocol),它的代碼實現如下所示:
/** * @filename:RpcSerializeProtocol.java * * Newland Co. Ltd. All rights reserved. * * @Description:RPC消息序序列化協議類型 * @author tangjie * @version 1.0 * */ package newlandframework.netty.rpc.serialize.support; import org.apache.commons.lang.builder.ReflectionToStringBuilder; import org.apache.commons.lang.builder.ToStringStyle; public enum RpcSerializeProtocol { //目前由於沒有引入跨語言RPC通信機制,暫時採用支持同構語言Java序列化/反序列化機制的第三方插件 //NettyRPC目前已知的序列化插件有:Java原生序列化、Kryo、Hessian JDKSERIALIZE("jdknative"), KRYOSERIALIZE("kryo"), HESSIANSERIALIZE("hessian"); private String serializeProtocol; private RpcSerializeProtocol(String serializeProtocol) { this.serializeProtocol = serializeProtocol; } public String toString() { ReflectionToStringBuilder.setDefaultStyle(ToStringStyle.SHORT_PREFIX_STYLE); return ReflectionToStringBuilder.toString(this); } public String getProtocol() { return serializeProtocol; } }
針對不同編解碼序列化的框架(這裡主要是指Kryo、Hessian),再抽象、萃取出一個RPC消息序列化/反序列化介面(RpcSerialize)、RPC消息編解碼介面(MessageCodecUtil)。
/** * @filename:RpcSerialize.java * * Newland Co. Ltd. All rights reserved. * * @Description:RPC消息序列化/反序列化介面定義 * @author tangjie * @version 1.0 * */ package newlandframework.netty.rpc.serialize.support; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; public interface RpcSerialize { void serialize(OutputStream output, Object object) throws IOException; Object deserialize(InputStream input) throws IOException; }
/** * @filename:MessageCodecUtil.java * * Newland Co. Ltd. All rights reserved. * * @Description:RPC消息編解碼介面 * @author tangjie * @version 1.0 * */ package newlandframework.netty.rpc.serialize.support; import io.netty.buffer.ByteBuf; import java.io.IOException; public interface MessageCodecUtil { //RPC消息報文頭長度4個位元組 final public static int MESSAGE_LENGTH = 4; public void encode(final ByteBuf out, final Object message) throws IOException; public Object decode(byte[] body) throws IOException; }
最後我們的NettyRPC框架要能自由地支配、定製Netty的RPC服務端、客戶端,採用何種序列化來進行RPC消息對象的網路傳輸。因此,要再抽象一個RPC消息序列化協議選擇器介面(RpcSerializeFrame),對應的實現如下:
/** * @filename:RpcSerializeFrame.java * * Newland Co. Ltd. All rights reserved. * * @Description:RPC消息序序列化協議選擇器介面 * @author tangjie * @version 1.0 * */ package newlandframework.netty.rpc.serialize.support; import io.netty.channel.ChannelPipeline; public interface RpcSerializeFrame { public void select(RpcSerializeProtocol protocol, ChannelPipeline pipeline); }
現在有了上面定義的一系列的介面,現在就可以定製實現,基於Kryo、Hessian方式的RPC消息序列化、反序列化模塊了。先來看下整體的類圖結構:
首先是RPC消息的編碼器MessageEncoder,它繼承自Netty的MessageToByteEncoder編碼器。主要是把RPC消息對象編碼成二進位流的格式,對應實現如下:
/** * @filename:MessageEncoder.java * * Newland Co. Ltd. All rights reserved. * * @Description:RPC消息編碼介面 * @author tangjie * @version 1.0 * */ package newlandframework.netty.rpc.serialize.support; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; public class MessageEncoder extends MessageToByteEncoder<Object> { private MessageCodecUtil util = null; public MessageEncoder(final MessageCodecUtil util) { this.util = util; } protected void encode(final ChannelHandlerContext ctx, final Object msg, final ByteBuf out) throws Exception { util.encode(out, msg); } }
接下來是RPC消息的解碼器MessageDecoder,它繼承自Netty的ByteToMessageDecoder。主要針對二進位流反序列化成消息對象。當然了,在之前的一篇文章中我曾經提到,NettyRPC是基於TCP協議的,TCP在傳輸數據的過程中會出現所謂的“粘包”現象,所以我們的MessageDecoder要對RPC消息體的長度進行校驗,如果不滿足RPC消息報文頭中指定的消息體長度,我們直接重置一下ByteBuf讀索引的位置,具體可以參考如下的代碼方式,進行RPC消息協議的解析:
/** * @filename:MessageDecoder.java * * Newland Co. Ltd. All rights reserved. * * @Description:RPC消息解碼介面 * @author tangjie * @version 1.0 * */ package newlandframework.netty.rpc.serialize.support; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.io.IOException; import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; public class MessageDecoder extends ByteToMessageDecoder { final public static int MESSAGE_LENGTH = MessageCodecUtil.MESSAGE_LENGTH; private MessageCodecUtil util = null; public MessageDecoder(final MessageCodecUtil util) { this.util = util; } protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { //出現粘包導致消息頭長度不對,直接返回 if (in.readableBytes() < MessageDecoder.MESSAGE_LENGTH) { return; } in.markReaderIndex(); //讀取消息的內容長度 int messageLength = in.readInt(); if (messageLength < 0) { ctx.close(); } //讀到的消息長度和報文頭的已知長度不匹配。那就重置一下ByteBuf讀索引的位置 if (in.readableBytes() < messageLength) { in.resetReaderIndex(); return; } else { byte[] messageBody = new byte[messageLength]; in.readBytes(messageBody); try { Object obj = util.decode(messageBody); out.add(obj); } catch (IOException ex) { Logger.getLogger(MessageDecoder.class.getName()).log(Level.SEVERE, null, ex); } } } }
現在,我們進一步實現,利用Kryo序列化方式,對RPC消息進行編解碼的模塊。首先是要實現NettyRPC消息序列化介面(RpcSerialize)的方法。
/** * @filename:KryoSerialize.java * * Newland Co. Ltd. All rights reserved. * * @Description:Kryo序列化/反序列化實現 * @author tangjie * @version 1.0 * */ package newlandframework.netty.rpc.serialize.support.kryo; import newlandframework.netty.rpc.serialize.support.RpcSerialize; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import com.esotericsoftware.kryo.pool.KryoPool; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; public class KryoSerialize implements RpcSerialize { private KryoPool pool = null; public KryoSerialize(final KryoPool pool) { this.pool = pool; } public void serialize(OutputStream output, Object object) throws IOException { Kryo kryo = pool.borrow(); Output out = new Output(output); kryo.writeClassAndObject(out, object); out.close(); pool.release(kryo); } public Object deserialize(InputStream input) throws IOException { Kryo kryo = pool.borrow(); Input in = new Input(input); Object result = kryo.readClassAndObject(in); in.close(); pool.release(kryo); return result; } }
接著利用Kryo庫裡面的對象池,對RPC消息對象進行編解碼。首先是Kryo對象池工廠(KryoPoolFactory),這個也是我為什麼選擇kryo-3.0.3版本的原因了。代碼如下:
/** * @filename:KryoPoolFactory.java * * Newland Co. Ltd. All rights reserved. * * @Description:Kryo對象池工廠 * @author tangjie * @version 1.0 * */ package newlandframework.netty.rpc.serialize.support.kryo; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.pool.KryoFactory; import com.esotericsoftware.kryo.pool.KryoPool; import newlandframework.netty.rpc.model.MessageRequest; import newlandframework.netty.rpc.model.MessageResponse; import org.objenesis.strategy.StdInstantiatorStrategy; public class KryoPoolFactory { private static KryoPoolFactory poolFactory = null; private KryoFactory factory = new KryoFactory() { public Kryo create() { Kryo kryo = new Kryo(); kryo.setReferences(false); //把已知的結構註冊到Kryo註冊器裡面,提高序列化/反序列化效率 kryo.register(MessageRequest.class); kryo.register(MessageResponse.class); kryo.setInstantiatorStrategy(new StdInstantiatorStrategy()); return kryo; } }; private KryoPool pool = new KryoPool.Builder(factory).build(); private KryoPoolFactory() { } public static KryoPool getKryoPoolInstance() { if (poolFactory == null) { synchronized (KryoPoolFactory.class) { if (poolFactory == null) { poolFactory = new KryoPoolFactory(); } } } return poolFactory.getPool(); } public KryoPool getPool() { return pool; } }
Kryo對RPC消息進行編碼、解碼的工具類KryoCodecUtil,實現了RPC消息編解碼介面(MessageCodecUtil),具體實現方式如下:
/** * @filename:KryoCodecUtil.java * * Newland Co. Ltd. All rights reserved. * * @Description:Kryo編解碼工具類 * @author tangjie * @version 1.0 * */ package newlandframework.netty.rpc.serialize.support.kryo; import com.esotericsoftware.kryo.pool.KryoPool; import io.netty.buffer.ByteBuf; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import newlandframework.netty.rpc.serialize.support.MessageCodecUtil; import com.google.common.io.Closer; public class KryoCodecUtil implements MessageCodecUtil { private KryoPool pool; private static Closer closer = Closer.create(); public KryoCodecUtil(KryoPool pool) { this.pool = pool; } public void encode(final ByteBuf out, final Object message) throws IOException { try { ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); closer.register(byteArrayOutputStream); KryoSerialize kryoSerialization = new KryoSerialize(pool); kryoSerialization.serialize(byteArrayOutputStream, message); byte[] body = byteArrayOutputStream.toByteArray(); int dataLength = body.length; out.writeInt(dataLength); out.writeBytes(body); } finally { closer.close(); } } public Object decode(byte[] body) throws IOException { try { ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body); closer.register(byteArrayInputStream); KryoSerialize kryoSerialization = new KryoSerialize(pool); Object obj = kryoSerialization.deserialize(byteArrayInputStream); return obj; } finally { closer.close(); } } }
最後是,Kryo自己的編碼器、解碼器,其實只要調用Kryo編解碼工具類(KryoCodecUtil)裡面的encode、decode方法就可以了。現在貼出具體的代碼:
/** * @filename:KryoDecoder.java * * Newland Co. Ltd. All rights reserved. * * @Description:Kryo解碼器 * @author tangjie * @version 1.0 * */ package newlandframework.netty.rpc.serialize.support.kryo; import newlandframework.netty.rpc.serialize.support.MessageCodecUtil; import newlandframework.netty.rpc.serialize.support.MessageDecoder; public class KryoDecoder extends MessageDecoder { public KryoDecoder(MessageCodecUtil util) { super(util); } }
/** * @filename:KryoEncoder.java * * Newland Co. Ltd. All rights reserved. * * @Description:Kryo編碼器 * @author tangjie * @version 1.0 * */ package newlandframework.netty.rpc.serialize.support.kryo; import newlandframework.netty.rpc.serialize.support.MessageCodecUtil; import newlandframework.netty.rpc.serialize.support.MessageEncoder; public class KryoEncoder extends MessageEncoder { public KryoEncoder(MessageCodecUtil util) { super(util); } }
最後,我們再來實現一下,利用Hessian實現RPC消息的編碼、解碼器代碼模塊。首先還是Hessian序列化/反序列化實現(HessianSerialize),它同樣實現了RPC消息序列化/反序列化介面(RpcSerialize),對應的代碼如下:
/** * @filename:HessianSerialize.java * * Newland Co. Ltd. All rights reserved. * * @Description:Hessian序列化/反序列化實現 * @author tangjie * @version 1.0 * */ package newlandframework.netty.rpc.serialize.support.hessian; import com.caucho.hessian.io.Hessian2Input; import com.caucho.hessian.io.Hessian2Output; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import newlandframework.netty.rpc.serialize.support.RpcSerialize; public class HessianSerialize implements RpcSerialize { public void serialize(OutputStream output, Object object) { Hessian2Output ho = new Hessian2Output(output); try { ho.startMessage(); ho.writeObject(object); ho.completeMessage(); ho.close(); output.close(); } catch (IOException e) { e.printStackTrace(); } } public Object deserialize(InputStream input) { Object result = null; try { Hessian2Input hi = new Hessian2Input(input); hi.startMessage(); result = hi.readObject(); hi.completeMessage(); hi.close(); } catch (IOException e) { e.printStackTrace(); } return result; } }
現在利用對象池(Object Pooling)技術,對Hessian序列化/反序列化類(HessianSerialize)進行池化處理,對應的代碼如下:
/** * @filename:HessianSerializeFactory.java * * Newland Co. Ltd. All rights reserved. * * @Description:Hessian序列化/反序列化對象工廠池 * @author tangjie * @version 1.0 * */ package newlandframework.netty.rpc.serialize.support.hessian; import org.apache.commons.pool2.BasePooledObjectFactory; import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.impl.DefaultPooledObject; public class HessianSerializeFactory extends BasePooledObjectFactory<HessianSerialize> { public HessianSerialize create() throws Exception { return createHessian(); } public PooledObject<HessianSerialize> wrap(HessianSerialize hessian) { return new DefaultPooledObject<HessianSerialize>(hessian); } private HessianSerialize createHessian() { return new HessianSerialize(); } }
/** * @filename:HessianSerializePool.java * * Newland Co. Ltd. All rights reserved. * * @Description:Hessian序列化/反序列化池 * @author tangjie * @version 1.0 * */ package newlandframework.netty.rpc.serialize.support.hessian; import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; public class HessianSerializePool { //Netty採用Hessian序列化/反序列化的時候,為了避免重覆產生對象,提高JVM記憶體利用率,故引入對象池技術,經過測試 //遇到高併發序列化/反序列化的場景的時候,序列化效率明顯提升不少。 private GenericObjectPool<HessianSerialize> hessianPool; private static HessianSerializePool poolFactory = null; private HessianSerializePool() { hessianPool = new GenericObjectPool<HessianSerialize>(new HessianSerializeFactory()); } public static HessianSerializePool getHessianPoolInstance() { if (poolFactory == null) { synchronized (HessianSerializePool.class) { if (poolFactory == null) { poolFactory = new HessianSerializePool(); } } } return poolFactory; } //預留介面,後續可以通過Spring Property Placeholder依賴註入 public HessianSerializePool(final int maxTotal, final int minIdle, final long maxWaitMillis, final long minEvictableIdleTimeMillis) { hessianPool = new GenericObjectPool<HessianSerialize>(new HessianSerializeFactory()); GenericObjectPoolConfig config = new GenericObjectPoolConfig(); //最大池對象總數 config.setMaxTotal(maxTotal); //最小空閑數 config.setMinIdle(minIdle); //最大等待時間, 預設的值為-1,表示無限等待 config.setMaxWaitMillis(maxWaitMillis); //退出連接的最小空閑時間 預設1800000毫秒 config.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis); hessianPool.setConfig(config); } public HessianSerialize borrow() { try { return getHessianPool().borrowObject(); } catch (final Exception ex) { ex.printStackTrace(); return null; } } public void restore(final HessianSerialize object) { getHessianPool().returnObject(object); } public GenericObjectPool<HessianSerialize> getHessianPool() { return hessianPool; } }
Hessian序列化對象經過池化處理之後,我們通過Hessian編解碼工具類,來“借用”Hessian序列化對象(HessianSerialize),當然了,你借出來之後,一定要還回去嘛。Hessian編解碼工具類的實現方式如下:
/** * @filename:HessianCodecUtil.java * * Newland Co. Ltd. All rights reserved. * * @Description:Hessian編解碼工具類 * @author tangjie * @version 1.0 * */ package newlandframework.netty.rpc.serialize.support.hessian; import com.google.common.io.Closer; import io.netty.buffer.ByteBuf; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import newlandframework.netty.rpc.serialize.support.MessageCodecUtil; public class HessianCodecUtil implements MessageCodecUtil { HessianSerializePool pool = HessianSerializePool.getHessianPoolInstance(); private static Closer closer = Closer.create(); public HessianCodecUtil() { } public void encode(final ByteBuf out, final Object message) throws IOException { try { ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); closer.register(byteArrayOutputStream); HessianSerialize hessianSerialization = pool.borrow(); hessianSerialization.serialize(byteArrayOutputStream, message); byte[] body = byteArrayOutputStream.toByteArray(); int dataLength = body.length; out.writeInt(dataLength); out.writeBytes(body); pool.restore(hessianSerialization); } finally { closer.close(); } } public Object decode(byte[] body) throws IOException { try { ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body); closer.register(byteArrayInputStream); HessianSerialize hessianSerialization = pool.borrow(); Object object = hessianSerialization.deserialize(byteArrayInputStream); pool.restore(hessianSerialization); return object; } finally { closer.close(); } } }
最後Hessian對RPC消息的編碼器、解碼器參考實現代碼如下所示:
/** * @filename:HessianDecoder.java * * Newland Co. Ltd. All rights reserved. * * @Description:Hessian解碼器 * @author tangjie * @version 1.0 * */ package newlandframework.netty.rpc.serialize.support.hessian; import newlandframework.netty.rpc.serialize.support.MessageCodecUtil; import newlandframework.netty.rpc.serialize.support.MessageDecoder; public class HessianDecoder extends MessageDecoder { public HessianDecoder(MessageCodecUtil util) { super(util); } }
/** * @filename:HessianEncoder.java * * Newland Co. Ltd. All rights reserved. * * @Description:Hessian編碼器 * @author tangjie * @version 1.0 * */ package newlandframework.netty.rpc.serialize.support.hessian; import newlandframework.netty.rpc.serialize.support.MessageCodecUtil; import newlandframework.netty.rpc.serialize.support.MessageEncoder; public class HessianEncoder extends MessageEncoder { public HessianEncoder(MessageCodecUtil util) { super(util); } }
到目前為止,NettyRPC所針對的Kryo、Hessian序列化協議模塊,已經設計實現完畢,現在我們就要把這個協議,嵌入NettyRPC的核心模塊包(newlandframework.netty.rpc.core),下麵只給出優化調整之後的代碼,其它代碼模塊的內容,可以參考我上一篇的文章:談談如何使用Netty開發實現高性能的RPC伺服器。好了,我們先來看下,NettyRPC核心模塊包(newlandframework.netty.rpc.core)的層次結構:
先來看下,NettyRPC服務端的實現部分。首先是,Rpc服務端管道初始化(MessageRecvChannelInitializer),跟上一版本對比,主要引入了序列化消息對象(RpcSerializeProtocol),具體實現代碼如下:
/** * @filename:MessageRecvChannelInitializer.java * * Newland Co. Ltd. All rights reserved. * * @Description:Rpc服務端管道初始化 * @author tangjie * @version 1.0 * */ package newlandframework.netty.rpc.core; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel;