Recommender System from 0 to 1: Service Discovery

Preface

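First, a few words on how I see the engine and the algorithms of a recommender system.

When people talk about recommender systems today, they mostly talk about algorithms, and they make them sound dazzling and unfathomable. In practice many of these algorithms are much alike: the core idea is the same and only the implementation differs slightly. In industry, I believe few of the complex algorithms actually make it to production; most companies rely on the same small set of algorithms.

A good recommender system depends heavily on its engine. Experiments show that response time is directly tied to every metric: the longer the response time, the lower the metrics.

As a down-to-earth recommendation engineer, I would rather start from the basics and get the engine right first. The algorithms will follow later, since a system needs both.

So, enough talk. Let's build a recommendation engine!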

Service discovery

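Since we are building the engine, that is, the backend, the first job is to set up the architecture.

Microservices are now the mainstream for backend services. They have many advantages, such as high cohesion, independent deployment, and per-service load balancing. They also have drawbacks, such as more complex communication. I won't go into the details here; if you are interested, search Baidu or Google.

When microservices talk to each other, the client needs a way to find the server's address. That is what service discovery is for.

The whole flow splits into what the server does and what the client does. Let's look at each in turn.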

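The server side is simple: it only has to write its own information into some store.

The client first fetches the list of server entries from that store, then picks one address according to some load-balancing rule, and finally makes the call.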
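
To make the two roles concrete, here is a minimal sketch in Go. It is not the implementation built later in this post: `Registry`, `RoundRobin`, and the addresses are hypothetical, and an in-memory map stands in for the shared store.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"sync"
)

// Registry is a hypothetical in-memory stand-in for the shared store.
type Registry struct {
	mu    sync.Mutex
	addrs map[string]map[string]struct{} // service name -> set of addresses
}

func NewRegistry() *Registry {
	return &Registry{addrs: make(map[string]map[string]struct{})}
}

// Register is what a server does on startup: record its own address.
func (r *Registry) Register(service, addr string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.addrs[service] == nil {
		r.addrs[service] = make(map[string]struct{})
	}
	r.addrs[service][addr] = struct{}{}
}

// List is the first client step: fetch every known address of a service.
func (r *Registry) List(service string) []string {
	r.mu.Lock()
	defer r.mu.Unlock()
	out := make([]string, 0, len(r.addrs[service]))
	for a := range r.addrs[service] {
		out = append(out, a)
	}
	sort.Strings(out) // stable order so the rotation is predictable
	return out
}

// RoundRobin is the second client step: pick one address per call, in turn.
type RoundRobin struct {
	next int
}

func (rr *RoundRobin) Pick(addrs []string) (string, error) {
	if len(addrs) == 0 {
		return "", errors.New("no servers available")
	}
	addr := addrs[rr.next%len(addrs)]
	rr.next++
	return addr, nil
}

func main() {
	reg := NewRegistry()
	reg.Register("greet_server", "10.0.0.1:5000") // hypothetical addresses
	reg.Register("greet_server", "10.0.0.2:5000")

	rr := &RoundRobin{}
	for i := 0; i < 3; i++ {
		addr, err := rr.Pick(reg.List("greet_server"))
		if err != nil {
			fmt.Println("pick failed:", err)
			return
		}
		fmt.Println("calling", addr) // the third step would be the actual RPC
	}
}
```

The picker takes the list as an argument on every call, so a list that changes between calls is picked up without any extra wiring.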

Simple in principle, isn't it? On to the hands-on part!

Introducing etcd

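In the past the usual choice was ZooKeeper. It is still maintained as an Apache project, but it is the older of the two and runs on the JVM.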

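etcd, by contrast, is written in Go and became well known through Kubernetes, which uses it as its distributed store. It can also be used to obtain distributed locks.

So of course we go with etcd, which is younger, lighter, and also very stable. Out with the old, in with the new = =

etcd achieves consistency with the Raft algorithm. For Raft itself, see the animated demo below, which explains it vividly.

Raft animated demo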

etcd in practice

I use Docker for my test environment. Here is my docker-compose.yaml:

version: '2.2'
services:
  etcd:
    image: gcr.io/etcd-development/etcd:v3.4.13
    container_name: etcd
    restart: always
    ports:
      - 2379:2379
      - 2380:2380
    command:
      - "/usr/local/bin/etcd"
      - "--name"
      - "s1"
      - "--data-dir"
      - "/etcd-data"
      - "--advertise-client-urls"
      - "http://0.0.0.0:2379"
      - "--listen-client-urls"
      - "http://0.0.0.0:2379"
      - "--initial-advertise-peer-urls"
      - "http://0.0.0.0:2380"
      - "--listen-peer-urls"
      - "http://0.0.0.0:2380"
      - "--initial-cluster-token"
      - "tkn"
      - "--initial-cluster"
      - "s1=http://0.0.0.0:2380"
      - "--initial-cluster-state"
      - "new"
           

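If you prefer to install it another way, see the official instructions:

Installing etcd

Since etcd is a store, let's test CRUD first, and then etcd's lease feature.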

CRUD:

# etcdctl put test/key hello
OK
# etcdctl get test/key
test/key
hello
# etcdctl put test/key goodbye
OK
# etcdctl get test/key
test/key
goodbye
# etcdctl del test/key
1
# etcdctl get test/key
           

Leases:

Create a lease that expires after 120s

# etcdctl lease grant 120
lease 3f3575c45fa5ff26 granted with TTL(120s)
           

List the leases

# etcdctl lease list
found 1 leases
3f3575c45fa5ff26
           

Create a key-value pair and bind it to the lease

# etcdctl put test/key hello --lease="3f3575c45fa5ff26"
OK
           

Check the lease's remaining time and the keys attached to it

# etcdctl lease timetolive 3f3575c45fa5ff26 --keys
lease 3f3575c45fa5ff26 granted with TTL(120s), remaining(46s), attached keys([test/key])
           

List the keys that still exist

# etcdctl get --prefix ""
test/key
hello
           

After the lease expires, look up the key again: it has been deleted automatically

# etcdctl lease timetolive 3f3575c45fa5ff26 --keys
lease 3f3575c45fa5ff26 already expired
# etcdctl get --prefix ""
           

Lease renewal:

Again, create a lease and bind a key-value pair to it

# etcdctl lease grant 30
lease 3f3575c45fa5ff2c granted with TTL(30s)
# etcdctl put test/key hello --lease="3f3575c45fa5ff2c"
OK
           

Renew the lease

# etcdctl lease keep-alive 3f3575c45fa5ff2c
lease 3f3575c45fa5ff2c keepalived with TTL(30)
lease 3f3575c45fa5ff2c keepalived with TTL(30)
lease 3f3575c45fa5ff2c keepalived with TTL(30)
lease 3f3575c45fa5ff2c keepalived with TTL(30)
           

Open a new terminal window and check the lease and the key

# etcdctl lease timetolive 3f3575c45fa5ff2c --keys
lease 3f3575c45fa5ff2c granted with TTL(30s), remaining(23s), attached keys([test/key])
# etcdctl get --prefix ""
test/key
hello
           

The key has not expired.

golang + grpc + etcd: service discovery, the final hands-on!

First, the code: GitHub repository

Code directory: /go_server/src/lib/discovery/

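The overall flow:

The server registers itself with etcd, that is, it writes its own service information into etcd.

The client flow, roughly:

  1. Fetch the server address list from etcd, watch it for changes, and keep it up to date.
  2. Write the address list into the resolver.ClientConn of the grpc resolver.
  3. grpc opens the connections and sends requests according to the load-balancing policy.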
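
Step 1 amounts to keeping a local list in sync with a stream of put and delete events. The sketch below shows just that bookkeeping, under the assumption that each event carries one address; `Event`, `EventType`, and `Apply` are hypothetical names, not part of etcd or grpc.

```go
package main

import "fmt"

// EventType and Event are hypothetical stand-ins for watch notifications.
type EventType int

const (
	Put EventType = iota
	Delete
)

type Event struct {
	Type EventType
	Addr string
}

// Apply returns the address list after one event, without duplicates.
func Apply(addrs []string, ev Event) []string {
	idx := -1
	for i, a := range addrs {
		if a == ev.Addr {
			idx = i
			break
		}
	}
	switch ev.Type {
	case Put:
		if idx == -1 {
			return append(addrs, ev.Addr)
		}
	case Delete:
		if idx != -1 {
			// The capped slice forces a copy, so the caller's slice is untouched.
			return append(addrs[:idx:idx], addrs[idx+1:]...)
		}
	}
	return addrs
}

func main() {
	// Step 1: the initial list fetched from the store (hypothetical addresses).
	addrs := []string{"10.0.0.1:5000", "10.0.0.2:5000"}

	// Step 1, continued: later changes arrive as watch events.
	events := []Event{
		{Put, "10.0.0.3:5000"},
		{Delete, "10.0.0.1:5000"},
		{Put, "10.0.0.3:5000"}, // repeated put, ignored
	}
	for _, ev := range events {
		addrs = Apply(addrs, ev)
		// Step 2 would hand the updated list to the grpc resolver here.
		fmt.Println(addrs)
	}
}
```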

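The module consists of 7 files:

  • config.go: configuration.
  • discovery.go: initialisation.
  • register.go: service registration.
  • resolver.go: resolves the service addresses registered in etcd and sets up grpc load balancing.
  • util.go: shared helpers.
  • wrapper.go: wrappers exposed to callers.
  • ctx.go: contexts with a timeout.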

config.go

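package config

import "time"

// etcd
const (
	Timeout        = 15 * time.Second // dial timeout for the etcd client
	Expires        = 10               // lease TTL, in seconds
	TickerInterval = 5                // registration check interval, in seconds
	// scheme
	Scheme = "etcd"
	// Key prefix format used in etcd: /scheme/authority/endpoint/
	DirFormat = "/%s/%s/%s/"
	// Target format a custom grpc resolver expects: scheme://authority/endpoint
	// The scheme can be read as the resolution strategy, the authority as permission management, and the endpoint as the address.
	// In this module the authority slot holds the environment and the endpoint slot holds the service name.
	TargetFormat = "%s://%s/%s"
)

// server name
const (
	GreetServer = "greet_server"
)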
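
As a quick check of what the two format strings produce, the sketch below expands them with the values used in the test run later in this post. The constants are copied from config.go; the helper names `EtcdKey` and `DialTarget` are hypothetical.

```go
package main

import "fmt"

// Copied from config.go.
const (
	Scheme       = "etcd"
	DirFormat    = "/%s/%s/%s/"
	TargetFormat = "%s://%s/%s"
)

// EtcdKey builds the key a server registers under: the directory plus ip:port.
func EtcdKey(env, name, host string) string {
	return fmt.Sprintf(DirFormat, Scheme, env, name) + host
}

// DialTarget builds the target string a client dials.
func DialTarget(env, name string) string {
	return fmt.Sprintf(TargetFormat, Scheme, env, name)
}

func main() {
	fmt.Println(EtcdKey("test", "greet_server", "192.168.31.71:52963"))
	// prints /etcd/test/greet_server/192.168.31.71:52963
	fmt.Println(DialTarget("test", "greet_server"))
	// prints etcd://test/greet_server
}
```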
           

discovery.go

package discovery

import (
	"fmt"
	"go_server/src/lib/discovery/config"
	"go_server/src/lib/logger"
	"strings"

	"go.etcd.io/etcd/clientv3"
)

var (
	client *clientv3.Client
)

// Init initialises the etcd client; etcdAddr may hold several endpoints separated by ";"
func Init(etcdAddr string) error {
	var err error
	if client == nil {
		// build the etcd client
		client, err = clientv3.New(clientv3.Config{
			Endpoints:   strings.Split(etcdAddr, ";"),
			DialTimeout: config.Timeout,
		})
		if err != nil {
			logger.Error("failed to connect to etcd: %s\n", err)
			fmt.Printf("failed to connect to etcd: %s\n", err)
			return err
		}
	}
	return nil
}
           

register.go

package discovery

import (
	"context"
	"errors"
	"fmt"
	"go_server/src/lib/discovery/config"
	"os"
	"os/signal"
	"syscall"
	"time"

	"go.etcd.io/etcd/clientv3"
)

// Service is the object a server uses to register itself
type Service struct {
	Name string // service name
	Host string // {ip}:{port}
	Env  string // environment the service belongs to

	Key string // key stored in etcd
}

var service *Service

// register checks every TickerInterval seconds whether the key exists
// and registers it again when it is missing
func (s *Service) register() error {
	if s.Env == "" {
		return errors.New("env is null")
	}
	s.Key = fmt.Sprintf(config.DirFormat, config.Scheme, s.Env, s.Name) + s.Host
	ticker := time.NewTicker(time.Second * time.Duration(config.TickerInterval))
	go func() {
		for {
			resp, err := client.Get(context.Background(), s.Key)
			if err != nil {
				fmt.Printf("failed to get service address: %s\n", err)
			} else if resp.Count == 0 { // not registered yet
				err = s.keepAlive()
				if err != nil {
					fmt.Printf("keep-alive failed: %s\n", err)
				}
			}
			<-ticker.C
		}
	}()
	return nil
}

// keepAlive creates a lease, binds the key to it, and keeps renewing it
func (s *Service) keepAlive() error {
	// create the lease
	leaseResp, err := client.Grant(context.Background(), config.Expires)
	if err != nil {
		fmt.Printf("failed to create lease: %s\n", err)
		return err
	}

	// register the service address in etcd
	_, err = client.Put(context.Background(), s.Key, s.Host, clientv3.WithLease(leaseResp.ID))
	if err != nil {
		fmt.Printf("failed to register service: %s\n", err)
		return err
	}

	// renew the lease
	ch, err := client.KeepAlive(context.Background(), leaseResp.ID)
	if err != nil {
		fmt.Printf("failed to renew lease: %s\n", err)
		return err
	}

	// drain the channel returned by KeepAlive; the range loop ends once the
	// channel is closed, whereas a bare receive would spin forever
	go func() {
		for range ch {
		}
	}()
	return nil
}

// unRegister removes the service key from etcd
func (s *Service) unRegister() {
	if client != nil {
		_, _ = client.Delete(context.Background(), s.Key)
	}
}

// WaitForClose blocks until a termination signal arrives, then unregisters and exits
func WaitForClose() {
	ch := make(chan os.Signal, 1)
	// SIGKILL cannot be caught, so it is not listed here
	signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP, syscall.SIGQUIT)
	sig := <-ch
	if service != nil {
		service.unRegister()
	}
	if i, ok := sig.(syscall.Signal); ok {
		os.Exit(int(i))
	} else {
		os.Exit(0)
	}
}
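
One detail worth isolating is how the keep-alive channel gets drained. As far as I know, etcd closes that channel when the keep-alive stream cannot continue, and in Go a receive on a closed channel returns immediately. The sketch below uses a plain `chan int` rather than the etcd client, and `drain` is a hypothetical helper; it shows that a range loop stops at closure while a bare receive keeps returning zero values.

```go
package main

import "fmt"

// drain consumes every value until ch is closed and returns how many it saw.
func drain(ch <-chan int) int {
	n := 0
	for range ch {
		n++
	}
	return n
}

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	ch <- 3
	close(ch)
	fmt.Println("drained:", drain(ch)) // drained: 3

	// A plain receive on a closed channel never blocks; ok reports the closure.
	v, ok := <-ch
	fmt.Println(v, ok) // 0 false
}
```

A goroutine that loops on a bare receive after closure would therefore burn CPU without doing any work, which is why a range loop is the safer way to drain.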
           

resolver.go

package discovery

import (
	"context"
	"fmt"
	"go_server/src/lib/discovery/config"
	"strings"

	"go.etcd.io/etcd/clientv3"
	"google.golang.org/grpc"
	"google.golang.org/grpc/resolver"
)

// EtcdResolver resolves service addresses stored in etcd
type EtcdResolver struct {
	dir        string
	clientConn resolver.ClientConn
}

// Resolver dials the named service in the given environment
func Resolver(env string, name string) *grpc.ClientConn {
	// register the etcd resolver
	r := &EtcdResolver{}
	resolver.Register(r)
	target := fmt.Sprintf(config.TargetFormat, r.Scheme(), env, name)
	// connect to the servers (load balancing: round robin); this calls r.Build() synchronously
	dialOpts := []grpc.DialOption{
		// round-robin balancer built into grpc; WithBalancerName is deprecated
		// in favour of WithDefaultServiceConfig in later grpc-go releases
		grpc.WithBalancerName("round_robin"),
		grpc.WithInsecure(),
		grpc.WithDefaultCallOptions(
			grpc.MaxCallRecvMsgSize(1024 * 1024 * 16),
		),
	}
	conn, err := grpc.Dial(target, dialOpts...)
	if err != nil {
		fmt.Println("failed to connect to server:", err)
	}
	return conn
}

// Scheme returns the scheme this resolver is registered under
func (r *EtcdResolver) Scheme() string {
	return config.Scheme
}

// Build constructs the resolver; grpc.Dial() calls it synchronously
func (r *EtcdResolver) Build(target resolver.Target, clientConn resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
	r.clientConn = clientConn
	r.dir = fmt.Sprintf(config.DirFormat, target.Scheme, target.Authority, target.Endpoint)
	go r.watch()
	return r, nil
}

// watch follows changes to the service address list under a key prefix in etcd
func (r *EtcdResolver) watch() {
	// initialise the service address list
	var addrList []resolver.Address

	resp, err := client.Get(context.Background(), r.dir, clientv3.WithPrefix())
	if err != nil {
		fmt.Println("failed to get service address list:", err)
	} else {
		for i := range resp.Kvs {
			fmt.Println(strings.TrimPrefix(string(resp.Kvs[i].Key), r.dir))
			addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(resp.Kvs[i].Key), r.dir)})
		}
	}

	r.clientConn.NewAddress(addrList)

	// watch the service address list for changes
	rch := client.Watch(context.Background(), r.dir, clientv3.WithPrefix())
	for n := range rch {
		for _, ev := range n.Events {
			addr := strings.TrimPrefix(string(ev.Kv.Key), r.dir)
			switch ev.Type {
			case clientv3.EventTypePut:
				if !exists(addrList, addr) {
					addrList = append(addrList, resolver.Address{Addr: addr})
					r.clientConn.NewAddress(addrList)
				}
			case clientv3.EventTypeDelete:
				if s, ok := remove(addrList, addr); ok {
					addrList = s
					r.clientConn.NewAddress(addrList)
				}
			}
		}
	}
}

// exists reports whether addr is already in the list
func exists(l []resolver.Address, addr string) bool {
	for i := range l {
		if l[i].Addr == addr {
			return true
		}
	}
	return false
}

// remove deletes addr from the list by swapping in the last element; order is not preserved
func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
	for i := range s {
		if s[i].Addr == addr {
			s[i] = s[len(s)-1]
			return s[:len(s)-1], true
		}
	}
	return nil, false
}

// Close is a no-op
func (r *EtcdResolver) Close() {}

// ResolveNow is a no-op; newer grpc-go releases name the option type resolver.ResolveNowOptions
func (r *EtcdResolver) ResolveNow(_ resolver.ResolveNowOption) {}
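
The string handling in Build and watch can be tried out without grpc or etcd. The sketch below is an approximation using only the standard library: it parses the dial target with net/url instead of grpc's resolver.Target, and `DirFor` and `AddrsUnder` are hypothetical helpers rather than functions from the repository.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// DirFor turns a dial target such as "etcd://test/greet_server" into the
// etcd key prefix "/etcd/test/greet_server/".
func DirFor(target string) (string, error) {
	u, err := url.Parse(target)
	if err != nil {
		return "", err
	}
	endpoint := strings.TrimPrefix(u.Path, "/")
	return fmt.Sprintf("/%s/%s/%s/", u.Scheme, u.Host, endpoint), nil
}

// AddrsUnder keeps the keys below dir and strips the prefix, leaving ip:port.
func AddrsUnder(dir string, keys []string) []string {
	var addrs []string
	for _, k := range keys {
		if strings.HasPrefix(k, dir) {
			addrs = append(addrs, strings.TrimPrefix(k, dir))
		}
	}
	return addrs
}

func main() {
	dir, err := DirFor("etcd://test/greet_server")
	if err != nil {
		fmt.Println("bad target:", err)
		return
	}
	keys := []string{
		"/etcd/test/greet_server/192.168.31.71:52963",
		"/etcd/prod/greet_server/10.0.0.9:6000", // hypothetical key from another environment
	}
	fmt.Println(dir)                   // /etcd/test/greet_server/
	fmt.Println(AddrsUnder(dir, keys)) // [192.168.31.71:52963]
}
```

Because the environment is part of the prefix, a client in one environment never sees servers registered in another.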
           

util.go

package discovery

import (
	"fmt"
	"net"
)

// getIntranetIP returns the first non-loopback IPv4 address of this machine
func getIntranetIP() (ip string) {
	if addrs, err := net.InterfaceAddrs(); err == nil {
		for _, address := range addrs {
			if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
				if ipnet.IP.To4() != nil {
					ip = ipnet.IP.String()
					break
				}
			}
		}
	}
	return
}

// getListener listens on an automatically assigned port and returns the
// listener together with this machine's address in ip:port form
func getListener() (listener net.Listener, host string, err error) {
	host = "0.0.0.0:0"
	listener, err = net.Listen("tcp", host)
	if err == nil {
		addr := listener.Addr().String()
		_, portString, _ := net.SplitHostPort(addr)
		host = fmt.Sprintf("%s:%s", getIntranetIP(), portString)
	}
	return
}
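
The trick in getListener is to listen on port 0 and let the operating system choose a free port. The sketch below shows that step alone; it binds to the loopback address only for the sake of the demo, and `FreePortListener` is a hypothetical name.

```go
package main

import (
	"fmt"
	"net"
)

// FreePortListener listens on an OS-assigned port and reports which one.
func FreePortListener() (net.Listener, string, error) {
	l, err := net.Listen("tcp", "127.0.0.1:0") // port 0: let the OS pick
	if err != nil {
		return nil, "", err
	}
	_, port, err := net.SplitHostPort(l.Addr().String())
	if err != nil {
		l.Close()
		return nil, "", err
	}
	return l, port, nil
}

func main() {
	l, port, err := FreePortListener()
	if err != nil {
		fmt.Println("listen failed:", err)
		return
	}
	defer l.Close()
	fmt.Println("listening on port", port)
}
```

Each server instance gets its own port this way, which is why several copies can run on one machine in the test further down.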
           

wrapper.go

package discovery

import (
	"fmt"
	"go_server/src/lib/discovery/config"
	"go_server/src/lib/proto/greet"

	"google.golang.org/grpc"
)

// GreetRegister starts the greet grpc server and registers it in etcd
func GreetRegister(env string, server greet.GreetServer) error {
	listener, host, err := getListener()
	if err != nil {
		fmt.Println("failed to listen:", err)
		return err
	}
	fmt.Println("host:", host)
	srv := grpc.NewServer()
	// services must be registered before Serve is called
	greet.RegisterGreetServer(srv, server)
	go srv.Serve(listener)
	service = &Service{Name: config.GreetServer, Host: host, Env: env}
	err = service.register()
	if err != nil {
		fmt.Println(err)
		return err
	}
	return nil
}

// GreetResolve returns a greet client backed by the etcd resolver
func GreetResolve(env string) greet.GreetClient {
	return greet.NewGreetClient(Resolver(env, config.GreetServer))
}
           

ctx.go

package discovery

import (
	"context"
	"time"
)

// Context1s returns a context with a 1s timeout
func Context1s() (ctx context.Context, cancel context.CancelFunc) {
	return context.WithTimeout(context.TODO(), time.Second)
}
           

Let's test it. The test files are also in the GitHub repository:

Here is a test proto, a server, and a client, again straight to the code:

greet.proto

syntax = "proto3";


option go_package = "src/lib/proto/greet";

service Greet {
  rpc Morning(GreetRequest)returns(GreetResponse){}
  rpc Night(GreetRequest)returns(GreetResponse){}
}

message GreetRequest {
  string name = 1;
}

message GreetResponse {
  string message = 1;
  string from = 2;
}
           

server main.go

package main

import (
	"context"
	"flag"
	"fmt"
	"go_server/src/lib/discovery"
	proto "go_server/src/lib/proto/greet"
)

var (
	Flag     = flag.String("flag", "a", "flag")
	EtcdAddr = flag.String("EtcdAddr", "127.0.0.1:2379", "register etcd address")
	Env      = flag.String("Env", "test", "env")
)

// GreetServer implements the rpc service interface
type GreetServer struct{}

func (gs *GreetServer) Morning(ctx context.Context, req *proto.GreetRequest) (*proto.GreetResponse, error) {
	fmt.Printf("Morning called: %s\n", req.Name)
	return &proto.GreetResponse{
		Message: "Good morning, " + req.Name,
		From:    *Flag,
	}, nil
}

func (gs *GreetServer) Night(ctx context.Context, req *proto.GreetRequest) (*proto.GreetResponse, error) {
	fmt.Printf("Night called: %s\n", req.Name)
	return &proto.GreetResponse{
		Message: "Good night, " + req.Name,
		From:    *Flag,
	}, nil
}

func main() {
	flag.Parse()
	err := discovery.Init(*EtcdAddr)
	if err != nil {
		fmt.Println(err)
		return
	}
	err = discovery.GreetRegister(*Env, &GreetServer{})
	if err != nil {
		fmt.Println(err)
		return
	}
	discovery.WaitForClose()
}
           

client main.go

package main

import (
	"flag"
	"fmt"
	"go_server/src/lib/discovery"
	proto "go_server/src/lib/proto/greet"
	"time"
)

var (
	EtcdAddr = flag.String("EtcdAddr", "127.0.0.1:2379", "register etcd address")
	Env      = flag.String("Env", "test", "env")
)

func main() {
	flag.Parse()
	err := discovery.Init(*EtcdAddr)
	if err != nil {
		fmt.Println(err)
		return
	}
	c := discovery.GreetResolve(*Env)
	ticker := time.NewTicker(1 * time.Second)
	// one Morning call and one Night call per second
	for range ticker.C {
		fmt.Println("Calling Morning...")
		ctx, cancel := discovery.Context1s()
		resp1, err := c.Morning(
			ctx,
			&proto.GreetRequest{Name: "Jinfeng"},
		)
		cancel()
		if err != nil {
			fmt.Println("Morning call failed:", err)
			return
		}
		fmt.Printf("Morning response: %s, from: %s\n", resp1.Message, resp1.From)

		fmt.Println("Calling Night...")
		ctx, cancel = discovery.Context1s()
		resp2, err := c.Night(
			ctx,
			&proto.GreetRequest{Name: "Jinfeng"},
		)
		cancel()
		if err != nil {
			fmt.Println("Night call failed:", err)
			return
		}
		fmt.Printf("Night response: %s, from: %s\n", resp2.Message, resp2.From)
	}
}


Let's run it. Start 3 servers, and you can see that 3 instances are registered in etcd.

# etcdctl get --prefix ""
/etcd/test/greet_server/192.168.31.71:52963
192.168.31.71:52963
/etcd/test/greet_server/192.168.31.71:52969
192.168.31.71:52969
/etcd/test/greet_server/192.168.31.71:52973
192.168.31.71:52973


Client calls

➜  client git:(main) ✗ go run .
192.168.31.71:52963
192.168.31.71:52969
192.168.31.71:52973
Calling Morning...
Morning response: Good morning, Jinfeng, from: c
Calling Night...
Night response: Good night, Jinfeng, from: a
Calling Morning...
Morning response: Good morning, Jinfeng, from: b
Calling Night...
Night response: Good night, Jinfeng, from: c
Calling Morning...
Morning response: Good morning, Jinfeng, from: a
Calling Night...
Night response: Good night, Jinfeng, from: b
Calling Morning...
Morning response: Good morning, Jinfeng, from: c


Shut down one server

Morning response: Good morning, Jinfeng, from: a
Calling Night...
Night response: Good night, Jinfeng, from: b
Calling Morning...
Morning response: Good morning, Jinfeng, from: a
Calling Night...
Night response: Good night, Jinfeng, from: b
Calling Morning...
Morning response: Good morning, Jinfeng, from: a
Calling Night...
Night response: Good night, Jinfeng, from: b


Restart it

Morning response: Good morning, Jinfeng, from: a
Calling Night...
Night response: Good night, Jinfeng, from: b
Calling Morning...
Morning response: Good morning, Jinfeng, from: c
Calling Night...
Night response: Good night, Jinfeng, from: a
Calling Morning...
Morning response: Good morning, Jinfeng, from: b
Calling Night...
Night response: Good night, Jinfeng, from: c
Calling Morning...
Morning response: Good morning, Jinfeng, from: a
Calling Night...
Night response: Good night, Jinfeng, from: b
           

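This round only uses grpc's built-in simple round robin for load balancing. When I have time, I will add other methods such as consistent hashing.

Service discovery is now in place, so next we can build a simple recommender system and get the whole flow running.

The plan is to start with a recommender that only does simple recall, and then optimise the whole system step by step.

Let's go, everyone!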
