Netty是Trustin Lee在2004年開發的一款高性能的網路應用程式框架。相比於JDK自帶的NIO,Netty做了相當多的增強,且隔離了jdk nio的實現細節,API也比較友好,還支持流量整形等高級特性。在我們常見的一些開源項目中已經普遍的應用到了Netty,比如Dubbo、Elastic ...
Netty是Trustin Lee在2004年開發的一款高性能的網路應用程式框架。相比於JDK自帶的NIO,Netty做了相當多的增強,且隔離了jdk nio的實現細節,API也比較友好,還支持流量整形等高級特性。在我們常見的一些開源項目中已經普遍的應用到了Netty,比如Dubbo、Elasticsearch、Zookeeper等。
Netty的具體開發
提示:因代碼相對較多,這裡只展示其主要部分,至於項目中用到的編解碼器、工具類,請直接拉到最後下載源碼!也歡迎順手給個Star~
需要的依賴
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>4.1.1</version>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-jmx</artifactId>
<version>4.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.29.Final</version>
</dependency>
Client端代碼
package com.example.nettydemo.client;
import com.example.nettydemo.client.codec.*;
import com.example.nettydemo.client.codec.dispatcher.OperationResultFuture;
import com.example.nettydemo.client.codec.dispatcher.RequestPendingCenter;
import com.example.nettydemo.client.codec.dispatcher.ResponseDispatcherHandler;
import com.example.nettydemo.common.RequestMessage;
import com.example.nettydemo.common.string.StringOperation;
import com.example.nettydemo.util.IdUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import javax.net.ssl.SSLException;
import java.util.concurrent.ExecutionException;
public class Client {
public static void main(String[] args) throws InterruptedException, ExecutionException, SSLException {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
//客戶端連接伺服器最大允許時間,預設為30s
bootstrap.option(NioChannelOption.CONNECT_TIMEOUT_MILLIS, 30 * 1000); //10s
NioEventLoopGroup group = new NioEventLoopGroup();
try {
bootstrap.group(group);
RequestPendingCenter requestPendingCenter = new RequestPendingCenter();
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.INFO);
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new FrameDecoder());
pipeline.addLast(new FrameEncoder());
pipeline.addLast(new ProtocolEncoder());
pipeline.addLast(new ProtocolDecoder());
pipeline.addLast(new ResponseDispatcherHandler(requestPendingCenter));
pipeline.addLast(new OperationToRequestMessageEncoder());
// pipeline.addLast(loggingHandler);
}
});
//連接服務
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888);
//因為future是非同步執行,所以需要先連接上後,再進行下一步操作
channelFuture.sync();
long streamId = IdUtil.nextId();
/**
* 發送數據測試,按照定義的規則組裝數據
*/
// OrderOperation orderOperation = new OrderOperation(1001, "你好啊,hi");
RequestMessage requestMessage = new RequestMessage(streamId, new StringOperation(1234, "你好啊,hi"));
//將future放入center
OperationResultFuture operationResultFuture = new OperationResultFuture();
requestPendingCenter.add(streamId, operationResultFuture);
//發送消息
for (int i = 0; i < 10; i++) {
channelFuture.channel().writeAndFlush(requestMessage);
}
//阻塞等待結果,結果來了之後會調用ResponseDispatcherHandler去set結果
// OperationResult operationResult = operationResultFuture.get();
// //將結果列印
// System.out.println("返回:"+operationResult);
channelFuture.channel().closeFuture().get();
} finally {
group.shutdownGracefully();
}
}
}
Server端代碼
package com.example.nettydemo.server;
import com.example.nettydemo.server.codec.FrameDecoder;
import com.example.nettydemo.server.codec.FrameEncoder;
import com.example.nettydemo.server.codec.ProtocolDecoder;
import com.example.nettydemo.server.codec.ProtocolEncoder;
import com.example.nettydemo.server.handler.MetricsHandler;
import com.example.nettydemo.server.handler.ServerIdleCheckHandler;
import com.example.nettydemo.server.handler.ServerProcessHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.flush.FlushConsolidationHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.UnorderedThreadPoolEventExecutor;
import lombok.extern.slf4j.Slf4j;
import javax.net.ssl.SSLException;
import java.security.cert.CertificateException;
import java.util.concurrent.ExecutionException;
/**
* netty server 入口
*/
@Slf4j
public class Server {
public static void main(String... args) throws InterruptedException, ExecutionException, CertificateException, SSLException {
ServerBootstrap serverBootstrap = new ServerBootstrap();
//設置channel模式,因為是server所以使用NioServerSocketChannel
serverBootstrap.channel(NioServerSocketChannel.class);
//最大的等待連接數量
serverBootstrap.option(NioChannelOption.SO_BACKLOG, 1024);
//設置是否啟用 Nagle 演算法:用將小的碎片數據連接成更大的報文 來提高發送效率。
//如果需要發送一些較小的報文,則需要禁用該演算法
serverBootstrap.childOption(NioChannelOption.TCP_NODELAY, true);
//設置netty自帶的log,並設置級別
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
//thread
//用戶指定線程名
NioEventLoopGroup bossGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("boss"));
NioEventLoopGroup workGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));
UnorderedThreadPoolEventExecutor businessGroup = new UnorderedThreadPoolEventExecutor(10, new DefaultThreadFactory("business"));
//只能使用一個線程,因GlobalTrafficShapingHandler比較輕量級
NioEventLoopGroup eventLoopGroupForTrafficShaping = new NioEventLoopGroup(0, new DefaultThreadFactory("TS"));
try {
//設置react方式
serverBootstrap.group(bossGroup, workGroup);
//metrics
MetricsHandler metricsHandler = new MetricsHandler();
//trafficShaping流量整形
//long writeLimit 寫入時控制, long readLimit 讀取時控制 具體設置看業務修改
GlobalTrafficShapingHandler globalTrafficShapingHandler = new GlobalTrafficShapingHandler(eventLoopGroupForTrafficShaping, 10 * 1024 * 1024, 10 * 1024 * 1024);
//log
LoggingHandler debugLogHandler = new LoggingHandler(LogLevel.DEBUG);
LoggingHandler infoLogHandler = new LoggingHandler(LogLevel.INFO);
//設置childHandler,按執行順序放
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("debugLog", debugLogHandler);
pipeline.addLast("tsHandler", globalTrafficShapingHandler);
pipeline.addLast("metricHandler", metricsHandler);
pipeline.addLast("idleHandler", new ServerIdleCheckHandler());
pipeline.addLast("frameDecoder", new FrameDecoder());
pipeline.addLast("frameEncoder", new FrameEncoder());
pipeline.addLast("protocolDecoder", new ProtocolDecoder());
pipeline.addLast("protocolEncoder", new ProtocolEncoder());
pipeline.addLast("infoLog", infoLogHandler);
//對flush增強,減少flush次數犧牲延遲增強吞吐量
pipeline.addLast("flushEnhance", new FlushConsolidationHandler(10, true));
//為業務處理指定單獨的線程池
pipeline.addLast(businessGroup, new ServerProcessHandler());//businessGroup,
}
});
//綁定埠並阻塞啟動
ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
businessGroup.shutdownGracefully();
eventLoopGroupForTrafficShaping.shutdownGracefully();
}
}
}
最後
以上介紹了Netty的基本用法,在代碼中也做了一部分的關鍵註釋,但可能還會有許多不足,也不可能滿足所有人的要求,大家可根據自己的實際需求去改造此項目。附上源碼地址netty源碼
持續學習,記錄點滴。更多文章請訪問 文章首發