等待隊列 是內核中實現進程調度的一個十分重要的數據結構,其任務是維護一個鏈表,鏈表中每一個節點都是一個PCB(進程式控制制塊), 內核會將PCB掛在等待隊列中的所有進程都調度為睡眠狀態,直到某個喚醒的條件發生 。應用層的阻塞IO與非阻塞IO的使用我已經在 "Linux I/O多路復用" 一文中討論過了, ...
等待隊列是內核中實現進程調度的一個十分重要的數據結構,其任務是維護一個鏈表,鏈表中每一個節點都是一個PCB(進程式控制制塊),內核會將PCB掛在等待隊列中的所有進程都調度為睡眠狀態,直到某個喚醒的條件發生。應用層的阻塞IO與非阻塞IO的使用我已經在Linux I/O多路復用一文中討論過了,本文主要討論驅動中怎麼實現對設備IO的阻塞與非阻塞讀寫。顯然,實現這種與阻塞相關的機制要用到等待隊列機制。本文的內核源碼使用的是3.14.0版本
設備阻塞IO的實現
當我們讀寫設備文件的IO時,最終會回調驅動中相應的介面,而這些介面也會出現在讀寫設備進程的進程(內核)空間中,如果條件不滿足,介面函數使進程進入睡眠狀態,即使讀寫設備的用戶進程進入了睡眠,也就是我們常說的發生了阻塞。In a word,讀寫設備文件阻塞的本質是驅動在驅動中實現對設備文件的阻塞,其讀寫的流程可概括如下:
1. 定義-初始化等待隊列頭
//定義等待隊列頭
wait_queue_head_t waitq_h;
//初始化,等待隊列頭
init_waitqueue_head(wait_queue_head_t *q);
//或
//定義並初始化等待隊列頭
DECLARE_WAIT_QUEUE_HEAD(waitq_name);
上面的幾條選擇中,最後一種會直接定義並初始化一個等待頭,但是如果在模塊內使用全局變數傳參,用著並不方便,具體用哪種看需求。
我們可以追一下源碼,看一下上面這幾行都幹了什麼:
//include/linux/wait.h
35 struct __wait_queue_head {
36 spinlock_t lock;
37 struct list_head task_list;
38 };
39 typedef struct __wait_queue_head wait_queue_head_t;
wait_queue_head_t
--36-->這個隊列用的自旋鎖
--27-->將整個隊列"串"在一起的紐帶
然後我們看一下初始化的巨集:
55 #define __WAIT_QUEUE_HEAD_INITIALIZER(name) { \
56 .lock = __SPIN_LOCK_UNLOCKED(name.lock), \
57 .task_list = { &(name).task_list, &(name).task_list } }
58
59 #define DECLARE_WAIT_QUEUE_HEAD(name) \
60 wait_queue_head_t name = __WAIT_QUEUE_HEAD_INITIALIZER(name)
DECLARE_WAIT_QUEUE_HEAD()
--60-->根據傳入的字元串name,創建一個名為name的等待隊列頭
--57-->初始化上述task_list域,竟然沒有用內核標準的初始化巨集,無語。。。
2. 將本進程添加到等待隊列
為等待隊列添加事件,即進程進入睡眠狀態直到condition為真才返回。**_interruptible的版本版本表示睡眠可中斷,_timeout**版本表示超時版本,超時就會返回,這種命名規範在內核API中隨處可見。
void wait_event(wait_queue_head_t *waitq_h,int condition);
void wait_event_interruptible(wait_queue_head_t *waitq_h,int condition);
void wait_event_timeout(wait_queue_head_t *waitq_h,int condition);
void wait_event_interruptible_timeout(wait_queue_head_t *waitq_h,int condition);
這可是等待隊列的核心,我們來看一下
wait_event
└── wait_event
└── _wait_event
├── abort_exclusive_wait
├── finish_wait
├── prepare_to_wait_event
└── ___wait_is_interruptible
244 #define wait_event(wq, condition) \
245 do { \
246 if (condition) \
247 break; \
248 __wait_event(wq, condition); \
249 } while (0)
wait_event
--246-->如果condition為真,立即返回
--248-->否則調用__wait_event
194 #define ___wait_event(wq, condition, state, exclusive, ret, cmd) \
195 ({ \
206 for (;;) { \
207 long __int = prepare_to_wait_event(&wq, &__wait, state);\
208 \
209 if (condition) \
210 break; \
212 if (___wait_is_interruptible(state) && __int) { \
213 __ret = __int; \
214 if (exclusive) { \
215 abort_exclusive_wait(&wq, &__wait, \
216 state, NULL); \
217 goto __out; \
218 } \
219 break; \
220 } \
222 cmd; \
223 } \
224 finish_wait(&wq, &__wait); \
225 __out: __ret; \
226 })
___wait_event
--206-->死迴圈的輪詢
--209-->如果條件為真,跳出迴圈,執行finish_wait();進程被喚醒
--212-->如果進程睡眠的方式是interruptible的,那麼當中斷來的時候也會abort_exclusive_wait被喚醒
--222-->如果上面兩條都不滿足,就會回調傳入的schedule(),即繼續睡眠
模板
struct wait_queue_head_t xj_waitq_h;
static ssize_t demo_read(struct file *filp, char __user *buf, size_t size, loff_t *offset)
{
if(!condition)
wait_event_interruptible(&xj_waitq_h,condition);
}
static file_operations fops = {
.read = demo_read,
};
static __init demo_init(void)
{
init_waitqueue_head(&xj_waitq_h);
}
IO多路復用的實現
對於普通的非阻塞IO,我們只需要在驅動中註冊的read/write介面時不使用阻塞機制即可,這裡我要討論的是IO多路復用,即當驅動中的read/write並沒有實現阻塞機制的時候,我們如何利用內核機制來在驅動中實現對IO多路復用的支持。下麵這個就是我們要用的API
int poll(struct file *filep, poll_table *wait);
void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
當應用層調用select/poll/epoll機制的時候,內核其實會遍歷回調相關文件的驅動中的poll介面,通過每一個驅動的poll介面的返回值,來判斷該文件IO是否有相應的事件發生,我們知道,這三種IO多路復用的機制的核心區別在於內核中管理監視文件的方式,分別是位,數組,鏈表,但對於每一個驅動,回調的介面都是poll。
模板
struct wait_queue_head_t waitq_h;
static unsigned int demo_poll(struct file *filp, struct poll_table_struct *pts)
{
unsigned int mask = 0;
poll_wait(filp, &wwaitq_h, pts);
if(counter){
mask = (POLLIN | POLLRDNORM);
}
return mask;
}
static struct file_operations fops = {
.owner = THIS_MODULE,
.poll = demo_poll,
};
static __init demo_init(void)
{
init_waitqueue_head(&xj_waitq_h);
}
其他API
剛纔我們討論瞭如何使用等待隊列實現阻塞IO,非阻塞IO,其實關於等待隊列,內核還提供了很多其他API用以完成相關的操作,這裡我們來認識一下
//在等待隊列上睡眠
sleep_on(wait_queue_head_t *wqueue_h);
sleep_on_interruptible(wait_queue_head_t *wqueue_h);
//喚醒等待的進程
void wake_up(wait_queue_t *wqueue);
void wake_up_interruptible(wait_queue_t *wqueue);