9.2 運用API實現線程同步

来源:https://www.cnblogs.com/LyShark/archive/2023/10/02/17739720.html
-Advertisement-
Play Games

Windows 線程同步是指多個線程一同訪問共用資源時,為了避免資源的併發訪問導致數據的不一致或程式崩潰等問題,需要對線程的訪問進行協同和控制,以保證程式的正確性和穩定性。Windows提供了多種線程同步機制,以適應不同的併發編程場景。以上同步機制各有優缺點和適用場景,開發者應根據具體應用場景進行選... ...


Windows 線程同步是指多個線程一同訪問共用資源時,為了避免資源的併發訪問導致數據的不一致或程式崩潰等問題,需要對線程的訪問進行協同和控制,以保證程式的正確性和穩定性。Windows提供了多種線程同步機制,以適應不同的併發編程場景。主要包括以下幾種:

  • 事件(Event):用於不同線程間的信號通知。包括單次通知事件和重覆通知事件兩種類型。
  • 互斥量(Mutex):用於控制對共用資源的訪問,具有獨占性,可避免線程之間對共用資源的非法訪問。
  • 臨界區(CriticalSection):和互斥量類似,也用於控制對共用資源的訪問,但是是進程內部的,因此比較適用於同一進程中的線程同步控制。
  • 信號量(Semaphore):用於基於計數器機制,控制併發資源的訪問數量。
  • 互鎖變數(Interlocked Variable):用於對變數的併發修改操作控制,可提供一定程度的原子性操作保證。

以上同步機制各有優缺點和適用場景,開發者應根據具體應用場景進行選擇和使用。線上程同步的實現過程中,需要註意競爭條件和死鎖的處理,以確保程式中的線程能協同工作,共用資源能夠正確訪問和修改。線程同步是併發編程中的重要基礎,對於開發高效、穩定的併發應用至關重要。

9.2.1 CreateEvent

CreateEvent 是Windows API提供的用於創建事件對象的函數之一,該函數用於創建一個事件對象,並返回一個表示該事件對象的句柄。可以通過SetEvent函數將該事件對象設置為有信號狀態,通過ResetEevent函數將該事件對象設置為無信號狀態。當使用WaitForSingleObject或者WaitForMultipleObjects函數等待事件對象時,會阻塞線程直到事件狀態被置位。對於手動重置事件,需要調用ResetEvent函數手動將事件狀態置位。

CreateEvent 函數常用於線程同步和進程間通信,在不同線程或者進程之間通知事件狀態的改變。例如,某個線程完成了一項任務,需要通知其它等待該任務完成的線程;或者某個進程需要和另一個進程進行協調,需要通知其它進程某個事件的發生等等。

CreateEvent 函數的函數原型如下:

HANDLE CreateEvent(
  LPSECURITY_ATTRIBUTES lpEventAttributes,
  BOOL                  bManualReset,
  BOOL                  bInitialState,
  LPCTSTR               lpName
);

參數說明:

  • lpEventAttributes:指向SECURITY_ATTRIBUTES結構體的指針,指定事件對象的安全描述符和訪問許可權。通常設為NULL,表示使用預設值。
  • bManualReset:指定事件對象的類型,TRUE表示創建的是手動重置事件,FALSE表示創建的是自動重置事件。
  • bInitialState:指定事件對象的初始狀態,TRUE表示將事件對象設為有信號狀態,FALSE表示將事件對象設為無信號狀態。
  • lpName:指定事件對象的名稱,可以為NULL。

CreateEvent 是實現線程同步和進程通信的重要手段之一,應用廣泛且易用。在第一章中我們創建的多線程環境可能會出現線程同步的問題,此時使用Event事件機制即可很好的解決,首先在初始化時通過CreateEvent將事件設置為False狀態,進入ThreadFunction線程時再次通過SetEvent釋放,以此即可實現線程同步順序執行的目的。

#include <stdio.h>
#include <process.h>
#include <windows.h>

// 全局資源
long g_nNum = 0;

// 子線程個數
const int THREAD_NUM = 10;

CRITICAL_SECTION  g_csThreadCode;
HANDLE g_hThreadEvent;

unsigned int __stdcall ThreadFunction(void *ptr)
{
  int nThreadNum = *(int *)ptr;

  // 線程函數中觸發事件
  SetEvent(g_hThreadEvent);

  // 進入線程鎖
  EnterCriticalSection(&g_csThreadCode);
  g_nNum++;
  printf("線程編號: %d --> 全局資源值: %d --> 子線程ID: %d \n", nThreadNum, g_nNum, GetCurrentThreadId());

  // 離開線程鎖
  LeaveCriticalSection(&g_csThreadCode);
  return 0;
}

int main(int argc,char * argv[])
{
  unsigned int ThreadCount = 0;
  HANDLE  handle[THREAD_NUM];

  // 初始化自動將事件設置為False
  g_hThreadEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
  InitializeCriticalSection(&g_csThreadCode);

  for (int each = 0; each < THREAD_NUM; each++)
  {
    handle[each] = (HANDLE)_beginthreadex(NULL, 0, ThreadFunction, &each, 0, &ThreadCount);

    // 等待線程事件被觸發
    WaitForSingleObject(g_hThreadEvent, INFINITE);
  }

  WaitForMultipleObjects(THREAD_NUM, handle, TRUE, INFINITE);

  // 銷毀事件
  CloseHandle(g_hThreadEvent);
  DeleteCriticalSection(&g_csThreadCode);

  system("pause");
  return 0;
}

當然了事件對象同樣可以實現更為複雜的同步機制,在如下我們在創建對象時,可以設置non-signaled狀態運行的auto-reset模式,當我們設置好我們需要的參數時,可以直接使用SetEvent(hEvent)設置事件狀態,則會自動執行線程函數。

要創建一個manual-reset模式並且初始狀態為not-signaled的事件對象,需要按照以下步驟:

首先定義一個SECURITY_ATTRIBUTES結構體變數,設置其中的參數為NULL表示使用預設安全描述符,例如。

SECURITY_ATTRIBUTES sa = {0};
sa.nLength = sizeof(sa);
sa.lpSecurityDescriptor = NULL;
sa.bInheritHandle = FALSE;

接著調用CreateEvent函數創建事件對象,將bManualResetbInitialState參數設置為FALSE,表示創建manual-reset模式的事件對象並初始狀態為not-signaled。例如:

HANDLE hEvent = CreateEvent(
                      &sa,           // 安全屬性
                      TRUE,          // Manual-reset模式
                      FALSE,         // Not-signaled 初始狀態
                      NULL           // 事件對象名稱
                      );

這樣,我們就創建了一個名為hEventmanual-reset模式的事件對象,初始狀態為not-signaled。可以通過SetEvent函數將事件對象設置為signaled狀態,通過ResetEvent函數將事件對象設置為non-signaled狀態,也可以通過WaitForSingleObject或者WaitForMultipleObjects函數等待事件對象的狀態變化。

#include <windows.h>  
#include <stdio.h>  
#include <process.h>  
#define STR_LEN 100  

// 存儲全局字元串
static char str[STR_LEN];

// 設置事件句柄
static HANDLE hEvent;

// 統計字元串中是否存在A
unsigned WINAPI NumberOfA(void *arg)
{
  int cnt = 0;
  // 等待線程對象事件
  WaitForSingleObject(hEvent, INFINITE);
  for (int i = 0; str[i] != 0; i++)
  {
    if (str[i] == 'A')
      cnt++;
  }
  printf("Num of A: %d \n", cnt);
  return 0;
}

// 統計字元串總長度
unsigned WINAPI NumberOfOthers(void *arg)
{
  int cnt = 0;
  // 等待線程對象事件
  WaitForSingleObject(hEvent, INFINITE);
  for (int i = 0; str[i] != 0; i++)
  {
    if (str[i] != 'A')
      cnt++;
  }
  printf("Num of others: %d \n", cnt - 1);
  return 0;
}

int main(int argc, char *argv[])
{
  HANDLE hThread1, hThread2;

  // 以non-signaled創建manual-reset模式的事件對象
  // 該對象創建後不會被立即執行,只有我們設置狀態為Signaled時才會繼續
  hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);

  hThread1 = (HANDLE)_beginthreadex(NULL, 0, NumberOfA, NULL, 0, NULL);
  hThread2 = (HANDLE)_beginthreadex(NULL, 0, NumberOfOthers, NULL, 0, NULL);

  fputs("Input string: ", stdout);
  fgets(str, STR_LEN, stdin);

  // 字元串讀入完畢後,將事件句柄改為signaled狀態  
  SetEvent(hEvent);

  WaitForSingleObject(hThread1, INFINITE);
  WaitForSingleObject(hThread2, INFINITE);

  // non-signaled 如果不更改,對象繼續停留在signaled
  ResetEvent(hEvent);

  CloseHandle(hEvent);

  system("pause");
  return 0;
}

9.2.2 CreateSemaphore

CreateSemaphore 是Windows API提供的用於創建信號量的函數之一,用於控制多個線程之間對共用資源的訪問數量。該函數常用於創建一個計數信號量對象,並返回一個表示該信號量對象的句柄。可以通過ReleaseSemaphore函數將該信號量對象的計數加1,通過WaitForSingleObject或者WaitForMultipleObjects函數等待信號量對象的計數變成正數以後再將其減1,以實現對共用資源訪問數量的控制。

CreateSemaphore 函數常用於實現生產者消費者模型、線程池、任務隊列等併發編程場景,用於限制訪問共用資源的線程數量。信號量機制更多時候被用於限制資源的數量而不是限制線程的數量,但也可以用來實現一些線程同步場景。

該函數的函數原型如下:

HANDLE CreateSemaphore(
  LPSECURITY_ATTRIBUTES lpSemaphoreAttributes,
  LONG                  lInitialCount,
  LONG                  lMaximumCount,
  LPCTSTR               lpName
);

參數說明:

  • lpSemaphoreAttributes:指向SECURITY_ATTRIBUTES結構體的指針,指定信號量對象的安全描述符和訪問許可權。通常設為NULL,表示使用預設值。
  • lInitialCount:指定信號量對象的初始計數,表示可以同時訪問共用資源的線程數量。
  • lMaximumCount:指定信號量對象的最大計數,表示信號量對象的計數上限。
  • lpName:指定信號量對象的名稱,可以為NULL。

總的來說,CreateSemaphore 是實現線程同步和進程通信,控制對共用資源的訪問數量的重要手段之一,如下一段演示代碼片段則通過此方法解決了線程通過問題,首先調用CreateSemaphore初始化時將信號量設置一個最大值,每次進入線程函數內部時,則ReleaseSemaphore信號自動加1,如果大於指定的數值則WaitForSingleObject等待釋放信號.

#include <stdio.h>
#include <process.h>
#include <windows.h>

// 全局資源
long g_nNum = 0;

// 子線程個數
const int THREAD_NUM = 10;

CRITICAL_SECTION  g_csThreadCode;
HANDLE g_hThreadParameter;

unsigned int __stdcall ThreadFunction(void *ptr)
{
  int nThreadNum = *(int *)ptr;

  // 信號量++
  ReleaseSemaphore(g_hThreadParameter, 1, NULL);

  // 進入線程鎖
  EnterCriticalSection(&g_csThreadCode);
  g_nNum++;
  printf("線程編號: %d --> 全局資源值: %d --> 子線程ID: %d \n", nThreadNum, g_nNum, GetCurrentThreadId());

  // 離開線程鎖
  LeaveCriticalSection(&g_csThreadCode);
  return 0;
}

int main(int argc,char * argv[])
{
  unsigned int ThreadCount = 0;
  HANDLE  handle[THREAD_NUM];

  // 初始化信號量當前0個資源,最大允許1個同時訪問
  g_hThreadParameter = CreateSemaphore(NULL, 0, 1, NULL);
  InitializeCriticalSection(&g_csThreadCode);

  for (int each = 0; each < THREAD_NUM; each++)
  {
    handle[each] = (HANDLE)_beginthreadex(NULL, 0, ThreadFunction, &each, 0, &ThreadCount);

    // 等待信號量>0
    WaitForSingleObject(g_hThreadParameter, INFINITE);
  }

  // 關閉信號
  CloseHandle(g_hThreadParameter);

  // 等待所有進程結束
  WaitForMultipleObjects(THREAD_NUM, handle, TRUE, INFINITE);
  DeleteCriticalSection(&g_csThreadCode);

  system("pause");
  return 0;
}

如下所示代碼片段,是一個應用了兩個線程的案例,初始化信號為0,利用信號量值為0時進入non-signaled狀態,大於0時進入signaled狀態的特性即可實現線程同步。

執行WaitForSingleObject(semTwo, INFINITE);會讓線程函數進入類似掛起的狀態,當接到ReleaseSemaphore(semOne, 1, NULL);才會恢復執行。

#include <windows.h>  
#include <stdio.h>  

static HANDLE semOne,semTwo;
static int num;

// 線程函數A用於接收參書
DWORD WINAPI ReadNumber(LPVOID lpParamter)
{
  int i;
  for (i = 0; i < 5; i++)
  {
    fputs("Input Number: ", stdout);

    // 臨界區的開始 signaled狀態  
    WaitForSingleObject(semTwo, INFINITE);
    
    scanf("%d", &num);

    // 臨界區的結束 non-signaled狀態  
    ReleaseSemaphore(semOne, 1, NULL);
  }
  return 0;
}

// 線程函數B: 用戶接受參數後完成計算
DWORD WINAPI Check(LPVOID lpParamter)
{
  int sum = 0, i;
  for (i = 0; i < 5; i++)
  {
    // 臨界區的開始 non-signaled狀態  
    WaitForSingleObject(semOne, INFINITE);
    sum += num;

    // 臨界區的結束 signaled狀態  
    ReleaseSemaphore(semTwo, 1, NULL);
  }
  printf("The Number IS: %d \n", sum);
  return 0;
}

int main(int argc, char *argv[])
{
  HANDLE hThread1, hThread2;

  // 創建信號量對象,設置為0進入non-signaled狀態  
  semOne = CreateSemaphore(NULL, 0, 1, NULL);

  // 創建信號量對象,設置為1進入signaled狀態  
  semTwo = CreateSemaphore(NULL, 1, 1, NULL);

  hThread1 = CreateThread(NULL, 0, ReadNumber, NULL, 0, NULL);
  hThread2 = CreateThread(NULL, 0, Check, NULL, 0, NULL);

  // 關閉臨界區
  WaitForSingleObject(hThread1, INFINITE);
  WaitForSingleObject(hThread2, INFINITE);

  CloseHandle(semOne);
  CloseHandle(semTwo);

  system("pause");
  return 0;
}

9.2.3 CreateMutex

CreateMutex 是Windows API提供的用於創建互斥體對象的函數之一,該函數用於創建一個互斥體對象,並返回一個表示該互斥體對象的句柄。可以通過WaitForSingleObject或者WaitForMultipleObjects函數等待互斥體對象,以確保只有一個線程能夠訪問共用資源,其他線程需要等待該線程釋放互斥體對象後才能繼續訪問。當需要釋放互斥體對象時,可以調用ReleaseMutex函數將其釋放。

CreateMutex 函數常用於對共用資源的訪問控制,避免多個線程同時訪問導致數據不一致的問題。有時候,互斥體也被用於跨進程同步訪問共用資源。

該函數的函數原型如下:

HANDLE CreateMutex(
  LPSECURITY_ATTRIBUTES lpMutexAttributes,
  BOOL                  bInitialOwner,
  LPCTSTR               lpName
);

參數說明:

  • lpMutexAttributes:指向SECURITY_ATTRIBUTES結構體的指針,指定互斥體對象的安全描述符和訪問許可權。通常設為NULL,表示使用預設值。
  • bInitialOwner:指定互斥體的初始狀態,TRUE表示將互斥體設置為有所有權的狀態,FALSE表示將互斥體設置為沒有所有權的狀態。
  • lpName:指定互斥體的名稱,可以為NULL。

該函數是實現線程同步和進程通信,控制對共用資源的訪問的重要手段之一,應用廣泛且易用。

如下案例所示,使用互斥鎖可以實現單位時間內,只允許一個線程擁有對共用資源的獨占許可權,從而實現了互不衝突的線程同步。

#include <windows.h>
#include <iostream>

using namespace std;

// 創建互斥鎖
HANDLE hMutex = NULL;

// 線程函數
DWORD WINAPI Func(LPVOID lpParamter)
{
  for (int x = 0; x < 10; x++)
  {
    // 請求獲得一個互斥鎖
    WaitForSingleObject(hMutex, INFINITE);

    cout << "thread func" << endl;

    // 釋放互斥鎖
    ReleaseMutex(hMutex);
  }
  return 0;
}

int main(int argc,char * argv[])
{
  HANDLE hThread = CreateThread(NULL, 0, Func, NULL, 0, NULL);

  hMutex = CreateMutex(NULL, FALSE, "lyshark");
  CloseHandle(hThread);

  for (int x = 0; x < 10; x++)
  {
    // 請求獲得一個互斥鎖
    WaitForSingleObject(hMutex, INFINITE);
    cout << "main thread" << endl;
    
    // 釋放互斥鎖
    ReleaseMutex(hMutex);
  }
  system("pause");
  return 0;
}

當然通過互斥鎖我們也可以實現贊單位時間內同時同步執行兩個線程函數,如下代碼所示;

#include <windows.h>
#include <iostream>

using namespace std;

// 創建互斥鎖
HANDLE hMutex = NULL;
#define NUM_THREAD 50

// 線程函數1
DWORD WINAPI FuncA(LPVOID lpParamter)
{
  for (int x = 0; x < 10; x++)
  {
    // 請求獲得一個互斥鎖
    WaitForSingleObject(hMutex, INFINITE);

    cout << "this is thread func A" << endl;

    // 釋放互斥鎖
    ReleaseMutex(hMutex);
  }
  return 0;
}

// 線程函數2
DWORD WINAPI FuncB(LPVOID lpParamter)
{
  for (int x = 0; x < 10; x++)
  {
    // 請求獲得一個互斥鎖
    WaitForSingleObject(hMutex, INFINITE);

    cout << "this is thread func B" << endl;

    // 釋放互斥鎖
    ReleaseMutex(hMutex);
  }
  return 0;
}

int main(int argc, char * argv[])
{

  // 用來存儲線程函數的句柄
  HANDLE tHandle[NUM_THREAD];

  // 創建互斥量,此時為signaled狀態
  hMutex = CreateMutex(NULL, FALSE, "lyshark");

  for (int x = 0; x < NUM_THREAD; x++)
  {
    if (x % 2)
    {
      tHandle[x] = CreateThread(NULL, 0, FuncA, NULL, 0, NULL);
    }
    else
    {
      tHandle[x] = CreateThread(NULL, 0, FuncB, NULL, 0, NULL);
    }
  }

  // 等待所有線程函數執行完畢
  WaitForMultipleObjects(NUM_THREAD, tHandle, TRUE, INFINITE);
  
  // 銷毀互斥對象
  CloseHandle(hMutex);

  system("pause");
  return 0;
}

9.2.4 ThreadParameters

線上程環境中,有時候啟動新線程時我們需要對不同的線程傳入不同的參數,通常實現線程傳參的方法有許多,一般可分為使用全局變數,使用結構體,使用類的成員函數等,本節將使用結構體傳參,通過創建一個結構體,將需要傳遞的參數存儲在結構體中,並將結構體的指針傳遞給線程函數。子線程在執行時,可以通過該指針訪問結構體中的參數。

對於簡單的參數傳遞而言,線程函數中定義LPVOID允許傳遞一個參數,此時我們只需要在函數中接收並強轉(int)(LPVOID)port即可獲取到一個整數類型的參數,如下是一個簡單的埠掃描軟體代碼片段。

#include <stdio.h>
#include <Windows.h>

// 線程函數接收一個參數
DWORD WINAPI ScanThread(LPVOID port)
{
  // 將參數強制轉化為需要的類型
  int Port = (int)(LPVOID)port;
  printf("[+] 埠: %5d \n", port);
  return 1;
}

int main(int argc, char* argv[])
{
  HANDLE handle;

  for (int port = 0; port < 100; port++)
  {
    handle = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ScanThread, (LPVOID)port, 0, 0);
  }
  WaitForSingleObject(handle, INFINITE);

  system("pause");
  return 0;
}

當然只傳遞一個參數在多數情況下時不夠使用的,而想線上程函數中傳遞多個參數,則需要傳遞一個結構指針,通過線程函數內部強轉為結構類型後,即可實現取值,如下代碼中我們首先定義了一個THREAD_PARAM結構體,該結構內有兩個成員分別指定掃描主機地址以及埠號,當參數被傳遞到ScanThread線程函數內部時只需要將指針內的數據拷貝到自身線程函數內,即可正確的引用特定的參數。

#include <stdio.h>
#include <windows.h>

typedef struct _THREAD_PARAM
{
  char *HostAddr;             // 掃描主機
  DWORD dwStartPort;          // 埠號
}THREAD_PARAM;

// 這個掃描線程函數
DWORD WINAPI ScanThread(LPVOID lpParam)
{
  // 拷貝傳遞來的掃描參數
  THREAD_PARAM ScanParam = { 0 };
  MoveMemory(&ScanParam, lpParam, sizeof(THREAD_PARAM));
  printf("地址: %-16s --> 埠: %-5d \n", ScanParam.HostAddr, ScanParam.dwStartPort);
  return 0;
}

int main(int argc, char *argv[])
{
  THREAD_PARAM ThreadParam = { 0 };

  for (int ip = 0; ip < 100; ip++)
  {
    char src_addr[50] = "192.168.1.";
    char sub_addr[10] = {0};
    // int number = atoi(sub_addr);
    
    // 將整數轉為字元串
    sprintf(sub_addr, "%d", ip);
    strcat(src_addr, sub_addr);

    // 將拼接好的字元串放到HostAddr
    ThreadParam.HostAddr = src_addr;

    for (DWORD port = 1; port < 10; port++)
    {
      // 指定埠號
      ThreadParam.dwStartPort = port;
      HANDLE hThread = CreateThread(NULL, 0, ScanThread, (LPVOID)&ThreadParam, 0, NULL);
      WaitForSingleObject(hThread, INFINITE);
    }
  }

  system("pause");
  return 0;
}

9.2.5 ThreadPool

Windows 線程池是一種非同步執行任務的機制,可以將任務提交到線程池中,由線程池自動分配線程執行任務。線程池可以有效地利用系統資源,提高程式的併發能力和性能。Windows 線程池是Windows操作系統提供的一種原生的線程池機制,可以使用Windows API函數進行操作。

CreateThreadpoolWork 是Windows API提供的用於創建一個工作從池線程中執行的工作對象的函數之一,該函數用於創建一個工作項,並返回一個表示該工作項的指針。可以通過SubmitThreadpoolWork函數將該工作項提交到線程池中進行執行。當該工作項完成後,線程池還可以使用回調函數清理函數TP_FREE_CLEANUP_GROUP回收資源。

該函數的函數原型如下:

PTP_WORK CreateThreadpoolWork(
  PTP_WORK_CALLBACK  pfnwk,     // 工作項回調函數指針
  PVOID              pv,        // 回調函數的參數指針
  PTP_CALLBACK_ENVIRON pcbe      // 回調函數運行環境
);

參數說明:

  • pfnwk:指向工作項回調函數的指針,該函數將在工作線程池中執行。例如:
VOID CALLBACK MyWorkCallback(
    PTP_CALLBACK_INSTANCE Instance,
    PVOID Context, PTP_WORK Work)
{
    // 實現工作項的具體操作
}
  • pv:指向回調函數的參數指針,由回調函數進行處理。

  • pcbe:指向TP_CALLBACK_ENVIRON結構體的指針,提供了回調函數需要的一些運行環境信息,例如可選的回調函數執行器TP_CALLBACK_INSTANCE和回調函數完成後的清理函數TP_CLEANUP_GROUP等。如果為NULL,則使用系統提供的預設值。

CreateThreadpoolWork 函數常用於實現線程池中的任務隊列,可以將一個具體的任務封裝為一個工作項,並提交到線程池中等待執行。該機制可以有效地提高任務的處理速度和效率,減少系統資源開銷。但是,需要註意線程池的資源占用問題,合理調優,避免線程泄漏等問題。

CallbackMayRunLong 是Windows API提供的調用標記函數之一,該函數用於標記回調函數是否可能耗時較長。如果回調函數不會耗時較長,則無需調用該函數。如果回調函數可能耗時較長,則建議在執行回調函數之前調用該函數對回調函數進行標記,以便線程池進行資源分配和調度等策略。

CallbackMayRunLong函數的函數原型如下:

VOID CallbackMayRunLong(
  PTP_CALLBACK_INSTANCE pci
);

參數說明:

  • pci:指向TP_CALLBACK_INSTANCE結構體的指針,表示回調函數的執行器,用於提供回調函數的運行環境信息。

SubmitThreadpoolWork 是Windows API提供的將工作項提交到線程池中執行的函數之一,該函數用於將工作項提交到線程池中等待被工作者線程執行。線程池中的工作者線程通過GetQueuedCompletionStatus函數從工作隊列中獲取工作項並執行。通過SubmitThreadpoolWorkGetQueuedCompletionStatus結合使用,可以實現線程池中的任務隊列,提高任務處理效率和系統性能。

SubmitThreadpoolWork 函數的函數原型如下:

VOID SubmitThreadpoolWork(
  PTP_WORK pwk
);

參數說明:

  • pwk:指向TP_WORK結構體的指針,表示要提交到線程池中執行的工作項。

讀者需要註意,SubmitThreadpoolWork 函數提交的是工作項而不是回調函數,回調函數是通過事先創建工作項指定的。在使用SubmitThreadpoolWork提交工作項時,需要根據具體的業務需求進行合理的設計和實現,避免線程池資源浪費、性能下降、記憶體泄漏等問題。

WaitForThreadpoolWorkCallbacks 是Windows API提供的等待線程池中工作項完成的函數之一,該函數用於等待線程池中提交的所有工作項被處理完畢。需要註意的是,該函數會阻塞當前線程直到所有工作項處理完畢,因此需要謹慎使用,避免阻塞其它線程的執行。

WaitForThreadpoolWorkCallbacks 函數的函數原型如下:

VOID WaitForThreadpoolWorkCallbacks(
  PTP_WORK pwk,
  BOOL     fCancelPendingCallbacks
);

參數說明:

  • pwk:指向TP_WORK結構體的指針,表示要等待完成的工作項。
  • fCancelPendingCallbacks:用於指定是否取消所有待處理的工作項。如果為TRUE,則取消所有待處理的工作項;如果為FALSE,則等待所有待處理的工作項被處理完畢。

要使用CreateThreadpoolWork()創建一個線程池很容易實現,讀者只需要指定TaskHandler線程函數即可,當需要啟動線程池時通過調用SubmitThreadpoolWork函數提交一組請求即可,如下是一個簡單的線程池創建功能實現。

#include <Windows.h>
#include <iostream>
#include <stdlib.h>

unsigned long g_count = 0;

// 線程執行函數
void NTAPI TaskHandler(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work)
{
  if (CallbackMayRunLong(Instance))
  {
    printf("剩餘資源: %d --> 線程ID: %d \n", InterlockedIncrement(&g_count), GetCurrentThreadId());
  }

  Sleep(5000);
  printf("運行子線程 \n");

  for (int x = 0; x < 100; x++)
  {
    printf("線程ID: %d ---> 子線程: %d \n", GetCurrentThreadId(), x);
  }
}

int main(int argc,char *argv)
{
  PTP_WORK workItem = CreateThreadpoolWork(TaskHandler, NULL, NULL);

  for (int x = 0; x < 100; x++)
  {
    // 調用SubmitThreadpoolWork向線程池提交一個請求
    SubmitThreadpoolWork(workItem);
  }

  // 等待線程池調用結束
  WaitForThreadpoolWorkCallbacks(workItem, false);
  CloseThreadpoolWork(workItem);

  system("pause");
  return 0;
}

線程池函數同樣支持限制線程數,限制線程可以通過調用SetThreadpoolThreadMinimum()實現,該函數可以在創建線程池後設置線程池的最小線程數。當線程池中的任務隊列中存在待執行的任務,並且當前工作線程的數量小於最小線程數時,線程池將自動創建新的工作線程,以確保待執行任務能夠及時得到處理。

以下是函數的原型定義:

VOID WINAPI SetThreadpoolThreadMinimum(
  PTP_POOL ptpp,
  DWORD     cthrdMic
);

參數說明:

  • ptpp:指向線程池對象的指針。
  • cthrdMic:線程池中的最小線程數。

線程池也支持分組操作,可通過綁定TP_CALLBACK_ENVIRON線程池環境變數實現分組,TP_CALLBACK_ENVIRON是Windows線程池API的一部分,它是一個環境變數結構體,用於確定要調用的線程池回調函數的環境。

以下是TP_CALLBACK_ENVIRON結構體的定義:

typedef struct _TP_CALLBACK_ENVIRON {
  TP_VERSION                  Version;
  PTP_POOL                    Pool;
  PTP_CLEANUP_GROUP           CleanupGroup;
  PFN_TP_SIMPLE_CALLBACK      CleanupGroupCancelCallback;
  PVOID                       RaceDll;
  struct _ACTIVATION_CONTEXT *ActivationContext;
  PFN_IO_CALLBACK             FinalizationCallback;
  union {
    DWORD Flags;
    struct {
      DWORD LongFunction : 1;
      DWORD Persistent   : 1;
      DWORD Private      : 30;
    } DUMMYSTRUCTNAME;
  } DUMMYUNIONNAME;
} TP_CALLBACK_ENVIRON, *PTP_CALLBACK_ENVIRON;

主要成員說明:

  • Version:回調環境的版本,必須為 TP_VERSION。
  • Pool:回調環境所屬的線程池對象。
  • CleanupGroup:回調環境所屬的清理組對象,用於控制回調的取消和資源管理。
  • CleanupGroupCancelCallback:當清理組取消回調時,所調用的回調函數。
  • RaceDll:保留欄位,用於標記已經看過這個環境變數的 DLL。
  • ActivationContext:回調環境的激活上下文,用來保證回調中需要的外部資源正確載入。
  • FinalizationCallback:當回調函數執行完成後調用的函數。
  • Flags:回調環境的標誌,用於設置回調函數的屬性。

使用TP_CALLBACK_ENVIRON結構體,可以在創建線程池回調函數時,配置回調函數的環境和參數,以控制回調函數的執行方式和行為。

例如,可以使用TP_CALLBACK_ENVIRON中的CleanupGroupCleanupGroupCancelCallback成員,將回調函數添加到清理組中,併在需要時取消回調。又或者在FinalizationCallback中執行某些特殊的清理任務,以確保在回調函數執行完畢後釋放資源。

#include <Windows.h>
#include <iostream>
#include <stdlib.h>

// 線程執行函數
void NTAPI TaskHandler(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work)
{
  for (int x = 0; x < 100; x++)
  {
    printf("線程ID: %d ---> 子線程: %d \n", GetCurrentThreadId(), x);
  }
}

// 單次線程任務
void NTAPI poolThreadFunc(PTP_CALLBACK_INSTANCE Instance, PVOID Context)
{
  printf("執行單次線程任務: %d \n", GetCurrentThreadId());
}

int main(int argc,char *argv)
{
  // 創建線程池
  PTP_POOL pool = CreateThreadpool(NULL);

  // 設置線程池 最小與最大 資源數
  SetThreadpoolThreadMinimum(pool, 1);
  SetThreadpoolThreadMaximum(pool, 100);

  // 初始化線程池環境變數
  TP_CALLBACK_ENVIRON cbe;

  InitializeThreadpoolEnvironment(&cbe);

  // 設置線程池回調的線程池
  SetThreadpoolCallbackPool(&cbe, pool);

  // 創建清理組
  PTP_CLEANUP_GROUP cleanupGroup = CreateThreadpoolCleanupGroup();

  // 為線程池設定清理組
  SetThreadpoolCallbackCleanupGroup(&cbe, cleanupGroup, NULL);

  // 創建線程池
  PTP_WORK pwork = CreateThreadpoolWork((PTP_WORK_CALLBACK)TaskHandler, NULL, &cbe);

  // 迴圈提交線程工作任務
  for (int x = 0; x < 100; x++)
  {
    SubmitThreadpoolWork(pwork);
  }

  // 提交單次線程任務
  TrySubmitThreadpoolCallback(poolThreadFunc, NULL, &cbe);
  TrySubmitThreadpoolCallback(poolThreadFunc, NULL, &cbe);

  // 等待線程池結束,關閉線程組
  WaitForThreadpoolWorkCallbacks(pwork, false);
  CloseThreadpoolWork(pwork);

  // 關閉清理組
  CloseThreadpoolCleanupGroupMembers(cleanupGroup, false, NULL);
  
  // 銷毀線程池環境變數
  DestroyThreadpoolEnvironment(&cbe);

  CloseThreadpool(pool);
  system("pause");
  return 0;
}

當讀者使用線程池時,同樣會遇到線程的同步問題,線程池內的線程函數同樣支持互斥鎖、信號量、內核事件控制、臨界區控制等同步和互斥機制,用於保護共用資源的訪問和修改。

這些同步和互斥機制可以用來解決線程間競爭和數據不一致的問題。例如,線上程池中如果有多個工作線程同時訪問共用資源,就需要使用互斥鎖或臨界區控制來確保每個線程對共用資源的使用不會相互干擾,避免出現數據競爭和不一致的情況。

使用這些同步和互斥機制時,應該根據實際場景進行選擇和設計。例如,互斥鎖適合用於保護少量的共用資源、需要經常訪問和更新的場景,而信號量適合用於控制併發訪問數量、資源池、生產者消費者模式等場景。同時,需要註意遵循線程安全和同步的原則,以避免死鎖、饑餓等問題。

#include <Windows.h>
#include <iostream>
#include <stdlib.h>

unsigned long g_count = 0;

// --------------------------------------------------------------
// 線程池同步-互斥量同步
// --------------------------------------------------------------
void NTAPI TaskHandlerMutex(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work)
{
  // 鎖定資源
  WaitForSingleObject(*(HANDLE *)Context, INFINITE);

  for (int x = 0; x < 100; x++)
  {
    printf("線程ID: %d ---> 子線程: %d \n", GetCurrentThreadId(), x);
    g_count = g_count + 1;
  }

  // 解鎖資源
  ReleaseMutexWhenCallbackReturns(Instance, *(HANDLE*)Context);
}

void TestMutex()
{
  // 創建互斥量
  HANDLE hMutex = CreateMutex(NULL, FALSE, NULL);

  PTP_WORK pool = CreateThreadpoolWork((PTP_WORK_CALLBACK)TaskHandlerMutex, &hMutex, NULL);

  for (int i = 0; i < 1000; i++)
  {
    SubmitThreadpoolWork(pool);
  }

  WaitForThreadpoolWorkCallbacks(pool, FALSE);
  CloseThreadpoolWork(pool);
  CloseHandle(hMutex);

  printf("相加後 ---> %d \n", g_count);
}

// --------------------------------------------------------------
// 線程池同步-事件內核對象
// --------------------------------------------------------------
void NTAPI TaskHandlerKern(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work)
{
  // 鎖定資源
  WaitForSingleObject(*(HANDLE *)Context, INFINITE);

  for (int x = 0; x < 100; x++)
  {
    printf("線程ID: %d ---> 子線程: %d \n", GetCurrentThreadId(), x);
    g_count = g_count + 1;
  }

  // 解鎖資源
  SetEventWhenCallbackReturns(Instance, *(HANDLE*)Context);
}

void TestKern()
{
  HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
  SetEvent(hEvent);

  PTP_WORK pwk = CreateThreadpoolWork((PTP_WORK_CALLBACK)TaskHandlerKern, &hEvent, NULL);

  for (int i = 0; i < 1000; i++)
  {
    SubmitThreadpoolWork(pwk);
  }

  WaitForThreadpoolWorkCallbacks(pwk, FALSE);
  CloseThreadpoolWork(pwk);

  printf("相加後 ---> %d \n", g_count);
}

// --------------------------------------------------------------
// 線程池同步-信號量同步
// --------------------------------------------------------------
void NTAPI TaskHandlerSemaphore(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work)
{
  // 鎖定資源
  WaitForSingleObject(*(HANDLE *)Context, INFINITE);

  for (int x = 0; x < 100; x++)
  {
    printf("線程ID: %d ---> 子線程: %d \n", GetCurrentThreadId(), x);
    g_count = g_count + 1;
  }

  // 解鎖資源
  ReleaseSemaphoreWhenCallbackReturns(Instance, *(HANDLE*)Context, 1);
}

void TestSemaphore()
{
  // 創建信號量為100
  HANDLE hSemaphore = CreateSemaphore(NULL, 0, 100, NULL);

  ReleaseSemaphore(hSemaphore, 10, NULL);

  PTP_WORK pwk = CreateThreadpoolWork((PTP_WORK_CALLBACK)TaskHandlerSemaphore, &hSemaphore, NULL);

  for (int i = 0; i < 1000; i++)
  {
    SubmitThreadpoolWork(pwk);
  }

  WaitForThreadpoolWorkCallbacks(pwk, FALSE);
  CloseThreadpoolWork(pwk);
  CloseHandle(hSemaphore);

  printf("相加後 ---> %d \n", g_count);
}

// --------------------------------------------------------------
// 線程池同步-臨界區
// --------------------------------------------------------------
void NTAPI TaskHandlerLeave(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work)
{
  // 鎖定資源
  EnterCriticalSection((CRITICAL_SECTION*)Context);

  for (int x = 0; x < 100; x++)
  {
    printf("線程ID: %d ---> 子線程: %d \n", GetCurrentThreadId(), x);
    g_count = g_count + 1;
  }

  // 解鎖資源
  LeaveCriticalSectionWhenCallbackReturns(Instance, (CRITICAL_SECTION*)Context);
}

void TestLeave()
{
  CRITICAL_SECTION cs;
  InitializeCriticalSection(&cs);

  PTP_WORK pwk = CreateThreadpoolWork((PTP_WORK_CALLBACK)TaskHandlerLeave, &cs, NULL);

  for (int i = 0; i < 1000; i++)
  {
    SubmitThreadpoolWork(pwk);
  }

  WaitForThreadpoolWorkCallbacks(pwk, FALSE);
  DeleteCriticalSection(&cs);
  CloseThreadpoolWork(pwk);

  printf("相加後 ---> %d \n", g_count);
}

int main(int argc,char *argv)
{
  // TestMutex();
  // TestKern();
  // TestSemaphore();
  TestLeave();

  system("pause");
  return 0;
}

本文作者: 王瑞
本文鏈接: https://www.lyshark.com/post/505839cb.html
版權聲明: 本博客所有文章除特別聲明外,均採用 BY-NC-SA 許可協議。轉載請註明出處!

文章作者:lyshark (王瑞)
文章出處:https://www.cnblogs.com/LyShark/p/17739720.html
本博客所有文章除特別聲明外,均採用 BY-NC-SA 許可協議。轉載請註明出處!
您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Docker 是一款流行的開源容器化平臺,使用 Docker 可以有效地隔離應用程式和系統環境,使得應用程式在不同的環境中具有相同的行為 Docker Compose 是一個用於定義和管理多個容器工具 ...
  • Java實現簡單計算器 參考鏈接🔗:https://www.bilibili.com/video/BV1d54y1s7uC?p=1&vd_source=cf21268954e139179e71f046bac01e56 設計思路 創建容器框架 創建組件和組件佈局方式 組件的測試 數字按鈕和功能按鈕的 ...
  • 以下內容來源網上 經過整合而成 一、一般介紹 STL(Standard Template Library),即標準模板庫,是一個具有工業強度的,高效的C++程式庫。它被容納於C++標準程式庫(C++ Standard Library)中,是ANSI/ISO C++標準中最新的也是極具革命性的一部分。 ...
  • 本篇為[用go設計開發一個自己的輕量級登錄庫/框架吧 - 秋玻 - 博客園 (cnblogs.com)]的二級認證業務篇,會講講二級認證業務的實現,給庫/框架增加新的功能。 源碼:https://github.com/weloe/token-go ...
  • Stream 簡介 Spring Cloud Stream 是用於構建消息驅動的微服務應用程式的框架,提供了多種中間件的合理配置 Spring Cloud Stream 包含以下核心概念: Destination Binders:目標綁定器,目標指的是 Kafka 或者 RabbitMQ,綁定器就是 ...
  • 我們前面完成了Dart語言基礎特性的學習,包括基礎語法概覽、迭代集合、非同步編程、Mixin高級特性和變數等。今天我們來學習Dart的庫相關知識,包括如何導入庫、指定庫首碼、導入部分或者排除部分庫、延遲導入庫等,最後看下Dart中67個關鍵字作為標識符的一些約束…… ...
  • 需求:輸入錯誤的手機號,會有提示語,正確的手機號碼會有正確的圖標 效果: 思路: (1)排版(不細講),使用input 、button、span等標簽,排版裡面一個主要的小點是,需要寫出兩個span ,通過v-show先進行隱藏,等後面判斷手機號碼的正確錯誤再進行顯示與隱藏 (2)接著,就需要在in ...
  • Java 21中除了推出JEP 445:Unnamed Classes and Instance Main Methods之外,還有另外一個預覽功能:未命名模式和變數(Unnamed Patterns and Variables)。該新特性的目的是提高代碼的可讀性和可維護性。 下麵通過一個例子來理解 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...