wmproxy將用Rust實現http/https代理, socks5代理, 反向代理, 靜態文件服務,講述的是主動式健康檢查可帶來的好處 ...
wmproxy
wmproxy
將用Rust
實現http/https
代理, socks5
代理, 反向代理, 靜態文件伺服器,後續將實現websocket
代理, 內外網穿透等, 會將實現過程分享出來, 感興趣的可以一起造個輪子法
項目地址
gite: https://gitee.com/tickbh/wmproxy
github: https://github.com/tickbh/wmproxy
為什麼我們需要主動
主動可以讓我們掌握好系統的穩定性,假設我們有一條連接不可達,連接超時的判定是5秒,需要檢測失敗3次才認定為失敗,那麼此時從我們開始檢測,到判定失敗需要耗時15秒。
如果此時我們是個高併發的系統,每秒的QPS是1000,我們有三個地址判定,那麼此時我們有1/3的失敗概率。那麼在15秒內,我們會收到15000個請求,會造成5000個請求失敗,如果是重要的數據,我們會丟失很多重要數據。
如果此時客戶端擁有重試機制,那麼客戶端在失敗的時候會發起重試,而且系統可能會反覆的分配到那台不可達的系統,將會造成短時間內請求數激增,可能引發系統的雪崩。
所以此時我們主動知道目標端的系統穩定性極其重要。
網路訪問示意圖
sequenceDiagram participant 客戶端 participant 代理伺服器 客戶端->>代理伺服器: 請求數據(0.5s) 代理伺服器->>後端1: 連接並請求數據(5s)失敗 Note right of 後端1: 機器宕機不可達 代理伺服器-->>客戶端: 返回失敗0.5s(總耗時6s) 客戶端->>代理伺服器: 重新請求數據(0.5s) 代理伺服器->>後端2: 請求數據成功(0.2s) 後端2-->>代理伺服器: 返回數據成功(0.2s) 代理伺服器-->> 客戶端: 返回數據成功0.5s(總耗時1.4s)以下是沒有主動健康檢查
如果出錯的時候,一個請求的平均時長可能會達到(1.4s + 5s) / 2 = (3.2s)
,比正常訪問多了(3.2 - 1.4) = 1.8s
,節點的宕機會對系統的穩定性產生較大的影響
sequenceDiagram 客戶端->>代理伺服器: 請求數據(0.5s) loop 健康檢查 代理伺服器->>伺服器組(只訪問1): 定時請求,保證存活,1檢查成功,2檢查失敗 end Note right of 伺服器組(只訪問1): 處理客戶端數據 代理伺服器 -->> 伺服器組(只訪問1): 請求數據(0.2s) 伺服器組(只訪問1) -->> 代理伺服器: 返回數據成功(0.2s) 代理伺服器-->>客戶端: 返回數據成功(0.5s)(總耗時1.4s)以下是主動健康檢查,它保證了訪問後端伺服器組均是正常的狀態
伺服器2
出錯的時候,主動檢查已經檢查出伺服器2
不可用,負載均衡的時候選擇已經把伺服器2
摘除,所以系統的平均耗時1.4s,系統依然保持穩定
健康檢查的種類
在目前的系統中有以下兩分類:
- HTTP 請求特定的方法及路徑,判斷返回是否得到預期的status或者body
- TCP 僅只能測試連通性,如果能連接表示正常,會出現能連接但無服務的情況
健康檢查的準備
我們需要從配置中讀出所有的需要健康檢查的類型,即需要去重,把同一個指向的地址過濾掉
配置有可能被重新載入,所以我們需要預留髮送配置的方式(或者後續類似nginx用新開進程的方式則不需要),此處做一個預留。
-
如何去重
像這種簡單級別的去重通常用HashSet
複雜度為O(1)
或者用簡單的Vec
複雜度為O(n)
,以SocketAddr
的為鍵值,判斷是否有重覆的數據。 -
如何保證不影響主線程
把健康請求的方法移到非同步函數,用tokio::spawn
中處理,在健康檢查的情況下保證不影響其它數據處理 -
如果同時處理多個地址的健康檢查
每一次健康檢查都會在一個非同步函數中執行,在我們調用完請求後,我們會對當前該非同步進行tokio::time::sleep
以讓出當前CPU。 -
如何按指定間隔時間請求
因為每一次健康請求都是在非同步函數中,我們不確認之前的非同步是否完成,所以我們在每次請求前都記錄last_request
,我們在請求前調用HealthCheck::check_can_request
判斷當前是否可以發送請求來保證間隔時間內不多次請求造成伺服器的壓力。 -
超時連接判定處理
利用tokio::time::timeout
和future
做組合,等超時的時候直接按錯誤處理
部分實現源碼
主要源碼定義在check/active.rs
中,主要的定義兩個類
/// 單項健康檢查
#[derive(Debug, Clone)]
pub struct OneHealth {
/// 主動檢查地址
pub addr: SocketAddr,
/// 主動檢查方法, 有http/https/tcp等
pub method: String,
/// 每次檢查間隔
pub interval: Duration,
/// 最後一次記錄時間
pub last_record: Instant,
}
/// 主動式健康檢查
pub struct ActiveHealth {
/// 所有的健康列表
pub healths: Vec<OneHealth>,
/// 接收健康列表,當配置變更時重新載入
pub receiver: Receiver<Vec<OneHealth>>,
}
我們在配置的時候獲取所有需要主動檢查的數據
/// 獲取所有待健康檢查的列表
pub fn get_health_check(&self) -> Vec<OneHealth> {
let mut result = vec![];
let mut already: HashSet<SocketAddr> = HashSet::new();
if let Some(proxy) = &self.proxy {
// ...
}
if let Some(http) = &self.http {
// ...
}
result
}
主要的檢查源碼,所有的最終信息都落在HealthCheck
中的靜態變數里:
pub async fn do_check(&self) -> ProxyResult<()> {
// 防止短時間內健康檢查的連接過多, 做一定的超時處理, 或者等上一條消息處理完畢
if !HealthCheck::check_can_request(&self.addr, self.interval) {
return Ok(())
}
if self.method.eq_ignore_ascii_case("http") {
match tokio::time::timeout(self.interval + Duration::from_secs(1), self.connect_http()).await {
Ok(r) => match r {
Ok(r) => {
if r.status().is_server_error() {
log::trace!("主動健康檢查:HTTP:{}, 返回失敗:{}", self.addr, r.status());
HealthCheck::add_fall_down(self.addr);
} else {
HealthCheck::add_rise_up(self.addr);
}
}
Err(e) => {
log::trace!("主動健康檢查:HTTP:{}, 發生錯誤:{:?}", self.addr, e);
HealthCheck::add_fall_down(self.addr);
}
},
Err(e) => {
log::trace!("主動健康檢查:HTTP:{}, 發生超時:{:?}", self.addr, e);
HealthCheck::add_fall_down(self.addr);
},
}
} else {
match tokio::time::timeout(Duration::from_secs(3), self.connect_http()).await {
Ok(r) => {
match r {
Ok(_) => {
HealthCheck::add_rise_up(self.addr);
}
Err(e) => {
log::trace!("主動健康檢查:TCP:{}, 發生錯誤:{:?}", self.addr, e);
HealthCheck::add_fall_down(self.addr);
}
}
}
Err(e) => {
log::trace!("主動健康檢查:TCP:{}, 發生超時:{:?}", self.addr, e);
HealthCheck::add_fall_down(self.addr);
}
}
}
Ok(())
}
結語
主動檢查可以及時的更早的發現系統中不穩定的因素,是系統穩定性的基石,也可以通過更早的發現因素來通知運維介入,我們的目的是使系統更穩定,更健壯,處理延時更少。
點擊 [關註],[在看],[點贊] 是對作者最大的支持