本文想要完成對twemproxy發送流程——msg_send的探索,對於twemproxy發送流程的數據結構已經在《twemproxy接收流程探索——剖析twemproxy代碼正編》介紹過了,msg_send和msg_recv的流程大致類似。請在閱讀代碼時,查看註釋,英文註釋是作者對它的代碼的註解, ...
本文想要完成對twemproxy發送流程——msg_send的探索,對於twemproxy發送流程的數據結構已經在《twemproxy接收流程探索——剖析twemproxy代碼正編》介紹過了,msg_send和msg_recv的流程大致類似。請在閱讀代碼時,查看註釋,英文註釋是作者對它的代碼的註解,中文註釋是我自己的感悟。
函數msg_send
1 rstatus_t 2 msg_send(struct context *ctx, struct conn *conn) 3 { 4 rstatus_t status; 5 struct msg *msg; 6 /*表示活躍的發送狀態*/ 7 ASSERT(conn->send_active); 8 /*表示準備發送*/ 9 conn->send_ready = 1; 10 do { 11 /*獲取下一次發送的msg開頭*/ 12 msg = conn->send_next(ctx, conn); 13 if (msg == NULL) { 14 /* nothing to send */ 15 return NC_OK; 16 } 17 /*發送框架,在此框架內conn->send_ready會改變*/ 18 status = msg_send_chain(ctx, conn, msg); 19 if (status != NC_OK) { 20 return status; 21 } 22 23 } while (conn->send_ready); 24 25 return NC_OK; 26 }
發送框架msg_send_chain
由於在發送時,其底層採用writev的高效發送方式,難免出現數據發送到一邊,系統的發送隊列已滿的情況,面對這種尷尬的情況,你應該如何處理?twemproxy的作者給出了自己的方式。
1 static rstatus_t 2 msg_send_chain(struct context *ctx, struct conn *conn, struct msg *msg) 3 { 4 struct msg_tqh send_msgq; /* send msg q */ 5 struct msg *nmsg; /* next msg */ 6 struct mbuf *mbuf, *nbuf; /* current and next mbuf */ 7 size_t mlen; /* current mbuf data length */ 8 struct iovec *ciov, iov[NC_IOV_MAX]; /* current iovec */ 9 struct array sendv; /* send iovec */ 10 size_t nsend, nsent; /* bytes to send; bytes sent */ 11 size_t limit; /* bytes to send limit */ 12 ssize_t n; /* bytes sent by sendv */ 13 14 TAILQ_INIT(&send_msgq); 15 16 array_set(&sendv, iov, sizeof(iov[0]), NC_IOV_MAX); 17 18 /* preprocess - build iovec */ 19 20 nsend = 0; 21 /* 22 * readv() and writev() returns EINVAL if the sum of the iov_len values 23 * overflows an ssize_t value Or, the vector count iovcnt is less than 24 * zero or greater than the permitted maximum. 25 */ 26 limit = SSIZE_MAX; 27 28 /* 29 *send_msgq是一個臨時的發送隊列,將當前能進行發送的msg,即處理完的msg 30 *進行存儲。發送隊列僅僅自後面處理時能讓調用者以msg的buf為單位處理。 31 *sendv是一個字元串數組,由於發送底層採用的函數是writev,為此sendv將發 32 *送的數據都存儲在一起,sendv才是真正發送的數據記憶體。 33 */ 34 for (;;) { 35 ASSERT(conn->smsg == msg); 36 37 TAILQ_INSERT_TAIL(&send_msgq, msg, m_tqe); 38 39 for (mbuf = STAILQ_FIRST(&msg->mhdr); 40 mbuf != NULL && array_n(&sendv) < NC_IOV_MAX && nsend < limit; 41 mbuf = nbuf) { 42 nbuf = STAILQ_NEXT(mbuf, next); 43 /* 44 *發送的信息是否為空,即發送開始的位元組位置是否和結束位置一致。 45 *在處理redis多key命令的mget,mdel,mset以及memcached多key命令 46 *get,gets時,由於分片的原因,分片後的msg也會在客戶端發送隊列 47 *中。在分片處理完要發送後,這些分片的msg應該不能被髮送,為此, 48 *對於分片的msg的pos進行了將msg的發送量置為空,這邊的sendv在添 49 *加發送內容時,忽視了這些分片。 50 */ 51 if (mbuf_empty(mbuf)) { 52 continue; 53 } 54 55 mlen = mbuf_length(mbuf); 56 if ((nsend + mlen) > limit) { 57 mlen = limit - nsend; 58 } 59 60 ciov = array_push(&sendv); 61 ciov->iov_base = mbuf->pos; 62 ciov->iov_len = mlen; 63 64 nsend += mlen; 65 } 66 67 /*超過發送限制*/ 68 if (array_n(&sendv) >= NC_IOV_MAX || nsend >= limit) { 69 break; 70 } 71 72 /*不存在發送內容*/ 73 msg = conn->send_next(ctx, conn); 74 if (msg == NULL) { 75 break; 76 } 77 } 78 79 /* 80 * (nsend == 0) is possible in redis multi-del 81 * see PR: https://github.com/twitter/twemproxy/pull/225 82 */ 83 84 /*發送函數conn_sendv*/ 85 conn->smsg = NULL; 86 if (!TAILQ_EMPTY(&send_msgq) && nsend != 0) { 87 n = conn_sendv(conn, &sendv, nsend); 88 } else { 89 n = 0; 90 } 91 92 nsent = n > 0 ? (size_t)n : 0; 93 94 /* postprocess - process sent messages in send_msgq */ 95 /* 96 *由於其發送函數底層採用writev,在發送過程中可能存在發送中斷或者發送 97 *數據沒有全部發出的情況,為此需要通過實際發送的位元組數nsent來確認系統 98 *實際上發送到了哪一個msg的哪一個mbuf的哪一個位元組pos,以便下一次從pos 99 *開始發送實際的內容,以免重覆發送相同的內容,導致不可見的錯誤。 100 */ 101 for (msg = TAILQ_FIRST(&send_msgq); msg != NULL; msg = nmsg) { 102 nmsg = TAILQ_NEXT(msg, m_tqe); 103 104 TAILQ_REMOVE(&send_msgq, msg, m_tqe); 105 106 /*發送內容為空,進行發送完的處理*/ 107 if (nsent == 0) { 108 if (msg->mlen == 0) { 109 conn->send_done(ctx, conn, msg); 110 } 111 continue; 112 } 113 114 /* adjust mbufs of the sent message */ 115 for (mbuf = STAILQ_FIRST(&msg->mhdr); mbuf != NULL; mbuf = nbuf) { 116 nbuf = STAILQ_NEXT(mbuf, next); 117 118 if (mbuf_empty(mbuf)) { 119 continue; 120 } 121 122 mlen = mbuf_length(mbuf); 123 if (nsent < mlen) { 124 /* mbuf was sent partially; process remaining bytes later */ 125 /*此處確認了實際上發送到了哪一個msg的哪一個mbuf的哪一個位元組pos*/ 126 mbuf->pos += nsent; 127 ASSERT(mbuf->pos < mbuf->last); 128 nsent = 0; 129 break; 130 } 131 132 /* mbuf was sent completely; mark it empty */ 133 mbuf->pos = mbuf->last; 134 nsent -= mlen; 135 } 136 137 /* message has been sent completely, finalize it */ 138 if (mbuf == NULL) { 139 conn->send_done(ctx, conn, msg); 140 } 141 } 142 143 ASSERT(TAILQ_EMPTY(&send_msgq)); 144 145 if (n >= 0) { 146 return NC_OK; 147 } 148 149 return (n == NC_EAGAIN) ? NC_OK : NC_ERROR; 150 }
發送函數conn_sendv
writev作為一個高效的網路io,它的正確用法一直是個問題,這裡給出了twemproxy的作者給出了自己正確的註解。對於其的異常處理值得借鑒
1 ssize_t 2 conn_sendv(struct conn *conn, struct array *sendv, size_t nsend) 3 { 4 ssize_t n; 5 6 ASSERT(array_n(sendv) > 0); 7 ASSERT(nsend != 0); 8 ASSERT(conn->send_ready); 9 10 for (;;) { 11 /*這裡的nc_writev就是writev*/ 12 n = nc_writev(conn->sd, sendv->elem, sendv->nelem); 13 14 log_debug(LOG_VERB, "sendv on sd %d %zd of %zu in %"PRIu32" buffers", 15 conn->sd, n, nsend, sendv->nelem); 16 17 if (n > 0) { 18 /* 19 *已發送數據長度比待發送數據長度小,說明系統發送隊列已滿或者不 20 *可寫,此刻需要停止發送數據。 21 */ 22 if (n < (ssize_t) nsend) { 23 conn->send_ready = 0; 24 } 25 conn->send_bytes += (size_t)n; 26 return n; 27 } 28 29 if (n == 0) { 30 log_warn("sendv on sd %d returned zero", conn->sd); 31 conn->send_ready = 0; 32 return 0; 33 } 34 /* 35 *EINTR表示由於信號中斷,沒發送成功任何數據,此刻需要停止發送數據。 36 *EAGAIN以及EWOULDBLOCK表示系統發送隊列已滿或者不可寫,為此沒發送 37 *成功任何數據,此刻需要停止發送數據,等待下次發送。 38 *除了上述兩種錯誤,其他的錯誤為連接出現了問題需要停止發送數據並 39 *進行斷鏈操作,conn->err非零時在程式流程中會觸發斷鏈。 40 */ 41 if (errno == EINTR) { 42 log_debug(LOG_VERB, "sendv on sd %d not ready - eintr", conn->sd); 43 continue; 44 } else if (errno == EAGAIN || errno == EWOULDBLOCK) { 45 conn->send_ready = 0; 46 log_debug(LOG_VERB, "sendv on sd %d not ready - eagain", conn->sd); 47 return NC_EAGAIN; 48 } else { 49 conn->send_ready = 0; 50 conn->err = errno; 51 log_error("sendv on sd %d failed: %s", conn->sd, strerror(errno)); 52 return NC_ERROR; 53 } 54 } 55 56 NOT_REACHED(); 57 58 return NC_ERROR; 59 }
小結
在這短短的數百行代碼中,我們獲知了msg_send的簡單過程,最最重要的是我們知道了writev函數的發送內容處理和異常處理,特別是它如教科書般的異常處理方式使我收益良多。