使用C++20協程和io_uring優雅地實現非同步IO

来源:https://www.cnblogs.com/icysky/p/18098451
-Advertisement-
Play Games

距離2020年已經過去很久了,各大編譯器對於C++20各項標準的支持也日趨完善,無棧協程也是其中之一,所以我就嘗試著拿協程與`io_uring`實現了一下proactor模式,這篇文章用來記錄一下我的設計和想法。除此之外,我們能在網路上找到許多優秀的C++20協程的教程以及許多優秀的協程應用(庫),... ...


距離2020年已經過去很久了,各大編譯器對於C++20各項標準的支持也日趨完善,無棧協程也是其中之一,所以我就嘗試著拿協程與io_uring實現了一下proactor模式,這篇文章用來記錄一下我的設計和想法。除此之外,我們能在網路上找到許多優秀的C++20協程的教程以及許多優秀的協程應用(庫),但從協程入門到架構出成熟的應用(庫)之間還存在著不小的鴻溝,而直接去啃大型工程的源代碼絕對不算是一種高效率的學習方式。所以,如果這篇文章能夠在這方面提供一定的幫助的話那就再好不過了。

正如上所述,這篇文章是介紹基於C++20協程實現非同步IO的,而不是介紹C++20協程的,因此有一定的閱讀門檻。在閱讀之前,你應當至少熟悉一下C++20協程。

為什麼要使用協程

因為協程能夠讓我們像寫同步IO那樣來實現非同步IO,如下所示:

auto foo(tcp_connection connection) -> task<void> {
    char buffer[1024];
    int result = co_await connection.recv(buffer, sizeof(buffer));
    // do something...
    result = co_await connection.send(buffer, result);
    // do something...
    co_return;
}

如果我們合理地實現了協程的掛起、恢復等操作,那麼當我們執行co_await connection.recv時,我們實際上希望代碼執行的操作如下:

  1. 告訴操作系統,監聽recv操作,等待對方發送數據;
  2. 掛起當前協程;
  3. 去處理別的事情。

當操作系統接收到recv的數據時,執行以下操作:

  1. 處理recv,把數據讀進來;
  2. 恢復之前掛起的協程,從掛起的地方恢復執行。

這就是我們需要協程做的事情。如果你熟悉reactor模式的話,這應該並不陌生——我們只是把回調函數換成了協程而已。那麼回到這一部分的標題——我們為什麼要使用協程而不是回調函數呢?——因為使用協程寫出來的代碼更好看,也更好維護,僅此而已。

關於為什麼要使用非同步IO:非同步IO能夠提高程式的吞吐量。試想一下一臺基於同步IO的HTTP伺服器,一種不難想到的實現方式是每accept一個連接,就創建一個新的線程來處理這個連接的IO,最後當這個連接斷開時銷毀這個線程。這麼實現當然可以,但創建和銷毀線程的開銷是很大的,而且這要求線程調度器能夠很好地分配線程之間的時間。使用IO多路復用的方式能夠利用有限(甚至單線程)處理許多連接的IO,而不至於浪費過多的資源。

關於協程和回調函數的性能:我想二者應當是差不多的,或者協程可能還會更差一點,因為掛起協程和恢復協程需要執行一些額外操作。不過既然性能還沒有緊張到需要去摳dpdk,那麼和這一點點的性能優勢相比較的話,代碼的可維護性和可讀性絕對也是不容忽視的問題。

關於非同步IO的性能:我們通常講非同步IO性能更好指的是吞吐量,而不是低延時。不論是reactor模式還是proactor模式,其設計主旨都是要讓CPU在等待IO的時候去處理別的事情,不要讓CPU閑下來。如果低延時很重要的話,應當考慮使用同步IO與輪詢的方式。

設計思路

根據第一部分,設計的基調就能夠定下來了。我們重新考慮一下需要做的事情:

  1. 當我們執行到co_await read(...)等非同步IO時,掛起當前協程,去處理其他事情;
  2. 當非同步IO執行完畢時,恢復協程的執行。

仔細思考一下上述兩點,我們就能夠得到所有要做的事情:

  1. 我們需要適時掛起協程,所以首先我們要實現協程task
  2. 協程可能會調用協程,所以需要維護一下協程的調用棧(我是在promise里維護的);
  3. 協程是用來處理非同步IO的,所以我們需要有一些組件來處理io_uring的IO(我是在io_context_worker中處理的);
  4. 當非同步IO執行完畢時,需要有什麼東西恢復協程的執行(這也是在io_context_worker中處理的);
  5. 當整個協程執行完畢時,需要銷毀協程(這也是在io_context_worker中處理的)。

在繼續閱讀之前,我先貼一下代碼。對照著代碼看的話會舒服一些:GitHub

taskpromise

taskpromise均在coco/task.hpp中定義。我對task的定位正如協程最基本的功能——能夠掛起和恢復的函數。task類本身只是對std::coroutine_handle的簡易封裝。在這裡我只介紹一下taskoperator co_await

taskoperator co_await只是返回task_awaitable,所以co_await處理的重點實際上是在task_awaitable中實現的。考慮一下,當我們co_await一個task時,我們究竟是在乾什麼:

  1. 掛起當前協程
  2. 維護協程的調用棧
  3. 啟動被co_await的協程

task_awaitable::await_suspend()中很容易看出這三點:

template <class T>
template <class Promise>
auto task_awaitable<T>::await_suspend(
    std::coroutine_handle<Promise> caller) noexcept -> coroutine_handle {
    // Set caller for this coroutine.
    promise_base &base = static_cast<promise_base &>(m_coroutine.promise());
    promise_base &caller_base = static_cast<promise_base &>(caller.promise());
    base.m_caller_or_top      = &caller_base;

    // Maintain stack bottom and top.
    promise_base *stack_bottom = caller_base.m_stack_bottom;
    assert(stack_bottom == stack_bottom->m_stack_bottom);

    base.m_stack_bottom           = stack_bottom;
    stack_bottom->m_caller_or_top = &base;

    return m_coroutine;
}

這裡著重講一下維護協程的調用棧。對於協程而言,至少存在一個協程,它是在協程外創建的(比如main函數)。因為它不在協程中,所以它也無法被co_await,這個協程我們稱之為協程的棧底。在這個協程的執行過程中,它可能會創建和執行新的協程。當新的協程執行完畢時,它們會被清理,並且將執行權交還給調用者,這與普通函數的調用棧是一樣的,只不過這個功能需要我們自己來實現。

因為promise在記憶體中的位置是不可移動的(我禁止了promise的拷貝與移動),所以我直接採用了類似鏈表的方式將協程的調用棧串了起來。在promise_base中,有兩個成員變數用來維護這個調用棧:

promise_base *m_caller_or_top;
promise_base *m_stack_bottom;

因為第一個變數被覆用了(具備不同的含義),所以可能有點亂。對於棧底協程而言,m_caller_or_top指向當前調用棧的棧頂協程,對於其他協程而言,m_caller_or_top指向自己的調用者(父協程)。這麼設計是因為棧底協程不存在調用者,所以就乾脆用這個變數存一下棧頂了。m_stack_bottom顧名思義,就是指向棧底的協程。對於棧底協程而言,這個變數指向的就是它自己了。

有了m_caller_or_top,當一個協程執行完畢時,就能方便地找到它的父協程並交換執行權。有了m_stack_bottomm_caller_or_top,我們就能很方便地找到協程的棧底和棧頂。當需要恢復task時,就能夠保證總是恢復棧頂的協程。

當協程執行完畢時,需要將控制權交還給父協程。我們考慮一下交還控制權需要做的事情:

  1. 維護調用棧,變更棧頂
  2. 如果不是棧底,則恢復父協程的執行

協程執行完畢時會去嘗試執行promisefinal_suspend(),因此這部分代碼在promisefinal_suspend()中實現。final_suspend()返回的類型叫promise_awaitable,其對應的代碼如下:

template <class Promise>
auto promise_awaitable::await_suspend(
    std::coroutine_handle<Promise> coroutine) noexcept
    -> std::coroutine_handle<> {
    promise_base &base  = static_cast<promise_base &>(coroutine.promise());
    promise_base *stack = base.m_stack_bottom;

    // Stack bottom completed. Nothing to resume.
    if (stack == &base)
        return std::noop_coroutine();

    // Set caller coroutine as the top of the stack.
    promise_base *caller   = base.m_caller_or_top;
    stack->m_caller_or_top = caller;

    // Resume caller.
    return caller->m_coroutine;
}

這段代碼應該非常易懂,不過我們很容易聯想到一個問題:既然交還了控制權,那麼它是在何時銷毀的?

其實這也不難想到,協程是由父協程銷毀的。協程的返回值存放在promise中,當父協程co_await sometask時,父協程還需要讀取子協程的promise以獲取返回值。當子協程task<T>析構時,子協程才真正被銷毀。

io_context的設計

既然協程需要非同步地處理IO,那麼必然需要個處理IO的地方,就是io_contextio_context維護了一個線程池,線程池中每一個線程均執行一個worker,每個worker均維護一個io_uring來處理本線程的IO事件和協程。當需要提交新的task給線程池的時候,由io_context分配給某一個worker執行。

這聽起來和reactor模式好像沒啥區別,用epoll寫reactor模式的時候基本上也是這麼乾的,這是因為我本來就是從reactor模式那邊搬過來的。不過相比於reactor模式,這麼做還是有不少細節要處理的。在使用io_uring時,每次我們啟動非同步IO時,都需要獲取到io_uring對象——要使用io_uring_prep_<io_operation>系列函數,我們必須從io_uring對象中獲取一個sqe。而如上所述,io_uring對象在worker中,這就造成了一個麻煩:我們無法在io_contexttask分配worker的時候將io_uring對象的引用(指針)傳遞給task。雖然使用全局變數不失為一種選擇,但我不想這麼做,因為也許使用者想要在一個進程中創建幾個不同的io_context用呢。

雖然無法直接將io_uring的引用傳遞給task,但還有一種方法可以進行交互。在所有awaitableawait_suspend中,我們可以拿到當前協程的coroutine_handle,而在io_context中,我們也能拿到taskcoroutine_handle,因此可以通過promise來傳遞io_uring的引用。

具體在實現時,我沒有傳遞io_uring的引用,而是傳遞了worker的指針。這麼做是因為當初我想同時支持IOCP,傳遞worker可以省掉一些麻煩,雖然後來放棄了。worker的指針只被放在協程棧的棧底,這麼做是因為當協程在不同worker之間轉移時,能夠很方便地修改協程所屬的worker(只需要修改棧底就可以了),儘管後來也沒有實現work-stealing隊列。在promise中,處理worker的方法如下所示:

/// \brief
///   Set I/O context for current coroutine.
/// \param[in] io_ctx
///   I/O context to be set for current coroutine.
auto set_worker(io_context_worker *io_ctx) noexcept -> void {
    m_stack_bottom->m_worker.store(io_ctx, std::memory_order_release);
}

/// \brief
///   Get I/O context for current coroutine.
/// \return
///   I/O context for current coroutine.
[[nodiscard]] auto worker() const noexcept -> io_context_worker * {
    return m_stack_bottom->m_worker.load(std::memory_order_acquire);
}

不過這麼做也有一個缺點,就是io_context侵入了promise的設計,使得task必須在io_context中才能發揮作用。

awaitable的設計

awaitablecoco/io.hpp中定義。各種awaitable的設計就比較簡單了。以read_awaitable為例,它在await_suspend()中獲取當前協程所屬的worker,然後啟用非同步IO,如下所示:

template <class Promise>
auto await_suspend(std::coroutine_handle<Promise> coro) noexcept -> bool {
    m_userdata.coroutine = coro.address();
    return this->suspend(coro.promise().worker());
}

獲取了當前協程的worker後,就轉入this->suspend()函數中去執行了。suspend()方法主要的工作是啟動非同步IO操作,並掛起當前協程:

auto coco::read_awaitable::suspend(io_context_worker *worker) noexcept -> bool {
    assert(worker != nullptr);
    m_userdata.cqe_res   = 0;
    m_userdata.cqe_flags = 0;

    io_uring     *ring = worker->io_ring();
    io_uring_sqe *sqe  = io_uring_get_sqe(ring);
    while (sqe == nullptr) [[unlikely]] {
        io_uring_submit(ring);
        sqe = io_uring_get_sqe(ring);
    }

    io_uring_prep_read(sqe, m_file, m_buffer, m_size, m_offset);
    sqe->user_data = reinterpret_cast<uint64_t>(&m_userdata);

    int result = io_uring_submit(ring);
    if (result < 0) [[unlikely]] { // Result is -errno.
        m_userdata.cqe_res = result;
        return false;
    }

    return true;
}

我沒有把啟用非同步IO部分放到模板函數中,這是因為對於各種不同的IO操作,這部分的代碼實際上大同小異。但考慮到這部分代碼的長度,放到模板中可能會導致比較嚴重的二進位膨脹,所以就單獨拿出來放到.cpp文件中了。

如果去翻我之前的commit記錄的話,會發現起初我並沒有把各種awaitable暴露出來,而是讓各種非同步操作(比如connection.receive())返回task。後來將awaitable暴露出來是考慮到諸如readwrite等操作可能會被頻繁地調用,而每次創建一個協程都需要申請一次堆記憶體,在迴圈中執行的話可能對運行效率有比較嚴重的影響。

關於代碼

再放一遍代碼地址:GitHub

這份代碼不長,總共兩三千行,而且其中一多半都是註釋,結合本文的話應該不會很難讀。這本身只是一份實驗性質的代碼,同時我希望它適合拿來學習,所以我並沒有打算塞入太多的功能。除此之外,我不是很建議你拿來放到工程中使用,因為我可能會一時興起做出一些breaking change。如果你真的有這個需要的話,我建議你fork一份代碼自己維護。

一些可能會被問到的問題

  1. 為什麼沒有實現UDP相關的IO?

因為io_uring似乎還沒有支持recvfrom,至少我實現的時候還沒有。

  1. 為什麼不使用mmap和內核共用記憶體/為什麼不向io_uring註冊文件描述符等性能相關的問題

因為我不是io_uring專家。我寫這個庫的目的是學慣用C++20的協程架構一個非同步IO庫,做這些性能優化會加大架構難度,並且花掉我大量的頭髮和時間,使我本不茂密的頭髮雪上加霜。除此之外,你會發現我也沒有實現work-stealing隊列,原因同理。Round Robin的性能雖然不至於最優,但也不會太差。

  1. 考不考慮加上HTTP支持?

考慮過,太懶所以放棄了。一方面是手寫HTTP parser還是挺麻煩的,就算能用bison自動生成也還得去啃RFC。另一方面是,TCP作為一種基於流的協議,我沒有想好如何處理連接的緩存能夠兼顧性能和使用的便捷性。如果你有這方面的需求的話,不妨先用著其他的HTTP parser,比如llhttp

  1. 為什麼沒有實現yield

我覺得非同步IO一般用不到這東西,所以就沒寫。如果需要的話就自己實現吧。

  1. 會支持Windows(IOCP)嗎?

我有考慮過支持IOCP,但IOCP不支持定時器,我又不想刪除Linux這邊的timer,所以暫且沒有這個想法。

  1. 作者你README寫得好水啊

確實,我也覺得好水啊,有沒有好心人幫忙修一修啊

  1. 為什麼不用中文寫註釋和README?

不用中文寫註釋是因為clang-format沒法處理中文的斷行。不用中文寫README是因為懶,不想寫兩份README。說不定哪天心情好了就寫一份中文版。

碎碎念

要不要塞張插圖呢?


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...