天天看点

Go语言微服务实战之再探服务发现

上一篇文章中我们跑通了自己的第一个微服务的例子,这篇文章我们稍微深入一下,看看micro框架是如何实现服务发现的。我们先用etcd来替换micro默认的基于mdns的注册中心,然后在窥探一下micro源码看看其原理。

1、 用etcd做服务发现

之前的例子中,我们是用micro默认的基于mdns的服务发现组件,测试是够用了,但一个商用的应用显然不能用mdns,我们应该用一个商业级的服务发现组件,有很多选择:zookeeper、consul、etcd等等,这里我们选择用etcd。

etcd是coreOS开发的基于raft的分布式KV数据库,被广泛应用于服务注册和发现。目前越来越流行的kubernetes就是采用etcd在进行数据存储,因为我们后续的例子会使用kubernetes来部署服务,所以正好用和k8s配套的etcd来做服务注册。

1.1、安装etcd

首先安装etcd,在mac下直接:

brew install etcd

非常简单,因为我们目前只是在本机做测试,所以可以不用做任何配置,直接在终端:

etcd

这样etcd就已经运行起来了。

1.2、用etcd替换micro默认服务发现组件

下面我们就来修改之前的示例代码,用etcd来替换micro默认的服务发现组件,以下是修改以后的服务端代码:

func main() {

	// 新创建一个服务,服务名为greeter,服务注册中心会用这个名字来发现服务
	service := micro.NewService(
		micro.Name("com.jupiter.api.greeter"),
		micro.Registry(etcd.NewRegistry()),
	)

	// 初始化
	service.Init()

	// 注册处理器
	pb.RegisterGreeterHandler(service.Server(), new(Greeter))

	// 启动服务运行
	if err := service.Run(); err != nil {
		fmt.Println(err)
	}
}
           

与我们第一个例子中的代码几乎一样,唯一的区别就是在创建服务的时候多了下面这样一个选项:

Go语言微服务实战之再探服务发现

我们之前已经提到过,micro的很多核心组件都是可以通过插件化的方式进行替换的,这里我们就用到了第三方实现的etcd插件,具体可以浏览下面这个网址:

https://github.com/micro/go-plugins

里面已经有各类实现好的插件。

值得注意的是如果我们直接在项目中引入并使用插件,会出现编译失败的问题,这是因为很多插件是基于micro早期的版本实现的,以etcd这个插件的实现为例,github上的代码是这样的:

Go语言微服务实战之再探服务发现

注意红框圈起来的import部分,导入的还是老版本的micro的数据结构,而我们使用的是micro最新的v2版本,所以需要自己来兼容一下,我们再项目中新建registry/etcd目录,在目录下新建一个etcd.go,内容如下:

package etcd

import (
	"github.com/micro/go-micro/v2/config/cmd"
	"github.com/micro/go-micro/v2/registry"
	"github.com/micro/go-micro/v2/registry/etcd"
)

func init() {
	cmd.DefaultRegistries["etcd"] = etcd.NewRegistry
}

func NewRegistry(opts ...registry.Option) registry.Registry {
	return etcd.NewRegistry(opts...)
}
           

注意最大的区别是我们import的是micro v2的数据结构。

现在,让我们直接在终端运行这个服务:

Go语言微服务实战之再探服务发现

从控制台的输出信息可以看到现在微服务的注册中心已经变成了etcd。

我们用etcdctl客户端查看一下此时etcd中的数据:

Go语言微服务实战之再探服务发现

可以看到我们的服务确实已经注册到了etcd中。

我们再来修改一下客户端代码:

func main() {
	// 创建一个服务
	service := micro.NewService(
		micro.Name("greeter.client"),
		micro.Registry(etcd.NewRegistry()))
	// 初始化
	service.Init()
	// 创建一个微服务的客户端
	greeter := pb.NewGreeterService("com.jupiter.api.greeter", service.Client())
	// 调用微服务
	rsp, err := greeter.Hello(context.TODO(), &pb.Request{Name: "Hello Micro"})
	if err != nil {
		fmt.Println(err)
	}

	fmt.Println(rsp.Msg)
}
           

同样,在创建的时候也指定注册中心选项为etcd,其他都一样。

终端运行代码以后同样看到了输出。

2、源代码分析

我们已经成功的将注册中心替换为etcd,现在是时候看看micro的源代码加深一下理解了。

2.1 服务注册过程

我们先来看看服务是如何把自己注册到注册中心去的。我们从示例代码中逐步切入:

// 新创建一个服务,服务名为greeter,服务注册中心会用这个名字来发现服务
	service := micro.NewService(
		micro.Name("com.jupiter.api.greeter"),
		micro.Registry(etcd.NewRegistry()),
	)
           

直接用micro.NewService来创建了一个服务,带了两个选项做为参数,一个是服务名,一个是用到的注册中心Registry,micro.NewService都做了些什么呢?

func newService(opts ...Option) Service {
	service := new(service)
	options := newOptions(opts...)

	// service name
	serviceName := options.Server.Options().Name

	// authFn returns the auth, we pass as a function since auth
	// has not yet been set at this point.
	authFn := func() auth.Auth { return options.Server.Options().Auth }

	// wrap client to inject From-Service header on any calls
	options.Client = wrapper.FromService(serviceName, options.Client)
	options.Client = wrapper.TraceCall(serviceName, trace.DefaultTracer, options.Client)
	options.Client = wrapper.AuthClient(serviceName, options.Server.Options().Id, authFn, options.Client)

	// wrap the server to provide handler stats
	options.Server.Init(
		server.WrapHandler(wrapper.HandlerStats(stats.DefaultStats)),
		server.WrapHandler(wrapper.TraceHandler(trace.DefaultTracer)),
		server.WrapHandler(wrapper.AuthHandler(authFn)),
	)

	// set opts
	service.opts = options

	return service
}
           

可以看到主要就是对传入的选项做了一些处理,没有什么复杂逻辑。值得一提的是上述代码第二行的newOptions这个方法,我们截取一段代码:

func newOptions(opts ...Option) Options {
	opt := Options{
		Auth:      auth.DefaultAuth,
		Broker:    broker.DefaultBroker,
		Cmd:       cmd.DefaultCmd,
		Config:    config.DefaultConfig,
		Client:    client.DefaultClient,
		Server:    server.DefaultServer,
		Store:     store.DefaultStore,
		Registry:  registry.DefaultRegistry,
		Transport: transport.DefaultTransport,
		Context:   context.Background(),
		Signal:    true,
	}
           

可以看到很多配置参数都用默认参数,其中服务端的Server默认参数是server.DefaultServer,这个参数在micro的defaults.go中进行了初始化:

func init() {
	// default client
	client.DefaultClient = gcli.NewClient()
	// default server
	server.DefaultServer = gsrv.NewServer()
	// default store
	store.DefaultStore = memoryStore.NewStore()
	// set default trace
	trace.DefaultTracer = memTrace.NewTracer()
}
           

其中的gcli和gsrv代表的是grpc的实现,因此当你用micro开发微服务的时候,默认是采用了grpc的实现。

目前为止,代码中还没看到注册的影子,我们继续看:

if err := service.Run(); err != nil {
		fmt.Println(err)
	}
           

调用service.Run()方法来启动服务运行,这个方法中又会调用Start这个函数:

if err := s.Start(); err != nil {
		return err
	}
           

Start()的源码很短,我们截取一部分:

if err := s.opts.Server.Start(); err != nil {
		return err
	}
           

可以看到,它调用的是Options.Server的Start(),而根据前面的代码,此处Options.Server实际上就是grpcServer,它的Start()方法里可以看到这样一句调用:

// announce self to the world
	if err := g.Register(); err != nil {
		if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
			logger.Errorf("Server register error: %v", err)
		}
	}
           

从注释就可以看出,这是把自己注册进注册中心,让全世界都知道自己的存在,具体是如何注册的呢,继续跟踪代码:

首先生成一个Node,一个Node你可以理解为一台提供服务的机器,假如我们在3台机器上部署了Greeter服务,那这里就会生成3个Node并写入到etcd中去:

// register service
	node := &registry.Node{
		Id:       config.Name + "-" + config.Id,
		Address:  mnet.HostPort(addr, port),
		Metadata: md,
}
           

经过一些处理后,在生成一个Service:

service := &registry.Service{
		Name:      config.Name,
		Version:   config.Version,
		Nodes:     []*registry.Node{node},
		Endpoints: endpoints,
	}
           

最后注册:

if err := config.Registry.Register(service, rOpts...); err != nil {
		return err
	}
           

可以看到,这里是调用某个具体的Registry的实现,典型的Strategy模式,micro定义好注册中心的接口,具体实现留给开发者自由发挥,micro约定的注册中心接口长下面这个样子:

type Registry interface {
	Init(...Option) error
	Options() Options
	Register(*Service, ...RegisterOption) error
	Deregister(*Service, ...DeregisterOption) error
	GetService(string, ...GetOption) ([]*Service, error)
	ListServices(...ListOption) ([]*Service, error)
	Watch(...WatchOption) (Watcher, error)
	String() string
}
           

我们用的是etcd的实现,那顺便就看看这个插件:

func (e *etcdRegistry) Register(s *registry.Service, opts ...registry.RegisterOption) error {
	if len(s.Nodes) == 0 {
		return errors.New("Require at least one node")
	}

	var gerr error

	// register each node individually
	for _, node := range s.Nodes {
		err := e.registerNode(s, node, opts...)
		if err != nil {
			gerr = err
		}
	}

	return gerr
}
           

代码很简单,就是向etcd集群中的每个节点写入服务信息,具体写入etcd的代码就不贴了,有兴趣的同学可以自行查看源码。

2.2 客户端是如何找到服务的

现在服务已经把自己注册进服务中心了,我们再来看看客户端是怎么找这个服务的。看看我们样例代码中下面这一句调用背后发生了什么?

// 调用微服务
	rsp, err := greeter.Hello(context.TODO(), &pb.Request{Name: "Hello Micro"})
           

最终调用的代码:

func (c *greeterService) Hello(ctx context.Context, in *Request, opts ...client.CallOption) (*Response, error) {
	req := c.c.NewRequest(c.name, "Greeter.Hello", in)
	out := new(Response)
	err := c.c.Call(ctx, req, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}
           

其中c.c.Call是关键,代码中c.c是client.Client类型,如前所述,默认是grpcClient,所以看看它的Call方法的实现就可以了,代码比较多,我们只取关键,在grpcClient.Call方法中有下面这么一句:

next, err := g.next(req, callOpts)
	if err != nil {
		return err
	}
           

这里next方法的作用就是用负载均衡算法选择一个节点:

// get next nodes from the selector
	next, err := g.opts.Selector.Select(service, opts.SelectOptions...)
           

具体是如何选择的呢,代码告诉我们秘密:

// get the service
	// try the cache first
	// if that fails go directly to the registry
	services, err := c.rc.GetService(service)
	if err != nil {
		if err == registry.ErrNotFound {
			return nil, ErrNotFound
		}
		return nil, err
	}
           

这里注释已经写的很清楚了,先查缓存,缓存没有命中就调用Registry.GetService去找,GetService也是之前Registry接口的方法之一,对于etcd,自然就是调用etcd相关的接口列举出etcd集群中记录的服务的地址了。

最后,按照配置的负载均衡策略从这些地址中选择一个:

return sopts.Strategy(services), nil
           

这里的Strategy是可以配置的,常见的round robin、轮询等负载均衡策略可以示情况自行配置。

3、小结

这篇文章,我们尝试用etcd替换了micro默认的基于mdns的服务发现组件,另外通过跟踪源代码,了解了服务注册和客户端发现服务的原理,通过分析源码,可以加深我们对微服务原理的理解。

继续阅读