一.前言 從上個世紀到現在,工程師們在優化伺服器性能的過程中,提出了各種不同的io模型,比如非阻塞io,io復用,信號驅動式io,非同步io。具體io模型在不同平臺上的實現也不一樣,比如io復用在bsd上可以由kqueue實現,在solaris系統上可以由/dev/poll實現。為了實現系統的可移植性 ...
一.前言
從上個世紀到現在,工程師們在優化伺服器性能的過程中,提出了各種不同的io模型,比如非阻塞io,io復用,信號驅動式io,非同步io。具體io模型在不同平臺上的實現也不一樣,比如io復用在bsd上可以由kqueue實現,在solaris系統上可以由/dev/poll實現。為了實現系統的可移植性,POSIX 確保 select和poll在 unix-like系統上得到廣泛的支持。
在上個世紀,Dan Kegel 提出了C10K的設想,現在C10K 已經不是什麼問題,比如nginx就可以做到百萬級別的qps。於是又有人提出來了C10M的設想,Robert David Graham 從unix的最初設計初衷給出了自己的解決方案。
二.常見io模型
1.阻塞io
常見的read系統調用,是最常見的阻塞io:
2.非阻塞式io
非阻塞io的典型使用方式如下,設置非阻塞標誌,並且常與io復用一起使用,使用起來比較複雜。
val = Fcntl(sockfd, F_GETFL, 0);
Fcntl(sockfd, F_SETFL, val | O_NONBLOCK); /* O_NONBLOCK 標誌非阻塞 */
3.io 復用 (select/poll)
io復用在處理數量龐大的fd時非常有效,我們以select為例,select的核心api是select函數:
int select(int nfds, fd_set *_Nullable restrict readfds,
fd_set *_Nullable restrict writefds,
fd_set *_Nullable restrict exceptfds,
struct timeval *_Nullable restrict timeout);
看一個例子:
#include "unp.h"
void
str_cli(FILE *fp, int sockfd)
{
int maxfdp1;
fd_set rset;
char sendline[MAXLINE], recvline[MAXLINE];
FD_ZERO(&rset);
for ( ; ; ) {
FD_SET(fileno(fp), &rset); /* 設置要監聽的socket fd */
FD_SET(sockfd, &rset); /* 設置要監聽的file fd */
maxfdp1 = max(fileno(fp), sockfd) + 1;
Select(maxfdp1, &rset, NULL, NULL, NULL); /* select 調用 */
if (FD_ISSET(sockfd, &rset)) { /* socket 可讀 */
if (Readline(sockfd, recvline, MAXLINE) == 0)
err_quit("str_cli: server terminated prematurely");
Fputs(recvline, stdout);
}
if (FD_ISSET(fileno(fp), &rset)) { /* input 可讀 */
if (Fgets(sendline, MAXLINE, fp) == NULL)
return; /* all done */
Writen(sockfd, sendline, strlen(sendline));
}
}
}
4.信號驅動式io
但凡涉及到信號的程式都比較複雜。要使用信號驅動式io,先開啟socket的信號驅動式io功能,並通過sigaction 系統調用安裝一個信號處理函數:
void
dg_echo(int sockfd_arg, SA *pcliaddr, socklen_t clilen_arg)
{
int i;
const int on = 1;
sigset_t zeromask, newmask, oldmask;
sockfd = sockfd_arg;
clilen = clilen_arg;
for (i = 0; i < QSIZE; i++) { /* init queue of buffers */
dg[i].dg_data = Malloc(MAXDG);
dg[i].dg_sa = Malloc(clilen);
dg[i].dg_salen = clilen;
}
iget = iput = nqueue = 0;
Signal(SIGHUP, sig_hup); /* 安裝信號處理函數 */
Signal(SIGIO, sig_io);
Fcntl(sockfd, F_SETOWN, getpid()); /* 設置屬主 */
Ioctl(sockfd, FIOASYNC, &on); /* 開啟信號驅動式io */
Ioctl(sockfd, FIONBIO, &on); /* non-bloking */
Sigemptyset(&zeromask); /* init three signal sets */
Sigemptyset(&oldmask);
Sigemptyset(&newmask);
Sigaddset(&newmask, SIGIO); /* signal we want to block */
Sigprocmask(SIG_BLOCK, &newmask, &oldmask);
for ( ; ; ) {
while (nqueue == 0)
sigsuspend(&zeromask); /* wait for datagram to process */
/* 4unblock SIGIO */
Sigprocmask(SIG_SETMASK, &oldmask, NULL);
Sendto(sockfd, dg[iget].dg_data, dg[iget].dg_len, 0,
dg[iget].dg_sa, dg[iget].dg_salen);
if (++iget >= QSIZE)
iget = 0;
/* 4block SIGIO */
Sigprocmask(SIG_BLOCK, &newmask, &oldmask);
nqueue--;
}
}
5.非同步io
我們來看一個aio的例子(由於aio的例子過於複雜,我們這裡只截取部分關鍵代碼):
for (i = 0; i < NBUF; i++) {
switch (bufs[i].op) {
case UNUSED:
/*
* Read from the input file if more data
* remains unread.
*/
if (off < sbuf.st_size) {
bufs[i].op = READ_PENDING;
bufs[i].aiocb.aio_fildes = ifd;
bufs[i].aiocb.aio_offset = off;
off += BSZ;
if (off >= sbuf.st_size)
bufs[i].last = 1;
bufs[i].aiocb.aio_nbytes = BSZ;
if (aio_read(&bufs[i].aiocb) < 0) /* aio_read */
err_sys("aio_read failed");
aiolist[i] = &bufs[i].aiocb;
numop++;
}
break;
case READ_PENDING:
if ((err = aio_error(&bufs[i].aiocb)) == EINPROGRESS) /* aio_error */
continue;
if (err != 0) {
if (err == -1)
err_sys("aio_error failed");
else
err_exit(err, "read failed");
}
/*
* A read is complete; translate the buffer
* and write it.
*/
if ((n = aio_return(&bufs[i].aiocb)) < 0) /* 調用aio_return成功則 說明數據已經返回 */
err_sys("aio_return failed");
if (n != BSZ && !bufs[i].last)
err_quit("short read (%d/%d)", n, BSZ);
for (j = 0; j < n; j++)
bufs[i].data[j] = translate(bufs[i].data[j]);
bufs[i].op = WRITE_PENDING;
bufs[i].aiocb.aio_fildes = ofd;
bufs[i].aiocb.aio_nbytes = n;
if (aio_write(&bufs[i].aiocb) < 0) /* aio_write */
err_sys("aio_write failed");
/* retain our spot in aiolist */
break;
case WRITE_PENDING:
if ((err = aio_error(&bufs[i].aiocb)) == EINPROGRESS) /* aio_error */
continue;
if (err != 0) {
if (err == -1)
err_sys("aio_error failed");
else
err_exit(err, "write failed");
}
/*
* A write is complete; mark the buffer as unused.
*/
if ((n = aio_return(&bufs[i].aiocb)) < 0)
err_sys("aio_return failed");
if (n != bufs[i].aiocb.aio_nbytes)
err_quit("short write (%d/%d)", n, BSZ);
aiolist[i] = NULL;
bufs[i].op = UNUSED;
numop--;
break;
}
}
6.同步和非同步的分類
網路上對io同步和非同步的爭論很多,這裡給出Stevens的分類標準:
同步 | 阻塞io,非阻塞io,io復用,信號驅動式io |
非同步 | 非同步io |
三.C10K io策略
在上個世紀,Dan Kegel 提出了C10K的設想,即單機實現10k的併發量,主要提出了以下四種類型的解決方法:
伺服器範式 | 例子 | 備註 | 軟體實現 |
Serve many clients with each thread, and use nonblocking I/O(level-triggered) | select, poll(posix), /dev/poll(solaris), kqueue(bsd) | 輪詢 | |
Serve many clients with each thread, and use nonblocking I/O (readiness change) | kqueue(bsd), epoll(linux), Realtime Signals(linux) | 事件通知 | nginx, redis |
Serve many clients with each server thread, and use asynchronous I/O | aio | 非同步,沒有得到廣泛支持 | |
Serve one client with each server thread |
LinuxThreads, Java threading support in JDK 1.3.x and earlier |
早期的java使用綠色線程 |
- 在實現的過程中有諸多限制,比如打開fd的限制,創建thread數量的限制,根據不同內核而異。
- 32 位系統,用戶態的虛擬空間只有3G,如果創建線程時分配的棧空間是10M,那麼一個進程最多只能創建300 個左右的線程。 64 位系統,用戶態的虛擬空間大到有128T,理論上不會受虛擬記憶體大小的限制(10M個線程),而會受系統的參數或性能限制(線程上下文切換)。
四.C10M
Robert David Graham認為如果要解決C10M的問題,必須對unix內核進行改造。當下的unix系統的設計目標是為了滿足非常廣泛的需求,於是加上了許多通用的功能,比如進程管理,記憶體管理等等。C10M的問題不是通用的問題,需要自己處理數據控制,而不是依賴unix內核,而且需要做到packet scalability, multi-core scalability, memory scalability。
專項問題,需要特殊的解決方案。
五.總結
本文從常見io模型出發,梳理了高併發伺服器可能涉及到的io模型,這些經典io模型在過去十年基本沒有發生變化。瞭解這些底層技術對我們瞭解深入理解伺服器是非常有必要的。
六.參考
http://www.kegel.com/c10k.html#threads.java
http://highscalability.com/blog/2013/5/13/the-secret-to-10-million-concurrent-connections-the-kernel-i.html
https://man7.org/linux/man-pages/man2/select.2.html