handy網路庫源碼閱讀

来源:https://www.cnblogs.com/dayoushen/archive/2019/09/20/11560021.html
-Advertisement-
Play Games

簡潔易用的C++11網路庫,From:https://github.com/yedf/handy 在整理過去的資料過程中,發現過去有關註過這一個網路庫,簡單看了一下屬於輕量級的實現,因此本文將對該庫進行簡單的學習之旅,目標是對網路基礎知識進一步鞏固。 編譯和運行 庫目前實現了linux和mac環境, ...


簡潔易用的C++11網路庫,From:https://github.com/yedf/handy
在整理過去的資料過程中,發現過去有關註過這一個網路庫,簡單看了一下屬於輕量級的實現,因此本文將對該庫進行簡單的學習之旅,目標是對網路基礎知識進一步鞏固。

編譯和運行

庫目前實現了linux和mac環境,需要支持C++11因此gcc的版本要大於4.8,在我的虛擬機ubuntu12.04是要升級gcc版本,然後使用雲centos 7,之前安裝的cmake版本是2.8.12,與要求的版本大於3.2不匹配,因此先升級cmake

  $ cd /tmp
  $ wget https://cmake.org/files/v3.3/cmake-3.3.2.tar.gz
  $ tar xzvf cmake-3.3.2.tar.gz
  $ cd cmake-3.3.2
  $ ./bootstrap
  $ gmake
  $ make install
#FROM : https://blog.csdn.net/fword/article/details/79347356

升級後能順利編譯。

網路庫基礎知識

既然是高性能網路庫,那linux必然是epoll,在raw-examples帶有對epoll的測試epoll.cc(水平觸發)和epoll-et.cc(邊緣觸發)
水平觸發:當被監控的文件描述符上有可讀寫事件發生時,epoll_wait()會通知處理程式去讀寫。如果這次沒有把數據一次性全部讀寫完(如讀寫緩衝區太小),那麼下次調用 epoll_wait()時,它還會通知你在上沒讀寫完的文件描述符上繼續讀寫,當然如果你一直不去讀寫,它會一直通知你!如果系統中有大量你不需要讀寫的就緒文件描述符,而它們每次都會返回,這樣會大大降低處理程式檢索自己關心的就緒文件描述符的效率!
Edge_triggered(邊緣觸發):當被監控的文件描述符上有可讀寫事件發生時,epoll_wait()會通知處理程式去讀寫。如果這次沒有把數據全部讀寫完(如讀寫緩衝區太小),那麼下次調用epoll_wait()時,它不會通知你,也就是它只會通知你一次,直到該文件描述符上出現第二次可讀寫事件才會通知你!這種模式比水平觸發效率高,系統不會充斥大量你不關心的就緒文件描述符!
水平觸發和邊緣觸發

根據linux的man-page中說明邊緣觸發要求在EPOLL_CTRL_ADD的時候就對文件描述符進行EPOLLIN|EPOLLOUT|EPOLLET事件關註(建議只對客戶端套接字),這能避免不斷地使用EPOLL_CTL_MOD修改對EPOLLIN和EPOLLOUT事件地關註。通常情況下監聽套接字為水平觸發,客戶套接字邊緣觸發,對監聽套接字和客戶套接字都要設置非阻塞模式。監聽套接字使用水平觸發的原因是,多個連接同時到達如果使用邊緣觸發則epoll只會通知一次,有一些TCP連接在就緒隊列積累得不到及時處理,如果使用水平觸發需要採取而外的處理方式(使用while迴圈accpet,直到accept返回-1且errno設置為EAGIN表示所有的連接處理完了)
EPOLL的系統函數定義如下:

#include <sys/epoll.h>
   typedef union epoll_data {
                   void    *ptr;
                   int      fd;
                   uint32_t u32;
                   uint64_t u64;
               } epoll_data_t;
   struct epoll_event {
       uint32_t     events;    // Epoll events
       epoll_data_t data;      // User data variable
   };
/*
功能:創建epoll對象
[1]size無意義,要求大於0
返回值:成功為非負文件描述符,失敗為-1
*/
int epoll_create(int size);

/*
功能:對epoll對象增加,修改或刪除感興趣事件,輸入<文件描述符fd, 操作op, 事件epoll_event>
操作OP:增EPOLL_CTL_ADD,改EPOLL_CTL_MOD,刪EPOLL_CTL_DEL
事件epoll_event.events:對應文件描述符可讀EPOLLIN,可寫EPOLLOUT,對方關閉EPOLLRDHUP,異常EPOLLPRI
,錯誤EPOLLERR,掛起EPOLLHUP,設置邊緣觸發EPOLLET,設置只觸發一次EPOLLONESHOT,EPOLLWAKEUP,EPOLLEXCLUSIVE
返回值:0-成功,-1失敗
*/
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

/*
功能:等待內核中的epoll_event事件可讀或者timeout到達
[1]epfd是一個epoll實例句柄根據epoll_create得到
[2]epoll_event包含文件描述符和Epoll事件,對應記憶體由用戶開闢
[3]最多事件數,必須大於0
[4]超時事件,單位為ms
返回值:>0有對應個文件描述符發生了事件;0超時到達;-1發生錯誤
*/
int epoll_wait(int epfd, struct epoll_event *events,
                      int maxevents, int timeout);

下麵是代碼節選

//epoll.cc 水平觸發

//main函數
//0)忽略SIGPIPE信號,避免對等方關閉後觸發了寫操作引起的SIGPIPE信號,而導致進程退出
::signal(SIGPIPE, SIG_IGN);
//1)定義了回饋的報文,長度1048576是為了測試寫緩衝區滿了的情況
httpRes = "HTTP/1.1 200 OK\r\nConnection: Keep-Alive\r\nContent-Type: text/html; charset=UTF-8\r\nContent-Length: 1048576\r\n\r\n123456";
    for (int i = 0; i < 1048570; i++) {
        httpRes += '\0';
    }
//2)創建epoll實例
int epollfd = epoll_create(1);
//3)創建socket監聽套接字listenfd,設置非阻塞模式,bind,listen和加入到epollfd關註
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
int r = ::bind(listenfd, (struct sockaddr *) &addr, sizeof(struct sockaddr));
r = listen(listenfd, 20);
setNonBlock(listenfd);
updateEvents(epollfd, listenfd, EPOLLIN, EPOLL_CTL_ADD); //epoll_ctrl(epollfd,EPOLL_CTL_ADD,listenfd,ev.EPOLLIN)關註監聽套接字的可讀事件
//4)迴圈epoll_wait等待內核事件
for (;;) {  //實際應用應當註冊信號處理函數,退出時清理資源
        loop_once(epollfd, listenfd, 10000); //調用epoll_wait,超時等待為10s,如果有事件返回也會立即返回
    }

//loop_once函數
int n = epoll_wait(efd, activeEvs, kMaxEvents, waitms);
for (int i = 0; i < n; i++) {
        int fd = activeEvs[i].data.fd;
        int events = activeEvs[i].events;
        if (events & (EPOLLIN | EPOLLERR)) {
            if (fd == lfd) {
                handleAccept(efd, fd); //有連接到來,accpet得到對應文件描述符,調用updateEvents加入efd的EPOLLIN關註列表
            } else {
                handleRead(efd, fd); //客戶端有數據,保存連接上下文到map<fd, Con>cons中,根據http的協議(結尾"\n\n"或"\r\n\r\n")發送httpRes給客戶端,註意這裡httpRes太長,寫write返回小於0且errno為EAGAIN或EWOULDBLOCK,則要表示緩衝區已近滿了不能再寫了,要修改關註對應套接字的可讀可寫事件;後續回調可寫繼續寫入,最後寫完成後修改為只關註可讀事件。
            }
        } else if (events & EPOLLOUT) {
            if (output_log)
                printf("handling epollout\n");
            handleWrite(efd, fd);
        } else {
            exit_if(1, "unknown event");
        }
//updateEvents函數
void updateEvents(int efd, int fd, int events, int op) {
    struct epoll_event ev;
    memset(&ev, 0, sizeof(ev));
    ev.events = events;
    ev.data.fd = fd;
    printf("%s fd %d events read %d write %d\n", op == EPOLL_CTL_MOD ? "mod" : "add", fd, ev.events & EPOLLIN, ev.events & EPOLLOUT);
    int r = epoll_ctl(efd, op, fd, &ev); //把文件描述符fd加入到epoll對象efd關註
    exit_if(r, "epoll_ctl failed");
}

值得註意的是水平觸發和邊緣觸發的區別,是在epoll_ctl中ev.events指定,預設為水平觸發;後續要特別註意對可寫事件的處理上,水平觸發需要在寫遇到WOULDBLOCK後關註可寫事件,寫完後取消關註可寫事件,而邊緣觸發只是在epoll_ctl的add操作中指定EPOLLET和同時關註可讀可寫事件,而後在寫write數據中遇到EWOULDBLOCK直接跳出寫迴圈等到內核說可以再寫則繼續寫。關於讀read每次都讀到返回-1且error為EAGAIN|EWOULDBLOCK,這種策略下就不用在讀方面區分是水平模式還是邊緣模式。

註意:作者給出的示例中,沒有設置監聽套接字SO_REUSEADDR,如果服務端斷開而任一客戶端沒斷開,服務端重新啟動將出想bind失敗,錯誤原因是"Address already in use"會有約2s時間處於TIME_WAIT狀態,建議服務端開始開啟這個選項,當然也要考慮多次啟動和搶占地址的情況出現。

功能模塊

handy文件夾即網路庫的核心,最後生成動態庫和靜態庫,測試程式在example和10m兩個文件夾,分析網路庫將重點關註handy文件夾。handy文件夾主要的功能實現在如下文件中(從CMakeList文件可以看出)

  • ${PROJECT_SOURCE_DIR}/handy/daemon.cc
  • ${PROJECT_SOURCE_DIR}/handy/net.cc //定義網路設置輔助函數,比如setNonBlock,setNoDelay;設計讀寫緩衝區Buffer
  • ${PROJECT_SOURCE_DIR}/handy/codec.cc //定義編解碼,目前支持以\r\n結尾和長度開始的消息
  • ${PROJECT_SOURCE_DIR}/handy/http.cc //支持http
  • ${PROJECT_SOURCE_DIR}/handy/conn.cc //TCP連接上下文
  • ${PROJECT_SOURCE_DIR}/handy/poller.cc //對epoll的封裝
  • ${PROJECT_SOURCE_DIR}/handy/udp.cc //對udp的封裝
  • ${PROJECT_SOURCE_DIR}/handy/threads.cc //線程池和隊列的封裝
  • ${PROJECT_SOURCE_DIR}/handy/file.cc //文件io的函數集
  • ${PROJECT_SOURCE_DIR}/handy/util.cc //時間等基礎函數
  • ${PROJECT_SOURCE_DIR}/handy/conf.cc //INI配置文件讀寫的封裝
  • ${PROJECT_SOURCE_DIR}/handy/stat-svr.cc //
  • ${PROJECT_SOURCE_DIR}/handy/port_posix.cc //網路位元組序等系統相關網路輔助函數
  • ${PROJECT_SOURCE_DIR}/handy/event_base.cc //事件迴圈和通道定義
  • ${PROJECT_SOURCE_DIR}/handy/logging.cc) //日誌

給上面功能分一下類:

  • 1)基礎部件:daemon.cc, threads.cc, file.cc, util.cc, conf.cc, logging.cc
  • 2)系統相關:net.cc,poller.cc,port_posix.cc
  • 3)協議相關:codec.cc,udp.cc,http.cc
  • 4)網路封裝:event_base.cc,conn.cc,stat-svr.cc
    對於基礎部件可以單獨測試,只看一下一些有趣的設計;對於系統相關的需要瞭解其作用;對於協議相關的要瞭解介面;對網路封裝是本文的重點。

基礎部件

公共函數util

  1. format返回格式化後的string,涉及到記憶體重分配
//util.h
struct util {
    static std::string format(const char *fmt, ...);
}
//util.cpp
string util::format(const char *fmt, ...) {
    char buffer[500]; //棧記憶體
    unique_ptr<char[]> release1;
    char *base;
    for (int iter = 0; iter < 2; iter++) {
        int bufsize;
        if(iter == 0) { //第一次嘗試用char[500]去獲取格式化數據
            bufsize = sizeof(buffer);
            base = buffer;
        } else { //第二次嘗試用char[30000]去獲取格式化數據
            bufsize = 30000;
            base = new char[bufsize]; //或許需要檢查一下30k記憶體是否分配成功
            release1.reset(base); //新記憶體將由unique_ptr接管,即在程式真正退出前,unique_ptr對象銷毀時同時銷毀綁定的記憶體;
        }
        char *p = base;
        char *limit = base + bufsize;
        if (p < limit) {
            va_list ap;
            va_start(ap, fmt);
            p += vsnprintf(p, limit - p, fmt, ap);
            va_end(ap);
        }
        // Truncate to available space if necessary
        if(p >= limit) {
            if(iter == 0) {
                continue;
            } else {
                p = limit - 1;
                *p = '\0';
            }
        }
        break;
    }
    return base;//註意這裡是把char* 返回給一個臨時結果string;如果是返回char *則會出現unique_ptr銷毀一次而外部使用時崩潰,permission denid
}

以上主要的疑問:
1)p += vsnprintf(p, limit - p, fmt, ap);理論上p +=max(bufsize)會導致p>=limit出現嗎?
--邊界情況會出現p==limit而不會大於。

2)引入unique_ptr的作用是什麼?是為了委托base的記憶體回收嗎?即本程式會記憶體泄漏嗎?
--unique_ptr的存在時為了函數結束後對成員進行回收,如果不用unique_ptr,那麼會增加如下代碼釋放記憶體:

    string strTemp(base); //多了一次拷貝
    if(base != NULL && base != buffer) delete base; base = NULL; //多了一次釋放,主要判斷不為棧數組,否則非法釋放
    return strTemp;

測試代碼如下:

 56         string s1 = "hello";
 57         for(int i = 0; i < 99; i++) {
 58                 s1 += "hello";
 59         }
 60         printf("len(s1)=%d\n", s1.length()); //500
 61         string s2 = std::string(util::format("%s", s1.c_str() ) );
 62         printf("len(s2)=%d\n", s2.length()); //500
  1. 退出時自動調用某一個函數
//util.h
struct noncopyable {
   protected:
    noncopyable() = default;
    virtual ~noncopyable() = default;

    noncopyable(const noncopyable &) = delete;
    noncopyable &operator=(const noncopyable &) = delete;
};
struct ExitCaller : private noncopyable {
    ~ExitCaller() { functor_(); }
    ExitCaller(std::function<void()> &&functor) : functor_(std::move(functor)) {}

   private:
    std::function<void()> functor_;
};
//usage.cc
    //...
    int fd = open(filename.c_str(), O_RDONLY);
    if (fd < 0) {
        return Status::ioError("open", filename);
    }
    ExitCaller ec1([=] { close(fd); });

上述的ExitCaller類似LockGuard,或者說go語言的defer,表示當變數離開作用域時調用某一個函數,defer實現如下和上面只差一個lambda匿名函數:

#pragma once
#include <functional>

#define CONNECTION(text1,text2) text1##text2
#define CONNECT(text1,text2) CONNECTION(text1,text2)

class DeferHelper {
 public:
    DeferHelper(std::function<void ()> &&cb) : cb_(std::move(cb)) {}
    ~DeferHelper() { if (cb_) cb_(); }
 private:
    std::function<void ()> cb_;
};
#define defer(code)  DeferHelper CONNECT(L,__LINE__) ([&](){code;})

線程類threads

封裝了一個隊列和線程池。
隊列的優點時put會檢查是否滿,pop_wait會等待超時或丟列不為空;

template <typename T>
struct SafeQueue : private std::mutex, private noncopyable {
    static const int wait_infinite = std::numeric_limits<int>::max(); //最大等待時間ms
    // 0 不限制隊列中的任務數
    SafeQueue(size_t capacity = 0) : capacity_(capacity), exit_(false) {}
    //隊列滿則返回false,否則push_back到items_中,並使用ready_.notify_one()通知一個去取
    bool push(T &&v);
    //超時則返回T(),出現在隊列為空情況;不超時返回items_中頭元素
    T pop_wait(int waitMs = wait_infinite);
    //超時返回false;不超時,v中存儲items_中頭元素
    bool pop_wait(T *v, int waitMs = wait_infinite);
    //有鎖獲取元素個數,即items_.size
    size_t size();
    //退出,置exit_標識為true
    void exit();
    //取退出標識
    bool exited() { return exit_; }

   private:
    std::list<T> items_;
    std::condition_variable ready_;
    size_t capacity_;
    std::atomic<bool> exit_;
    void wait_ready(std::unique_lock<std::mutex> &lk, int waitMs); //等待waitMs,調用ready.wait_until函數
};

多線程隊列則時能利用多個線程消化SafeQueue中的任務

typedef std::function<void()> Task;
extern template class SafeQueue<Task>;

struct ThreadPool : private noncopyable {
    //創建線程池,threads指定線程個數建議為cpunum或2*cpunum,
    ThreadPool(int threads, int taskCapacity = 0, bool start = true);
    //銷毀safeQueue和一些列印信息
    ~ThreadPool();
    //使用線程從safeQueue中取元素讓後執行
    void start();
    //停止safeQueue
    ThreadPool &exit() {
        tasks_.exit();
        return *this;
    }
    //等待線程集合退出,for(auto &t : threads_)t.join();
    void join();

    //隊列滿返回false, 使用std::move把右值引用變成引用:tasks_.push(move(task));
    bool addTask(Task &&task);
    bool addTask(Task &task) { return addTask(Task(task)); }
    size_t taskSize() { return tasks_.size(); }

   private:
    SafeQueue<Task> tasks_;
    std::vector<std::thread> threads_;
};

文件IO相關file

  1. Status結構體
    記錄最後文件操作的執行狀態
struct Status {
    Status() : state_(NULL) {}
    Status(int code, const char *msg);//state = new char[strlen(msg) + 8];state[0-4]=(strlen(msg) + 8),state[4-8]=code
    //...
private:
    //    state_[0..3] == length of message
    //    state_[4..7]    == code
    //    state_[8..]  == message
    const char *state_;
  1. 文件IO導出函數
//file.h
    //把文件filename的內容讀到cont中
    static Status getContent(const std::string &filename, std::string &cont);
    //把cont寫到文件filename中
    static Status writeContent(const std::string &filename, const std::string &cont);
    //寫入cont到臨時文件tmpName,刪除舊文件name,重命名tmpName文件為name文件
    static Status renameSave(const std::string &name, const std::string &tmpName, const std::string &cont);
    //把文件夾dir中的文件名加入到result中,使用dirent.d中的readdir函數
    static Status getChildren(const std::string &dir, std::vector<std::string> *result);
    //刪除文件,使用unlink刪除,c語言中的remove則內部使用了remove,不過remove也能刪除目錄
    static Status deleteFile(const std::string &fname);
    //創建目錄,使用mkdir(name.c_str(), 0755),八進位0755表示文件許可權為文件所有著7(r4+w2+e1),組5(r4+e1),其他用戶5(r4+e1)
    static Status createDir(const std::string &name);
    //刪除文件夾deleteDir
    static Status deleteDir(const std::string &name);
    //使用stat返迴文件的信息
    static Status getFileSize(const std::string &fname, uint64_t *size);
    //使用rename函數重命名一個文件
    static Status renameFile(const std::string &src, const std::string &target);
    //使用access判斷文件是否存在;或許何以通過stat返回失敗-1且errno==ENOENT判斷文件不存在
    static bool fileExists(const std::string &fname);

配置INI文件conf

為了程式的靈活性,一般都會有INI配置文件,INI配置文件的格式如下
[section]
key1 = value1
key2 = 2
作者導出介面如下:

//conf.h
struct Conf {
    int parse(const std::string &filename); //解析文件filename的內容到values_
    std::string get(std::string section, std::string name, std::string default_value); //取字元串值section[name],沒取到返回default_value
    long getInteger(std::string section, std::string name, long default_value);///取整數值section[name],沒取到返回default_value
    double getReal(std::string section, std::string name, double default_value);//取浮點數值section[name],沒取到返回default_value
    bool getBoolean(std::string section, std::string name, bool default_value);//取布爾值section[name],沒取到返回default_value
    std::list<std::string> getStrings(std::string section, std::string name);//取setction[name]對應的值
    std::map<std::string, std::list<std::string>> values_;//存儲為section.key,value,為什麼值是用list來存呢?因為有多行的value的情況。
    std::string filename; //對應解析的文件名

據實現描述這個conf參考了python的ConfigParser,我喜歡輕量級mars的conf解析

日誌logging

日誌是伺服器中比較重要的,因為發生異常基本都需要分析日誌改善程式,日誌庫大部分都有glog的影子。對於服務端的日誌,因為在多線程中,因此不能寫串,有人提倡用prinf而不是ostream,ostream真的不是多線程安全,這一點待探索;日誌是能分等級的,常見為DEBUG,INFO,WARNING,FATAL;日誌可以是緩衝寫或實時寫,但要保證程式退出的時候儘量少的丟日誌,尤其是異常退出的時候;日誌是要支持滾動的,根據具體的要求按天滾動或者按大小滾動;每條日誌頭部有時間信息,尾部可能有文件和代碼行信息。

通過查看logging.h的實現可以發現,日誌分等級,日誌是一個靜態單例通過static Logger &getLogger()返回,然後定義了一些巨集對日誌進行操作。文件要先設置文件名,然後真正寫入是調用logv函數,寫入前根據滾動規則獲取要寫入文件描述符,拼接當前時間等信息和傳入的要寫入的內容,實時寫入到文件中。

守護deamon

實現是目的個人理解是為了讓服務在後臺運行,測試exmple/daemon.cc的程式,用戶輸入後終端會退出,但是服務會不退出。實現流程是fork一個子進程,然後父進程執行退出,調用setsid讓子進程脫離當前終端的控制不隨當前終端結束而結束。

系統相關部件

位元組序轉換和遠程連接信息port_posix

實現了htobe的uint16_t,uint32_t,uint64_t,int16_t,int32_t,int64_t轉換
實現了獲取DNS信息的getHostByName("www.google.com")

net

  • fcntl設置文件描述符屬性:setNonBlock;setsockopt設置套接字屬性:setReuseAddr,setReusePort,setNoDelay
  • ip地址轉換<string ip, port>到struct sockaddr_in addr_
  • 字元串Slice切片,包含開始和結束字元指針,及一些相關操作;
  • 緩衝區buffer,設計一個可擴容動態伸縮的記憶體數組,本處實現的尾位置不可跨越(不可e_ < b_ ),即0<=b_ <= e_ <cap_。重要細節如下:
struct Buffer {
    Buffer() : buf_(NULL), b_(0), e_(0), cap_(0), exp_(512) {}
    ~Buffer() { delete[] buf_; } //析構的時候銷毀
    //統計屬性
    size_t size() const { return e_ - b_; } //有效數據長度
    bool empty() const { return e_ == b_; } //沒有有效數據
    char *data() const { return buf_ + b_; } //有效數據起地址
    char *begin() const { return buf_ + b_; }
    char *end() const { return buf_ + e_; }

    //記憶體分配,返回end()結果,分三種情況
    //1) end_ + len <= cap,足夠記憶體容納,不需要修改記憶體
    //2) size() + len < cap_ / 2,可容納len,但一般以上的記憶體都在尾部,需要執行moveHead即把有效數據移動到buf_上讓b_=0
    //3) 其他情況,expand(len),擴張的大小為max(exp_, 2*cap_, size() + len)
    char *makeRoom(size_t len);
    //分配長度為len的容量,返回數據結束位置
    char *allocRoom(size_t len) {
        char *p = makeRoom(len);
        addSize(len); //e_ += len;
        return p;
    }

    //增加一段數據
    Buffer &append(const char *p, size_t len) {
        memcpy(allocRoom(len), p, len); //1.調用allocRoom分配足夠容量,把新數據進去
        return *this;
    }
    //消費長度為len的數據,註意len一定小於size()
    Buffer &consume(size_t len) {
        b_ += len;
        if (size() == 0)
            clear();
        return *this;
    }
    Buffer &absorb(Buffer &buf); //交換this和buf
private:
    char *buf_;//記憶體的首地址
    size_t b_, e_, cap_, exp_;//開始位置,結束位置,總容量,exp_期望大小
    void copyFrom(const Buffer &b); //深拷貝b,先拷貝參數,然後this.buf_=new char[b.cap_];memcpy(this.buf_+b_,bu.buf_+b_,b.size())

多路復用Epoll的封裝poller

poll/epoll能管理的不僅僅是套接字,而是所有的文件描述符,在linux中管道,timefd_create,eventfd都是可以納入epoll來管理,因此要對epoll做簡單的封裝,核心的內容是addChannel,removeChannel,updateChannel對channel中的文件描述符fd和事件event的管理。

//poller.h
struct PollerBase : private noncopyable {
    int64_t id_;
    int lastActive_;
    PollerBase() : lastActive_(-1) {
        static std::atomic<int64_t> id(0);
        id_ = ++id;
    }
    virtual void addChannel(Channel *ch) = 0;
    virtual void removeChannel(Channel *ch) = 0;
    virtual void updateChannel(Channel *ch) = 0;
    virtual void loop_once(int waitMs) = 0;
    virtual ~PollerBase(){};
};

PollerBase *createPoller(); //返回一個繼承自PollerBase的PollerEpoll

struct PollerEpoll : public PollerBase {
    int fd_; //epoll對象,在構造函數中通過epoll_create得到
    std::set<Channel *> liveChannels_; //Channel集合,可認為是要關註<fd,event>集合,不擁有他們的生命周器
    // for epoll selected active events
    struct epoll_event activeEvs_[kMaxEvents]; //epoll_wait返回的活躍文件描述符
    PollerEpoll(); //epoll_create1(EPOLL_CLOEXEC);
    ~PollerEpoll(); //while (liveChannels_.size()) {(*liveChannels_.begin())->close();};  ::close(fd_);
    void addChannel(Channel *ch) override; //加入關註int r = epoll_ctl(fd_, EPOLL_CTL_ADD, ch->fd(), &ev);liveChannels_.insert(ch);
    void removeChannel(Channel *ch) override;//取消關註liveChannels_.erase(ch);
    void updateChannel(Channel *ch) override;//更新關註int r = epoll_ctl(fd_, EPOLL_CTL_MOD, ch->fd(), &ev);activeEvs_[i].data.ptr = NULL;(這一個是為什麼呢?)
    void loop_once(int waitMs) override;//等待epoll對象返回,回調對應的事件給通道lastActive_ = epoll_wait(fd_, activeEvs_, kMaxEvents, waitMs);Channel *ch = (Channel *) activeEvs_[i].data.ptr;ch->handleWrite();
};

協議相關

流數據長度和內容codec

TCP是基於位元組流(STREAM)的可靠協議,客戶端一條最小的有意義的數據稱為一幀,基於流意味著數據幀可能兩幀數據同時到達,或者數據幀不全的情況。服務端應用要根據和客戶端約定的協議分離出一幀幀數據,響應相應的請求。

//codec.h
struct CodecBase {
    // > 0 解析出完整消息,消息放在msg中,返回已掃描的位元組數
    // == 0 解析部分消息
    // < 0 解析錯誤
    virtual int tryDecode(Slice data, Slice &msg) = 0;
    virtual void encode(Slice msg, Buffer &buf) = 0;
    virtual CodecBase *clone() = 0;
};
//以\r\n結尾的消息
struct LineCodec : public CodecBase {
    int tryDecode(Slice data, Slice &msg) override; //找到以\r\n或\n結尾的,返回長度和msg
    void encode(Slice msg, Buffer &buf) override; //給msg加上\r\n寫入到buf中
    CodecBase *clone() override { return new LineCodec(); }
}
//給出長度的消息,[4][len_4][msg_len]
struct LengthCodec : public CodecBase {
    int tryDecode(Slice data, Slice &msg) override;//首部8位元組,第4-8位元組為長度,如果有完成的數據返回長度和msg
    void encode(Slice msg, Buffer &buf) override;//給buf增加數據‘mBdT’+len(msg)+msg
    CodecBase *clone() override { return new LengthCodec(); }
}

非可靠傳輸協議UDP

UDP是一種簡單的面向數據報的運輸層協議,不提供可靠性,只是把應用程式傳給IP層的數據報發送出去,但是不能保證它們能到達目的地。在一些直播中會使用UDP,有一些游戲開發者也探索了UDP實現可靠性的可能。
UDP創建的流程:

    int fd = socket(AF_INET, SOCK_DGRAM, 0); //註意第二個參數為SOCK_DGRAM數據報流
    int r = net::setReuseAddr(fd);
    fatalif(r, "set socket reuse option failed");
    r = net::setReusePort(fd, reusePort);
    fatalif(r, "set socket reuse port option failed");
    r = util::addFdFlag(fd, FD_CLOEXEC);
    fatalif(r, "addFdFlag FD_CLOEXEC failed");
    r = ::bind(fd, (struct sockaddr *) &addr_.getAddr(), sizeof(struct sockaddr));

讀寫UDP的命令如下:

    //recvfrom
    truct sockaddr_in raddr;
    socklen_t rsz = sizeof(raddr);
    ssize_t rn = recvfrom(fd, buf, bufsize, 0, (sockaddr *) &raddr, &rsz);
    if (rn < 0) {
        error("udp %d recv failed: %d %s", fd, errno, strerror(errno));
        return;
    }

    //sendto
    truct sockaddr_in raddr;
    socklen_t rsz = sizeof(raddr);
    int wn = ::sendto(fd, buf, bufsize, 0, (sockaddr *) raddr, rsz);

WEB常用HTTP協議

http協議應該是每一個網路人直接接觸最多的內容,因為BS和部分CS結構網路傳輸都是用http,因為其簡單且描述的內容很全面。
http的交互分為客戶端和服務端,客戶端也可以是瀏覽器,客戶端發起的請求叫做HTTP請求(HTTP Request),其包括:request line + header + body,header與body之間有一個\r\n;HTTP的請求方法有Get, Post, Head, Put, Delete等。HTTP請求的回覆(HTTP Response)包括:status line + header + body (header分為普通報頭,響應報頭與實體報頭)
一個典型的請求:

GET http://nooverfit.com/wp/ HTTP/1.1
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8
Accept-Language: zh-Hans-CN,zh-Hans;q=0.5
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.102 Safari/537.36 Edge/18.18362
Accept-Encoding: gzip, deflate
Host: nooverfit.com
Connection: Keep-Alive
Cookie: Hm_lvt_416c770ac83a9d9wewewe=15678wwewe,1568260075; Hm_lvt_bfc6c239dfdfad0bbfed25f88a973fb0=1559dfd232

//HTTP Response
HTTP/1.1 200 OK
Server:
Date: Thu, 19 Sep 2019 16:10:38 GMT
Content-Type: text/html; charset=UTF-8
Transfer-Encoding: chunked
Connection: keep-alive
Vary: Cookie,Accept-Encoding,User-Agent
Upgrade: h2,h2c
Accept-Ranges: bytes
Referrer-Policy: 

<html><head><title>This is title</title></head><body><h1>Hello</h1>Now is 20130611 02:14:31.518462</body></html>

對http實現來說首先是要解析請求和回覆,HttpMsg就是對http協議消息的解析,結果是分離出一個完整的請求幀

struct HttpMsg {
    enum Result {
        Error,
        Complete,
        NotComplete,
        Continue100,
    };
    HttpMsg() { HttpMsg::clear(); };
    //內容添加到buf,返回寫入的位元組數
    virtual int encode(Buffer &buf) = 0;
    //嘗試從buf中解析,預設複製body內容
    virtual Result tryDecode(Slice buf, bool copyBody = true) = 0;
    //清空消息相關的欄位
    virtual void clear();

    std::map<std::string, std::string> headers;
    std::string version, body;
    // body可能較大,為了避免數據複製,加入body2
    Slice body2;

    std::string getHeader(const std::string &n) { return getValueFromMap_(headers, n); }
    Slice getBody() { return body2.size() ? body2 : (Slice) body; }

    //如果tryDecode返回Complete,則返回已解析的位元組數
    int getByte() { return scanned_; }
    //...
}

得到完整請求幀後就是分析對應的請求方法和請求資源

struct HttpRequest : public HttpMsg {
    std::map<std::string, std::string> args;
    std::string method, uri, query_uri; //請求的方法和uri
    virtual int encode(Buffer &buf);
    virtual Result tryDecode(Slice buf, bool copyBody = true);
    //...
}

處理完請求之後就是回饋給對應的客戶端

struct HttpResponse : public HttpMsg {
    std::string statusWord; //example "ok"
    int status; // example 200
    //...
}

網路封裝

到了最後才是最難的網路封裝部分,先上一個muduo網路庫的圖,這個是典型的reactor模式的設計,主要借鑒於java的NIO網路模型的設計

首先有一個事件迴圈,會實例化一個poller,然後也會導出定時器介面,然後應用層會是tcp或者http服務的套接字會半丁到channel,通過EventLoop的updateloop加入poller對象關註,當有連接到來則回調channel中相關回調,最後傳遞到客戶和服務方。handy的設計像是muduo的簡化版本,沒那麼繁雜。even_base中實現和event_imp事件迴圈(不斷調用poller::loop_once)和計時定時器,Channel通道(文件描述符擁有著,控制關註事件,可讀可寫事件回調),

//event_base.cpp
//事件迴圈類
struct EventsImp {
PollerBase *poller_;
SafeQueue<Task> tasks_;

void loop_once(int waitMs) {
        poller_->loop_once(std::min(waitMs, nextTimeout_));
        handleTimeouts();
    }
void EventsImp::loop() {
    while (!exit_)
            loop_once(10000);
    //...
//添加超時任務
void safeCall(const Task &task) { safeCall(Task(task)); }
void safeCall(Task &&task) {
        tasks_.push(move(task));
        wakeup();
    }
//...
};

//通道,封裝了可以進行epoll的一個fd
struct Channel {
protected:
    EventBase *base_; //一個Channel一定屬於一個EventBase
    PollerBase *poller_; //base_->poller_
    int fd_; //初始化綁定的文件描述符
    short events_; //當前的關註事件
    int64_t id_; //遞增標記
    std::function<void()> readcb_, writecb_, errorcb_; //讀寫錯誤回調

    // base為事件管理器,fd為通道內部的fd,events為通道關心的事件,構造最後會調用poller_->addChannel(this);加入poller中
    Channel(EventBase *base, int fd, int events);

    //設置回調
    void onRead(const Task &readcb) { readcb_ = readcb; }
    void onWrite(const Task &writecb) { writecb_ = writecb; }
    void onRead(Task &&readcb) { readcb_ = std::move(readcb); }
    void onWrite(Task &&writecb) { writecb_ = std::move(writecb); }

    //啟用讀寫監聽
    void enableRead(bool enable); //設置events_;更新通道poller_->updateChannel(this);
    void enableWrite(bool enable);
    void enableReadWrite(bool readable, bool writable);
    bool readEnabled(); //返回是否關註了可讀return events_ & kReadEvent;
    bool writeEnabled();//返回是否關註了可寫return events_ & kWriteEvent;
    
    //處理讀寫事件
    void handleRead() { readcb_(); } //在poller的loop_once迴圈中,會根據struct epoll_event.data.ptr轉換為Channel,如果可讀則調用對應的handleRead
    void handleWrite() { writecb_(); }//在poller的loop_once迴圈中,會根據struct epoll_event.data.ptr轉換為Channel,如果可寫則調用對應的handleWrite
}

在TCP數據能收到(回調)後,重要的是如何保存客戶端的數據,處理完請求後發送給對應的客戶端,因為有多個客戶端的存在,因此要使用TcpConn來記錄哪些TCP到來了,處理結果要回饋給哪個數據。

//conn.h
// Tcp連接,使用引用計數
struct TcpConn : public std::enable_shared_from_this<TcpConn> {
    // Tcp連接的個狀態
    enum State {
        Invalid = 1,
        Handshaking,
        Connected,
        Closed,
        Failed,
    };
    //服務端
    static TcpConnPtr createConnection(EventBase *base, int fd, Ip4Addr local, Ip4Addr peer) {
            TcpConnPtr con(new C);
            con->attach(base, fd, local, peer);
            return con;
        }
    void attach(EventBase *base, int fd, Ip4Addr local, Ip4Addr peer) {
        fatalif((destPort_ <= 0 && state_ != State::Invalid) || (destPort_ >= 0 && state_ != State::Handshaking),
            "you should use a new TcpConn to attach. state: %d", state_);
        base_ = base;
        state_ = State::Handshaking;
        local_ = local;
        peer_ = peer;
        delete channel_;
        channel_ = new Channel(base, fd, kWriteEvent | kReadEvent);
        trace("tcp constructed %s - %s fd: %d", local_.toString().c_str(), peer_.toString().c_str(), fd);
        TcpConnPtr con = shared_from_this();
        con->channel_->onRead([=] { con->handleRead(con); });
        con->channel_->onWrite([=] { con->handleWrite(con); });
    }

    //發送數據
    void sendOutput() { send(output_); }//return ::write(channel_->fd, buf, bytes);if (wd == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) //寫對應fd,如果寫失敗關註可寫事件(水平觸發模式)
    
    //收到數據
    void handleRead(const TcpConnPtr &con) {
        while (state_ == State::Connected) {
            input_.makeRoom();
            int rd = readImp(channel_->fd(), input_.end(), input_.space());
            if(rd > 0) input_.addSize(rd);
        }
        //...
    }

    //客戶端
    template <class C = TcpConn>
    static TcpConnPtr createConnection(EventBase *base, const std::string &host, unsigned short port, int timeout = 0, const std::string &localip = "") {
        TcpConnPtr con(new C);
        con->connect(base, host, port, timeout, localip); //執行connect
        return con;
    }
public:
    EventBase *base_; //屬於哪一個事件迴圈
    Channel *channel_; //屬於哪一個通道
    Buffer input_, output_; //輸入和輸出緩衝區
    Ip4Addr local_, peer_; //本地的套接字
    State state_; //連接狀態
    TcpCallBack readcb_, writablecb_, statecb_;//讀寫,連入/練出狀態回調
    std::string destHost_, localIp_;
    std::unique_ptr<CodecBase> codec_; //對應codec
};

//伺服器
struct TcpServer {
    TcpServer(EventBases *bases); //屬於哪一個事件迴圈
    int bind(const std::string &host, unsigned short port, bool reusePort = false); //socket,bind,listen,創建listen_channel設置讀回調為handleAccept()
    static TcpServerPtr startServer(EventBases *bases, const std::string &host, unsigned short port, bool reusePort = false); //創建一個TcpServer,並調用bind函數
    void onConnState(const TcpCallBack &cb);//有新的連接連入
    // 消息處理與Read回調衝突,只能調用一個
    void onConnMsg(CodecBase *codec, const MsgCallBack &cb) {
        codec_.reset(codec);
        msgcb_ = cb;
        assert(!readcb_);
    }
private:
    EventBase *base_;//屬於哪一個事件迴圈
    Ip4Addr addr_; //服務端地址
    Channel *listen_channel_; //服務端的Channel
    TcpCallBack statecb_, readcb_; //讀寫回調
    MsgCallBack msgcb_; //消息回調
    std::unique_ptr<CodecBase> codec_;
    void handleAccept();//有新的連接到來,accept得到客戶套接字cfd,創建一個TcpConnPtr綁定cfd,設置conn的讀寫和消息回調
    //...
};

結尾

以上就是handy的基本分析,總結來說算輕量級的muduo,可能還不應該用在生產環境,畢竟花一天多就能看得七七八八。最後就是示例代碼了。

//example/echo.cc
#include <handy/handy.h>
using namespace handy;

int main(int argc, const char *argv[]) {
    EventBase base;
    Signal::signal(SIGINT, [&] { base.exit(); });
    TcpServerPtr svr = TcpServer::startServer(&base, "", 2099);
    exitif(svr == NULL, "start tcp server failed");
    svr->onConnRead([](const TcpConnPtr &con) { con->send(con->getInput()); });
    base.loop();
}
//example/http-hello.cc
#include <handy/handy.h>

using namespace std;
using namespace handy;

int main(int argc, const char *argv[]) {
    int threads = 1;
    if (argc > 1) {
        threads = atoi(argv[1]);
    }
    setloglevel("TRACE");
    MultiBase base(threads);
    HttpServer sample(&base);
    int r = sample.bind("", 8081);
    exitif(r, "bind failed %d %s", errno, strerror(errno));
    sample.onGet("/hello", [](const HttpConnPtr &con) {
        string v = con.getRequest().version;
        HttpResponse resp;
        resp.body = Slice("hello world");
        con.sendResponse(resp);
        if (v == "HTTP/1.0") {
            con->close();
        }
    });
    Signal::signal(SIGINT, [&] { base.exit(); });
    base.loop();
    return 0;
}

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一、日誌框架概述1.1 日誌框架的產生1.2 市面上的日誌框架二、SLF4j 使用與整合2.1 如何在系統中使用SLF4j2.2 如何整合日誌框架2.3 SpringBoot中的日誌關係三、日誌使用3.1 預設配置3.2 日誌格式3.2 指定配置四、切換日誌框架一、日誌框架概述1.1 日誌框架的產生... ...
  • == 與 equals()的聯繫: ==: 我們都知道Java中 == 對用於基礎數據類型(byte, short, int, long, float, double, boolean, char)判斷時, 是直接對變數值的比較. 而對於引用類型變數則是對變數地址的比較. equals(): 我們可 ...
  • 本案例主要講解 實現分散式鎖的兩種實現方式: 實現、 實現。網上關於這方面講解太多了,Van自認為文筆沒他們好,還是用示例代碼說明。 一、 實現 該方案只考慮 單機部署的場景 1.1 加鎖 1.1.1 原理 1. : 使用 來當鎖,因為 是唯一的; 1. : 我傳的是唯一值( ),很多童鞋可能不明白 ...
  • 2019-09-20-23:24:15 今天逛論壇、逛知識星球時、逛b站up主時,都說到低學歷,非科班的人最好不要去自學Python 他們都說:如果我們學python是為了找工作,最好不要把python作為主要編程語言, 因為python的主流方向都要求比較高,不適合我這種低學歷,非科班生的人學 學 ...
  • 1,下載anaconda 安裝 2,配置環境變數 anaconda3/Scripts anaconda3/Library/bin anaconda3/library/mingw-64/bin 此時配置完成 操作時主要是用 scripts下的conda.exe 註意:執行命令例如: conda lis ...
  • 之前,我做了一個實訓的項目,但是一直沒有展示如何做的,現在就讓我講解一下如何用django+bootstrap4+mysql實現這個智慧交通系統。這裡用到的是網頁的bootstrap4框架和mysql資料庫。 一:軟體項目內容 1.實訓項目簡介 智慧交通系統是旨在提高員工的日常工作便捷性、工作效率, ...
  • 最近一個月都在弄某某交易所,讓之前學的東西能夠用上,在這裡分享一下收貨的東西吧 #### 簡介 系統是進行了二次開發,用的是ZTuo開源框架第一個版本,節省了很多時間(坑也挺多,哈哈哈),文章結尾貼源碼鏈接 ,在這裡說一聲謝謝您們的付出。首先來張我自己畫的架構圖(獻醜了) ![file](https ...
  • 之前有一些人問我java怎麼學?其實我第一個建議是:“敲代碼!敲代碼!不光要看,更要乾!”,很多人光看不練,視頻收藏了一堆,就是不看,或者看了一堆視頻,就是不敲。這種學習方式:沒用!只有你敲了代碼之後,這個內容才是你的。 當你學編程的時候,要看看你的鍵盤,以敲壞鍵盤為人生目標!什麼時候你通過敲代碼把 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...