Windows 線程同步是指多個線程一同訪問共用資源時,為了避免資源的併發訪問導致數據的不一致或程式崩潰等問題,需要對線程的訪問進行協同和控制,以保證程式的正確性和穩定性。Windows提供了多種線程同步機制,以適應不同的併發編程場景。以上同步機制各有優缺點和適用場景,開發者應根據具體應用場景進行選... ...
Windows 線程同步是指多個線程一同訪問共用資源時,為了避免資源的併發訪問導致數據的不一致或程式崩潰等問題,需要對線程的訪問進行協同和控制,以保證程式的正確性和穩定性。Windows提供了多種線程同步機制,以適應不同的併發編程場景。主要包括以下幾種:
- 事件(Event):用於不同線程間的信號通知。包括單次通知事件和重覆通知事件兩種類型。
- 互斥量(Mutex):用於控制對共用資源的訪問,具有獨占性,可避免線程之間對共用資源的非法訪問。
- 臨界區(CriticalSection):和互斥量類似,也用於控制對共用資源的訪問,但是是進程內部的,因此比較適用於同一進程中的線程同步控制。
- 信號量(Semaphore):用於基於計數器機制,控制併發資源的訪問數量。
- 互鎖變數(Interlocked Variable):用於對變數的併發修改操作控制,可提供一定程度的原子性操作保證。
以上同步機制各有優缺點和適用場景,開發者應根據具體應用場景進行選擇和使用。線上程同步的實現過程中,需要註意競爭條件和死鎖的處理,以確保程式中的線程能協同工作,共用資源能夠正確訪問和修改。線程同步是併發編程中的重要基礎,對於開發高效、穩定的併發應用至關重要。
9.2.1 CreateEvent
CreateEvent 是Windows API
提供的用於創建事件對象的函數之一,該函數用於創建一個事件對象,並返回一個表示該事件對象的句柄。可以通過SetEvent
函數將該事件對象設置為有信號狀態,通過ResetEevent
函數將該事件對象設置為無信號狀態。當使用WaitForSingleObject
或者WaitForMultipleObjects
函數等待事件對象時,會阻塞線程直到事件狀態被置位。對於手動重置事件,需要調用ResetEvent
函數手動將事件狀態置位。
CreateEvent 函數常用於線程同步和進程間通信,在不同線程或者進程之間通知事件狀態的改變。例如,某個線程完成了一項任務,需要通知其它等待該任務完成的線程;或者某個進程需要和另一個進程進行協調,需要通知其它進程某個事件的發生等等。
CreateEvent 函數的函數原型如下:
HANDLE CreateEvent(
LPSECURITY_ATTRIBUTES lpEventAttributes,
BOOL bManualReset,
BOOL bInitialState,
LPCTSTR lpName
);
參數說明:
- lpEventAttributes:指向
SECURITY_ATTRIBUTES
結構體的指針,指定事件對象的安全描述符和訪問許可權。通常設為NULL,表示使用預設值。 - bManualReset:指定事件對象的類型,TRUE表示創建的是手動重置事件,FALSE表示創建的是自動重置事件。
- bInitialState:指定事件對象的初始狀態,TRUE表示將事件對象設為有信號狀態,FALSE表示將事件對象設為無信號狀態。
- lpName:指定事件對象的名稱,可以為NULL。
CreateEvent 是實現線程同步和進程通信的重要手段之一,應用廣泛且易用。在第一章中我們創建的多線程環境可能會出現線程同步的問題,此時使用Event
事件機制即可很好的解決,首先在初始化時通過CreateEvent
將事件設置為False
狀態,進入ThreadFunction
線程時再次通過SetEvent
釋放,以此即可實現線程同步順序執行的目的。
#include <stdio.h>
#include <process.h>
#include <windows.h>
// 全局資源
long g_nNum = 0;
// 子線程個數
const int THREAD_NUM = 10;
CRITICAL_SECTION g_csThreadCode;
HANDLE g_hThreadEvent;
unsigned int __stdcall ThreadFunction(void *ptr)
{
int nThreadNum = *(int *)ptr;
// 線程函數中觸發事件
SetEvent(g_hThreadEvent);
// 進入線程鎖
EnterCriticalSection(&g_csThreadCode);
g_nNum++;
printf("線程編號: %d --> 全局資源值: %d --> 子線程ID: %d \n", nThreadNum, g_nNum, GetCurrentThreadId());
// 離開線程鎖
LeaveCriticalSection(&g_csThreadCode);
return 0;
}
int main(int argc,char * argv[])
{
unsigned int ThreadCount = 0;
HANDLE handle[THREAD_NUM];
// 初始化自動將事件設置為False
g_hThreadEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
InitializeCriticalSection(&g_csThreadCode);
for (int each = 0; each < THREAD_NUM; each++)
{
handle[each] = (HANDLE)_beginthreadex(NULL, 0, ThreadFunction, &each, 0, &ThreadCount);
// 等待線程事件被觸發
WaitForSingleObject(g_hThreadEvent, INFINITE);
}
WaitForMultipleObjects(THREAD_NUM, handle, TRUE, INFINITE);
// 銷毀事件
CloseHandle(g_hThreadEvent);
DeleteCriticalSection(&g_csThreadCode);
system("pause");
return 0;
}
當然了事件對象同樣可以實現更為複雜的同步機制,在如下我們在創建對象時,可以設置non-signaled
狀態運行的auto-reset
模式,當我們設置好我們需要的參數時,可以直接使用SetEvent(hEvent)
設置事件狀態,則會自動執行線程函數。
要創建一個manual-reset
模式並且初始狀態為not-signaled
的事件對象,需要按照以下步驟:
首先定義一個SECURITY_ATTRIBUTES
結構體變數,設置其中的參數為NULL
表示使用預設安全描述符,例如。
SECURITY_ATTRIBUTES sa = {0};
sa.nLength = sizeof(sa);
sa.lpSecurityDescriptor = NULL;
sa.bInheritHandle = FALSE;
接著調用CreateEvent
函數創建事件對象,將bManualReset
和bInitialState
參數設置為FALSE,表示創建manual-reset
模式的事件對象並初始狀態為not-signaled
。例如:
HANDLE hEvent = CreateEvent(
&sa, // 安全屬性
TRUE, // Manual-reset模式
FALSE, // Not-signaled 初始狀態
NULL // 事件對象名稱
);
這樣,我們就創建了一個名為hEvent
的manual-reset
模式的事件對象,初始狀態為not-signaled
。可以通過SetEvent
函數將事件對象設置為signaled
狀態,通過ResetEvent
函數將事件對象設置為non-signaled
狀態,也可以通過WaitForSingleObject
或者WaitForMultipleObjects
函數等待事件對象的狀態變化。
#include <windows.h>
#include <stdio.h>
#include <process.h>
#define STR_LEN 100
// 存儲全局字元串
static char str[STR_LEN];
// 設置事件句柄
static HANDLE hEvent;
// 統計字元串中是否存在A
unsigned WINAPI NumberOfA(void *arg)
{
int cnt = 0;
// 等待線程對象事件
WaitForSingleObject(hEvent, INFINITE);
for (int i = 0; str[i] != 0; i++)
{
if (str[i] == 'A')
cnt++;
}
printf("Num of A: %d \n", cnt);
return 0;
}
// 統計字元串總長度
unsigned WINAPI NumberOfOthers(void *arg)
{
int cnt = 0;
// 等待線程對象事件
WaitForSingleObject(hEvent, INFINITE);
for (int i = 0; str[i] != 0; i++)
{
if (str[i] != 'A')
cnt++;
}
printf("Num of others: %d \n", cnt - 1);
return 0;
}
int main(int argc, char *argv[])
{
HANDLE hThread1, hThread2;
// 以non-signaled創建manual-reset模式的事件對象
// 該對象創建後不會被立即執行,只有我們設置狀態為Signaled時才會繼續
hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
hThread1 = (HANDLE)_beginthreadex(NULL, 0, NumberOfA, NULL, 0, NULL);
hThread2 = (HANDLE)_beginthreadex(NULL, 0, NumberOfOthers, NULL, 0, NULL);
fputs("Input string: ", stdout);
fgets(str, STR_LEN, stdin);
// 字元串讀入完畢後,將事件句柄改為signaled狀態
SetEvent(hEvent);
WaitForSingleObject(hThread1, INFINITE);
WaitForSingleObject(hThread2, INFINITE);
// non-signaled 如果不更改,對象繼續停留在signaled
ResetEvent(hEvent);
CloseHandle(hEvent);
system("pause");
return 0;
}
9.2.2 CreateSemaphore
CreateSemaphore 是Windows API
提供的用於創建信號量的函數之一,用於控制多個線程之間對共用資源的訪問數量。該函數常用於創建一個計數信號量對象,並返回一個表示該信號量對象的句柄。可以通過ReleaseSemaphore
函數將該信號量對象的計數加1,通過WaitForSingleObject
或者WaitForMultipleObjects
函數等待信號量對象的計數變成正數以後再將其減1,以實現對共用資源訪問數量的控制。
CreateSemaphore 函數常用於實現生產者消費者模型、線程池、任務隊列等併發編程場景,用於限制訪問共用資源的線程數量。信號量機制更多時候被用於限制資源的數量而不是限制線程的數量,但也可以用來實現一些線程同步場景。
該函數的函數原型如下:
HANDLE CreateSemaphore(
LPSECURITY_ATTRIBUTES lpSemaphoreAttributes,
LONG lInitialCount,
LONG lMaximumCount,
LPCTSTR lpName
);
參數說明:
- lpSemaphoreAttributes:指向
SECURITY_ATTRIBUTES
結構體的指針,指定信號量對象的安全描述符和訪問許可權。通常設為NULL,表示使用預設值。 - lInitialCount:指定信號量對象的初始計數,表示可以同時訪問共用資源的線程數量。
- lMaximumCount:指定信號量對象的最大計數,表示信號量對象的計數上限。
- lpName:指定信號量對象的名稱,可以為NULL。
總的來說,CreateSemaphore 是實現線程同步和進程通信,控制對共用資源的訪問數量的重要手段之一,如下一段演示代碼片段則通過此方法解決了線程通過問題,首先調用CreateSemaphore
初始化時將信號量設置一個最大值,每次進入線程函數內部時,則ReleaseSemaphore
信號自動加1,如果大於指定的數值則WaitForSingleObject
等待釋放信號.
#include <stdio.h>
#include <process.h>
#include <windows.h>
// 全局資源
long g_nNum = 0;
// 子線程個數
const int THREAD_NUM = 10;
CRITICAL_SECTION g_csThreadCode;
HANDLE g_hThreadParameter;
unsigned int __stdcall ThreadFunction(void *ptr)
{
int nThreadNum = *(int *)ptr;
// 信號量++
ReleaseSemaphore(g_hThreadParameter, 1, NULL);
// 進入線程鎖
EnterCriticalSection(&g_csThreadCode);
g_nNum++;
printf("線程編號: %d --> 全局資源值: %d --> 子線程ID: %d \n", nThreadNum, g_nNum, GetCurrentThreadId());
// 離開線程鎖
LeaveCriticalSection(&g_csThreadCode);
return 0;
}
int main(int argc,char * argv[])
{
unsigned int ThreadCount = 0;
HANDLE handle[THREAD_NUM];
// 初始化信號量當前0個資源,最大允許1個同時訪問
g_hThreadParameter = CreateSemaphore(NULL, 0, 1, NULL);
InitializeCriticalSection(&g_csThreadCode);
for (int each = 0; each < THREAD_NUM; each++)
{
handle[each] = (HANDLE)_beginthreadex(NULL, 0, ThreadFunction, &each, 0, &ThreadCount);
// 等待信號量>0
WaitForSingleObject(g_hThreadParameter, INFINITE);
}
// 關閉信號
CloseHandle(g_hThreadParameter);
// 等待所有進程結束
WaitForMultipleObjects(THREAD_NUM, handle, TRUE, INFINITE);
DeleteCriticalSection(&g_csThreadCode);
system("pause");
return 0;
}
如下所示代碼片段,是一個應用了兩個線程的案例,初始化信號為0,利用信號量值為0時進入non-signaled
狀態,大於0時進入signaled
狀態的特性即可實現線程同步。
執行WaitForSingleObject(semTwo, INFINITE);
會讓線程函數進入類似掛起的狀態,當接到ReleaseSemaphore(semOne, 1, NULL);
才會恢復執行。
#include <windows.h>
#include <stdio.h>
static HANDLE semOne,semTwo;
static int num;
// 線程函數A用於接收參書
DWORD WINAPI ReadNumber(LPVOID lpParamter)
{
int i;
for (i = 0; i < 5; i++)
{
fputs("Input Number: ", stdout);
// 臨界區的開始 signaled狀態
WaitForSingleObject(semTwo, INFINITE);
scanf("%d", &num);
// 臨界區的結束 non-signaled狀態
ReleaseSemaphore(semOne, 1, NULL);
}
return 0;
}
// 線程函數B: 用戶接受參數後完成計算
DWORD WINAPI Check(LPVOID lpParamter)
{
int sum = 0, i;
for (i = 0; i < 5; i++)
{
// 臨界區的開始 non-signaled狀態
WaitForSingleObject(semOne, INFINITE);
sum += num;
// 臨界區的結束 signaled狀態
ReleaseSemaphore(semTwo, 1, NULL);
}
printf("The Number IS: %d \n", sum);
return 0;
}
int main(int argc, char *argv[])
{
HANDLE hThread1, hThread2;
// 創建信號量對象,設置為0進入non-signaled狀態
semOne = CreateSemaphore(NULL, 0, 1, NULL);
// 創建信號量對象,設置為1進入signaled狀態
semTwo = CreateSemaphore(NULL, 1, 1, NULL);
hThread1 = CreateThread(NULL, 0, ReadNumber, NULL, 0, NULL);
hThread2 = CreateThread(NULL, 0, Check, NULL, 0, NULL);
// 關閉臨界區
WaitForSingleObject(hThread1, INFINITE);
WaitForSingleObject(hThread2, INFINITE);
CloseHandle(semOne);
CloseHandle(semTwo);
system("pause");
return 0;
}
9.2.3 CreateMutex
CreateMutex 是Windows API提供的用於創建互斥體對象的函數之一,該函數用於創建一個互斥體對象,並返回一個表示該互斥體對象的句柄。可以通過WaitForSingleObject
或者WaitForMultipleObjects
函數等待互斥體對象,以確保只有一個線程能夠訪問共用資源,其他線程需要等待該線程釋放互斥體對象後才能繼續訪問。當需要釋放互斥體對象時,可以調用ReleaseMutex
函數將其釋放。
CreateMutex 函數常用於對共用資源的訪問控制,避免多個線程同時訪問導致數據不一致的問題。有時候,互斥體也被用於跨進程同步訪問共用資源。
該函數的函數原型如下:
HANDLE CreateMutex(
LPSECURITY_ATTRIBUTES lpMutexAttributes,
BOOL bInitialOwner,
LPCTSTR lpName
);
參數說明:
- lpMutexAttributes:指向
SECURITY_ATTRIBUTES
結構體的指針,指定互斥體對象的安全描述符和訪問許可權。通常設為NULL,表示使用預設值。 - bInitialOwner:指定互斥體的初始狀態,TRUE表示將互斥體設置為有所有權的狀態,FALSE表示將互斥體設置為沒有所有權的狀態。
- lpName:指定互斥體的名稱,可以為NULL。
該函數是實現線程同步和進程通信,控制對共用資源的訪問的重要手段之一,應用廣泛且易用。
如下案例所示,使用互斥鎖可以實現單位時間內,只允許一個線程擁有對共用資源的獨占許可權,從而實現了互不衝突的線程同步。
#include <windows.h>
#include <iostream>
using namespace std;
// 創建互斥鎖
HANDLE hMutex = NULL;
// 線程函數
DWORD WINAPI Func(LPVOID lpParamter)
{
for (int x = 0; x < 10; x++)
{
// 請求獲得一個互斥鎖
WaitForSingleObject(hMutex, INFINITE);
cout << "thread func" << endl;
// 釋放互斥鎖
ReleaseMutex(hMutex);
}
return 0;
}
int main(int argc,char * argv[])
{
HANDLE hThread = CreateThread(NULL, 0, Func, NULL, 0, NULL);
hMutex = CreateMutex(NULL, FALSE, "lyshark");
CloseHandle(hThread);
for (int x = 0; x < 10; x++)
{
// 請求獲得一個互斥鎖
WaitForSingleObject(hMutex, INFINITE);
cout << "main thread" << endl;
// 釋放互斥鎖
ReleaseMutex(hMutex);
}
system("pause");
return 0;
}
當然通過互斥鎖我們也可以實現贊單位時間內同時同步執行兩個線程函數,如下代碼所示;
#include <windows.h>
#include <iostream>
using namespace std;
// 創建互斥鎖
HANDLE hMutex = NULL;
#define NUM_THREAD 50
// 線程函數1
DWORD WINAPI FuncA(LPVOID lpParamter)
{
for (int x = 0; x < 10; x++)
{
// 請求獲得一個互斥鎖
WaitForSingleObject(hMutex, INFINITE);
cout << "this is thread func A" << endl;
// 釋放互斥鎖
ReleaseMutex(hMutex);
}
return 0;
}
// 線程函數2
DWORD WINAPI FuncB(LPVOID lpParamter)
{
for (int x = 0; x < 10; x++)
{
// 請求獲得一個互斥鎖
WaitForSingleObject(hMutex, INFINITE);
cout << "this is thread func B" << endl;
// 釋放互斥鎖
ReleaseMutex(hMutex);
}
return 0;
}
int main(int argc, char * argv[])
{
// 用來存儲線程函數的句柄
HANDLE tHandle[NUM_THREAD];
// 創建互斥量,此時為signaled狀態
hMutex = CreateMutex(NULL, FALSE, "lyshark");
for (int x = 0; x < NUM_THREAD; x++)
{
if (x % 2)
{
tHandle[x] = CreateThread(NULL, 0, FuncA, NULL, 0, NULL);
}
else
{
tHandle[x] = CreateThread(NULL, 0, FuncB, NULL, 0, NULL);
}
}
// 等待所有線程函數執行完畢
WaitForMultipleObjects(NUM_THREAD, tHandle, TRUE, INFINITE);
// 銷毀互斥對象
CloseHandle(hMutex);
system("pause");
return 0;
}
9.2.4 ThreadParameters
線上程環境中,有時候啟動新線程時我們需要對不同的線程傳入不同的參數,通常實現線程傳參的方法有許多,一般可分為使用全局變數,使用結構體,使用類的成員函數等,本節將使用結構體傳參,通過創建一個結構體,將需要傳遞的參數存儲在結構體中,並將結構體的指針傳遞給線程函數。子線程在執行時,可以通過該指針訪問結構體中的參數。
對於簡單的參數傳遞而言,線程函數中定義LPVOID
允許傳遞一個參數,此時我們只需要在函數中接收並強轉(int)(LPVOID)port
即可獲取到一個整數類型的參數,如下是一個簡單的埠掃描軟體代碼片段。
#include <stdio.h>
#include <Windows.h>
// 線程函數接收一個參數
DWORD WINAPI ScanThread(LPVOID port)
{
// 將參數強制轉化為需要的類型
int Port = (int)(LPVOID)port;
printf("[+] 埠: %5d \n", port);
return 1;
}
int main(int argc, char* argv[])
{
HANDLE handle;
for (int port = 0; port < 100; port++)
{
handle = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ScanThread, (LPVOID)port, 0, 0);
}
WaitForSingleObject(handle, INFINITE);
system("pause");
return 0;
}
當然只傳遞一個參數在多數情況下時不夠使用的,而想線上程函數中傳遞多個參數,則需要傳遞一個結構指針,通過線程函數內部強轉為結構類型後,即可實現取值,如下代碼中我們首先定義了一個THREAD_PARAM
結構體,該結構內有兩個成員分別指定掃描主機地址以及埠號,當參數被傳遞到ScanThread
線程函數內部時只需要將指針內的數據拷貝到自身線程函數內,即可正確的引用特定的參數。
#include <stdio.h>
#include <windows.h>
typedef struct _THREAD_PARAM
{
char *HostAddr; // 掃描主機
DWORD dwStartPort; // 埠號
}THREAD_PARAM;
// 這個掃描線程函數
DWORD WINAPI ScanThread(LPVOID lpParam)
{
// 拷貝傳遞來的掃描參數
THREAD_PARAM ScanParam = { 0 };
MoveMemory(&ScanParam, lpParam, sizeof(THREAD_PARAM));
printf("地址: %-16s --> 埠: %-5d \n", ScanParam.HostAddr, ScanParam.dwStartPort);
return 0;
}
int main(int argc, char *argv[])
{
THREAD_PARAM ThreadParam = { 0 };
for (int ip = 0; ip < 100; ip++)
{
char src_addr[50] = "192.168.1.";
char sub_addr[10] = {0};
// int number = atoi(sub_addr);
// 將整數轉為字元串
sprintf(sub_addr, "%d", ip);
strcat(src_addr, sub_addr);
// 將拼接好的字元串放到HostAddr
ThreadParam.HostAddr = src_addr;
for (DWORD port = 1; port < 10; port++)
{
// 指定埠號
ThreadParam.dwStartPort = port;
HANDLE hThread = CreateThread(NULL, 0, ScanThread, (LPVOID)&ThreadParam, 0, NULL);
WaitForSingleObject(hThread, INFINITE);
}
}
system("pause");
return 0;
}
9.2.5 ThreadPool
Windows 線程池是一種非同步執行任務的機制,可以將任務提交到線程池中,由線程池自動分配線程執行任務。線程池可以有效地利用系統資源,提高程式的併發能力和性能。Windows
線程池是Windows
操作系統提供的一種原生的線程池機制,可以使用Windows API
函數進行操作。
CreateThreadpoolWork 是Windows API
提供的用於創建一個工作從池線程中執行的工作對象的函數之一,該函數用於創建一個工作項,並返回一個表示該工作項的指針。可以通過SubmitThreadpoolWork
函數將該工作項提交到線程池中進行執行。當該工作項完成後,線程池還可以使用回調函數清理函數TP_FREE_CLEANUP_GROUP
回收資源。
該函數的函數原型如下:
PTP_WORK CreateThreadpoolWork(
PTP_WORK_CALLBACK pfnwk, // 工作項回調函數指針
PVOID pv, // 回調函數的參數指針
PTP_CALLBACK_ENVIRON pcbe // 回調函數運行環境
);
參數說明:
- pfnwk:指向工作項回調函數的指針,該函數將在工作線程池中執行。例如:
VOID CALLBACK MyWorkCallback(
PTP_CALLBACK_INSTANCE Instance,
PVOID Context, PTP_WORK Work)
{
// 實現工作項的具體操作
}
-
pv:指向回調函數的參數指針,由回調函數進行處理。
-
pcbe:指向
TP_CALLBACK_ENVIRON
結構體的指針,提供了回調函數需要的一些運行環境信息,例如可選的回調函數執行器TP_CALLBACK_INSTANCE
和回調函數完成後的清理函數TP_CLEANUP_GROUP
等。如果為NULL,則使用系統提供的預設值。
CreateThreadpoolWork 函數常用於實現線程池中的任務隊列,可以將一個具體的任務封裝為一個工作項,並提交到線程池中等待執行。該機制可以有效地提高任務的處理速度和效率,減少系統資源開銷。但是,需要註意線程池的資源占用問題,合理調優,避免線程泄漏等問題。
CallbackMayRunLong 是Windows API
提供的調用標記函數之一,該函數用於標記回調函數是否可能耗時較長。如果回調函數不會耗時較長,則無需調用該函數。如果回調函數可能耗時較長,則建議在執行回調函數之前調用該函數對回調函數進行標記,以便線程池進行資源分配和調度等策略。
CallbackMayRunLong函數的函數原型如下:
VOID CallbackMayRunLong(
PTP_CALLBACK_INSTANCE pci
);
參數說明:
- pci:指向
TP_CALLBACK_INSTANCE
結構體的指針,表示回調函數的執行器,用於提供回調函數的運行環境信息。
SubmitThreadpoolWork 是Windows API
提供的將工作項提交到線程池中執行的函數之一,該函數用於將工作項提交到線程池中等待被工作者線程執行。線程池中的工作者線程通過GetQueuedCompletionStatus
函數從工作隊列中獲取工作項並執行。通過SubmitThreadpoolWork
和GetQueuedCompletionStatus
結合使用,可以實現線程池中的任務隊列,提高任務處理效率和系統性能。
SubmitThreadpoolWork 函數的函數原型如下:
VOID SubmitThreadpoolWork(
PTP_WORK pwk
);
參數說明:
- pwk:指向
TP_WORK
結構體的指針,表示要提交到線程池中執行的工作項。
讀者需要註意,SubmitThreadpoolWork 函數提交的是工作項而不是回調函數,回調函數是通過事先創建工作項指定的。在使用SubmitThreadpoolWork
提交工作項時,需要根據具體的業務需求進行合理的設計和實現,避免線程池資源浪費、性能下降、記憶體泄漏等問題。
WaitForThreadpoolWorkCallbacks 是Windows API提供的等待線程池中工作項完成的函數之一,該函數用於等待線程池中提交的所有工作項被處理完畢。需要註意的是,該函數會阻塞當前線程直到所有工作項處理完畢,因此需要謹慎使用,避免阻塞其它線程的執行。
WaitForThreadpoolWorkCallbacks 函數的函數原型如下:
VOID WaitForThreadpoolWorkCallbacks(
PTP_WORK pwk,
BOOL fCancelPendingCallbacks
);
參數說明:
- pwk:指向TP_WORK結構體的指針,表示要等待完成的工作項。
- fCancelPendingCallbacks:用於指定是否取消所有待處理的工作項。如果為TRUE,則取消所有待處理的工作項;如果為FALSE,則等待所有待處理的工作項被處理完畢。
要使用CreateThreadpoolWork()
創建一個線程池很容易實現,讀者只需要指定TaskHandler
線程函數即可,當需要啟動線程池時通過調用SubmitThreadpoolWork
函數提交一組請求即可,如下是一個簡單的線程池創建功能實現。
#include <Windows.h>
#include <iostream>
#include <stdlib.h>
unsigned long g_count = 0;
// 線程執行函數
void NTAPI TaskHandler(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work)
{
if (CallbackMayRunLong(Instance))
{
printf("剩餘資源: %d --> 線程ID: %d \n", InterlockedIncrement(&g_count), GetCurrentThreadId());
}
Sleep(5000);
printf("運行子線程 \n");
for (int x = 0; x < 100; x++)
{
printf("線程ID: %d ---> 子線程: %d \n", GetCurrentThreadId(), x);
}
}
int main(int argc,char *argv)
{
PTP_WORK workItem = CreateThreadpoolWork(TaskHandler, NULL, NULL);
for (int x = 0; x < 100; x++)
{
// 調用SubmitThreadpoolWork向線程池提交一個請求
SubmitThreadpoolWork(workItem);
}
// 等待線程池調用結束
WaitForThreadpoolWorkCallbacks(workItem, false);
CloseThreadpoolWork(workItem);
system("pause");
return 0;
}
線程池函數同樣支持限制線程數,限制線程可以通過調用SetThreadpoolThreadMinimum()
實現,該函數可以在創建線程池後設置線程池的最小線程數。當線程池中的任務隊列中存在待執行的任務,並且當前工作線程的數量小於最小線程數時,線程池將自動創建新的工作線程,以確保待執行任務能夠及時得到處理。
以下是函數的原型定義:
VOID WINAPI SetThreadpoolThreadMinimum(
PTP_POOL ptpp,
DWORD cthrdMic
);
參數說明:
- ptpp:指向線程池對象的指針。
- cthrdMic:線程池中的最小線程數。
線程池也支持分組操作,可通過綁定TP_CALLBACK_ENVIRON
線程池環境變數實現分組,TP_CALLBACK_ENVIRON是Windows
線程池API
的一部分,它是一個環境變數結構體,用於確定要調用的線程池回調函數的環境。
以下是TP_CALLBACK_ENVIRON結構體的定義:
typedef struct _TP_CALLBACK_ENVIRON {
TP_VERSION Version;
PTP_POOL Pool;
PTP_CLEANUP_GROUP CleanupGroup;
PFN_TP_SIMPLE_CALLBACK CleanupGroupCancelCallback;
PVOID RaceDll;
struct _ACTIVATION_CONTEXT *ActivationContext;
PFN_IO_CALLBACK FinalizationCallback;
union {
DWORD Flags;
struct {
DWORD LongFunction : 1;
DWORD Persistent : 1;
DWORD Private : 30;
} DUMMYSTRUCTNAME;
} DUMMYUNIONNAME;
} TP_CALLBACK_ENVIRON, *PTP_CALLBACK_ENVIRON;
主要成員說明:
- Version:回調環境的版本,必須為 TP_VERSION。
- Pool:回調環境所屬的線程池對象。
- CleanupGroup:回調環境所屬的清理組對象,用於控制回調的取消和資源管理。
- CleanupGroupCancelCallback:當清理組取消回調時,所調用的回調函數。
- RaceDll:保留欄位,用於標記已經看過這個環境變數的 DLL。
- ActivationContext:回調環境的激活上下文,用來保證回調中需要的外部資源正確載入。
- FinalizationCallback:當回調函數執行完成後調用的函數。
- Flags:回調環境的標誌,用於設置回調函數的屬性。
使用TP_CALLBACK_ENVIRON
結構體,可以在創建線程池回調函數時,配置回調函數的環境和參數,以控制回調函數的執行方式和行為。
例如,可以使用TP_CALLBACK_ENVIRON
中的CleanupGroup
和CleanupGroupCancelCallback
成員,將回調函數添加到清理組中,併在需要時取消回調。又或者在FinalizationCallback
中執行某些特殊的清理任務,以確保在回調函數執行完畢後釋放資源。
#include <Windows.h>
#include <iostream>
#include <stdlib.h>
// 線程執行函數
void NTAPI TaskHandler(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work)
{
for (int x = 0; x < 100; x++)
{
printf("線程ID: %d ---> 子線程: %d \n", GetCurrentThreadId(), x);
}
}
// 單次線程任務
void NTAPI poolThreadFunc(PTP_CALLBACK_INSTANCE Instance, PVOID Context)
{
printf("執行單次線程任務: %d \n", GetCurrentThreadId());
}
int main(int argc,char *argv)
{
// 創建線程池
PTP_POOL pool = CreateThreadpool(NULL);
// 設置線程池 最小與最大 資源數
SetThreadpoolThreadMinimum(pool, 1);
SetThreadpoolThreadMaximum(pool, 100);
// 初始化線程池環境變數
TP_CALLBACK_ENVIRON cbe;
InitializeThreadpoolEnvironment(&cbe);
// 設置線程池回調的線程池
SetThreadpoolCallbackPool(&cbe, pool);
// 創建清理組
PTP_CLEANUP_GROUP cleanupGroup = CreateThreadpoolCleanupGroup();
// 為線程池設定清理組
SetThreadpoolCallbackCleanupGroup(&cbe, cleanupGroup, NULL);
// 創建線程池
PTP_WORK pwork = CreateThreadpoolWork((PTP_WORK_CALLBACK)TaskHandler, NULL, &cbe);
// 迴圈提交線程工作任務
for (int x = 0; x < 100; x++)
{
SubmitThreadpoolWork(pwork);
}
// 提交單次線程任務
TrySubmitThreadpoolCallback(poolThreadFunc, NULL, &cbe);
TrySubmitThreadpoolCallback(poolThreadFunc, NULL, &cbe);
// 等待線程池結束,關閉線程組
WaitForThreadpoolWorkCallbacks(pwork, false);
CloseThreadpoolWork(pwork);
// 關閉清理組
CloseThreadpoolCleanupGroupMembers(cleanupGroup, false, NULL);
// 銷毀線程池環境變數
DestroyThreadpoolEnvironment(&cbe);
CloseThreadpool(pool);
system("pause");
return 0;
}
當讀者使用線程池時,同樣會遇到線程的同步問題,線程池內的線程函數同樣支持互斥鎖、信號量、內核事件控制、臨界區控制等同步和互斥機制,用於保護共用資源的訪問和修改。
這些同步和互斥機制可以用來解決線程間競爭和數據不一致的問題。例如,線上程池中如果有多個工作線程同時訪問共用資源,就需要使用互斥鎖或臨界區控制來確保每個線程對共用資源的使用不會相互干擾,避免出現數據競爭和不一致的情況。
使用這些同步和互斥機制時,應該根據實際場景進行選擇和設計。例如,互斥鎖適合用於保護少量的共用資源、需要經常訪問和更新的場景,而信號量適合用於控制併發訪問數量、資源池、生產者消費者模式等場景。同時,需要註意遵循線程安全和同步的原則,以避免死鎖、饑餓等問題。
#include <Windows.h>
#include <iostream>
#include <stdlib.h>
unsigned long g_count = 0;
// --------------------------------------------------------------
// 線程池同步-互斥量同步
// --------------------------------------------------------------
void NTAPI TaskHandlerMutex(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work)
{
// 鎖定資源
WaitForSingleObject(*(HANDLE *)Context, INFINITE);
for (int x = 0; x < 100; x++)
{
printf("線程ID: %d ---> 子線程: %d \n", GetCurrentThreadId(), x);
g_count = g_count + 1;
}
// 解鎖資源
ReleaseMutexWhenCallbackReturns(Instance, *(HANDLE*)Context);
}
void TestMutex()
{
// 創建互斥量
HANDLE hMutex = CreateMutex(NULL, FALSE, NULL);
PTP_WORK pool = CreateThreadpoolWork((PTP_WORK_CALLBACK)TaskHandlerMutex, &hMutex, NULL);
for (int i = 0; i < 1000; i++)
{
SubmitThreadpoolWork(pool);
}
WaitForThreadpoolWorkCallbacks(pool, FALSE);
CloseThreadpoolWork(pool);
CloseHandle(hMutex);
printf("相加後 ---> %d \n", g_count);
}
// --------------------------------------------------------------
// 線程池同步-事件內核對象
// --------------------------------------------------------------
void NTAPI TaskHandlerKern(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work)
{
// 鎖定資源
WaitForSingleObject(*(HANDLE *)Context, INFINITE);
for (int x = 0; x < 100; x++)
{
printf("線程ID: %d ---> 子線程: %d \n", GetCurrentThreadId(), x);
g_count = g_count + 1;
}
// 解鎖資源
SetEventWhenCallbackReturns(Instance, *(HANDLE*)Context);
}
void TestKern()
{
HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
SetEvent(hEvent);
PTP_WORK pwk = CreateThreadpoolWork((PTP_WORK_CALLBACK)TaskHandlerKern, &hEvent, NULL);
for (int i = 0; i < 1000; i++)
{
SubmitThreadpoolWork(pwk);
}
WaitForThreadpoolWorkCallbacks(pwk, FALSE);
CloseThreadpoolWork(pwk);
printf("相加後 ---> %d \n", g_count);
}
// --------------------------------------------------------------
// 線程池同步-信號量同步
// --------------------------------------------------------------
void NTAPI TaskHandlerSemaphore(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work)
{
// 鎖定資源
WaitForSingleObject(*(HANDLE *)Context, INFINITE);
for (int x = 0; x < 100; x++)
{
printf("線程ID: %d ---> 子線程: %d \n", GetCurrentThreadId(), x);
g_count = g_count + 1;
}
// 解鎖資源
ReleaseSemaphoreWhenCallbackReturns(Instance, *(HANDLE*)Context, 1);
}
void TestSemaphore()
{
// 創建信號量為100
HANDLE hSemaphore = CreateSemaphore(NULL, 0, 100, NULL);
ReleaseSemaphore(hSemaphore, 10, NULL);
PTP_WORK pwk = CreateThreadpoolWork((PTP_WORK_CALLBACK)TaskHandlerSemaphore, &hSemaphore, NULL);
for (int i = 0; i < 1000; i++)
{
SubmitThreadpoolWork(pwk);
}
WaitForThreadpoolWorkCallbacks(pwk, FALSE);
CloseThreadpoolWork(pwk);
CloseHandle(hSemaphore);
printf("相加後 ---> %d \n", g_count);
}
// --------------------------------------------------------------
// 線程池同步-臨界區
// --------------------------------------------------------------
void NTAPI TaskHandlerLeave(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work)
{
// 鎖定資源
EnterCriticalSection((CRITICAL_SECTION*)Context);
for (int x = 0; x < 100; x++)
{
printf("線程ID: %d ---> 子線程: %d \n", GetCurrentThreadId(), x);
g_count = g_count + 1;
}
// 解鎖資源
LeaveCriticalSectionWhenCallbackReturns(Instance, (CRITICAL_SECTION*)Context);
}
void TestLeave()
{
CRITICAL_SECTION cs;
InitializeCriticalSection(&cs);
PTP_WORK pwk = CreateThreadpoolWork((PTP_WORK_CALLBACK)TaskHandlerLeave, &cs, NULL);
for (int i = 0; i < 1000; i++)
{
SubmitThreadpoolWork(pwk);
}
WaitForThreadpoolWorkCallbacks(pwk, FALSE);
DeleteCriticalSection(&cs);
CloseThreadpoolWork(pwk);
printf("相加後 ---> %d \n", g_count);
}
int main(int argc,char *argv)
{
// TestMutex();
// TestKern();
// TestSemaphore();
TestLeave();
system("pause");
return 0;
}
本文作者: 王瑞
本文鏈接: https://www.lyshark.com/post/505839cb.html
版權聲明: 本博客所有文章除特別聲明外,均採用 BY-NC-SA 許可協議。轉載請註明出處!
文章出處:https://www.cnblogs.com/LyShark/p/17739720.html
本博客所有文章除特別聲明外,均採用 BY-NC-SA 許可協議。轉載請註明出處!