1. IO模型 記憶體和外設的交互叫做IO,網路IO就是將數據在記憶體和網卡間拷貝。 IO本質就是等待和拷貝,一般等待耗時往往遠高於拷貝耗時。所以提高IO效率就是儘可能減少等待時間的比重。 IO模型 簡單對比解釋 阻塞IO 阻塞等待數據到來 非阻塞IO 輪詢等待數據到來 信號驅動 信號遞達時再來讀取或寫 ...
1. IO模型
記憶體和外設的交互叫做IO,網路IO就是將數據在記憶體和網卡間拷貝。
IO本質就是等待和拷貝,一般等待耗時往往遠高於拷貝耗時。所以提高IO效率就是儘可能減少等待時間的比重。
IO模型 | 簡單對比解釋 |
---|---|
阻塞IO | 阻塞等待數據到來 |
非阻塞IO | 輪詢等待數據到來 |
信號驅動 | 信號遞達時再來讀取或寫入數據 |
多路轉接 | 讓大批線程等待,自身讀取數據 |
非同步通信 | 讓其他進程或線程進行等待和讀取,自身獲取結果 |
1.1 阻塞IO
執行流在某個文件描述符下讀取數據時,執行流一直等待IO條件就緒後讀取數據,這就是阻塞IO。
1.2 非阻塞IO
執行流會以迴圈的方式反覆嘗試讀取數據,如果IO條件未就緒,執行流會直接返回繼續其他任務。
非阻塞讀取方式
可通過fcntl設置文件的狀態。
非阻塞讀取時,數據未就緒是以出錯的形式返回的,錯誤碼為EAGIN
或EWOULDBLOCK
,信號導致讀取未成功錯誤碼為EINTR
。
void set_nonblock(int fd)
{
int fl = fcntl(fd, F_GETFL);
if (fl < 0) {
perror("fcntl failed");
return;
}
if (fcntl(fd, F_SETFL, fl | O_NONBLOCK) < 0) {
perror("fcntl failed");
return;
}
}
int main() {
set_nonblock(0);
char buf[64] = {0};
while (true) {
ssize_t n = read(0, buf, sizeof(buf) - 1);
if (n > 0)
{
buf[n - 1] = 0;
std::cout << buf << std::endl;
}
else if (n == 0)
{
perror("end of file");
break;
}
else
{
if (errno == EAGAIN || errno == EWOULDBLOCK) // 非阻塞數據未就緒返回
continue;
else if (errno == EINTR) // IO被信號中斷返回
continue;
else
{
perror("read error");
break;
}
}
}
return 0;
}
較為雞肋,一般不用。
1.3 信號驅動
IO事件就緒時,內核通過SIGIO信號通知進程。等待的過程是非同步的,但拷貝數據是同步的,所以我們認為信號驅動也是同步IO。
但信號處理是非同步的,所以數據提取可能不及時。
1.4 多路轉接
內核提供select、poll、epoll等多路轉接方案,最高可同時等待幾百個文件。拷貝數據的任務仍由進程完成,等待數據的任務交給內核。
1.5 非同步通信
只要自身完全沒有參與IO等待和拷貝就是非同步通信,否則就是同步。
將緩衝區提供給非同步介面,介面等待並拷貝將數據至緩衝區,最後通知進程。進程不參與IO可直接處理數據,所以是非同步的。
非同步IO系統提供有一些對應的系統介面,但大多使用複雜,也不建議使用。非同步IO也有更好的替代方案。
IO事件就緒
IO事件就緒可分為讀事件就緒和寫事件就緒。
一般接收緩衝區設有高水位,高於該水位讀事件就緒,發送緩衝區設有低水位,低於該水位寫事件就緒。
因為頻繁讀寫內核緩衝區需要狀態切換,會附帶一系列的處理工作,導致效率下降。
2. 多路轉接
Linux下多路轉接的方案常見的有三種:select、poll、epoll,select出現是最早的,使用也是最繁瑣的。
2.1 select
select的介面
select能夠等待多個fd的IO條件是否就緒。
#include <sys/select.h>
int select(int nfds, fd_set* rfds, fd_set* wfds, fd_set* efds, struct timeval* timeout);
struct timeval {
time_t tv_sec; /* seconds */
suseconds_t tv_usec; /* microseconds */
};
參數 | 解釋 |
---|---|
nfds |
fd的總個數,select遍歷fdset結構的範圍(被等待的fd的最大值+1) |
readfds |
調用時表示需要關註的讀事件,返回時表示那些事件已經就緒 |
writefds |
調用時表示需要關註的寫事件,返回時表示那些事件已經就緒 |
exceptfds |
調用時表示需要關註的異常事件,返回時表示那些事件已經就緒。如對端關閉,讀寫異常等 |
timeout |
調用時表示本次調用阻塞等待時間,返回時表示此次返回剩餘的等待時間 |
返回值 | 大於0表示就緒fd的個數,為0表示本次調用結束,–1表示出錯 |
fd_set的介面
fd_set
是文件描述符的點陣圖結構,下標表示文件描述符,比特位內容表示是否需要等待。
// fd_set操作函數
void FD_CLR (int fd, fd_set *set); // 清除
int FD_ISSET(int fd, fd_set *set); // 檢測
void FD_SET (int fd, fd_set *set); // 設置
void FD_ZERO ( fd_set *set); // 置零
select的使用
const int GPORT = 8080;
const int GSIZE = 10;
enum event_type {
read_event = 0x1 << 1,
write_event = 0x1 << 2,
except_event = 0x1 << 3,
};
struct fd_collection {
fd_collection() {}
fd_collection(const fd_collection& fds) {
_rfds = fds._rfds, _wfds = fds._wfds, _efds = fds._efds, _maxfd = fds._maxfd;
}
bool set(int event, int fd) {
if (_fdarr.size() >= GSIZE) return false;
if (event & read_event) _rfds.set(fd);
if (event & write_event) _wfds.set(fd);
if (event & except_event) _wfds.set(fd);
_fdarr.push_back(fd);
if (_maxfd < fd) _maxfd = fd;
return true;
}
void clear(int fd) {
_rfds.clear(fd);
_wfds.clear(fd);
_efds.clear(fd);
for (int i = 0; i < _fdarr.size(); i++)
if (_fdarr[i] == fd) _fdarr[i] = -1;
}
class file_descptrs {
public:
file_descptrs() { bzero(); }
~file_descptrs() {}
void set (int fd) { FD_SET(fd, &_set); }
void clear(int fd) { FD_CLR(fd, &_set); }
bool isset(int fd) { return FD_ISSET(fd, &_set); }
void bzero() { FD_ZERO(&_set); }
fd_set* get() { return &_set; }
private:
fd_set _set;
};
file_descptrs _rfds;
file_descptrs _wfds;
file_descptrs _efds;
std::vector<int> _fdarr;
int _maxfd = -1;
};
class select_server : public inet::tcp::server {
public:
select_server(uint16_t port) : server(port), _wouldblock(true)
{}
select_server(uint16_t port, int sec, int usec) : server(port), _timeout({sec, usec})
{}
void start() {
_fds.set(read_event, _sock);
while (true) {
int n = 0;
struct timeval timeout = _timeout;
fd_collection fds_cp(_fds);
if (_wouldblock)
n = select(fds_cp._maxfd+1, fds_cp._rfds.get(), fds_cp._wfds.get(), fds_cp._efds.get(), nullptr);
else
n = select(fds_cp._maxfd+1, fds_cp._rfds.get(), fds_cp._wfds.get(), fds_cp._efds.get(), &timeout);
switch (n) {
case 0: INFO("time out: %.2f", timeout.tv_sec + timeout.tv_usec / 1.0 / 1000);
break;
case -1: ERROR("select error, %d %s", errno, strerror(errno));
break;
default: handler_event(fds_cp);
break;
}
}
}
private:
void handler_event(fd_collection& resfds) {
for (auto fd : _fds._fdarr) {
if (fd == -1) continue;
if (resfds._rfds.isset(fd)) {
if (fd == _sock) {
acceptor();
} else {
std::string buf;
recver(fd, &buf);
}
}
if (resfds._wfds.isset(fd)) {
std::string msg = "test";
sender(fd, msg);
}
if (resfds._efds.isset(fd)) {
WARN("excepton event occurred, fd: %d", fd);
}
}
}
void acceptor() {
std::string cip;
uint16_t cport;
int sock = accept(&cip, &cport);
INFO("a connect %d has been accepted [%s:%d]", sock, cip.c_str(), cport);
// if (!_fds.set(read_event | write_event | except_event, sock))
if (!_fds.set(read_event, sock)) {
close(sock);
WARN("connect close, fd array is full");
}
}
void recver(int fd, std::string* buf) {
ssize_t s = recv(fd, buf, 1024);
if (s > 0) {
std::cout << *buf << std::endl;
}
else {
if (s == 0) INFO("client quit");
else WARN("recv error, %d %s", errno, strerror(errno));
_fds.clear(fd);
close(fd);
}
}
void sender(int fd, const std::string& msg) {
size_t s = send(fd, msg);
if (s <= 0) {
if (s == 0) INFO("client quit");
else WARN("send error, %d %s", errno, strerror(errno));
_fds.clear(fd);
close(fd);
}
}
private:
bool _wouldblock;
struct timeval _timeout;
fd_collection _fds;
};
select的優缺點
優點 |
---|
一次等待多個fd,使IO等待時間重疊,一定程度上提高IO效率 |
缺點 |
調用前要重新設置fd集,調用後要遍歷檢測就緒fd,需要額外數組 |
select能夠檢測fd的個數上限太小 |
頻繁地將用戶數據拷貝到內核中 |
select內部遍歷fd_set結構以檢測就緒 |
2.2 poll
poll相比select在使用和實現上都有進步。不過重點是epoll。
poll的介面
#include <poll.h>
int poll(struct pollfd* fds, nfds_t nfds, int timeout);
struct pollfd {
int fd; /* file descriptor */
short events; /* events to look for */
short revents; /* events returned */
};
參數 | 解釋 |
---|---|
timeout |
阻塞等待時間,不過採用整數單位是毫秒。 |
struct pollfd* 和nfds_t |
pollfd結構體數組以及數據長度 |
struct pollfd.fd :關註的文件描述符 |
|
struct pollfd.events :關註的事件類型 |
|
struct pollfd.revents :就緒的事件類型 |
事件類型 | 描述 |
---|---|
POLLIN |
數據(包括普通數據和優先數據)可讀 |
POLLRDNORM |
普通數據可讀 |
POLLRDBAND |
優先順序帶數據可讀(Linux 不支持) |
POLLPRI |
高優先順序數據可讀,比如 TCP 帶外數據 |
POLLOUT |
數據(包括普通數據和優先數據)可寫 |
POLLWRNORM |
普通數據可寫 |
POLLWRBAND |
優先順序帶數據可寫 |
POLLRDHUP |
TCP 連接被對方關閉,或者對方關閉了寫操作,它由GNU引入 |
POLLERR |
錯誤 |
POLLHUP |
掛起。比如管道的寫端被關閉後,讀端描述符將收到 POLLHUP 事件 |
POLLNVAL |
文件描述符沒有打開 |
poll的使用
const int default_port = 8080;
const int default_size = 20;
const int default_timeout = -1;
const int default_fd = -1;
const short default_event = 0;
class poll_server : public inet::tcp::server {
public:
poll_server(uint16_t port) : server(port), _fds(new struct pollfd[default_size])
, _cap(0), _timeout(default_timeout) {
pollfd_arr_init();
}
void pollfd_arr_init() {
for (int i = 0; i < default_size; i++) pollfd_init(_fds[i]);
}
void pollfd_init(struct pollfd& pf) {
pf.fd = default_fd;
pf.events = default_event;
pf.revents = default_event;
}
void pollfd_clear(struct pollfd& pf) {
pf.fd = default_fd;
pf.events = default_event;
pf.revents = default_event;
}
void start() {
_fds[0].fd = _sock;
_fds[0].events = POLLIN;
++_cap;
while (true) {
int timeout = _timeout;
switch (poll(_fds.get(), _cap, timeout)) {
case 0: INFO("time out: %d", timeout); break;
case -1: ERROR("select error, %d %s", errno, strerror(errno)); break;
default: event_handler(); break;
}
}
}
private:
void event_handler() {
for (int i = 0; i < _cap; i++) {
auto& fd = _fds[i].fd;
auto& revents = _fds[i].revents;
if (revents & POLLIN) {
if (fd == _sock) {
acceptor();
} else {
std::string buf;
recver(i, &buf);
}
}
if (revents & POLLOUT) {
std::string msg = "test";
sender(i, msg);
}
if (revents & POLLERR){
WARN("excepton event occurred, fd: %d", fd);
}
}
}
void acceptor() {
std::string cip;
uint16_t cport;
int newfd = accept(&cip, &cport);
if (_cap >= default_size) {
close(newfd);
WARN("connect close, fd array is full");
return;
}
for (int i = 0; i < default_size; i++) {
if (_fds[i].fd == default_fd) {
_fds[i].fd = newfd;
_fds[i].events = POLLIN | POLLOUT;
_cap++;
break;
}
}
INFO("a connect %d has been accepted [%s:%d]", newfd, cip.c_str(), cport);
}
void recver(int i, std::string* buf) {
ssize_t s = recv(_fds[i].fd, buf, 1024);
if (s > 0) {
std::cout << *buf << std::endl;
} else {
if (s == 0) INFO("client quit");
else WARN("recv error, %d %s", errno, strerror(errno));
close(_fds[i].fd);
pollfd_clear(_fds[i]);
--_cap;
}
}
void sender(int i, const std::string& msg) {
size_t s = send(_fds[i].fd, msg);
if (s <= 0) {
if (s == 0) INFO("client quit");
else WARN("send error, %d %s", errno, strerror(errno));
close(_fds[i].fd);
pollfd_clear(_fds[i]);
--_cap;
}
}
private:
std::unique_ptr<struct pollfd[]> _fds;
int _cap;
int _timeout;
};
poll的優缺點
優點 |
---|
監視fd的個數無上限 |
將事件輸入輸出分離,避免原始數據被修改 |
缺點 |
返回後仍需要遍曆數組檢測就緒事件 |
poll內部仍需要內核自己遍歷檢測就緒事件 |
每次調用都要將pollfd結構從內核空間拷貝到用戶空間 |
2.3 epoll
epoll的介面
#include <sys/epoll.h>
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
typedef union epoll_data {
void* ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
epoll_create | 負責創建epoll模型 |
---|---|
size |
目前size被忽略,為相容可寫128/256 |
返回值 | epoll句柄 |
epoll_ctl | 負責用戶告訴內核那些事件需要關註 |
epfd |
epoll句柄 |
op |
指定相關操作 |
EPOLL_CTL_ADD :添加事件 |
|
EPOLL_CTL_MOD :修改事件 |
|
EPOLL_CTL_DEL :刪除事件 |
|
fd |
事件關註的文件描述符 |
epoll_event |
用來指定fd上關註的事件 |
epoll_wait | 負責內核告訴用戶那些事件就緒 |
epfd |
epoll句柄 |
epoll_event |
輸出緩衝區,存放已就緒的事件 |
maxevents |
緩衝區的長度 |
timeout |
阻塞等待的時間 |
返回值 | 就緒事件的個數 |
events巨集常量取值 | 解釋 |
---|---|
EPOLLIN |
表示對應的文件描述符可以讀(包括對端SOCKET正常關閉) |
EPOLLOUT |
表示對應的文件描述符可以寫 |
EPOLLPRI |
表示對應的文件描述符有緊急的數據可讀(帶外數據) |
EPOLLERR |
表示對應的文件描述符發生錯誤 |
EPOLLHUP |
表示對應的文件描述符被掛斷 |
EPOLLET |
將EPOLL設為邊緣觸發(Edge Triggered)模式 |
EPOLLONESHOT |
只監聽一次事件,本次之後自動將該fd刪去 |
epoll的使用
epoll的原理
- epoll模型中用紅黑樹保存註冊的fd和事件,用就緒隊列保存就緒的fd和事件。
- epoll_ctl的本質就是新增修改刪除紅黑樹的節點,並對fd對應的文件中註冊回調函數。
- 如果事件就緒,內核在將硬體數據拷貝至內核緩衝區後,還會自動執行回調將紅黑樹節點添加到就緒隊列中。
- epoll_wait負責檢查是否有事件就緒,本質就是檢測就緒隊列為空。
epoll的工作模式
epoll有兩種工作方式,分別是水平觸發LT和邊緣觸發ET。
LTET的概念
- LT水平觸發:只要事件一直就緒,就會一直通知。
- ET邊緣觸發:只有事件就緒或再次就緒時,才會通知一次。
LT水平觸發
事件就緒時,可以不立刻處理或只部分處理。
只要事件處於就緒狀態,每次調用epoll_wait都會通知該事件就緒,直到處理完畢處於未就緒狀態。
ET邊緣觸發
設置事件為EPOLLET,表示對於該事件使用ET模式。
事件就緒時必須一次性處理清空數據,否則下次是不會通知該事件就緒的,直到該事件再次就緒。
LTET的讀寫特點
數據剩餘ET不會提醒,所以必須一次性讀取所有數據,但如果讀取時剛好無數據就會被阻塞。 所以ET必須採用非阻塞讀寫。
LT模式事件就緒時讀取一定不會被阻塞,因為一定有數據。
LTET的效率對比
一般ET的效率>=LT的效率。原因如下:
- 一般ET通知次數比LT少,也就是系統調用次數少。
- ET會倒逼程式員一次讀取全部數據,所以底層TCP會更新出更大的滑動視窗。
LTET的應用場景
- ET要求程式必須一次性讀取所有數據,再讓上層處理,ET重IO效率。
- LT可以只交付部分數據,儘快讓上層處理,LT重處理效率。
ET高IO,LT高響應。
epoll的優缺點
優點 | 解釋 |
---|---|
介面分離解耦 | 每次調用不需要重新設置事件集,做到輸入輸出事件分離 |
使用簡單高效 | 調用後用戶不需要遍歷,內核提供就緒事件緩衝區 |
輕量數據拷貝 | 不需要頻繁的進行將數據從內核和用戶之間的拷貝 |
無遍歷效率高 | 底層不需要遍歷,利用回調將就緒事件添加到就緒隊列中 |
沒有數量限制 | 文件描述符數目無上限 |
epoll的寫入設置
- 只有讀取緩衝區有數據,讀事件才會就緒。所以讀事件可以一直關註,我們稱為常設置。
- 只要寫入緩衝區沒有滿,寫事件就一直就緒。所以寫事件按需設置,寫入完成後立即關閉,否則會一直觸發。
一般構建響應後,直接發送數據,只有當緩衝區滿的時候,再將沒寫完的數據交給epoll處理。
select、poll、epoll都是如此,但epoll的ET模式可以常設置寫事件。