Netty 線程模型 Netty的線程模型主要是基於React,因為考慮到應用場景的不同所以演化出多種版本。 單線程模式 即接收服務請求以及執行IO操作都由一個線程來完成,由於採用的是IO多路復用這類無阻塞IO操作,所以在請求量不大的情況下單線程模式也是可以解決一部分場景問題的。 單接收多工作線程模 ...
Netty 線程模型
Netty的線程模型主要是基於React,因為考慮到應用場景的不同所以演化出多種版本。
單線程模式
即接收服務請求以及執行IO操作都由一個線程來完成,由於採用的是IO多路復用這類無阻塞IO操作,所以在請求量不大的情況下單線程模式也是可以解決一部分場景問題的。
單接收多工作線程模式
當請求量增大後,原有的一個線程處理所有IO操作變得越來越無法支撐相應的性能指標,所以提到了一個工作線程池的概念,此時接收服務請求還是一個線程,接收請求的線程收到請求後會委托給後面的工作線程池,從線程池中取得一個線程去執行用戶請求。
多接收多工作線程模式
當請求量進一步增大後,單一的接收服務請求的線程無法處理所有客戶端的連接,所以將接收服務請求的也擴展成線程池,由多個線程同時負責接收客戶端的連接。
RPC 業務線程
上面提到的都是Netty自身的線程模型,伴隨著請求量的增長而不斷發展出來的優化策略。而RPC請求對應用系統來講最主要還是業務邏輯的處理,而這類業務有可能是計算密集型的也有可以是IO密集型,像大多數應用都伴隨著資料庫操作,redis或者是連接其它的網路服務等。如果業務請求中有這類耗時的IO操作,推薦將處理業務請求的任務分配給獨立的線程池,否則可能會阻塞netty自身的線程。
接收請求線程與工作線程分工
- 接收請求線程主要負責創建鏈路,然後將請求委派給工作線程
- 工作線程負責編碼解碼讀取IO等操作
方案實現
目前我實現的RPC是採用多接收多工作線程模式,在服務端是這樣綁定埠的:
public void bind(ServiceConfig serviceConfig) { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(this.rpcServerInitializer) .childOption(ChannelOption.SO_KEEPALIVE,true) ; try { ChannelFuture channelFuture = bootstrap.bind(serviceConfig.getHost(),serviceConfig.getPort()).sync(); //... channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { throw new RpcException(e); } } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
boosGroup就是一組用來接收服務請求的
workerGroup就是一組具體負責IO操作的
增加業務線程只需要將handle的操作進一步委派給線程池即可,這裡為了擴展所以需要定義介面:
定義線程池介面
public interface RpcThreadPool { Executor getExecutor(int threadSize,int queues); }
實現固定大小線程池
參考了dubbo線程池
@Qualifier("fixedRpcThreadPool") @Component public class FixedRpcThreadPool implements RpcThreadPool { private Executor executor; @Override public Executor getExecutor(int threadSize,int queues) { if(null==executor) { synchronized (this) { if(null==executor) { executor= new ThreadPoolExecutor(threadSize, threadSize, 0L, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() : (queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues)), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { //... } }); } } } return executor; } }
小插曲:
記的有一次一朋友突然問java 線程池中的那個coreSize是什麼意思?我頓時短路了,因平時也不怎麼寫多線程,想到平時用的比較多的資料庫線程池,裡面的參數倒是印象比較深,但就是想不起來有個coreSize。後來才又仔細看了下線程池的一些參數。現在借這個機會又可以多多再看看,以免再次短路。
線程池工廠
當有多個線程池實現時,通過線程池名稱來動態選擇線程池。
@Component public class RpcThreadPoolFactory { @Autowired private Map<String,RpcThreadPool> rpcThreadPoolMap; public RpcThreadPool getThreadPool(String threadPoolName){ return this.rpcThreadPoolMap.get(threadPoolName); } }
修改ChannelHandle的channelRead0方法
將方法體包裝成Task交給線程池去執行。
@Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) { this.executor.execute(new Runnable() { @Override public void run() { RpcInvoker rpcInvoker=RpcServerInvoker.this.buildInvokerChain(RpcServerInvoker.this); RpcResponse response=(RpcResponse) rpcInvoker.invoke(RpcServerInvoker.this.buildRpcInvocation(rpcRequest)); channelHandlerContext.writeAndFlush(response); } }); }
問題
目前缺乏壓測,所以暫時沒有明確的數據對比。
源碼地址
https://github.com/jiangmin168168/jim-framework