天天看点

利用etcd实现服务注册和服务发现

文章目录

    • 服务注册
    • 服务发现
    • 协议编写
    • 服务端实现
    • 客户端实现
    • 实验结果
    • 参考文章

服务注册

主要逻辑在 go func 启动的协程里面:每隔 ttl 秒先去 etcd 查询一次该服务对应的 key,如果没有查到,就带着租约(lease)重新注册进去。

package etcdservice

import (
   "context"
   "fmt"
   "log"
   "strings"
   "time"

   "go.etcd.io/etcd/clientv3"
)

// Register publishes the service instance addr under the key
// "/<schema>/<name>/<addr>" in etcd and keeps it published.
//
// etcdAddr is a list of etcd endpoints separated by ';'. The package-level
// client is created on first use. ttl is the lease time-to-live in seconds
// and is also the interval at which the background goroutine re-checks that
// the key still exists; it must be positive.
//
// Register returns an error if ttl is not positive or if the etcd client
// cannot be created. Errors that occur later in the background loop are only
// logged.
//
// NOTE: the background goroutine has no stop signal and runs until the
// process exits. A key removed with UnRegister is therefore registered again
// on the next tick if the process is still alive.
func Register(etcdAddr, name string, addr string, ttl int64) error {
	if ttl <= 0 {
		// time.NewTicker panics on a non-positive interval.
		return fmt.Errorf("register %s at %s: ttl must be positive, got %d", name, addr, ttl)
	}

	if cli == nil {
		c, err := clientv3.New(clientv3.Config{
			Endpoints:   strings.Split(etcdAddr, ";"),
			DialTimeout: 15 * time.Second,
		})
		if err != nil {
			log.Printf("connect to etcd err:%s", err)
			return err
		}
		cli = c
	}

	// Upper bound for a single lookup so an unreachable etcd cannot stall
	// the loop without ever reporting an error.
	const getTimeout = 5 * time.Second

	key := "/" + schema + "/" + name + "/" + addr
	interval := time.Duration(ttl) * time.Second

	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()

		for {
			ctx, cancel := context.WithTimeout(context.Background(), getTimeout)
			getResp, err := cli.Get(ctx, key)
			cancel()

			switch {
			case err != nil:
				log.Printf("Register: get %s: %s", key, err)
			case getResp.Count == 0:
				// The key is missing (never written, or its lease expired):
				// write it again under a fresh lease.
				if err := withAlive(name, addr, ttl); err != nil {
					log.Printf("Register: keep alive %s: %s", key, err)
				}
			}

			<-ticker.C
		}
	}()

	return nil
}

func withAlive(name string, addr string, ttl int64) error {
   leaseResp, err := cli.Grant(context.Background(), ttl)
   if err != nil {
      return err
   }

   //fmt.Printf("key:%v\n", "/"+schema+"/"+name+"/"+addr)
   _, err = cli.Put(context.Background(), "/"+schema+"/"+name+"/"+addr, addr, clientv3.WithLease(leaseResp.ID))
   if err != nil {
      fmt.Printf("put etcd error:%s",err)
      return err
   }

   _, err = cli.KeepAlive(context.Background(), leaseResp.ID)
   if err != nil {
      fmt.Printf("keep alive error:%s",err)
      return err
   }
    return nil
}

// UnRegister removes the key "/<schema>/<name>/<addr>" from etcd.
//
// It does nothing if the etcd client has not been created yet. The delete is
// bounded by a short timeout so that a caller running it from a shutdown path
// is not blocked indefinitely when etcd is unreachable; a failure is logged
// because the function has no error return.
func UnRegister(name string, addr string) {
	if cli == nil {
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	key := "/" + schema + "/" + name + "/" + addr
	if _, err := cli.Delete(ctx, key); err != nil {
		log.Printf("UnRegister: delete %s: %s", key, err)
	}
}
           

服务发现

主要实现了 gRPC 服务发现相关接口(resolver.Builder 和 resolver.Resolver),可以参考源码 google.golang.org/grpc/resolver 目录下面 DNS 相关实现(resolver/dns)的代码。

package etcdservice

import (
    "context"
    "log"
    "strings"
    "time"

    "go.etcd.io/etcd/clientv3"
    "github.com/coreos/etcd/mvcc/mvccpb"
    "google.golang.org/grpc/resolver"
)

// schema is the resolver scheme returned by Scheme and the first path segment
// of the etcd keys written by Register.
const schema = "ns"

// cli is the package-wide etcd client. It is created on first use by Register
// or Build and reused afterwards.
var cli *clientv3.Client

// etcdResolver carries the methods of both resolver.Builder (Build, Scheme)
// and resolver.Resolver (ResolveNow, Close).
type etcdResolver struct {
    // rawAddr is the ';'-separated list of etcd endpoints; Build splits it.
    rawAddr string
    // cc receives the address list through NewAddress in watch.
    cc      resolver.ClientConn
}

// NewResolver returns a resolver.Builder that remembers etcdAddr, a list of
// etcd endpoints separated by ';'. No etcd connection is made here; the
// client is created when Build is called.
func NewResolver(etcdAddr string) resolver.Builder {
	builder := new(etcdResolver)
	builder.rawAddr = etcdAddr
	return builder
}

// Build implements resolver.Builder. It creates the package-level etcd client
// if it does not exist yet, then starts a goroutine that watches the key
// prefix "/<target.Scheme>/<target.Endpoint>/" and pushes the addresses found
// there to cc.
//
// Each call returns its own etcdResolver bound to cc. The builder itself is
// left untouched, so one registered builder can serve several ClientConns
// without a later call overwriting the cc of an earlier one.
//
// It returns an error only when the etcd client cannot be created.
func (r *etcdResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
	if cli == nil {
		c, err := clientv3.New(clientv3.Config{
			Endpoints:   strings.Split(r.rawAddr, ";"),
			DialTimeout: 15 * time.Second,
		})
		if err != nil {
			return nil, err
		}
		cli = c
	}

	res := &etcdResolver{rawAddr: r.rawAddr, cc: cc}

	go res.watch("/" + target.Scheme + "/" + target.Endpoint + "/")

	return res, nil
}

// Scheme implements resolver.Builder and returns the package constant schema.
func (r etcdResolver) Scheme() string {
    return schema
}

// ResolveNow implements resolver.Resolver. It only logs the call and does not
// trigger a new lookup.
func (r etcdResolver) ResolveNow(rn resolver.ResolveNowOption) {
    log.Println("ResolveNow") // TODO check
}

// Close implements resolver.Resolver. It only logs the call; the watch
// goroutine started by Build is not stopped here.
func (r etcdResolver) Close() {
    log.Println("Close")
}

// watch keeps r.cc informed of the service addresses stored under keyPrefix.
//
// It first reads every key under the prefix and reports the resulting list
// (possibly empty) through r.cc.NewAddress. It then watches the prefix and
// reports an updated list whenever a PUT adds an unknown address or a DELETE
// removes a known one. An address is the key with keyPrefix trimmed off.
//
// The function runs until the watch channel is closed.
func (r *etcdResolver) watch(keyPrefix string) {
	var addrList []resolver.Address

	watchOpts := []clientv3.OpOption{clientv3.WithPrefix()}

	getResp, err := cli.Get(context.Background(), keyPrefix, clientv3.WithPrefix())
	if err != nil {
		log.Println(err)
	} else {
		for _, kv := range getResp.Kvs {
			addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(kv.Key), keyPrefix)})
		}
		// Start the watch right after the revision of the snapshot above so
		// that changes made between the Get and the Watch are not lost.
		if rev := getResp.Header.GetRevision(); rev > 0 {
			watchOpts = append(watchOpts, clientv3.WithRev(rev+1))
		}
	}

	r.cc.NewAddress(addrList)

	rch := cli.Watch(context.Background(), keyPrefix, watchOpts...)
	for n := range rch {
		if err := n.Err(); err != nil {
			// Report the failure instead of silently dropping it.
			log.Printf("watch %s: %s", keyPrefix, err)
			continue
		}
		for _, ev := range n.Events {
			addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix)
			switch ev.Type {
			case mvccpb.PUT:
				if !exist(addrList, addr) {
					addrList = append(addrList, resolver.Address{Addr: addr})
					r.cc.NewAddress(addrList)
				}
			case mvccpb.DELETE:
				if s, ok := remove(addrList, addr); ok {
					addrList = s
					r.cc.NewAddress(addrList)
				}
			}
		}
	}
}

// exist reports whether l contains an entry whose Addr equals addr.
func exist(l []resolver.Address, addr string) bool {
	idx := 0
	// Advance until a match is found or the list is exhausted.
	for idx < len(l) && l[idx].Addr != addr {
		idx++
	}
	return idx < len(l)
}

// remove returns a copy of s without the first entry whose Addr equals addr,
// and true. If no entry matches it returns nil and false.
//
// The result is built in a new backing array and s is left unmodified,
// because watch has already handed s to the ClientConn through NewAddress and
// must not change it afterwards. The order of the remaining entries is kept.
func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
	for i := range s {
		if s[i].Addr == addr {
			out := make([]resolver.Address, 0, len(s)-1)
			out = append(out, s[:i]...)
			out = append(out, s[i+1:]...)
			return out, true
		}
	}
	return nil, false
}
           

协议编写

要编译proto文件需要做以下准备工作:

// gRPC运行时接口编解码支持库
  go get -u github.com/golang/protobuf/proto
  // 从 Proto文件(gRPC接口描述文件) 生成 go文件 的编译器插件
  go get -u github.com/golang/protobuf/protoc-gen-go
  // 获取go的gRPC包
  go get google.golang.org/grpc
           

编译方法 protoc --go_out=plugins=grpc:{输出目录} {proto文件}

syntax = "proto3";
package protos;

// Greeter is the greeting service definition.
service Greeter {
    // SayHello sends a greeting: it takes a HelloRequest and returns a HelloReply.
    rpc SayHello (HelloRequest) returns (HelloReply) {
    }
}

// HelloRequest is the request message containing the user's name.
message HelloRequest {
    // name is the name to greet.
    string name = 1;
}

// HelloReply is the response message containing the greeting.
message HelloReply {
    // message is the greeting text.
    string message = 1;
}
           

服务端实现

简单实现的一个服务端代码,可以通过命令行传入三个参数(ServiceName、Port、EtcdAddr)。

package main

import (
   "flag"
   "fmt"
   "log"
   "net"
   "os"
   "os/signal"
   "syscall"
   "time"

   "github.com/zhanben/go_server/etcdservice"
   pb "github.com/zhanben/go_server/protos"

   "golang.org/x/net/context"
   "google.golang.org/grpc"
)


// host is the IP used to build the address that main registers in etcd.
var host = "127.0.0.1"

// Command-line flags of the server.
var (
   // ServiceName is the name under which the server is registered in etcd.
   ServiceName = flag.String("ServiceName", "hello_service", "service name")
   // Port is the TCP port the gRPC server listens on.
   Port        = flag.Int("Port", 50001, "listening port")
   // EtcdAddr is the etcd address passed to etcdservice.Register.
   EtcdAddr    = flag.String("EtcdAddr", "192.168.226.137:2379", "register etcd address")
)

func main() {
   flag.Parse()

   lis, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", *Port))
   if err != nil {
      log.Fatalf("failed to listen: %s", err)
   } else {
      fmt.Printf("listen at:%d\n", *Port)
   }
   defer lis.Close()

   s := grpc.NewServer()
   defer s.GracefulStop()

   pb.RegisterGreeterServer(s, &server{})
   addr := fmt.Sprintf("%s:%d", host, *Port)
   fmt.Printf("server addr:%s\n",addr)
   go etcdservice.Register(*EtcdAddr, *ServiceName, addr, 5)

   ch := make(chan os.Signal, 1)
   signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT)
   go func() {
      s := <-ch
      etcdservice.UnRegister(*ServiceName, addr)

      if i, ok := s.(syscall.Signal); ok {
         os.Exit(int(i))
      } else {
         os.Exit(0)
      }

   }()

   if err := s.Serve(lis); err != nil {
      fmt.Printf("failed to serve: %s", err)
      //log.Fatalf("failed to serve: %s", err)
   }
}

// server implements the Greeter service registered in main through
// pb.RegisterGreeterServer. It holds no state.
type server struct{}

// SayHello handles the Greeter.SayHello RPC: it prints the current time and
// the received name, then replies with "Hello " followed by that name.
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	name := in.Name
	fmt.Printf("%v: Receive is %s\n", time.Now(), name)

	reply := new(pb.HelloReply)
	reply.Message = "Hello " + name
	return reply, nil
}
           

客户端实现

package main

import (
   "flag"
   "fmt"
   "strconv"
   "time"

   "github.com/zhanben/go_client/etcdservice"
   pb "github.com/zhanben/go_client/protos"

   "golang.org/x/net/context"
   "google.golang.org/grpc"
   "google.golang.org/grpc/resolver"
)

// Command-line flags of the client.
var (
   // ServiceName is the service name used as the endpoint of the dial target.
   ServiceName = flag.String("ServiceName", "hello_service", "service name")
   // EtcdAddr is the etcd address passed to etcdservice.NewResolver.
   EtcdAddr  = flag.String("EtcdAddr", "192.168.226.137:2379", "register etcd address")
)

// main registers the etcd resolver, dials the service by name with the
// round_robin balancer, and calls SayHello once per second, printing either
// the reply or the error.
func main() {
	flag.Parse()
	r := etcdservice.NewResolver(*EtcdAddr)
	resolver.Register(r)

	// A gRPC target has the form "scheme://authority/endpoint". The etcd
	// resolver in this article only reads the scheme and the endpoint, so the
	// authority part ("author") is just a placeholder between the slashes.
	conn, err := grpc.Dial(r.Scheme()+"://author/"+*ServiceName, grpc.WithBalancerName("round_robin"), grpc.WithInsecure())
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// The stub is stateless with respect to the loop, so build it once.
	client := pb.NewGreeterClient(conn)

	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for t := range ticker.C {
		// Bound each call so one unresponsive server cannot stall the loop.
		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
		resp, err := client.SayHello(ctx, &pb.HelloRequest{Name: "world " + strconv.Itoa(t.Second())})
		cancel()
		if err != nil {
			fmt.Printf("call server error:%s\n", err)
			continue
		}
		fmt.Printf("%v: Reply is %s\n", t, resp.Message)
	}
}
           

实验结果

[email protected] go_server # ./server

listen at:50001

server addr:127.0.0.1:50001

2019-03-31 16:41:49.448467275 +0800 CST m=+6.782288045: Receive is world 49

2019-03-31 16:41:51.447500712 +0800 CST m=+8.781321325: Receive is world 51

2019-03-31 16:41:53.449616447 +0800 CST m=+10.783437240: Receive is world 53

[email protected] go_server # ./server -Port 50002

listen at:50002

server addr:127.0.0.1:50002

2019-03-31 16:41:50.448656042 +0800 CST m=+5.088384441: Receive is world 50

2019-03-31 16:41:52.44772994 +0800 CST m=+7.087458319: Receive is world 52

2019-03-31 16:41:54.448520346 +0800 CST m=+9.088248999: Receive is world 54

[email protected] go_client # ./client

2019-03-31 16:41:49.447097257 +0800 CST m=+1.120809708: Reply is Hello world 49

2019-03-31 16:41:50.447159492 +0800 CST m=+2.120871892: Reply is Hello world 50

2019-03-31 16:41:51.4470927 +0800 CST m=+3.120805012: Reply is Hello world 51

2019-03-31 16:41:52.447090969 +0800 CST m=+4.120803337: Reply is Hello world 52

2019-03-31 16:41:53.448013737 +0800 CST m=+5.121726160: Reply is Hello world 53

2019-03-31 16:41:54.447348688 +0800 CST m=+6.121061198: Reply is Hello world 54

打开两个server端,可以看到client轮流连接到两个server。断开一个server后,会全部切换到其中一个server端。

完整源码下载地址:https://download.csdn.net/download/hjxzb/11077135

参考文章

1、https://blog.csdn.net/kiddyjinjin/article/details/83001776

2、https://blog.csdn.net/qq_21816375/article/details/78159297

3、http://morecrazy.github.io/2018/08/14/grpc-go基于etcd实现服务发现机制/#源码解析