天天看点

gRPC-10 客户端负载均衡

代码地址

https://github.com/wanmei002/grpc-learn/tree/master/ch09

介绍

我工作中,请求一个接口,会先从去负载均衡器中获取一个ip,然后直接拿着ip去访问接口,在这里呢我们相当于把负载均衡器集成到客户端服务中,客户端发起请求的时候,会获得一个ip列表,根据特定的算法,选取一个IP, 向服务端发起请求

实现逻辑

  1. grpc.Dial 的时候输入我们的服务名
  2. 实现 resolver.go/Builder接口,连接的时候会调用这个接口里的方法 Build Scheme 方法
  3. 在 Build 方法里我们根据服务名,获取服务的ip地址列表
  4. 传入负载均衡算法,实现客户端负载均衡

客户端

grpc.Dial 传参

dl, err := grpc.Dial(fmt.Sprintf("%s:///%s", "order", "Order"),
       grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name)),
       grpc.WithInsecure())
           
  • fmt.Sprintf("%s:///%s", "order", "Order")

    第一个

    order

    是服务解析器的名字,后面要通过这个名字找到对应的 实现

    Builder

    这个接口的服务解析器, 后一个

    Order

    是微服务名 , 这两个参数都是可以自定义的
  • grpc.WithDefaultServiceConfig(fmt.Sprintf(

    {“LoadBalancingPolicy”: “%s”}

    , roundrobin.Name))

    负载均衡算法,我们这里使用的是

    grpc

    自带的轮询算法。

实现服务解析器

// 提供服务的机器
var addrs = []string{":8093", ":8094"}

type loopBuilder struct{}
// 实现 Builder 接口, 这个接口的主要作用是把提供服务的IP列表,装载进拨号列表中
func (lb *loopBuilder) Build(target resolver.Target, cc resolver.ClientConn,
    opts resolver.BuildOptions) (resolver.Resolver, error) {
    log.Println("i am build")
    ll := &loopResolver{
        target:     target,
        cc:         cc,
        addrsStore: map[string][]string{
            LoopServiceName: addrs,
        },
    }
    ll.start()
    return ll, nil
}
// 这个是注册服务解析器的时候提供的注册名
func (*loopBuilder) Scheme() string {
    log.Println("i am scheme")
    return LoopScheme
}
// 实现 resolver.Resolver ,主要用于更新服务端IP列表
type loopResolver struct {
    target  resolver.Target
    cc      resolver.ClientConn
    addrsStore map[string][]string
}
// 更新拨号列表
func (ll *loopResolver) start() {
    addrList := ll.addrsStore[ll.target.Endpoint]
    log.Printf("loop resolver %+v\n", ll)
    addr := make([]resolver.Address, 0)
    for _, s := range addrList {
        addr = append(addr, resolver.Address{Addr: s})
    }
    log.Println("start start")
    
    ll.cc.UpdateState(resolver.State{Addresses: addr})
}

func (*loopResolver) ResolveNow(o resolver.ResolveNowOptions) {
    log.Println(" i am in resolveNew")
}

func (*loopResolver) Close(){
    log.Println(" i am in close ")
}

func init(){
    resolver.Register(&loopBuilder{})
}
           

服务端

注册多个ip提供服务

func main(){
    log.Println("loop register server")
    var wg = &sync.WaitGroup{} 
    wg.Add(len(addrList))
    for _, addr := range addrList {
        go StartServer(addr, wg) // 提供服务
    }
    
    wg.Wait()
    log.Println("over")
}