使用C++20協程和io_uring優雅地實現非同步IO

来源:https://www.cnblogs.com/icysky/p/18098451
-Advertisement-
Play Games

距離2020年已經過去很久了,各大編譯器對於C++20各項標準的支持也日趨完善,無棧協程也是其中之一,所以我就嘗試著拿協程與`io_uring`實現了一下proactor模式,這篇文章用來記錄一下我的設計和想法。除此之外,我們能在網路上找到許多優秀的C++20協程的教程以及許多優秀的協程應用(庫),... ...


距離2020年已經過去很久了,各大編譯器對於C++20各項標準的支持也日趨完善,無棧協程也是其中之一,所以我就嘗試著拿協程與io_uring實現了一下proactor模式,這篇文章用來記錄一下我的設計和想法。除此之外,我們能在網路上找到許多優秀的C++20協程的教程以及許多優秀的協程應用(庫),但從協程入門到架構出成熟的應用(庫)之間還存在著不小的鴻溝,而直接去啃大型工程的源代碼絕對不算是一種高效率的學習方式。所以,如果這篇文章能夠在這方面提供一定的幫助的話那就再好不過了。

正如上所述,這篇文章是介紹基於C++20協程實現非同步IO的,而不是介紹C++20協程的,因此有一定的閱讀門檻。在閱讀之前,你應當至少熟悉一下C++20協程。

為什麼要使用協程

因為協程能夠讓我們像寫同步IO那樣來實現非同步IO,如下所示:

auto foo(tcp_connection connection) -> task<void> {
    char buffer[1024];
    int result = co_await connection.recv(buffer, sizeof(buffer));
    // do something...
    result = co_await connection.send(buffer, result);
    // do something...
    co_return;
}

如果我們合理地實現了協程的掛起、恢復等操作,那麼當我們執行co_await connection.recv時,我們實際上希望代碼執行的操作如下:

  1. 告訴操作系統,監聽recv操作,等待對方發送數據;
  2. 掛起當前協程;
  3. 去處理別的事情。

當操作系統接收到recv的數據時,執行以下操作:

  1. 處理recv,把數據讀進來;
  2. 恢復之前掛起的協程,從掛起的地方恢復執行。

這就是我們需要協程做的事情。如果你熟悉reactor模式的話,這應該並不陌生——我們只是把回調函數換成了協程而已。那麼回到這一部分的標題——我們為什麼要使用協程而不是回調函數呢?——因為使用協程寫出來的代碼更好看,也更好維護,僅此而已。

關於為什麼要使用非同步IO:非同步IO能夠提高程式的吞吐量。試想一下一臺基於同步IO的HTTP伺服器,一種不難想到的實現方式是每accept一個連接,就創建一個新的線程來處理這個連接的IO,最後當這個連接斷開時銷毀這個線程。這麼實現當然可以,但創建和銷毀線程的開銷是很大的,而且這要求線程調度器能夠很好地分配線程之間的時間。使用IO多路復用的方式能夠利用有限(甚至單線程)處理許多連接的IO,而不至於浪費過多的資源。

關於協程和回調函數的性能:我想二者應當是差不多的,或者協程可能還會更差一點,因為掛起協程和恢復協程需要執行一些額外操作。不過既然性能還沒有緊張到需要去摳dpdk,那麼和這一點點的性能優勢相比較的話,代碼的可維護性和可讀性絕對也是不容忽視的問題。

關於非同步IO的性能:我們通常講非同步IO性能更好指的是吞吐量,而不是低延時。不論是reactor模式還是proactor模式,其設計主旨都是要讓CPU在等待IO的時候去處理別的事情,不要讓CPU閑下來。如果低延時很重要的話,應當考慮使用同步IO與輪詢的方式。

設計思路

根據第一部分,設計的基調就能夠定下來了。我們重新考慮一下需要做的事情:

  1. 當我們執行到co_await read(...)等非同步IO時,掛起當前協程,去處理其他事情;
  2. 當非同步IO執行完畢時,恢復協程的執行。

仔細思考一下上述兩點,我們就能夠得到所有要做的事情:

  1. 我們需要適時掛起協程,所以首先我們要實現協程task
  2. 協程可能會調用協程,所以需要維護一下協程的調用棧(我是在promise里維護的);
  3. 協程是用來處理非同步IO的,所以我們需要有一些組件來處理io_uring的IO(我是在io_context_worker中處理的);
  4. 當非同步IO執行完畢時,需要有什麼東西恢復協程的執行(這也是在io_context_worker中處理的);
  5. 當整個協程執行完畢時,需要銷毀協程(這也是在io_context_worker中處理的)。

在繼續閱讀之前,我先貼一下代碼。對照著代碼看的話會舒服一些:GitHub

taskpromise

taskpromise均在coco/task.hpp中定義。我對task的定位正如協程最基本的功能——能夠掛起和恢復的函數。task類本身只是對std::coroutine_handle的簡易封裝。在這裡我只介紹一下taskoperator co_await

taskoperator co_await只是返回task_awaitable,所以co_await處理的重點實際上是在task_awaitable中實現的。考慮一下,當我們co_await一個task時,我們究竟是在乾什麼:

  1. 掛起當前協程
  2. 維護協程的調用棧
  3. 啟動被co_await的協程

task_awaitable::await_suspend()中很容易看出這三點:

template <class T>
template <class Promise>
auto task_awaitable<T>::await_suspend(
    std::coroutine_handle<Promise> caller) noexcept -> coroutine_handle {
    // Set caller for this coroutine.
    promise_base &base = static_cast<promise_base &>(m_coroutine.promise());
    promise_base &caller_base = static_cast<promise_base &>(caller.promise());
    base.m_caller_or_top      = &caller_base;

    // Maintain stack bottom and top.
    promise_base *stack_bottom = caller_base.m_stack_bottom;
    assert(stack_bottom == stack_bottom->m_stack_bottom);

    base.m_stack_bottom           = stack_bottom;
    stack_bottom->m_caller_or_top = &base;

    return m_coroutine;
}

這裡著重講一下維護協程的調用棧。對於協程而言,至少存在一個協程,它是在協程外創建的(比如main函數)。因為它不在協程中,所以它也無法被co_await,這個協程我們稱之為協程的棧底。在這個協程的執行過程中,它可能會創建和執行新的協程。當新的協程執行完畢時,它們會被清理,並且將執行權交還給調用者,這與普通函數的調用棧是一樣的,只不過這個功能需要我們自己來實現。

因為promise在記憶體中的位置是不可移動的(我禁止了promise的拷貝與移動),所以我直接採用了類似鏈表的方式將協程的調用棧串了起來。在promise_base中,有兩個成員變數用來維護這個調用棧:

promise_base *m_caller_or_top;
promise_base *m_stack_bottom;

因為第一個變數被覆用了(具備不同的含義),所以可能有點亂。對於棧底協程而言,m_caller_or_top指向當前調用棧的棧頂協程,對於其他協程而言,m_caller_or_top指向自己的調用者(父協程)。這麼設計是因為棧底協程不存在調用者,所以就乾脆用這個變數存一下棧頂了。m_stack_bottom顧名思義,就是指向棧底的協程。對於棧底協程而言,這個變數指向的就是它自己了。

有了m_caller_or_top,當一個協程執行完畢時,就能方便地找到它的父協程並交換執行權。有了m_stack_bottomm_caller_or_top,我們就能很方便地找到協程的棧底和棧頂。當需要恢復task時,就能夠保證總是恢復棧頂的協程。

當協程執行完畢時,需要將控制權交還給父協程。我們考慮一下交還控制權需要做的事情:

  1. 維護調用棧,變更棧頂
  2. 如果不是棧底,則恢復父協程的執行

協程執行完畢時會去嘗試執行promisefinal_suspend(),因此這部分代碼在promisefinal_suspend()中實現。final_suspend()返回的類型叫promise_awaitable,其對應的代碼如下:

template <class Promise>
auto promise_awaitable::await_suspend(
    std::coroutine_handle<Promise> coroutine) noexcept
    -> std::coroutine_handle<> {
    promise_base &base  = static_cast<promise_base &>(coroutine.promise());
    promise_base *stack = base.m_stack_bottom;

    // Stack bottom completed. Nothing to resume.
    if (stack == &base)
        return std::noop_coroutine();

    // Set caller coroutine as the top of the stack.
    promise_base *caller   = base.m_caller_or_top;
    stack->m_caller_or_top = caller;

    // Resume caller.
    return caller->m_coroutine;
}

這段代碼應該非常易懂,不過我們很容易聯想到一個問題:既然交還了控制權,那麼它是在何時銷毀的?

其實這也不難想到,協程是由父協程銷毀的。協程的返回值存放在promise中,當父協程co_await sometask時,父協程還需要讀取子協程的promise以獲取返回值。當子協程task<T>析構時,子協程才真正被銷毀。

io_context的設計

既然協程需要非同步地處理IO,那麼必然需要個處理IO的地方,就是io_contextio_context維護了一個線程池,線程池中每一個線程均執行一個worker,每個worker均維護一個io_uring來處理本線程的IO事件和協程。當需要提交新的task給線程池的時候,由io_context分配給某一個worker執行。

這聽起來和reactor模式好像沒啥區別,用epoll寫reactor模式的時候基本上也是這麼乾的,這是因為我本來就是從reactor模式那邊搬過來的。不過相比於reactor模式,這麼做還是有不少細節要處理的。在使用io_uring時,每次我們啟動非同步IO時,都需要獲取到io_uring對象——要使用io_uring_prep_<io_operation>系列函數,我們必須從io_uring對象中獲取一個sqe。而如上所述,io_uring對象在worker中,這就造成了一個麻煩:我們無法在io_contexttask分配worker的時候將io_uring對象的引用(指針)傳遞給task。雖然使用全局變數不失為一種選擇,但我不想這麼做,因為也許使用者想要在一個進程中創建幾個不同的io_context用呢。

雖然無法直接將io_uring的引用傳遞給task,但還有一種方法可以進行交互。在所有awaitableawait_suspend中,我們可以拿到當前協程的coroutine_handle,而在io_context中,我們也能拿到taskcoroutine_handle,因此可以通過promise來傳遞io_uring的引用。

具體在實現時,我沒有傳遞io_uring的引用,而是傳遞了worker的指針。這麼做是因為當初我想同時支持IOCP,傳遞worker可以省掉一些麻煩,雖然後來放棄了。worker的指針只被放在協程棧的棧底,這麼做是因為當協程在不同worker之間轉移時,能夠很方便地修改協程所屬的worker(只需要修改棧底就可以了),儘管後來也沒有實現work-stealing隊列。在promise中,處理worker的方法如下所示:

/// \brief
///   Set I/O context for current coroutine.
/// \param[in] io_ctx
///   I/O context to be set for current coroutine.
auto set_worker(io_context_worker *io_ctx) noexcept -> void {
    m_stack_bottom->m_worker.store(io_ctx, std::memory_order_release);
}

/// \brief
///   Get I/O context for current coroutine.
/// \return
///   I/O context for current coroutine.
[[nodiscard]] auto worker() const noexcept -> io_context_worker * {
    return m_stack_bottom->m_worker.load(std::memory_order_acquire);
}

不過這麼做也有一個缺點,就是io_context侵入了promise的設計,使得task必須在io_context中才能發揮作用。

awaitable的設計

awaitablecoco/io.hpp中定義。各種awaitable的設計就比較簡單了。以read_awaitable為例,它在await_suspend()中獲取當前協程所屬的worker,然後啟用非同步IO,如下所示:

template <class Promise>
auto await_suspend(std::coroutine_handle<Promise> coro) noexcept -> bool {
    m_userdata.coroutine = coro.address();
    return this->suspend(coro.promise().worker());
}

獲取了當前協程的worker後,就轉入this->suspend()函數中去執行了。suspend()方法主要的工作是啟動非同步IO操作,並掛起當前協程:

auto coco::read_awaitable::suspend(io_context_worker *worker) noexcept -> bool {
    assert(worker != nullptr);
    m_userdata.cqe_res   = 0;
    m_userdata.cqe_flags = 0;

    io_uring     *ring = worker->io_ring();
    io_uring_sqe *sqe  = io_uring_get_sqe(ring);
    while (sqe == nullptr) [[unlikely]] {
        io_uring_submit(ring);
        sqe = io_uring_get_sqe(ring);
    }

    io_uring_prep_read(sqe, m_file, m_buffer, m_size, m_offset);
    sqe->user_data = reinterpret_cast<uint64_t>(&m_userdata);

    int result = io_uring_submit(ring);
    if (result < 0) [[unlikely]] { // Result is -errno.
        m_userdata.cqe_res = result;
        return false;
    }

    return true;
}

我沒有把啟用非同步IO部分放到模板函數中,這是因為對於各種不同的IO操作,這部分的代碼實際上大同小異。但考慮到這部分代碼的長度,放到模板中可能會導致比較嚴重的二進位膨脹,所以就單獨拿出來放到.cpp文件中了。

如果去翻我之前的commit記錄的話,會發現起初我並沒有把各種awaitable暴露出來,而是讓各種非同步操作(比如connection.receive())返回task。後來將awaitable暴露出來是考慮到諸如readwrite等操作可能會被頻繁地調用,而每次創建一個協程都需要申請一次堆記憶體,在迴圈中執行的話可能對運行效率有比較嚴重的影響。

關於代碼

再放一遍代碼地址:GitHub

這份代碼不長,總共兩三千行,而且其中一多半都是註釋,結合本文的話應該不會很難讀。這本身只是一份實驗性質的代碼,同時我希望它適合拿來學習,所以我並沒有打算塞入太多的功能。除此之外,我不是很建議你拿來放到工程中使用,因為我可能會一時興起做出一些breaking change。如果你真的有這個需要的話,我建議你fork一份代碼自己維護。

一些可能會被問到的問題

  1. 為什麼沒有實現UDP相關的IO?

因為io_uring似乎還沒有支持recvfrom,至少我實現的時候還沒有。

  1. 為什麼不使用mmap和內核共用記憶體/為什麼不向io_uring註冊文件描述符等性能相關的問題

因為我不是io_uring專家。我寫這個庫的目的是學慣用C++20的協程架構一個非同步IO庫,做這些性能優化會加大架構難度,並且花掉我大量的頭髮和時間,使我本不茂密的頭髮雪上加霜。除此之外,你會發現我也沒有實現work-stealing隊列,原因同理。Round Robin的性能雖然不至於最優,但也不會太差。

  1. 考不考慮加上HTTP支持?

考慮過,太懶所以放棄了。一方面是手寫HTTP parser還是挺麻煩的,就算能用bison自動生成也還得去啃RFC。另一方面是,TCP作為一種基於流的協議,我沒有想好如何處理連接的緩存能夠兼顧性能和使用的便捷性。如果你有這方面的需求的話,不妨先用著其他的HTTP parser,比如llhttp

  1. 為什麼沒有實現yield

我覺得非同步IO一般用不到這東西,所以就沒寫。如果需要的話就自己實現吧。

  1. 會支持Windows(IOCP)嗎?

我有考慮過支持IOCP,但IOCP不支持定時器,我又不想刪除Linux這邊的timer,所以暫且沒有這個想法。

  1. 作者你README寫得好水啊

確實,我也覺得好水啊,有沒有好心人幫忙修一修啊

  1. 為什麼不用中文寫註釋和README?

不用中文寫註釋是因為clang-format沒法處理中文的斷行。不用中文寫README是因為懶,不想寫兩份README。說不定哪天心情好了就寫一份中文版。

碎碎念

要不要塞張插圖呢?


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
一周排行
    -Advertisement-
    Play Games
  • 一個自定義WPF窗體的解決方案,借鑒了呂毅老師的WPF製作高性能的透明背景的異形視窗一文,併在此基礎上增加了滑鼠穿透的功能。可以使得透明窗體的滑鼠事件穿透到下層,在下層窗體中響應。 ...
  • 在C#中使用RabbitMQ做個簡單的發送郵件小項目 前言 好久沒有做項目了,這次做一個發送郵件的小項目。發郵件是一個比較耗時的操作,之前在我的個人博客裡面回覆評論和友鏈申請是會通過發送郵件來通知對方的,不過當時只是簡單的進行了非同步操作。 那麼這次來使用RabbitMQ去統一發送郵件,我的想法是通過 ...
  • 當你使用Edge等瀏覽器或系統軟體播放媒體時,Windows控制中心就會出現相應的媒體信息以及控制播放的功能,如圖。 SMTC (SystemMediaTransportControls) 是一個Windows App SDK (舊為UWP) 中提供的一個API,用於與系統媒體交互。接入SMTC的好 ...
  • 最近在微軟商店,官方上架了新款Win11風格的WPF版UI框架【WPF Gallery Preview 1.0.0.0】,這款應用引入了前沿的Fluent Design UI設計,為用戶帶來全新的視覺體驗。 ...
  • 1.簡單使用實例 1.1 添加log4net.dll的引用。 在NuGet程式包中搜索log4net並添加,此次我所用版本為2.0.17。如下圖: 1.2 添加配置文件 右鍵項目,添加新建項,搜索選擇應用程式配置文件,命名為log4net.config,步驟如下圖: 1.2.1 log4net.co ...
  • 之前也分享過 Swashbuckle.AspNetCore 的使用,不過版本比較老了,本次演示用的示例版本為 .net core 8.0,從安裝使用開始,到根據命名空間分組顯示,十分的有用 ...
  • 在 Visual Studio 中,至少可以創建三種不同類型的類庫: 類庫(.NET Framework) 類庫(.NET 標準) 類庫 (.NET Core) 雖然第一種是我們多年來一直在使用的,但一直感到困惑的一個主要問題是何時使用 .NET Standard 和 .NET Core 類庫類型。 ...
  • WPF的按鈕提供了Template模板,可以通過修改Template模板中的內容對按鈕的樣式進行自定義。結合資源字典,可以將自定義資源在xaml視窗、自定義控制項或者整個App當中調用 ...
  • 實現了一個支持長短按得按鈕組件,單擊可以觸發Click事件,長按可以觸發LongPressed事件,長按鬆開時觸發LongClick事件。還可以和自定義外觀相結合,實現自定義的按鈕外形。 ...
  • 一、WTM是什麼 WalkingTec.Mvvm框架(簡稱WTM)最早開發與2013年,基於Asp.net MVC3 和 最早的Entity Framework, 當初主要是為瞭解決公司內部開發效率低,代碼風格不統一的問題。2017年9月,將代碼移植到了.Net Core上,併進行了深度優化和重構, ...