1.什麼是kqueue和IO復用 kueue是在UNIX上比較高效的IO復用技術。 所謂的IO復用,就是同時等待多個文件描述符就緒,以系統調用的形式提供。如果所有文件描述符都沒有就緒的話,該系統調用阻塞,否則調用返回,允許用戶進行後續的操作。 常見的IO復用技術有select, poll, epol ...
1.什麼是kqueue和IO復用
kueue是在UNIX上比較高效的IO復用技術。
所謂的IO復用,就是同時等待多個文件描述符就緒,以系統調用的形式提供。如果所有文件描述符都沒有就緒的話,該系統調用阻塞,否則調用返回,允許用戶進行後續的操作。
常見的IO復用技術有select, poll, epoll以及kqueue等等。其中epoll為Linux獨占,而kqueue則在許多UNIX系統上存在,包括OS X(好吧,現在叫macOS了。。)
2. 使用概覽
kueue在設計上是非常簡潔的,在易用性上可能比select和epoll更好一些。
使用kqueue的大致代碼如下:(後面會給出一個完整的示例)
const static int FD_NUM = 2 // 要監視多少個文件描述符
int kq = kqueue(); // kqueue對象
// kqueue的事件結構體,不需要直接操作
struct kevent changes[FD_NUM]; // 要監視的事件列表
struct kevent events[FD_NUM]; // kevent返回的事件列表(參考後面的kevent函數)
int stdin_fd = STDIN_FILENO;
int stdout_fd = STDOUT_FILENO;
// 在changes列表中註冊標準輸入流的讀事件 以及 標準輸出流的寫事件
// 最後一個參數可以是任意的附加數據(void * 類型),在這裡給事件附上了當前的文件描述符,後面會用到
EV_SET(&changes[0], stdin_fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, &stdin_fd);
EV_SET(&changes[1], stdout_fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, &stdin_fd);
// 進行kevent函數調用,如果changes列表裡有任何就緒的fd,則把該事件對應的結構體放進events列表裡面
// 返回值是這次調用得到了幾個就緒的事件 (nev = number of events)
int nev = kevent(kq, changes, FD_NUM, events, FD_NUM, NULL); // 已經就緒的文件描述符數量
for(int i=0; i<nev; i++){
struct kevent event = events[i]; // 一個個取出已經就緒的事件
int ready_fd = *((int *)event.udata); // 從附加數據裡面取迴文件描述符的值
if( ready_fd == stdin_fd ){
// 讀取ready_fd
}else if( ready_fd == stdin_fd ){
// 寫入ready_fd
}
}
3. 相關結構體與函數解析
可以看出來,kqueue體系只有三樣東西:struct kevent結構體,EV_SET巨集以及kevent函數。
struct kevent 結構體內容如下:
struct kevent {
uintptr_t ident; /* identifier for this event,比如該事件關聯的文件描述符 */
int16_t filter; /* filter for event,可以指定監聽類型,如EVFILT_READ,EVFILT_WRITE,EVFILT_TIMER等 */
uint16_t flags; /* general flags ,可以指定事件操作類型,比如EV_ADD,EV_ENABLE, EV_DELETE等 */
uint32_t fflags; /* filter-specific flags */
intptr_t data; /* filter-specific data */
void *udata; /* opaque user data identifier,可以攜帶的任意數據 */
};
EV_SET 是用於初始化kevent結構的便利巨集,其簽名為:
EV_SET(&kev, ident, filter, flags, fflags, data, udata);
可以發現和kevent結構體完全對應,除了第一個,它就是你要初始化的那個kevent結構。
kevent 是真正進行IO復用的函數,其簽名為:
int kevent(int kq,
const struct kevent *changelist, // 監視列表
int nchanges, // 長度
struct kevent *eventlist, // kevent函數用於返回已經就緒的事件列表
int nevents, // 長度
const struct timespec *timeout); // 超時限制
4. 完整示例
下麵給出一個完整的示例,這個程式將從標準輸入中讀取數據,寫到標準輸出中。其中輸入輸出全部使用kqueue來進行IO復用。可以使用重定向把文件寫入標準輸入來進行測試。
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/event.h>
#include <errno.h>
#include <string.h>
// 為文件描述符打開對應狀態位的工具函數
void turn_on_flags(int fd, int flags){
int current_flags;
// 獲取給定文件描述符現有的flag
// 其中fcntl的第二個參數F_GETFL表示要獲取fd的狀態
if( (current_flags = fcntl(fd, F_GETFL)) < 0 ) exit(1);
// 施加新的狀態位
current_flags |= flags;
if( fcntl(fd, F_SETFL, current_flags) < 0 ) exit(1);
}
// 錯誤退出的工具函數
int quit(const char *msg){
perror(msg);
exit(1);
}
const static int FD_NUM = 2; // 兩個文件描述符,分別為標準輸入與輸出
const static int BUFFER_SIZE = 1024; // 緩衝區大小
// 完全以IO復用的方式讀入標準輸入流數據,輸出到標準輸出流中
int main(){
struct kevent changes[FD_NUM];
struct kevent events[FD_NUM];
// 創建一個kqueue
int kq;
if( (kq = kqueue()) == -1 ) quit("kqueue()");
// 準備從標準輸入流中讀數據
int stdin_fd = STDIN_FILENO;
int stdout_fd = STDOUT_FILENO;
// 設置為非阻塞
turn_on_flags(stdin_fd, O_NONBLOCK);
turn_on_flags(stdout_fd, O_NONBLOCK);
// 註冊監聽事件
int k = 0;
EV_SET(&changes[k++], stdin_fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, &stdin_fd);
EV_SET(&changes[k++], stdout_fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, &stdout_fd);
int nev, nread, nwrote = 0; // 發生事件的數量, 已讀位元組數, 已寫位元組數
char buffer[BUFFER_SIZE];
while(1){
nev = kevent(kq, changes, FD_NUM, events, FD_NUM, NULL); // 已經就緒的文件描述符數量
if( nev <= 0 ) quit("kevent()");
int i;
for(i=0; i<nev; i++){
struct kevent event = events[i];
if( event.flags & EV_ERROR ) quit("Event error");
int ev_fd = *((int *)event.udata);
// 輸入流就緒 且 緩衝區還有空間能繼續讀
if( ev_fd == stdin_fd && nread < BUFFER_SIZE ){
int new_nread;
if( (new_nread = read(ev_fd, buffer + nread, sizeof(buffer) - nread)) <= 0 )
quit("read()"); // 由於可讀事件已經發生,因此如果讀出0個位元組也是不正常的
nread += new_nread; // 遞增已讀數據位元組數
}
// 輸出流就緒 且 緩衝區有內容可以寫出
if( ev_fd == stdout_fd && nread > 0 ){
if( (nwrote = write(stdout_fd, buffer, nread)) <=0 )
quit("write()");
memmove(buffer, buffer+nwrote, nwrote); // 為了使實現的代碼更簡潔,這裡把還沒有寫出去的數據往前移動
nread -= nwrote; // 減去已經寫出去的位元組數
}
}
}
return 0;
}
程式中對stdin和stdout設置非阻塞的原因是我們希望有多少就緒的數據就讀多少,或者能寫入多少進緩衝區就寫入多少。否則在阻塞模式下,如果read沒有填滿buffer(文件沒讀完時),或者還有buffer數據沒寫入時,系統調用(read和write)會阻塞,這會對性能造成很大影響。因此這裡設置為非阻塞模式。