一個Windows下線程池的實現(C++)

来源:https://www.cnblogs.com/tanguoying/archive/2018/02/19/8454637.html
-Advertisement-
Play Games

前言 本文配套代碼:https://github.com/TTGuoying/ThreadPool 先看看幾個概念: 我們為什麼要使用線程池呢? 簡單來說就是線程本身存在開銷,我們利用多線程來進行任務處理,單線程也不能濫用,無止禁的開新線程會給系統產生大量消耗,而線程本來就是可重用的資源,不需要每次 ...


前言

  本文配套代碼:https://github.com/TTGuoying/ThreadPool

  先看看幾個概念:

  1.  線程:進程中負責執行的執行單元。一個進程中至少有一個線程。
  2.  多線程:一個進程中有多個線程同時運行,根據cpu切換輪流工作,在多核cpu上可以幾個線程同時在不同的核心上同時運行。
  3.  線程池:基本思想還是一種對象池思想,開闢一塊記憶體空間,裡面存放一些休眠(掛起Suspend)的線程。當有任務要執行時,從池中取一個空閑的線程執行任務,執行完成後線程休眠放回池中。這樣可以避免反覆創建線程對象所帶來的性能開銷,節省了系統的資源。

  我們為什麼要使用線程池呢?

 

  簡單來說就是線程本身存在開銷,我們利用多線程來進行任務處理,單線程也不能濫用,無止禁的開新線程會給系統產生大量消耗,而線程本來就是可重用的資源,不需要每次使用時都進行初始化,因此可以採用有限的線程個數處理無限的任務。

代碼實現

 

  本文的線程池是在Windows上實現的。主要思路是維護一個空閑線程隊列、一個忙碌線程隊列和一個任務隊列,一開始建立一定數量的空閑線程放進空閑線程隊列,當有任務進入任務隊列時,從空閑線程隊列中去一個線程執行任務,線程變為忙碌線程移入忙碌線程隊列,任務執行完成後,線程到任務隊列中取任務繼續執行,如果任務隊列中沒有任務線程休眠後從忙碌線程隊列回到空閑線程隊列。下麵是線程池的工作原理圖:

  本線程池類實現了自動調節池中線程數。

  廢話少說,直接上代碼:

 

  1 /*
  2 ==========================================================================
  3 * 類ThreadPool是本代碼的核心類,類中自動維護線程池的創建和任務隊列的派送
  4 
  5 * 其中的TaskFun是任務函數
  6 * 其中的TaskCallbackFun是回調函數
  7 
  8 *用法:定義一個ThreadPool變數,TaskFun函數和TaskCallbackFun回調函數,然後調用ThreadPool的QueueTaskItem()函數即可
  9 
 10 Author: TTGuoying
 11 
 12 Date: 2018/02/19 23:15
 13 
 14 ==========================================================================
 15 */
 16 #pragma once
 17 #include <Windows.h>
 18 #include <list>
 19 #include <queue>
 20 #include <memory>
 21 
 22 using std::list;
 23 using std::queue;
 24 using std::shared_ptr;
 25 
 26 #define THRESHOLE_OF_WAIT_TASK  20
 27 
 28 typedef int(*TaskFun)(PVOID param);                // 任務函數
 29 typedef void(*TaskCallbackFun)(int result);        // 回調函數
 30 
 31 class ThreadPool
 32 {
 33 private:
 34     // 線程類(內部類)
 35     class Thread
 36     {
 37     public:
 38         Thread(ThreadPool *threadPool);
 39         ~Thread();
 40 
 41         BOOL isBusy();                                                    // 是否有任務在執行
 42         void ExecuteTask(TaskFun task, PVOID param, TaskCallbackFun taskCallback);    // 執行任務
 43 
 44     private:
 45         ThreadPool *threadPool;                                            // 所屬線程池
 46         BOOL    busy;                                                    // 是否有任務在執行
 47         BOOL    exit;                                                    // 是否退出
 48         HANDLE  thread;                                                    // 線程句柄
 49         TaskFun    task;                                                    // 要執行的任務
 50         PVOID    param;                                                    // 任務參數
 51         TaskCallbackFun taskCb;                                            // 回調的任務
 52         static unsigned int __stdcall ThreadProc(PVOID pM);                // 線程函數
 53     };
 54 
 55     // IOCP的通知種類
 56     enum WAIT_OPERATION_TYPE
 57     {
 58         GET_TASK,
 59         EXIT
 60     };
 61 
 62     // 待執行的任務類
 63     class WaitTask
 64     {
 65     public:
 66         WaitTask(TaskFun task, PVOID param, TaskCallbackFun taskCb, BOOL bLong)
 67         {
 68             this->task = task;
 69             this->param = param;
 70             this->taskCb = taskCb;
 71             this->bLong = bLong;
 72         }
 73         ~WaitTask() { task = NULL; taskCb = NULL; bLong = FALSE; param = NULL; }
 74 
 75         TaskFun    task;                    // 要執行的任務
 76         PVOID param;                    // 任務參數
 77         TaskCallbackFun taskCb;            // 回調的任務
 78         BOOL bLong;                        // 是否時長任務
 79     };
 80 
 81     // 從任務列表取任務的線程函數
 82     static unsigned int __stdcall GetTaskThreadProc(PVOID pM)
 83     {
 84         ThreadPool *threadPool = (ThreadPool *)pM;
 85         BOOL bRet = FALSE;
 86         DWORD dwBytes = 0;
 87         WAIT_OPERATION_TYPE opType;
 88         OVERLAPPED *ol;
 89         while (WAIT_OBJECT_0 != WaitForSingleObject(threadPool->stopEvent, 0))
 90         {
 91             BOOL bRet = GetQueuedCompletionStatus(threadPool->completionPort, &dwBytes, (PULONG_PTR)&opType, &ol, INFINITE);
 92             // 收到退出標誌
 93             if (EXIT == (DWORD)opType)
 94             {
 95                 break;
 96             }
 97             else if (GET_TASK == (DWORD)opType)
 98             {
 99                 threadPool->GetTaskExcute();
100             }
101         }
102         return 0;
103     }
104 
105     //線程臨界區鎖
106     class CriticalSectionLock
107     {
108     private:
109         CRITICAL_SECTION cs;//臨界區
110     public:
111         CriticalSectionLock() { InitializeCriticalSection(&cs); }
112         ~CriticalSectionLock() { DeleteCriticalSection(&cs); }
113         void Lock() { EnterCriticalSection(&cs); }
114         void UnLock() { LeaveCriticalSection(&cs); }
115     };
116 
117 
118 public:
119     ThreadPool(size_t minNumOfThread = 2, size_t maxNumOfThread = 10);
120     ~ThreadPool();
121 
122     BOOL QueueTaskItem(TaskFun task, PVOID param, TaskCallbackFun taskCb = NULL, BOOL longFun = FALSE);       // 任務入隊
123 
124 private:
125     size_t getCurNumOfThread() { return getIdleThreadNum() + getBusyThreadNum(); }    // 獲取線程池中的當前線程數
126     size_t GetMaxNumOfThread() { return maxNumOfThread - numOfLongFun; }            // 獲取線程池中的最大線程數
127     void SetMaxNumOfThread(size_t size)            // 設置線程池中的最大線程數
128     { 
129         if (size < numOfLongFun)
130         {
131             maxNumOfThread = size + numOfLongFun;
132         }
133         else
134             maxNumOfThread = size; 
135     }                    
136     size_t GetMinNumOfThread() { return minNumOfThread; }                            // 獲取線程池中的最小線程數
137     void SetMinNumOfThread(size_t size) { minNumOfThread = size; }                    // 設置線程池中的最小線程數
138 
139     size_t getIdleThreadNum() { return idleThreadList.size(); }    // 獲取線程池中的線程數
140     size_t getBusyThreadNum() { return busyThreadList.size(); }    // 獲取線程池中的線程數
141     void CreateIdleThread(size_t size);                            // 創建空閑線程
142     void DeleteIdleThread(size_t size);                            // 刪除空閑線程
143     Thread *GetIdleThread();                                    // 獲取空閑線程
144     void MoveBusyThreadToIdleList(Thread *busyThread);            // 忙碌線程加入空閑列表
145     void MoveThreadToBusyList(Thread *thread);                    // 線程加入忙碌列表
146     void GetTaskExcute();                                        // 從任務隊列中取任務執行
147     WaitTask *GetTask();                                        // 從任務隊列中取任務
148 
149     CriticalSectionLock idleThreadLock;                            // 空閑線程列表鎖
150     list<Thread *> idleThreadList;                                // 空閑線程列表
151     CriticalSectionLock busyThreadLock;                            // 忙碌線程列表鎖
152     list<Thread *> busyThreadList;                                // 忙碌線程列表
153 
154     CriticalSectionLock waitTaskLock;
155     list<WaitTask *> waitTaskList;                                // 任務列表
156 
157     HANDLE                    dispatchThrad;                        // 分發任務線程
158     HANDLE                    stopEvent;                            // 通知線程退出的時間
159     HANDLE                    completionPort;                        // 完成埠
160     size_t                    maxNumOfThread;                        // 線程池中最大的線程數
161     size_t                    minNumOfThread;                        // 線程池中最小的線程數
162     size_t                    numOfLongFun;                        // 線程池中最小的線程數
163 };

 

  1 #include "stdafx.h"
  2 #include "ThreadPool.h"
  3 #include <process.h>
  4 
  5 
  6 ThreadPool::ThreadPool(size_t minNumOfThread, size_t maxNumOfThread)
  7 {
  8     if (minNumOfThread < 2)
  9         this->minNumOfThread = 2;
 10     else
 11         this->minNumOfThread = minNumOfThread;
 12     if (maxNumOfThread < this->minNumOfThread * 2)
 13         this->maxNumOfThread = this->minNumOfThread * 2;
 14     else
 15         this->maxNumOfThread = maxNumOfThread;
 16     stopEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
 17     completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 1);
 18 
 19     idleThreadList.clear();
 20     CreateIdleThread(this->minNumOfThread);
 21     busyThreadList.clear();
 22 
 23     dispatchThrad = (HANDLE)_beginthreadex(0, 0, GetTaskThreadProc, this, 0, 0);
 24     numOfLongFun = 0;
 25 }
 26 
 27 ThreadPool::~ThreadPool()
 28 {
 29     SetEvent(stopEvent);
 30     PostQueuedCompletionStatus(completionPort, 0, (DWORD)EXIT, NULL);
 31 
 32     CloseHandle(stopEvent);
 33 }
 34 
 35 BOOL ThreadPool::QueueTaskItem(TaskFun task, PVOID param, TaskCallbackFun taskCb, BOOL longFun)
 36 {
 37     waitTaskLock.Lock();
 38     WaitTask *waitTask = new WaitTask(task, param, taskCb, longFun);
 39     waitTaskList.push_back(waitTask);
 40     waitTaskLock.UnLock();
 41     PostQueuedCompletionStatus(completionPort, 0, (DWORD)GET_TASK, NULL);
 42     return TRUE;
 43 }
 44 
 45 void ThreadPool::CreateIdleThread(size_t size)
 46 {
 47     idleThreadLock.Lock();
 48     for (size_t i = 0; i < size; i++)
 49     {
 50         idleThreadList.push_back(new Thread(this));
 51     }
 52     idleThreadLock.UnLock();
 53 }
 54 
 55 void ThreadPool::DeleteIdleThread(size_t size)
 56 {
 57     idleThreadLock.Lock();
 58     size_t t = idleThreadList.size();
 59     if (t >= size)
 60     {
 61         for (size_t i = 0; i < size; i++)
 62         {
 63             auto thread = idleThreadList.back();
 64             delete thread;
 65             idleThreadList.pop_back();
 66         }
 67     }
 68     else
 69     {
 70         for (size_t i = 0; i < t; i++)
 71         {
 72             auto thread = idleThreadList.back();
 73             delete thread;
 74             idleThreadList.pop_back();
 75         }
 76     }
 77     idleThreadLock.UnLock();
 78 }
 79 
 80 ThreadPool::Thread *ThreadPool::GetIdleThread()
 81 {
 82     Thread *thread = NULL;
 83     idleThreadLock.Lock();
 84     if (idleThreadList.size() > 0)
 85     {
 86         thread = idleThreadList.front();
 87         idleThreadList.pop_front();
 88     }
 89     idleThreadLock.UnLock();
 90 
 91     if (thread == NULL && getCurNumOfThread() < maxNumOfThread)
 92     {
 93         thread = new Thread(this);
 94     }
 95 
 96     if (thread == NULL && waitTaskList.size() > THRESHOLE_OF_WAIT_TASK)
 97     {
 98         thread = new Thread(this);
 99         InterlockedIncrement(&maxNumOfThread);
100     }
101     return thread;
102 }
103 
104 void ThreadPool::MoveBusyThreadToIdleList(Thread * busyThread)
105 {
106     idleThreadLock.Lock();
107     idleThreadList.push_back(busyThread);
108     idleThreadLock.UnLock();
109 
110     busyThreadLock.Lock();
111     for (auto it = busyThreadList.begin(); it != busyThreadList.end(); it++)
112     {
113         if (*it == busyThread)
114         {
115             busyThreadList.erase(it);
116             break;
117         }
118     }
119     busyThreadLock.UnLock();
120 
121     if (maxNumOfThread != 0 && idleThreadList.size() > maxNumOfThread * 0.8)
122     {
123         DeleteIdleThread(idleThreadList.size() / 2);
124     }
125 
126     PostQueuedCompletionStatus(completionPort, 0, (DWORD)GET_TASK, NULL);
127 }
128 
129 void ThreadPool::MoveThreadToBusyList(Thread * thread)
130 {
131     busyThreadLock.Lock();
132     busyThreadList.push_back(thread);
133     busyThreadLock.UnLock();
134 }
135 
136 void ThreadPool::GetTaskExcute()
137 {
138     Thread *thread = NULL;
139     WaitTask *waitTask = NULL;
140 
141     waitTask = GetTask();
142     if (waitTask == NULL)
143     {
144         return;
145     }
146 
147     if (waitTask->bLong)
148     {
149         if (idleThreadList.size() > minNumOfThread)
150         {
151             thread = GetIdleThread();
152         }
153         else
154         {
155             thread = new Thread(this);
156             InterlockedIncrement(&numOfLongFun);
157             InterlockedIncrement(&maxNumOfThread);
158         }
159     }
160     else
161     {
162         thread = GetIdleThread();
163     }
164 
165     if (thread != NULL)
166     {
167         thread->ExecuteTask(waitTask->task, waitTask->param, waitTask->taskCb);
168         delete waitTask;
169         MoveThreadToBusyList(thread);
170     }
171     else
172     {
173         waitTaskLock.Lock();
174         waitTaskList.push_front(waitTask);
175         waitTaskLock.UnLock();
176     }
177     
178 }
179 
180 ThreadPool::WaitTask *ThreadPool::GetTask()
181 {
182     WaitTask *waitTask = NULL;
183     waitTaskLock.Lock();
184     if (waitTaskList.size() > 0)
185     {
186         waitTask = waitTaskList.front();
187         waitTaskList.pop_front();
188     }
189     waitTaskLock.UnLock();
190     return waitTask;
191 }
192 
193 
194 ThreadPool::Thread::Thread(ThreadPool *threadPool) :
195     busy(FALSE),
196     thread(INVALID_HANDLE_VALUE),
197     task(NULL),
198     taskCb(NULL),
199     exit(FALSE),
200     threadPool(threadPool)
201 {
202     thread = (HANDLE)_beginthreadex(0, 0, ThreadProc, this, CREATE_SUSPENDED, 0);
203 }
204 
205 ThreadPool::Thread::~Thread()
206 {
207     exit = TRUE;
208     task = NULL;
209     taskCb = NULL;
210     ResumeThread(thread);
211     WaitForSingleObject(thread, INFINITE);
212     CloseHandle(thread);
213 }
214 
215 BOOL ThreadPool::Thread::isBusy()
216 {
217     return busy;
218 }
219 
220 void ThreadPool::Thread::ExecuteTask(TaskFun task, PVOID param, TaskCallbackFun taskCallback)
221 {
222     busy = TRUE;
223     this->task = task;
224     this->param = param;
225     this->taskCb = taskCallback;
226     ResumeThread(thread);
227 }
228 
229 unsigned int ThreadPool::Thread::ThreadProc(PVOID pM)
230 {
231     Thread *pThread = (Thread*)pM;
232 
233     while (true)
234     {
235         if (pThread->exit)
236             break; //線程退出
237 
238         if (pThread->task == NULL && pThread->taskCb == NULL)
239         {
240             pThread->busy = FALSE;
241             pThread->threadPool->MoveBusyThreadToIdleList(pThread);
242             SuspendThread(pThread->thread);
243             continue;
244         }
245 
246         int resulst = pThread->task(pThread->param);
247         if(pThread->taskCb)
248             pThread->taskCb(resulst);
249         WaitTask *waitTask = pThread->threadPool->GetTask();
250         if (waitTask != NULL)
251         {
252             pThread->task = waitTask->task;
253             pThread->taskCb = waitTask->taskCb;
254             delete waitTask;
255             continue;
256         }
257         else
258         {
259             pThread->task = NULL;
260             pThread->param = NULL;
261             pThread->taskCb = NULL;
262             pThread->busy = FALSE;
263             pThread->threadPool->MoveBusyThreadToIdleList(pThread);
264             SuspendThread(pThread->thread);
265         }
266     }
267 
268     return 0;
269 }
 1 // ThreadPool.cpp: 定義控制台應用程式的入口點。
 2 //
 3 
 4 #include "stdafx.h"
 5 #include "ThreadPool.h"
 6 #include <stdio.h>
 7 
 8 class Task
 9 {
10 public:
11     static int Task1(PVOID p) 
12     {
13         int i = 10;
14         while (i >= 0)
15         {
16             printf("%d\n", i);
17             Sleep(100);
18             i--;
19         }
20         return i;
21     }
22 };
23 
24 class TaskCallback
25 {
26 public:
27     static void TaskCallback1(int result)
28     {
29         printf("   %d\n", result);
30     }
31 };
32 
33 int main()
34 {
35     ThreadPool threadPool(2, 10);
36     for (size_t i = 0; i < 30; i++)
37     {
38         threadPool.QueueTaskItem(Task::Task1, NULL, TaskCallback::TaskCallback1);
39     }
40     threadPool.QueueTaskItem(Task::Task1, NULL, TaskCallback::TaskCallback1, TRUE);
41     
42     getchar();
43 
44     return 0;
45 }

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 先介紹一下《MySQL資料庫開發的三十六條軍規》,這裡只介紹核心的,具體內容大家可以自行百度,這是從底層開發人員到管理者必須知道規範。出自58趕集。 介紹兩個例子。這個適合用STAR法則。STAR法則是情境(situation)、任務(task)、行動(action)、結論(result)四項的縮寫 ...
  • 首先,原標題是對那些只知道玩的成人說的。 Die With Me ========== Die With Me是一個超級無聊的比列時程式員開發的IOS的APP,有關這個APP大家可以自行 "百度" 。 不少人(包括我)都是通過"躺倒鴨"知道的DWM,我是一初二學生,買不起IPhone,用國產Andr ...
  • js
    1、js:JavaScript一種直譯式腳本語言(解釋型腳本語言,執行前不需要編譯;這一點和Java類似,Java也是解釋型語言,源碼變為位元組碼(jvm可執行的代碼)的過程不是編譯過程),是一種動態類型、弱類型、基於原型的語言,內置支持類型。它的解釋器被稱為JavaScript引擎,為瀏覽器的一部分 ...
  • timeChunk函數讓創建節點的工作分批進行,比如一秒鐘創建1000個節點,改為每個200ms創建10個節點。具體timeChunk函數封裝如下 應用實例見https://92node.com/article/js-fen-shi.html ...
  • 上一篇聊了聊構建分散式系統所面臨的困難,這篇將著重討論構建容錯分散式系統的演算法與協議。構建容錯系統的最佳方法是使用通用抽象,允許應用程式忽略分散式系統中的一些問題。本篇我們先聊一聊線性一致性,以及與線性一致性有關的技術,後續需要瞭解的分散式協調服務,如:ZooKeeper等,都是基於分散式系統的線性 ...
  • 本文簡要地示範瞭如何使用java CardLayout對程式進行佈局。 ...
  • Problem Link: http://codeforces.com/problemset/problem/888/F Problem Statement: There are n points marked on the plane. The points are situated in suc ...
  • 針對本文,博主最近在寫《成神之路系列文章》 ,分章分節介紹所有知識點。歡迎關註。 一、基礎篇 1.1 JVM 1.1.1. Java記憶體模型,Java記憶體管理,Java堆和棧,垃圾回收 http://www.jcp.org/en/jsr/detail?id=133 http://ifeve.com/ ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...