在前兩個版本中,每次發起請求一次就新建一個netty的channel連接,如果在高併發情況下就會造成資源的浪費,這時實現 非同步請求 就十分重要,當有多個請求線程時,需要設計一個 線程池 來進行管理。除此之外,當前方法過於依賴註冊中心,在高併發情況下對註冊中心造成了壓力;另外如果註冊中心出現宕機等情況 ...
在前兩個版本中,每次發起請求一次就新建一個netty的channel連接,如果在高併發情況下就會造成資源的浪費,這時實現非同步請求就十分重要,當有多個請求線程時,需要設計一個線程池來進行管理。除此之外,當前方法過於依賴註冊中心,在高併發情況下對註冊中心造成了壓力;另外如果註冊中心出現宕機等情況,那麼整合系統就崩潰了,為瞭解決這個問題,添加了一個適合高併發的服務緩存機制。以上為該版本的新增內容。
非同步請求和線程池
這裡就不具體介紹非同步請求的概念了。用一個通俗的例子解釋,如你在飯店點餐,當你點好餐後,會得到一個點餐號,但是飯菜並不會立即做好送過,需要你等待一段時間,在這個時間段中,你可以做其他的事情,當飯菜做好後,會根據點餐號進行廣播,通知你去拿飯菜。這就是一個典型的非同步處理。
在項目中涉及到非同步的主要有三個自定義類,即ChannelHolder
,LwRequestPool
和LwRequestManager
。
在ChannelHolder
中定義的變數:
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ChannelHolder {
private Channel channel;
private EventLoopGroup eventLoopGroup;
}
在LwRequestManager
中的變數:
private static final ConcurrentHashMap<String, ChannelHolder> channelHolderMap = new ConcurrentHashMap<>();
private static ExecutorService requestExecutor = new ThreadPoolExecutor(30, 100, 0, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(30),
new BasicThreadFactory.Builder().namingPattern("request-service-connector-%d").build());
private static LwRequestPool requestPool = SpringBeanFactory.getBean(LwRequestPool.class);
在LwRequestPool
中定義的變數:
private final ConcurrentHashMap<String, Promise<LwResponse>> requestPool = new ConcurrentHashMap<>();
剛開始在動態代理中會調用send()
方法,開始了有關非同步調用的內容。通過requestId來確定是哪個請求,利用線程池執行netty客戶端的運行,並利用CountDownLatch
來先暫停下麵代碼的運行,如果latch執行了countDown()方法,會再返回這裡執行下麵的步驟。
public static void send(LwRequest request, URL url) throws Exception{
String requestId = request.getRequestId();
CountDownLatch latch = new CountDownLatch(1);
requestExecutor.execute(new NettyClient(requestId, url, latch));
latch.await();
ChannelHolder channelHolder = channelHolderMap.get(requestId);
channelHolder.getChannel().writeAndFlush(request);
log.info("客戶端發送消息:{}", channelHolder);
}
之後運行Netty客戶端中的run()方法,如果與服務端連接成功,將該請求id和對應的channel註冊到channelHolderMap
變數中,並執行submitRequest
方法,將請求id和eventLoop註冊到變數requestPool
中。最後執行了countDown()
方法。
@Override
public void run() {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 4));
pipeline.addLast(new LwRpcEncoder(LwRequest.class, new HessianSerializer()));
pipeline.addLast(new LwRpcDecoder(LwResponse.class, new HessianSerializer()));
pipeline.addLast(clientHandler);
}
});
try {
ChannelFuture future = bootstrap.connect(url.getHostname(), url.getPort()).sync();
//連接成功
if (future.isSuccess()) {
ChannelHolder channelHolder = ChannelHolder.builder()
.channel(future.channel())
.eventLoopGroup(group).build();
LwRequestManager.registerChannelHolder(requestId, channelHolder);
latch.countDown();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
requestPool.submitRequest(requestId, channelHolder.getChannel().eventLoop());
public void submitRequest(String requestId, EventExecutor executor) {
requestPool.put(requestId, new DefaultPromise<>(executor));
}
當執行了countDown()
方法,會跳轉到原來最初的地方,執行剩下的代碼部分,進行請求發送。等待服務端的響應。
ChannelHolder channelHolder = channelHolderMap.get(requestId);
channelHolder.getChannel().writeAndFlush(request);
當客戶端接收到服務端發回的結果信息時,會執行notifyRequest
方法。
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, LwResponse response) throws Exception {
lwRequestPool.notifyRequest(response.getRequestId(), response);
}
在notifyRequest
方法中,會從變數requestPool
中獲取到返回的LwResponse
變數,並封裝在Promise
中,最後調用setsuccess()方法。
public void notifyRequest(String requestId, LwResponse response) {
Promise<LwResponse> promise = requestPool.get(requestId);
if (promise != null) {
promise.setSuccess(response);
}
}
setsuccess()
方法是netty的Promise中的方法。它會通知所有的監聽器。在官方解釋如下:
Marks this future as a success and notifies all
此時就可以通過fetchResponse
根據請求id獲取到了服務端發送過來的消息,此時已經執行完畢,需要從requestpool
中刪除該請求信息。
LwResponse response = lwRequestPool.fetchResponse(requestId);
public LwResponse fetchResponse(String requestId) throws Exception {
Promise<LwResponse> promise = requestPool.get(requestId);
if (promise == null)
return null;
LwResponse response = promise.get(10, TimeUnit.SECONDS);
requestPool.remove(requestId);
LwRequestManager.destroyChannelHolder(requestId);
return response;
}
高併發下的緩存機制
在原來的版本中,每次請求遠程服務時,都需要從註冊中心獲取服務地址,在高併發情況下,會對註冊中心造成一定的影響;或者如果註冊中心突然宕機,那麼就無法獲取待服務地址,整個系統就崩潰了。所以設計一個緩存機制,將請求到的服務地址持久化到本地,當下次請求時,就無須再需要註冊中心了,直接從持久化文件中獲取,減輕了註冊中心的壓力。
在進行本地緩存時,會先調用saveServices
方法,將URL數組信息保存到Properties
中,並獲取當前version
版本號,然後執行doSaveProperties
方法來保存到本地。這個步驟支持同步和非同步兩種方式。
public void saveServices(String serviceName, List<URL> urlList) {
if (file == null)
return;
try {
StringBuilder buf = new StringBuilder();
for(URL url : urlList) {
if (buf.length() > 0) {
buf.append(";");
}
buf.append(url.getAllInformation());
}
properties.setProperty(serviceName, buf.toString());
long version = lastCacheChanged.incrementAndGet();
if (syncSaveFile) {
doSaveProperties(version);
} else {
registerCacheExecutor.execute(new SaveProperties(version));
}
} catch (Throwable t) {
log.warn(t.getMessage(), t);
}
}
在doSaveProperties
方法中,如果傳入的版本號不是最新的版本號,說明其他線程已經修改了,內容發生了變化,直接退出。在寫入到文件時會添加鎖,進一步保證信息的準確性。如果添加失敗,會進行重試操作。
private void doSaveProperties(long version) {
if (version < lastCacheChanged.get())
return;
if (file == null)
return;
try {
File lockfile = new File(file.getAbsolutePath() + ".lock");
if (!lockfile.exists()) {
lockfile.createNewFile();
}
try(RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
FileChannel channel = raf.getChannel();) {
FileLock lock = channel.tryLock();
if (lock == null) {
throw new IOException("不能鎖住註冊的緩存文件");
}
try {
if (!file.exists()) {
file.createNewFile();
}
try (FileOutputStream outputFile = new FileOutputStream(file)) {
properties.store(outputFile, "RPC Server Cache");
}
} finally {
lock.release();
}
}
}catch (Throwable e) {
savePropertiesRetryTimes.incrementAndGet();
if (savePropertiesRetryTimes.get() > SAVE_MAX_RETRY) {
log.warn("超過最大重試次數,緩存失敗!");
savePropertiesRetryTimes.set(0);
return;
}
if (version < lastCacheChanged.get()) {
savePropertiesRetryTimes.set(0);
return;
}
e.printStackTrace();
}
}
具體詳細代碼可以到我的項目中進行查看:輕量級RPC第三版