Author: 耿宗傑 (Geng Zongjie)
Preface
Articles about pprof are a dime a dozen online, but they are almost all the same rundown of commands, with very little real hands-on work. Drawing on material from the Go community and my own experience, this article walks through the actual process of profiling and optimizing a Go program.
Optimization Approach
First, a word on the general approach to performance optimization. Analyzing and optimizing a system's performance should always proceed from the large to the small: from the business architecture, to the system architecture, to the interactions between system modules, and finally down to the code itself. Optimizing the business architecture offers the best cost/benefit ratio: the technical difficulty is relatively low, yet it can deliver large performance gains. For example, by talking with colleagues or other departments you may be able to eliminate some API calls or drop unnecessarily complex business logic, easily improving the performance of the whole system.
Optimizing the system architecture, such as adding a cache or moving from HTTP to RPC, can also deliver sizable gains for modest effort. Last comes code-level optimization, which itself has two parts: first, choosing and using appropriate data structures; only then, profiling on top of that foundation. For example, when using Go's convenient slice type, pre-allocate enough capacity up front to avoid the re-allocation and data copying that happen when append exceeds capacity; for concurrency protection, prefer RWMutex over Mutex where reads dominate, and under extremely high concurrency consider replacing locks with finer-grained atomic operations.
Optimization in Practice
Now to the main part. The program to optimize is an example from the community, and the code is a bit long. It implements Havlak's loop-recognition algorithm, derived from the work of the famous computer scientist Tarjan; for the ideas behind the algorithm, please Google it yourself (rather than Baidu~). The hands-on process follows (it gets a little long...).
Initial version, havlak1.go:
// Go from multi-language-benchmark/src/havlak/go_pro
// Copyright 2011 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Test Program for the Havlak loop finder.
//
// This program constructs a fairly large control flow
// graph and performs loop recognition. This is the Go
// version.
//
package main

import (
	"flag"
	"fmt"
	"log"
	"os"
	"runtime/pprof"
)

type BasicBlock struct {
	Name     int
	InEdges  []*BasicBlock
	OutEdges []*BasicBlock
}

func NewBasicBlock(name int) *BasicBlock {
	return &BasicBlock{Name: name}
}

func (bb *BasicBlock) Dump() {
	fmt.Printf("BB#%06d:", bb.Name)
	if len(bb.InEdges) > 0 {
		fmt.Printf(" in :")
		for _, iter := range bb.InEdges {
			fmt.Printf(" BB#%06d", iter.Name)
		}
	}
	if len(bb.OutEdges) > 0 {
		fmt.Print(" out:")
		for _, iter := range bb.OutEdges {
			fmt.Printf(" BB#%06d", iter.Name)
		}
	}
	fmt.Printf("\n")
}

func (bb *BasicBlock) NumPred() int {
	return len(bb.InEdges)
}

func (bb *BasicBlock) NumSucc() int {
	return len(bb.OutEdges)
}

func (bb *BasicBlock) AddInEdge(from *BasicBlock) {
	bb.InEdges = append(bb.InEdges, from)
}

func (bb *BasicBlock) AddOutEdge(to *BasicBlock) {
	bb.OutEdges = append(bb.OutEdges, to)
}

//-----------------------------------------------------------

type CFG struct {
	Blocks []*BasicBlock
	Start  *BasicBlock
}

func NewCFG() *CFG {
	return &CFG{}
}

func (cfg *CFG) NumNodes() int {
	return len(cfg.Blocks)
}

func (cfg *CFG) CreateNode(node int) *BasicBlock {
	if node < len(cfg.Blocks) {
		return cfg.Blocks[node]
	}
	if node != len(cfg.Blocks) {
		println("oops", node, len(cfg.Blocks))
		panic("wtf")
	}
	bblock := NewBasicBlock(node)
	cfg.Blocks = append(cfg.Blocks, bblock)
	if len(cfg.Blocks) == 1 {
		cfg.Start = bblock
	}
	return bblock
}

func (cfg *CFG) Dump() {
	for _, n := range cfg.Blocks {
		n.Dump()
	}
}

//-----------------------------------------------------------

type BasicBlockEdge struct {
	Dst *BasicBlock
	Src *BasicBlock
}

func NewBasicBlockEdge(cfg *CFG, from int, to int) *BasicBlockEdge {
	self := new(BasicBlockEdge)
	self.Src = cfg.CreateNode(from)
	self.Dst = cfg.CreateNode(to)
	self.Src.AddOutEdge(self.Dst)
	self.Dst.AddInEdge(self.Src)
	return self
}

//-----------------------------------------------------------

// Basic Blocks and Loops are being classified as regular, irreducible,
// and so on. This enum contains a symbolic name for all these classifications
//
const (
	_             = iota // Go has an interesting iota concept
	bbTop                // uninitialized
	bbNonHeader          // a regular BB
	bbReducible          // reducible loop
	bbSelf               // single BB loop
	bbIrreducible        // irreducible loop
	bbDead               // a dead BB
	bbLast               // sentinel
)

// UnionFindNode is used in the Union/Find algorithm to collapse
// complete loops into a single node. These nodes and the
// corresponding functionality are implemented with this class
//
type UnionFindNode struct {
	parent    *UnionFindNode
	bb        *BasicBlock
	loop      *SimpleLoop
	dfsNumber int
}

// Init explicitly initializes UnionFind nodes.
//
func (u *UnionFindNode) Init(bb *BasicBlock, dfsNumber int) {
	u.parent = u
	u.bb = bb
	u.dfsNumber = dfsNumber
	u.loop = nil
}

// FindSet implements the Find part of the Union/Find Algorithm
//
// Implemented with Path Compression (inner loops are only
// visited and collapsed once, however, deep nests would still
// result in significant traversals).
//
func (u *UnionFindNode) FindSet() *UnionFindNode {
	var nodeList []*UnionFindNode

	node := u
	for ; node != node.parent; node = node.parent {
		if node.parent != node.parent.parent {
			nodeList = append(nodeList, node)
		}
	}

	// Path Compression, all nodes' parents point to the 1st level parent.
	for _, ll := range nodeList {
		ll.parent = node.parent
	}

	return node
}

// Union relies on path compression.
//
func (u *UnionFindNode) Union(B *UnionFindNode) {
	u.parent = B
}

// Constants
//
// Marker for uninitialized nodes.
const unvisited = -1

// Safeguard against pathological algorithm behavior.
const maxNonBackPreds = 32 * 1024

// IsAncestor
//
// As described in the paper, determine whether a node 'w' is a
// "true" ancestor for node 'v'.
//
// Dominance can be tested quickly using a pre-order trick
// for depth-first spanning trees. This is why DFS is the first
// thing we run below.
//
// Go comment: Parameters can be written as w,v int, unlike in C, where
// each parameter needs its own type.
//
func isAncestor(w, v int, last []int) bool {
	return ((w <= v) && (v <= last[w]))
}

// listContainsNode
//
// Check whether a list contains a specific element.
//
func listContainsNode(l []*UnionFindNode, u *UnionFindNode) bool {
	for _, ll := range l {
		if ll == u {
			return true
		}
	}
	return false
}

// DFS - Depth-First-Search and node numbering.
//
func DFS(currentNode *BasicBlock, nodes []*UnionFindNode, number map[*BasicBlock]int, last []int, current int) int {
	nodes[current].Init(currentNode, current)
	number[currentNode] = current

	lastid := current
	for _, target := range currentNode.OutEdges {
		if number[target] == unvisited {
			lastid = DFS(target, nodes, number, last, lastid+1)
		}
	}
	last[number[currentNode]] = lastid
	return lastid
}

// FindLoops
//
// Find loops and build loop forest using Havlak's algorithm, which
// is derived from Tarjan. Variable names and step numbering has
// been chosen to be identical to the nomenclature in Havlak's
// paper (which, in turn, is similar to the one used by Tarjan).
//
func FindLoops(cfgraph *CFG, lsgraph *LSG) {
	if cfgraph.Start == nil {
		return
	}

	size := cfgraph.NumNodes()

	nonBackPreds := make([]map[int]bool, size)
	backPreds := make([][]int, size)

	number := make(map[*BasicBlock]int)
	header := make([]int, size, size)
	types := make([]int, size, size)
	last := make([]int, size, size)
	nodes := make([]*UnionFindNode, size, size)

	for i := 0; i < size; i++ {
		nodes[i] = new(UnionFindNode)
	}

	// Step a:
	//   - initialize all nodes as unvisited.
	//   - depth-first traversal and numbering.
	//   - unreached BB's are marked as dead.
	//
	for i, bb := range cfgraph.Blocks {
		number[bb] = unvisited
		nonBackPreds[i] = make(map[int]bool)
	}

	DFS(cfgraph.Start, nodes, number, last, 0)

	// Step b:
	//   - iterate over all nodes.
	//
	//   A backedge comes from a descendant in the DFS tree, and non-backedges
	//   from non-descendants (following Tarjan).
	//
	//   - check incoming edges 'v' and add them to either
	//     - the list of backedges (backPreds) or
	//     - the list of non-backedges (nonBackPreds)
	//
	for w := 0; w < size; w++ {
		header[w] = 0
		types[w] = bbNonHeader

		nodeW := nodes[w].bb
		if nodeW == nil {
			types[w] = bbDead
			continue // dead BB
		}

		if nodeW.NumPred() > 0 {
			for _, nodeV := range nodeW.InEdges {
				v := number[nodeV]
				if v == unvisited {
					continue // dead node
				}

				if isAncestor(w, v, last) {
					backPreds[w] = append(backPreds[w], v)
				} else {
					nonBackPreds[w][v] = true
				}
			}
		}
	}

	// Start node is root of all other loops.
	header[0] = 0

	// Step c:
	//
	// The outer loop, unchanged from Tarjan. It does nothing except
	// for those nodes which are the destinations of backedges.
	// For a header node w, we chase backward from the sources of the
	// backedges adding nodes to the set P, representing the body of
	// the loop headed by w.
	//
	// By running through the nodes in reverse of the DFST preorder,
	// we ensure that inner loop headers will be processed before the
	// headers for surrounding loops.
	//
	for w := size - 1; w >= 0; w-- {
		// this is 'P' in Havlak's paper
		var nodePool []*UnionFindNode

		nodeW := nodes[w].bb
		if nodeW == nil {
			continue // dead BB
		}

		// Step d:
		for _, v := range backPreds[w] {
			if v != w {
				nodePool = append(nodePool, nodes[v].FindSet())
			} else {
				types[w] = bbSelf
			}
		}

		// Copy nodePool to workList.
		//
		workList := append([]*UnionFindNode(nil), nodePool...)

		if len(nodePool) != 0 {
			types[w] = bbReducible
		}

		// work the list...
		//
		for len(workList) > 0 {
			x := workList[0]
			workList = workList[1:]

			// Step e:
			//
			// Step e represents the main difference from Tarjan's method.
			// Chasing upwards from the sources of a node w's backedges. If
			// there is a node y' that is not a descendant of w, w is marked
			// the header of an irreducible loop, there is another entry
			// into this loop that avoids w.
			//

			// The algorithm has degenerated. Break and
			// return in this case.
			//
			nonBackSize := len(nonBackPreds[x.dfsNumber])
			if nonBackSize > maxNonBackPreds {
				return
			}

			for iter := range nonBackPreds[x.dfsNumber] {
				y := nodes[iter]
				ydash := y.FindSet()

				if !isAncestor(w, ydash.dfsNumber, last) {
					types[w] = bbIrreducible
					nonBackPreds[w][ydash.dfsNumber] = true
				} else {
					if ydash.dfsNumber != w {
						if !listContainsNode(nodePool, ydash) {
							workList = append(workList, ydash)
							nodePool = append(nodePool, ydash)
						}
					}
				}
			}
		}

		// Collapse/Unionize nodes in a SCC to a single node
		// For every SCC found, create a loop descriptor and link it in.
		//
		if (len(nodePool) > 0) || (types[w] == bbSelf) {
			loop := lsgraph.NewLoop()

			loop.SetHeader(nodeW)
			if types[w] != bbIrreducible {
				loop.IsReducible = true
			}

			// At this point, one can set attributes to the loop, such as:
			//
			// the bottom node:
			//    iter  = backPreds[w].begin();
			//    loop bottom is: nodes[iter].node);
			//
			// the number of backedges:
			//    backPreds[w].size()
			//
			// whether this loop is reducible:
			//    type[w] != BasicBlockClass.bbIrreducible
			//
			nodes[w].loop = loop

			for _, node := range nodePool {
				// Add nodes to loop descriptor.
				header[node.dfsNumber] = w
				node.Union(nodes[w])

				// Nested loops are not added, but linked together.
				if node.loop != nil {
					node.loop.Parent = loop
				} else {
					loop.AddNode(node.bb)
				}
			}

			lsgraph.AddLoop(loop)
		} // nodePool.size
	} // Step c
}

// External entry point.
func FindHavlakLoops(cfgraph *CFG, lsgraph *LSG) int {
	FindLoops(cfgraph, lsgraph)
	return lsgraph.NumLoops()
}

//======================================================
// Scaffold Code
//======================================================

// Basic representation of loops, a loop has an entry point,
// one or more exit edges, a set of basic blocks, and potentially
// an outer loop - a "parent" loop.
//
// Furthermore, it can have any set of properties, e.g.,
// it can be an irreducible loop, have control flow, be
// a candidate for transformations, and what not.
//
type SimpleLoop struct {
	// No set, use map to bool
	basicBlocks map[*BasicBlock]bool
	Children    map[*SimpleLoop]bool
	Parent      *SimpleLoop
	header      *BasicBlock

	IsRoot       bool
	IsReducible  bool
	Counter      int
	NestingLevel int
	DepthLevel   int
}

func (loop *SimpleLoop) AddNode(bb *BasicBlock) {
	loop.basicBlocks[bb] = true
}

func (loop *SimpleLoop) AddChildLoop(child *SimpleLoop) {
	loop.Children[child] = true
}

func (loop *SimpleLoop) Dump(indent int) {
	for i := 0; i < indent; i++ {
		fmt.Printf("  ")
	}

	// No ? operator ?
	fmt.Printf("loop-%d nest: %d depth %d ",
		loop.Counter, loop.NestingLevel, loop.DepthLevel)
	if !loop.IsReducible {
		fmt.Printf("(Irreducible) ")
	}

	// must have > 0
	if len(loop.Children) > 0 {
		fmt.Printf("Children: ")
		for ll := range loop.Children {
			fmt.Printf("loop-%d", ll.Counter)
		}
	}
	if len(loop.basicBlocks) > 0 {
		fmt.Printf("(")
		for bb := range loop.basicBlocks {
			fmt.Printf("BB#%06d ", bb.Name)
			if loop.header == bb {
				fmt.Printf("*")
			}
		}
		fmt.Printf("\b)")
	}
	fmt.Printf("\n")
}

func (loop *SimpleLoop) SetParent(parent *SimpleLoop) {
	loop.Parent = parent
	loop.Parent.AddChildLoop(loop)
}

func (loop *SimpleLoop) SetHeader(bb *BasicBlock) {
	loop.AddNode(bb)
	loop.header = bb
}

//------------------------------------
// Helper (No templates or such)
//
func max(x, y int) int {
	if x > y {
		return x
	}
	return y
}

// LoopStructureGraph
//
// Maintain loop structure for a given CFG.
//
// Two values are maintained for this loop graph, depth, and nesting level.
// For example:
//
// loop        nesting level    depth
//----------------------------------------
// loop-0      2                0
//   loop-1    1                1
//   loop-3    1                1
//     loop-2  0                2
//
var loopCounter = 0

type LSG struct {
	root  *SimpleLoop
	loops []*SimpleLoop
}

func NewLSG() *LSG {
	lsg := new(LSG)
	lsg.root = lsg.NewLoop()
	lsg.root.NestingLevel = 0

	return lsg
}

func (lsg *LSG) NewLoop() *SimpleLoop {
	loop := new(SimpleLoop)
	loop.basicBlocks = make(map[*BasicBlock]bool)
	loop.Children = make(map[*SimpleLoop]bool)
	loop.Parent = nil
	loop.header = nil

	loop.Counter = loopCounter
	loopCounter++

	return loop
}

func (lsg *LSG) AddLoop(loop *SimpleLoop) {
	lsg.loops = append(lsg.loops, loop)
}

func (lsg *LSG) Dump() {
	lsg.dump(lsg.root, 0)
}

func (lsg *LSG) dump(loop *SimpleLoop, indent int) {
	loop.Dump(indent)

	for ll := range loop.Children {
		lsg.dump(ll, indent+1)
	}
}

func (lsg *LSG) CalculateNestingLevel() {
	for _, sl := range lsg.loops {
		if sl.IsRoot {
			continue
		}
		if sl.Parent == nil {
			sl.SetParent(lsg.root)
		}
	}
	lsg.calculateNestingLevel(lsg.root, 0)
}

func (lsg *LSG) calculateNestingLevel(loop *SimpleLoop, depth int) {
	loop.DepthLevel = depth
	for ll := range loop.Children {
		lsg.calculateNestingLevel(ll, depth+1)

		ll.NestingLevel = max(loop.NestingLevel, ll.NestingLevel+1)
	}
}

func (lsg *LSG) NumLoops() int {
	return len(lsg.loops)
}

func (lsg *LSG) Root() *SimpleLoop {
	return lsg.root
}

//======================================================
// Testing Code
//======================================================
func buildDiamond(cfgraph *CFG, start int) int {
	bb0 := start

	NewBasicBlockEdge(cfgraph, bb0, bb0+1)
	NewBasicBlockEdge(cfgraph, bb0, bb0+2)
	NewBasicBlockEdge(cfgraph, bb0+1, bb0+3)
	NewBasicBlockEdge(cfgraph, bb0+2, bb0+3)

	return bb0 + 3
}

func buildConnect(cfgraph *CFG, start int, end int) {
	NewBasicBlockEdge(cfgraph, start, end)
}

func buildStraight(cfgraph *CFG, start int, n int) int {
	for i := 0; i < n; i++ {
		buildConnect(cfgraph, start+i, start+i+1)
	}
	return start + n
}

func buildBaseLoop(cfgraph *CFG, from int) int {
	header := buildStraight(cfgraph, from, 1)
	diamond1 := buildDiamond(cfgraph, header)
	d11 := buildStraight(cfgraph, diamond1, 1)
	diamond2 := buildDiamond(cfgraph, d11)
	footer := buildStraight(cfgraph, diamond2, 1)
	buildConnect(cfgraph, diamond2, d11)
	buildConnect(cfgraph, diamond1, header)

	buildConnect(cfgraph, footer, from)
	footer = buildStraight(cfgraph, footer, 1)
	return footer
}

var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to this file")

func main() {
	flag.Parse()
	if *cpuprofile != "" {
		f, err := os.Create(*cpuprofile)
		if err != nil {
			log.Fatal(err)
		}
		pprof.StartCPUProfile(f)
		defer pprof.StopCPUProfile()
	}

	lsgraph := NewLSG()
	cfgraph := NewCFG()

	cfgraph.CreateNode(0) // top
	cfgraph.CreateNode(1) // bottom
	NewBasicBlockEdge(cfgraph, 0, 2)

	for dummyloop := 0; dummyloop < 15000; dummyloop++ {
		FindHavlakLoops(cfgraph, NewLSG())
	}

	n := 2

	for parlooptrees := 0; parlooptrees < 10; parlooptrees++ {
		cfgraph.CreateNode(n + 1)
		buildConnect(cfgraph, 2, n+1)
		n = n + 1

		for i := 0; i < 100; i++ {
			top := n
			n = buildStraight(cfgraph, n, 1)
			for j := 0; j < 25; j++ {
				n = buildBaseLoop(cfgraph, n)
			}
			bottom := buildStraight(cfgraph, n, 1)
			buildConnect(cfgraph, n, top)
			n = bottom
		}
		buildConnect(cfgraph, n, 1)
	}

	FindHavlakLoops(cfgraph, lsgraph)
	for i := 0; i < 50; i++ {
		FindHavlakLoops(cfgraph, NewLSG())
	}

	fmt.Printf("# of loops: %d (including 1 artificial root node)\n", lsgraph.NumLoops())
	lsgraph.CalculateNestingLevel()
}
We use the time command on macOS to print the program's running time (user, system, and total):
Compile and run the program:
User time is 23.07s, system time is 0.4s, and total wall-clock time is 13.7s (about 170% CPU, i.e. roughly two cores). Since the program already starts pprof's CPU profiling itself, run the top command directly:
As shown, pprof's profile covers a duration of 12.99s, with 19.49s of total samples (again because two cores were busy).
A note here: both CPU profiling and memory profiling are sampled, not exhaustive. For CPU time, the profiler records the call stack at intervals (roughly every 10ms); the analysis then counts, across all samples, how often each function appears, both as the currently running frame and further up the stack (which means it had called another function). Memory profiling records an allocation path roughly once per 512KB allocated.
The biggest consumer is mapaccess1_fast64, the runtime's lookup function used for map reads. If you compile without disabling inlining, the output looks a bit different: it shows FindHavlakLoops, marked as inline. Since FindHavlakLoops does nothing but call FindLoops, the compiler expands it in place, substituting FindLoops for FindHavlakLoops. Alternatively, without disabling inlining at compile time, you can set pprof's noinlines option to true (it defaults to false), i.e. noinlines=true.
The top function here is not business code but a runtime function, and there is nothing to optimize inside the runtime itself (if it's hot, we must be using it wrong~). So let's see where this runtime function is being called from:
web mapaccess1_fast64
The heaviest callers are DFS and FindLoops. Let's look at how these two functions use maps, starting with DFS:
Within DFS, the lines that are both expensive and map operations are lines 242, 246, and 250. The fix here is to replace the map with a slice: when a slice does the job, there is no need for a map. At first glance that sounds fine, yet perhaps slightly odd, right? It works because of a particular property of this program: a BasicBlock's Name field is exactly its own index. Looking up a BasicBlock therefore needs no map at all; direct indexing is obviously the lowest possible time complexity. (Briefly, on map vs. slice complexity: a map looks like O(1), but that ignores the cost of computing the hash and resolving hash collisions, whereas slice indexing is unconditionally O(1).) After changing this map to a slice (version H2), compile again and measure:
Runtime is now down to 8s. Check the CPU profile again:
Now the top entry is scanobject rather than map access. Seeing this function, along with mallocgc below it, tells us this is all memory-related: the former scans memory objects as part of the garbage collector's tri-color marking, and the latter directly manages allocation and reclamation (note that it handles both: malloc && gc). So the program is spending a lot of time on memory management, about 8% by proportion. We need to find out what is producing all this memory churn and whether it is reasonable. Analyzing memory means pprof's memprofile feature. Add the memory-profiling code (everyone knows how; I won't paste it, there are plenty of examples online), recompile as version H3, run it, and generate the memory profile:
Inspect the memory allocations; this time we do not disable inlining, because in a real run inlinable functions are expanded anyway:
Node creation ranks first, and FindLoops second. Creating a BasicBlock is simple, with no complex logic, and it is a fundamental object in the program: changing the struct would likely mean changing the algorithm too, which is clearly inappropriate; it could introduce bugs and probably would not pay off. So we move on to FindLoops, the other business function we earlier saw calling the built-in mapaccess. The optimization is the same as before: replace the map with a slice. One wrinkle: after the switch, appending to the slice needs a duplicate check that the map used to provide, so a new appendUnique helper is added. Compile again as version H4 and measure:
Runtime drops to 5.6s. Check the memory profile again to confirm the effect:
FindLoops is no longer near the top; its memory consumption has come down. Now look at the CPU profile:
FindLoops is first, and scanobject has dropped to second. That is exactly right: we want the CPU spending more of its time running business code, putting cycles where they are genuinely needed.
Time for the next round (this is what performance tuning is: sweep for hot spots again and again, squeeze out unnecessary CPU time, and wring the most out of the machine). Let's see where FindLoops is now spending its time, and whether that is reasonable:
Looking at the per-statement timings, nothing looks badly wrong; no line is outrageously expensive (the 360ms line is simply inside a loop). So the coding itself is fine, and, following the top-down approach from the beginning, we should move up a level and ask whether the logic can be changed to do less computation (that is: if every step computes reasonably, can we compute less?). Here FindLoops spends noticeable time at its entry initializing a series of temporary variables, about 180ms in total, meaning every call to FindLoops pays roughly 180ms of setup first. This repeated creation and initialization of temporaries can be reduced with a memory cache; the standard solution is an object pool. (For example, a Go web service under high concurrency creates an object, allocates memory, and deserializes for every HTTP request; sync.Pool can eliminate that repeated work and improve performance.) In the same spirit, we add a cache object here:
It replaces the original allocate-and-initialize sequence: when the existing cache is too small, allocate new backing arrays; otherwise re-slice the existing cache, avoiding fresh allocations:
With the change in place, compile version H5 and measure again:
Runtime is now down to 4.1s, more than a 3x speedup over the original 13.7s. Here is the current CPU profile:
Compared with the previous distribution, the top two entries are now both business functions, so business code takes an even larger share of the time and the load from unrelated runtime functions has dropped further. Note that caching through a plain global variable like this is not concurrency-safe, but since this program's logic is not concurrent, it is fine here.
That completes the hands-on optimization. To recap, the steps taken and the main commands used were:
1. Use top5 to inspect CPU time: map operations dominate;
2. Use the web command to see who calls the map functions most: DFS and FindLoops;
3. Use the list command to see per-line cost inside DFS, locate the map operations, and pick the fix: replace the map with a slice;
4. Re-run and use top again: the biggest cost is now the object-scanning function scanobject;
5. Add memory-profiling code, run, and inspect the allocation sizes: FindLoops allocates heavily;
6. Use list to see FindLoops' per-line memory cost: map creation accounts for most of it;
7. Replace that map with a slice as well, re-run, and re-check memory: FindLoops is out of the top spots, and in the CPU profile scanobject has dropped too — goal achieved;
8. Start another round: in the CPU ranking, FindLoops is now first;
9. Use list on FindLoops again: no obvious per-line problem, so switch from hunting coding issues to hunting logic issues and reduce computation — here, by adding a cache;
10. With the cache in place, performance improves markedly, confirming the optimization. Then loop into the next round of investigation and optimization, and so on... until we have squeezed out all the performance we can!
Those were the overall steps and reasoning for this optimization. The method throughout was consistent: keep using pprof to find the top functions by CPU time and memory usage, then improve performance by refining data structures, removing unnecessary computation, and reducing the burden of memory allocation and garbage collection. This applies to any program and is a methodology worth reusing.
Two Takeaways
1. The precondition for optimizing a program is a sufficiently deep understanding of it! (Put differently: we must not optimize bugs into the program...) The final H6 version, which doubled performance yet again, was built entirely on the author's complete understanding of the program; that is what made it possible to restructure loops, reuse objects, rework call logic, and so on. One level further up, this reaches program logic, system architecture, and ultimately the overall business architecture — which brings us back to the large-to-small approach we started with.
2. Languages with GC actually demand more memory-management skill than those without. Go's gain in development speed comes from handing garbage collection to the runtime, but don't forget: it still burns our own CPU time (the wool still comes off the sheep...). So for every struct we use and every computation we perform, we should understand the memory cost behind it and manage memory changes almost subconsciously.
That's the whole hands-on process of optimizing a Go program with pprof, plus my reflections. I hope it's a useful reference — thanks!