《The Go Programming Language》 知識點記載,學習筆記、章節練習與個人思考。第一章內容。 ...
這次我們開始muduo
源代碼的實際編寫,首先我們知道muduo
是LT
模式,Reactor
模式,下圖為Reactor
模式的流程圖[來源1]
然後我們來看下muduo
的整體架構[來源1]
首先muduo
有一個主反應堆mainReactor
以及幾個子反應堆subReactor
,其中子反應堆的個數由用戶使用setThreadNum
函數設置,mainReactor
中主要有一個Acceptor
,當用戶建立新的連接的時候,Acceptor
會將connfd
和對應的事件打包為一個channel
然後採用輪詢的演算法,指定將該channel
給所選擇的subReactor
,以後該subReactor
就負責該channel
的所有工作。
TcpServer類
我們按照從上到下的思路進行講解,以下內容我們按照一個簡單的EchoServer
的實現思路來講解,我們知道當我們自己實現一個Server
的時候,會在構造函數中實例化一個TcpServer
EchoServer(EventLoop *loop,
const InetAddress &addr,
const std::string &name)
: server_(loop, addr, name)
, loop_(loop)
{
// 註冊回調函數
server_.setConnectionCallback(
std::bind(&EchoServer::onConnection, this, std::placeholders::_1)
);
server_.setMessageCallback(
std::bind(&EchoServer::onMessage, this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)
);
// 設置合適的loop線程數量 loopthread 不包括baseloop
server_.setThreadNum(3);
}
於是我們去看下TcpServer
的構造函數是在乾什麼
TcpServer::TcpServer(EventLoop *loop,
const InetAddress &listenAddr,
const std::string &nameArg,
Option option)
: loop_(CheckLoopNotNull(loop))
, ipPort_(listenAddr.toIpPort())
, name_(nameArg)
, acceptor_(new Acceptor(loop, listenAddr, option == kReusePort))
, threadPool_(new EventLoopThreadPool(loop, name_))
, connectionCallback_()
, messageCallback_()
, nextConnId_(1)
, started_(0)
{
// 當有新用戶連接時候,會執行該回調函數
acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this,
std::placeholders::_1, std::placeholders::_2));
}
我們只需要關註acceptor_(new Acceptor(loop, listenAddr, option == kReusePort))
和threadPool_(new EventLoopThreadPool(loop, name_))
首先很明確的一點,構造了一個Acceptor
,我們首先要知道Acceptor
主要就是連接新用戶並打包為一個Channel
,所以我們就應該知道Acceptor
按道理應該實現socket
,bind
,listen
,accept
這四個函數。
Acceptor::Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport)
: loop_(loop), acceptSocket_(createNonblocking()) // socket
,
acceptChannel_(loop, acceptSocket_.fd()), listenning_(false)
{
acceptSocket_.setReuseAddr(true);
acceptSocket_.setReusePort(true);
acceptSocket_.bindAddress(listenAddr); // 綁定套接字
// 有新用戶的連接,執行一個回調(打包為channel)
acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
}
其中Acceptor
中有個acceptSocket_
,其實就是我們平時所用的listenfd
,構造函數中實現了socket
,bind
,而其餘的兩個函數的使用在其餘代碼
// 開啟伺服器監聽
void TcpServer::start()
{
// 防止一個TcpServer被start多次
if (started_++ == 0)
{
threadPool_->start(threadInitCallback_); // 啟動底層的loop線程池,這裡會按照設定了threadnum設置pool的數量
loop_->runInLoop(std::bind(&Acceptor::listen, acceptor_.get()));
}
}
我們知道,當我們設置了threadnum
之後,就會有一個mainloop
,那麼這個loop_
就是那個mainloop
,其中可以看見這個loop_
就只做一個事情Acceptor::listen
。
void Acceptor::listen()
{
listenning_ = true;
acceptSocket_.listen(); // listen
acceptChannel_.enableReading(); // acceptChannel_ => Poller
}
這裡就實現了listen
函數,還有最後一個函數accept
,我們慢慢向下分析,從代碼可以知道acceptChannel_.enableReading()
之後就會使得這個listenfd
所在的channel
對讀事件感興趣,那什麼時候會有讀事件呢,就是當用戶建立新連接的時候,那麼我們應該想一下,那當感興趣的事件發生之後,listenfd
應該乾什麼呢,應該執行一個回調函數呀。註意Acceptor
構造函數中有這樣一行代碼acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
這就是那個回調,我們去看下handleRead
在幹嘛。
// listenfd有事件發生了,就是有新用戶連接了
void Acceptor::handleRead()
{
InetAddress peerAddr;
int connfd = acceptSocket_.accept(&peerAddr);
if (connfd >= 0)
{
// 若用戶實現定義了,則執行,否則說明用戶對新到來的連接沒有需要執行的,所以直接關閉
if (newConnectionCallback_)
{
newConnectionCallback_(connfd, peerAddr); // 輪詢找到subLoop,喚醒,分發當前的新客戶端的Channel
}
else
{
::close(connfd);
}
}
...
}
這裡是不是就實現了accept
函數,至此當用戶建立一個新的連接時候,Acceptor
就會得到一個connfd
和其對應的peerAddr
返回給mainloop
,這時候我們在註意到TcpServer
構造函數中有這樣一行代碼acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this,std::placeholders::_1, std::placeholders::_2));
我們給acceptor_
設置了一個newConnectionCallback_
,於是由上面的代碼就可以知道,if (newConnectionCallback_)
為真,就會執行這個回調函數,於是就會執行TcpServer::newConnection
,我們去看下這個函數是在幹嘛。
void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
{
// 輪詢演算法選擇一個subloop來管理對應的這個新連接
EventLoop *ioLoop = threadPool_->getNextLoop();
char buf[64] = {0};
snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
++nextConnId_;
std::string connName = name_ + buf;
LOG_INFO("TcpServer::newConnection [%s] - new connection [%s] from %s \n",
name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str());
// 通過sockfd獲取其綁定的本地ip和埠
sockaddr_in local;
::bzero(&local, sizeof local);
socklen_t addrlen = sizeof local;
if (::getsockname(sockfd, (sockaddr*)&local, &addrlen) < 0)
{
LOG_ERROR("sockets::getLocalAddr");
}
InetAddress localAddr(local);
// 根據連接成功的sockfd,創建TcpConnection
TcpConnectionPtr conn(new TcpConnection(
ioLoop,
connName,
sockfd, // Socket Channel
localAddr,
peerAddr));
connections_[connName] = conn;
// 下麵的回調時用戶設置給TcpServer,TcpServer又設置給TcpConnection,TcpConnetion又設置給Channel,Channel又設置給Poller,Poller通知channel調用這個回調
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
// 設置瞭如何關閉連接的回調
conn->setCloseCallback(
std::bind(&TcpServer::removeConnection, this, std::placeholders::_1)
);
// 直接調用connectEstablished
ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}
這裡就比較長了,我先說下大概他幹了啥事情:首先通過輪詢找到下一個subloop
,然後將剛剛返回的connfd
和對應的peerAddr
以及localAddr
構造為一個TcpConnection
給subloop
,然後給這個conn
設置了一系列的回調函數,比如讀回調,寫回調,斷開回調等等。下一章我們來說下上面的代碼最後幾行在幹嘛。
自己的網址:www.shicoder.top
歡迎加群聊天 452380935
本文由博客一文多發平臺 OpenWrite 發佈!