操作系統 : CentOS7.3.1611_x64 go語言版本:1.8.3 linux/amd64 InfluxDB版本:1.1.0 influxdb預設配置: meta預設配置: dir meta數據存放目錄,預設值:/var/lib/influxdb/meta meta數據文件預設路徑:/va ...
操作系統 : CentOS7.3.1611_x64
go語言版本:1.8.3 linux/amd64
InfluxDB版本:1.1.0
influxdb預設配置:
/etc/influxdb/influxdb.conf
meta預設配置:
[meta] dir = "/var/lib/influxdb/meta" retention-autocreate = true logging-enabled = true
- dir
meta數據存放目錄,預設值:/var/lib/influxdb/meta
meta數據文件預設路徑:/var/lib/influxdb/meta/meta.db
- retention-autocreate
用於控制預設存儲策略,資料庫創建時,會自動生成autogen的存儲策略,預設值:true
- logging-enabled
是否開啟meta日誌,預設值:true
meta文件的dump和load
源碼路徑: github.com/influxdata/influxdb/services/meta/client.go
meta文件dump
// snapshot will save the current meta data to disk func snapshot(path string, data *Data) error { file := filepath.Join(path, metaFile) tmpFile := file + "tmp" f, err := os.Create(tmpFile) if err != nil { return err } defer f.Close() var d []byte if b, err := data.MarshalBinary(); err != nil { return err } else { d = b } if _, err := f.Write(d); err != nil { return err } if err = f.Sync(); err != nil { return err } //close file handle before renaming to support Windows if err = f.Close(); err != nil { return err } return renameFile(tmpFile, file) }
snapshot可以通過以下兩種方式觸發:
1、當執行 Client.Open 函數時會進行snapshot操作;
2、執行meta文件更新時通過commit函數進行snapshot操作;
在InfluxDB中程式中,通過 NewServer 函數創建MetaClient變數(meta.NewClient),然後執行MetaClient.Open()進行初始化;
後續會通過Server.Open函數(run/server.go)啟動各項服務,如果有meta文件的更新操作,通過commit函數進行snapshot操作;
meta文件load
// Load will save the current meta data from disk func (c *Client) Load() error { file := filepath.Join(c.path, metaFile) f, err := os.Open(file) if err != nil { if os.IsNotExist(err) { return nil } return err } defer f.Close() data, err := ioutil.ReadAll(f) if err != nil { return err } if err := c.cacheData.UnmarshalBinary(data); err != nil { return err } return nil }
Client.Open()中會執行Load操作,NewServer時會自動載入。
meta文件內容編解碼
源碼路徑: github.com/influxdata/influxdb/services/meta/data.go
meta數據encode:
// MarshalBinary encodes the metadata to a binary format. func (data *Data) MarshalBinary() ([]byte, error) { return proto.Marshal(data.marshal()) }
meta數據decode:
// UnmarshalBinary decodes the object from a binary format. func (data *Data) UnmarshalBinary(buf []byte) error { var pb internal.Data if err := proto.Unmarshal(buf, &pb); err != nil { return err } data.unmarshal(&pb) return nil }
proto路徑 :github.com/gogo/protobuf/proto
meta文件結構定義
源碼路徑: github.com/influxdata/influxdb/services/meta/data.go
meta文件存儲的就是 meta.Data 的數據,結構定義如下:
// Data represents the top level collection of all metadata. type Data struct { Term uint64 // associated raft term Index uint64 // associated raft index ClusterID uint64 Databases []DatabaseInfo Users []UserInfo MaxShardGroupID uint64 MaxShardID uint64 }
Term :暫時不知道乾什麼用的。
Index :從源碼看這個應該是類似版本號的東西,初始化為1,執行commit操作是會增加。如果為1,會立即執行持久化操作(在Open函數中操作)。
ClusterID : 是InfluxDB集群相關內容;
Databases :用於存儲資料庫信息;
Users :用於存儲資料庫用戶信息;
DatabaseInfo 定義 :
// DatabaseInfo represents information about a database in the system. type DatabaseInfo struct { Name string DefaultRetentionPolicy string RetentionPolicies []RetentionPolicyInfo ContinuousQueries []ContinuousQueryInfo }
RetentionPolicyInfo 定義:
// RetentionPolicyInfo represents metadata about a retention policy. type RetentionPolicyInfo struct { Name string ReplicaN int Duration time.Duration ShardGroupDuration time.Duration ShardGroups []ShardGroupInfo Subscriptions []SubscriptionInfo }
ShardGroupInfo 定義:
// ShardGroupInfo represents metadata about a shard group. The DeletedAt field is important // because it makes it clear that a ShardGroup has been marked as deleted, and allow the system // to be sure that a ShardGroup is not simply missing. If the DeletedAt is set, the system can // safely delete any associated shards. type ShardGroupInfo struct { ID uint64 StartTime time.Time EndTime time.Time DeletedAt time.Time Shards []ShardInfo TruncatedAt time.Time }
ShardInfo 定義:
// ShardInfo represents metadata about a shard. type ShardInfo struct { ID uint64 Owners []ShardOwner }
ShardOwner 定義:
// ShardOwner represents a node that owns a shard. type ShardOwner struct { NodeID uint64 }
ShardOwner主要用於集群,其中NodeId用於標識集群的節點ID,在InfluxDB 1.1社區版本中集群已經不支持了,該欄位無效。
SubscriptionInfo 定義:
// SubscriptionInfo hold the subscription information type SubscriptionInfo struct { Name string Mode string Destinations []string }
ContinuousQueryInfo 定義:
// ContinuousQueryInfo represents metadata about a continuous query. type ContinuousQueryInfo struct { Name string Query string }
UserInfo 定義:
// UserInfo represents metadata about a user in the system. type UserInfo struct { Name string Hash string Admin bool Privileges map[string]influxql.Privilege }
其它
meta文件解析示例代碼:
package main import ( "os" "fmt" "io/ioutil" "github.com/influxdata/influxdb/services/meta" ) func Load(metaFile string) error { cacheData:= &meta.Data{ Index: 1, } //file := filepath.Join(c.path, metaFile) f, err := os.Open(metaFile) if err != nil { if os.IsNotExist(err) { return nil } return err } defer f.Close() data, err := ioutil.ReadAll(f) if err != nil { return err } if err := cacheData.UnmarshalBinary(data); err != nil { return err } //fmt.Println(data) //fmt.Println("=======================") fmt.Println("Term :",cacheData.Term) fmt.Println("Index :",cacheData.Index) fmt.Println("Databases :") //fmt.Println(cacheData.Databases) for k,dbInfo := range cacheData.Databases { //fmt.Println(k,dbInfo) fmt.Println("k =",k) fmt.Println(dbInfo.Name,dbInfo.DefaultRetentionPolicy) for _,rPolicy := range dbInfo.RetentionPolicies { //fmt.Println(rPolicy) fmt.Println(rPolicy.Name,rPolicy.ReplicaN,rPolicy.Duration,rPolicy.ShardGroupDuration) fmt.Println("-------------ShardGroups---------------") //fmt.Println(rPolicy.ShardGroups) for shardIdx,shardGroup := range rPolicy.ShardGroups { //fmt.Println(shardGroup) fmt.Println("shardIdx =",shardIdx) fmt.Println("ID :",shardGroup.ID) fmt.Println("StartTime :",shardGroup.StartTime) fmt.Println("EndTime :",shardGroup.EndTime) fmt.Println("DeletedAt :",shardGroup.DeletedAt) //fmt.Println("Shards :",shardGroup.Shards) fmt.Printf("Shards :") for _,shard := range shardGroup.Shards { fmt.Println(shard.ID,shard.Owners) } fmt.Println("TruncatedAt :",shardGroup.TruncatedAt) //fmt.Println(shardGroup.ID,shardGroup.StartTime,shardGroup.EndTime) // DeletedAt,Shards , TruncatedAt } //fmt.Println(rPolicy.Subscriptions) fmt.Println("--------------Subscriptions----------------") for subsIdx,subInfo := range rPolicy.Subscriptions { //fmt.Println(subInfo) fmt.Println("subsIdx =",subsIdx) fmt.Println("Name :",subInfo.Name) fmt.Println("Mode :",subInfo.Mode) fmt.Println("Destinations :",subInfo.Destinations) } } fmt.Println("=======================") } fmt.Println("Users :") fmt.Println(cacheData.Users) fmt.Println(cacheData.MaxShardGroupID) fmt.Println(cacheData.MaxShardID) return nil } func main() { argsWithProg := os.Args if(len(argsWithProg) < 2) { fmt.Println("usage : ",argsWithProg[0]," configFile") return } metaFile := os.Args[1] fmt.Println(argsWithProg) fmt.Println(metaFile) Load(metaFile) }
好,就這些了,希望對你有幫助。
本文github地址:
https://github.com/mike-zhang/mikeBlogEssays/blob/master/2018/20180112_InfluxDB_meta文件解析.rst
歡迎補充