天天看点

Go语言实现Zookeeper服务发现 (Zookeeper & Service Discovery & Go):package zk、package main

Go语言实现Zookeeper服务发现

  • package zk
    • client.go
    • node.go
    • zk.go
  • package main
    • main.go

package zk

client.go

package zk

import (
	"fmt"
	"log"
	"strings"
	"time"

	"github.com/samuel/go-zookeeper/zk"
)

// Client holds the settings every method uses to open its own ZooKeeper
// connection.
type Client struct {
	// Servers lists the server addresses handed to zk.Connect.
	Servers []string
	// Timeout is the timeout value handed to zk.Connect.
	Timeout time.Duration
}

// Register makes sure the node at path stores data.
//
// An existing node is rewritten only when its current data differs from data.
// A missing node is created as a persistent node after its parent has been
// ensured through ensureRoot. A connection is opened for the call and closed
// before returning.
func (client *Client) Register(path string, data []byte) error {
	conn, _, err := zk.Connect(client.Servers, client.Timeout)
	if err != nil {
		return err
	}
	defer conn.Close()

	found, stat, err := conn.Exists(path)
	if err != nil {
		return err
	}

	if found {
		current, stat, err := conn.Get(path)
		if err != nil {
			return err
		}

		if string(current) == string(data) {
			log.Printf("Register node: %s, node exists and data is unchanged, state: [%s]\n", path, fmtStat(stat))
			return nil
		}

		// The version read by Get guards the write against concurrent updates.
		stat, err = conn.Set(path, data, stat.Version)
		if err != nil {
			return err
		}

		log.Printf("Register node: %s, node exists and data is changed, state: [%s]\n", path, fmtStat(stat))
		return nil
	}

	if err := client.ensureRoot(path); err != nil {
		return err
	}

	// Losing a creation race to another writer (ErrNodeExists) counts as success.
	if _, err := conn.Create(path, data, 0, zk.WorldACL(zk.PermAll)); err != nil && err != zk.ErrNodeExists {
		return err
	}

	// stat is the value returned by the Exists call made before the node was created.
	log.Printf("Register node: %s, node is created, state: [%s]\n", path, fmtStat(stat))
	return nil
}

// Deregister removes node when it exists and does nothing (beyond logging)
// when it is already absent. Errors from connecting, checking existence or
// deleting are returned unchanged.
func (client *Client) Deregister(node string) error {
	conn, _, err := zk.Connect(client.Servers, client.Timeout)
	if err != nil {
		return err
	}
	defer conn.Close()

	found, stat, err := conn.Exists(node)
	if err != nil {
		return err
	}

	if !found {
		log.Printf("Deregister node: %s, node doesn't exist\n", node)
		return nil
	}

	// Delete is conditional on the version observed by Exists.
	if err := conn.Delete(node, stat.Version); err != nil {
		return err
	}

	log.Printf("Deregister node: %s, node is deleted, state: [%s]\n", node, fmtStat(stat))
	return nil
}

// RegisterChildren creates a protected ephemeral sequential child of path
// (using the name prefix "n") that stores data. The parent node is created
// through Register first when it does not exist.
//
// ZooKeeper removes an ephemeral node as soon as the session that created it
// ends. The connection is therefore deliberately left open when the child has
// been created: closing it here would delete the child before the function
// even returned. The session then lives until the process exits or the
// connection is lost. On every error path the connection is closed.
func (client *Client) RegisterChildren(path string, data []byte) error {
	conn, _, err := zk.Connect(client.Servers, client.Timeout)
	if err != nil {
		return err
	}

	// keepSession is flipped only after the ephemeral child exists.
	keepSession := false
	defer func() {
		if !keepSession {
			conn.Close()
		}
	}()

	exist, stat, err := conn.Exists(path)
	if err != nil {
		return err
	}

	if !exist {
		if err = client.Register(path, []byte{}); err != nil {
			return err
		}
	}

	child, err := conn.CreateProtectedEphemeralSequential(path+"/n", data, zk.WorldACL(zk.PermAll))
	if err != nil {
		return err
	}
	keepSession = true

	// stat describes the parent as reported by the Exists call above.
	log.Printf("Register ephemeral node: %s, node is created, state: [%s]\n", child, fmtStat(stat))

	return nil
}

// Get returns the data stored at path. When the node is missing it is first
// created with empty data through Register. On failure an empty (non-nil)
// slice is returned together with the error.
func (client *Client) Get(path string) ([]byte, error) {
	conn, _, err := zk.Connect(client.Servers, client.Timeout)
	if err != nil {
		return []byte{}, err
	}
	defer conn.Close()

	found, _, err := conn.Exists(path)
	if err != nil {
		return []byte{}, err
	}

	if !found {
		if err := client.Register(path, []byte{}); err != nil {
			return []byte{}, err
		}
	}

	payload, stat, err := conn.Get(path)
	if err != nil {
		return []byte{}, err
	}

	log.Printf("Get node: %s, state: [%s]\n", path, fmtStat(stat))
	return payload, nil
}

// GetChildren returns the names of the children of path. When the node is
// missing it is first created with empty data through Register. On failure an
// empty (non-nil) slice is returned together with the error.
func (client *Client) GetChildren(path string) ([]string, error) {
	conn, _, err := zk.Connect(client.Servers, client.Timeout)
	if err != nil {
		return []string{}, err
	}
	defer conn.Close()

	found, _, err := conn.Exists(path)
	if err != nil {
		return []string{}, err
	}

	if !found {
		if err := client.Register(path, []byte{}); err != nil {
			return []string{}, err
		}
	}

	names, _, err := conn.Children(path)
	if err != nil {
		return []string{}, err
	}

	log.Printf("Get children node: %s, children: %s\n", path, names)
	return names, nil
}

// WatchNode blocks while watching path for creation, data changes and
// deletion, invoking the matching handler for each event and re-arming the
// watch afterwards. It returns only on error: a connection or watch failure,
// an event carrying an error, a failed read of the node, or an error returned
// by a handler.
func (client *Client) WatchNode(path string, createdHandler func(string, []byte) error, dataChangedHandler func(string, []byte) error, deleteHandler func(string) error) error {
	conn, _, err := zk.Connect(client.Servers, client.Timeout)
	if err != nil {
		return err
	}
	defer conn.Close()

	for {
		// Each ExistsW call yields a fresh channel, so it is re-issued every iteration.
		present, stat, events, err := conn.ExistsW(path)
		if err != nil {
			return err
		}
		log.Printf("Watching exists begins, path: %s, exists: %t, state: [%s]\n", path, present, fmtStat(stat))

		fired := <-events
		if fired.Err != nil {
			return fired.Err
		}
		log.Printf("Watching exists has received a event, path %s, type: %s\n", path, fired.Type.String())

		switch fired.Type {
		case zk.EventNodeCreated, zk.EventNodeDataChanged:
			// Both events hand the node's current data to their handler.
			data, err := client.Get(path)
			if err != nil {
				return err
			}
			handle := createdHandler
			if fired.Type == zk.EventNodeDataChanged {
				handle = dataChangedHandler
			}
			if err := handle(path, data); err != nil {
				return err
			}
		case zk.EventNodeDeleted:
			if err := deleteHandler(path); err != nil {
				return err
			}
		}
	}
}

// WatchChildren blocks while watching the children of path, invoking the
// matching handler for each event and re-arming the watch afterwards. The node
// is created with empty data first when it does not exist.
//
// childrenChangedHandler receives the refreshed child list, dataChangedHandler
// receives the node's current data, and deleteHandler is called when the
// watched node itself is removed. The function returns only on error: a
// connection or watch failure, an event carrying an error, a failed read, or
// an error returned by a handler.
func (client *Client) WatchChildren(path string, childrenChangedHandler func(string, []string) error, dataChangedHandler func(string, []byte) error, deleteHandler func(string) error) error {
	conn, _, err := zk.Connect(client.Servers, client.Timeout)
	if err != nil {
		return err
	}
	defer conn.Close()

	exist, stat, err := conn.Exists(path)
	if err != nil {
		return err
	}

	if !exist {
		// Losing a creation race to another writer (ErrNodeExists) counts as success.
		_, err = conn.Create(path, []byte{}, 0, zk.WorldACL(zk.PermAll))
		if err != nil && err != zk.ErrNodeExists {
			return err
		}
		log.Printf("Register node: %s, node is created, state: [%s]\n", path, fmtStat(stat))
	}

	for {
		children, stat, event, err := conn.ChildrenW(path)
		if err != nil {
			return err
		}
		log.Printf("Watching children begins, path: %s, state: [%s]\n", path, fmtStat(stat))

		evt := <-event
		if evt.Err != nil {
			return evt.Err
		}

		if evt.Type == zk.EventNodeDeleted {
			// The node is gone, so listing its children would fail and the
			// function would return before deleteHandler could run.
			children = nil
		} else {
			children, _, err = conn.Children(path)
			if err != nil {
				return err
			}
		}

		log.Printf("Watching children has received a event, path: %s, type: %s, children: %s\n", path, evt.Type.String(), children)

		switch evt.Type {
		case zk.EventNodeChildrenChanged:
			err = childrenChangedHandler(path, children)
			if err != nil {
				return err
			}
		case zk.EventNodeDataChanged:
			data, err := client.Get(path)
			if err != nil {
				return err
			}
			err = dataChangedHandler(path, data)
			if err != nil {
				return err
			}
		case zk.EventNodeDeleted:
			err = deleteHandler(path)
			if err != nil {
				return err
			}
		}
	}
}

// ensureRoot makes sure the parent of path exists, creating it with empty data
// (and, through Register, its own ancestors) when it is missing. A path that
// sits directly under "/" has no parent to create.
//
// An existing parent is left untouched: calling Register on it with empty data
// would overwrite whatever data it already stores.
func (client *Client) ensureRoot(path string) error {
	rootPath, err := getRoot(path)
	if err != nil {
		return err
	}

	if rootPath == "" {
		return nil
	}

	conn, _, err := zk.Connect(client.Servers, client.Timeout)
	if err != nil {
		return err
	}
	exist, _, err := conn.Exists(rootPath)
	// Close right away so no connection is held while Register opens its own.
	conn.Close()
	if err != nil {
		return err
	}
	if exist {
		return nil
	}

	return client.Register(rootPath, []byte{})
}

// fmtStat renders the fields of s as a single comma-separated line for log
// output. s must not be nil.
func fmtStat(s *zk.Stat) string {
	const layout = "Czxid: %d, Mzxid: %d, Ctime: %d, Mtime: %d, Version: %d, Cversion: %d, Aversion: %d, EphemeralOwner: %d, DataLength: %d, NumChildren: %d, Pzxid: %d"

	return fmt.Sprintf(
		layout,
		s.Czxid, s.Mzxid,
		s.Ctime, s.Mtime,
		s.Version, s.Cversion, s.Aversion,
		s.EphemeralOwner, s.DataLength, s.NumChildren,
		s.Pzxid,
	)
}

// getRoot returns the parent portion of path, i.e. everything before the last
// "/". A path whose only slash is the leading one yields "" with no error,
// and a path containing no slash at all is reported as an error.
func getRoot(path string) (string, error) {
	switch cut := strings.LastIndex(path, "/"); {
	case cut < 0:
		return "", fmt.Errorf("Error path: %s", path)
	case cut == 0:
		return "", nil
	default:
		return path[:cut], nil
	}
}

           

node.go

package zk

import (
	"log"
)

// Node describes a service instance: the ZooKeeper path it is registered
// under plus the host and port it is reachable at.
type Node struct {
	// Path is the ZooKeeper path of the service.
	Path string
	// Host is the service host; it is stored in ZooKeeper as "host:port".
	Host string
	// Port is the service port; it is stored in ZooKeeper as "host:port".
	Port int
}

// createdCallback is the handler for a created node. It currently only logs
// the path; data is not used yet.
func (node *Node) createdCallback(path string, data []byte) error {
	const message = "[Callback] Node %s is created"

	log.Printf(message, path)
	// TODO: react to the newly created node.
	return nil
}

// dataChangedCallback is the handler for a node whose data changed. It
// currently only logs the path; data is not used yet.
func (node *Node) dataChangedCallback(path string, data []byte) error {
	const message = "[Callback] Node %s is updated"

	log.Printf(message, path)
	// TODO: react to the updated data.
	return nil
}

// deletedCallback is the handler for a deleted node. It currently only logs
// the path.
func (node *Node) deletedCallback(path string) error {
	const message = "[Callback] Node %s is delete"

	log.Printf(message, path)
	// TODO: react to the deletion.
	return nil
}

// childrenChangedHandler is the handler for a change in a node's child list.
// It currently only logs the path; children is not used yet.
func (node *Node) childrenChangedHandler(path string, children []string) error {
	const message = "[Callback] Node children %s is updated"

	log.Printf(message, path)
	// TODO: react to the new child list.
	return nil
}

// getWatchNodes returns the paths this node should watch: first the paths
// watched as plain nodes, then the paths whose children are watched. Both
// slices are empty (non-nil) for paths without a hard-coded entry.
func (node *Node) getWatchNodes() ([]string, []string) {
	var (
		plain   = []string{}
		parents = []string{}
	)

	// Test code begins
	switch node.Path {
	case "/services/gateway":
		plain = []string{"/services/center"}
		parents = []string{"/services/game"}
	case "/services/game":
		plain = []string{"/services/center"}
	}
	// Test code ends

	return plain, parents
}

// ephemeral reports whether this node is registered as an ephemeral child
// rather than as a plain node. The decision is hard-coded test data keyed on
// the node's path.
func (node *Node) ephemeral() bool {
	// Test code begins
	switch node.Path {
	case "/services/gateway", "/services/game":
		return true
	}
	// Test code ends
	return false
}

           

zk.go

package zk

import (
	"fmt"
	"log"
	"time"
)

// watchNode starts a background goroutine that keeps client.WatchNode running
// on path with node's callbacks. Whenever the watch returns, its error (if
// any) is logged and the watch is restarted after one second. The goroutine
// never stops on its own.
func watchNode(client *Client, node *Node, path string) {
	keepWatching := func() {
		for {
			if err := client.WatchNode(path, node.createdCallback, node.dataChangedCallback, node.deletedCallback); err != nil {
				log.Println(err)
			}
			time.Sleep(time.Second)
		}
	}

	go keepWatching()
}

// watchChildren starts a background goroutine that keeps client.WatchChildren
// running on path with node's callbacks. Whenever the watch returns, its error
// (if any) is logged and the watch is restarted after one second. The
// goroutine never stops on its own.
func watchChildren(client *Client, node *Node, path string) {
	keepWatching := func() {
		for {
			if err := client.WatchChildren(path, node.childrenChangedHandler, node.dataChangedCallback, node.deletedCallback); err != nil {
				log.Println(err)
			}
			time.Sleep(time.Second)
		}
	}

	go keepWatching()
}

// registerNode registers node at its path with "host:port" as data. A failure
// is logged and not returned.
func registerNode(client *Client, node *Node) {
	address := []byte(fmt.Sprintf("%s:%d", node.Host, node.Port))
	if err := client.Register(node.Path, address); err != nil {
		log.Println(err)
	}
}

// registerChildren registers node as an ephemeral child of its path with
// "host:port" as data. A failure is logged and not returned.
func registerChildren(client *Client, node *Node) {
	address := []byte(fmt.Sprintf("%s:%d", node.Host, node.Port))
	if err := client.RegisterChildren(node.Path, address); err != nil {
		log.Println(err)
	}
}

// initNodeForNode reads the current data at path and feeds it to node's
// created callback. An error from either step is logged and not returned.
func initNodeForNode(client *Client, node *Node, path string) {
	data, err := client.Get(path)
	if err == nil {
		// The callback runs only when the read succeeded.
		err = node.createdCallback(path, data)
	}
	if err != nil {
		log.Println(err)
	}
}

// initNodeForChildren reads the current children of path and feeds them to
// node's children-changed handler. An error from either step is logged and not
// returned.
func initNodeForChildren(client *Client, node *Node, path string) {
	children, err := client.GetChildren(path)
	if err == nil {
		// The handler runs only when the read succeeded.
		err = node.childrenChangedHandler(path, children)
	}
	if err != nil {
		log.Println(err)
	}
}

// Init registers node in ZooKeeper and then, for every path node is interested
// in, delivers the current state to node's callbacks and starts a background
// watch. Ephemeral nodes are registered as children of their path; all others
// as plain nodes. Errors are logged by the helpers and not returned.
func Init(client *Client, node *Node) {
	register := registerNode
	if node.ephemeral() {
		register = registerChildren
	}
	register(client, node)

	plain, parents := node.getWatchNodes()

	for i := range plain {
		initNodeForNode(client, node, plain[i])
		watchNode(client, node, plain[i])
	}

	for i := range parents {
		initNodeForChildren(client, node, parents[i])
		watchChildren(client, node, parents[i])
	}
}

           

package main

main.go

package main

import (
	"log"
	"os"
	"strconv"
	"strings"
	"time"

	"./zk"
)

// main registers the service described by the command-line arguments
// (service name, host, port) under "/services/<name>" and keeps the process
// alive for 100 seconds so the background watches can run.
//
// Missing arguments or a non-numeric port are reported and terminate the
// process with a non-zero status.
func main() {
	if len(os.Args) < 4 {
		log.Fatalln("usage: <program> <service-name> <host> <port>")
	}

	log.Println(os.Args)

	path := "/services/" + os.Args[1]
	host := os.Args[2]
	port, err := strconv.Atoi(os.Args[3])
	if err != nil {
		log.Fatalln(err)
	}

	// Comma-separated ZooKeeper ensemble addresses, hard-coded for this demo.
	zkHosts := "192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181"

	zk.Init(&zk.Client{Servers: strings.Split(zkHosts, ","), Timeout: time.Second * 10}, &zk.Node{Path: path, Host: host, Port: port})

	// Init only starts background goroutines; block here so they get time to run.
	time.Sleep(time.Second * 100)
}