本文詳細介紹了stream連接池及其原理,讓我們更好的理解GaussDB(DWS)集群通信中數據交互的具體邏輯,對於GaussDB通信運維也具備一定的參考意義。 ...
本文分享自華為雲社區《GaussDB(DWS) 集群通信系列二:stream線程池設計》,作者:半島里有個小鐵盒。
1.前言
適用版本:【8.1.0(及以上)】
GaussDB(DWS)分散式架構的Stream運算元作為SQL join操作時頻繁發生的執行運算元,共存在三種模式:Gather、Redistribute、Broadcast,分別負責CN節點GATHER數據,DN節點REDISTRIBUTE和BROACAST數據。大集群高併發場景下,Stream運算元過多可能會導致通信的性能瓶頸,引起性能劣化(2000個stream同時啟動,進程初始化耗時從ms級劣化到s級),因此需要儘可能減少Stream運算元。但是在某些現場環境下,存在數據傾斜、join查詢不包含必要分佈鍵等客觀情況,Stream運算元無法有效減少,為多表join場景下的查詢時延保障帶來挑戰。因此GaussDB(DWS)對於線程初始化->線程任務執行->線程退出執行的流程方面做了stream線程池優化,減少了線程初始化與線程退出所帶來的開銷。
2.實現原理
stream線程是臨時線程,隨query啟動和退出,負責stream運算元的執行,stream線程初始化和退出都會爭搶鎖等進程級資源,在stream線程個數無法進一步優化的場景下,需要設計有效方案以減少stream線程初始化和退出的時間代價,將進程初始化耗時穩定在ms級,保障資料庫的確定性時延查詢。Stream線程池的核心思想是等stream線程執行完計劃任務,保留必要且可復用的線程信息,將線程放入線程池中。
線程池中的線程執行過程如上圖所示,其具體步驟為:
- 步驟一:線程信息初始化
- 步驟二:線程待喚醒後輕量級初始化(query級初始化)
- 步驟三:線程任務執行
- 步驟四:線程清理
- 返回步驟二:繼續等待下條query執行
在返回步驟二時,當線程等待超時、超出線程池容量(最大stream線程個數)、異常時線程已不可用,需要銷毀。
其中步驟一中線上程初始化時,需要執行的操作有:線程創建、創建相關記憶體上下文、信號處理函數註冊、記憶體追蹤信息初始化、初始化GUC選項等操作;
步驟二中線上程輕量級/查詢級初始化時,需要執行的操作有恢復GUC參數、初始化BackendParams、重置GUC參數等操作。
stream線程池為了高效管理線程的出/入池操作,採用無鎖隊列實現。定義結構體ThreadSlot保存線程池中每一個線程的信息,包含:線程狀態、線程號、線程對應的database oid、線程執行所需的信息StreamProducer、線程喚醒所需的鎖和條件變數。
當線程還未被創建時,初始化一定數量的ThreadSlot數量以預留stream線程,這些ThreadSlot被保存在數組threadSlots中。當stream線程執行完畢,需要將stream線程放置到表徵可復用線程的無鎖隊列,稱之為idleRing;當線程因為超時、異常等原因不再復用,需要退出時,將stream線程對應的ThreadSlot放置到表徵未創建線程的無鎖隊列,稱之為emptyRing。
idleRing的作用是為了快速獲取並復用線程池中的線程,emptyRing的作用是快速獲取一個未被使用的ThreadSlot結構,以創建一個新的stream線程。由於stream線程的初始化信息和database是強相關的,如果不保留database相關的信息,那麼線程初始化的時間代價仍然較高,所以線程池中的線程復用時,需要滿足database信息匹配。對於設計線程池而言,每一個database都應該對應一個idleRing。
綜上所述,基於無鎖隊列的stream線程池設計如下所示:
從上圖可以看出,一個線程池包含預留stream線程結構的threadSlots、一個表徵未創建線程的無鎖隊列emptyRing和表徵可復用線程的無鎖隊列idleRing,由於每個database對應一個idleRing,因此多個idleRing被組織為鏈表結構。
3.具體實現機制
3.1 數據結構設計
定義結構體ThreadSlot保存線程池中每一個線程的信息,包含:線程狀態、線程號、線程對應的database oid、線程執行所需的信息StreamProducer,StreamProducer是父線程向子線程傳遞的唯一結構、線程喚醒所需的鎖和條件變數。
typedef struct { int status; uint32 idx; ThreadId tid; Oid dbOid; StreamProducer* streamObj; pthread_mutex_t m_mutex; pthread_cond_t m_cond; } ThreadSlot;
定義結構體StreamThreadPool表徵線程池,其中size表示線程池中擬預留的ThreadSlot個數,ThreadSlot被保存在threadSlots數組中;無鎖隊列emptyRing用來保存未創建線程的ThreadSlot,對應地,idleRing用來保存空閑的已創建stream線程的ThreadSlot。結構如下所示:
class StreamThreadPool: public BaseObject { public: StreamThreadPool(); void Init(int num); // streamThreadPool init int Call(StreamProducer* obj); // 獲取idle線程 或 create 新線程 bool Wait(); // idle線程等待喚醒或者超時退出 ThreadSlot* GetLocalSlot(); // get streamThreadSlot void SetLocalSlot(int slotIdx); // set streamThreadSlot StreamPool* GetLocalPool(); // 獲取streamDBPool 或 新建一個 ThreadSlot* PopSlot(); // 從idleRing/emptyRing獲取一slot void PushToEmpty(ThreadSlot* slot); // 將slot直接放入emptyRing void PushToIdle(StreamPool* pool, ThreadSlot* slot); // 將slot直接放入idleRing void LocalPushToIdle(); // 根據狀態,將slot放入idleRing void LocalPushToEmpty(); // 根據狀態,將slot放入emptyRing int CleanStreamPool(const char *dbName, cleanOption cleanMode); // 根據db信息清線程 void CleanInAllStreamPool(int desNum); // 調整線程池中stream線程個數 int GetStreamNum(); // 獲取線程池中stream線程個數 bool Release(); // 判斷超時線程是否需要清理 bool TimeoutClean(); // 清理超時idle線程 private: int size; ThreadSlot* threadSlots; ArrayLockFreeQueue emptyRing; StreamPool* PoolListHead; }
定義結構體StreamPool,由於stream線程的初始化信息和database是強相關的,如果不保留database相關的信息,那麼線程初始化的時間代價仍然較高,所以線程池中的線程復用時,需要滿足database信息匹配,所以一個emptyRing和一個database相匹配,保存在鏈表PoolListHead中。
typedef struct StreamPool { Oid dbOid; ArrayLockFreeQueue idleRing; struct StreamPool* next; } StreamPool;
綜上,我們可以得到各結構間組織的直觀圖,如下所示:
上圖中threadSlots可以放在idleRing(藍色)、emptyRing(綠色)和運行空間(黃色)中。
3.2 stream線程狀態轉移DFA設計
每一個記錄線程信息的結構ThreadSlot中都保存了線程當前的狀態status,記錄線程狀態的目的是為了保障線程執行過程的有序控制,也可以通過狀態的互斥避免threadSlot不會被兩個線程同時使用。
stream線程狀態轉移用確定性有限狀態機(DFA,definite automata)表徵,共包含4個狀態:
STREAM_SLOT_EXIT、STREAM_SLOT_IDLE、STREAM_SLOT_HOLD和STREAM_SLOT_RUN狀態。其物理含義如下:
- STREAM_SLOT_EXIT:線程退出狀態,表示線程未被創建或線程已退出;
- STREAM_SLOT_IDLE:線程可復用狀態,表示線程在idleRing中,可以被覆用;
- STREAM_SLOT_HOLD:線程臨時獨占狀態,表示線程在做進入下一個狀態的準備工作;
- STREAM_SLOT_RUN:線程運行狀態,表示線程正在執行任務。
狀態間轉移條件如下所示,圖中粗箭頭表示狀態機主迴圈部分:
與狀態對應的,是slot所處的位置,slot所處的位置有三處,分別是idleRing、emptyRing和運行空間,slot從無鎖隊列中拿出,運行時所處的位置,我們稱之為運行空間。各狀態所處的位置情況如下所示:
- STREAM_SLOT_EXIT:idleRing(idle線程超時)、emptyRing(初始化或者FATAL);
- STREAM_SLOT_IDLE:idleRing
- STREAM_SLOT_HOLD:運行空間(從無鎖隊列中取出)、idleRing(idle線程超時或中斷);
- STREAM_SLOT_RUN:運行空間。
Slot的位置變化和狀態轉移的關係如下,圖中粗箭頭表示狀態機主迴圈部分:
根據各狀態所處的位置情況,從idleRing中取出的slot可能有三種狀態:EXIT、IDLE、HOLD。當取出IDLE狀態的slot,說明線程可復用;當取出EXIT狀態的slot,說明線程已退出,此時需要將slot轉存到emptyRing;當取出HOLD狀態,說明線程正在被使用,此時需要放回idleRing。
EmptyRing中slot的狀態只能是EXIT,運行空間中slot的狀態要麼是HOLD(剛取出還未運行),要麼是RUN(正在運行),不再贅述
3.3 單個stream線程執行流程
Stream線程池中stream線程整體執行流程如下圖所示:
stream線程初始化僅初始化一次,執行完query之後,便將連接歸還到連接池裡,迴圈執行上圖中黃色部分的語句,如果有異常則線程退出,連接銷毀,slot歸還至emptyRing;如果正常執行結束,將連接中內容清理,避免下個連接誤用,並將slot歸還至idleRing等待下個連接復用。
那麼stream線程復用時如何保持參數的一致性呢,對應上圖中的set GUC params階段。父線程保存自己的guc_variables在syncGucVariables中,syncGucVariables是需要傳遞給stream的結構用以保證父子線程guc參數的一致。然後父線程在初始化streamProducer時將syncGucVariables保存在該結構中傳遞。Stream線程根據streamProducer初始化自己的syncGucVariables變數,首先reset所有的guc變數,然後根據syncGucVariables修正自己的variables。
4.外部介面
4.1 GUC參數
max_stream_pool:設置stream線程池能夠容納stream線程的最大個數。該參數8.1.2及以上版本支持。預設值為65535。設置為-1表示不開啟stream線程池。該參數支持reload更新,更新規則:設置max_stream_pool小於當前可用線程個數,支持線程個數實時減少;當設置max_stream_pool大於當前idle線程個數,將由業務驅動線程個數的增加4.2 視圖
pg_thread_wait_status:展示了集群所有CN/DN進程內的所有線程的實時 等待狀態,是定位集群通信問題最重要的視圖
其中對於wait_status列狀態說明如下:
-
wait stream task:空閑的stream線程;
-
wait node:等待其他DN的數據,需要關註對端狀態;
-
flush data:發送數據給其他DN時因為對端buffer滿而阻塞;
-
wait cmd:DN上空閑的postgres線程,等待CN的下一個query;
-
none:未定義狀態,極有可能是阻塞原因;
-
synchronize quit:同步退出狀態,自身任務已完成,在等待同一個query的其他線程一起退出;
5.通過表象看stream線程池邏輯
【場景一】集群基礎行為場景——建立多資料庫場景
Create database ***;(建立多庫)
分別執行帶stream運算元的查詢;例:create table test_01(c1 int, c2 int)with(orientation=column) distribute by hash(c1);
insert into test_01 select generate_series(1,100), generate_series(1,100);analyze test_01;
select * from test_01 a, test_01 b, test_01 c, test_01 d, test_01 e, test_01 f where a.c2 =b.c2 and c.c2 = d.c2 and e.c2=f.c2 limit 100;
【場景二】集群基礎行為場景——建立多用戶場景
Create user ***;(建立多用戶)
分別執行帶stream運算元的查詢;(參考場景一示例)
查詢結束,查pgxc_thread_wait_status看DN節點:預期stream線程狀態為wait thread cond。且多user之間stream線程可以復用。例:用戶一執行完查詢,視圖中顯示共有四個stream線程線上程池,用戶二執行同樣查詢返回正確結果,視圖中的stream線程個數不變,且線程號也是一致的,則說明復用。
【場景三】集群基礎行為場景——線程清理場景
調整guc參數max_stream_pool的值,觀測是否生效;預期:當設置max_stream_pool小於當前idle線程個數,支持線程個數實時減少;當設置max_stream_pool大於當前idle線程個數,將由業務驅動線程個數的增加,但是不會超過max_stream_pool。
執行clean connection(ALL force),查看stream線程是否被清理;預期:該database的stream線程被完全清理。
執行drop database命令,查看stream線程是否被清理;預期:該database的stream線程被完全清理。6.總結
本文詳細介紹了stream連接池及其原理,讓我們更好的理解GaussDB(DWS)集群通信中數據交互的具體邏輯,對於GaussDB通信運維也具備一定的參考意義。