##2.使用I/O復用技術和線程池 網路中有很多用戶會嘗試去connect()這個WebServer上正在listen的這個port,而監聽到的這些連接會排隊等待被accept()。由於用戶連接請求是隨機到達的非同步事件,每當監聽socket(listenfd)listen到新的客戶連接並且放入監聽隊 ...
2.使用I/O復用技術和線程池
網路中有很多用戶會嘗試去connect()這個WebServer上正在listen的這個port,而監聽到的這些連接會排隊等待被accept()。由於用戶連接請求是隨機到達的非同步事件,每當監聽socket(listenfd)listen到新的客戶連接並且放入監聽隊列,我們都需要告訴Web伺服器有連接來了,accept這個連接,並分配一個邏輯單元來處理這個用戶請求。而且,我們在處理這個請求的同時,還需要繼續監聽其他客戶的請求並分配另一邏輯單元來處理新的用戶請求(即併發,同時處理多個事件,後面會使用線程池實現併發)。
這裡,伺服器通過epoll這種I/O復用技術來實現對監聽socket(listenfd)和連接socket(客戶請求)的同時監聽。I/O復用雖然可以同時監聽多個文件描述符,但是它本身是阻塞的,並且當有多個文件描述符同時就緒的時候,如果不採取額外措施,程式則只能按順序處理其中就緒的每一個文件描述符,所以為提高效率,我們將在這部分通過線程池來實現併發(多線程併發),為每個就緒的文件描述符分配一個邏輯單元(線程)來處理。
代碼塊
//對文件描述符設置非阻塞
int setnonblocking(int fd)
{
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}
/* 將fd上的EPOLLIN和EPOLLET事件註冊到epollfd指示的epoll內核事件中 */
void addfd(int epollfd, int fd, bool one_shot) {
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
/* 針對connfd,開啟EPOLLONESHOT,因為我們希望每個socket在任意時刻都只被一個線程處理 */
if(one_shot)
event.events |= EPOLLONESHOT;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd);
}
/* 創建一個額外的文件描述符來唯一標識內核中的epoll事件表 */
int epollfd = epoll_create(5);
/* 用於存儲epoll事件表中就緒事件的event數組 */
epoll_event events[MAX_EVENT_NUMBER];
/* 主線程往epoll內核事件表中註冊監聽socket事件,當listen到新的客戶連接時,listenfd變為就緒事件 */
addfd(epollfd, listenfd, false);
/* 主線程調用epoll_wait等待一組文件描述符上的事件,並將當前所有就緒的epoll_event複製到events數組中 */
int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
/* 然後我們遍歷這一數組以處理這些已經就緒的事件 */
for(int i = 0; i < number; ++i) {
int sockfd = events[i].data.fd; // 事件表中就緒的socket文件描述符
if(sockfd == listenfd) { // 當listen到新的用戶連接,listenfd上則產生就緒事件
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
/* ET模式 */
while(1) {
/* accept()返回一個新的socket文件描述符用於send()和recv() */
int connfd = accept(listenfd, (struct sockaddr *) &client_address, &client_addrlength);
/* 並將connfd註冊到內核事件表中,users是 http_conn 類型的數組 */
users[connfd].init(connfd, client_address);
/*
...
*/
}
}
else if(events[i].events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)) {
// 如有異常,則直接關閉客戶連接,並刪除該用戶的timer
/*
...
*/
}
else if(events[i].events & EPOLLIN) {
/* 當這一sockfd上有可讀事件時,epoll_wait通知主線程。*/
if(users[sockfd].read()) { /* 主線程從這一sockfd迴圈讀取數據, 直到沒有更多數據可讀 */
pool->append(users + sockfd); /* 然後將讀取到的數據封裝成一個請求對象並插入請求隊列 */
/*
...
*/
}
else
/*
...
*/
}
else if(events[i].events & EPOLLOUT) {
/* 當這一sockfd上有可寫事件時,epoll_wait通知主線程。主線程往socket上寫入伺服器處理客戶請求的結果 */
if(users[sockfd].write()) {
/*
...
*/
}
else
/*
...
*/
}
}
accept函數
伺服器通過 accept() 函數來接收客戶端請求。
函數原型如下:
int accept(int sock, struct sockaddr *addr, socklen_t *addrlen)
sock 為伺服器端套接字,addr 為 sockaddr_in 結構體變數,addrlen 為參數 addr 的長度,可由 sizeof() 求得。
accept() 返回一個新的套接字來和客戶端通信,addr 保存了客戶端的IP地址和埠號。後面和客戶端通信時,要使用這個新生成的套接字。
I/O復用
I/O復用使得程式能同時監聽多個文件描述符。通常,網路程式在以下情況需要使用I/O復用技術:
- 客戶端程式要同時處理多個socket。
- 客戶端程式要同時處理用戶輸入和網路連接。
- TCP伺服器要同時處理監聽socket和連接socket。這是I/O復用使用最多的場合。
- 伺服器要同時處理TCP請求和UDP請求。
- 伺服器要同時監聽多個埠,或處理多種服務。
註意:I/O復用本身是阻塞的。
Linux下實現I/O復用的系統調用主要有select、poll、epoll。
select
int select ( int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout )
1)nfds參數指定被監聽的文件描述符的總數。通常設置為被監聽的所有文件描述符中的最大值加1,因為文件描述符是從0開始計數的。
2)readfds、writefds、exceptfds參數分別指向可讀、可寫和異常等事件對應的文件描述符集合。
3)timeout參數用來設置select函數的超時時間。
返回值:成功時返回就緒文件描述符的總數,超時返回0,出錯返回-1並設置errno。
poll
int poll ( struct pollfd * fds, nfds_t nfds, int timeout )
1)fds參數是一個pollfd結構類型的數組,它指定所有我們感興趣的文件描述符上發生的可讀、可寫、異常等事件。
2)nfds參數指定被監聽事件集合fds的大小。
3)timeout參數指定poll超時值,單位是毫秒。當timeout值為-1時,poll調用將永遠阻塞,直到某個事件發生;當timeout值為0時,poll調用將立即返回。
poll返回值和select一樣。
epoll
epoll是Linux特有的I/O復用函數。epoll使用一組函數(共三個函數)完成任務,把用戶關心的文件描述符上的事件放在內核里的一個事件表中,無需像select和poll那樣每次調用都要重覆傳入文件描述符或事件集。但epoll需要一個額外的文件描述符,來唯一標識內核中的這個事件表。這個文件描述符使用epoll_create函數來創建:
int epoll_create ( int sieze )
size參數只是告訴內核這個epoll對象會處理的事件大致數目,而不是能夠處理事件的最大數目。即size參數沒有任何作用。
返回值:成功:epoll 專用的文件描述符;失敗:-1。
註意:使用完epoll後,必須調用close()關閉,否則可能導致fd被耗盡。
操作epoll的內核事件表的函數:
int epoll_ctl( int epfd, int op, int fd, struct epoll_event *event )
epfd參數即epoll句柄(使用epoll_create函數返回的文件描述符),op參數表示動作,用三個巨集來表示:
EPOLL_CTL_ADD:註冊新的fd到epfd中;
EPOLL_CTL_MOD:修改已經註冊的fd的監聽事件;
EPOLL_CTL_DEL:從epfd中刪除一個fd;
fd參數指需要監聽的fd,event參數告訴內核需要監聽什麼事,struct epoll_event結構如下:
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
event可以是以下幾個巨集的集合:
EPOLLIN :表示對應的文件描述符可以讀(包括對端SOCKET正常關閉);
EPOLLOUT:表示對應的文件描述符可以寫;
EPOLLPRI:表示對應的文件描述符有緊急的數據可讀(這裡應該表示有帶外數據到來);
EPOLLERR:表示對應的文件描述符發生錯誤;
EPOLLHUP:表示對應的文件描述符被掛斷;
EPOLLET: 將EPOLL設為邊緣觸發(Edge Triggered)模式,這是相對於水平觸發(Level Triggered)來說的。
EPOLLONESHOT:只監聽一次事件,當監聽完這次事件之後,如果還需要繼續監聽這個socket的話,需要再次把這個socket加入到EPOLL隊列里
例如將event設置為 讀 和 ET模式 事件的集合:ev.events = EPOLLIN | EPOLLET;
返回值:epoll_ctl成功時返回0,失敗返回-1並設置errno。
註意:它不同於 select() 是在監聽事件時告訴內核要監聽什麼類型的事件,而是在這裡先註冊要監聽的事件類型。
epoll_wait函數。等侍註冊在epfd上的socket fd的事件的發生,其原型如下:
int epoll_wait( int epfd, struct epoll_event * events, int maxevents, int timeout )
1)epfd是 epoll的描述符。
2)events則是分配好的 epoll_event結構體數組,epoll將會把發生的事件複製到 events數組中(events不可以是空指針,內核只負責把數據複製到這個 events數組中,不會去幫助我們在用戶態中分配記憶體。內核這種做法效率很高)。
3)maxevents表示本次可以返回的最大事件數目,通常 maxevents參數與預分配的events數組的大小是相等的。
4)timeout表示在沒有檢測到事件發生時最多等待的時間(單位為毫秒),如果 timeout為0,則表示 epoll_wait在 rdllist鏈表中為空,立刻返回,不會等待。
返回值:該函數成功時返回就緒的文件描述符的個數,失敗時返回-1並設置errno。
註意:如果有事件的發生則會將發生的socket fd和事件類型放入到events數組中,並將註冊在epfd上的socket fd的事件類型給清空。如果下一個迴圈還要關註這個socket fd的話,則需要用epoll_ctl(epfd,EPOLL_CTL_MOD,listenfd,&ev)來重新設置socket fd的事件類型。這時不用EPOLL_CTL_ADD,因為socket fd並未清空,只是事件類型清空。
epoll對文件描述符的操作有兩種模式:LT(Level Trigger,電平觸發)模式和 ET(Edge Trigger,邊沿觸發)模式。
LT模式是預設的工作模式,這種模式下epoll相當於一個效率較高的poll,當epoll_wait檢測到其上有事件發生並將此事件通知應用程式後,應用程式可以不立即處理該事件。這樣,當應用程式下一次調用epoll_wait時,epoll_wait還會再次嚮應用程式通知此事件,直到此事件被處理。
ET模式下,當epoll_wait檢測到其上有事件發生並將此事件通知應用程式後,應用程式必須立即處理該事件,因為後續的epoll_wait不再嚮應用程式通知這一事件。ET模式在很大程度上降低了同一個epoll事件被重覆觸發的次數,因此效率要比LT模式高。需要往epoll內核事件表中註冊一個文件描述符上的EPOLLET事件epoll才能變為ET模式。
註意:每個使用ET模式的文件描述符都應該是非阻塞的。如果是阻塞的,那麼讀或寫操作會因為沒有後續的事件而一直處於阻塞狀態。
即使我們使用ET模式,一個socket上的某個事件還是可能被觸發多次。這在併發的程式中會導致多個線程(或進程)同時操作一個socket的情況出現。例如一個線程在讀取完某個socket上的數據後開始處理這些數據,而在處理這些數據的過程中該socket又有新數據可讀(EPOLLIN 再次被觸發),此時程式會喚醒另一個線程來讀取這些新的數據。這並不是我們期望的,這會使程式的健壯性大大降低而編程的複雜度大大增加。我們期望的是一個socket連接在任意時刻都只被一個線程處理。這就可以使用epoll的 EPOLLONESHOT 事件實現。
對於註冊了 EPOLLONESHOT 事件的文件描述符,操作系統最多觸發其上註冊的一個可讀、可寫、或者異常事件,且只觸發一次。這樣,當一個線程在處理某個socket時,其他線程是不可能有機會操作該socket的。但反過來,註冊了 EPOLLONESHOT 事件的socket一旦被某個線程處理完畢,該線程就應該立即重置這個socket上的 EPOLLONESHOT 事件,以確保這個socket下一次可讀時,其 EPOLLIN 事件能被觸發,讓其他工作線程有機會繼續處理這個socket。
三組I/O復用函數的比較
系統調用 | 事件集合 | 應用程式索引就緒文件描述符的時間複雜度 | 最大支持文件描述符數 | 工作模式 | 內核實現和工作效率 |
---|---|---|---|---|---|
select | 用戶通過3個參數分別傳入感興趣的可讀、可寫及異常等事件,內核通過對這些參數的線上修改來反饋其中的就緒事件。這使得用戶每次調用select都要重置這3個參數 | O(n) | 一般有最大限制 | LT | 採用輪詢的方式來檢測就緒事件,演算法複雜度為O(n) |
poll | 統一處理所有事件類型,因此只需一個事件集參數,用戶通過pollfd.events傳入感興趣的事,內核通過修改pollfd.revents反饋其中就緒的事件 | O(n) | 65535 | LT | 採用輪詢的方式來檢測就緒事件,演算法複雜度為O(n) |
epoll | 內核通過一個事件表直接管理用戶感興趣的所有事件。因此每次調用epoll_wait時,無需反覆傳入用戶感興趣的事件。epoll_wait的參數events僅用來反饋就緒的事件 | O(1) | 65535 | LT 或 ET | 採用回調方式來檢測就緒事件,演算法複雜度為O(1) |
綜上,當監測的fd數量較小,且各個fd都很活躍的情況下,建議使用select和poll;當監聽的fd數量較多,且單位時間僅部分fd活躍的情況下,使用epoll會明顯提升性能。
多線程編程
創建線程和結束線程
線程相關常用的API如下(在Linux系統上都定義在pthread.h頭文件中):
- pthread_create
用於創建一個線程,定義如下:
int pthread_create (pthread_t* thread, const pthread_attr_t* attr, void* (start_routine)( void ), void* arg)
1)thread參數是新線程的標識符,其他線程相關函數通過它來引用新線程。其是一個整形類型,在Linux上幾乎所有的資源標識符都是一個整型數,比如socket。
2)attr參數用於設置新線程的屬性。給它傳 NULL 值時表示使用預設線程屬性。
3)start_routine和arg參數分別指定線程將運行的函數及其參數,如果參數不止一個,需要將參數寫到一個結構體中,再將該結構體的地址作為參數傳入。
返回值:成功時返回0,失敗時返回錯誤碼。
註意:
- 線程數量受資源限制是有限的,線程總數不能超過內核參數所定義的值。
- 傳入start_routine參數的函數要求為靜態函數。
要在靜態函數中使用類的動態成員有兩種方法:
- 通過類的靜態對象來調用
- 將類的對象作為參數傳遞給該靜態函數
- pthread_exit
線程函數在結束時最好調用此函數,以確保全全、乾凈地退出,因為預設屬性的線程執行結束後並不會立即釋放占用的資源,直到整個進程執行結束,所有線程的資源以及整個進程占用的資源才會被操作系統回收。其函數原型如下:
void pthread_exit ( void* retval )
此函數通過 retval 參數向線程的回收者傳遞其退出信息,如果線程不需要返回任何數據,將 retval 參數置為 NULL 即可。
它執行完後不會返回到調用者,而且用於不會失敗。
- pthread_join
一個進程中的所有線程都可以調用此函數來回收其他線程(前提是目標線程是可回收的),即等待其他線程結束。其定義如下:
int pthread_join( pthread_t thread, void retval );
thread參數是目標線程的標識符,retval則是目標線程返回的退出信息。該函數會一直阻塞**,直到被回收的線程結束為止。
返回值:成功時返回0,失敗則返回錯誤碼。
可能的錯誤碼如下:
(1) EDEADLK:可能引起死鎖,例如兩個線程互相join等待對方
(2) EINVAL:目標線程不可回收,或者有其他線程正在join等待本線程
(3) ESRCH:線程不存在
- pthread_cancel
可用此函數向另一個線程發送“終止執行”的信號(後續稱“Cancel”信號),從而令目標線程結束執行。函數原型如下:
int pthread_cancel(pthread_t pthread)
參數為目標線程的標識符。
返回值:成功返回0,失敗則返回錯誤碼。
註意: 函數的功能僅僅是向目標線程發送 Cancel 信號,至於目標線程是否處理該信號以及何時結束執行,由目標線程決定。
接收到取消信號的目標線程可以決定是否允許被取消以及如何取消,這分別由以下兩個函數完成(成功時都返回 0):
int pthread_setcancelstate(int state, int *oldstate)
int pthread_setcanceltype(int type, int *oldtype)
這兩個參數的第一個參數分別用於設置線程的取消狀態(是否允許取消)和取消類型(如何取消),第二個參數則分別記錄線程原來的取消狀態和取消類型。state參數有兩個可選值:
- PTHREAD_CANCEL_ENABLE:允許線程被取消。是線程創建時的預設狀態
- PTHREAD_CANCEL_DISABLE:禁止線程被取消。這種情況下的線程收到取消請求,則它會將請求掛機,直到該線程允許被取消。
type參數也有兩個可選值:
- PTHREAD_CANCEL_DEFERRED:線程隨時都可以被取消。它將使得收到取消請求的目標線程立即採取行動。
- PTHREAD_CANCEL_ASYNCHRONOUS:允許目標線程推遲行動,直到它調用了下麵幾個所謂的取消點函數中的一個,pthread_join、pthread_testcancel、pthread_cond_wait、pthread_cond_timedwait、sem_wait、sigwait、read、wait等。不過為了安全,最好在可能被取消的代碼中調用 pthread_testcancel 函數以設置取消點。
線程結束執行的方式共有 3 種,分別是:
- 線程將指定函數體中的代碼執行完後自行結束。
- 線程執行過程中,遇到 pthread_exit() 函數結束執行。
- 線程執行過程中,被同一進程中的其它線程(包括主線程)強制終止。
第一種很容易理解,第二種和第三種方式我們將分別舉例給大家演示用法。
pthread_exit() 函數的用法:
#include <stdio.h>
#include <pthread.h>
//線程要執行的函數,arg 用來接收線程傳遞過來的數據
void *ThreadFun(void *arg)
{
//終止線程的執行,將“https://www.cnblogs.com/zyzhi”返回
pthread_exit("https://www.cnblogs.com/zyzhi"); //返回的字元串存儲在常量區,並非當前線程的私有資源
printf("*****************");//此語句不會被線程執行
}
int main()
{
int res;
//創建一個空指針
void * thread_result;
//定義一個表示線程的變數
pthread_t myThread;
res = pthread_create(&myThread, NULL, ThreadFun, NULL);
if (res != 0) {
printf("線程創建失敗");
return 0;
}
//等待 myThread 線程執行完成,並用 thread_result 指針接收該線程的返回值
res = pthread_join(myThread, &thread_result);
if (res != 0) {
printf("等待線程失敗");
}
printf("%s", (char*)thread_result);
//輸出結果為 https://www.cnblogs.com/zyzhi
return 0;
}
第三種方法是指一個線程可以藉助 pthread_cancel() 函數向另一個線程發送“終止執行”的信號,從而令目標線程結束執行。對於接收 Cancel 信號後結束執行的目標線程,等同於該線程自己執行如下語句:
pthread_exit(PTHREAD_CANCELED);
PTHREAD_CANCELED是一種巨集(定義在<pthread.h>頭文件中)
pthread_cancel() 函數的用法:
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h> // sleep() 函數
//線程執行的函數
void * thread_Fun(void * arg) {
printf("新建線程開始執行\n");
sleep(10);
}
int main()
{
pthread_t myThread;
void * mess;
int value;
int res;
//創建 myThread 線程
res = pthread_create(&myThread, NULL, thread_Fun, NULL);
if (res != 0) {
printf("線程創建失敗\n");
return 0;
}
sleep(1);
//向 myThread 線程發送 Cancel 信號
res = pthread_cancel(myThread);
if (res != 0) {
printf("終止 myThread 線程失敗\n");
return 0;
}
//獲取已終止線程的返回值
res = pthread_join(myThread, &mess);
if (res != 0) {
printf("等待線程失敗\n");
return 0;
}
//如果線程被強制終止,其返回值為 PTHREAD_CANCELED
if (mess == PTHREAD_CANCELED) {
printf("myThread 線程被強制終止\n");
}
else {
printf("error\n");
}
return 0;
}
/*
最後輸出:
新建線程開始執行
myThread 線程被強制終止
*/
線程分離
線程分為兩種狀態:可結合態分離態
- 可結合態(線程預設狀態)
在此狀態下的線程能夠被其他線程回收資源或殺死,在被其他線程回收前,其占有的存儲器資源不會釋放。
- 分離態
這種狀態下的線程不能被其他線程回收或殺死,它的存儲器資源在它終止時由系統自動釋放。
可以使用線程分離函數將線程變為分離態:
int pthread_detach( pthread_t thread)
返回值:成功時返回0,失敗返回-1
POSIX 信號量
多線程程式必須考慮同步問題。pthread_join 可以看作一種簡單的線程同步方式,但它無法高效地實現複雜的同步需求,比如控制對共用資源的獨占式訪問。所以我們需要學習 3 種專門用於線程同步的機制:POSIX信號量、互斥量、條件變數。
常用的 POSIX 信號量函數有以下 5 個,都定義在 semaphore.h 中:
- sem_init
int sem_init( sem_t *sem, int pshared, unsigned int value )
用於初始化一個未命名的信號量
參數:
1)sem:要初始化的信號量
2)pshared:指定信號量的類型,如果為 0,表示這個信號量是當前進程的局部信號量,否則該信號量就可以在多個進程間共用
3)value:指定信號量的初始值
註意:初始化一個已經被初始化的信號量將導致不可預期的結果
- sem_destroy
int sem_destroy( sem_t *sem )
用於銷毀一個信號量
註意:銷毀一個正在被其他線程等待的信號量將導致不可預期的結果
- sem_wait
int sem_wait( sem_t *sem )
以原子操作的方式將信號量的值 -1
如果信號量的值為 0,則 sem_wait 將被阻塞直到信號量有非 0 值
- sem_trywait
int sem_trywait( sem_t *sem )
以原子操作的方式將信號量的值 -1,它會立即返回(相當於 sem_wait 的非阻塞版本)
信號量為 0 時會返回 -1 並設置 errno 為 EAGAIN
- sem_post
int sem_post( sem_t *sem )
以原子操作的方式將信號量的值 +1
當信號量的值 > 0 時,其他正在調用 sem_wait 等待信號量的線程將被喚醒
這5個函數成功時返回 0,失敗則返回-1並設置errno。
互斥量
互斥量(互斥鎖)可以保護關鍵代碼,以確保其獨占式的訪問。
POSIX互斥鎖的相關函數主要有如下 5個,都定義在 pthread.h 中:
- pthread_mutex_init
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr)
用於初始化互斥鎖。
這些函數的mutex參數都指向要操作的目標互斥鎖。mutexattr參數指定互斥鎖的屬性,為NULL時表示使用預設屬性。
還可以使用如下方式來初始化一個互斥鎖:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
- pthread_mutex_lock
int pthread_mutex_lock(pthread_mutex_t *mutex)
以原子方式給一個互斥鎖加鎖。如果目標互斥鎖已經被鎖上,則將阻塞,直到該互斥鎖的占有者將其解鎖。
- pthread_mutex_trylock
int pthread_mutex_trylock(pthread_mutex_t *mutex)
與 pthread_mutex_lock 類似(相當於 pthread_mutex_lock 的非阻塞版)。始終立即返回,當目標鎖已經被加鎖時,將返回錯誤碼EBUSY。
- pthread_mutex_unlock
int pthread_mutex_unlock(pthread_mutex_t *mutex)
以原子方式給一個互斥鎖解鎖。如果此時有其他線程正在等待這個互斥鎖,則這些線程中的某一個將獲得它。
- pthread_mutex_destroy
int pthread_mutex_destroy(pthread_mutex_t *mutex)
用於銷毀互斥鎖,以釋放其占用的內核資源。銷毀一個已經加鎖的互斥鎖將導致不可預期的後果。
上面這些函數成功時返回 0,失敗則返回錯誤碼。
條件變數
如果說互斥鎖是用於同步線程對共用數據的訪問的話,那麼條件變數則是用於線上程之間同步共用數據的值。假設一個進程中包含多個線程,這些線程共用變數 x,我們希望某個(或某些)線程等待 “x==10” 條件成立後再執行後續的代碼,就可以使用條件變數來實現。
條件變數提供了一種通知機制:當某個共用數據達到某個值的時候,喚醒等待這個共用數據的線程。
為了避免多線程之間發生“搶奪資源”的問題,條件變數在使用過程中必須和一個互斥鎖搭配使用。
條件變數用 pthread_cond_t 類型的變數表示,條件變數的相關函數主要有以下幾個,都定義在 pthread.h 中:
- 初始化條件變數
int pthread_cond_init(pthread_cond_t * cond, const pthread_condattr_t * attr)
參數 cond 用於指明要初始化的條件變數;參數 attr 用於自定義條件變數的屬性,通常我們將它賦值為 NULL,表示以系統預設的屬性完成初始化操作。
當使用預設屬性去初始化時,也可以用如下方法完成初始化:
pthread_cond_t myCond = PTHREAD_COND_INITIALIZER
- 阻塞當前線程,等待條件成立
int pthread_cond_wait(pthread_cond_t* cond, pthread_mutex_t* mutex)
int pthread_cond_timedwait(pthread_cond_t* cond, pthread_mutex_t* mutex, const struct timespec* abstime)
cond 參數表示已初始化好的條件變數;mutex 參數表示與條件變數配合使用的互斥鎖;abstime 參數表示阻塞線程的時間。
註意:abstime 參數指的是絕對時間,如果要阻塞線程 5 秒鐘,就需要用獲得的當前系統的時間去加上 5 秒,最終得到的時間才是傳遞的實參值。
調用兩個函數之前,我們必須先創建好一個互斥鎖並完成 加鎖 操作,然後才能作為實參傳遞給 mutex 參數。兩個函數會完成以下兩項工作:
- 阻塞線程,直至接收到“條件成立”的信號
- 當線程被添加到等待隊列上時,將互斥鎖 解鎖
註意:當函數接收到“條件成立”的信號後,它並不會立即結束對線程的阻塞,而是先完成對互斥鎖的“加鎖”操作,然後才解除阻塞。
兩個函數的區別在於:
- pthread_cond_wait() 函數可以永久阻塞線程,直到條件變數成立的那一刻
- pthread_cond_timedwait() 函數只能在 abstime 參數指定的時間內阻塞線程,超出時限後,該函數將重新對互斥鎖執行“加鎖”操作,並解除對線程的阻塞,函數的返回值為 ETIMEDOUT。
- 解除線程的阻塞狀態
int pthread_cond_signal(pthread_cond_t* cond)
int pthread_cond_broadcast(pthread_cond_t* cond)
cond 參數表示初始化好的條件變數
兩個函數都能解除線程的“被阻塞”狀態,區別在於:
- pthread_cond_signal() 函數至少解除一個線程的“被阻塞”狀態,如果等待隊列中包含多個線程,優先解除哪個線程將由操作系統的線程調度程式決定
- pthread_cond_broadcast() 函數可以解除等待隊列中所有線程的“被阻塞”狀態。
由於互斥鎖的存在,解除阻塞後的線程也不一定能立即執行。當互斥鎖處於“加鎖”狀態時,解除阻塞狀態的所有線程會組成等待互斥鎖資源的隊列,等待互斥鎖“解鎖”。
- 銷毀條件變數
int pthread_cond_destroy(pthread_cond_t *cond)
cond 參數表示要銷毀的條件變數
註意:銷毀後的條件變數還可以調用 pthread_cond_init() 函數重新初始化後使用。
以上函數成功時都返回0,失敗則返回錯誤碼。
線程同步機制包裝成類
為了充分復用代碼,將上面的 3 種線程同步機制分別封裝成 3 個類,實現在 locker.h 頭文件中。
#ifndef LOCKER_H
#define LOCKER_H
#include <exception>
#include <pthread.h>
#include <semaphore.h>
class sem
{
public:
sem()
{
if (sem_init(&m_sem, 0, 0) != 0)
{
throw std::exception();
}
}
sem(int num)
{
if (sem_init(&m_sem, 0, num) != 0)
{
throw std::exception();
}
}
~sem()
{
sem_destroy(&m_sem);
}
bool wait()
{
return sem_wait(&m_sem) == 0;
}
bool post()
{
return sem_post(&m_sem) == 0;
}
private:
sem_t m_sem;
};
class locker
{
public:
locker()
{
if (pthread_mutex_init(&m_mutex, NULL) != 0)
{
throw std::exception();
}
}
~locker()
{
pthread_mutex_destroy(&m_mutex);
}
bool lock()
{
return pthread_mutex_lock(&m_mutex) == 0;
}
bool unlock()
{
return pthread_mutex_unlock(&m_mutex) == 0;
}
pthread_mutex_t *get()
{
return &m_mutex;
}
private:
pthread_mutex_t m_mutex;
};
class cond
{
public:
cond()
{
if (pthread_cond_init(&m_cond, NULL) != 0)
{
//pthread_mutex_destroy(&m_mutex);
throw std::exception();
}
}
~cond()
{
pthread_cond_destroy(&m_cond);
}
bool wait(pthread_mutex_t *m_mutex)
{
int ret = 0;
//pthread_mutex_lock(&m_mutex);
ret = pthread_cond_wait(&m_cond, m_mutex);
//pthread_mutex_unlock(&m_mutex);
return ret == 0;
}
bool timewait(pthread_mutex_t *m_mutex, struct timespec t)
{
int ret = 0;
//pthread_mutex_lock(&m_mutex);
ret = pthread_cond_timedwait(&m_cond, m_mutex, &t);
//pthread_mutex_unlock(&m_mutex);
return ret == 0;
}
bool signal()
{
return pthread_cond_signal(&m_cond) == 0;
}
bool broadcast()
{
return pthread_cond_broadcast(&m_cond) == 0;
}
private:
//static pthread_mutex_t m_mutex;
pthread_cond_t m_cond;
};
#endif
線程池
線程池一種線程使用模式。線程池維護著多個線程,等待著監督管理者分配可併發執行的任務。這避免了在處理短時間任務時創建與銷毀線程的代價。線程池不僅能夠保證內核的充分利用,還能防止過分調度。
線程池的組成部分有:
- 線程池管理器:創建和初始化線程,啟動和停止線程,調配任務;管理線程池
- 工作線程:線程池中的線程
- 任務介面:添加任務的介面,以提供工作線程調度任務的執行。
- 任務隊列:用於存放沒有處理的任務,提供一種緩衝機制,同時具有調度功能,高優先順序的任務放在隊列前面
線程池中線程數量
線程池中的線程數量最直接的限制因素是中央處理器(CPU)的處理器(processors/cores)的數量N:如果你的CPU是4-cores的,對於CPU密集型的任務(如視頻剪輯等消耗CPU計算資源的任務)來說,那線程池中的線程數量最好也設置為4(或者+1防止其他因素造成的線程阻塞);對於IO密集型的任務,一般要多於CPU的核數,因為線程間競爭的不是CPU的計算資源而是IO,IO的處理一般較慢,多於cores數的線程將為CPU爭取更多的任務,不至線上程處理IO的過程造成CPU空閑導致資源浪費。
公式:最佳線程數 = CPU當前可使用的Cores數 * 當前CPU的利用率 * (1 + CPU等待時間 / CPU處理時間)
本項目採用的是半同步/半反應堆線程池,將線程池代碼封裝在 threadpool.h 頭文件中
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <list>
#include <cstdio>
#include <exception>
#include <pthread.h>
/* 引用上面的線程同步機制的包裝類 */
#include "../lock/locker.h"
/* 線程池類,定義為模板方便復用 */
template <typename T>
class threadpool
{
public:
/* thread_number是線程池中線程數量,max_request是請求隊列中最多允許的、等待處理的請求數量 */
threadpool(connection_pool *connPool, int thread_number = 8, int max_request = 10000);
~threadpool();
/* 往請求隊列中添加任務 */
bool append(T *request);
private:
/* 工作線程運行的函數,它不斷從工作隊列中取出任務並執行 */
static void *worker(void *arg);
void run();
private:
int m_thread_number; // 線程池中的線程數
int m_max_requests; // 請求隊列中允許的最大請求數
pthread_t *m_threads; // 描述線程池的數組,其大小為 m_thread_number
std::list<T *> m_workqueue; // 請求隊列
locker m_queuelocker; // 保護請求隊列的互斥鎖
sem m_queuestat; // 是否有任務需要處理
bool m_stop; // 是否結束線程
connection_pool *m_connPool; //資料庫
};
template <typename T>
threadpool<T>::threadpool( connection_pool *connPool, int thread_number, int max_requests) : m_thread_number(thread_number), m_max_requests(max_requests), m_stop(false), m_threads(NULL),m_connPool(connPool)
{
if (thread_number <= 0 || max_requests <= 0)
throw std::exception();
m_threads = new pthread_t[m_thread_number];
if (!m_threads)
throw std::exception();
for (int i = 0; i < thread_number; ++i)
{
//printf("create the %dth thread\n",i);
/* 因為需要在靜態函數中使用類的動態成員,故將類的對象作為參數闖入 */
if (pthread_create(m_threads + i, NULL, worker, this) != 0)
{
delete[] m_threads;
throw std::exception();
}
/* 將線程設置為分離態 */
if (pthread_detach(m_threads[i]))
{
delete[] m_threads;
throw std::exception();
}
}
}
template <typename T>
threadpool<T>::~threadpool()
{
delete[] m_threads;
m_stop = true;
}
template <typename T>
bool threadpool<T>::append(T *request)
{
/* 操作工作隊列時需要加鎖,因為它被所以線程共用 */
m_queuelocker.lock();
if (m_workqueue.size() > m_max_requests)
{
m_queuelocker.unlock();
return false;
}
m_workqueue.push_back(request);
m_queuelocker.unlock();
m_queuestat.post();
return true;
}
template <typename T>
void *threadpool<T>::worker(void *arg)
{
threadpool *pool = (threadpool *)arg;
pool->run();
return pool;
}
template <typename T>
void threadpool<T>::run()
{
while (!m_stop)
{
m_queuestat.wait();
m_queuelocker.lock();
if (m_workqueue.empty())
{
m_queuelocker.unlock();
continue;
}
T *request = m_workqueue.front();
m_workqueue.pop_front();
m_queuelocker.unlock();
if (!request)
continue;
//從連接池中取出一個資料庫連接
request->mysql = m_connPool->GetConnection();
//process(模板類中的方法,這裡是http類)進行處理
request->process();
//將資料庫連接放回連接池
m_connPool->ReleaseConnection(request->mysql);
}
}
#endif