常用操作文件目錄的函數 1. CreateDirectory 創建文件夾 原型: BOOL CreateDirectory( LPCTSTR lpPathName, LPSECURITY_ATTRIBUTES lpSecurityAttributes ); 參數說明: lpPathName 要創建的 ...
網路基礎
協議架構
tcp鏈接
假如需要開發者去實現一套新的網路協議(例如 redis 的resp), 是基於TCP的, 那tcp這層的協議,是否需要開發者自己去實現?
這層如果自己實現, 其實很複雜, 會涉及很多演算法相關.
因此, 出現了 socket 對傳輸層進行了抽象, 開發者不需要關註傳輸層具體的實現, 使用socket提供的介面, socket內部會實現,比如三次握手, 四次揮手.
Socket
很多系統都提供 Socket 作為 TCP(也有UDP) 網路連接的抽象,
Linux-> Internet domain socket -> SOCK STREAM
Linux 中 Socket 以 “文件描述符〞FD 作為標識
每建立一次連接接, sever都會創建一個新的 socket 專門和client通信, 原來監聽的socket還是一直處於監聽狀態.
socket 之前如何進行通信, 假設client 給 server發送消息, sever 沒有回覆, 那client是阻塞, 還是先執行別的工作, 這就是IO模型的範疇了.
IO模型
阻塞
非阻塞
多路復用
阻塞
當read中沒有數據過來, 線程(sever或者client都一樣)會阻塞等待.
同步讀寫Socket時,線程陷入內核態
當讀寫成功後,切換回用戶態,繼續執行
優點:開發難度小,代碼簡單
缺點:內核態切換開銷大
非阻塞
如果暫時無法收發數據,會返回錯誤
應用會不斷輪詢,直到Socket可以讀寫
優點:不會陷入內核態,自由度高
缺點:需要自旋輪詢
多路復用
大廠面試經常喜歡問的 epoll就是 多路復用的一種實現, 還有 select, poll 都是, 這三個主要的區別: 在返回數據時候的讀取方式.
業務把關註的事件註冊到 event poll池中, 當epoll接受到對應事件後,通知業務.
註冊多個Socket事件
調用epool,當有事件發生,返回
優點:提供了事件列表,不需要查詢輪詢各個Scoket
缺點:開發難度大,邏輯複雜
epoll 在 Mac: kqueue ; Windows: IOCP
小結
操作系統提供了Socket作為TCP和UDP通信的抽象
IO模型指的是操作Socket的方案
阻塞模型最利於業務編寫,但是性能差
多路復用性能好,但業務編寫麻煩
go socket的實現
實現方法
- 對系統的 epoll進行封裝, 因為go是可以在多個系統上運行的, 底層需要對平臺差異化封裝.
- 如果用戶去調用 封裝好的 epoll方法, 需要開發者執行去關註 epoll的狀態, 瞭解epoll的大致實現原理,肯定知道還需要去 解析epoll的返回, 然後 分析那些 socket 有事件來了. 這些工作都是通用性的,所以,go把這層一起封裝在了它的 網路層中.
- 然後,go為了更加簡化使用, 採用了 阻塞的思想, 一個協程對應一個 socket連接 , 當epoll 沒有對應事件返回時候, 就休眠等待, 有事件來了,就喚醒處理. 這樣就簡化了上層開發者的使用.
在底層使用操作系統的多路復用10
在協程層次使用阻塞模型
阻塞協程時,休眠協程
具體抽象 go代碼相關
多路復用器
各個系統的多路復用都有以下功能: 系統原生
1. 新建多路復用器 epoll create()
2. 往多路復用器里插入需要監聽的事件 epoll ctl()
3. 查詢發生了什麼事件 epoll wait()
Go Network Poller 多路復用器的抽象
epoll create() -> netpollinit() // 新建
epoll cti0 -> netpollopen() // 插入事件
epoll wait -> netpoll() // 查詢
netpollinit 新建多路復用器
func netpollinit() {
var errno uintptr
// 新建了 epoll 系統底層實現
epfd, errno = syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
// 這裡的 epfd是個全局變數,初始化之後, 在操作時候就用這個 epfd
// 新建管道 Linux的管道,用於多線程多進程間的通信, 用於結束這個epoll
r, w, errpipe := nonblockingPipe()
ev := syscall.EpollEvent{
Events: syscall.EPOLLIN,
}
*(**uintptr)(unsafe.Pointer(&ev.Data)) = &netpollBreakRd
// 管道中加入了事件
errno = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, r, &ev)
netpollBreakRd = uintptr(r)
netpollBreakWr = uintptr(w)
}
歸納:
新建了一個底層 epoll對象 epfd
新建一個pipe管道用於中斷Epoll
將“管道有數據到達〞 事件註冊在Epoll中
netpollopen() 插入事件
func netpollopen(fd uintptr, pd *pollDesc) uintptr {
// fd 是socket的對象 pd是 socket 和協程的對應關係
var ev syscall.EpollEvent
ev.Events = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLET
tp := taggedPointerPack(unsafe.Pointer(pd), pd.fdseq.Load())
*(*taggedPointer)(unsafe.Pointer(&ev.Data)) = tp
// 上面這兩行代碼 是在把 ev.Data 和 pd 聯繫起來, 後面epoll 返回時候, 就能直接和pd的協程聯繫起來
// 將fd要關註的事件, 註冊到 epoll中, 調用的是 EpollCtl 和上面也對上了
return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, int32(fd), &ev)
}
type pollDesc struct {
_ sys.NotInHeap
link *pollDesc // in pollcache, protected by pollcache.lock
fd uintptr // socket ID
fdseq atomic.Uintptr // protects against stale pollDesc
atomicInfo atomic.Uint32 // atomic pollInfo
// 想要讀的協程
rg atomic.Uintptr // pdReady, pdWait, G waiting for read or pdNil
// 想要寫的協程
wg atomic.Uintptr // pdReady, pdWait, G waiting for write or pdNil
lock mutex // protects the following fields
closing bool
user uint32 // user settable cookie
rseq uintptr // protects from stale read timers
rt timer // read deadline timer (set if rt.f != nil)
rd int64 // read deadline (a nanotime in the future, -1 when expired)
wseq uintptr // protects from stale write timers
wt timer // write deadline timer
wd int64 // write deadline (a nanotime in the future, -1 when expired)
self *pollDesc // storage for indirect interface. See (*pollDesc).makeArg.
}
歸納:
傳入一個Socket的FD,和pollDesc指針
pollDesc指針是Socket相關詳細信息
pollIDesc中記錄了哪個協程休眠在等待此Socket
將Socket可讀、可寫、斷開事件註冊到Epoll中
netpoll 查詢
這裡有必要講下 epoll的使用的一些特點, 能更好理解go中的實現.
epoll_wait
是阻塞函數, 但是可以傳一個 超時時間, 超過時間後就先返回, 不阻塞了。
int epoll_wait(int epfd,struct epoll_event * events, int maxevents,int timeout)
epfd epoll
的id ,就是創建時候的 epfd
.
epoll_event
是傳回有事件的fd和even的記錄
timeout
是 超時時間
返回值 是指示 epoll_event 中,前多個元素,是有值的, 避免遍歷.
// 超時時間 delay
func netpoll(delay int64) gList {
var events [128]syscall.EpollEvent
retry:
// 去調了系統的 EpollWait 然後 看參數, 第二個參數裡面是包含了 有事件的even
n, errno := syscall.EpollWait(epfd, events[:], int32(len(events)), waitms)
// n 就表示 events 前多少個是有值的
var toRun gList
for i := int32(0); i < n; i++ {
var mode int32
if ev.Events&(syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
mode += 'r'
}
if ev.Events&(syscall.EPOLLOUT|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
tp := *(*taggedPointer)(unsafe.Pointer(&ev.Data))
pd := (*pollDesc)(tp.pointer())
tag := tp.tag()
// 通過ev.Data 和 pd關聯了起來 參入事件的時候, 有講過這個
if pd.fdseq.Load() == tag {
pd.setEventErr(ev.Events == syscall.EPOLLERR, tag)
netpollready(&toRun, pd, mode) // 通過pd ,獲取到了對應的 協程隊列
}
}
}
return toRun // 返回的是 關註這個事件的協程隊列
}
歸納:
調用epoll wait(),查詢有哪些事件發生
根據Socket相關的pollDesc信息,返回哪些協程可以喚醒
Go Network Poller 如何工作
收發數據
場景 1:Socket 已經可讀寫
- netpoll 方法需要迴圈執行,這樣才能及時獲得那些事件有返回了, 才能通知對應的協程.
誰來調這個方法, 入口方法放到了 gcStart , 因為 go會保證一段時間,一定會調下 gc, 在講 go 記憶體,垃圾回收時候講過.
再來看 netpoll()
中 netpollready()
的實現:
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' { // 可讀
rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r'+'w' { // 可寫
wg = netpollunblock(pd, 'w', true)
}
// 通過上面的方法,標記好了 pdReady之後,
if rg != nil {
toRun.push(rg) //把對應的協程,加入到一個可執行的 隊列中,一個鏈表
}
if wg != nil {
toRun.push(wg)
}
}
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
gpp := &pd.rg // 預設是 可讀協程
if mode == 'w' { //如果是 可寫 w, 就取可寫的協程
gpp = &pd.wg
}
// 下麵的代碼就是把 對應的 rg 或者 wg 這個欄位改成了 pdReady, 標記下有事件來了.
for {
// 這裡的old值 , 第一次取可能是一個等待協程的地址 ,通過下麵的情景能看到
old := gpp.Load()
if old == pdReady {
return nil
}
if old == pdNil && !ioready {
return nil
}
var new uintptr
if ioready {
new = pdReady
}
if gpp.CompareAndSwap(old, new) { // 如果值相等,就寫入新的值
if old == pdWait {
old = pdNil
}
return (*g)(unsafe.Pointer(old)) // 當old為一個協程的地址,這裡返回的就是一個協程
}
}
}
上面方法就是把 pollDesc 的 rg或者wg置為 pdReady
當協程去調用 poll_runtime_pollWait()
方法 詢問時候, 發現已經是 ready
狀態,就開始進行 讀寫
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
// 這種方式 可以調用聲明包中的小寫方法
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
errcode := netpollcheckerr(pd, int32(mode))
for !netpollblock(pd, int32(mode), false) { // 這裡就是判斷這個 pd的rg或者wg 是否是 ready狀態
errcode = netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
}
return pollNoError
}
歸納:
runtime 迴圈調用 netpoll() 方法 (g0協程 的gcStart 垃圾回收開始方法)
發現Socket可讀寫時,給對應的rg或者wg置為pdReady(1)
協程調用poll_runtime_pollWait()
判斷rg或者wg已經置為pdReady(1),返向0
場景 2:Socket 暫時無法讀寫
需要深入看上面那個判斷 pd
的rg
是否為ready
狀態的方法
當協程去調用 poll_runtime_pollWait()
會跳轉到:
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
for {
if gpp.CompareAndSwap(pdReady, pdNil) {
return true
}
if gpp.CompareAndSwap(pdNil, pdWait) {
break
}
if v := gpp.Load(); v != pdReady && v != pdNil {
throw("runtime: double wait")
}
}
// 如果不是ready ,那麼協程就會休眠等待, 並把協程的地址,賦值給 pd的rg或wg,在`netpollblockcommit`中
if waitio || netpollcheckerr(pd, mode) == pollNoError {
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceBlockNet, 5)
}
// be careful to not lose concurrent pdReady notification
old := gpp.Swap(pdNil)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}
歸納:
runtime 迴圈調用netpoll()方法 (g0協程)
協程調用poll_runtime_pollWait()
發現對應的rg或者wg為0
給對應的rg或者wg置為協程地址
休眠等待
當有事件來的時候, netpoll 中返回了 對應事件的 glist
, 然後,runtime就會通知這些協程開始工作.
runtime 迴圈調用 netpoll() 方法 (g0協程)
發現Socket可讀寫時,給對應的查看對應的rg或者wg
若為協程地址,返回協程地址
調度器開始調度對應協程
大體的結構如下: