線程池的實現源碼及應用舉例

来源:https://www.cnblogs.com/cino/p/18227519
-Advertisement-
Play Games

1.線程池本質 ​ 多個線程組成的一個集合,目的為了併發執行任務,定義時是一個結構體,成員有互斥鎖,條件變數,任務鏈隊列指針,任務鏈隊列中等待的任務個數,當前活躍的線程數量,線程ID,線程銷毀標記等 2.線程池的關鍵技術 (1)萬能函數指針(通用函數指針): *void *(*p)(void ) ( ...


1.線程池本質

​ 多個線程組成的一個集合,目的為了併發執行任務,定義時是一個結構體,成員有互斥鎖,條件變數,任務鏈隊列指針,任務鏈隊列中等待的任務個數,當前活躍的線程數量,線程ID,線程銷毀標記

2.線程池的關鍵技術
(1)萬能函數指針(通用函數指針): *void *(*p)(void )
(使用技巧:函數的參數個數超過1個時,參數可以打包成結構體,多個參數就變成一個參數(全部包含在結構體裡面了))

原理:該函數需要用到互斥鎖在完成一個任務後減少任務數量,解鎖後繼續下一個任務,還要用到條件變數在任務全部完成時通過判斷任務數量和結束標誌位退出。

(2)封裝線程池有關的介面函數
三大基本函數需要我們去封裝
第一個:初始化線程池
原理:通過對線程池結構體中的成員初始化讓線程池進入工作模式,隨後使用迴圈創建對應數量的線程

第二個:添加任務

原理:通過動態分配記憶體準備新的記憶體空間分配給新的任務,在添加任務時利用互斥鎖上鎖防止任務函數完成任務時減少任務數量帶來的沖中突,將任務尾插到任務鏈表中,並目對線程池結構體中各個成員變數進行更新。

第三個:線程池的銷毀(回收線程,想辦法讓線程的任務函數退出)
原理:通過改變線程池結構體成員變數中的結束標誌位,令所有線程能夠退出,隨後在任務函數中開始退出線程,併在這個線程池銷毀函數中回收所有線程。

線程池實例源碼列舉如下:


頭文件(thread_pool.h)

#include <stdio.h>
#include <stdbool.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <errno.h>
#include <pthread.h>

#define MAX_WAITING_TASKS 1000 // 處於等待狀態的線程數量最大為1000
#define MAX_ACTIVE_THREADS 20  // 活躍的線程數量

// 任務結點  單向鏈表的節點,類型
struct task
{
	void *(*do_task)(void *arg); // 任務函數指針  指向線程要執行的任務  格式是固定的
	void *arg;					 // 需要傳遞給任務的參數,如果不需要,則NULL

	struct task *next; // 指向下一個任務結點的指針
};

// 線程池的管理結構體
typedef struct thread_pool
{
	pthread_mutex_t lock; // 互斥鎖
	pthread_cond_t cond;  // 條件量

	bool shutdown;
	/*是否需要銷毀線程池,用於指示線程池是否處於銷毀狀態。它的作用是線上程池需要被銷毀時,向線程池中的工作線程發出信號,告知它們停止接受新的任務,並逐漸退出。具體來說,當 shutdown 標記被設置為 true 時,線程池將不再接受新的任務提交,但會繼續執行已經提交的任務,直到所有任務都執行完畢。一旦線程池中的任務執行完畢,工作線程就會逐個退出,釋放相關資源,最終銷毀整個線程池。*/

	struct task *task_list; // 用於存儲任務的鏈表

	pthread_t *tids; // 用於記錄線程池中線程的ID

	unsigned max_waiting_tasks; // 線程池中線程的數量最大值
	unsigned waiting_tasks;		// 處於等待狀態的線程數量
	unsigned active_threads;	// 正在活躍的線程數量
} thread_pool;

// 初始化線程池
bool init_pool(thread_pool *pool, unsigned int threads_number);

// 向線程池中添加任務
bool add_task(thread_pool *pool, void *(*do_task)(void *arg), void *task);

// 先線程池中添加線程
int add_thread(thread_pool *pool, unsigned int additional_threads_number);

// 從線程池中刪除線程
int remove_thread(thread_pool *pool, unsigned int removing_threads_number);

// 銷毀線程池
bool destroy_pool(thread_pool *pool);

// 任務函數
void *routine(void *arg);

介面函數的實現源碼(thread_pool.c):

#include "thread_pool.h"

/*
 *	@name   : handler
 *	@brief  : 接到取消請求之後進行釋放互斥鎖
 *	@params :
 *	          @arg : 傳入每個添加的任務隨機的10秒內的秒數
 *	@retval : NULL
 * 	@version:
 * 	@note   :
 */
void handler(void *arg)
{
	printf("[%u] is ended.\n",
		   (unsigned)pthread_self()); // 響應取消請求之後自動處理的常式:釋放互斥鎖,以確保不會因為線程被取消而導致資源泄漏或死鎖等問題。
	pthread_mutex_unlock((pthread_mutex_t *)arg);
}

/*
 *	@name   : routine
 *	@brief  : 接到取消請求之後進行釋放互斥鎖
 *	@params :
 *	          @arg : 傳遞給線程任務的參數
 *	@retval : NULL
 * 	@version:
 * 	@note   :
 */
void *routine(void *arg)
{
// 調試
#ifdef DEBUG
	printf("[%u] is started.\n",
		   (unsigned)pthread_self());
#endif
	// 把需要傳遞給線程任務的參數進行備份
	thread_pool *pool = (thread_pool *)arg;
	struct task *p;

	while (1)
	{
		/*
		pthread_cleanup_push()是一個巨集,用於向線程的取消處理器棧中註冊(也可以簡單理解為綁定)一個處理函數。作用是線上程退出時自動執行註冊的處理函數,即handler
		*/
		pthread_cleanup_push(handler, (void *)&pool->lock);
		pthread_mutex_lock(&pool->lock); // 解鎖,申請資源
		// 1,沒有任務,且線程池沒有被銷毀,就掛起等待
		while (pool->waiting_tasks == 0 && !pool->shutdown)
		{
			pthread_cond_wait(&pool->cond, &pool->lock);
		}
		// 2, 沒有任務,且線程池被標記銷毀,就釋放互斥鎖並結束進程
		if (pool->waiting_tasks == 0 && pool->shutdown == true)
		{
			pthread_mutex_unlock(&pool->lock);
			pthread_exit(NULL); // CANNOT use 'break';
		}

		// 3, 從任務列表中取出一個任務進行消費,並更新線程池的任務列表和等待任務數量???如果線程數不夠怎麼辦
		p = pool->task_list->next;
		pool->task_list->next = p->next;
		pool->waiting_tasks--;
		// 4, 釋放互斥鎖,並用pthread_cleanup_pop取消在pthread_cleanup_push中註冊的清理處理器
		pthread_mutex_unlock(&pool->lock);
		pthread_cleanup_pop(0);

		pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); // 設置取消狀態為禁用,防止在執行任務時被取消
		(p->do_task)(p->arg);								  // 執行任務函數,傳入任務參數
		pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);  // 恢復取消狀態為啟用

		free(p); // 任務完成,釋放任務所占用的記憶體空間
	}
}

/*
 *	@name   : init_pool
 *	@brief  : 初始化線程池
 *	@params :
 *	          @*pool : 線程池的管理結構體指針
 *	          @threads_number : 初始確定的線程數
 *	@retval : 成功返回true,失敗false
 * 	@version:
 * 	@note   :
 */
bool init_pool(thread_pool *pool, unsigned int threads_number)
{

	pthread_mutex_init(&pool->lock, NULL); // 初始化互斥鎖
	pthread_cond_init(&pool->cond, NULL);  // 初始化條件量

	pool->shutdown = false;										 //  設置銷毀標誌參數,初始化為不銷毀
	pool->task_list = malloc(sizeof(struct task));				 // 給鏈表的節點申請堆記憶體
	pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS); // 申請堆記憶體,用於存儲創建出來的TID
	// 錯誤處理,對malloc進行錯誤處理
	if (pool->task_list == NULL || pool->tids == NULL)
	{
		perror("allocate memory error");
		return false;
	}

	pool->task_list->next = NULL;				 // 對任務鏈表中的節點的指針域進行初始化
	pool->max_waiting_tasks = MAX_WAITING_TASKS; // 設置線程池中線程數量的最大值
	pool->waiting_tasks = 0;					 // 設置等待線程處理的任務的數量為0,說明現在沒有任務
	pool->active_threads = threads_number;		 // 設置線程池中活躍的線程的數量

	for (int i = 0; i < pool->active_threads; i++) // 迴圈創建活躍線程
	{
		// 創建線程  把線程的ID存儲在申請的堆記憶體
		if (pthread_create(&((pool->tids)[i]), NULL,
						   routine, (void *)pool) != 0)
		{
			perror("create threads error");
			return false;
		}
// 用於調試
#ifdef DEBUG
		printf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",
			   (unsigned)pthread_self(), __FUNCTION__,
			   i, (unsigned)pool->tids[i]);
#endif
	}
	return true;
}

/*
 *	@name   : add_task
 *	@brief  : 向線程池的任務鏈表中添加任務,並喚醒
 *	@params :
 *	          @*pool : 線程池的管理結構體指針
 *	          @threads_number : 初始確定的線程數
 *	@retval : 成功返回true,失敗false
 * 	@version:
 * 	@note   :
 */
// 先線程池的任務鏈表中添加任務
bool add_task(thread_pool *pool, void *(*do_task)(void *arg), void *arg)
{
	struct task *new_task = malloc(sizeof(struct task)); // 給任務鏈表節點申請記憶體
	if (new_task == NULL)
	{
		perror("allocate memory error");
		return false;
	}

	new_task->do_task = do_task;
	new_task->arg = arg;
	new_task->next = NULL; // 指針域設置為NULL

	pthread_mutex_lock(&pool->lock); // 進行線程池任務添加

	// 說明要處理的任務的數量大於能處理的任務數量,直接解鎖釋放資源並返回
	if (pool->waiting_tasks >= MAX_WAITING_TASKS)
	{
		pthread_mutex_unlock(&pool->lock);
		fprintf(stderr, "too many tasks.\n");
		free(new_task);
		return false;
	}

	struct task *tmp = pool->task_list; // 獲取線程池單鏈表的頭節點地址
	while (tmp->next != NULL)			// 遍歷鏈表,找到單向鏈表的尾節點
		tmp = tmp->next;

	tmp->next = new_task;  // 把新的要處理的任務插入到鏈表的尾部  尾插
	pool->waiting_tasks++; // 要處理的任務的數量+1

	pthread_mutex_unlock(&pool->lock); // 解鎖
// 調試
#ifdef DEBUG
	printf("[%u][%s] ==> a new task has been added.\n",
		   (unsigned)pthread_self(), __FUNCTION__);
#endif

	pthread_cond_signal(&pool->cond); // 喚醒第一個處於條件變數阻塞隊列中的線程
	return true;
}

/*
 *	@name   : add_thread
 *	@brief  : 向線程池加入新線程
 *	@params :
 *	          @*pool : 線程池的管理結構體指針
 *	          @additional_threads : 需要添加的線程數
 *	@retval : 實際增加的線程數
 * 	@version:
 * 	@note   :
 */
int add_thread(thread_pool *pool, unsigned additional_threads)
{
	if (additional_threads == 0) // 判斷需要添加的新線程的數量是否為0,是的話直接返回
		return 0;

	unsigned total_threads = pool->active_threads + additional_threads; // 計算線程池中匯流排程的數量

	int i, actual_increment = 0;													 // actual_increment 為實際增加的進程數
	for (i = pool->active_threads; i < total_threads && i < MAX_ACTIVE_THREADS; i++) // 對實際增加的進程數進行限制,使得總和不能超過最大總數
	{
		// 創建新線程
		if (pthread_create(&((pool->tids)[i]),
						   NULL, routine, (void *)pool) != 0) // 增加進程任務處理
		{
			perror("add threads error");
			// no threads has been created, return fail
			if (actual_increment == 0)
				return -1;

			break;
		}
		actual_increment++;

#ifdef DEBUG
		printf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",
			   (unsigned)pthread_self(), __FUNCTION__,
			   i, (unsigned)pool->tids[i]);
#endif
	}
	// 記錄此時線程池中活躍線程的總數,並作為返回值
	pool->active_threads += actual_increment;
	return actual_increment;
}
/*
 *	@name   : remove_thread
 *	@brief  : 移除多餘線程
 *	@params :
 *	          @*pool : 線程池的管理結構體指針
 *	          @*removing_threads : 需要移除的線程數量
 *	@retval : 移除後剩下的活躍線程數
 * 	@version:
 * 	@note   :
 */
int remove_thread(thread_pool *pool, unsigned int removing_threads)
{
	if (removing_threads == 0) // 判斷需要添加的新線程的數量是否為0,是的話直接返回
		return pool->active_threads;

	int remaining_threads = pool->active_threads - removing_threads;
	remaining_threads = remaining_threads > 0 ? remaining_threads : 1; // 初步計算經過移除後,剩下的線程數,並判斷;最終剩餘的線程數不得小於1

	int i;
	for (i = pool->active_threads - 1; i > remaining_threads - 1; i--) // 依次移除線程,併進行錯誤判斷
	{
		errno = pthread_cancel(pool->tids[i]);

		if (errno != 0)
			break;

#ifdef DEBUG
		printf("[%u]:[%s] ==> cancelling tids[%d]: [%u]...\n",
			   (unsigned)pthread_self(), __FUNCTION__,
			   i, (unsigned)pool->tids[i]);
#endif
	}

	if (i == pool->active_threads - 1) // 若沒有移除成功,則返回-1
		return -1;
	else
	{
		pool->active_threads = i + 1;// 否則返回活躍的線程數
		return i + 1;
	}
}

/*
 *	@name   : destroy_pool
 *	@brief  : 銷毀線程池
 *	@params :
 *	          @*pool : 線程池的管理結構體指針
 *	@retval : 成功返回true,失敗false
 * 	@version:
 * 	@note   :
 */
bool destroy_pool(thread_pool *pool)
{
	pool->shutdown = true; // 1, 修改線程池結構體的成員,並通知所有線程
	pthread_cond_broadcast(&pool->cond);

	for (int i = 0; i < pool->active_threads; i++) // 2, 等待所有線程退出,並回收資源
	{
		errno = pthread_join(pool->tids[i], NULL);
		if (errno != 0)
		{
			printf("join tids[%d] error: %s\n",
				   i, strerror(errno));
		}
		else
			printf("[%u] is joined\n", (unsigned)pool->tids[i]);
	}

	free(pool->task_list); // 釋放申請了的堆記憶體
	free(pool->tids);
	free(pool);

	return true;
}

應用舉例(main.c):

#include "thread_pool.h"
/**
 * @file name:	--
 * @brief
 * @author [email protected]
 * @date 2024/04/25
 * @version 1.0 :版本
 * @property :
 * @note
 * CopyRight (c)  2023-2024   [email protected]   All Right Reseverd
 */

/*
 *	@name   : mytask
 *	@brief  : 給每個線程安排的具體任務
 *	@params :
 *	          @arg : 傳入每個添加的任務隨機的10秒內的秒數
 *	@retval : NULL
 * 	@version:
 * 	@note   :
 * 			  1.__FUNCTION__ 是 C 和 C++ 語言中的預定義巨集,用於獲取當前所在函數的名稱(在 C++ 中,也包括成員函數)。它會在編譯時被替換為當前函數的字元串字面值。
 * 			  2.這部分可以換成自己需要安排的進程任務
 */
void *mytask(void *arg)
{
	int n = (int)arg; // 定義整型變數接收參數

	printf("[%u][%s] ==> job will be done in %d sec...\n",
		   (unsigned)pthread_self(), __FUNCTION__, n);
	sleep(n);
	printf("[%u][%s] ==> job done!\n",
		   (unsigned)pthread_self(), __FUNCTION__);

	return NULL;
}

/*
 *	@name   : count_time
 *	@	    : 計時器,每隔一秒輸出當前秒數
 *	@params :
 *	          @*arg : NULL
 *	@retval : NULL
 * 	@version:
 * 	@note   :
 */
void *count_time(void *arg)
{
	int i = 0;
	while (1)
	{
		sleep(1);
		printf("sec: %d\n", ++i);
	}
}

int main()
{
	// 創建線程進行實時輸出時間
	pthread_t a;
	pthread_create(&a, NULL, count_time, NULL);

	// 1, initialize the pool 初始化帶有2條線程的線程池
	thread_pool *pool = malloc(sizeof(thread_pool));
	init_pool(pool, 2);

	// 2, throw tasks  投入3個任務
	printf("throwing 3 tasks...\n");
	add_task(pool, mytask, (void *)(rand() % 10));
	add_task(pool, mytask, (void *)(rand() % 10));
	add_task(pool, mytask, (void *)(rand() % 10));

	// 3, check active threads number 顯示當前的線程數量
	printf("current thread number: %d\n",
		   remove_thread(pool, 0));
	sleep(9);

	// 4, throw tasks 投入2個任務
	printf("throwing another 2 tasks...\n");
	add_task(pool, mytask, (void *)(rand() % 10));
	add_task(pool, mytask, (void *)(rand() % 10));
	// 5, add threads 增加2條線程
	add_thread(pool, 2);
	sleep(5);
	// 6, remove threads 移除3條線程
	printf("remove 3 threads from the pool, "
		   "current thread number: %d\n",
		   remove_thread(pool, 3));

	// 7, destroy the pool 銷毀線程池
	destroy_pool(pool);
	return 0;
}


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 對於“軟體架構”這個詞有很多定義和含義。而且,“軟體開發”、“軟體設計”和“軟體架構”這三個概念之間存在相當大的重疊,它們在許多方面相互交融。 從核心上看,可以將軟體架構視為在構建應用程式時,對不同選擇進行權衡的學科。 1 為什麼需要權衡以及我們為什麼在意? 我們在構建軟體時必須進行權衡的原因,與其 ...
  • 大家好,我是碼農先森。 數組與切片的區別 在 Go 語言中,數組和切片是兩種不同的數據結構,它們之間有以下主要區別。 參數長度: 數組(Array):數組的長度是固定的,在創建時就需要指定數組的長度,無法動態改變;只有長度信息,通過 len() 函數獲取。 切片(Slice):切片是對數組的一個引用 ...
  • 《最少必要面試題》第一版 相信大家都會有種及眼熟又陌生的感覺、看過可能在短暫的面試後又馬上忘記了。JavaPub 在這裡整理這些容易忘記的重點知識及 解答,建議收藏,經常溫習查閱。 點擊線上閱讀《最少必要面試題》 更多 作者:JavaPub2024 目錄緩存1. 什麼是緩存?2. 為什麼要用緩存?3 ...
  • 一、背景介紹 1.1 爬取目標 用python開發的爬蟲採集軟體,可自動按指定博主抓取該博主已發佈筆記。 為什麼有了源碼還開發界面軟體呢?方便不懂編程代碼的小白用戶使用,無需安裝python,無需改代碼,雙擊打開即用! 軟體界面截圖: 爬取結果截圖: 結果截圖1: 結果截圖2: 結果截圖3: 以上。 ...
  • Lambda表達式 Lambda表達式,也可以稱為閉包,是Java 8發佈的最重要新特性 Lambda允許把函數作為一個方法的參數(函數作為參數傳遞進方法中) 使用Lambda表達式可以使代碼變的更加簡潔緊湊 語法: (parameter) -> expression (parameter) -> ...
  • Node.js是一個基於 Chrome V8 引擎的 JavaScript 運行環境。Node.js 使用了一個事件驅動、非阻塞式 I/O 的模型,使其輕量又高效。Express是一個保持最小規模的靈活的 Node.js Web應用程式開發框架,為Web和移動應用程式提供一組強大的功能。使用Node ...
  • 進程是電腦分配資源的基本單位,線程是cpu調度的基本單位 線程基本概念: LWP:light weight process 輕量級的進程。創建線程的底層函數和進程一樣,都是clone,因此線程的本質仍是進程(在linux環境下) 與進程相比,線程有獨立的TCB結構體(類似於進程的PCB),但沒有獨 ...
  • this,構造器,static,final,單例模式 this關鍵字 在java中this是一個引用變數,即指向當前對象地址的引用(指針),→可以把this當作當前對象,便於更好的索引. this() 實際是調用了當前對象的構造器 1. 引用當前對象的屬性 當在方法中要訪問當前對象的屬性時,可以用t ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...