1.線程池本質 多個線程組成的一個集合,目的為了併發執行任務,定義時是一個結構體,成員有互斥鎖,條件變數,任務鏈隊列指針,任務鏈隊列中等待的任務個數,當前活躍的線程數量,線程ID,線程銷毀標記等 2.線程池的關鍵技術 (1)萬能函數指針(通用函數指針): *void *(*p)(void ) ( ...
1.線程池本質
多個線程組成的一個集合,目的為了併發執行任務,定義時是一個結構體,成員有互斥鎖,條件變數,任務鏈隊列指針,任務鏈隊列中等待的任務個數,當前活躍的線程數量,線程ID,線程銷毀標記等
2.線程池的關鍵技術
(1)萬能函數指針(通用函數指針): *void *(*p)(void )
(使用技巧:函數的參數個數超過1個時,參數可以打包成結構體,多個參數就變成一個參數(全部包含在結構體裡面了))
原理:該函數需要用到互斥鎖在完成一個任務後減少任務數量,解鎖後繼續下一個任務,還要用到條件變數在任務全部完成時通過判斷任務數量和結束標誌位退出。
(2)封裝線程池有關的介面函數
三大基本函數需要我們去封裝
第一個:初始化線程池
原理:通過對線程池結構體中的成員初始化讓線程池進入工作模式,隨後使用迴圈創建對應數量的線程
第二個:添加任務
原理:通過動態分配記憶體準備新的記憶體空間分配給新的任務,在添加任務時利用互斥鎖上鎖防止任務函數完成任務時減少任務數量帶來的沖中突,將任務尾插到任務鏈表中,並目對線程池結構體中各個成員變數進行更新。
第三個:線程池的銷毀(回收線程,想辦法讓線程的任務函數退出)
原理:通過改變線程池結構體成員變數中的結束標誌位,令所有線程能夠退出,隨後在任務函數中開始退出線程,併在這個線程池銷毀函數中回收所有線程。
線程池實例源碼列舉如下:
頭文件(thread_pool.h)
#include <stdio.h>
#include <stdbool.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <errno.h>
#include <pthread.h>
#define MAX_WAITING_TASKS 1000 // 處於等待狀態的線程數量最大為1000
#define MAX_ACTIVE_THREADS 20 // 活躍的線程數量
// 任務結點 單向鏈表的節點,類型
struct task
{
void *(*do_task)(void *arg); // 任務函數指針 指向線程要執行的任務 格式是固定的
void *arg; // 需要傳遞給任務的參數,如果不需要,則NULL
struct task *next; // 指向下一個任務結點的指針
};
// 線程池的管理結構體
typedef struct thread_pool
{
pthread_mutex_t lock; // 互斥鎖
pthread_cond_t cond; // 條件量
bool shutdown;
/*是否需要銷毀線程池,用於指示線程池是否處於銷毀狀態。它的作用是線上程池需要被銷毀時,向線程池中的工作線程發出信號,告知它們停止接受新的任務,並逐漸退出。具體來說,當 shutdown 標記被設置為 true 時,線程池將不再接受新的任務提交,但會繼續執行已經提交的任務,直到所有任務都執行完畢。一旦線程池中的任務執行完畢,工作線程就會逐個退出,釋放相關資源,最終銷毀整個線程池。*/
struct task *task_list; // 用於存儲任務的鏈表
pthread_t *tids; // 用於記錄線程池中線程的ID
unsigned max_waiting_tasks; // 線程池中線程的數量最大值
unsigned waiting_tasks; // 處於等待狀態的線程數量
unsigned active_threads; // 正在活躍的線程數量
} thread_pool;
// 初始化線程池
bool init_pool(thread_pool *pool, unsigned int threads_number);
// 向線程池中添加任務
bool add_task(thread_pool *pool, void *(*do_task)(void *arg), void *task);
// 先線程池中添加線程
int add_thread(thread_pool *pool, unsigned int additional_threads_number);
// 從線程池中刪除線程
int remove_thread(thread_pool *pool, unsigned int removing_threads_number);
// 銷毀線程池
bool destroy_pool(thread_pool *pool);
// 任務函數
void *routine(void *arg);
介面函數的實現源碼(thread_pool.c):
#include "thread_pool.h"
/*
* @name : handler
* @brief : 接到取消請求之後進行釋放互斥鎖
* @params :
* @arg : 傳入每個添加的任務隨機的10秒內的秒數
* @retval : NULL
* @version:
* @note :
*/
void handler(void *arg)
{
printf("[%u] is ended.\n",
(unsigned)pthread_self()); // 響應取消請求之後自動處理的常式:釋放互斥鎖,以確保不會因為線程被取消而導致資源泄漏或死鎖等問題。
pthread_mutex_unlock((pthread_mutex_t *)arg);
}
/*
* @name : routine
* @brief : 接到取消請求之後進行釋放互斥鎖
* @params :
* @arg : 傳遞給線程任務的參數
* @retval : NULL
* @version:
* @note :
*/
void *routine(void *arg)
{
// 調試
#ifdef DEBUG
printf("[%u] is started.\n",
(unsigned)pthread_self());
#endif
// 把需要傳遞給線程任務的參數進行備份
thread_pool *pool = (thread_pool *)arg;
struct task *p;
while (1)
{
/*
pthread_cleanup_push()是一個巨集,用於向線程的取消處理器棧中註冊(也可以簡單理解為綁定)一個處理函數。作用是線上程退出時自動執行註冊的處理函數,即handler
*/
pthread_cleanup_push(handler, (void *)&pool->lock);
pthread_mutex_lock(&pool->lock); // 解鎖,申請資源
// 1,沒有任務,且線程池沒有被銷毀,就掛起等待
while (pool->waiting_tasks == 0 && !pool->shutdown)
{
pthread_cond_wait(&pool->cond, &pool->lock);
}
// 2, 沒有任務,且線程池被標記銷毀,就釋放互斥鎖並結束進程
if (pool->waiting_tasks == 0 && pool->shutdown == true)
{
pthread_mutex_unlock(&pool->lock);
pthread_exit(NULL); // CANNOT use 'break';
}
// 3, 從任務列表中取出一個任務進行消費,並更新線程池的任務列表和等待任務數量???如果線程數不夠怎麼辦
p = pool->task_list->next;
pool->task_list->next = p->next;
pool->waiting_tasks--;
// 4, 釋放互斥鎖,並用pthread_cleanup_pop取消在pthread_cleanup_push中註冊的清理處理器
pthread_mutex_unlock(&pool->lock);
pthread_cleanup_pop(0);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); // 設置取消狀態為禁用,防止在執行任務時被取消
(p->do_task)(p->arg); // 執行任務函數,傳入任務參數
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); // 恢復取消狀態為啟用
free(p); // 任務完成,釋放任務所占用的記憶體空間
}
}
/*
* @name : init_pool
* @brief : 初始化線程池
* @params :
* @*pool : 線程池的管理結構體指針
* @threads_number : 初始確定的線程數
* @retval : 成功返回true,失敗false
* @version:
* @note :
*/
bool init_pool(thread_pool *pool, unsigned int threads_number)
{
pthread_mutex_init(&pool->lock, NULL); // 初始化互斥鎖
pthread_cond_init(&pool->cond, NULL); // 初始化條件量
pool->shutdown = false; // 設置銷毀標誌參數,初始化為不銷毀
pool->task_list = malloc(sizeof(struct task)); // 給鏈表的節點申請堆記憶體
pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS); // 申請堆記憶體,用於存儲創建出來的TID
// 錯誤處理,對malloc進行錯誤處理
if (pool->task_list == NULL || pool->tids == NULL)
{
perror("allocate memory error");
return false;
}
pool->task_list->next = NULL; // 對任務鏈表中的節點的指針域進行初始化
pool->max_waiting_tasks = MAX_WAITING_TASKS; // 設置線程池中線程數量的最大值
pool->waiting_tasks = 0; // 設置等待線程處理的任務的數量為0,說明現在沒有任務
pool->active_threads = threads_number; // 設置線程池中活躍的線程的數量
for (int i = 0; i < pool->active_threads; i++) // 迴圈創建活躍線程
{
// 創建線程 把線程的ID存儲在申請的堆記憶體
if (pthread_create(&((pool->tids)[i]), NULL,
routine, (void *)pool) != 0)
{
perror("create threads error");
return false;
}
// 用於調試
#ifdef DEBUG
printf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",
(unsigned)pthread_self(), __FUNCTION__,
i, (unsigned)pool->tids[i]);
#endif
}
return true;
}
/*
* @name : add_task
* @brief : 向線程池的任務鏈表中添加任務,並喚醒
* @params :
* @*pool : 線程池的管理結構體指針
* @threads_number : 初始確定的線程數
* @retval : 成功返回true,失敗false
* @version:
* @note :
*/
// 先線程池的任務鏈表中添加任務
bool add_task(thread_pool *pool, void *(*do_task)(void *arg), void *arg)
{
struct task *new_task = malloc(sizeof(struct task)); // 給任務鏈表節點申請記憶體
if (new_task == NULL)
{
perror("allocate memory error");
return false;
}
new_task->do_task = do_task;
new_task->arg = arg;
new_task->next = NULL; // 指針域設置為NULL
pthread_mutex_lock(&pool->lock); // 進行線程池任務添加
// 說明要處理的任務的數量大於能處理的任務數量,直接解鎖釋放資源並返回
if (pool->waiting_tasks >= MAX_WAITING_TASKS)
{
pthread_mutex_unlock(&pool->lock);
fprintf(stderr, "too many tasks.\n");
free(new_task);
return false;
}
struct task *tmp = pool->task_list; // 獲取線程池單鏈表的頭節點地址
while (tmp->next != NULL) // 遍歷鏈表,找到單向鏈表的尾節點
tmp = tmp->next;
tmp->next = new_task; // 把新的要處理的任務插入到鏈表的尾部 尾插
pool->waiting_tasks++; // 要處理的任務的數量+1
pthread_mutex_unlock(&pool->lock); // 解鎖
// 調試
#ifdef DEBUG
printf("[%u][%s] ==> a new task has been added.\n",
(unsigned)pthread_self(), __FUNCTION__);
#endif
pthread_cond_signal(&pool->cond); // 喚醒第一個處於條件變數阻塞隊列中的線程
return true;
}
/*
* @name : add_thread
* @brief : 向線程池加入新線程
* @params :
* @*pool : 線程池的管理結構體指針
* @additional_threads : 需要添加的線程數
* @retval : 實際增加的線程數
* @version:
* @note :
*/
int add_thread(thread_pool *pool, unsigned additional_threads)
{
if (additional_threads == 0) // 判斷需要添加的新線程的數量是否為0,是的話直接返回
return 0;
unsigned total_threads = pool->active_threads + additional_threads; // 計算線程池中匯流排程的數量
int i, actual_increment = 0; // actual_increment 為實際增加的進程數
for (i = pool->active_threads; i < total_threads && i < MAX_ACTIVE_THREADS; i++) // 對實際增加的進程數進行限制,使得總和不能超過最大總數
{
// 創建新線程
if (pthread_create(&((pool->tids)[i]),
NULL, routine, (void *)pool) != 0) // 增加進程任務處理
{
perror("add threads error");
// no threads has been created, return fail
if (actual_increment == 0)
return -1;
break;
}
actual_increment++;
#ifdef DEBUG
printf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",
(unsigned)pthread_self(), __FUNCTION__,
i, (unsigned)pool->tids[i]);
#endif
}
// 記錄此時線程池中活躍線程的總數,並作為返回值
pool->active_threads += actual_increment;
return actual_increment;
}
/*
* @name : remove_thread
* @brief : 移除多餘線程
* @params :
* @*pool : 線程池的管理結構體指針
* @*removing_threads : 需要移除的線程數量
* @retval : 移除後剩下的活躍線程數
* @version:
* @note :
*/
int remove_thread(thread_pool *pool, unsigned int removing_threads)
{
if (removing_threads == 0) // 判斷需要添加的新線程的數量是否為0,是的話直接返回
return pool->active_threads;
int remaining_threads = pool->active_threads - removing_threads;
remaining_threads = remaining_threads > 0 ? remaining_threads : 1; // 初步計算經過移除後,剩下的線程數,並判斷;最終剩餘的線程數不得小於1
int i;
for (i = pool->active_threads - 1; i > remaining_threads - 1; i--) // 依次移除線程,併進行錯誤判斷
{
errno = pthread_cancel(pool->tids[i]);
if (errno != 0)
break;
#ifdef DEBUG
printf("[%u]:[%s] ==> cancelling tids[%d]: [%u]...\n",
(unsigned)pthread_self(), __FUNCTION__,
i, (unsigned)pool->tids[i]);
#endif
}
if (i == pool->active_threads - 1) // 若沒有移除成功,則返回-1
return -1;
else
{
pool->active_threads = i + 1;// 否則返回活躍的線程數
return i + 1;
}
}
/*
* @name : destroy_pool
* @brief : 銷毀線程池
* @params :
* @*pool : 線程池的管理結構體指針
* @retval : 成功返回true,失敗false
* @version:
* @note :
*/
bool destroy_pool(thread_pool *pool)
{
pool->shutdown = true; // 1, 修改線程池結構體的成員,並通知所有線程
pthread_cond_broadcast(&pool->cond);
for (int i = 0; i < pool->active_threads; i++) // 2, 等待所有線程退出,並回收資源
{
errno = pthread_join(pool->tids[i], NULL);
if (errno != 0)
{
printf("join tids[%d] error: %s\n",
i, strerror(errno));
}
else
printf("[%u] is joined\n", (unsigned)pool->tids[i]);
}
free(pool->task_list); // 釋放申請了的堆記憶體
free(pool->tids);
free(pool);
return true;
}
應用舉例(main.c):
#include "thread_pool.h"
/**
* @file name: --
* @brief
* @author [email protected]
* @date 2024/04/25
* @version 1.0 :版本
* @property :
* @note
* CopyRight (c) 2023-2024 [email protected] All Right Reseverd
*/
/*
* @name : mytask
* @brief : 給每個線程安排的具體任務
* @params :
* @arg : 傳入每個添加的任務隨機的10秒內的秒數
* @retval : NULL
* @version:
* @note :
* 1.__FUNCTION__ 是 C 和 C++ 語言中的預定義巨集,用於獲取當前所在函數的名稱(在 C++ 中,也包括成員函數)。它會在編譯時被替換為當前函數的字元串字面值。
* 2.這部分可以換成自己需要安排的進程任務
*/
void *mytask(void *arg)
{
int n = (int)arg; // 定義整型變數接收參數
printf("[%u][%s] ==> job will be done in %d sec...\n",
(unsigned)pthread_self(), __FUNCTION__, n);
sleep(n);
printf("[%u][%s] ==> job done!\n",
(unsigned)pthread_self(), __FUNCTION__);
return NULL;
}
/*
* @name : count_time
* @ : 計時器,每隔一秒輸出當前秒數
* @params :
* @*arg : NULL
* @retval : NULL
* @version:
* @note :
*/
void *count_time(void *arg)
{
int i = 0;
while (1)
{
sleep(1);
printf("sec: %d\n", ++i);
}
}
int main()
{
// 創建線程進行實時輸出時間
pthread_t a;
pthread_create(&a, NULL, count_time, NULL);
// 1, initialize the pool 初始化帶有2條線程的線程池
thread_pool *pool = malloc(sizeof(thread_pool));
init_pool(pool, 2);
// 2, throw tasks 投入3個任務
printf("throwing 3 tasks...\n");
add_task(pool, mytask, (void *)(rand() % 10));
add_task(pool, mytask, (void *)(rand() % 10));
add_task(pool, mytask, (void *)(rand() % 10));
// 3, check active threads number 顯示當前的線程數量
printf("current thread number: %d\n",
remove_thread(pool, 0));
sleep(9);
// 4, throw tasks 投入2個任務
printf("throwing another 2 tasks...\n");
add_task(pool, mytask, (void *)(rand() % 10));
add_task(pool, mytask, (void *)(rand() % 10));
// 5, add threads 增加2條線程
add_thread(pool, 2);
sleep(5);
// 6, remove threads 移除3條線程
printf("remove 3 threads from the pool, "
"current thread number: %d\n",
remove_thread(pool, 3));
// 7, destroy the pool 銷毀線程池
destroy_pool(pool);
return 0;
}