Tokio 無疑是 Rust 世界中最優秀的非同步Runtime實現。非阻塞的特性帶來了優異的性能,但是在實際的開發中我們往往需要在某些情況下阻塞任務來實現某些功能。 ...
作者:京東科技 賈世聞
Tokio 無疑是 Rust 世界中最優秀的非同步Runtime實現。非阻塞的特性帶來了優異的性能,但是在實際的開發中我們往往需要在某些情況下阻塞任務來實現某些功能。
我們看看下麵的例子
fn main(){
let max_task = 1;
let rt = runtime::Builder::new_multi_thread()
.worker_threads(max_task)
.build()
.unwrap();
rt.block_on(async {
println!("tokio_multi_thread ");
for i in 0..100 {
println!("run {}", i);
tokio::spawn(async move {
println!("spawn {}", i);
thread::sleep(Duration::from_secs(2));
});
}
});
}
我們期待的運行結構是通過非同步任務列印出99個 “spawn i",但實際輸出的結果大概這樣
tokio_multi_thread
run 0
run 1
run 2
.......
run 16
spawn 0
run 17
......
run 99
spawn 1
spawn 2
......
spawn 29
......
spawn 58
spawn 59
59執行完後面就沒有輸出了,如果把max_task設置為2,情況會好一點,但是也沒有執行完所有的非同步操作,也就是說在資源不足的情況下,Tokio會拋棄某些任務,這不符合我們的預期。那麼能不能再達到了某一閥值的情況下阻塞一下,不再給Tokio新的任務呢。這有點類似線程池,當達達最大線程數的時候阻塞後面的任務待有釋放的線程後再繼續。
我們看看下麵的代碼。
fn main(){
let max_task = 2;
let rt = runtime::Builder::new_multi_thread()
.worker_threads(max_task)
.enable_time()
.build()
.unwrap();
let mut set = JoinSet::new();
rt.block_on(async {
for i in 0..100 {
println!("run {}", i);
while set.len() >= max_task {
set.join_next().await;
}
set.spawn(async move {
sleep().await;
println!("spawn {}", i);
});
}
while set.len() > 0 {
set.join_next().await;
}
});
}
我們使用JoinSet來管理派生出來的任務。set.join_next().await; 保證至少一個任務被執行完成。結合set的len,我們可以在任務達到上限時阻塞任務派生。當迴圈結束,可能還有未完成的任務,所以只要set.len()大於0就等待任務結束。
輸出大概長這樣
running 1 test
tokio_multi_thread
run 0
run 1
spawn 0
run 2
spawn 1
......
run 31
spawn 30
run 32
spawn 31
run 33
......
run 96
spawn 95
run 97
spawn 96
run 98
spawn 97
run 99
spawn 98
spawn 99
符合預期,代碼不多,有興趣的同學可以動手嘗試一下。