Linux下實現I/O復用的系統調用方式主要:select、poll、epoll。 ...
I/O多路復用
Linux下實現I/O復用的系統調用方式主要:select、poll、epoll。
select
select系統調用可在一段指定時間內,監聽文件描述符上的可讀、可寫和異常等事件,判斷發生的事件需要輪詢。
註:網路程式中,select能處理的異常情況只有一種:socket上接收到帶外數據。
#include <sys/select.h>
//select監聽文件描述符事件
//nfds: 被監聽文件描述符中最大值+1
//readfds: 可讀事件對應的文件描述符集,對應位置1;會被內核修改,返回時無事件的置0。
//writefds: 可寫事件對應的文件描述符集,對應位置1;會被內核修改,返回時無事件的置0。
//exceptfds:異常事件對應的文件描述符集,對應位置1;會被內核修改,返回時無事件的置0。
//timeout
//return: 返回就緒文件描述符中最大值+1
int select(int nfds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, struct timeval* timeout);
//fd_set比特向量操作
FD_ZERO(fd_set *fdset); //清楚fdset中所有比特位
FD_SET(int fd, fd_set *fdset); //設置fdset中比特位fd
FD_CLR(int fd, fd_set *fdset); //清除fdset中比特位fd
FD_ISSET(int fd, fd_set *fdset); //測試fdset中比特位fd是否被設置,用於判斷是否有事件發生
文件描述符就緒條件
- socket可讀
- 可讀:socket內核接收緩存區位元組數大於或等於低水位標記SO_RCVLOWAT。讀操作返回讀取位元組數。
- 讀關閉:socket通信的對方關閉連接。讀操作返回0。
- 錯誤:socket上有未處理的錯誤。可以使用getsockopt讀取和清楚錯誤。
- 連接請求:監聽socket上有新的連接請求。
- socket可寫
- 可寫:socket內核發送緩存區中的可用位元組數大於或等於低水位標記SO_SNDLOWAT。寫操作返回實際寫位元組數。
- 寫關閉:寫操作被關閉,對寫關閉的socket執行寫操作將觸發一個SIGPIPE信號。
- 錯誤:socket上有未處理的錯誤。可以使用getsockopt讀取和清楚錯誤。
- 連接結果:socket使用非阻塞的connect連接成功或者失敗(超時)之後。
poll
poll系統調用與select類似,需要輪詢判斷監聽文件描述符集上的事件,但沒有監聽文件描述符數量限制。
#include <poll.h>
//poll監聽文件描述符上指定的事件
//fds: pollfd結構類型的數組
//nfds: pollfd數組長度
//return: 返回就緒文件描述符中最大值+1
int poll(struct pollfd* fds, nfds_t nfds, int timeout);
//pollfd結構體,描述文件描述符上的可讀、可寫、異常等事件
struct pollfd{
int fd; //文件描述符
short events; //註冊的事件,一系列事件的按位或
short revents; //實際發生的事件,由內核填充,一些列事件的按位或
}
epoll
epoll相比select更加高效,主要體現在:
- epoll使用內核事件表維護監聽的文件描述符集,避免頻繁的用戶空間與內核空間頻繁的拷貝開銷。
- epoll直接返回發生事件的文件描述符即,避免了用戶程式輪詢的開銷。
- epoll內部通過註冊回調函數的方式,監聽特定文件描述符上的時間,避免了內核輪詢開銷。
- epoll提供了高效的邊沿觸發模式,邊沿觸髮帶來編程上的複雜性。
LT與ET模式
使用epoll監聽文件描述符時,有兩種事件觸發模式:
- 水平觸發(LT):epoll預設使用水平觸發。
- 邊沿觸發(ET):往epoll內核事件表這種註冊一個文件描述符上的EPOLLET時,將進行邊沿觸發。相比較水平觸發,邊沿觸發更加高效。
內核事件表
epoll在內核中使用事件表(紅黑樹)維護監聽的文件描述符,並支持對事件表的增刪改。
- 創建事件表:epoll_create,返回一個內核事件表的文件描述符。
- 操作事件表:epoll_ctl,對事件表進行增、刪、改。
- 監聽事件表:epoll_wait,監聽事件表上的事件。
#include <sys/epoll.h>
//epoll事件結構體
strcut epoll_event{
_uint32_t events; //epoll事件
epoll_data_t data; //用戶數據,包含監聽的文件描述符信息
}
//用戶數據聯合體
typedef union epoll_data{
void* ptr; //指定與fd相關的用戶數據,用戶數據包含監聽的文件描述符
int fd; //監聽的文件描述符fd(常用)
uint32_t u32;
uint64_t u64;
} epoll_data_t;
//創建epoll內核事件表
//size: 提示內核需要的事件表多大
int epoll_create(int size);
//操作epoll內核事件表
//epfd: epoll內核事件表
//op: 參數指定操作,包括增(EPOLL_CTL_ADD)、刪(EPOLL_CTL_DEL)、改(EPOLL_CTL_MOD)
//fd: 被操作的文件描述符
//event:事件
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
//監聽事件表上文件描述符的事件
//epfd: epoll內核事件表
//events: 就緒事件數組,由內核修改
//maxevents:最多監聽事件數
//timeout:
//return: 就緒文件描述符個數
int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);
示例
服務端
服務端代碼功能:
- 創建socket
- 為socket綁定固定地址和埠
- 監聽套接字等待客戶端連接
- 接收客戶端連接,獲取連接socket
- 使用select、poll或epoll監聽連接socket的可讀事件
- 連接socket可讀事件發生時,從連接socket中讀取數據
select
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
int main(int argc, char *argv[])
{
if (argc <= 2)
{
printf("usage: %s ip_address port_number\n", basename(argv[0]));
return 1;
}
const char *ip = argv[1];
int port = atoi(argv[2]);
printf("ip is %s and port is %d\n", ip, port);
int ret = 0;
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);
int listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert(listenfd >= 0);
ret = bind(listenfd, (struct sockaddr *)&address, sizeof(address));
assert(ret != -1);
ret = listen(listenfd, 5);
assert(ret != -1);
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr *)&client_address, &client_addrlength);
if (connfd < 0)
{
printf("errno is: %d\n", errno);
close(listenfd);
}
char remote_addr[INET_ADDRSTRLEN];
printf("connected with ip: %s and port: %d\n", inet_ntop(AF_INET, &client_address.sin_addr, remote_addr, INET_ADDRSTRLEN), ntohs(client_address.sin_port));
char buf[1024];
fd_set read_fds;
fd_set exception_fds;
FD_ZERO(&read_fds);
FD_ZERO(&exception_fds);
int nReuseAddr = 1;
setsockopt(connfd, SOL_SOCKET, SO_OOBINLINE, &nReuseAddr, sizeof(nReuseAddr));
while (1)
{
memset(buf, '\0', sizeof(buf));
FD_SET(connfd, &read_fds);
FD_SET(connfd, &exception_fds);
ret = select(connfd + 1, &read_fds, NULL, &exception_fds, NULL);
printf("select one\n");
if (ret < 0)
{
printf("selection failure\n");
break;
}
if (FD_ISSET(connfd, &read_fds))
{
ret = recv(connfd, buf, sizeof(buf) - 1, 0);
if (ret <= 0)
{
printf("can not read\n");
break;
}
printf("get %d bytes of normal data: %s\n", ret, buf);
}
else if (FD_ISSET(connfd, &exception_fds))
{
ret = recv(connfd, buf, sizeof(buf) - 1, MSG_OOB);
if (ret <= 0)
{
printf("can not read\n");
break;
}
printf("get %d bytes of oob data: %s\n", ret, buf);
}
}
close(connfd);
close(listenfd);
return 0;
}
poll
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <poll.h>
int main(int argc, char *argv[])
{
if (argc <= 2)
{
printf("usage: %s ip_address port_number\n", basename(argv[0]));
return 1;
}
const char *ip = argv[1];
int port = atoi(argv[2]);
printf("ip is %s and port is %d\n", ip, port);
int ret = 0;
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);
int listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert(listenfd >= 0);
ret = bind(listenfd, (struct sockaddr *)&address, sizeof(address));
assert(ret != -1);
ret = listen(listenfd, 5);
assert(ret != -1);
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr *)&client_address, &client_addrlength);
if (connfd < 0)
{
printf("errno is: %d\n", errno);
close(listenfd);
}
char remote_addr[INET_ADDRSTRLEN];
printf("connected with ip: %s and port: %d\n", inet_ntop(AF_INET, &client_address.sin_addr, remote_addr, INET_ADDRSTRLEN), ntohs(client_address.sin_port));
char buf[1024];
pollfd poll_fds[1];
bzero(poll_fds, sizeof(poll_fds));
poll_fds[0].fd = connfd;
poll_fds[0].events = POLLIN;
int nReuseAddr = 1;
setsockopt(connfd, SOL_SOCKET, SO_OOBINLINE, &nReuseAddr, sizeof(nReuseAddr));
while (1)
{
memset(buf, '\0', sizeof(buf));
ret = poll(poll_fds, 1, -1);
printf("select one\n");
if (ret < 0)
{
printf("selection failure\n");
break;
}
if (poll_fds[0].events == POLLIN)
{
ret = recv(connfd, buf, sizeof(buf) - 1, 0);
if (ret <= 0)
{
printf("can not read\n");
break;
}
printf("get %d bytes of normal data: %s\n", ret, buf);
}
}
close(connfd);
close(listenfd);
return 0;
}
epoll
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
int main(int argc, char *argv[])
{
if (argc <= 2)
{
printf("usage: %s ip_address port_number\n", basename(argv[0]));
return 1;
}
const char *ip = argv[1];
int port = atoi(argv[2]);
printf("ip is %s and port is %d\n", ip, port);
int ret = 0;
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);
int listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert(listenfd >= 0);
ret = bind(listenfd, (struct sockaddr *)&address, sizeof(address));
assert(ret != -1);
ret = listen(listenfd, 5);
assert(ret != -1);
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr *)&client_address, &client_addrlength);
if (connfd < 0)
{
printf("errno is: %d\n", errno);
close(listenfd);
}
char remote_addr[INET_ADDRSTRLEN];
printf("connected with ip: %s and port: %d\n", inet_ntop(AF_INET, &client_address.sin_addr, remote_addr, INET_ADDRSTRLEN), ntohs(client_address.sin_port));
char buf[1024];
//創建內核事件表
int epfd = epoll_create(10);
//註冊事件
epoll_event event;
event.data.fd = connfd;
event.events = EPOLLIN;
epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &event);
//就緒事件數組
epoll_event events[10];
int nReuseAddr = 1;
setsockopt(connfd, SOL_SOCKET, SO_OOBINLINE, &nReuseAddr, sizeof(nReuseAddr));
while (1)
{
memset(buf, '\0', sizeof(buf));
ret = epoll_wait(epfd, events, 1, -1);
printf("select one\n");
if (ret < 0)
{
printf("errno: %d\n", errno);
printf("selection failure\n");
break;
}
if (events[0].events == EPOLLIN)
{
ret = recv(connfd, buf, sizeof(buf) - 1, 0);
if (ret <= 0)
{
printf("can not read\n");
break;
}
printf("get %d bytes of normal data: %s\n", ret, buf);
}
}
close(connfd);
close(listenfd);
return 0;
}
客戶端
客戶端代碼功能:
- 創建套接字
- 請求連接
- 發送數據
- 關閉連接。
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
int main(int argc, char *argv[])
{
if (argc <= 2)
{
printf("usage: %s ip_address port_number\n", basename(argv[0]));
return 1;
}
const char *ip = argv[1];
int port = atoi(argv[2]);
struct sockaddr_in server_address;
bzero(&server_address, sizeof(server_address));
server_address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &server_address.sin_addr);
server_address.sin_port = htons(port);
int sockfd = socket(PF_INET, SOCK_STREAM, 0);
assert(sockfd >= 0);
if (connect(sockfd, (struct sockaddr *)&server_address, sizeof(server_address)) < 0)
{
printf("connection failed\n");
}
else
{
printf("send oob data out\n");
const char *oob_data = "abc";
const char *normal_data = "123";
send(sockfd, normal_data, strlen(normal_data), 0);
send(sockfd, oob_data, strlen(oob_data), MSG_OOB);
send(sockfd, normal_data, strlen(normal_data), 0);
}
close(sockfd);
return 0;
}