在網路傳輸中,怎麼確保通道連接的可用性是一個很重要的問題,簡單的說,在網路通信中有客戶端和服務端,一個負責發送請求,一個負責接收請求,在保證連接有效性的背景下,這兩個物體扮演了什麼角色,心跳機制能有效的保證連接的可用性,那它的機制是什麼,下文中將會詳細講解。 網路層的可用性 首先講一下TCP,在du ...
在網路傳輸中,怎麼確保通道連接的可用性是一個很重要的問題,簡單的說,在網路通信中有客戶端和服務端,一個負責發送請求,一個負責接收請求,在保證連接有效性的背景下,這兩個物體扮演了什麼角色,心跳機制能有效的保證連接的可用性,那它的機制是什麼,下文中將會詳細講解。
網路層的可用性
首先講一下TCP,在dubbo中的通信是基於TCP的,TCP本身並沒有長短連接的區別,在短連接中,每次通信時,都會創建Socket,當該次通信結束後,就會調用socket.close();而在長連接中,每次通信完畢後,不會關閉連接,這樣就可以做到連接的復用,長連接的好處是省去了創建連接時的耗時。那麼如何確保連接的有效性呢,在TCP中用到了KeepAlive機制,keepalive並不是TCP協議的一部分,但是大多數操作系統都實現了這個機制,在一定時間內,在鏈路上如果沒有數據傳送的情況下,TCP層將會發送相應的keepalive探針來確定連接可用性,探測失敗後重試10次(tcp_keepalive_probes
),每次間隔時間為75s(tcp_keepalive_intvl
),所有探測失敗後,才認為當前連接已經不可用了。
KeepAlive機制是在網路層保證了連接的可用性,但在應用層我們認為這還是不夠的。
- KeepAlive的報活機制只有在鏈路空閑的情況下才會起作用,假如此時有數據發送,且物理鏈路已經不通,操作系統這邊的鏈路狀態還是E STABLISHED,這時會發生TCP重傳機制,要知道預設的TCP超時重傳,指數退避演算法也是一個相當長的過程。
KeepAlive本身是面向網路的,並不是面向於應用的,可能是由於本身GC問題,系統load高等情況,但網路依然是通的,此時,應用已經失去了活性,所以連接自然認為是不可用的。
應用層的連接可用性:心跳機制
如何理解應用層的心跳?簡單的說,就是客戶端會開啟一個定時任務,定時對已經建立連接的對端應用發送請求,服務端則需要特殊處理該請求,返迴響應。如果心跳持續多次沒有收到響應,客戶端會認為連接不可用,主動斷開連接。
客戶端如何得知請求失敗了?
在失敗的場景下,服務端是不會返迴響應的,所以只能在客戶端自身上設計了。
當客戶端發起一個RPC請求時,會設置一個超時時間client_timeout,同時它也會開啟一個延遲的client_timeout的定時器。當接收到正常響應時,會移除該定時器;而當計時器倒計時完畢後,還沒有被移除,則會認為請求超時,構造一個失敗的響應傳遞給客戶端。
連接建立時創建定時器
HeaderExchangeClient類
public HeaderExchangeClient(Client client, boolean needHeartbeat) {
if (client == null) {
throw new IllegalArgumentException("client == null");
}
this.client = client;
// 創建信息交換通道
this.channel = new HeaderExchangeChannel(client);
// 獲得dubbo版本
String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
//獲得心跳周期配置,如果沒有配置,並且dubbo是1.0版本的,則這隻為1分鐘,否則設置為0
this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
// 獲得心跳超時配置,預設是心跳周期的三倍
this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
if (needHeartbeat) {
// 開啟心跳
long tickDuration = calculateLeastDuration(heartbeat);
heartbeatTimer = new HashedWheelTimer(new NamedThreadFactory("dubbo-client-heartbeat", true) , tickDuration, TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL);
startHeartbeatTimer();
}
}
創建了一個HashedWheelTimer
開啟心跳檢測,這是 Netty 所提供的一個經典的時間輪定時器實現。
HeaderExchangeServer
也同時開啟了定時器,代碼邏輯和上述差不多。
開啟兩個定時任務
private void startHeartbeatTimer() {
long heartbeatTick = calculateLeastDuration(heartbeat);
long heartbeatTimeoutTick = calculateLeastDuration(heartbeatTimeout);
HeartbeatTimerTask heartBeatTimerTask =new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, heartbeatTimeout);
heartbeatTimer.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
}
在該方法中主要開啟了兩個定時器
- HeartbeatTimerTask 主要是定時發送心跳請求
- ReconnectTimerTask 主要是心跳失敗後處理重連,斷連的邏輯
舊版的心跳處理HeartBeatTask類
final class HeartBeatTask implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
/**
* 通道管理
*/
private ChannelProvider channelProvider;
/**
* 心跳間隔 單位:ms
*/
private int heartbeat;
/**
* 心跳超時時間 單位:ms
*/
private int heartbeatTimeout;
HeartBeatTask(ChannelProvider provider, int heartbeat, int heartbeatTimeout) {
this.channelProvider = provider;
this.heartbeat = heartbeat;
this.heartbeatTimeout = heartbeatTimeout;
}
@Override
public void run() {
try {
long now = System.currentTimeMillis();
// 遍歷所有通道
for (Channel channel : channelProvider.getChannels()) {
// 如果通道關閉了,則跳過
if (channel.isClosed()) {
continue;
}
try {
// 最後一次接收到消息的時間戳
Long lastRead = (Long) channel.getAttribute(
HeaderExchangeHandler.KEY_READ_TIMESTAMP);
// 最後一次發送消息的時間戳
Long lastWrite = (Long) channel.getAttribute(
HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
// 如果最後一次接收或者發送消息到時間到現在的時間間隔超過了心跳間隔時間
if ((lastRead != null && now - lastRead > heartbeat)
|| (lastWrite != null && now - lastWrite > heartbeat)) {
// 創建一個request
Request req = new Request();
// 設置版本號
req.setVersion(Version.getProtocolVersion());
// 設置需要得到響應
req.setTwoWay(true);
// 設置事件類型,為心跳事件
req.setEvent(Request.HEARTBEAT_EVENT);
// 發送心跳請求
channel.send(req);
if (logger.isDebugEnabled()) {
logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
+ ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");
}
}
// 如果最後一次接收消息的時間到現在已經超過了超時時間
if (lastRead != null && now - lastRead > heartbeatTimeout) {
logger.warn("Close channel " + channel
+ ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");
// 如果該通道是客戶端,也就是請求的伺服器掛掉了,客戶端嘗試重連伺服器
if (channel instanceof Client) {
try {
// 重新連接伺服器
((Client) channel).reconnect();
} catch (Exception e) {
//do nothing
}
} else {
// 如果不是客戶端,也就是是服務端返迴響應給客戶端,但是客戶端掛掉了,則服務端關閉客戶端連接
channel.close();
}
}
} catch (Throwable t) {
logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
}
}
} catch (Throwable t) {
logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);
}
}
interface ChannelProvider {
// 獲得所有的通道集合,需要心跳的通道數組
Collection<Channel> getChannels();
}
}
它首先遍歷所有的Channel,在服務端對用的是所有客戶端連接,在客戶端對應的是服務端連接,判斷當前TCP連接是否空閑,如果空閑就發送心跳報文,判斷是否空閑,根據Channel是否有讀或寫來決定,比如一分鐘內沒有讀或寫就發送心跳報文,然後是處理超時的問題,處理客戶端超時重新建立TCP連接,目前的策略是檢查是否在3分鐘內都沒有成功接受或發送報文,如果在服務端檢測則就會主動關閉遠程客戶端連接。
新版本的心跳機制
定時任務一: 發送心跳請求
在新版本下,去除了HeartBeatTask類,添加了HeartbeatTimerTask和ReconnectTimerTask類
public class HeartbeatTimerTask extends AbstractTimerTask {
private static final Logger logger = LoggerFactory.getLogger(HeartbeatTimerTask.class);
private final int heartbeat;
HeartbeatTimerTask(ChannelProvider channelProvider, Long heartbeatTick, int heartbeat) {
super(channelProvider, heartbeatTick);
this.heartbeat = heartbeat;
}
@Override
protected void doTask(Channel channel) {
try {
Long lastRead = lastRead(channel);
Long lastWrite = lastWrite(channel);
if ((lastRead != null && now() - lastRead > heartbeat)
|| (lastWrite != null && now() - lastWrite > heartbeat)) {
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setEvent(Request.HEARTBEAT_EVENT);
channel.send(req);
if (logger.isDebugEnabled()) {
logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
+ ", cause: The channel has no data-transmission exceeds a heartbeat period: "
+ heartbeat + "ms");
}
}
} catch (Throwable t) {
logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
}
}
}
Dubbo採取的是雙向心跳設計,即服務端會向客戶端發送心跳,客戶端也會向服務端發送心跳,接收的一方更新lastread欄位,發送的一方更新lastWrite欄位,超過心跳間隙的時間,便發送心跳請求給對端。
定時任務二: 處理重連和斷連
public class ReconnectTimerTask extends AbstractTimerTask {
private static final Logger logger = LoggerFactory.getLogger(ReconnectTimerTask.class);
private final int idleTimeout;
public ReconnectTimerTask(ChannelProvider channelProvider, Long heartbeatTimeoutTick, int idleTimeout) {
super(channelProvider, heartbeatTimeoutTick);
this.idleTimeout = idleTimeout;
}
@Override
protected void doTask(Channel channel) {
try {
Long lastRead = lastRead(channel);
Long now = now();
// Rely on reconnect timer to reconnect when AbstractClient.doConnect fails to init the connection
if (!channel.isConnected()) {
try {
logger.info("Initial connection to " + channel);
((Client) channel).reconnect();
} catch (Exception e) {
logger.error("Fail to connect to " + channel, e);
}
// check pong at client
} else if (lastRead != null && now - lastRead > idleTimeout) {
logger.warn("Reconnect to channel " + channel + ", because heartbeat read idle time out: "
+ idleTimeout + "ms");
try {
((Client) channel).reconnect();
} catch (Exception e) {
logger.error(channel + "reconnect failed during idle time.", e);
}
}
} catch (Throwable t) {
logger.warn("Exception when reconnect to remote channel " + channel.getRemoteAddress(), t);
}
}
}
不同類型處理機制不同,當超過設置的心跳總時間後,客戶端選擇的是重新連接,服務端是選擇直接斷開連接。
心跳改進方案
Netty對空閑連接的檢測提供了天然的支持,使用IdleStateHandler可以很方便的實現空閑檢測邏輯。
public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit){}
- readerIdleTime: 讀超時的時間
- writerIdleTime: 寫超時的時間
- allIdleTime: 所有類型的超時時間
客戶端和服務端配置
客戶端:
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast("clientIdleHandler", new IdleStateHandler(60, 0, 0));
}
});
服務端:
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast("serverIdleHandler",new IdleStateHandler(0, 0, 200));
}
}
從上面看出,客戶端配置了read超時為60s,服務端配置了write/read超時未200s,
空閑超時邏輯-客戶端
對於空閑超時的處理邏輯,客戶端和服務端是不同的,首先來看客戶端的:
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
// send heartbeat
sendHeartBeat();
} else {
super.userEventTriggered(ctx, evt);
}
}
檢測到空閑超時後,採取的行為是向服務端發送心跳包,
public void sendHeartBeat() {
Invocation invocation = new Invocation();
invocation.setInvocationType(InvocationType.HEART_BEAT);
channel.writeAndFlush(invocation).addListener(new CallbackFuture() {
@Override
public void callback(Future future) {
RPCResult result = future.get();
//超時 或者 寫失敗
if (result.isError()) {
channel.addFailedHeartBeatTimes();
if (channel.getFailedHeartBeatTimes() >= channel.getMaxHeartBeatFailedTimes()) {
channel.reconnect();
}
} else {
channel.clearHeartBeatFailedTimes();
}
}
});
}
構造一個心跳包發送到服務端,接受響應結果
- 響應成功,清除請求失敗標記
響應失敗,心跳失敗標記+1,如果超過配置的失敗次數,則重新連接
空閑超時邏輯 - 服務端
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
channel.close();
} else {
super.userEventTriggered(ctx, evt);
}
}
服務端直接關閉連接。