最近打算寫個簡單的配置中心,考慮到實現便捷性,語言選擇了go,由於其中計劃用到zk,就調研了下golang的zk客戶端,並實現了個簡單的分散式server。 ...
golang的zk客戶端
最近打算寫個簡單的配置中心,考慮到實現便捷性,語言選擇了go,由於其中計劃用到zk,就調研了下golang的zk客戶端,並實現了個簡單的分散式server。最終找到了兩個,地址如下:
- gozk:https://wiki.ubuntu.com/gozk
- go-zookeeper:https://github.com/samuel/go-zookeeper
由於gozk的文檔不如後者,且代碼沒在gihub上,所以就直接選擇了後者。go-zookeeper文檔還是比較全面的:文檔
基本操作測試
這裡預設大家已經瞭解zk的用處和基本用法了,如果還不瞭解可以參看:官方文檔或中文文檔
下邊我們先來寫個簡單的例子來測試下基本的操作:
package main
/**
客戶端doc地址:github.com/samuel/go-zookeeper/zk
**/
import (
"fmt"
zk "github.com/samuel/go-zookeeper/zk"
"time"
)
/**
* 獲取一個zk連接
* @return {[type]}
*/
func getConnect(zkList []string) (conn *zk.Conn) {
conn, _, err := zk.Connect(zkList, 10*time.Second)
if err != nil {
fmt.Println(err)
}
return
}
/**
* 測試連接
* @return
*/
func test1() {
zkList := []string{"localhost:2183"}
conn := getConnect(zkList)
defer conn.Close()
conn.Create("/go_servers", nil, 0, zk.WorldACL(zk.PermAll))
time.Sleep(20 * time.Second)
}
/**
* 測試臨時節點
* @return {[type]}
*/
func test2() {
zkList := []string{"localhost:2183"}
conn := getConnect(zkList)
defer conn.Close()
conn.Create("/testadaadsasdsaw", nil, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
time.Sleep(20 * time.Second)
}
/**
* 獲取所有節點
*/
func test3() {
zkList := []string{"localhost:2183"}
conn := getConnect(zkList)
defer conn.Close()
children, _, err := conn.Children("/go_servers")
if err != nil {
fmt.Println(err)
}
fmt.Printf("%v \n", children)
}
func main() {
test3()
}
上邊的代碼裡邊我們測試了golang對zk的連接、創建節點、或取節點操作,在下邊的分散式server中將用到這幾個操作。
簡單的分散式server
目前分散式系統已經很流行了,一些開源框架也被廣泛應用,如dubbo、Motan等。對於一個分散式服務,最基本的一項功能就是服務的註冊和發現,而利用zk的EPHEMERAL節點則可以很方便的實現該功能。EPHEMERAL節點正如其名,是臨時性的,其生命周期是和客戶端會話綁定的,當會話連接斷開時,節點也會被刪除。下邊我們就來實現一個簡單的分散式server:
server:
- 服務啟動時,創建zk連接,併在go_servers節點下創建一個新節點,節點名為"ip:port",完成服務註冊
- 服務結束時,由於連接斷開,創建的節點會被刪除,這樣client就不會連到該節點
client:
- 先從zk獲取go_servers節點下所有子節點,這樣就拿到了所有註冊的server
- 從server列表中選中一個節點(這裡只是隨機選取,實際服務一般會提供多種策略),創建連接進行通信
這裡為了演示,我們每次client連接server,獲取server發送的時間後就斷開。主要代碼如下:
zkutil:用來處理zk操作
func GetConnect() (conn *zk.Conn, err error) {
conn, _, err = zk.Connect(hosts, timeOut*time.Second)
if err != nil {
fmt.Println(err)
}
return
}
func RegistServer(conn *zk.Conn, host string) (err error) {
_, err = conn.Create("/go_servers/"+host, nil, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
return
}
func GetServerList(conn *zk.Conn) (list []string, err error) {
list, _, err = conn.Children("/go_servers")
return
}
server: server端操作
func main() {
go starServer("127.0.0.1:8897")
go starServer("127.0.0.1:8898")
go starServer("127.0.0.1:8899")
a := make(chan bool, 1)
<-a
}
func starServer(port string) {
tcpAddr, err := net.ResolveTCPAddr("tcp4", port)
fmt.Println(tcpAddr)
checkError(err)
listener, err := net.ListenTCP("tcp", tcpAddr)
checkError(err)
//註冊zk節點q
conn, err := example.GetConnect()
if err != nil {
fmt.Printf(" connect zk error: %s ", err)
}
defer conn.Close()
err = example.RegistServer(conn, port)
if err != nil {
fmt.Printf(" regist node error: %s ", err)
}
for {
conn, err := listener.Accept()
if err != nil {
fmt.Fprintf(os.Stderr, "Error: %s", err)
continue
}
go handleCient(conn, port)
}
fmt.Println("aaaaaa")
}
func handleCient(conn net.Conn, port string) {
defer conn.Close()
daytime := time.Now().String()
conn.Write([]byte(port + ": " + daytime))
}
client: 客戶端操作
func main() {
for i := 0; i < 100; i++ {
startClient()
time.Sleep(1 * time.Second)
}
}
func startClient() {
// service := "127.0.0.1:8899"
//獲取地址
serverHost, err := getServerHost()
if err != nil {
fmt.Printf("get server host fail: %s \n", err)
return
}
fmt.Println("connect host: " + serverHost)
tcpAddr, err := net.ResolveTCPAddr("tcp4", serverHost)
checkError(err)
conn, err := net.DialTCP("tcp", nil, tcpAddr)
checkError(err)
defer conn.Close()
_, err = conn.Write([]byte("timestamp"))
checkError(err)
result, err := ioutil.ReadAll(conn)
checkError(err)
fmt.Println(string(result))
return
}
func getServerHost() (host string, err error) {
conn, err := example.GetConnect()
if err != nil {
fmt.Printf(" connect zk error: %s \n ", err)
return
}
defer conn.Close()
serverList, err := example.GetServerList(conn)
if err != nil {
fmt.Printf(" get server list error: %s \n", err)
return
}
count := len(serverList)
if count == 0 {
err = errors.New("server list is empty \n")
return
}
//隨機選中一個返回
r := rand.New(rand.NewSource(time.Now().UnixNano()))
host = serverList[r.Intn(3)]
return
}
我們先啟動server,可以看到有三個節點註冊到zk:
再啟動client,可以看到每次client都會隨機連接到一個節點進行通信:
至此,我們的分散式server就實現了,夠簡單吧。當然,實際項目中這樣的實現是遠遠不夠的,有興趣的可以研究下一些開源的框架。
相關代碼:github