網路編程中的關鍵問題總結總結下網路編程中關鍵的細節問題,包含連接建立、連接斷開、消息到達、發送消息等等;連接建立包括服務端接受 (accept) 新連接和客戶端成功發起 (connect) 連接。 accept接受連接的問題在本文最後會聊到,這裡談談connect的關鍵點; 使用非阻塞連接建...
網路編程中的關鍵問題總結
總結下網路編程中關鍵的細節問題,包含連接建立、連接斷開、消息到達、發送消息等等;
連接建立
包括服務端接受 (accept) 新連接和客戶端成功發起 (connect) 連接。
accept接受連接的問題在本文最後會聊到,這裡談談connect的關鍵點;
使用非阻塞連接建立需要註意:
connect/select返回後,可能沒有連接上;需要再次確認是否成功連接;
步驟為:
- 使用非同步connect直接連接一次,因為使用了非阻塞,函數立刻返回;
- 檢查返回值,為0成功連接,否則加入到select/epoll中監控;
- 當有寫事件時,連接成功;當即可讀又可寫時,可能是有錯誤或者連接成功後有數據已經發過來;所以,此時,需要用getsockopt()讀取socket的錯誤選項,二次確認是否真的連接成功:
Fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);
error = 0;
if ( (n = connect(sockfd, saptr, salen)) < 0)
if (errno != EINPROGRESS)
return(-1);
/* Do whatever we want while the connect is taking place. */
if (n == 0)
goto done; /* connect completed immediately */
if ( (n = Select(sockfd+1, &rset, &wset, NULL,
nsec ? &tval : NULL)) == 0) {
close(sockfd); /* timeout */
errno = ETIMEDOUT;
return(-1);
}
if (FD_ISSET(sockfd, &rset) || FD_ISSET(sockfd, &wset)) {
len = sizeof(error);
//二次確認是否真的連接成功
if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &error, &len) < 0)
return(-1); /* Solaris pending error */
} else
err_quit("select error: sockfd not set");
連接斷開
包括主動斷開 (close 或 shutdown) 和被動斷開 (read 返回 0)。
當打算關閉網路連接時,如何能知道對方已經發送了數據自己還沒有收到?
在TCP層面解決:主動關閉的時候只使用半關閉shutdown(), 這樣,服務端這邊之時關閉了寫端,還可以正常讀;客戶端收到關閉的信號後(read返回0),會再調用shutdown關閉整個連接;
在應用層面解決:雙方通過某個標記協商,在標記之後不再讀寫數據,這樣就可以完全的關閉連接了;
關閉連接時需要註意的:
是否還有未發送的數據,需要保證應用緩衝區中的數據都發送完畢之後再關閉緩衝區;
TCP緩存區不用我們考慮,因為在調用shutdown或close的時候,TCP的實現是會將TCP的發送緩衝區中的數據都發送出去,然後再發送FIN報文(也可能是組合成一個報文發送);
消息到達
消息到達是最重要的事件;對它的處理決定了網路編程的風格:是阻塞還是非阻塞、分包的處理、應用層的緩衝如何設計等等;
處理分包
所謂分包,就是在一個個位元組流消息中如何區分出一個個消息來;
常見的分包方法有:
- 固定長度;
- 特殊的結尾符,比如字元串的\0,或者回車換行等;
- 固定的消息頭中指定後續的消息的長度,然後跟上一個消息體內容;
- 使用協議本身的格式,比如json格式頭尾配對(XML也一樣);
位元組序轉換註意位元組對齊
如果傳輸的是二進位類型,在位元組流的緩存區中直接強轉可能core dump;因為有的系統訪問地址需要位元組對齊,不能在任意地址上訪問二進位類型(如整形),合理的方式是將其copy到一個本地變數中,然後再做位元組序的轉換:
int32_t peekInt32() const
{
assert(readableBytes() >= sizeof(int32_t));
int32_t be32 = 0;
::memcpy(&be32,readerIndex_, sizeof(be32) );
return be32toh(be32);
}
應用層緩存區的實現
數據到達時處理需要註意:
socket讀事件來到,必須一次將所有的數據都讀完,否則會造成一直有可讀事件,造成busy-loop;讀到的數據當然就需要有個應用層的緩衝區來存放;
因為應用的緩存區是有限的,可以預設設置一個大小,比如2kb,或者根本就不設置初始大小,用多少分配多少;muduo中使用的是vector 來作為緩存區,可以動態增長;
muduo buffer使用的技巧:
buffe採用了vector自動增長的數據結構;
從系統內核中調用的時候,在應用層需要有足夠大的緩衝區,最好能一次將系統recv到的緩衝區給讀空,一次系統調用就搞定一切事情;
而應用緩衝區考慮到有很多個併發的可能,針對每個連接一次都分配較大的緩衝區浪費嚴重,陳碩推薦使用readv一次讀入到兩個地址中,首先將第一個地址填滿,如果還有更多數據,就寫入到臨時緩衝區中,然後append到應用緩衝區;
讀的時候使用readv,局部使用一個足夠大的額外空間(64KB),這樣,一次讀取就足以將socket中的緩存區讀空(一般不會超過64K,tcp buffer如果確實要設置大的緩存區,需要調整系統參數);如果數據不多,可能內部buffer就裝下了,沒有額外操作,否則,多的數據讀到了外部的緩存區,再append到內部緩存區:
ssize_t Buffer::readFd(int fd, int* savedErrno)
{
// saved an ioctl()/FIONREAD call to tell how much to read
char extrabuf[65536];
struct iovec vec[2];
const size_t writable = writableBytes();
vec[0].iov_base = begin()+writerIndex_;
vec[0].iov_len = writable;
vec[1].iov_base = extrabuf;
vec[1].iov_len = sizeof extrabuf;
// when there is enough space in this buffer, don't read into extrabuf.
// when extrabuf is used, we read 128k-1 bytes at most.
const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1;
//只有一次系統調用:這裡的實現比較巧妙
const ssize_t n = sockets::readv(fd, vec, iovcnt);
if (n < 0)
{
*savedErrno = errno;
}
else if (implicit_cast<size_t>(n) <= writable)
{
writerIndex_ += n;
}
else
{
writerIndex_ = buffer_.size();
append(extrabuf, n - writable);
}
// if (n == writable + sizeof extrabuf)
// {
// goto line_30;
// }
return n;
}
發送消息
網路編程中數據發送比數據接受要難處理;
數據的接收,只需要peek足夠的數據後,就可以從應用緩衝區接收出來,然後處理;而數據的發送,還需要考慮對方接受緩慢的情況,導致tcp發送緩衝區累積,最終導致應用緩衝區累積;
舉個例子:某客戶端對echo伺服器只發送,但故意不接收;
客戶端如果只是發送,但從不接收的話,那麼這邊發送過去的報文,首先會導致客戶端的tcp接收緩衝區滿,然後通過ack報文告訴伺服器端,這邊的滑動視窗為0了,不能再發了;後續客戶端發送的報文就把伺服器端TCP發送緩衝區積滿,然後累積應用層的發送緩衝區(因為是非阻塞),最終導致服務端的應用緩存區滿或者記憶體撐爆;
需要發送數據的時候,優先直接調用write()發送,如果發送不成功,或沒有全部發送完畢,才加入到發送緩存區,等待可寫事件到來後發送;
直接調用write()發送數據時,需要先將本次需要發送的數據添加到緩存區,然後發送緩存區,不可直接發送本次數據(因為緩存區中可能有遺留的數據未發送完)
void TcpConnection::handleWrite()
{
loop_->assertInLoopThread();
if (channel_->isWriting())
{
//註意,這裡只調用了一次write,而沒有反覆調用write直到出現EAGAIN錯誤,
//原因是如果第一次調用沒有發送完全部的數據,第二次調用幾乎肯定是EAGAIN錯誤,
//因此這裡減少了一次系統調用,這麼做不影響正確性,卻能夠降低系統時延
ssize_t n = sockets::write(channel_->fd(),
outputBuffer_.peek(),
outputBuffer_.readableBytes());
if (n > 0)
{
outputBuffer_.retrieve(n);
if (outputBuffer_.readableBytes() == 0)
{
//如果發送緩存區為空,不再關註寫事件,避免 busy loop
channel_->disableWriting();
//如果還有寫完成之後的回調,加入待執行回調隊列
if (writeCompleteCallback_)
{
loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this()));
}
//如果此時正在關閉,調用shutdownInLoop 繼續執行關閉過程
if (state_ == kDisconnecting)
{
shutdownInLoop();
}
}
}
else
{
LOG_SYSERR << "TcpConnection::handleWrite";
// if (state_ == kDisconnecting)
// {
// shutdownInLoop();
// }
}
}
else
{
LOG_TRACE << "Connection fd = " << channel_->fd()
<< " is down, no more writing";
}
}
消息發送完畢
對於低流量的服務,可以不必關心這個事件;另外,這裡“發送完畢”是指將數據寫入操作系統的緩衝區,後續由 TCP 協議棧負責數據的發送與重傳,不代表對方已經收到數據。
其它問題
IO multiplexing 是否可以配合阻塞套接字使用?
一般都配合非阻塞socket使用,如果使用阻塞IO,可能在讀寫事件上阻塞當前線程,造成無法繼續處理已經就緒的事件;
初學網路編程可能都會有這個想法,select返回後,如果是讀事件,那麼這時候tcp讀緩衝區肯定是有數據,這時即使使用阻塞套接字來read,應該也不會阻塞;但這樣忽略了一個點,緩衝區確實是有數據,但是很可能到達的數據並不滿足你要求讀的數據大小,這樣read調用還是會阻塞,直到有足夠的數據才返回;
那麼,對於數據讀不可以,對accept()總可以吧,連接事件返回,一般都是有新用戶接入,這時候阻塞的accept()應該總是能夠返回;但在某些情況下,可能對方剛連接上就斷開了,並給服務端發送了一個RST請求,造成服務端這邊將已經就緒的連接請求又移除了,這樣的場景下,select返回,但是accept卻無法獲取新的連接,造成阻塞,直到下一個連接請求到來;(這方面的例子詳見《UNIX網路編程捲1:套接字聯網API》16.6節非阻塞accept() )
所以任何時候,IO multiplexing都需要配合非阻塞IO使用;
零拷貝的實現
對於內核層的實現,底層調用的是系統調用sendFile()方法;
zerocopy技術省去了將操作系統的read buffer拷貝到程式的buffer, 以及從程式buffer拷貝到socket buffer的步驟, 直接將 read buffer 拷貝到 socket buffer;
詳見:http://www.cnblogs.com/zemliu/p/3695549.html
應用層上的實現,對於自定義的結構,一般是交換內部指針(使用C++11,可以使用move操作來實現高效交換結構體)
如果是vector等結構,使用其成員函數swap()就能達到高效的交換(類似C++11中的move操作);
例如muduo中buffer實現:通過swap實現了緩存區的指針交換,從而達到數據交換的目的,而不用拷貝緩衝區;
void swap(Buffer& rhs)
{
buffer_.swap(rhs.buffer_); // std::vector<char> buffer_;
std::swap(readerIndex_, rhs.readerIndex_);
std::swap(writerIndex_, rhs.writerIndex_);
}
epoll使用LT
epoll使用是LT而非ET,原因如下:
- LT編程方便,select的經驗都可同樣適用;
- 讀的時候只需要一次系統調用,而ET必須讀到EAGAIN錯誤;減少一次系統調用,降低時延;
一般認為 edge-trigger 模式的優勢在於能夠減少 epoll 相關係統調用,這話不假,但網路服務程式里可不是只有 epoll 相關係統調用,為了繞過餓死問題,edge-trigger 模式下用戶要自行進行 read/write 迴圈處理,這其中增加的系統調用和減少的 epoll 系統調用加起來,總體性能收益究竟如何?只有實際測量才知道,無法一概而論。為了降低處理邏輯複雜度,常用的事件處理庫大部分都選擇了 level-trigger 模式(如 libevent、boost::asio、muduo等)
參考
《UNIX網路編程捲1:套接字聯網API》
《Linux多線程服務端編程:使用muduo網路庫》
Posted by: 大CC | 31DEC,2015
博客:blog.me115.com [訂閱]
Github:大CC