public class RandomNickName { public enum Gender{ MAN, WOMAN, UNKNOWN, ; } public static void main(String[] args) { String nickName = nickName(Gender. ...
go的協程和線程都繞不過GMP,關於GMP基本的工作流程,有go開發經驗的大致都懂,這邊更多關註GMP如何解決一些類似 協程饑渴的問題,以及底層的大致實現原理。
多線程迴圈
上篇講了單線程是如何迴圈的,這裡還是為 GMP的出場 大致介紹下。
工作模型
多個M都去全局G的隊列中獲取 g,所以,全局g的隊列需要上鎖。
改進版,增加本地隊列
這樣每個m都緩存了一個本地隊列,避免每次都去全局隊列裡面拿,而且一次也能拿多個,降低了全局隊列鎖獲取的開銷。
這個其實就是 GMP了。P指的就是為M管理要執行的g
P 的定義
在runtime2.go中有定義:
刪除了很多源碼,只留下和這裡講的相關的代碼:
type p struct {
id int32
// M 線程
m muintptr // back-link to associated m (nil if idle)
// 可用的g的隊列,進入不用加鎖 嘚瑟
// Queue of runnable goroutines. Accessed without lock.
runqhead uint32 // 頭尾
runqtail uint32
runq [256]guintptr // 256 的容量
runnext guintptr // 下一個可執行的g
}
整體結構如下:
完整的GMP模型就如下:
和上面的改進版很像,只是抽象出來一個P,專門來管理
p的作用
1. M與G之間的中介
2. P持有一些G,使得每次獲取G的時候不用從全局找
3. 大大減少了併發衝突的情況
代碼的邏輯
要回到 schedule()
,上篇中有提到:
schedule()
中有為 M查找,要運行的g,通過這個方法:
gp, inheritTime, tryWakeP := findRunnable() // blocks until work is available
// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from local or global queue, poll network.
// 查詢一個 能運行的g去執行,嘗試去別的p裡面偷,或者從 本地、全局隊列了獲取。
// poll network 涉及到了 epoll 和 poll 和這裡沒關係
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
// local runq 本地拿
if gp, inheritTime := runqget(pp); gp != nil {
return gp, inheritTime, false
}
// global runq 全局拿
if sched.runqsize != 0 {
lock(&sched.lock) // 全局隊列拿 需要上鎖
gp := globrunqget(pp, 0)
unlock(&sched.lock)
}
// 如果還拿不到 就去偷 去別的p裡面偷
gp, inheritTime, tnow, w, newWork := stealWork(now)
}
func runqget(pp *p) (gp *g, inheritTime bool) {
// If there's a runnext, it's the next G to run.
next := pp.runnext //從p的next獲取的下一個
if next != 0 && pp.runnext.cas(next, 0) {
return next.ptr(), true
}
}
源碼都是未截取完整的。
看下全局怎麼拿的:
//Try get a batch of G's from the global runnable queue.
// sched.lock must be held.
func globrunqget(pp *p, max int32) *g {
assertLockHeld(&sched.lock)
n := sched.runqsize/gomaxprocs + 1
if n > sched.runqsize {
n = sched.runqsize
}
if max > 0 && n > max {
n = max
}
if n > int32(len(pp.runq))/2 {
n = int32(len(pp.runq)) / 2
}
sched.runqsize -= n
gp := sched.runq.pop()
n--
for ; n > 0; n-- {
gp1 := sched.runq.pop()
runqput(pp, gp1, false)
}
return gp
}
經過了一系列的計算,最終確定拿 n個g放回本地。 因為傳過來的 max=0
所以 n 為 len(pp.runq)/ 2
和 sched.runqsize/gomaxprocs + 1
中的數量大的那個。
runq [256]guintptr 聲明為256
本地隊列容量的一半 128,或者 全局隊列的個數除以 gomaxprocs(p的個數) + 1,最多拿128個。
steal
上面邏輯講了,如果全局也拿不到,就調用 stealWork
去別的p裡面拿, 看下代碼:
// stealWork attempts to steal a runnable goroutine or timer from any P.
func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) {
// 具體的邏輯,就不看了。就是遍歷裡面的p 從符合條件的p的本地隊列,拿一半過去。
}
小結:
如果在本地或者全局隊列中都找不到G, 去別的P中“偷〞,增強了線程的利用率
協程新建
代碼在 proc.go中
func newproc(fn *funcval) {
gp := getg()
pc := getcallerpc()
systemstack(func() {
// 新創建一個 g
newg := newproc1(fn, gp, pc)
pp := getg().m.p.ptr() // 獲取當前g 的 p
runqput(pp, newg, true) // 通過這函數,將g放置
})
}
// getg returns the pointer to the current g.
func getg() *g
小結:
1. 優先獲取當前的P
(這點上有看到說是隨機獲取一個p,但是從 getg()
官方描述來看,是獲取的當前p,如果這裡我未理清楚,可以評論下)
2. 將新協程放入P的runnext(插隊)
3. 若P本地隊列滿,放入全局隊列
問題:
目前為止,已經解決了第一個問題
多線程併發時,會搶奪協程隊列的全局鎖
但是 協程順序執行,無法併發 ,如一個協程執行時間非常長,一直占著m,就會導致其他g無法及時響應。