1 什麼是TLS 原理在網上資料很多,這裡不展開。 簡單點說,動態申請的每線程變數。有一類比較熟悉的每線程變數是一個帶__thread的每線程變數,兩者的區別在於,TLS這類每線程變數是動態申請的。有以下一系列介面: #include <pthread.h> int pthread_key_crea ...
1 什麼是TLS
原理在網上資料很多,這裡不展開。
簡單點說,動態申請的每線程變數。有一類比較熟悉的每線程變數是一個帶__thread的每線程變數,兩者的區別在於,TLS這類每線程變數是動態申請的。有以下一系列介面:
#include <pthread.h>
int pthread_key_create(pthread_key_t *key, void (*destructor)(void*));
int pthread_key_delete(pthread_key_t key);
int pthread_setspecific(pthread_key_t key, const void *value);
void *pthread_getspecific(pthread_key_t key);
一般使用方法是
- 創建一個關鍵字key,多線程僅需創建一次,銷毀函數可以為null;
- 將需要保存的上下文設置到key-value對中
- 需要訪問時,根據key取出value
- 銷毀key
- 與__thread類型的變數就在於需要使用全局變數保存key。
2 BP鎖結構體
struct rcu_reader {
/* Data used by both reader and synchronize_rcu() */
unsigned long ctr;
/* Data used for registry */
struct cds_list_head node __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
pthread_t tid;
int alloc; /* registry entry allocated */
};
其中的ctr代表當前線程的鎖狀態,tid表示當前線程號,alloc是一個標記位,代表該鎖狀態結構體有沒有被使用;因為要保存每線程的當前鎖狀態,bp代碼中是以數組的形式訪問的,第一次申請8個,第二次申請16個,第三次申請32個,以成倍增加的方式擴大。
鎖的狀態有以下三種,分別為,
enum rcu_state {
RCU_READER_ACTIVE_CURRENT, 當前讀者,寫者在第一步抓取這些讀者
RCU_READER_ACTIVE_OLD, 老讀者,寫者在第二步檢測這些讀者
RCU_READER_INACTIVE, 不在讀操作中
};
3 全局gp記錄
struct rcu_gp {
/*
* Global grace period counter.
* Contains the current RCU_GP_CTR_PHASE.
* Also has a RCU_GP_COUNT of 1, to accelerate the reader fast path.
* Written to only by writer with mutex taken.
* Read by both writer and readers.
*/
unsigned long ctr;
} __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
struct rcu_gp rcu_gp = { .ctr = RCU_GP_COUNT };
記錄了全局的ctr,預設值為1.
4 幾個巨集
#define RCU_GP_COUNT (1UL << 0) -------------1
/* Use the amount of bits equal to half of the architecture long size */
#define RCU_GP_CTR_PHASE (1UL << (sizeof(long) << 2))-----------64位系統為2的32次,32位系統為2的16次
#define RCU_GP_CTR_NEST_MASK (RCU_GP_CTR_PHASE - 1)----------64位系統0xffffffff,32位系統0xffff
5 初始化
void rcu_bp_register(void);
每個需要使用的bprcu的線程只要調用上面的函數進行註冊即可,這個函數的工作是:
- 如果是第一個線程註冊,則調用pthread_key_create創建每線程變數
- 將該線程加入registry鏈表
- 初始化時鎖ctr的值為0
- 將該rcu_reader設置進key-value對(pthread_setspecific)
6 加讀鎖
static inline void _rcu_read_lock_update(unsigned long tmp)/* tmp是當前線程的ctr */
{
/* 這段表示,當前的線程中ctr中的低位為全0時,取全局的ctr值 */
if (caa_likely(!(tmp & RCU_GP_CTR_NEST_MASK))) {
_CMM_STORE_SHARED(URCU_TLS(rcu_reader)->ctr, _CMM_LOAD_SHARED(rcu_gp.ctr));
urcu_bp_smp_mb_slave();
} else
/* 這段表示如果當前線程中的ctr中的低位不為0,ctr值加1, */
_CMM_STORE_SHARED(URCU_TLS(rcu_reader)->ctr, tmp + RCU_GP_COUNT);
}
7 解讀鎖
static inline void _rcu_read_unlock(void)
{
unsigned long tmp;
tmp = URCU_TLS(rcu_reader)->ctr;
urcu_assert(tmp & RCU_GP_CTR_NEST_MASK);
/* Finish using rcu before decrementing the pointer. */
urcu_bp_smp_mb_slave();
_CMM_STORE_SHARED(URCU_TLS(rcu_reader)->ctr, tmp - RCU_GP_COUNT);
cmm_barrier(); /* Ensure the compiler does not reorder us with mutex */
}
僅僅是在當前的ctr基礎上減1.
8 加解鎖總結
加鎖時,如果當前ctr值為0,也就是初始值,即第一次上鎖時,需要將ctr的值更新為全局的ctr值。
解鎖時,無條件將該當前ctr值減1,所以加解鎖必須是對稱的。
參與rcu,上鎖狀態ctr低位值大於0,空閑狀態等於0.
9 鎖狀態的轉化
由下麵這個函數解析。
static inline enum rcu_state rcu_reader_state(unsigned long *ctr)
{
unsigned long v;
/* 沒有使用過,返回未進入臨界區 */
if (ctr == NULL)
return RCU_READER_INACTIVE;
/*
* Make sure both tests below are done on the same version of *value
* to insure consistency.
*/
v = CMM_LOAD_SHARED(*ctr);
/* RCU_GP_CTR_NEST_MASK這個值為0xffffffff,即為ctr的低位值為0時,返回未進入臨界區 */
if (!(v & RCU_GP_CTR_NEST_MASK))
return RCU_READER_INACTIVE;
/* 先看第三步的非運算,即裡面的值為0,則是讀狀態,否則是老的讀狀態
再看第二部的與運算,檢測第16的值是否為1,為1則是老的讀狀態,為0則是讀狀態
最後第一步的異或運算,相等則為0,則肯定是讀狀態,不相等,則要看第十六位是否相同,相同則為讀狀態,不相等則為老的讀狀態
總之,只要ctr的值不為0,就是讀狀態
*/
if (!((v ^ rcu_gp.ctr) & RCU_GP_CTR_PHASE))
return RCU_READER_ACTIVE_CURRENT;
/* 不識別狀態 */
return RCU_READER_ACTIVE_OLD;
}
10 等待讀者結束
static void wait_for_readers(struct cds_list_head *input_readers,
struct cds_list_head *cur_snap_readers,
struct cds_list_head *qsreaders)
{
unsigned int wait_loops = 0;
struct rcu_reader *index, *tmp;
/*
* Wait for each thread URCU_TLS(rcu_reader).ctr to either
* indicate quiescence (not nested), or observe the current
* rcu_gp.ctr value.
*/
for (;;) {
if (wait_loops < RCU_QS_ACTIVE_ATTEMPTS)
wait_loops++;
cds_list_for_each_entry_safe(index, tmp, input_readers, node) {
switch (rcu_reader_state(&index->ctr)) {
case RCU_READER_ACTIVE_CURRENT:
if (cur_snap_readers) {
cds_list_move(&index->node,
cur_snap_readers);
break;
}
/* Fall-through */
case RCU_READER_INACTIVE:
cds_list_move(&index->node, qsreaders);
break;
case RCU_READER_ACTIVE_OLD:
/*
* Old snapshot. Leaving node in
* input_readers will make us busy-loop
* until the snapshot becomes current or
* the reader becomes inactive.
*/
break;
}
}
if (cds_list_empty(input_readers)) {
break;
} else {
/* Temporarily unlock the registry lock. */
mutex_unlock(&rcu_registry_lock);
if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS)
(void) poll(NULL, 0, RCU_SLEEP_DELAY_MS);
else
caa_cpu_relax();
/* Re-lock the registry lock before the next loop. */
mutex_lock(&rcu_registry_lock);
}
}
}
這個是snap可能是snapshot的縮寫,快照的意思,也就是代表當前正在讀的線程合影。
RCU_READER_ACTIVE_CURRENT將處與讀狀態的線程移入正在讀鏈表,不在讀狀態的線程移入qs鏈表,其他狀態等待其變成正在讀或者不讀。
11 Rcu寫者的同步等待
void synchronize_rcu(void)
{
CDS_LIST_HEAD(cur_snap_readers);
CDS_LIST_HEAD(qsreaders);
sigset_t newmask, oldmask;
int ret;
ret = sigfillset(&newmask);
assert(!ret);
ret = pthread_sigmask(SIG_BLOCK, &newmask, &oldmask);
assert(!ret);
mutex_lock(&rcu_gp_lock);
mutex_lock(&rcu_registry_lock);
if (cds_list_empty(®istry))
goto out;
/* All threads should read qparity before accessing data structure
* where new ptr points to. */
/* Write new ptr before changing the qparity */
smp_mb_master();
/*
* Wait for readers to observe original parity or be quiescent.
* wait_for_readers() can release and grab again rcu_registry_lock
* interally.
*/
wait_for_readers(®istry, &cur_snap_readers, &qsreaders);
/*
* Adding a cmm_smp_mb() which is _not_ formally required, but makes the
* model easier to understand. It does not have a big performance impact
* anyway, given this is the write-side.
*/
cmm_smp_mb();
/* Switch parity: 0 -> 1, 1 -> 0 */
CMM_STORE_SHARED(rcu_gp.ctr, rcu_gp.ctr ^ RCU_GP_CTR_PHASE);
/*
* Must commit qparity update to memory before waiting for other parity
* quiescent state. Failure to do so could result in the writer waiting
* forever while new readers are always accessing data (no progress).
* Ensured by CMM_STORE_SHARED and CMM_LOAD_SHARED.
*/
/*
* Adding a cmm_smp_mb() which is _not_ formally required, but makes the
* model easier to understand. It does not have a big performance impact
* anyway, given this is the write-side.
*/
cmm_smp_mb();
/*
* Wait for readers to observe new parity or be quiescent.
* wait_for_readers() can release and grab again rcu_registry_lock
* interally.
*/
wait_for_readers(&cur_snap_readers, NULL, &qsreaders);
/*
* Put quiescent reader list back into registry.
*/
cds_list_splice(&qsreaders, ®istry);
/*
* Finish waiting for reader threads before letting the old ptr being
* freed.
*/
smp_mb_master();
out:
mutex_unlock(&rcu_registry_lock);
mutex_unlock(&rcu_gp_lock);
ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL);
assert(!ret);
}
第一步,將參與本次rcu活動的線程挑選出來。
第二步,將全局ctr的值翻轉,各位0->1, 1->0。全局ctr只有兩種取值情況,
1和0x1 0000 0001。
第三步,等待所有讀者退出。
第四步,同步等待結束。
12 Call_rcu寫者的非同步等待
非同步與同步的區別在於寫者是等待所有讀者退出,本線程執行寫操作,還是由call_rcu線程等待所有寫者退出,由call_rcu線程執行寫操作。因為沒有實際研究這段代碼,直接看調用棧
#0 0x000000fff066ab90 in syscall () from /lib/libc.so.6
(gdb) bt
#0 0x000000fff066ab90 in syscall () from /lib/libc.so.6
#1 0x000000fff08f7ea8 in smp_mb_master ()
at ../userspace-rcu-0.9.3/urcu.c:165
#2 0x000000fff08f817c in wait_for_readers (
input_readers=0xfff090e5f0 <registry>, cur_snap_readers=0xff60dfe4b8,
qsreaders=0xff60dfe4c8) at ../userspace-rcu-0.9.3/urcu.c:290
#3 0x000000fff08f84ec in synchronize_rcu_memb ()
at ../userspace-rcu-0.9.3/urcu.c:426
#4 0x000000fff08f9b50 in call_rcu_thread (arg=0xff640030b0)
at ../userspace-rcu-0.9.3/urcu-call-rcu-impl.h:362
#5 0x000000fff0a22040 in start_thread () from /lib/libpthread.so.0
#6 0x000000fff066f5c4 in __thread_start () from /lib/libc.so.6
與同步的操作類型,也是通過wait_for_readers的調用等待讀者退出
13 為什麼要使用異或操作
為了實現讀者從不阻塞,而寫者應該區別出這位讀者是否是本次的讀者。
假設存在兩個讀者群,一個寫者。讀者群1正在讀操作中,寫者進入等待讀者狀態,讀者群2也進入了臨界區,但是讀者群2不應該被列入本次的探測中。
再看一次鎖狀態的解析
if (!(v & RCU_GP_CTR_NEST_MASK))
return RCU_READER_INACTIVE;
if (!((v ^ rcu_gp.ctr) & RCU_GP_CTR_PHASE))
return RCU_READER_ACTIVE_CURRENT;
return RCU_READER_ACTIVE_OLD;
第一段還是原來的邏輯,沒參與本地讀寫操作。
第二段的操作,具體分析,
Ctr值 |
全局ctr值 |
結果 |
1 |
1 |
RCU_READER_ACTIVE_CURRENT |
1 |
0x1 0000 0001 |
RCU_READER_ACTIVE_OLD |
2 |
1 |
RCU_READER_ACTIVE_CURRENT |
0x1 0000 0001 |
1 |
RCU_READER_ACTIVE_OLD |
0x1 0000 0001 |
0x1 0000 0001 |
RCU_READER_ACTIVE_CURRENT |
所以只要非0,都是讀狀態,只是區別出是寫者開始等待前的讀者還是寫者開始等待後的讀者。
14 舉例
有2個線程,一個寫,一個讀。全局ctr為1
- 讀者1進入臨界區,ctr值設置為1
- 寫者1嘗試寫,選中了讀者1,將其加入本次需要等待的鏈表,更新全局ctr為0x1 0000 0001,發生調度。
- 讀者1離開臨界區,但是又再次進入臨界區,ctr值設置為0x1 0000 0001
- 寫者開始檢測讀者1的當前狀態,發現是當前讀者,而不是老讀者,移出該讀者。所以這個相位的作用就是區別出該讀者是否已經離開上一次的讀操作。寫者退出等待,繼續往下執行。
__simple原創
轉載請註明出處