使用C++20協程和io_uring優雅地實現非同步IO

来源:https://www.cnblogs.com/icysky/p/18098451
-Advertisement-
Play Games

距離2020年已經過去很久了,各大編譯器對於C++20各項標準的支持也日趨完善,無棧協程也是其中之一,所以我就嘗試著拿協程與`io_uring`實現了一下proactor模式,這篇文章用來記錄一下我的設計和想法。除此之外,我們能在網路上找到許多優秀的C++20協程的教程以及許多優秀的協程應用(庫),... ...


距離2020年已經過去很久了,各大編譯器對於C++20各項標準的支持也日趨完善,無棧協程也是其中之一,所以我就嘗試著拿協程與io_uring實現了一下proactor模式,這篇文章用來記錄一下我的設計和想法。除此之外,我們能在網路上找到許多優秀的C++20協程的教程以及許多優秀的協程應用(庫),但從協程入門到架構出成熟的應用(庫)之間還存在著不小的鴻溝,而直接去啃大型工程的源代碼絕對不算是一種高效率的學習方式。所以,如果這篇文章能夠在這方面提供一定的幫助的話那就再好不過了。

正如上所述,這篇文章是介紹基於C++20協程實現非同步IO的,而不是介紹C++20協程的,因此有一定的閱讀門檻。在閱讀之前,你應當至少熟悉一下C++20協程。

為什麼要使用協程

因為協程能夠讓我們像寫同步IO那樣來實現非同步IO,如下所示:

auto foo(tcp_connection connection) -> task<void> {
    char buffer[1024];
    int result = co_await connection.recv(buffer, sizeof(buffer));
    // do something...
    result = co_await connection.send(buffer, result);
    // do something...
    co_return;
}

如果我們合理地實現了協程的掛起、恢復等操作,那麼當我們執行co_await connection.recv時,我們實際上希望代碼執行的操作如下:

  1. 告訴操作系統,監聽recv操作,等待對方發送數據;
  2. 掛起當前協程;
  3. 去處理別的事情。

當操作系統接收到recv的數據時,執行以下操作:

  1. 處理recv,把數據讀進來;
  2. 恢復之前掛起的協程,從掛起的地方恢復執行。

這就是我們需要協程做的事情。如果你熟悉reactor模式的話,這應該並不陌生——我們只是把回調函數換成了協程而已。那麼回到這一部分的標題——我們為什麼要使用協程而不是回調函數呢?——因為使用協程寫出來的代碼更好看,也更好維護,僅此而已。

關於為什麼要使用非同步IO:非同步IO能夠提高程式的吞吐量。試想一下一臺基於同步IO的HTTP伺服器,一種不難想到的實現方式是每accept一個連接,就創建一個新的線程來處理這個連接的IO,最後當這個連接斷開時銷毀這個線程。這麼實現當然可以,但創建和銷毀線程的開銷是很大的,而且這要求線程調度器能夠很好地分配線程之間的時間。使用IO多路復用的方式能夠利用有限(甚至單線程)處理許多連接的IO,而不至於浪費過多的資源。

關於協程和回調函數的性能:我想二者應當是差不多的,或者協程可能還會更差一點,因為掛起協程和恢復協程需要執行一些額外操作。不過既然性能還沒有緊張到需要去摳dpdk,那麼和這一點點的性能優勢相比較的話,代碼的可維護性和可讀性絕對也是不容忽視的問題。

關於非同步IO的性能:我們通常講非同步IO性能更好指的是吞吐量,而不是低延時。不論是reactor模式還是proactor模式,其設計主旨都是要讓CPU在等待IO的時候去處理別的事情,不要讓CPU閑下來。如果低延時很重要的話,應當考慮使用同步IO與輪詢的方式。

設計思路

根據第一部分,設計的基調就能夠定下來了。我們重新考慮一下需要做的事情:

  1. 當我們執行到co_await read(...)等非同步IO時,掛起當前協程,去處理其他事情;
  2. 當非同步IO執行完畢時,恢復協程的執行。

仔細思考一下上述兩點,我們就能夠得到所有要做的事情:

  1. 我們需要適時掛起協程,所以首先我們要實現協程task
  2. 協程可能會調用協程,所以需要維護一下協程的調用棧(我是在promise里維護的);
  3. 協程是用來處理非同步IO的,所以我們需要有一些組件來處理io_uring的IO(我是在io_context_worker中處理的);
  4. 當非同步IO執行完畢時,需要有什麼東西恢復協程的執行(這也是在io_context_worker中處理的);
  5. 當整個協程執行完畢時,需要銷毀協程(這也是在io_context_worker中處理的)。

在繼續閱讀之前,我先貼一下代碼。對照著代碼看的話會舒服一些:GitHub

taskpromise

taskpromise均在coco/task.hpp中定義。我對task的定位正如協程最基本的功能——能夠掛起和恢復的函數。task類本身只是對std::coroutine_handle的簡易封裝。在這裡我只介紹一下taskoperator co_await

taskoperator co_await只是返回task_awaitable,所以co_await處理的重點實際上是在task_awaitable中實現的。考慮一下,當我們co_await一個task時,我們究竟是在乾什麼:

  1. 掛起當前協程
  2. 維護協程的調用棧
  3. 啟動被co_await的協程

task_awaitable::await_suspend()中很容易看出這三點:

template <class T>
template <class Promise>
auto task_awaitable<T>::await_suspend(
    std::coroutine_handle<Promise> caller) noexcept -> coroutine_handle {
    // Set caller for this coroutine.
    promise_base &base = static_cast<promise_base &>(m_coroutine.promise());
    promise_base &caller_base = static_cast<promise_base &>(caller.promise());
    base.m_caller_or_top      = &caller_base;

    // Maintain stack bottom and top.
    promise_base *stack_bottom = caller_base.m_stack_bottom;
    assert(stack_bottom == stack_bottom->m_stack_bottom);

    base.m_stack_bottom           = stack_bottom;
    stack_bottom->m_caller_or_top = &base;

    return m_coroutine;
}

這裡著重講一下維護協程的調用棧。對於協程而言,至少存在一個協程,它是在協程外創建的(比如main函數)。因為它不在協程中,所以它也無法被co_await,這個協程我們稱之為協程的棧底。在這個協程的執行過程中,它可能會創建和執行新的協程。當新的協程執行完畢時,它們會被清理,並且將執行權交還給調用者,這與普通函數的調用棧是一樣的,只不過這個功能需要我們自己來實現。

因為promise在記憶體中的位置是不可移動的(我禁止了promise的拷貝與移動),所以我直接採用了類似鏈表的方式將協程的調用棧串了起來。在promise_base中,有兩個成員變數用來維護這個調用棧:

promise_base *m_caller_or_top;
promise_base *m_stack_bottom;

因為第一個變數被覆用了(具備不同的含義),所以可能有點亂。對於棧底協程而言,m_caller_or_top指向當前調用棧的棧頂協程,對於其他協程而言,m_caller_or_top指向自己的調用者(父協程)。這麼設計是因為棧底協程不存在調用者,所以就乾脆用這個變數存一下棧頂了。m_stack_bottom顧名思義,就是指向棧底的協程。對於棧底協程而言,這個變數指向的就是它自己了。

有了m_caller_or_top,當一個協程執行完畢時,就能方便地找到它的父協程並交換執行權。有了m_stack_bottomm_caller_or_top,我們就能很方便地找到協程的棧底和棧頂。當需要恢復task時,就能夠保證總是恢復棧頂的協程。

當協程執行完畢時,需要將控制權交還給父協程。我們考慮一下交還控制權需要做的事情:

  1. 維護調用棧,變更棧頂
  2. 如果不是棧底,則恢復父協程的執行

協程執行完畢時會去嘗試執行promisefinal_suspend(),因此這部分代碼在promisefinal_suspend()中實現。final_suspend()返回的類型叫promise_awaitable,其對應的代碼如下:

template <class Promise>
auto promise_awaitable::await_suspend(
    std::coroutine_handle<Promise> coroutine) noexcept
    -> std::coroutine_handle<> {
    promise_base &base  = static_cast<promise_base &>(coroutine.promise());
    promise_base *stack = base.m_stack_bottom;

    // Stack bottom completed. Nothing to resume.
    if (stack == &base)
        return std::noop_coroutine();

    // Set caller coroutine as the top of the stack.
    promise_base *caller   = base.m_caller_or_top;
    stack->m_caller_or_top = caller;

    // Resume caller.
    return caller->m_coroutine;
}

這段代碼應該非常易懂,不過我們很容易聯想到一個問題:既然交還了控制權,那麼它是在何時銷毀的?

其實這也不難想到,協程是由父協程銷毀的。協程的返回值存放在promise中,當父協程co_await sometask時,父協程還需要讀取子協程的promise以獲取返回值。當子協程task<T>析構時,子協程才真正被銷毀。

io_context的設計

既然協程需要非同步地處理IO,那麼必然需要個處理IO的地方,就是io_contextio_context維護了一個線程池,線程池中每一個線程均執行一個worker,每個worker均維護一個io_uring來處理本線程的IO事件和協程。當需要提交新的task給線程池的時候,由io_context分配給某一個worker執行。

這聽起來和reactor模式好像沒啥區別,用epoll寫reactor模式的時候基本上也是這麼乾的,這是因為我本來就是從reactor模式那邊搬過來的。不過相比於reactor模式,這麼做還是有不少細節要處理的。在使用io_uring時,每次我們啟動非同步IO時,都需要獲取到io_uring對象——要使用io_uring_prep_<io_operation>系列函數,我們必須從io_uring對象中獲取一個sqe。而如上所述,io_uring對象在worker中,這就造成了一個麻煩:我們無法在io_contexttask分配worker的時候將io_uring對象的引用(指針)傳遞給task。雖然使用全局變數不失為一種選擇,但我不想這麼做,因為也許使用者想要在一個進程中創建幾個不同的io_context用呢。

雖然無法直接將io_uring的引用傳遞給task,但還有一種方法可以進行交互。在所有awaitableawait_suspend中,我們可以拿到當前協程的coroutine_handle,而在io_context中,我們也能拿到taskcoroutine_handle,因此可以通過promise來傳遞io_uring的引用。

具體在實現時,我沒有傳遞io_uring的引用,而是傳遞了worker的指針。這麼做是因為當初我想同時支持IOCP,傳遞worker可以省掉一些麻煩,雖然後來放棄了。worker的指針只被放在協程棧的棧底,這麼做是因為當協程在不同worker之間轉移時,能夠很方便地修改協程所屬的worker(只需要修改棧底就可以了),儘管後來也沒有實現work-stealing隊列。在promise中,處理worker的方法如下所示:

/// \brief
///   Set I/O context for current coroutine.
/// \param[in] io_ctx
///   I/O context to be set for current coroutine.
auto set_worker(io_context_worker *io_ctx) noexcept -> void {
    m_stack_bottom->m_worker.store(io_ctx, std::memory_order_release);
}

/// \brief
///   Get I/O context for current coroutine.
/// \return
///   I/O context for current coroutine.
[[nodiscard]] auto worker() const noexcept -> io_context_worker * {
    return m_stack_bottom->m_worker.load(std::memory_order_acquire);
}

不過這麼做也有一個缺點,就是io_context侵入了promise的設計,使得task必須在io_context中才能發揮作用。

awaitable的設計

awaitablecoco/io.hpp中定義。各種awaitable的設計就比較簡單了。以read_awaitable為例,它在await_suspend()中獲取當前協程所屬的worker,然後啟用非同步IO,如下所示:

template <class Promise>
auto await_suspend(std::coroutine_handle<Promise> coro) noexcept -> bool {
    m_userdata.coroutine = coro.address();
    return this->suspend(coro.promise().worker());
}

獲取了當前協程的worker後,就轉入this->suspend()函數中去執行了。suspend()方法主要的工作是啟動非同步IO操作,並掛起當前協程:

auto coco::read_awaitable::suspend(io_context_worker *worker) noexcept -> bool {
    assert(worker != nullptr);
    m_userdata.cqe_res   = 0;
    m_userdata.cqe_flags = 0;

    io_uring     *ring = worker->io_ring();
    io_uring_sqe *sqe  = io_uring_get_sqe(ring);
    while (sqe == nullptr) [[unlikely]] {
        io_uring_submit(ring);
        sqe = io_uring_get_sqe(ring);
    }

    io_uring_prep_read(sqe, m_file, m_buffer, m_size, m_offset);
    sqe->user_data = reinterpret_cast<uint64_t>(&m_userdata);

    int result = io_uring_submit(ring);
    if (result < 0) [[unlikely]] { // Result is -errno.
        m_userdata.cqe_res = result;
        return false;
    }

    return true;
}

我沒有把啟用非同步IO部分放到模板函數中,這是因為對於各種不同的IO操作,這部分的代碼實際上大同小異。但考慮到這部分代碼的長度,放到模板中可能會導致比較嚴重的二進位膨脹,所以就單獨拿出來放到.cpp文件中了。

如果去翻我之前的commit記錄的話,會發現起初我並沒有把各種awaitable暴露出來,而是讓各種非同步操作(比如connection.receive())返回task。後來將awaitable暴露出來是考慮到諸如readwrite等操作可能會被頻繁地調用,而每次創建一個協程都需要申請一次堆記憶體,在迴圈中執行的話可能對運行效率有比較嚴重的影響。

關於代碼

再放一遍代碼地址:GitHub

這份代碼不長,總共兩三千行,而且其中一多半都是註釋,結合本文的話應該不會很難讀。這本身只是一份實驗性質的代碼,同時我希望它適合拿來學習,所以我並沒有打算塞入太多的功能。除此之外,我不是很建議你拿來放到工程中使用,因為我可能會一時興起做出一些breaking change。如果你真的有這個需要的話,我建議你fork一份代碼自己維護。

一些可能會被問到的問題

  1. 為什麼沒有實現UDP相關的IO?

因為io_uring似乎還沒有支持recvfrom,至少我實現的時候還沒有。

  1. 為什麼不使用mmap和內核共用記憶體/為什麼不向io_uring註冊文件描述符等性能相關的問題

因為我不是io_uring專家。我寫這個庫的目的是學慣用C++20的協程架構一個非同步IO庫,做這些性能優化會加大架構難度,並且花掉我大量的頭髮和時間,使我本不茂密的頭髮雪上加霜。除此之外,你會發現我也沒有實現work-stealing隊列,原因同理。Round Robin的性能雖然不至於最優,但也不會太差。

  1. 考不考慮加上HTTP支持?

考慮過,太懶所以放棄了。一方面是手寫HTTP parser還是挺麻煩的,就算能用bison自動生成也還得去啃RFC。另一方面是,TCP作為一種基於流的協議,我沒有想好如何處理連接的緩存能夠兼顧性能和使用的便捷性。如果你有這方面的需求的話,不妨先用著其他的HTTP parser,比如llhttp

  1. 為什麼沒有實現yield

我覺得非同步IO一般用不到這東西,所以就沒寫。如果需要的話就自己實現吧。

  1. 會支持Windows(IOCP)嗎?

我有考慮過支持IOCP,但IOCP不支持定時器,我又不想刪除Linux這邊的timer,所以暫且沒有這個想法。

  1. 作者你README寫得好水啊

確實,我也覺得好水啊,有沒有好心人幫忙修一修啊

  1. 為什麼不用中文寫註釋和README?

不用中文寫註釋是因為clang-format沒法處理中文的斷行。不用中文寫README是因為懶,不想寫兩份README。說不定哪天心情好了就寫一份中文版。

碎碎念

要不要塞張插圖呢?


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
一周排行
    -Advertisement-
    Play Games
  • 前言 在我們開發過程中基本上不可或缺的用到一些敏感機密數據,比如SQL伺服器的連接串或者是OAuth2的Secret等,這些敏感數據在代碼中是不太安全的,我們不應該在源代碼中存儲密碼和其他的敏感數據,一種推薦的方式是通過Asp.Net Core的機密管理器。 機密管理器 在 ASP.NET Core ...
  • 新改進提供的Taurus Rpc 功能,可以簡化微服務間的調用,同時可以不用再手動輸出模塊名稱,或調用路徑,包括負載均衡,這一切,由框架實現並提供了。新的Taurus Rpc 功能,將使得服務間的調用,更加輕鬆、簡約、高效。 ...
  • 順序棧的介面程式 目錄順序棧的介面程式頭文件創建順序棧入棧出棧利用棧將10進位轉16進位數驗證 頭文件 #include <stdio.h> #include <stdbool.h> #include <stdlib.h> 創建順序棧 // 指的是順序棧中的元素的數據類型,用戶可以根據需要進行修改 ...
  • 前言 整理這個官方翻譯的系列,原因是網上大部分的 tomcat 版本比較舊,此版本為 v11 最新的版本。 開源項目 從零手寫實現 tomcat minicat 別稱【嗅虎】心有猛虎,輕嗅薔薇。 系列文章 web server apache tomcat11-01-官方文檔入門介紹 web serv ...
  • C總結與剖析:關鍵字篇 -- <<C語言深度解剖>> 目錄C總結與剖析:關鍵字篇 -- <<C語言深度解剖>>程式的本質:二進位文件變數1.變數:記憶體上的某個位置開闢的空間2.變數的初始化3.為什麼要有變數4.局部變數與全局變數5.變數的大小由類型決定6.任何一個變數,記憶體賦值都是從低地址開始往高地 ...
  • 如果讓你來做一個有狀態流式應用的故障恢復,你會如何來做呢? 單機和多機會遇到什麼不同的問題? Flink Checkpoint 是做什麼用的?原理是什麼? ...
  • C++ 多級繼承 多級繼承是一種面向對象編程(OOP)特性,允許一個類從多個基類繼承屬性和方法。它使代碼更易於組織和維護,並促進代碼重用。 多級繼承的語法 在 C++ 中,使用 : 符號來指定繼承關係。多級繼承的語法如下: class DerivedClass : public BaseClass1 ...
  • 前言 什麼是SpringCloud? Spring Cloud 是一系列框架的有序集合,它利用 Spring Boot 的開發便利性簡化了分散式系統的開發,比如服務註冊、服務發現、網關、路由、鏈路追蹤等。Spring Cloud 並不是重覆造輪子,而是將市面上開發得比較好的模塊集成進去,進行封裝,從 ...
  • class_template 類模板和函數模板的定義和使用類似,我們已經進行了介紹。有時,有兩個或多個類,其功能是相同的,僅僅是數據類型不同。類模板用於實現類所需數據的類型參數化 template<class NameType, class AgeType> class Person { publi ...
  • 目錄system v IPC簡介共用記憶體需要用到的函數介面shmget函數--獲取對象IDshmat函數--獲得映射空間shmctl函數--釋放資源共用記憶體實現思路註意 system v IPC簡介 消息隊列、共用記憶體和信號量統稱為system v IPC(進程間通信機制),V是羅馬數字5,是UNI ...