twemproxy發送流程探索——剖析twemproxy代碼正編

来源:http://www.cnblogs.com/onlyac/archive/2017/06/11/6361524.html
-Advertisement-
Play Games

本文想要完成對twemproxy發送流程——msg_send的探索,對於twemproxy發送流程的數據結構已經在《twemproxy接收流程探索——剖析twemproxy代碼正編》介紹過了,msg_send和msg_recv的流程大致類似。請在閱讀代碼時,查看註釋,英文註釋是作者對它的代碼的註解, ...


本文想要完成對twemproxy發送流程——msg_send的探索,對於twemproxy發送流程的數據結構已經在《twemproxy接收流程探索——剖析twemproxy代碼正編》介紹過了,msg_send和msg_recv的流程大致類似。請在閱讀代碼時,查看註釋,英文註釋是作者對它的代碼的註解,中文註釋是我自己的感悟。

函數msg_send

 1 rstatus_t
 2 msg_send(struct context *ctx, struct conn *conn)
 3 {
 4     rstatus_t status;
 5     struct msg *msg;
 6     /*表示活躍的發送狀態*/
 7     ASSERT(conn->send_active);
 8     /*表示準備發送*/
 9     conn->send_ready = 1;
10     do {
11         /*獲取下一次發送的msg開頭*/
12         msg = conn->send_next(ctx, conn);
13         if (msg == NULL) {
14             /* nothing to send */
15             return NC_OK;
16         }
17         /*發送框架,在此框架內conn->send_ready會改變*/
18         status = msg_send_chain(ctx, conn, msg);
19         if (status != NC_OK) {
20             return status;
21         }
22 
23     } while (conn->send_ready);
24 
25     return NC_OK;
26 }

 

發送框架msg_send_chain

由於在發送時,其底層採用writev的高效發送方式,難免出現數據發送到一邊,系統的發送隊列已滿的情況,面對這種尷尬的情況,你應該如何處理?twemproxy的作者給出了自己的方式。

  1 static rstatus_t
  2 msg_send_chain(struct context *ctx, struct conn *conn, struct msg *msg)
  3 {
  4     struct msg_tqh send_msgq;            /* send msg q */
  5     struct msg *nmsg;                    /* next msg */
  6     struct mbuf *mbuf, *nbuf;            /* current and next mbuf */
  7     size_t mlen;                         /* current mbuf data length */
  8     struct iovec *ciov, iov[NC_IOV_MAX]; /* current iovec */
  9     struct array sendv;                  /* send iovec */
 10     size_t nsend, nsent;                 /* bytes to send; bytes sent */
 11     size_t limit;                        /* bytes to send limit */
 12     ssize_t n;                           /* bytes sent by sendv */
 13 
 14     TAILQ_INIT(&send_msgq);
 15 
 16     array_set(&sendv, iov, sizeof(iov[0]), NC_IOV_MAX);
 17 
 18     /* preprocess - build iovec */
 19 
 20     nsend = 0;
 21     /*
 22      * readv() and writev() returns EINVAL if the sum of the iov_len values
 23      * overflows an ssize_t value Or, the vector count iovcnt is less than
 24      * zero or greater than the permitted maximum.
 25      */
 26     limit = SSIZE_MAX;
 27 
 28     /*
 29      *send_msgq是一個臨時的發送隊列,將當前能進行發送的msg,即處理完的msg
 30      *進行存儲。發送隊列僅僅自後面處理時能讓調用者以msg的buf為單位處理。
 31      *sendv是一個字元串數組,由於發送底層採用的函數是writev,為此sendv將發
 32      *送的數據都存儲在一起,sendv才是真正發送的數據記憶體。
 33      */
 34     for (;;) {
 35         ASSERT(conn->smsg == msg);
 36 
 37         TAILQ_INSERT_TAIL(&send_msgq, msg, m_tqe);
 38 
 39         for (mbuf = STAILQ_FIRST(&msg->mhdr);
 40              mbuf != NULL && array_n(&sendv) < NC_IOV_MAX && nsend < limit;
 41              mbuf = nbuf) {
 42             nbuf = STAILQ_NEXT(mbuf, next);
 43             /*
 44              *發送的信息是否為空,即發送開始的位元組位置是否和結束位置一致。
 45              *在處理redis多key命令的mget,mdel,mset以及memcached多key命令
 46              *get,gets時,由於分片的原因,分片後的msg也會在客戶端發送隊列
 47              *中。在分片處理完要發送後,這些分片的msg應該不能被髮送,為此,
 48              *對於分片的msg的pos進行了將msg的發送量置為空,這邊的sendv在添
 49              *加發送內容時,忽視了這些分片。
 50              */
 51             if (mbuf_empty(mbuf)) {
 52                 continue;
 53             }
 54 
 55             mlen = mbuf_length(mbuf);
 56             if ((nsend + mlen) > limit) {
 57                 mlen = limit - nsend;
 58             }
 59 
 60             ciov = array_push(&sendv);
 61             ciov->iov_base = mbuf->pos;
 62             ciov->iov_len = mlen;
 63 
 64             nsend += mlen;
 65         }
 66 
 67         /*超過發送限制*/
 68         if (array_n(&sendv) >= NC_IOV_MAX || nsend >= limit) {
 69             break;
 70         }
 71 
 72         /*不存在發送內容*/
 73         msg = conn->send_next(ctx, conn);
 74         if (msg == NULL) {
 75             break;
 76         }
 77     }
 78 
 79     /*
 80      * (nsend == 0) is possible in redis multi-del
 81      * see PR: https://github.com/twitter/twemproxy/pull/225
 82      */
 83 
 84     /*發送函數conn_sendv*/
 85     conn->smsg = NULL;
 86     if (!TAILQ_EMPTY(&send_msgq) && nsend != 0) {
 87         n = conn_sendv(conn, &sendv, nsend);
 88     } else {
 89         n = 0;
 90     }
 91 
 92     nsent = n > 0 ? (size_t)n : 0;
 93 
 94     /* postprocess - process sent messages in send_msgq */
 95     /*
 96      *由於其發送函數底層採用writev,在發送過程中可能存在發送中斷或者發送
 97      *數據沒有全部發出的情況,為此需要通過實際發送的位元組數nsent來確認系統
 98      *實際上發送到了哪一個msg的哪一個mbuf的哪一個位元組pos,以便下一次從pos
 99      *開始發送實際的內容,以免重覆發送相同的內容,導致不可見的錯誤。
100      */
101     for (msg = TAILQ_FIRST(&send_msgq); msg != NULL; msg = nmsg) {
102         nmsg = TAILQ_NEXT(msg, m_tqe);
103 
104         TAILQ_REMOVE(&send_msgq, msg, m_tqe);
105 
106         /*發送內容為空,進行發送完的處理*/
107         if (nsent == 0) {
108             if (msg->mlen == 0) {
109                 conn->send_done(ctx, conn, msg);
110             }
111             continue;
112         }
113 
114         /* adjust mbufs of the sent message */
115         for (mbuf = STAILQ_FIRST(&msg->mhdr); mbuf != NULL; mbuf = nbuf) {
116             nbuf = STAILQ_NEXT(mbuf, next);
117 
118             if (mbuf_empty(mbuf)) {
119                 continue;
120             }
121 
122             mlen = mbuf_length(mbuf);
123             if (nsent < mlen) {
124                 /* mbuf was sent partially; process remaining bytes later */
125                 /*此處確認了實際上發送到了哪一個msg的哪一個mbuf的哪一個位元組pos*/
126                 mbuf->pos += nsent;
127                 ASSERT(mbuf->pos < mbuf->last);
128                 nsent = 0;
129                 break;
130             }
131 
132             /* mbuf was sent completely; mark it empty */
133             mbuf->pos = mbuf->last;
134             nsent -= mlen;
135         }
136 
137         /* message has been sent completely, finalize it */
138         if (mbuf == NULL) {
139             conn->send_done(ctx, conn, msg);
140         }
141     }
142 
143     ASSERT(TAILQ_EMPTY(&send_msgq));
144 
145     if (n >= 0) {
146         return NC_OK;
147     }
148 
149     return (n == NC_EAGAIN) ? NC_OK : NC_ERROR;
150 }

 發送函數conn_sendv

 writev作為一個高效的網路io,它的正確用法一直是個問題,這裡給出了twemproxy的作者給出了自己正確的註解。對於其的異常處理值得借鑒

 1 ssize_t
 2 conn_sendv(struct conn *conn, struct array *sendv, size_t nsend)
 3 {
 4     ssize_t n;
 5 
 6     ASSERT(array_n(sendv) > 0);
 7     ASSERT(nsend != 0);
 8     ASSERT(conn->send_ready);
 9 
10     for (;;) {
11         /*這裡的nc_writev就是writev*/
12         n = nc_writev(conn->sd, sendv->elem, sendv->nelem);
13 
14         log_debug(LOG_VERB, "sendv on sd %d %zd of %zu in %"PRIu32" buffers",
15                   conn->sd, n, nsend, sendv->nelem);
16 
17         if (n > 0) {
18             /*
19              *已發送數據長度比待發送數據長度小,說明系統發送隊列已滿或者不
20              *可寫,此刻需要停止發送數據。
21              */
22             if (n < (ssize_t) nsend) {
23                 conn->send_ready = 0;
24             }
25             conn->send_bytes += (size_t)n;
26             return n;
27         }
28 
29         if (n == 0) {
30             log_warn("sendv on sd %d returned zero", conn->sd);
31             conn->send_ready = 0;
32             return 0;
33         }
34         /*
35          *EINTR表示由於信號中斷,沒發送成功任何數據,此刻需要停止發送數據。
36          *EAGAIN以及EWOULDBLOCK表示系統發送隊列已滿或者不可寫,為此沒發送
37          *成功任何數據,此刻需要停止發送數據,等待下次發送。
38          *除了上述兩種錯誤,其他的錯誤為連接出現了問題需要停止發送數據並
39          *進行斷鏈操作,conn->err非零時在程式流程中會觸發斷鏈。
40          */
41         if (errno == EINTR) {
42             log_debug(LOG_VERB, "sendv on sd %d not ready - eintr", conn->sd);
43             continue;
44         } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
45             conn->send_ready = 0;
46             log_debug(LOG_VERB, "sendv on sd %d not ready - eagain", conn->sd);
47             return NC_EAGAIN;
48         } else {
49             conn->send_ready = 0;
50             conn->err = errno;
51             log_error("sendv on sd %d failed: %s", conn->sd, strerror(errno));
52             return NC_ERROR;
53         }
54     }
55 
56     NOT_REACHED();
57 
58     return NC_ERROR;
59 }

小結

在這短短的數百行代碼中,我們獲知了msg_send的簡單過程,最最重要的是我們知道了writev函數的發送內容處理和異常處理,特別是它如教科書般的異常處理方式使我收益良多。

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Runtime學習 應用源碼學習   Runtime源碼分析,帶你瞭解OC實現過程。其中參考了大量的大神的代碼以及文獻,裡面也有個人的見解,歡迎拍磚,歡迎交流。 兩種常見使用場景 根據調試信息,發現兩者的區別是: 第一種進入到 第二種繞一個遠路,先初始化 兩者最終進入到如下方法 ...
  • Android實現定時任務一般會使用以上(Handler Timer Thread AlarmManager CountDownTimer)五種方式。當然還有很多組合使用(比如Handler+Thread 比如Handler類自帶的postDelyed 比如Handler+Timer+TimerTa ...
  • 簡單的說,就是當dispatchTouchEvent在進行事件分發的時候,只有前一個事件(如ACTION_DOWN)返回true,才會收到ACTION_MOVE和ACTION_UP的事件。 dispatchTouchEvent 和 onTouchEvent 可以通過return true 消費事件, ...
  • 在WWDC 2017開發者大會上,蘋果宣佈了一系列新的面向開發者的機器學習 API,包括面部識別的視覺 API、自然語言處理 API,這些 API 集成了蘋果所謂的 Core ML 框架。Core ML 的核心是加速在 iPhone、iPad、Apple Watch 上的人工智慧任務,支持深度神經網 ...
  • iOS CAEmitterLayer 實現粒子發射動畫效果 效果圖 代碼已上傳 GitHub:https://github.com/Silence GitHub/CoreAnimationDemo 動畫效果用 CAEmitterLayer 實現。CAEmitterLayer 顯示粒子發射動畫,具體的 ...
  • OkHttp介紹 Android系統提供了兩種HTTP通信類,HttpURLConnection和HttpClient,HttpURLConnection相對來說比HttpClient難用,google自從2.3版本之後一直推薦使用HttpURLConnection,並且在6.0版本的sdk中直接刪 ...
  • 在iOS開發裡面我們經常會進行NSMutable(可變類型的類,常用的如NSMutableString,NSMutableArray,NSMutableDictionary,NSMutableData等)屬性的聲明,在聲明時我們都知道要使用strong(強引用)來進行標識,但是很多人不知道為什麼不能 ...
  • 1。不要充滿至 100% 。 2。不要放電至 0% 。 3。電池的容量儘量維持在 50%。 4。不要在高溫下操作。 5。不要使用快充功能的充電器。 6。使用一般的充電器,如,500mA 或是 800mA。 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...