wmproxy wmproxy是由Rust編寫,已實現http/https代理,socks5代理, 反向代理,靜態文件伺服器,內網穿透,配置熱更新等, 後續將實現websocket代理等,同時會將實現過程分享出來, 感興趣的可以一起造個輪子法 項目地址 gite: https://gitee.com ...
wmproxy
wmproxy
是由Rust
編寫,已實現http/https
代理,socks5
代理, 反向代理,靜態文件伺服器,內網穿透,配置熱更新等, 後續將實現websocket
代理等,同時會將實現過程分享出來, 感興趣的可以一起造個輪子法
項目地址
gite: https://gitee.com/tickbh/wmproxy
github: https://github.com/tickbh/wmproxy
四層代理
四層代理,也稱為網路層代理,是基於IP地址和埠號的代理方式。它只關心數據包的源IP地址、目的IP地址、源埠號和目的埠號,不關心數據包的具體內容。四層代理主要通過報文中的目標地址和埠,再加上負載均衡設備設置的伺服器選擇方式,決定最終選擇的內部伺服器。
因為四層代理不用處理任何相關的包信息,只需將包數據傳遞給正確的伺服器即可,所以實現相對比較簡單。
以下是OSI七層模型的示意圖,來源於網上
實現方式
雙端建立連接,也就是收到客戶端的連接的時候,同時建立一條通往服務端的連接,然後做雙向綁定即可完成服務。
四層代理還有udp的轉發需求,需要同步將udp的數據進行轉發,udp的處理方式處理會相對複雜一些,因為當前地址只有綁定一份,但是可能來自各種不同的地址,不同的客戶端的(remote_ip
, remote_port
)我們需要當成一個全新的客戶端。
而且有時候無法主動感知是否已經被斷開了,所以也必須有超時機制,好在超時的時候能及時釋放掉連接,好讓系統及時的socket資源。
TCP實現
tcp找到相應的地址,連接,並雙向綁定即可
pub async fn process<T>(
data: Arc<Mutex<StreamConfig>>,
local_addr: SocketAddr,
mut inbound: T,
_addr: SocketAddr,
) -> ProxyResult<()>
where
T: AsyncRead + AsyncWrite + Unpin + std::marker::Send + 'static,
{
let value = data.lock().await;
for (_, s) in value.server.iter().enumerate() {
if s.bind_addr.port() == local_addr.port() {
let addr = ReverseHelper::get_upstream_addr(&s.upstream, "")?;
let mut connect = HealthCheck::connect(&addr).await?;
copy_bidirectional(&mut inbound, &mut connect).await?;
break;
}
}
Ok(())
}
UDP實現
UDP相對比較複雜,下麵我們先列舉內部的流程圖
flowchart TD A[綁定反向udp埠] B[客戶端] H{是否第一次} I[創建非同步協程] D[非同步協程中] B <-->|根據地址連接發送數據到| A A --> H H -->|是|I I -->|將Receiver傳到以接收數據| D H -->|否,將數據Sender給|D D -->|非同步讀取數據併發送|A在stream綁定的時候,要區分出TCP還是UDP的,做分別的綁定
/// stream的綁定,按bind_mode區分出udp或者是tcp,返回相應的列表
pub async fn bind(&mut self) -> ProxyResult<(Vec<TcpListener>, Vec<StreamUdp>)> {
let mut listeners = vec![];
let mut udp_listeners = vec![];
let mut bind_port = HashSet::new();
for value in &self.server.clone() {
if bind_port.contains(&value.bind_addr.port()) {
continue;
}
bind_port.insert(value.bind_addr.port());
if value.bind_mode == "udp" {
let listener = Helper::bind_upd(value.bind_addr).await?;
udp_listeners.push(StreamUdp::new(listener, value.clone()));
} else {
let listener = Helper::bind(value.bind_addr).await?;
listeners.push(listener);
}
}
Ok((listeners, udp_listeners))
}
我們會對連接做分別的監聽,下麵是udp的獲取是否有新數據:
async fn multi_udp_listen_work(
listens: &mut Vec<StreamUdp>,
) -> (io::Result<(Vec<u8>, SocketAddr)>, usize) {
if !listens.is_empty() {
let (data, index, _) =
select_all(listens.iter_mut().map(|listener| {
listener.next().boxed()
})).await;
if data.is_none() {
return (Err(io::Error::new(io::ErrorKind::InvalidInput, "read none data")), index)
}
(data.unwrap(), index)
} else {
let pend = std::future::pending();
let () = pend.await;
unreachable!()
}
}
此處我們用next,也就是我們實現了 futures_core::Stream
介面,用Poll的方式來註冊實現有事件的時候來通知。
在tokio中,在read或者write的時候返回
Poll::Pending
,將會將socket的可讀可寫註冊到底層,如果一旦系統可讀可寫就會通知該介面,將會重新執行一遍futures_core::Stream
我們將同時可以處理可讀可寫可發送事件,如果介面超時我們將關閉相應的介面。
impl Stream for StreamUdp {
type Item = io::Result<(Vec<u8>, SocketAddr)>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let _ = self.poll_write(cx)?;
let _ = self.poll_sender(cx)?;
self.poll_read(cx)
}
}
下麵是主要的StreamUdp
類
/// Udp轉發的處理結構,緩存一些數值以做中轉
pub struct StreamUdp {
/// 讀的緩衝類,避免每次都釋放
pub buf: BinaryMut,
/// 核心的udp綁定埠
pub socket: UdpSocket,
pub server: ServerConfig,
/// 如果接收該數據大小為0,那麼則代表通知數據關閉
pub receiver: Receiver<(Vec<u8>, SocketAddr)>,
/// 將發送器傳達給每個子協程
pub sender: Sender<(Vec<u8>, SocketAddr)>,
/// 接收的緩存數據,無法保證全部直接進行發送完畢
pub cache_data: LinkedList<(Vec<u8>, SocketAddr)>,
/// 發送的緩存數據,無法保證全部直接進行發送完畢
pub send_cache_data: LinkedList<(Vec<u8>, SocketAddr)>,
/// 每個地址綁定的對象,包含Sender,最後操作時間,超時時間
remote_sockets: HashMap<SocketAddr, InnerUdp>,
}
結果測試
我們自己開一個udp服務端,綁定了本地的8089
,我們將接收到的數據前面加上from server:
併進行返回,代理端我們綁定了84
的埠,並將udp數據轉發給8089
端:
use tokio::net::UdpSocket;
use std::io;
#[tokio::main]
async fn main() -> io::Result<()> {
let sock = UdpSocket::bind("0.0.0.0:8089").await?;
let mut buf = [0; 1024];
loop {
let (len, addr) = sock.recv_from(&mut buf).await?;
let mut vec = "from server: ".as_bytes().to_vec();
vec.extend(&buf[..len]);
let _ = sock.send_to(&vec, addr).await?;
}
}
客戶端我們用nc
運行:
可以看出兩個客戶端互相獨立,彼此返回的數據均符合預期,正常的接收及返回。
TCP我們綁定了83埠並轉發到HTTP的本地埠8080,我們用curl
進行測試,符合預期,如圖:
結語
至此四層的反向代理TCP/UDP均已完成,也符合預期。
點擊 [關註],[在看],[點贊] 是對作者最大的支持