wmproxy wmproxy已用Rust實現http/https代理, socks5代理, 反向代理, 靜態文件伺服器,四層TCP/UDP轉發,內網穿透,後續將實現websocket代理等,會將實現過程分享出來,感興趣的可以一起造個輪子 項目地址 國內: https://gitee.com/tic ...
wmproxy
wmproxy
已用Rust
實現http/https
代理, socks5
代理, 反向代理, 靜態文件伺服器,四層TCP/UDP轉發,內網穿透,後續將實現websocket
代理等,會將實現過程分享出來,感興趣的可以一起造個輪子
項目地址
國內: https://gitee.com/tickbh/wmproxy
github: https://github.com/tickbh/wmproxy
有請主角上場
Socket
是集萬千寵愛為一身的王子,在操作系統的王國里,他負責對外的所有通訊,所以要想溝通鄰國的公主必須經過他,所以大家對他都是萬般友好。
這天一個Rust
城市裡的大臣tokio
對他發起了邀請,邀請他來參觀嚴謹的邏輯莊園。
tokio
莊園
莊園中的各成員對即將到來的王子議論紛紛。
大管家mio
說:“大家都想想等下怎麼向socket
王子介紹自己,好讓他配合大家的工作。”
大管家mio
是tokio
的基石,一切和王國打交道的都交由他去打理,他是保證莊園高效運轉的關鍵,此刻他準備好了歡迎宴會。
在宴會上,socket
聽說tokio
莊園是這座城市非同步運行
的重要基石,就很好奇的讓大伙介紹介紹下怎麼工作的。
莊園主tokio
就說:“我是依靠著大管家mio
幫我負責處理底層的事,Waker
來提醒我有新的事情,PollEvented
來幫我管理事件的。下麵先讓mio
來介紹下。”
管家mio
說:“我負責收集莊園中的所有信息,他們告訴我他們要關心的什麼比較,比如您的到來(可讀),或者您有什麼話想說(可寫),我會負責和王國的底層進行溝通,我在這個國家用的是epoll
,據說在遙遠的另一個國家用的是iocp
,如果有相應的需求,我將會通知Waker
,由他去提醒莊主來及時的處理,這場宴會也是提前得到通知而進行準備的。”
通知Waker
說:“我所做的事情就是微不足道,我的對接對象是PollEvented
,當他關心讀事件,我會向mio
去發起poll_read
請求,如果此時mio
那邊已經知道有新的消息了,那我就直接把他們讀出來交給民眾Poll::Ready
,如果此時還沒有新消息,那我會告訴管家,有新消息的時候通知我Poll::Pending
,此時我就在這裡等待,直到有新的消息到達我就通知給民眾。當他關心寫事件,我會向mio
請求poll_write
請求,後續的和收消息的一致。現在給你們展示下包裝了一層我的Context和我能換醒的虛表。”
/// 這個在代碼里就是經常看到,它就是我的一層淺封裝啦。
pub struct Context<'a> {
waker: &'a Waker,
_marker: PhantomData<fn(&'a ()) -> &'a ()>,
}
/// 我通過他來控制回調,保證喚醒的時候能正確的通知
pub struct RawWakerVTable {
clone: unsafe fn(*const ()) -> RawWaker,
wake: unsafe fn(*const ()),
wake_by_ref: unsafe fn(*const ()),
drop: unsafe fn(*const ()),
}
事件PollEvented
說:“莊主要處理的事情太多了,而有些事情又需要等待一層層的反饋,他沒法把精力放在一件事情上一直等待,所以就有了我出馬,莊主告訴我他關心什麼事,我就把它記下來,這樣子莊主就可以去處理其它的事,等事件到來的時候我就告訴莊主,這樣子莊主就可以高效的處理所有的事件。”
王子覺得他們說了一堆有點啰嗦
“帶我看看你們實際的工坊,我要實地考查下。”王子說。
莊主就帶著王子來到了,受理工坊,受理工坊正在處理建立受理點:
TcpListener::bind(addr).await
受理點的內容就是PollEvent
:
pub struct TcpListener {
io: PollEvented<mio::net::TcpListener>,
}
當他接受新的受理者的時候:
pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
let (mio, addr) = self
.io
.registration()
.async_io(Interest::READABLE, || self.io.accept())
.await?;
let stream = TcpStream::new(mio)?;
Ok((stream, addr))
}
他向PollEvent
註冊了可讀事情有的時候通知他,此時PollEvent
就建立了一個Waker
對象,當有符合條件的時候就來告訴他:
/// 建立一個可讀的Future對象
fn readiness_fut(&self, interest: Interest) -> Readiness<'_> {
Readiness {
scheduled_io: self,
state: State::Init,
waiter: UnsafeCell::new(Waiter {
pointers: linked_list::Pointers::new(),
waker: None,
is_ready: false,
interest,
_p: PhantomPinned,
}),
}
}
底層有mio和系統交互,一旦有數據就通知Waker
,他建在runtime/io/driver.rs
上面
fn turn(&mut self, handle: &Handle, max_wait: Option<Duration>) {
let events = &mut self.events;
// 高效的監聽埠
match self.poll.poll(events, max_wait) {
Ok(_) => {}
}
for event in events.iter() {
let token = event.token();
if token == TOKEN_WAKEUP {
} else if token == TOKEN_SIGNAL {
} else {
let ready = Ready::from_mio(event);
let ptr: *const ScheduledIo = token.0 as *const _;
let io: &ScheduledIo = unsafe { &*ptr };
io.set_readiness(Tick::Set(self.tick), |curr| curr | ready);
// 有相應的事件,進行喚醒然後通知上層處理相應的事件
io.wake(ready);
}
}
}
然後看到Waker
工坊上處理:
pub(crate) fn wake_all(&mut self) {
assert!(self.curr <= NUM_WAKERS);
while self.curr > 0 {
self.curr -= 1;
let waker = unsafe { ptr::read(self.inner[self.curr].as_mut_ptr()) };
waker.wake();
}
}
pub fn wake(self) {
// 存在的回調函數及對應的數據,好進行調用
let wake = self.waker.vtable.wake;
let data = self.waker.data;
crate::mem::forget(self);
// 用回調函數的方式處理剛等待執行的線程
unsafe { (wake)(data) };
}
最後又回到了受理工坊,我們知道了一個新的來源TcpStream
的到來,期間在等待的時候,我們可以去處理其它的事情,不至於空有許多人力物力,卻在等我的寶的事情到來沒法快速處理事情。
王子說:“在只有一個受理的時候你們這麼高效,如果有同時多個受理,又需要在處理完訪問相同的數據,你們又能處理嗎?”
莊主說:“那麼接下來就讓我帶你參觀下Poll
工坊,他用同步的方式可以同時處理多個鏈接。”
參觀完非同步工坊,莊主又帶著王子來到了
只見Poll工坊大屏幕上就出現了一個例子:
#[tokio::main]
async fn main() -> std::io::Result<()> {
use std::{future::poll_fn, task::Poll, pin::Pin};
use tokio::net::TcpListener;
let mut listeners = vec![];
// 同時監聽若幹個埠
for i in 1024..1099 {
listeners.push(TcpListener::bind(format!("127.0.0.1:{}", i)).await?);
}
loop {
let mut pin_listener = Pin::new(&mut listeners);
// 同時監聽若幹個埠,只要有任一個返回則返回數據
let fun = poll_fn(|cx| {
for l in &*pin_listener.as_mut() {
match l.poll_accept(cx) {
v @ Poll::Ready(_) => return v,
Poll::Pending => {},
}
}
Poll::Pending
});
let (conn, addr) = fun.await?;
println!("receiver conn:{:?} addr:{:?}", conn, addr);
}
}
他可快速的在函數中同時等待多個埠數據,這種同步的邏輯可以在複雜結構時很方便的書寫代碼的邏輯。
王子看完後:“現在你的處理能力已經高效又靈活,真正的可甜可咸了,把我的能力發揮完全又簡化了操作。”
點擊 [關註],[在看],[點贊] 是對作者最大的支持