V1.0 2024年6月11日 發佈於博客園 目錄 目錄目錄線程池原理線程池是什麼線程池解決的問題動態創建子線程的缺點線程池相關介面線程池相關結構體struct task 任務節點線程池介面init_pool() 線程池初始化線程池初始化流程圖add_task() 向線程池添加任務add_threa ...
V1.0 2024年6月11日 發佈於博客園
目錄
目錄線程池原理
線程池是什麼
線程池(Thread Pool)是一種基於池化思想管理線程的工具,經常出現在多線程伺服器中,如MySQL。
線程過多會帶來額外的開銷,其中包括創建銷毀線程的開銷、調度線程的開銷等等,同時也降低了電腦的整體性能。線程池維護多個線程,等待監督管理者分配可併發執行的任務。這種做法,一方面避免了處理任務時創建銷毀線程開銷的代價,另一方面避免了線程數量膨脹導致的過分調度問題,保證了對內核的充分利用。
線程池模型(同進程池):
多個子線程處理同一個客戶連接上的不同任務
使用線程池可以帶來一系列好處:
- 降低資源消耗(系統資源):通過池化技術重覆利用已創建的線程,降低線程創建和銷毀造成的損耗。
- 提高線程的可管理性(系統資源):線程是稀缺資源,如果無限制創建,不僅會消耗系統資源,還會因為線程的不合理分佈導致資源調度失衡,降低系統的穩定性。使用線程池可以進行統一的分配、調優和監控。
- 提高響應速度(任務響應):任務到達時,無需等待線程創建即可立即執行。
- 提供更多更強大的功能(功能擴展):線程池具備可拓展性,允許開發人員向其中增加更多的功能。比如延時定時線程池ScheduledThreadPoolExecutor,就允許任務延期執行或定期執行。
線程池解決的問題
線程池解決的核心問題就是資源管理問題。在併發環境下,系統不能夠確定在任意時刻中,有多少任務需要執行,有多少資源需要投入。這種不確定性將帶來以下若幹問題:
- 頻繁申請/銷毀資源和調度資源,將帶來額外的消耗,可能會非常巨大。
- 對資源無限申請缺少抑制手段,易引發系統資源耗盡的風險。
- 系統無法合理管理內部的資源分佈,會降低系統的穩定性。
動態創建子線程的缺點
通過動態創建子進程(或子線程)來實現併發伺服器,這樣做有如下缺點:
- 動態創建進程(或線程)是比較耗費時間的,這將導致較慢的客戶響應。
- 動態創建的子進程(或子線程)通常只用來為一個客戶服務(除非我們做特殊的處理),這將導致系統上產生大量的細微進程(或線程)。進程(或線程)間的切換將消耗大量CPU時間。
- 動態創建的子進程是當前進程的完整映像。當前進程必須謹慎地管理其分配的文件描述符和堆記憶體等系統資源,否則子進程可能複製這些資源,從而使系統的可用資源急劇下降,進而影響伺服器的性能。
線程池相關介面
線程池相關結構體
struct task 任務節點
// 任務結點 單向鏈表的節點,類型
struct task
{
void *(*do_task)(void *arg); // 任務函數指針 指向線程要執行的任務 格式是固定的
void *arg; // 需要傳遞給任務的參數,如果不需要,則NULL
struct task *next; // 指向下一個任務結點的指針
};
線程池介面
init_pool() 線程池初始化
// 初始化線程池 pool線程池指針 threads_number 初始化線程的個數
bool init_pool(thread_pool *pool, unsigned int threads_number)
{
// 初始化互斥鎖
pthread_mutex_init(&pool->lock, NULL);
// 初始化條件量
pthread_cond_init(&pool->cond, NULL);
// 銷毀標誌 設置線程池為未關閉狀態
pool->shutdown = false; // 不銷毀
// 給任務鏈表的節點申請堆記憶體
pool->task_list = malloc(sizeof(struct task));
// 申請堆記憶體,用於存儲創建出來的線程的ID
pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS);
// 錯誤處理,對malloc進行錯誤處理
if (pool->task_list == NULL || pool->tids == NULL)
{
perror("分配記憶體錯誤");
return false;
}
// 對任務鏈表中的節點的指針域進行初始化
pool->task_list->next = NULL;
// 設置線程池中處於等待狀態的任務數量最大值
pool->max_waiting_tasks = MAX_WAITING_TASKS;
// 設置等待線程處理的任務的數量為0,說明現在沒有任務
pool->waiting_tasks = 0;
// 設置線程池中活躍的線程的數量
pool->active_threads = threads_number;
int i;
// 迴圈創建活躍線程
for (i = 0; i < pool->active_threads; i++)
{
// 創建線程 把線程的ID存儲在申請的堆記憶體
if (pthread_create(&((pool->tids)[i]), NULL,
routine, (void *)pool) != 0)
{
perror("創建線程錯誤");
return false;
}
}
return true;
}
線程池初始化流程圖
mermaid
graph TD A[初始化線程池] --> B[初始化互斥鎖] B --> C[初始化條件變數] C --> D[分配任務鏈表記憶體] D --> E[分配線程ID數組記憶體] E --> F{記憶體分配是否成功?} F -- 否 --> G[列印錯誤信息] F -- 是 --> H[設置初始值] H --> I[創建指定數量線程] I --> J[線程池初始化完成]add_task() 向線程池添加任務
// 向線程池的任務鏈表中添加任務
bool add_task(thread_pool *pool,
void *(*do_task)(void *arg), void *arg)
{
// 給任務鏈表節點申請記憶體
struct task *new_task = malloc(sizeof(struct task));
if (new_task == NULL) // 檢查記憶體分配是否成功
{
perror("申請記憶體錯誤");
return false;
}
new_task->do_task = do_task; // 設置任務函數指針
new_task->arg = arg; // 設置任務參數
new_task->next = NULL; // 指針域設置為NULL 初始化任務的下一個指針
//============ LOCK =============//
pthread_mutex_lock(&pool->lock); // 加鎖,保護共用資源
//===============================//
// 說明要處理的任務的數量大於能處理的任務數量
if (pool->waiting_tasks >= MAX_WAITING_TASKS) // 檢查等待任務是否超過最大值
{
pthread_mutex_unlock(&pool->lock); // 解鎖
fprintf(stderr, "任務太多.\n"); // 列印錯誤信息
free(new_task); // 釋放新任務記憶體
return false;
}
struct task *tmp = pool->task_list; // 獲取任務鏈表頭
// 遍歷鏈表,找到單向鏈表的尾節點
while (tmp->next != NULL)
tmp = tmp->next;
// 把新的要處理的任務插入到鏈表的尾部 尾插
tmp->next = new_task;
// 要處理的任務的數量+1 (等待任務數量+1)
pool->waiting_tasks++;
//=========== UNLOCK ============//
pthread_mutex_unlock(&pool->lock); // 解鎖
//===============================//
// 喚醒第一個處於阻塞隊列中的線程
pthread_cond_signal(&pool->cond);
return true;
}
add_thread() 增加活躍線程
// 向線程池加入新線程
int add_thread(thread_pool *pool, unsigned additional_threads)
{
// 判斷需要添加的新線程的數量是否為0 如果沒有要添加的線程,直接返回
if (additional_threads == 0)
return 0;
// 計算線程池中匯流排程的數量
unsigned total_threads =
pool->active_threads + additional_threads;
int i, actual_increment = 0; // 初始化計數器
// 迴圈創建新線程
for (i = pool->active_threads; i < total_threads && i < MAX_ACTIVE_THREADS; i++)
{
// 創建新線程
if (pthread_create(&((pool->tids)[i]),
NULL, routine, (void *)pool) != 0)
{
perror("增加活躍線程錯誤"); // 列印錯誤信息
// 如果沒有成功創建任何線程,返回錯誤
if (actual_increment == 0)
return -1;
break; // 退出迴圈
}
actual_increment++; // 增加計數器
}
// 記錄此時線程池中活躍線程的總數
pool->active_threads += actual_increment; // 更新活躍線程數
return actual_increment; // 返回實際增加的線程數
}
remove_thread()刪除活躍線程
// 從線程池中刪除線程
int remove_thread(thread_pool *pool, unsigned int removing_threads)
{
if (removing_threads == 0)
return pool->active_threads; // 如果沒有要刪除的線程,直接返回
int remaining_threads = pool->active_threads - removing_threads; // 計算剩餘線程數
remaining_threads = remaining_threads > 0 ? remaining_threads : 1; // 確保至少有一個線程
int i;
for (i = pool->active_threads - 1; i > remaining_threads - 1; i--) // 迴圈取消線程
{
errno = pthread_cancel(pool->tids[i]); // 取消線程
if (errno != 0) // 檢查取消是否成功
break;
}
if (i == pool->active_threads - 1) // 如果沒有成功取消任何線程,返回錯誤
return -1;
else
{
pool->active_threads = i + 1; // 更新活躍線程數
return i + 1; // 返回剩餘線程數
}
}
destroy_pool()銷毀線程池
// 銷毀線程池
bool destroy_pool(thread_pool *pool)
{
// 1,激活所有線程 設置關閉標誌
pool->shutdown = true;
pthread_cond_broadcast(&pool->cond); // 喚醒所有等待中的線程
// 2, 等待線程們執行完畢
int i;
for (i = 0; i < pool->active_threads; i++) // 迴圈等待所有線程退出
{
/**
* pthread_join(pool->tids[i], NULL) 的作用是等待線程池中第 i 個線程終止,並清理其相關資源。通過這種方式,可以確保在銷毀線程池時,所有線程都已經安全地終止。
* pthread_join 是 POSIX 線程庫中的一個函數,用於等待一個線程的終止。它的功能類似於進程中的 wait 系統調用。
*/
errno = pthread_join(pool->tids[i], NULL); // 等待線程退出
if (errno != 0) // 檢查等待是否成功
{
printf("join tids[%d] error: %s\n",
i, strerror(errno)); // 列印錯誤信息
}
else
printf("[%u] is joined\n", (unsigned)pool->tids[i]); // 列印線程退出信息
}
// 3, 銷毀線程池
free(pool->task_list); // 釋放任務鏈表記憶體
free(pool->tids); // 釋放線程ID數組記憶體
free(pool); // 釋放線程池結構體記憶體
return true;
}
線程池實例
main.c
#include "thread_pool.h" // 包含線程池頭文件
// 任務函數, 列印一次線程任務信息,並等待n秒,模擬真正的線程任務
void *mytask(void *arg)
{
int n = (int)arg; // 要執行的秒數 將參數轉換為整數, 強制轉換才能使用
/**
* %u:無符號整數(unsigned int)
* pthread_self():這是一個 POSIX 線程庫函數,返回調用它的線程的線程 ID。
* __FUNCTION__:這是一個預定義的巨集,擴展為當前函數的名稱。它在調試和日誌記錄時非常有用,可以顯示當前正在執行的函數名。
*/
printf("[%u][%s] ==>工作將會在這裡被執行 %d 秒...\n",
(unsigned)pthread_self(), __FUNCTION__, n); // 列印任務開始信息
sleep(n);
printf("[%u][%s] ==> 工作完畢!\n",
(unsigned)pthread_self(), __FUNCTION__); // 列印任務完成信息
return NULL;
}
// 計時函數
void *count_time(void *arg)
{
int i = 0; // 初始化計數器
while (1)
{
sleep(1);
printf("sec: %d\n", ++i); // 列印經過的秒數
}
}
int main(void)
{
pthread_t a; // 定義一個線程ID
pthread_create(&a, NULL, count_time, NULL); // 創建計時線程
// 1, 初始化線程池
thread_pool *pool = malloc(sizeof(thread_pool)); // 分配記憶體給 線程池管理結構體
init_pool(pool, 2); // 初始化線程池,創建2個線程
// 2, 添加任務
printf("向線程池中投送3個任務...\n");
/**
* rand() 是 C 標準庫函數,定義在 <stdlib.h> 頭文件中。它返回一個偽隨機數,
* 範圍在 0 到 RAND_MAX 之間,RAND_MAX 是一個巨集,通常定義為 32767。
*
* rand() % 10 的結果是 rand() 產生的隨機數對 10 取模的結果,也就是說,它會返回一個 0 到 9 之間的整數(包括 0 和 9)
*
* 線程函數和任務函數通常需要一個 void * 類型的參數,以便能夠傳遞任意類型的數據。在這種情況下,任務函數 mytask 需要一個 void * 類型的參數。
*/
add_task(pool, mytask, (void *)(rand() % 10));
add_task(pool, mytask, (void *)(rand() % 10));
add_task(pool, mytask, (void *)(rand() % 10));
// 3, 檢查活躍線程數量
printf("當前活躍的線程數量: %d\n",
remove_thread(pool, 0)); // 列印當前活躍線程數
sleep(9); // 等待9秒
// 4, 添加更多任務
printf("向線程池中投送2個任務...\n"); // 列印信息
add_task(pool, mytask, (void *)(rand() % 10));
add_task(pool, mytask, (void *)(rand() % 10));
// 5, 添加線程
add_thread(pool, 2); // 添加2個線程
sleep(5); // 等待5秒
// 6, 刪除線程
printf("從線程池中刪除3個活躍線程, "
"當前線程數量: %d\n",
remove_thread(pool, 3));
// 7, 銷毀線程池
destroy_pool(pool); // 銷毀線程池
return 0; // 程式正常結束
}
實際使用時, 只需要將上述代碼中的 mytask 函數修改為我們需要實現的功能函數即可
主函數流程圖
graph TD A[主函數開始] --> B[定義線程ID] B --> C[創建計時線程] C --> D[初始化線程池] D --> E[分配記憶體給線程池] E --> F[初始化線程池,創建2個線程] F --> G[添加任務] G --> H[列印信息: throwing 3 tasks...] H --> I[添加任務1] I --> J[添加任務2] J --> K[添加任務3] K --> L[檢查活躍線程數量] L --> M[列印當前活躍線程數] M --> N[等待9秒] N --> O[添加更多任務] O --> P[列印信息: throwing another 2 tasks...] P --> Q[添加任務4] Q --> R[添加任務5] R --> S[添加線程] S --> T[添加2個線程] T --> U[等待5秒] U --> V[刪除線程] V --> W[列印信息: remove 3 threads...] W --> X[刪除3個線程] X --> Y[銷毀線程池] Y --> Z[銷毀線程池並釋放資源] Z --> AA[主函數結束]thread_pool.h
#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_
#include <stdio.h> // 標準輸入輸出庫
#include <stdbool.h> // 布爾類型庫
#include <unistd.h> // UNIX 標準庫,包含 sleep 函數
#include <stdlib.h> // 標準庫,包含 malloc 和 free 函數
#include <string.h> // 字元串處理庫
#include <strings.h> // 字元串處理庫
#include <errno.h> // 錯誤號庫
#include <pthread.h> // POSIX 線程庫
#define MAX_WAITING_TASKS 1000 // 處於等待狀態的任務數量最大為1000
#define MAX_ACTIVE_THREADS 20 // 活躍線程的最大數量, 但該數量最佳應該==CPU一次性可執行的線程數量, 例如6核12線程, 則為12
/*************第一步: 構建任務結構體******************/
// 任務結點 單向鏈表的節點,類型
struct task
{
void *(*do_task)(void *arg); // 任務函數指針 指向線程要執行的任務 格式是固定的
void *arg; // 需要傳遞給任務的參數,如果不需要,則NULL
struct task *next; // 指向下一個任務結點的指針
};
// 線程池的管理結構體
typedef struct thread_pool
{
pthread_mutex_t lock; // 互斥鎖, 用於保護任務隊列
pthread_cond_t cond; // 條件量, 代表任務隊列中任務個數的變化---如果主線程向隊列投放任務, 則可以通過條件變數來喚醒哪些睡著了的線程
bool shutdown; // 是否需要銷毀線程池, 控制線程退出, 進而銷毀整個線程池
struct task *task_list; // 用於存儲任務的鏈表, 任務隊列剛開始沒有任何任務, 是一個具有頭節點的空鏈隊列
pthread_t *tids; // 用於記錄線程池中線程的ID
unsigned max_waiting_tasks; // 線程池中處於等待狀態的任務數量最大值
unsigned waiting_tasks; // 處於等待狀態的線程數量
unsigned active_threads; // 正在活躍的線程數量
} thread_pool;
// 初始化線程池
bool init_pool(thread_pool *pool, unsigned int threads_number);
// 向線程池中添加任務
bool add_task(thread_pool *pool, void *(*do_task)(void *arg), void *task);
// 先線程池中添加線程
int add_thread(thread_pool *pool, unsigned int additional_threads_number);
// 從線程池中刪除線程
int remove_thread(thread_pool *pool, unsigned int removing_threads_number);
// 銷毀線程池
bool destroy_pool(thread_pool *pool);
// 任務函數 線程常式
void *routine(void *arg);
#endif
thread_pool.c
#include "thread_pool.h" // 包含線程池頭文件
// 線程取消處理函數,確保線程取消時解鎖互斥鎖
void handler(void *arg)
{
printf("[%u] 結束了.\n",
(unsigned)pthread_self()); // 列印線程結束信息
pthread_mutex_unlock((pthread_mutex_t *)arg); // 解鎖互斥鎖
}
// 線程執行的任務函數
void *routine(void *arg)
{
// 調試
#ifdef DEBUG
printf("[%u] is started.\n",
(unsigned)pthread_self()); // 列印線程開始信息
#endif
// 把需要傳遞給線程任務的參數進行備份
thread_pool *pool = (thread_pool *)arg; // 將傳入的參數轉換為線程池指針
struct task *p; // 定義一個任務指針
while (1) // 無限迴圈,持續處理任務
{
/*
** push a cleanup functon handler(), make sure that
** the calling thread will release the mutex properly
** even if it is cancelled during holding the mutex.
**
** NOTE:
** pthread_cleanup_push() is a macro which includes a
** loop in it, so if the specified field of codes that
** paired within pthread_cleanup_push() and pthread_
** cleanup_pop() use 'break' may NOT break out of the
** truely loop but break out of these two macros.
** see line 61 below.
*/
/*
* 註意:
* 推送一個清理函數handler(),確保調用線程將正確釋放互斥量,即使它在持有互斥量期間被取消。
*
* pthread_cleanup_push()是一個巨集,其中包含一個迴圈,
* 所以如果在pthread_cleanup_push()和pthread_ cleanup_pop()中配對的代碼的指定欄位使用` break `可能不會跳出真正的迴圈,
* 而是跳出這兩個巨集。參見下麵的第61行。
*/
//================================================//
pthread_cleanup_push(handler, (void *)&pool->lock); // 註冊取消處理函數
pthread_mutex_lock(&pool->lock); // 加鎖,保護共用資源
//================================================//
// 1,如果沒有任務且線程池未關閉,則等待
while (pool->waiting_tasks == 0 && !pool->shutdown)
{
pthread_cond_wait(&pool->cond, &pool->lock); // 等待條件變數
}
// 2, 如果沒有任務且線程池已關閉,則退出
if (pool->waiting_tasks == 0 && pool->shutdown == true)
{
pthread_mutex_unlock(&pool->lock); // 解鎖
pthread_exit(NULL); // CANNOT use 'break'; 退出線程
}
// 3, 有任務則取出任務
p = pool->task_list->next; // 獲取第一個任務
pool->task_list->next = p->next; // 將任務從鏈表中移除
pool->waiting_tasks--; // 減少等待任務計數
//================================================//
pthread_mutex_unlock(&pool->lock); // 解鎖
pthread_cleanup_pop(0); // 取消註冊的取消處理函數
//================================================//
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); // 禁止線程取消
(p->do_task)(p->arg); // 執行任務
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); // 允許線程取消
free(p); // 釋放任務記憶體
}
pthread_exit(NULL); // 退出線程
}
// 初始化線程池 pool線程池指針 threads_number 初始化線程的個數
bool init_pool(thread_pool *pool, unsigned int threads_number)
{
// 初始化互斥鎖
pthread_mutex_init(&pool->lock, NULL);
// 初始化條件量
pthread_cond_init(&pool->cond, NULL);
// 銷毀標誌 設置線程池為未關閉狀態
pool->shutdown = false; // 不銷毀
// 給任務鏈表的節點申請堆記憶體
pool->task_list = malloc(sizeof(struct task));
// 申請堆記憶體,用於存儲創建出來的線程的ID
pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS);
// 錯誤處理,對malloc進行錯誤處理
if (pool->task_list == NULL || pool->tids == NULL)
{
perror("分配記憶體錯誤");
return false;
}
// 對任務鏈表中的節點的指針域進行初始化
pool->task_list->next = NULL;
// 設置線程池中處於等待狀態的任務數量最大值
pool->max_waiting_tasks = MAX_WAITING_TASKS;
// 設置等待線程處理的任務的數量為0,說明現在沒有任務
pool->waiting_tasks = 0;
// 設置線程池中活躍的線程的數量
pool->active_threads = threads_number;
int i;
// 迴圈創建活躍線程
for (i = 0; i < pool->active_threads; i++)
{
// 創建線程 把線程的ID存儲在申請的堆記憶體
if (pthread_create(&((pool->tids)[i]), NULL,
routine, (void *)pool) != 0)
{
perror("創建線程錯誤");
return false;
}
// 用於調試
#ifdef DEBUG
printf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",
(unsigned)pthread_self(), __FUNCTION__,
i, (unsigned)pool->tids[i]); // 列印線程創建信息
#endif
}
return true;
}
// 向線程池的任務鏈表中添加任務
bool add_task(thread_pool *pool,
void *(*do_task)(void *arg), void *arg)
{
// 給任務鏈表節點申請記憶體
struct task *new_task = malloc(sizeof(struct task));
if (new_task == NULL) // 檢查記憶體分配是否成功
{
perror("申請記憶體錯誤");
return false;
}
new_task->do_task = do_task; // 設置任務函數指針
new_task->arg = arg; // 設置任務參數
new_task->next = NULL; // 指針域設置為NULL 初始化任務的下一個指針
//============ LOCK =============//
pthread_mutex_lock(&pool->lock); // 加鎖,保護共用資源
//===============================//
// 說明要處理的任務的數量大於能處理的任務數量
if (pool->waiting_tasks >= MAX_WAITING_TASKS) // 檢查等待任務是否超過最大值
{
pthread_mutex_unlock(&pool->lock); // 解鎖
fprintf(stderr, "任務太多.\n"); // 列印錯誤信息
free(new_task); // 釋放新任務記憶體
return false;
}
struct task *tmp = pool->task_list; // 獲取任務鏈表頭
// 遍歷鏈表,找到單向鏈表的尾節點
while (tmp->next != NULL)
tmp = tmp->next;
// 把新的要處理的任務插入到鏈表的尾部 尾插
tmp->next = new_task;
// 要處理的任務的數量+1 (等待任務數量+1)
pool->waiting_tasks++;
//=========== UNLOCK ============//
pthread_mutex_unlock(&pool->lock); // 解鎖
//===============================//
// 調試
#ifdef DEBUG
printf("[%u][%s] ==> a new task has been added.\n",
(unsigned)pthread_self(), __FUNCTION__); // 列印任務添加信息
#endif
// 喚醒第一個處於阻塞隊列中的線程
pthread_cond_signal(&pool->cond);
return true;
}
// 向線程池加入新線程
int add_thread(thread_pool *pool, unsigned additional_threads)
{
// 判斷需要添加的新線程的數量是否為0 如果沒有要添加的線程,直接返回
if (additional_threads == 0)
return 0;
// 計算線程池中匯流排程的數量
unsigned total_threads =
pool->active_threads + additional_threads;
int i, actual_increment = 0; // 初始化計數器
// 迴圈創建新線程
for (i = pool->active_threads; i < total_threads && i < MAX_ACTIVE_THREADS; i++)
{
// 創建新線程
if (pthread_create(&((pool->tids)[i]),
NULL, routine, (void *)pool) != 0)
{
perror("增加活躍線程錯誤"); // 列印錯誤信息
// 如果沒有成功創建任何線程,返回錯誤
if (actual_increment == 0)
return -1;
break; // 退出迴圈
}
actual_increment++; // 增加計數器
#ifdef DEBUG
printf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",
(unsigned)pthread_self(), __FUNCTION__,
i, (unsigned)pool->tids[i]); // 列印線程創建信息
#endif
}
// 記錄此時線程池中活躍線程的總數
pool->active_threads += actual_increment; // 更新活躍線程數
return actual_increment; // 返回實際增加的線程數
}
// 從線程池中刪除線程
int remove_thread(thread_pool *pool, unsigned int removing_threads)
{
if (removing_threads == 0)
return pool->active_threads; // 如果沒有要刪除的線程,直接返回
int remaining_threads = pool->active_threads - removing_threads; // 計算剩餘線程數
remaining_threads = remaining_threads > 0 ? remaining_threads : 1; // 確保至少有一個線程
int i;
for (i = pool->active_threads - 1; i > remaining_threads - 1; i--) // 迴圈取消線程
{
errno = pthread_cancel(pool->tids[i]); // 取消線程
if (errno != 0) // 檢查取消是否成功
break;
#ifdef DEBUG
printf("[%u]:[%s] ==> cancelling tids[%d]: [%u]...\n",
(unsigned)pthread_self(), __FUNCTION__,
i, (unsigned)pool->tids[i]); // 列印線程取消信息
#endif
}
if (i == pool->active_threads - 1) // 如果沒有成功取消任何線程,返回錯誤
return -1;
else
{
pool->active_threads = i + 1; // 更新活躍線程數
return i + 1; // 返回剩餘線程數
}
}
// 銷毀線程池
bool destroy_pool(thread_pool *pool)
{
// 1,激活所有線程 設置關閉標誌
pool->shutdown = true;
pthread_cond_broadcast(&pool->cond); // 喚醒所有等待中的線程
// 2, 等待線程們執行完畢
int i;
for (i = 0; i < pool->active_threads; i++) // 迴圈等待所有線程退出
{
/**
* pthread_join(pool->tids[i], NULL) 的作用是等待線程池中第 i 個線程終止,並清理其相關資源。通過這種方式,可以確保在銷毀線程池時,所有線程都已經安全地終止。
* pthread_join 是 POSIX 線程庫中的一個函數,用於等待一個線程的終止。它的功能類似於進程中的 wait 系統調用。
*/
errno = pthread_join(pool->tids[i], NULL); // 等待線程退出
if (errno != 0) // 檢查等待是否成功
{
printf("join tids[%d] error: %s\n",
i, strerror(errno)); // 列印錯誤信息
}
else
printf("[%u] is joined\n", (unsigned)pool->tids[i]); // 列印線程退出信息
}
// 3, 銷毀線程池
free(pool->task_list); // 釋放任務鏈表記憶體
free(pool->tids); // 釋放線程ID數組記憶體
free(pool); // 釋放線程池結構體記憶體
return true;
}
線程執行的任務函數流程圖
void *routine(void *arg)
mermaid
graph TD A[線程執行的任務函數開始] --> B[註冊取消處理函數] B --> C[加鎖] C --> D{是否有任務 且 線程池未關閉?} D -- 否 --> E[等待條件變數] D -- 是 --> F{是否沒有任務 且 線程池已關閉?} F -- 是 --> G[解鎖並退出線程] F -- 否 --> H[取出任務] H --> I[從鏈表中移除任務] I --> J[減少等待任務計數] J --> K[解鎖] K --> L[取消註冊的取消處理函數] L --> M[禁止線程取消] M --> N[執行任務] N --> O[允許線程取消] O --> P[釋放任務記憶體] P --> A銷毀線程池流程圖
mermaid
graph TD A[銷毀線程池] --> B[設置關閉標誌] B --> C[喚醒所有等待線程] C --> D[等待所有線程終止] D --> E[釋放任務鏈表記憶體] E --> F[釋放線程ID數組記憶體] F --> G[釋放線程池結構體記憶體] G --> H[線程池銷毀完成]參考
- Hexo (smartyue076.github.io)
- 線程池原理與實現_嗶哩嗶哩_bilibili
- Linux環境編程圖文指南(配視頻教程) (豆瓣) (douban.com)
- Linux高性能伺服器編程 (豆瓣) (douban.com)
本文來自博客園,作者:舟清颺,轉載請註明原文鏈接:https://www.cnblogs.com/zqingyang/p/18241374