為了保證臨界資源的安全性和可靠性,線程不得不使用鎖,同一時間只允許一個或幾個線程訪問變數。常用的鎖有互斥量,讀寫鎖,條件變數 一、互斥量 互斥量是用pthread_mutex_t數據類型表示的,在使用之前,必須對其進行初始化,可以把它設置為PTHREAD_MUTEX_INITIALIZER(只適於靜 ...
為了保證臨界資源的安全性和可靠性,線程不得不使用鎖,同一時間只允許一個或幾個線程訪問變數。常用的鎖有互斥量,讀寫鎖,條件變數 一、互斥量 互斥量是用pthread_mutex_t數據類型表示的,在使用之前,必須對其進行初始化,可以把它設置為PTHREAD_MUTEX_INITIALIZER(只適於靜態分配的互斥量),也可以通過調用pthread_mutex_init函數進行初始化,最後還要調用pthread_mutex_destroy進行釋放。
#include <pthread.h> int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr); int pthread_mutex_destroy(pthread_mutex_t *mutex);要用預設的屬性初始化互斥量,只需把attr設為NULL,後面在討論互斥量屬性。 對互斥量進行加鎖,使用pthread_mutex_lock,如果互斥量已經上鎖,調用線程將阻塞至互斥量解鎖,對互斥量解鎖,使用pthread_mutex_unlock,如果線程不希望被阻塞,它可以調用pthread_mutex_trylock嘗試對互斥量進行加鎖,如果互斥量未鎖住,則成功加鎖,如果互斥量已鎖住,pthread_mutex_trylock就會失敗,返回EBUSY。
#include <pthread.h> int pthread_mutex_lock(pthread_mutex_t *mutex); int pthread_mutex_unlock(pthread_mutex_t *mutex); int pthread_mutex_trylock(pthread_mutex_t *mutex);例子:
#include <stdio.h> #include <pthread.h> struct foo { int f_count; pthread_mutex_t f_lock; int f_id; }; struct foo * foo_alloc(int id) { struct foo *fp = NULL; if ((fp = malloc(sizeof(struct foo))) != NULL) { fp->f_count = 1; fp->f_id = id; if (pthread_mutex_init(&fp->f_lock, NULL) != 0) { free(fp); return NULL; } } return fp; } void foo_hold(struct foo *fp) { pthread_mutex_lock(&fp->f_lock); fp->f_count++; pthread_mutex_unlock(&fp->f_lock); } void foo_rele(struct foo *fp) { pthread_mutex_lock(&fp->f_lock); if (--fp->f_count == 0) { pthread_mutex_unlock(&fp->f_lock); pthread_mutex_destroy(&fp->f_lock); free(fp); } else { pthread_mutex_unlock(&fp->f_lock); } }View Code 上面的例子描述了用於保護某個數據結構的互斥量,我們在對象中嵌入引用計數,確保在所有使用該對象的線程完成數據訪問之前,該對象的記憶體空間不會被釋放。 如果線程對同一個互斥量加鎖兩次,那麼它自身將陷入死鎖狀態。如果有一個以上的互斥量,且允許一個線程一直占有第一個互斥量,並且試圖鎖住第二個互斥量時處於阻塞狀態,但是擁有第二個互斥量的線程也在試圖鎖住第一個互斥量,也阻塞,就死鎖了。 可以通過仔細控制互斥量加鎖的順序來避免死鎖的發生,譬如要求所有線程必須先鎖住互斥量A才能鎖住互斥量B。另一種辦法是當線程無法獲得下一個互斥量的時候,就釋放自己已占有的互斥量,過一段時間再試。 例子:
#include "apue.h" #include <pthread.h> #define NMASH 29 #define HASH(id) (((unsigned long)id) % NMASH) struct foo *fh[NMASH]; pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER; struct foo { int f_count; pthread_mutex_t f_lock; int f_id; struct foo *f_next; }; struct foo *foo_alloc(int id) { struct foo *fp = NULL; int idx = 0; if ((fp = malloc(sizeof(struct foo))) != NULL) { fp->f_count = 1; fp->f_id = if; if (pthread_mutex_init(&fp->f_lock, NULL) != 0) { free(fp); return NULL; } idx = HASH(id); pthread_mutex_lock(&hashlock); fp->f_next = fh[idx]; fh[idx] = fp; pthread_mutex_lock(&fp->f_lock); pthread_mutex_unlock(&hashlock); pthread_mutex_unlock(&fp->f_lock); } return fp; } void foo_hold(struct foo *fp) { pthread_mutex_lock(&fp->f_lock); fp->f_count++; pthread_mutex_unlock(&fp->f_lock); } struct foo *foo_find(int id) { struct foo *fp = NULL; pthread_mutex_lock(&hashlock); for (fp = fh[HASH(id)]; fp != NULL; fp = fp->next) { if (fp->f_id = id) { foo_hold(fp); break; } } pthread_mutex_unlock(&hashlock); return fp; } void foo_rele(struct foo *fp) { struct foo *tfp = NULL; int idx = 0; pthread_mutex_lock(&fp->f_lock); if (fp->f_count == 1) { pthread_mutex_unlock(&fp->f_lock); pthread_mutex_lock(&hashlock); pthread_mutex_lock(&fp->f_lock); if (fp->f_count != 1) { fp->f_count--; pthread_mutex_unlock(&hashlock); pthread_mutex_unlock(&fp->f_lock); return; } idx = HASH(fp->f_id); tfp = fh[idx]; if (tfp = fp) { fh[idx] = fp->f_next } else { while(tfp->next != fp) { tfp = tfp->next; } tfp->next = fp->f_next; } pthread_mutex_unlock(&hashlock); pthread_mutex_unlock(&fp->f_lock); pthread_mutex_destroy(&fp->f_lock); free(fp); } else { fp->f_count--; pthread_mutex_unlock(&fp->f_lock); } }View Code 這個例子比上一個例子多了一個散列表和一個保護散列表的互斥量,加鎖的順序是先hashlock,再f_lock,註意這個順序,就不會發生死鎖,不過這樣也導致代碼太繁瑣,最後一個函數解鎖f_lock後重新加鎖f_lock,需要重新考察f_count的值,因為可能在這期間被其他線程修改。 這樣的方式太複雜,讓hashlock也保護f_cout,事情會簡單很多。 例子:
#include "apue.h" #include <pthread.h> #define NMASH 29 #define HASH(id) (((unsigned long)id) % NMASH) struct foo *fh[NMASH]; pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER; struct foo { int f_count; pthread_mutex_t f_lock; int f_id; struct foo *f_next; }; struct foo *foo_alloc(int id) { struct foo *fp = NULL; int idx = 0; if ((fp = malloc(sizeof(struct foo))) != NULL) { fp->f_count = 1; fp->f_id = if; if (pthread_mutex_init(&fp->f_lock, NULL) != 0) { free(fp); return NULL; } idx = HASH(id); pthread_mutex_lock(&hashlock); fp->f_next = fh[idx]; fh[idx] = fp; pthread_mutex_lock(&fp->f_lock); pthread_mutex_unlock(&hashlock); pthread_mutex_unlock(&fp->f_lock); } return fp; } void foo_hold(struct foo *fp) { pthread_mutex_lock(&hashlock); fp->f_count++; pthread_mutex_unlock(&hashlock); } struct foo *foo_find(int id) { struct foo *fp = NULL; pthread_mutex_lock(&hashlock); for (fp = fh[HASH(id)]; fp != NULL; fp = fp->next) { if (fp->f_id = id) { foo_hold(fp); break; } } pthread_mutex_unlock(&hashlock); return fp; } void foo_rele(struct foo *fp) { struct foo *tfp = NULL; int idx = 0; pthread_mutex_lock(&hashlock); if (fp->f_count == 1) { idx = HASH(fp->f_id); tfp = fh[idx]; if (tfp = fp) { fh[idx] = fp->f_next } else { while(tfp->next != fp) { tfp = tfp->next; } tfp->next = fp->f_next; } pthread_mutex_unlock(&hashlock); pthread_mutex_destroy(&fp->f_lock); free(fp); } else { fp->f_count--; pthread_mutex_unlock(&hashlock); } }View Code 當線程試圖獲取一個已加鎖的互斥量時,pthread_mutex_timedlock互斥量原語允許綁定線程阻塞時間。pthread_mutex_timedlock和pthread_mutex_lock是基本等價的,但是達到超時時間後,pthread_mutex_timedlock會返回。超時時間指原意等待的絕對時間。這個超時時間是用timespec來表示的
#include <pthread.h> #include <time.h> int pthread_mutex_timedlock(pthread_mutex_t *restrict mutex, const struct timespec *restrict tsptr);
二、讀寫鎖 讀寫鎖與互斥量相似,不過讀寫鎖允許更高的並行性,一次只有一個線程可以占有寫模式的讀寫鎖,但是多個線程可以同時占有讀模式的讀寫鎖,簡單地來說,就說支持一個寫者,多個讀者。 當讀寫鎖是寫加鎖狀態時,所以試圖對這個鎖加鎖的線程都會被阻塞,當讀寫鎖在讀加鎖狀態時,所以試圖以讀模式對它進行加鎖的線程都可以得到訪問權,但是希望以寫模式加鎖的線程會被阻塞。不過當有一個線程企圖以寫模式獲取鎖時,讀寫鎖會阻塞後面的讀模式鎖請求,防止讀模式鎖長期占用。 可知,讀寫鎖適用於對數據結構讀的次數遠大於寫的情況,又稱共用互斥鎖,讀共用,寫互斥。
#include <pthread.h> int pthread_rwlock_init(pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr); int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);讀寫鎖調用phtread_rwlock_init進行初始化,如果希望讀寫鎖有預設的屬性,傳null給attr即可。 讀的模式下鎖定讀寫鎖,需要調用phtread_rwlock_rdlock,寫的模式下鎖定讀寫鎖,需要調用pthread_rwlock_wrlock,不過以何種方式鎖定讀寫鎖,都可以調用pthread_rwlock_unlock解鎖。
#include <pthread.h> int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock); int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock); int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);例子:
#include <stdio.h> #include <pthread.h> struct job { struct job *j_next; struct job *j_prev; pthread_t j_id; }; struct queue { struct job *q_head; struct job *q_tail; pthread_rwlock_t q_lock; }; int queue_init(struct queue *qp) { int err; qp->q_head = NULL; qp->q_tail = NULL; err = pthread_rwlock_init(&qb->q_lock, NULL); if (err != 0) { return err; } return 0 } void job_insert(struct queue *qp, struct job *jp) { pthread_rwlock_wrlock(&qb->q_lock); jp->next = qp->head; jp->j_prev = NULL; if (qp->q_head != NULL) { qp->q_head->j_prev = jp; } else { qp->tail = jp; } qp->head = jp; pthread_rwlock_unlock(&qp->q_lock); } void job_append(struct queue *qp, struct job *jp) { pthread_rwlock_wrlock(&qp->q_lock); jp->j_next = NULL; jp->j_prev = qp->tail; if (qp->q_tail != NULL) { qp->q_tail->j_next = jp; } qp->q_tail = jp; pthread_rwlock_unlock(&qp->q_lock); } void job_remove(struct queue *qp, struct job *jp) { pthread_rwlock_wrlock(&qp->q_lock); if (jp == qp->q_head) { qp->q_head = jp->j_next; if (qp->q_tail == jp) { qp->tail = NULL; } else { jp->next->j_prev = jp->j_prev; } } else if (jp == qp->q_tail) { qp->q_tail = jp->j_prev; jp->j_prev->j_next = NULL; } else { jp->j_prev->j_next = jp->j_next; jp->j_next->j_prev = jp->j_prev; } pthread_rwlock_unlock(&qp->q_lock); } struct job *job_find(struct queue *qp, pthread_t id) { struct job *jp; if (pthread_rwlock_rdlock(&qp->q_lock) != 0) { return NULL; } for (jp = qb->q_head; jp != NULL; jp = jp->j_next) { if (pthread_equal(jp->j_id, id)) { break; } } pthread_rwlock_unlock(&qp->q_lock); return jp; }View Code 與互斥量一樣,讀寫鎖也有帶超時的讀寫鎖函數,避免陷入永久的阻塞。
#include <pthread.h> #include <time.h> int pthread_rwlock_timedrdlock(pthread_rwlock_t *restrict rwlock, const struct timespec *restrict tsptr); int pthread_rwlock_timedwrlock(pthread_rwlock_t *restrict rwlock, const struct timespec *restrict tsptr);
三、條件變數 條件變數與互斥量一起使用時,允許線程以無競爭的方式等待特定的條件發生。 條件本身由互斥量保護,線程在改變條件狀態之前必須鎖定互斥量。在使用條件變數之前,必須把它初始化,可以把常量PTHREAD_CON_INITIALIZE賦給靜態分配的條件變數,也可用pthread_cond_init函數進行初始化。使用pthread_cond_destroy釋放。
#include <pthread.h> int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr); int pthread_cond_destroy(pthread_con_t *cond);如果需要一個預設屬性的條件變數,把null給attr即可。 我們使用pthread_cond_wait等待條件變數為真,如果在給定時間內不能滿足,則返回錯誤碼。
#include<pthread.h> int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex) int pthread_cond_timedwait(pthread_cond_t *restrict cond, phtread_mutex_t *restrict mutex, const struct timespec *restrict tsptr)調用者把鎖定的互斥量傳給函數,函數自動把調用線程放到等待條件的線程列表上,對互斥量解鎖,當pthread_cond_wait返回時,互斥量再次被鎖住。pthread_cond_timedwait多了原意等待的時間。 有兩個函數可用於通知線程條件已滿足,pthread_cond_signal函數至少喚醒一個,pthread_cond_broadcast喚醒等待該條件的所有線程。
#include<phtread.h> int pthread_cond_signal(pthread_cond_t *cond) int pthread_cond_broadcast(pthread_cond_t *cond)例子:
#include <pthread.h> struct msg { struct msg *m_next; }; struct msg *workq; pthread_cond_t qready = PTHREAD_COND_INITIALIZER; pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER; void process_msg(void) { struct msg *mp; for(;;) { pthread_mutex_lock(&qlock); while (workq == NULL) { pthread_cond_wait(&qready, &qlock); } mp = workq; workq = mp->m_next; pthread_mutex_unlock(&qlock); } } void enqueue_msg(struct msg *mp) { pthread_mutex_lock(&qlock); mp->m_next = workq; workq = mp; pthread_mutex_unlock(&qlock); pthread_cond_signal(&qready); }View Code