ClickHouse源碼筆記1:聚合函數的實現

来源:https://www.cnblogs.com/happenlee/archive/2020/06/02/13029547.html
-Advertisement-
Play Games

由於工作的需求,後續筆者工作需要和開源的OLAP資料庫ClickHouse打交道。ClickHouse是Yandex在2016年6月15日開源了一個分析型資料庫,以強悍的單機處理能力被稱道。 筆者在實際測試ClickHouse和閱讀ClickHouse的源碼過程之中,對"戰鬥民族"開發的資料庫十分欣 ...


由於工作的需求,後續筆者工作需要和開源的OLAP資料庫ClickHouse打交道。ClickHouse是Yandex在2016年6月15日開源了一個分析型資料庫,以強悍的單機處理能力被稱道
筆者在實際測試ClickHouse和閱讀ClickHouse的源碼過程之中,對"戰鬥民族"開發的資料庫十分欣賞。ClickHouse不僅是一個很好的資料庫學習材料,而且同時應用了大量的CPP17的新特性進行開發,也是一個大型的Modern CPP的教導資料。
筆者接下來會陸續將閱讀ClickHouse的部分心得體會與通過源碼閱讀筆記的方式和大家分享,坦白說,這種源碼閱讀筆記很難寫啊。(多一分繁瑣,少一分就模糊了~~)
第一篇文章,我們就從聚合函數的實現開始聊起~~ 上車!

1.基礎知識的梳理

什麼是聚合函數?

聚合函數: 顧名思義就是對一組數據執行聚合計算並返回結果的函數。
這類函數在資料庫之中很常見,如:count, max, min, sum等等。

ClickHouse的實現介面
  • IAggregateFunction介面
    在ClickHouse之中,定義了一個統一的聚合函數介面:IAggregateFunction.(在ClickHouse之中,所有的介面類都是以大寫的I開頭的。) 上文筆者提到的聚合函數,則都是作為抽象類IAggregateFunction的子類實現的。其中該介面最為核心的方法是下麵這5個方法:
    • add函數:最為核心的調用介面,將對應AggregateDataPtr指針之中數據取出,與列columns中的第row_num的數據進行對應的聚合計算。(這裡可以看到ClickHouse是一個純粹的列式存儲資料庫,所有的操作都是基於列的數據結構。)
    • merge函數:將兩個聚合結果進行合併的函數,通常用在併發執行聚合函數的過程之中,需要將對應的聚合結果進行合併。
    • serialize函數與deserialize函數:序列化與反序列化的函數,通常用於spill to disk或分散式場景需要保存或傳輸中間結果的。
    • addBatch函數:這是函數也是非常重要的,雖然它僅僅實現了一個for迴圈調用add函數。它通過這樣的方式來減少虛函數的調用次數,並且增加了編譯器內聯的概率。(虛函數的調用需要一次訪存指令,一次查表,最終才能定位到需要調用的函數上,這在傳統的火山模型的實現上會帶來極大的CPU開銷。
  /** Adds a value into aggregation data on which place points to.
     *  columns points to columns containing arguments of aggregation function.
     *  row_num is number of row which should be added.
     *  Additional parameter arena should be used instead of standard memory allocator if the addition requires memory allocation.
     */
    virtual void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const = 0;

    /// Merges state (on which place points to) with other state of current aggregation function.
    virtual void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const = 0;

    /// Serializes state (to transmit it over the network, for example).
    virtual void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const = 0;

    /// Deserializes state. This function is called only for empty (just created) states.
    virtual void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const = 0;
    // /** Contains a loop with calls to "add" function. You can collect arguments into array "places"
      *  and do a single call to "addBatch" for devirtualization and inlining.
      */
    virtual void addBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, Arena * arena) const = 0;

  • 抽象類IColumn
    上面的介面IAggregateFunction的函數使用到了ClickHouse的核心介面IColumn類,這裡也進行簡要的介紹。 IColumn 介面表達了所有數據在ClickHouse之中的用記憶體表達的數據結構,其他帶有具體數據類型的如ColumnUInt8、ColumnArray 等, 都實現了對應的列介面,並且在子類之中具象實現了不同的記憶體佈局。
    IColumn的子類實現細節很瑣碎,筆者這裡就暫時不展開講了,筆者這裡就簡單講講涉及到聚合函數調用部分的IColumn介面的對應方法:
    這裡columns是一個二維數組,通過columns[0]可以取到第一列。(這裡只有涉及到一列,為什麼columns是二維數組呢?因為處理array等列的時候,也是通過對應的介面,而array就需要應用二維數組了. )
    註意這裡有一個強制的類型轉換,column已經轉換為ColVecType類型了,這是模板派生出IColumn的子類。
    然後通過IColumn子類實現的getData方法獲取對應row_num行的數據進行add函數調用就完成了一次聚合函數的計算了。
    void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
    {
        const auto & column = static_cast<const ColVecType &>(*columns[0]);
        this->data(place).add(column.getData()[row_num]);
    }
  • IAggregateFunctionHelper介面
    這個介面是上面提到 IAggregateFunction的輔助子類介面,它很巧妙的通過模板的類型派生,將虛函數的調用轉換為函數指針的調用,這個在實際聚合函數的實現過程之中能夠大大提高計算的效率。
    函數addFree就實現了我上述所說的過程,但是它是一個private的函數,所以通常我們都是通過getAddressOfAddFunction獲取對應的函數地址。這在聚合查詢的過程之中能夠提高20%左右的執行效率。
template <typename Derived>
class IAggregateFunctionHelper : public IAggregateFunction
{
private:
    static void addFree(const IAggregateFunction * that, AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena)
    {
        static_cast<const Derived &>(*that).add(place, columns, row_num, arena);
    }

public:
    IAggregateFunctionHelper(const DataTypes & argument_types_, const Array & parameters_)
        : IAggregateFunction(argument_types_, parameters_) {}

    AddFunc getAddressOfAddFunction() const override { return &addFree; }
  • AggregateFunctionFactory類
    顧名思義,這個是一個生成聚合函數的工廠類。它的邏輯很簡單,所有ClickHouse之中所相關的聚合函數都是通過這個工廠類註冊並且獲取,然後進行調用的。
class AggregateFunctionFactory final : private boost::noncopyable, public IFactoryWithAliases<AggregateFunctionCreator>
{
public:

    static AggregateFunctionFactory & instance();

    /// Register a function by its name.
    /// No locking, you must register all functions before usage of get.
    void registerFunction(
        const String & name,
        Creator creator,
        CaseSensitiveness case_sensitiveness = CaseSensitive);

    /// Throws an exception if not found.
    AggregateFunctionPtr get(
        const String & name,
        const DataTypes & argument_types,
        const Array & parameters = {},
        int recursion_level = 0) const;

2.聚合函數的註冊流程

有了上述的背景知識,我們接下來舉個慄子。來看看一個聚合函數的實現細節,以及它是如何被使用的。

AggregateFunctionSum

筆者這裡選取了一個很簡單的聚合運算元Sum,我們來看看它實現的代碼細節。
這裡我們可以看到AggregateFunctionSum是個final類,無法被繼承了。而它繼承了上面提到的IAggregateFunctionHelp類的子類IAggregateFunctionDataHelper類。

這裡我們就重點看,這個類override了getName方法,返回了對應的名字sum。並且實現了我們上文提到的四個核心的方法。

  • add
  • merge
  • seriable
  • deserialize
template <typename T, typename TResult, typename Data>
class AggregateFunctionSum final : public IAggregateFunctionDataHelper<Data, AggregateFunctionSum<T, TResult, Data>>
{
public:
    using ResultDataType = std::conditional_t<IsDecimalNumber<T>, DataTypeDecimal<TResult>, DataTypeNumber<TResult>>;
    using ColVecType = std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, ColumnVector<T>>;
    using ColVecResult = std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<TResult>, ColumnVector<TResult>>;

    String getName() const override { return "sum"; }

    AggregateFunctionSum(const DataTypes & argument_types_)
        : IAggregateFunctionDataHelper<Data, AggregateFunctionSum<T, TResult, Data>>(argument_types_, {})
        , scale(0)
    {}

    AggregateFunctionSum(const IDataType & data_type, const DataTypes & argument_types_)
        : IAggregateFunctionDataHelper<Data, AggregateFunctionSum<T, TResult, Data>>(argument_types_, {})
        , scale(getDecimalScale(data_type))
    {}

    DataTypePtr getReturnType() const override
    {
        if constexpr (IsDecimalNumber<T>)
            return std::make_shared<ResultDataType>(ResultDataType::maxPrecision(), scale);
        else
            return std::make_shared<ResultDataType>();
    }

    void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
    {
        const auto & column = static_cast<const ColVecType &>(*columns[0]);
        this->data(place).add(column.getData()[row_num]);
    }

    void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
    {
        this->data(place).merge(this->data(rhs));
    }

    void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
    {
        this->data(place).write(buf);
    }

    void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
    {
        this->data(place).read(buf);
    }

    void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
    {
        auto & column = static_cast<ColVecResult &>(to);
        column.getData().push_back(this->data(place).get());
    }

private:
    UInt32 scale;
};

接下來,ClickHouse實現了兩種聚合計算:AggregateFunctionSumDataAggregateFunctionSumKahanData。後者是用Kahan演算法避免float類型精度損失的,我們可以暫時不細看。直接看SumData的實現。這是個模板類,之前我們講到AggregateFunction的函數就是通過AggregateDataPtr指針來獲取AggregateFunctionSumData的地址,來調用add實現聚合運算元的。我們可以看到AggregateFunctionSumData實現了前文提到的add, merge, write,read四大方法,正好和介面一一對應上了。

template <typename T>
struct AggregateFunctionSumData
{
    T sum{};

    void add(T value)
    {
        sum += value;
    }

    void merge(const AggregateFunctionSumData & rhs)
    {
        sum += rhs.sum;
    }

    void write(WriteBuffer & buf) const
    {
        writeBinary(sum, buf);
    }

    void read(ReadBuffer & buf)
    {
        readBinary(sum, buf);
    }

    T get() const
    {
        return sum;
    }
};

ClickHouse在Server啟動時。main函數之中會調用registerAggregateFunction的初始化函數註冊所有的聚合函數。
然後調用到下麵的函數:

void registerAggregateFunctionSum(AggregateFunctionFactory & factory)
{
    factory.registerFunction("sum", createAggregateFunctionSum<AggregateFunctionSumSimple>, AggregateFunctionFactory::CaseInsensitive);
    factory.registerFunction("sumWithOverflow", createAggregateFunctionSum<AggregateFunctionSumWithOverflow>);
    factory.registerFunction("sumKahan", createAggregateFunctionSum<AggregateFunctionSumKahan>);
}

這裡又調用了 factory.registerFunction("sum", createAggregateFunctionSum<AggregateFunctionSumSimple>, AggregateFunctionFactory::CaseInsensitive);來進行上述我們看到的聚合函數的註冊。這裡有一點很噁心的模板代碼,筆者這裡簡化了一下,把註冊的部分函數拉出來:

createAggregateFunctionSum(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
    AggregateFunctionPtr res;
    DataTypePtr data_type = argument_types[0];
    if (isDecimal(data_type))
        res.reset(createWithDecimalType<Function>(*data_type, *data_type, argument_types));
    else
        res.reset(createWithNumericType<Function>(*data_type, argument_types));
    return res;

這裡的Function模板就是上面的AggregateFunctionSumSimple, 而它又是下麵的模板類型:

template <typename T> using AggregateFunctionSumSimple = typename SumSimple<T>::Function;

template <typename T>
struct SumSimple
{
    /// @note It uses slow Decimal128 (cause we need such a variant). sumWithOverflow is faster for Decimal32/64
    using ResultType = std::conditional_t<IsDecimalNumber<T>, Decimal128, NearestFieldType<T>>;
    using AggregateDataType = AggregateFunctionSumData<ResultType>;
    using Function = AggregateFunctionSum<T, ResultType, AggregateDataType>;
};

不知道讀者被繞暈了沒,最終繞回來還是new出來這個AggregateFunctionSum<T, ResultType, AggregateDataType>
也就是完成了這個求和運算元的註冊,後續我們get出來就可以愉快的調用啦。(這裡這部分的模板變化比較複雜,如果看不明白可以回到源碼梳理一下~~~)

3. 小結

好了,關於聚合函數的基礎信息,和它是如何實現並且通過工廠方法註冊獲取的流程算是搞明白了。
關於其他的聚合運算元,也是大同小異的方式。筆者就不再贅述了,感興趣的可以回到源碼之中繼續一探究竟。講完了聚合函數的實現,下一篇筆者就要繼續給探究聚合函數究竟在ClickHouse之中是如何和列存結合使用,並實現向量化的~~。
筆者是一個ClickHouse的初學者,對ClickHouse有興趣的同學,也歡迎和筆者多多指教,交流。

4. 參考資料

官方文檔
ClickHouse源代碼


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 回到目錄 好了,在介紹完前面各種電路的re模型分析方法後,本節我們再看一下另一種建模思路的混合等效模型。這種模型早期曾被廣泛使用,雖然現在我們一般都用更好用的re模型,但這種混合等效模型還是有必要瞭解一下的。 而且,各大半導體器件廠商在數據規格書給出的也都是的混合等效模型參數。廠家這麼做的原因有二: ...
  • 1.基本操作 # 查看防火牆狀態 service iptables status # 停止防火牆 service iptables stop # 啟動防火牆 service iptables start # 重啟防火牆 service iptables restart # 永久關閉防火牆 chkco ...
  • 與大多數其他半導體存儲技術不同,數據存儲為磁性狀態而不是電荷,並通過測量電阻來感測而不幹擾磁性狀態。使用磁性狀態進行存儲有兩個主要好處。首先磁極化不會像電荷一樣隨時間泄漏,因此即使關閉電源,信息也會被存儲。其次在兩種狀態之間切換磁極化不涉及電子或原子的實際運動,因此不存在已知的磨損機制。 Evers ...
  • 一 手動部署-官網版1.1 獲取資源[root@master01 ~]# mkdir ingress[root@master01 ~]# cd ingress/[root@master01 ingress]# git clone https://github.com/nginxinc/kuberne ...
  • 調度器和伺服器組都必須在物理上有一個網卡通過不分段的區域網相連,即通過交換機或者高速的HUB相連,中間沒有隔有路由器。VIP地址為調度器和伺服器組共用,調度器配置的VIP地址是對外可見的,用於接收虛擬服務的請求報文;所有的伺服器把VIP地址配置在各自的Non-ARP網路設備上,它對外面是不可見的,只... ...
  • 原文鏈接:https://blog.csdn.net/wulex/article/details/103578789 微服務架構 微服務架構圖 微服務架構 k8s微服務系統架構設計 k8s+微服務 微服務 此思維導圖為微服務架構一書的內容梳理,主要圍繞微服務、架構師,架構方法論以及架構的特定等知識點 ...
  • 手機、汽車、甚至宇宙飛船,在今天的科技世界中,你幾乎到處都能看到 Linux 的身影。前兩天 SpaceX 成功將宇航員送入太空的獵鷹9號火箭與龍飛船用的也是 Linux的操作系統。身處與 Linux 相關行業的同學還是非常幸運的,行業前景光明、人才需求量大、薪資待遇也水漲船高。當然我們做 Linu ...
  • 一、將一個或多個值插入到列表頭部 命令:lpush 格式:lpush key value1 [value2] ... 127.0.0.1:6379> lpush list_key 1 (integer) 1 127.0.0.1:6379> lpush list_key 2 3 4 (integer) ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...