小市值選股策略的核心在於通過綜合分析公司的基本面、行業定位、財務健康狀況以及市場趨勢, 來尋找那些被市場低估但具備顯著成長潛力的股票,同時也要重視風險管理和投資組合的多樣化。 今天來給大家分享下小市值策略代碼如下: # 顯式導入 BigQuant 相關 SDK 模塊 from bigdatasour ...
模型
三部分組成:
發送等待隊列
接收等待隊列
管道 緩存區
定義
在 runtime
的chan.go
中
type hchan struct {
qcount uint
dataqsiz uint
buf unsafe.Pointer
elemsize uint16
elemtype *_type // element type
// 上面的這幾個組成了一個環形緩存區
closed uint32 // 關閉狀態
// 下麵4個組成了2個隊列 g的
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
lock mutex // 鎖,用於在操作 channel的 元素時候,保證併發安全
}
特殊緩存區
qcount 已經存儲的個數
dataqsiz 環形隊列緩存的容量,即允許緩存的消息最大個數
buf 指向這個緩存區的地址
elemsize 元素的大小
elemtype 元素的類型
設計成這樣,主要目的不需要gc來清理,因為環形機構,會自動把刪除數據記憶體給占了。
環形緩存可以大幅降低GC的開銷
兩個隊列
這裡還要看下 waitq
的定義
type waitq struct {
first *sudog
last *sudog
}
// 在sema中也有用到,就g結構體的封裝
type sudog struct {
g *g
next *sudog
prev *sudog
elem unsafe.Pointer // 接受參數的地址
// 當是接收g時候,如果有緩存中有數據,直接把數據拷貝到這個地址
}
註意:這裡的sendx 並不是指向發送隊列中的g,而且發送隊列應該寫入環形緩存區的index,
同理,recvx也是,指向接受數據的g,應該從緩衝區的那個index取數據
互斥鎖
lock mutex
互斥鎖並不是排隊發送/接收數據
不是讓發送和接收隊列來排隊的,這些發送和接收數據的隊列,休眠也不是在鎖的sema里
互斥鎖保護的hchan結構體本身
所以, Channel並不是無鎖的
狀態值
closed uint32 // 關閉狀態
0為開啟、1為關閉
當一個關閉的channel,再往裡寫或者重覆關閉、就會panic。但是可以讀。
發送數據
c<- 關鍵字是一個語法糖
編譯階段,會把C<- 轉化為 ruintime.chansend1
chansend1 會調用 chansend0 方法
直接發送 (已經有接收隊列在等)
發送數據前,己經有G在休眠等待接收
此時緩存肯定是空的,不用考慮緩存
將數據直接拷貝給G的接收變數,喚醒G
實現
// entry point for c <- x from compiled code.
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
// 部分源碼
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
//有加鎖,如果這時候,再來一個g也發送,就會休眠去 sema隊列了
lock(&c.lock)
if c.closed != 0 { // 如果已經關閉,就會報錯,上面講過給一個關閉的chan發送會panic
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 從接收隊列裡面拿一個 g
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
// 上面的註釋講的很清楚,直接把值給接收者,繞過 channel的buffer
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
}
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 和之前講的 sudog的elem對上了
if sg.elem != nil {
// 直接把數據拷貝到 接收者的 elem中
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 喚醒這個協程 接收者的g
goready(gp, skip+1)
}
步驟:
從隊列里取出一個等待接收的G
將數據直接拷貝到接收變數中
喚醒G,接收者的g
放入緩存
沒有G在休眠等待,但是有緩存空間
將數據放入緩存
實現
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 上面是直接發送的源碼,被截除
// 當緩存隊列存入的數量 小於 緩存的容量,就是還有緩存空間
if c.qcount < c.dataqsiz {
// 緩存區接受數據的地址
qp := chanbuf(c, c.sendx)
// 將數據拷貝過去
typedmemmove(c.elemtype, qp, ep)
// 指示下一個發送數據,存在那個緩衝區,這裡有個邏輯下麵講
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 元素個數加1,釋放鎖並返回
c.qcount++
unlock(&c.lock)
return true
}
}
qp := chanbuf(c, c.sendx)
// chanbuf(c, i) is pointer to the i'th slot in the buffer
// 返回緩衝區的 第幾個槽
// 寫入緩存區
c.sendx++
意味著這裡使用 sendx 來指引下一個發送的數據,寫到幾個槽,
所以才有下麵,如果滿了,就從0開始,形成環形
if c.sendx == c.dataqsiz {
c.sendx = 0
}
整個邏輯比較清晰:
獲取可存入的緩存地址
存入數據
維護索引
休眠等待
沒有G在休眠等待,而旦沒有緩存或滿了
自己進入發送隊列,休眠等待
實現
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 接著 直接放入緩存的代碼
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog() // 把自己組裝成一個 sudog結構體
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep // 要發送的數據 也是放到這個 elem中的
mysg.waitlink = nil
mysg.g = gp // sudug 的g等於自己的g結構體
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2) // 進入waitReasonChanSend 休眠
// 然後去 gopark 休眠了,這個方法在將 協程切換的時候,講過
// 協程到這裡就 休眠了,不繼續執行了,直到被喚醒。
}
步驟:
把自己包裝成sudog
sudog放入sendq隊列
休眠並解鎖
被喚醒後,數據已經被取走,維護其他數據 (下麵講 接收時候講)
發送小結
-
編譯階段,會把<-轉化為 runtime.chansend10
-
直接發送時,將數據直接拷貝到目標變數
-
放入緩存時,將數據放入環形緩存,成功返回
-
休眠等待時,將自己包裝後放入sendp, 休眠
接收
<-c 關鍵字
<-c 關鍵字是一個語法糖
編譯階段,i<-C轉化為 runtime.chanrecv()
編譯階段,i, ok<-c轉化為 runtime.chanrecv()
最後會調用 chanrecv() 方法
無緩存區、有發送協程在等待
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil { // 如果讀一個 block 為true的 channel ,協程會直接休眠,
// 正常讀channel這個 block都是 true
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
throw("unreachable")
}
// 中間刪了一段 block 為 false的情況
lock(&c.lock)
if c.closed != 0 { // 如果channel已經關閉
if c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep) // 有數據 放回數據
}
return true, false // 沒數據,返回一個 false
}
} else {
if sg := c.sendq.dequeue(); sg != nil { // 如果發送隊列中有g
recv(c, sg, ep, func() { unlock(&c.lock) }, 3) // 調用這個 recv方法
return true, true
}
}
}
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 { // 無緩存區
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep) // 直接把 傳過來的g 的數據取走
}
}
// 給g跟新下參數
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1) // 喚醒g,這時候,發送的數據已經被取走了
}
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
src := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
memmove(dst, src, t.Size_)
}
步驟:
判斷有G 在發送隊列等待,進入recv()
判斷此 Channel 無緩存
直接從等待的 G 中取走數據,喚醒G
這裡有兩個地方要註意,代碼中也標記了:
如果channel為nil,再去讀會直接休眠阻塞。這裡只的是 block為 true的讀,block為false的情況後面講,正常channe都是true
如果channel close了, 去讀有值返回值,沒值返回 false
有等待的g,緩存區滿了
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 上面是緩存區為0的情況
qp := chanbuf(c, c.recvx) // 取緩存區的數據,用的recvx 標記,和開頭的總結對應上了
// ep 就是 a <-c ,這個a的地址,如果a是nil,說明傳遞的值,沒有用,只是要個時機
// ep may be nil, in which case received data is ignored.
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 將發送隊列中休眠的這個g的數據 拷貝到了 緩存去
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++ // 取數據的地址增加
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
// 喚醒了這個 發送數據的g,因為這個g的數據已經放到了緩存區,不用休眠等待了
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
步驟:
接收數據前,已經有G 在休眠等待發送
而且這個 Channel 有緩存
從緩存取走一個數據
將休眠G 的數據放進緩存,喚醒G
接收緩存,沒有發送g在等待
直接從 緩存區拿數據走就行
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx) // 取出數據
if raceenabled {
racenotify(c, c.recvx, nil)
}
// 上面講過 也許接收的變數為空
if ep != nil {
typedmemmove(c.elemtype, ep, qp) // 數據拷貝過去
}
typedmemclr(c.elemtype, qp)//clr 是 clear的意思 清理緩存區已經取走的這個數據的空間
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
}
步驟:
判斷沒有 G 在發送隊列等待
判斷此 Channel 有緩存
從緩存中取走一個數據
接收阻塞
比較多用在不讓協程退出,除非收到 context的cancel消息等。
沒有 G 在休眠等待,而旦沒有緩存或緩存空
自己進入接收隊列,休眠等待
代碼實現
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog() //把自己包裝成 sudog
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep // 接收數據的地址,拷貝到了elem
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg) // 加入了等待接收隊列
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2) // 休眠
}
這裡和上面的發送時候,已經有等待接收的g,對上了。
步驟:
判斷沒有 G 在發送隊列等待
判斷此 Channel 無緩存
將自己包裝成 sudog
sudog 放入接收等待隊列,休眠
喚醒時,發送的 G 已經把數據拷貝到位
接收總結:
編譯階段,<-C 會轉化為 chanrecv()
有等待的G,旦無緩存時,從G 接收
有等待的 G,且有緩存時,從緩存接收
無等待的 G,且緩存有數據,從緩存接收
無等待的 G,且緩存無數據,等待喂數據
看上面代碼時候,講過一般使用時候,那個block
是true
的,什麼情況下block
為false
,下篇聊。