`wmproxy`將用`Rust`實現`http/https`代理, `socks5`代理, 反向代理, 靜態文件伺服器,後續將實現`websocket`代理, 內外網穿透等, 會將實現過程分享出來, 感興趣的可以一起造個輪子法, 一些實現類nginx可能在rust會碰到的問題,一起探討下一些實現在... ...
wmproxy
wmproxy
將用Rust
實現http/https
代理, socks5
代理, 反向代理, 靜態文件伺服器,後續將實現websocket
代理, 內外網穿透等, 會將實現過程分享出來, 感興趣的可以一起造個輪子法
項目地址
gite: https://gitee.com/tickbh/wmproxy
github: https://github.com/tickbh/wmproxy
日誌功能
為了更容易理解程式中發生的情況,我們可能想要添加一些日誌語句。通常在編寫應用程式時這很容易。「在某種程度上,日誌記錄與使用 println! 相同,只是你可以指定消息的重要性」。
在rust中定義的日誌級別有5種分別為error
、warn
、info
、debug
和 trace
定義日誌的級別是表示只關係這級別的日誌及更高級別的日誌:
定義log,則包含所有的級別
定義warn,則只會顯示error
或者warn
的消息
要嚮應用程式添加日誌記錄,你需要兩樣東西:
- log crate,rust官方指定的日誌級別庫
- 一個實際將日誌輸出寫到有用位置的適配器
當下我們選用的是流行的根據環境變數指定的適配器env_logger
,它會根據環境變數中配置的值,日誌等級,或者只開啟指定的庫等功能,或者不同的庫分配不同的等級等。
在Linux
或者MacOs
上開啟功能
env RUST_LOG=debug cargo run
在Windows PowerShell
上開啟功能
$env:RUST_LOG="debug"
cargo run
在Windows CMD
上開啟功能
set RUST_LOG="debug"
cargo run
如果我們指定庫等級可以設置
RUST_LOG="info,wenmeng=warn,webparse=warn"
這樣就可以減少第三方庫打日誌給程式帶來的干擾
需要在Cargo.toml
中引用
[dependencies]
log = "0.4.20"
env_logger = "0.10.0"
以下是示意代碼
use log::{info, warn};
fn main() {
env_logger::init();
info!("歡迎使用軟體wmproxy");
warn!("現在已經成功啟動");
}
用println!
將會直接輸出到stdout
,當日誌數據多的時候,無法進行關閉,做為第三方庫,就不能幹擾引用庫的正常看日誌,所以這隻能調試的時候使用,或者少量的關鍵地方使用。
多個TcpListener的Accept
因為當前支持多個埠綁定,或者配置沒有配置,存在None的情況,我們需要同時在一個線程中await所有的TcpListener。
在這裡我們先用的是tokio::select!
對多個TcpListener同時進行await。
如果此時我們沒有綁定proxy的綁定地址,此時listener為None,但我們需要進行判斷才知道他是否為None,如果我們用以下寫法:
use tokio::net::TcpListener;
use std::io;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut listener: Option<TcpListener> = None;
tokio::select! {
// 加了if條件判斷是否有值
Ok((conn, addr)) = listener.as_mut().unwrap().accept(), if listener.is_some() => {
println!("accept addr = {:?}", addr);
}
}
Ok(())
}
此時我們試運行,依然報以下錯誤:
thread 'main' panicked at 'called `Option::unwrap()` on a `None` value', examples/udp.rs:9:46
也就是即使加了if條件我們也正確的執行我們的操作,因為tokio::select的每個分支必須返回Fut
,此時如果為None,就不能返回Fut
違反了該函數的定義,那麼我們做以下封裝:
async fn tcp_listen_work(listen: &Option<TcpListener>) -> Option<(TcpStream, SocketAddr)> {
if listen.is_some() {
match listen.as_ref().unwrap().accept().await {
Ok((tcp, addr)) => Some((tcp, addr)),
Err(_e) => None,
}
} else {
// 如果為None的時候,就永遠返回Poll::Pending
let pend = std::future::pending();
let () = pend.await;
None
}
}
如果為None的話,將其返回Poll::Pending,則該分支await的時候永遠不會等到結果。
那麼最終的的代碼示意如下:
#[tokio::main]
async fn main() -> io::Result<()> {
let listener: Option<TcpListener> = TcpListener::bind("127.0.0.1:8090").await.ok();
tokio::select! {
Some((conn, addr)) = tcp_listen_work(&listener) => {
println!("accept addr = {:?}", addr);
}
}
Ok(())
}
另一種在反向代理的時候因為server的數量是不定的,所以監聽的TcpListener也是不定的,此時我們用Vec<TcpListener>
來做表示,那麼此時,我們如何通過tokio::select
來一次性await所有的accept呢?
此時我們藉助futures
庫中的select_all
來監聽,但是select_all
又不允許空的Vec,因為他要返回一個Fut,空的無法返回一個Fut,所以此時我們也要對其進行封裝:
async fn multi_tcp_listen_work(listens: &mut Vec<TcpListener>) -> (io::Result<(TcpStream, SocketAddr)>, usize) {
if !listens.is_empty() {
let (conn, index, _) = select_all(listens.iter_mut()
.map(|listener| listener.accept().boxed())).await;
(conn, index)
} else {
let pend = std::future::pending();
let () = pend.await;
unreachable!()
}
}
此時監聽從8091-8099,我們的最終代碼:
#[tokio::main]
async fn main() -> io::Result<()> {
let listener: Option<TcpListener> = TcpListener::bind("127.0.0.1:8090").await.ok();
let mut listeners = vec![];
for i in 8091..8099 {
listeners.push(TcpListener::bind(format!("127.0.0.1:{}", i)).await?);
}
tokio::select! {
Some((conn, addr)) = tcp_listen_work(&listener) => {
println!("accept addr = {:?}", addr);
}
(result, index) = multi_tcp_listen_work(&mut listeners) => {
println!("index receiver = {:?}", index)
}
}
Ok(())
}
如果此時我們用
telnet 127.0.0.1 8098
那麼我們就可以看到輸出:
index receiver = 7
表示代碼已正確的執行。
Rust中數據在多個線程中的共用
Rust中每個對象的所有權都僅只能有一個對象擁有,那麼我們數據在在多個地方共用的時候可以怎麼辦呢?
在單線程中,我們可以用use std::rc::Rc;
Rc的特點
- 單線程的引用計數
- 不可變引用
- 非線程安全,即僅能在單線程中使用
Rc引用計數中還有一個弱引用稱為Weak
,弱引用表示持有對象的一個指針,但是不添加引用計數,也不會影響數據刪除,不保證一定能取得到數據。
因為其不能修改數據,所以也常用RefCell
做配合,來做引用計數的修改。
以下是一個父類子類用弱引用計數實現的方案:
use std::rc::Rc;
use std::rc::Weak;
use std::cell::RefCell;
/// 父類擁有者
struct Owner {
name: String,
gadgets: RefCell<Vec<Weak<Gadget>>>,
}
/// 子類對象
struct Gadget {
id: i32,
owner: Rc<Owner>,
}
fn main() {
let gadget_owner: Rc<Owner> = Rc::new(
Owner {
name: "wmproxy".to_string(),
gadgets: RefCell::new(vec![]),
}
);
// 生成兩個小工具
let gadget1 = Rc::new(
Gadget {
id: 1,
owner: Rc::clone(&gadget_owner),
}
);
let gadget2 = Rc::new(
Gadget {
id: 2,
owner: Rc::clone(&gadget_owner),
}
);
{
let mut gadgets = gadget_owner.gadgets.borrow_mut();
gadgets.push(Rc::downgrade(&gadget1));
gadgets.push(Rc::downgrade(&gadget2));
}
for gadget_weak in gadget_owner.gadgets.borrow().iter() {
let gadget = gadget_weak.upgrade().unwrap();
println!("小工具 {} 的擁有者:{}", gadget.id, gadget.owner.name);
}
}
因為其並未實現Send函數,所以無法在多線程種傳遞。在多線程中,我們需要用Arc
,但是在Arc獲取可變對象的時候有限制,必須他是唯一引用的時候才能修改。
use std::sync::Arc;
fn main() {
let mut x = Arc::new(3);
*Arc::get_mut(&mut x).unwrap() = 4;
assert_eq!(*x, 4);
let _y = Arc::clone(&x);
assert!(Arc::get_mut(&mut x).is_none());
}
所以我們在多線程中的引用需要修改的時候,通常會用Atomic或者Mutex來做數據的寫入的唯一性。
#![allow(unused)]
fn main() {
use std::sync::{Arc, Mutex};
use std::thread;
use std::sync::mpsc::channel;
const N: usize = 10;
let data = Arc::new(Mutex::new(0));
let (tx, rx) = channel();
for _ in 0..N {
let (data, tx) = (Arc::clone(&data), tx.clone());
thread::spawn(move || {
// 共用數據data,保證線上程中只會同時有一個對象擁有修改許可權,也相當於擁有所有權,10個線程,每個線程+1,最終結果必須等於10
let mut data = data.lock().unwrap();
*data += 1;
if *data == N {
tx.send(()).unwrap();
}
});
}
rx.recv().unwrap();
assert!(*data.lock().unwrap() == 10);
}
結語
以上是三種編寫Rust中常碰見的情況,也是在此項目中應用解決過的方案,在瞭解原理的情況下,解決問題可以有不同的思路。理解了原理,你就知道他設計的初衷,更好的幫助你學習相關的Rust知識。