用Rust手把手編寫一個wmproxy(代理,內網穿透等), HTTP內網穿透支持修改頭信息項目 涉及HTTP1.1 chunked, http2, keep-alive ...
用Rust手把手編寫一個wmproxy(代理,內網穿透等), HTTP內網穿透支持修改頭信息
項目 ++wmproxy++
gite: https://gitee.com/tickbh/wmproxy
github: https://github.com/tickbh/wmproxy
修改header參數
但凡代理之類,基本上都有修改頭參數的需求,就比如要獲取客戶端的真實IP,需要寫入
x-forward-for
表示客戶端的真實IP,要不然經過轉發後的HTTP無法獲取真實的客戶端地址。
所以需要在轉發的同時能進行處理頭部信息的相關參數。故內網端不能僅做流量轉發。而且客戶端可能直接以純HTTP2
的協議請求內網的數據,所以同時需要支持HTTP/1.1及HTTP2
,由於以上需求,我們把之前的簡單的轉發邏輯改成以服務端接收客戶端請求的模式對數據進行重加工。
新流程如下
graph TD A[外網客戶端] B[代理服務端-外網] C[請求端] D[新的請求端] E[代理客戶端--內網] F[內網伺服器] A-->|請求http埠| B B -->|解析成Request| C C -->|修改Request中的Header|D D -->|發送HTTP請求數據給CenterClient|E E -->|請求內網伺服器轉發數據|F以下是數據從外網進入到內網伺服器的加工流程
graph TD A[外網客戶端] B[代理服務端-外網] C[新的返回端] D[返回端] E[代理客戶端--內網] F[內網伺服器] F -->|返回Response|E E -->|發送HTTP數據給CenterServer|D D -->|修改頭信息加工|C C -->|將數據轉發給|B B -->|返回數據|A以下是內網伺服器返回數據給外網客戶端的流程
轉發中的註意事項
我們可以獲取完整的Request再進行請求嗎?
如果我們這麼操作,當數據包非常的大的時候例如1G,我們此時在記憶體中將有完整的1G記憶體,那麼此時只需有數個同一類的請求,將會耗盡我們的記憶體,所以我們必須不能這麼處理。
超大文件下載的轉發
超大文件必須將得到的數據及時的轉發給客戶端,此時在記憶體中的值才不至於太大,又能及時的傳輸給客戶端,要不然可能大文件下載到中轉伺服器的時間內客戶端得不到任何數據就會空耗掉這時間。
http/1.1中的chunked的處理
因為http/1.1的chunked協議,由RFC 2616
定義,
分塊編碼(Transfer-Encoding: chunked)是超文本傳輸協議(HTTP)中的一種數據傳輸機制,允許HTTP由網頁伺服器發送給客戶端的數據可以分成多個部分。分塊傳輸編碼只在HTTP協議1.1版本(HTTP/1.1)中提供,如果頭部中有該選項,則代表數據包是chunked
格式。
數據分解成一系列數據塊,並以一個或多個塊發送,這樣伺服器可以發送數據而不需要預先知道發送內容的總大小。
比如我們常看到的
for data in res.chunk() {
}
就是表示的是數據分段接收,對於大數據這個尤為重要。
此種報文的示例
這時,報文中的實體需要改為用一系列分塊來傳輸。
每個分塊包含十六進位的長度值和數據,長度值獨占一行,長度不包括它結尾的 CRLF(\r\n),也不包括分塊數據結尾的 CRLF。
最後一個分塊長度值必須為 0,對應的分塊數據沒有內容,表示實體結束。
例:
HTTP/1.1 200 OK
Content-Type: text/plain
Transfer-Encoding: chunked
a\r\n
01234567890\r\n
1e\r\n
wmproxy is very good nat tool\r\n
0\r\n
\r\n
此種報文中我們必須進行解析,因為客戶端可能是
keep-alive
選項,可以連續進行多發。所以收到的Request和Response都是連續的。必須知道何處結束才能繼續解析下一個Request/Response。http2不需要,因為http2自帶的data分包機制就有這些數據的處理
header數據的定義
- header的修改分為兩部分,一部分是對請求
Request
的重寫,另一部分是對返回Response
的重寫。所以我們必須同時支持這兩種,且將其區分出來。每條header信息我們將定定義一個可變長的數組,如第一個字元為proxy
則表示對Request
修改。 - 關於修改的動作有
- 添加,如
x-forward-for
需要末尾添加,我們用操作符+
,比如[proxy, +, x-forward-for, $client_ip]
- 刪除,我們用操作符
+
,如[-, hidden]
- 設置,設置我們預設不做任何參數,直接以header_name開頭,如
[custom-key, custom-value]
- 預設值,有些值有了參數我們就不將其重寫,如果沒有我們則設為預設值,我們用操作符
?
,如[?, server, wmproxy]
- 添加,如
所以我們client.yaml的配置新增至如下:
# 連接服務端地址
server: 127.0.0.1:8091
# 連接服務端是否加密
ts: true
# 內網映射配置的數組
mappings:
#將localhost的功能變數名稱轉發到本地的127.0.0.1:8080
- name: web
mode: http
local_addr: 127.0.0.1:8080
domain: localhost
headers:
- [proxy, +, x-forward-for, $client_ip]
- [-, hidden]
- [custom-key, custom-value]
- [?, server, wmproxy]
mappings的結構修改
pub struct MappingConfig {
pub name: String,
pub mode: String,
pub local_addr: Option<SocketAddr>,
#[serde(default = "default_domain")]
pub domain: String,
#[serde(default = "default_header")]
pub headers: Vec<Vec<String>>,
}
我們把headers定義成一個動態的數組。根據不同的類型做不同的數據,因為長度有變化所以做不定長參數。
以下是代碼解析
pub fn parse<T: Buf>(header: ProtFrameHeader, mut buf: T) -> ProxyResult<ProtMapping> {
must_have!(buf, 2)?;
let len = buf.get_u16() as usize;
let mut mappings = vec![];
for _ in 0..len {
let name = read_short_string(&mut buf)?;
let mode = read_short_string(&mut buf)?;
let domain = read_short_string(&mut buf)?;
let mut headers = vec![];
must_have!(buf, 2)?;
let len = buf.get_u16();
for _ in 0 .. len {
let mut header = vec![];
must_have!(buf, 1)?;
let sub_len = buf.get_u8();
for _ in 0..sub_len {
header.push(read_short_string(&mut buf)?);
}
headers.push(header);
}
mappings.push(MappingConfig::new(name, mode, domain, headers));
}
Ok(ProtMapping {
sock_map: header.sock_map(),
mappings,
})
}
如此解析成一個完整的對應功能變數名稱的結構,因為服務端用不到local_addr所以不做傳輸。
核心代碼的實現
核心處理代碼在
trans/http.rs
下,外部傳入一個可讀可寫的stream,可能是TcpStream
也可能是TlsStream<TcpStream>
或者其它,同時把接收的SocketAddr
傳入,以方便後續獲取$client_ip
的頭文件信息。
預處理
pub async fn process<T>(self, inbound: T, addr: SocketAddr) -> Result<(), ProxyError<T>>
where
T: AsyncRead + AsyncWrite + Unpin + Debug,
{
println!("new process {:?}", inbound);
let build = Client::builder();
let (virtual_sender, virtual_receiver) = channel::<ProtFrame>(10);
let stream = VirtualStream::new(self.sock_map, self.sender.clone(), virtual_receiver);
let mut client = Client::new(build.value().ok().unwrap(), stream);
let (receiver, sender) = client.split().unwrap();
let oper = HttpOper {
receiver,
sender,
sender_work: self.sender_work.clone(),
virtual_sender: Some(virtual_sender),
sock_map: self.sock_map,
mappings: self.mappings.clone(),
http_map: None,
};
let mut server = Server::new(inbound, Some(addr), oper);
tokio::spawn( async move {
let _ = client.wait_operate().await;
});
let _ret = server.incoming(Self::operate).await;
if _ret.is_err() {
println!("ret = {:?}", _ret);
}
Ok(())
}
此時我們創建一個虛擬的Stream來做雙邊互傳,但是此時我們還沒有收到任何的Request請求,我們並不知道當前的Host
,此時我們還未發送ProtCreate
,等真正處理請求的時候做處理,HttpOper
是處理每個操作時均會帶的參數,我們可以根據自己需要帶上該參數。
後續處理,其中我們讀和寫都用RecvStream,做到讀多少數據轉發多少數據,以保證數據處理的及時性
async fn inner_operate(
mut req: Request<RecvStream>,
data: Arc<Mutex<HttpOper>>,
) -> ProtResult<Option<Response<RecvStream>>> {
println!("receiver req = {:?}", req.url());
let mut value = data.lock().await;
let sender = value.virtual_sender.take();
// 傳在該參數則為第一次, 第一次的時候發送Create創建綁定連接
if sender.is_some() {
let host_name = req.get_host().unwrap_or(String::new());
// 取得相關的host數據,對內網的映射端做匹配,如果未匹配到返回錯誤,表示不支持
{
let mut config = None;
let mut is_find = false;
{
let read = value.mappings.read().await;
for v in &*read {
if v.domain == host_name {
is_find = true;
config = Some(v.clone());
}
}
}
if !is_find {
return Ok(Some(Response::builder().status(404).body("not found").ok().unwrap().into_type()));
}
value.http_map = config;
}
println!("do create prot {}, host = {:?}", value.sock_map, req.get_host());
let create = ProtCreate::new(value.sock_map, Some(req.get_host().unwrap_or(String::new())));
let _ = value.sender_work.send((create, sender.unwrap())).await;
}
if let Some(config) = &value.http_map {
// 覆寫Request的頭文件信息
HeaderHelper::rewrite_request(&mut req, &config.headers);
}
// 將請求發送出去
value.sender.send(req).await?;
// 等待返回數據的到來
let mut res = value.receiver.recv().await;
if res.is_some() {
if let Some(config) = &value.http_map {
// 覆寫Response的頭文件信息
HeaderHelper::rewrite_response(res.as_mut().unwrap(), &config.headers);
}
return Ok(res);
} else {
return Ok(Some(Response::builder().status(503).body("cant trans").ok().unwrap().into_type()));
}
}
以下是直接HTTP/1.1的請求示例
以下是直接HTTP/1.1升級成HTTP2的請求示例
以下是直接HTTP2的請求示例
請求的返回結果均帶上了添加的頭部信息,測試正常,至此HTTP的內網穿透數據打通。