前言 本文配套代碼:https://github.com/TTGuoying/ThreadPool 先看看幾個概念: 我們為什麼要使用線程池呢? 簡單來說就是線程本身存在開銷,我們利用多線程來進行任務處理,單線程也不能濫用,無止禁的開新線程會給系統產生大量消耗,而線程本來就是可重用的資源,不需要每次 ...
前言
本文配套代碼:https://github.com/TTGuoying/ThreadPool
先看看幾個概念:
- 線程:進程中負責執行的執行單元。一個進程中至少有一個線程。
- 多線程:一個進程中有多個線程同時運行,根據cpu切換輪流工作,在多核cpu上可以幾個線程同時在不同的核心上同時運行。
- 線程池:基本思想還是一種對象池思想,開闢一塊記憶體空間,裡面存放一些休眠(掛起Suspend)的線程。當有任務要執行時,從池中取一個空閑的線程執行任務,執行完成後線程休眠放回池中。這樣可以避免反覆創建線程對象所帶來的性能開銷,節省了系統的資源。
我們為什麼要使用線程池呢?
簡單來說就是線程本身存在開銷,我們利用多線程來進行任務處理,單線程也不能濫用,無止禁的開新線程會給系統產生大量消耗,而線程本來就是可重用的資源,不需要每次使用時都進行初始化,因此可以採用有限的線程個數處理無限的任務。
代碼實現
本文的線程池是在Windows上實現的。主要思路是維護一個空閑線程隊列、一個忙碌線程隊列和一個任務隊列,一開始建立一定數量的空閑線程放進空閑線程隊列,當有任務進入任務隊列時,從空閑線程隊列中去一個線程執行任務,線程變為忙碌線程移入忙碌線程隊列,任務執行完成後,線程到任務隊列中取任務繼續執行,如果任務隊列中沒有任務線程休眠後從忙碌線程隊列回到空閑線程隊列。下麵是線程池的工作原理圖:
本線程池類實現了自動調節池中線程數。
廢話少說,直接上代碼:
1 /* 2 ========================================================================== 3 * 類ThreadPool是本代碼的核心類,類中自動維護線程池的創建和任務隊列的派送 4 5 * 其中的TaskFun是任務函數 6 * 其中的TaskCallbackFun是回調函數 7 8 *用法:定義一個ThreadPool變數,TaskFun函數和TaskCallbackFun回調函數,然後調用ThreadPool的QueueTaskItem()函數即可 9 10 Author: TTGuoying 11 12 Date: 2018/02/19 23:15 13 14 ========================================================================== 15 */ 16 #pragma once 17 #include <Windows.h> 18 #include <list> 19 #include <queue> 20 #include <memory> 21 22 using std::list; 23 using std::queue; 24 using std::shared_ptr; 25 26 #define THRESHOLE_OF_WAIT_TASK 20 27 28 typedef int(*TaskFun)(PVOID param); // 任務函數 29 typedef void(*TaskCallbackFun)(int result); // 回調函數 30 31 class ThreadPool 32 { 33 private: 34 // 線程類(內部類) 35 class Thread 36 { 37 public: 38 Thread(ThreadPool *threadPool); 39 ~Thread(); 40 41 BOOL isBusy(); // 是否有任務在執行 42 void ExecuteTask(TaskFun task, PVOID param, TaskCallbackFun taskCallback); // 執行任務 43 44 private: 45 ThreadPool *threadPool; // 所屬線程池 46 BOOL busy; // 是否有任務在執行 47 BOOL exit; // 是否退出 48 HANDLE thread; // 線程句柄 49 TaskFun task; // 要執行的任務 50 PVOID param; // 任務參數 51 TaskCallbackFun taskCb; // 回調的任務 52 static unsigned int __stdcall ThreadProc(PVOID pM); // 線程函數 53 }; 54 55 // IOCP的通知種類 56 enum WAIT_OPERATION_TYPE 57 { 58 GET_TASK, 59 EXIT 60 }; 61 62 // 待執行的任務類 63 class WaitTask 64 { 65 public: 66 WaitTask(TaskFun task, PVOID param, TaskCallbackFun taskCb, BOOL bLong) 67 { 68 this->task = task; 69 this->param = param; 70 this->taskCb = taskCb; 71 this->bLong = bLong; 72 } 73 ~WaitTask() { task = NULL; taskCb = NULL; bLong = FALSE; param = NULL; } 74 75 TaskFun task; // 要執行的任務 76 PVOID param; // 任務參數 77 TaskCallbackFun taskCb; // 回調的任務 78 BOOL bLong; // 是否時長任務 79 }; 80 81 // 從任務列表取任務的線程函數 82 static unsigned int __stdcall GetTaskThreadProc(PVOID pM) 83 { 84 ThreadPool *threadPool = (ThreadPool *)pM; 85 BOOL bRet = FALSE; 86 DWORD dwBytes = 0; 87 WAIT_OPERATION_TYPE opType; 88 OVERLAPPED *ol; 89 while (WAIT_OBJECT_0 != WaitForSingleObject(threadPool->stopEvent, 0)) 90 { 91 BOOL bRet = GetQueuedCompletionStatus(threadPool->completionPort, &dwBytes, (PULONG_PTR)&opType, &ol, INFINITE); 92 // 收到退出標誌 93 if (EXIT == (DWORD)opType) 94 { 95 break; 96 } 97 else if (GET_TASK == (DWORD)opType) 98 { 99 threadPool->GetTaskExcute(); 100 } 101 } 102 return 0; 103 } 104 105 //線程臨界區鎖 106 class CriticalSectionLock 107 { 108 private: 109 CRITICAL_SECTION cs;//臨界區 110 public: 111 CriticalSectionLock() { InitializeCriticalSection(&cs); } 112 ~CriticalSectionLock() { DeleteCriticalSection(&cs); } 113 void Lock() { EnterCriticalSection(&cs); } 114 void UnLock() { LeaveCriticalSection(&cs); } 115 }; 116 117 118 public: 119 ThreadPool(size_t minNumOfThread = 2, size_t maxNumOfThread = 10); 120 ~ThreadPool(); 121 122 BOOL QueueTaskItem(TaskFun task, PVOID param, TaskCallbackFun taskCb = NULL, BOOL longFun = FALSE); // 任務入隊 123 124 private: 125 size_t getCurNumOfThread() { return getIdleThreadNum() + getBusyThreadNum(); } // 獲取線程池中的當前線程數 126 size_t GetMaxNumOfThread() { return maxNumOfThread - numOfLongFun; } // 獲取線程池中的最大線程數 127 void SetMaxNumOfThread(size_t size) // 設置線程池中的最大線程數 128 { 129 if (size < numOfLongFun) 130 { 131 maxNumOfThread = size + numOfLongFun; 132 } 133 else 134 maxNumOfThread = size; 135 } 136 size_t GetMinNumOfThread() { return minNumOfThread; } // 獲取線程池中的最小線程數 137 void SetMinNumOfThread(size_t size) { minNumOfThread = size; } // 設置線程池中的最小線程數 138 139 size_t getIdleThreadNum() { return idleThreadList.size(); } // 獲取線程池中的線程數 140 size_t getBusyThreadNum() { return busyThreadList.size(); } // 獲取線程池中的線程數 141 void CreateIdleThread(size_t size); // 創建空閑線程 142 void DeleteIdleThread(size_t size); // 刪除空閑線程 143 Thread *GetIdleThread(); // 獲取空閑線程 144 void MoveBusyThreadToIdleList(Thread *busyThread); // 忙碌線程加入空閑列表 145 void MoveThreadToBusyList(Thread *thread); // 線程加入忙碌列表 146 void GetTaskExcute(); // 從任務隊列中取任務執行 147 WaitTask *GetTask(); // 從任務隊列中取任務 148 149 CriticalSectionLock idleThreadLock; // 空閑線程列表鎖 150 list<Thread *> idleThreadList; // 空閑線程列表 151 CriticalSectionLock busyThreadLock; // 忙碌線程列表鎖 152 list<Thread *> busyThreadList; // 忙碌線程列表 153 154 CriticalSectionLock waitTaskLock; 155 list<WaitTask *> waitTaskList; // 任務列表 156 157 HANDLE dispatchThrad; // 分發任務線程 158 HANDLE stopEvent; // 通知線程退出的時間 159 HANDLE completionPort; // 完成埠 160 size_t maxNumOfThread; // 線程池中最大的線程數 161 size_t minNumOfThread; // 線程池中最小的線程數 162 size_t numOfLongFun; // 線程池中最小的線程數 163 };
1 #include "stdafx.h" 2 #include "ThreadPool.h" 3 #include <process.h> 4 5 6 ThreadPool::ThreadPool(size_t minNumOfThread, size_t maxNumOfThread) 7 { 8 if (minNumOfThread < 2) 9 this->minNumOfThread = 2; 10 else 11 this->minNumOfThread = minNumOfThread; 12 if (maxNumOfThread < this->minNumOfThread * 2) 13 this->maxNumOfThread = this->minNumOfThread * 2; 14 else 15 this->maxNumOfThread = maxNumOfThread; 16 stopEvent = CreateEvent(NULL, TRUE, FALSE, NULL); 17 completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 1); 18 19 idleThreadList.clear(); 20 CreateIdleThread(this->minNumOfThread); 21 busyThreadList.clear(); 22 23 dispatchThrad = (HANDLE)_beginthreadex(0, 0, GetTaskThreadProc, this, 0, 0); 24 numOfLongFun = 0; 25 } 26 27 ThreadPool::~ThreadPool() 28 { 29 SetEvent(stopEvent); 30 PostQueuedCompletionStatus(completionPort, 0, (DWORD)EXIT, NULL); 31 32 CloseHandle(stopEvent); 33 } 34 35 BOOL ThreadPool::QueueTaskItem(TaskFun task, PVOID param, TaskCallbackFun taskCb, BOOL longFun) 36 { 37 waitTaskLock.Lock(); 38 WaitTask *waitTask = new WaitTask(task, param, taskCb, longFun); 39 waitTaskList.push_back(waitTask); 40 waitTaskLock.UnLock(); 41 PostQueuedCompletionStatus(completionPort, 0, (DWORD)GET_TASK, NULL); 42 return TRUE; 43 } 44 45 void ThreadPool::CreateIdleThread(size_t size) 46 { 47 idleThreadLock.Lock(); 48 for (size_t i = 0; i < size; i++) 49 { 50 idleThreadList.push_back(new Thread(this)); 51 } 52 idleThreadLock.UnLock(); 53 } 54 55 void ThreadPool::DeleteIdleThread(size_t size) 56 { 57 idleThreadLock.Lock(); 58 size_t t = idleThreadList.size(); 59 if (t >= size) 60 { 61 for (size_t i = 0; i < size; i++) 62 { 63 auto thread = idleThreadList.back(); 64 delete thread; 65 idleThreadList.pop_back(); 66 } 67 } 68 else 69 { 70 for (size_t i = 0; i < t; i++) 71 { 72 auto thread = idleThreadList.back(); 73 delete thread; 74 idleThreadList.pop_back(); 75 } 76 } 77 idleThreadLock.UnLock(); 78 } 79 80 ThreadPool::Thread *ThreadPool::GetIdleThread() 81 { 82 Thread *thread = NULL; 83 idleThreadLock.Lock(); 84 if (idleThreadList.size() > 0) 85 { 86 thread = idleThreadList.front(); 87 idleThreadList.pop_front(); 88 } 89 idleThreadLock.UnLock(); 90 91 if (thread == NULL && getCurNumOfThread() < maxNumOfThread) 92 { 93 thread = new Thread(this); 94 } 95 96 if (thread == NULL && waitTaskList.size() > THRESHOLE_OF_WAIT_TASK) 97 { 98 thread = new Thread(this); 99 InterlockedIncrement(&maxNumOfThread); 100 } 101 return thread; 102 } 103 104 void ThreadPool::MoveBusyThreadToIdleList(Thread * busyThread) 105 { 106 idleThreadLock.Lock(); 107 idleThreadList.push_back(busyThread); 108 idleThreadLock.UnLock(); 109 110 busyThreadLock.Lock(); 111 for (auto it = busyThreadList.begin(); it != busyThreadList.end(); it++) 112 { 113 if (*it == busyThread) 114 { 115 busyThreadList.erase(it); 116 break; 117 } 118 } 119 busyThreadLock.UnLock(); 120 121 if (maxNumOfThread != 0 && idleThreadList.size() > maxNumOfThread * 0.8) 122 { 123 DeleteIdleThread(idleThreadList.size() / 2); 124 } 125 126 PostQueuedCompletionStatus(completionPort, 0, (DWORD)GET_TASK, NULL); 127 } 128 129 void ThreadPool::MoveThreadToBusyList(Thread * thread) 130 { 131 busyThreadLock.Lock(); 132 busyThreadList.push_back(thread); 133 busyThreadLock.UnLock(); 134 } 135 136 void ThreadPool::GetTaskExcute() 137 { 138 Thread *thread = NULL; 139 WaitTask *waitTask = NULL; 140 141 waitTask = GetTask(); 142 if (waitTask == NULL) 143 { 144 return; 145 } 146 147 if (waitTask->bLong) 148 { 149 if (idleThreadList.size() > minNumOfThread) 150 { 151 thread = GetIdleThread(); 152 } 153 else 154 { 155 thread = new Thread(this); 156 InterlockedIncrement(&numOfLongFun); 157 InterlockedIncrement(&maxNumOfThread); 158 } 159 } 160 else 161 { 162 thread = GetIdleThread(); 163 } 164 165 if (thread != NULL) 166 { 167 thread->ExecuteTask(waitTask->task, waitTask->param, waitTask->taskCb); 168 delete waitTask; 169 MoveThreadToBusyList(thread); 170 } 171 else 172 { 173 waitTaskLock.Lock(); 174 waitTaskList.push_front(waitTask); 175 waitTaskLock.UnLock(); 176 } 177 178 } 179 180 ThreadPool::WaitTask *ThreadPool::GetTask() 181 { 182 WaitTask *waitTask = NULL; 183 waitTaskLock.Lock(); 184 if (waitTaskList.size() > 0) 185 { 186 waitTask = waitTaskList.front(); 187 waitTaskList.pop_front(); 188 } 189 waitTaskLock.UnLock(); 190 return waitTask; 191 } 192 193 194 ThreadPool::Thread::Thread(ThreadPool *threadPool) : 195 busy(FALSE), 196 thread(INVALID_HANDLE_VALUE), 197 task(NULL), 198 taskCb(NULL), 199 exit(FALSE), 200 threadPool(threadPool) 201 { 202 thread = (HANDLE)_beginthreadex(0, 0, ThreadProc, this, CREATE_SUSPENDED, 0); 203 } 204 205 ThreadPool::Thread::~Thread() 206 { 207 exit = TRUE; 208 task = NULL; 209 taskCb = NULL; 210 ResumeThread(thread); 211 WaitForSingleObject(thread, INFINITE); 212 CloseHandle(thread); 213 } 214 215 BOOL ThreadPool::Thread::isBusy() 216 { 217 return busy; 218 } 219 220 void ThreadPool::Thread::ExecuteTask(TaskFun task, PVOID param, TaskCallbackFun taskCallback) 221 { 222 busy = TRUE; 223 this->task = task; 224 this->param = param; 225 this->taskCb = taskCallback; 226 ResumeThread(thread); 227 } 228 229 unsigned int ThreadPool::Thread::ThreadProc(PVOID pM) 230 { 231 Thread *pThread = (Thread*)pM; 232 233 while (true) 234 { 235 if (pThread->exit) 236 break; //線程退出 237 238 if (pThread->task == NULL && pThread->taskCb == NULL) 239 { 240 pThread->busy = FALSE; 241 pThread->threadPool->MoveBusyThreadToIdleList(pThread); 242 SuspendThread(pThread->thread); 243 continue; 244 } 245 246 int resulst = pThread->task(pThread->param); 247 if(pThread->taskCb) 248 pThread->taskCb(resulst); 249 WaitTask *waitTask = pThread->threadPool->GetTask(); 250 if (waitTask != NULL) 251 { 252 pThread->task = waitTask->task; 253 pThread->taskCb = waitTask->taskCb; 254 delete waitTask; 255 continue; 256 } 257 else 258 { 259 pThread->task = NULL; 260 pThread->param = NULL; 261 pThread->taskCb = NULL; 262 pThread->busy = FALSE; 263 pThread->threadPool->MoveBusyThreadToIdleList(pThread); 264 SuspendThread(pThread->thread); 265 } 266 } 267 268 return 0; 269 }
1 // ThreadPool.cpp: 定義控制台應用程式的入口點。 2 // 3 4 #include "stdafx.h" 5 #include "ThreadPool.h" 6 #include <stdio.h> 7 8 class Task 9 { 10 public: 11 static int Task1(PVOID p) 12 { 13 int i = 10; 14 while (i >= 0) 15 { 16 printf("%d\n", i); 17 Sleep(100); 18 i--; 19 } 20 return i; 21 } 22 }; 23 24 class TaskCallback 25 { 26 public: 27 static void TaskCallback1(int result) 28 { 29 printf(" %d\n", result); 30 } 31 }; 32 33 int main() 34 { 35 ThreadPool threadPool(2, 10); 36 for (size_t i = 0; i < 30; i++) 37 { 38 threadPool.QueueTaskItem(Task::Task1, NULL, TaskCallback::TaskCallback1); 39 } 40 threadPool.QueueTaskItem(Task::Task1, NULL, TaskCallback::TaskCallback1, TRUE); 41 42 getchar(); 43 44 return 0; 45 }