Go语言实现Zookeeper服务发现
package zk
client.go
package zk
import (
"fmt"
"log"
"strings"
"time"
"github.com/samuel/go-zookeeper/zk"
)
// Client is used to contain connection config.
type Client struct {
Servers []string
Timeout time.Duration
}
// Register creates node if not exist, updates node if exist and creates parent if not exist.
func (client *Client) Register(path string, data []byte) error {
conn, _, err := zk.Connect(client.Servers, client.Timeout)
if err != nil {
return err
}
defer conn.Close()
exist, stat, err := conn.Exists(path)
if err != nil {
return err
}
if !exist {
err := client.ensureRoot(path)
if err != nil {
return err
}
_, err = conn.Create(path, data, 0, zk.WorldACL(zk.PermAll))
if err != nil && err != zk.ErrNodeExists {
return err
}
log.Printf("Register node: %s, node is created, state: [%s]\n", path, fmtStat(stat))
return nil
}
nodeData, stat, err := conn.Get(path)
if err != nil {
return err
}
if string(data[:]) == string(nodeData[:]) {
log.Printf("Register node: %s, node exists and data is unchanged, state: [%s]\n", path, fmtStat(stat))
return nil
}
stat, err = conn.Set(path, data, stat.Version)
if err != nil {
return err
}
log.Printf("Register node: %s, node exists and data is changed, state: [%s]\n", path, fmtStat(stat))
return nil
}
// Deregister deletes node if exist.
func (client *Client) Deregister(node string) error {
conn, _, err := zk.Connect(client.Servers, client.Timeout)
if err != nil {
return err
}
defer conn.Close()
exist, stat, err := conn.Exists(node)
if err != nil {
return err
}
if exist {
err = conn.Delete(node, stat.Version)
if err != nil {
return err
}
log.Printf("Deregister node: %s, node is deleted, state: [%s]\n", node, fmtStat(stat))
return nil
}
log.Printf("Deregister node: %s, node doesn't exist\n", node)
return nil
}
// RegisterChildren creates ephemeral node and creates parent if not exist.
func (client *Client) RegisterChildren(path string, data []byte) error {
conn, _, err := zk.Connect(client.Servers, client.Timeout)
if err != nil {
return err
}
defer conn.Close()
exist, stat, err := conn.Exists(path)
if err != nil {
return err
}
if !exist {
err = client.Register(path, []byte{})
if err != nil {
return err
}
}
child, err := conn.CreateProtectedEphemeralSequential(path+"/n", data, zk.WorldACL(zk.PermAll))
if err != nil {
return err
}
log.Printf("Register ephemeral node: %s, node is created, state: [%s]\n", child, fmtStat(stat))
return nil
}
// Get gets node data and creates node if not exist.
func (client *Client) Get(path string) ([]byte, error) {
conn, _, err := zk.Connect(client.Servers, client.Timeout)
if err != nil {
return []byte{}, err
}
defer conn.Close()
exist, stat, err := conn.Exists(path)
if err != nil {
return []byte{}, err
}
if !exist {
err = client.Register(path, []byte{})
if err != nil {
return []byte{}, err
}
}
data, stat, err := conn.Get(path)
if err != nil {
return []byte{}, err
}
log.Printf("Get node: %s, state: [%s]\n", path, fmtStat(stat))
return data, nil
}
// GetChildren gets node children data and creates node if not exist.
func (client *Client) GetChildren(path string) ([]string, error) {
conn, _, err := zk.Connect(client.Servers, client.Timeout)
if err != nil {
return []string{}, err
}
defer conn.Close()
exist, _, err := conn.Exists(path)
if err != nil {
return []string{}, err
}
if !exist {
err = client.Register(path, []byte{})
if err != nil {
return []string{}, err
}
}
children, _, err := conn.Children(path)
if err != nil {
return []string{}, err
}
log.Printf("Get children node: %s, children: %s\n", path, children)
return children, nil
}
// WatchNode watches node created and deleted, also watches node data updated.
func (client *Client) WatchNode(path string, createdHandler func(string, []byte) error, dataChangedHandler func(string, []byte) error, deleteHandler func(string) error) error {
conn, _, err := zk.Connect(client.Servers, client.Timeout)
if err != nil {
return err
}
defer conn.Close()
for {
exists, stat, event, err := conn.ExistsW(path)
if err != nil {
return err
}
log.Printf("Watching exists begins, path: %s, exists: %t, state: [%s]\n", path, exists, fmtStat(stat))
evt := <-event
if evt.Err != nil {
return evt.Err
}
log.Printf("Watching exists has received a event, path %s, type: %s\n", path, evt.Type.String())
switch evt.Type {
case zk.EventNodeCreated:
data, err := client.Get(path)
if err != nil {
return err
}
err = createdHandler(path, data)
if err != nil {
return err
}
case zk.EventNodeDataChanged:
data, err := client.Get(path)
if err != nil {
return err
}
err = dataChangedHandler(path, data)
if err != nil {
return err
}
case zk.EventNodeDeleted:
err = deleteHandler(path)
if err != nil {
return err
}
}
}
}
// WatchChildren watches node children modified.
func (client *Client) WatchChildren(path string, childrenChangedHandler func(string, []string) error, dataChangedHandler func(string, []byte) error, deleteHandler func(string) error) error {
conn, _, err := zk.Connect(client.Servers, client.Timeout)
if err != nil {
return err
}
defer conn.Close()
exist, stat, err := conn.Exists(path)
if err != nil {
return err
}
if !exist {
_, err = conn.Create(path, []byte{}, 0, zk.WorldACL(zk.PermAll))
if err != nil && err != zk.ErrNodeExists {
return err
}
log.Printf("Register node: %s, node is created, state: [%s]\n", path, fmtStat(stat))
}
for {
children, stat, event, err := conn.ChildrenW(path)
if err != nil {
return err
}
log.Printf("Watching children begins, path: %s, state: [%s]\n", path, fmtStat(stat))
evt := <-event
if evt.Err != nil {
return evt.Err
}
children, _, err = conn.Children(path)
if err != nil {
return err
}
log.Printf("Watching children has received a event, path: %s, type: %s, children: %s\n", path, evt.Type.String(), children)
switch evt.Type {
case zk.EventNodeChildrenChanged:
err = childrenChangedHandler(path, children)
if err != nil {
return err
}
case zk.EventNodeDataChanged:
data, err := client.Get(path)
if err != nil {
return err
}
err = dataChangedHandler(path, data)
if err != nil {
return err
}
case zk.EventNodeDeleted:
err = deleteHandler(path)
if err != nil {
return err
}
}
}
}
func (client *Client) ensureRoot(path string) error {
rootPath, err := getRoot(path)
if err != nil {
return err
}
if rootPath == "" {
return nil
}
err = client.Register(rootPath, []byte{})
if err != nil {
return err
}
return nil
}
func fmtStat(s *zk.Stat) string {
return fmt.Sprintf("Czxid: %d, Mzxid: %d, Ctime: %d, Mtime: %d, Version: %d, Cversion: %d, Aversion: %d, EphemeralOwner: %d, DataLength: %d, NumChildren: %d, Pzxid: %d",
s.Czxid, s.Mzxid, s.Ctime, s.Mtime, s.Version, s.Cversion, s.Aversion, s.EphemeralOwner, s.DataLength, s.NumChildren, s.Pzxid)
}
func getRoot(path string) (string, error) {
lastIndex := strings.LastIndex(path, "/")
if lastIndex == -1 {
return "", fmt.Errorf("Error path: %s", path)
} else if lastIndex == 0 {
return "", nil
} else {
return path[0:lastIndex], nil
}
}
node.go
package zk
import (
"log"
)
// Node contains zk node related infomation, which has as zk path, host and port
type Node struct {
Path string
Host string
Port int
}
func (node *Node) createdCallback(path string, data []byte) error {
log.Printf("[Callback] Node %s is created", path)
//To do
return nil
}
func (node *Node) dataChangedCallback(path string, data []byte) error {
log.Printf("[Callback] Node %s is updated", path)
// To do
return nil
}
func (node *Node) deletedCallback(path string) error {
log.Printf("[Callback] Node %s is delete", path)
// To do
return nil
}
func (node *Node) childrenChangedHandler(path string, children []string) error {
log.Printf("[Callback] Node children %s is updated", path)
//To do
return nil
}
func (node *Node) getWatchNodes() ([]string, []string) {
nodes := []string{}
childrenNodes := []string{}
// Test code begins
if node.Path == "/services/gateway" {
nodes = []string{"/services/center"}
childrenNodes = []string{"/services/game"}
} else if node.Path == "/services/game" {
nodes = []string{"/services/center"}
}
// Test code ends
return nodes, childrenNodes
}
func (node *Node) ephemeral() bool {
// Test code begins
if node.Path == "/services/gateway" || node.Path == "/services/game" {
return true
}
// Test code ends
return false
}
zk.go
package zk
import (
"fmt"
"log"
"time"
)
func watchNode(client *Client, node *Node, path string) {
go func() {
for {
err := client.WatchNode(path, node.createdCallback, node.dataChangedCallback, node.deletedCallback)
if err != nil {
log.Println(err)
}
time.Sleep(time.Second)
}
}()
}
func watchChildren(client *Client, node *Node, path string) {
go func() {
for {
err := client.WatchChildren(path, node.childrenChangedHandler, node.dataChangedCallback, node.deletedCallback)
if err != nil {
log.Println(err)
}
time.Sleep(time.Second)
}
}()
}
func registerNode(client *Client, node *Node) {
err := client.Register(node.Path, []byte(fmt.Sprintf("%s:%d", node.Host, node.Port)))
if err != nil {
log.Println(err)
}
}
func registerChildren(client *Client, node *Node) {
err := client.RegisterChildren(node.Path, []byte(fmt.Sprintf("%s:%d", node.Host, node.Port)))
if err != nil {
log.Println(err)
}
}
func initNodeForNode(client *Client, node *Node, path string) {
data, err := client.Get(path)
if err != nil {
log.Println(err)
return
}
err = node.createdCallback(path, data)
if err != nil {
log.Println(err)
return
}
}
func initNodeForChildren(client *Client, node *Node, path string) {
children, err := client.GetChildren(path)
if err != nil {
log.Println(err)
return
}
err = node.childrenChangedHandler(path, children)
if err != nil {
log.Println(err)
return
}
}
// Init integrates create and watch functions
func Init(client *Client, node *Node) {
if node.ephemeral() {
registerChildren(client, node)
} else {
registerNode(client, node)
}
nodePaths, childrenNodePaths := node.getWatchNodes()
for _, path := range nodePaths {
initNodeForNode(client, node, path)
watchNode(client, node, path)
}
for _, path := range childrenNodePaths {
initNodeForChildren(client, node, path)
watchChildren(client, node, path)
}
}
package main
main.go
package main
import (
"log"
"os"
"strconv"
"strings"
"time"
"./zk"
)
func main() {
if len(os.Args) < 4 {
return
}
log.Println(os.Args)
path := "/services/" + os.Args[1]
host := os.Args[2]
port, err := strconv.Atoi(os.Args[3])
if err != nil {
log.Fatalln(err)
}
zkHosts := "192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181"
zk.Init(&zk.Client{Servers: strings.Split(zkHosts, ","), Timeout: time.Second * 10}, &zk.Node{Path: path, Host: host, Port: port})
time.Sleep(time.Second * 100)
}