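Preface
First, a few words on how I see recommender-system engines and algorithms.
When people talk about recommender systems today, they mostly talk about algorithms, in grand and mysterious terms. In reality many of these algorithms are variations on a theme: the core ideas are similar and only the implementation details differ. In industry, I don't think many of the complex algorithms actually make it into production; most companies rely on a fairly small, common set of them.
A good recommender system depends heavily on its engine. Experiments show that response time is directly tied to the key metrics: the longer the response time, the lower the metrics.
As a down-to-earth recommendation engineer, I want to start from the basics and get the engine right first. The algorithms will come later, of course; a recommender needs both.
Enough talk: let's build a recommendation engine!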
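Service discovery
Since we are building the engine, which means the backend, the first step is to set up the architecture.
For backend services, microservices are now the mainstream. They bring many advantages, such as high cohesion, independent deployment and per-service load balancing. They also have drawbacks, such as more complex communication. I won't go into detail here; if you're interested, look it up on Baidu or Google.
When microservices talk to each other, the client needs a way to find the server's address. That is what service discovery is for.
The process splits into what the server does and what the client does. Let's look at each in turn.
The server side is simple: it just writes its own information (such as its address) into a shared store.
The client first fetches the list of server entries from that store. It then picks one address according to some load-balancing policy, and finally makes the call.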
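To make the two roles concrete, here is a toy sketch in Go. An in-memory map stands in for the shared store (etcd, later in this article). The names Registry, Register, List, RoundRobin and Pick are made up for illustration; this is not the etcd or gRPC API, only the principle. Servers write their address, and the client reads the list and takes the addresses in turn.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Registry is a hypothetical in-memory stand-in for the shared store.
type Registry struct {
	mu       sync.Mutex
	services map[string]map[string]struct{} // service name -> set of addresses
}

func NewRegistry() *Registry {
	return &Registry{services: make(map[string]map[string]struct{})}
}

// Register is the server side: store this instance's address.
func (r *Registry) Register(name, addr string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.services[name] == nil {
		r.services[name] = make(map[string]struct{})
	}
	r.services[name][addr] = struct{}{}
}

// List is the first client step: fetch the address list (sorted for determinism).
func (r *Registry) List(name string) []string {
	r.mu.Lock()
	defer r.mu.Unlock()
	addrs := make([]string, 0, len(r.services[name]))
	for a := range r.services[name] {
		addrs = append(addrs, a)
	}
	sort.Strings(addrs)
	return addrs
}

// RoundRobin is the second client step: hand out one address per call, in turn.
type RoundRobin struct {
	addrs []string
	next  int
}

// Pick returns the next address, or false if there is none.
func (rr *RoundRobin) Pick() (string, bool) {
	if len(rr.addrs) == 0 {
		return "", false
	}
	addr := rr.addrs[rr.next%len(rr.addrs)]
	rr.next++
	return addr, true
}

func main() {
	reg := NewRegistry()
	reg.Register("greet_server", "10.0.0.1:8001")
	reg.Register("greet_server", "10.0.0.2:8001")

	rr := &RoundRobin{addrs: reg.List("greet_server")}
	for i := 0; i < 3; i++ {
		addr, _ := rr.Pick()
		fmt.Println(addr) // the third call wraps back to the first address
	}
}
```

Round-robin is only one possible policy. It is also the one the gRPC client later in this article uses.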
Simple in principle, right? Let's get hands-on!
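Introducing etcd
ZooKeeper used to be the usual choice for this.
etcd, on the other hand, is written in Go and became well known through Kubernetes, which uses etcd as the distributed store for its cluster state. etcd can also be used for distributed locks.
So naturally we'll go with etcd: younger, lighter, and also very stable. Yes, I like shiny new things = =
etcd achieves consistency with the Raft consensus algorithm. For Raft itself, the animated demo below is an excellent, intuitive explanation.
Raft animated demo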
etcd in practice
I use Docker for my test environment. Here is my docker-compose.yaml:
version: '2.2'
services:
  etcd:
    image: gcr.io/etcd-development/etcd:v3.4.13
    container_name: etcd
    restart: always
    ports:
      - "2379:2379"
      - "2380:2380"
    command:
      - "/usr/local/bin/etcd"
      - "--name"
      - "s1"
      - "--data-dir"
      - "/etcd-data"
      - "--advertise-client-urls"
      - "http://0.0.0.0:2379"
      - "--listen-client-urls"
      - "http://0.0.0.0:2379"
      - "--initial-advertise-peer-urls"
      - "http://0.0.0.0:2380"
      - "--listen-peer-urls"
      - "http://0.0.0.0:2380"
      - "--initial-cluster-token"
      - "tkn"
      - "--initial-cluster"
      - "s1=http://0.0.0.0:2380"
      - "--initial-cluster-state"
      - "new"
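If you'd rather install it another way, see the official instructions:
Install etcd
Since etcd is a store, let's try basic CRUD first, and then etcd's lease feature.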
CRUD:
# etcdctl put test/key hello
OK
# etcdctl get test/key
test/key
hello
# etcdctl put test/key goodbye
OK
# etcdctl get test/key
test/key
goodbye
# etcdctl del test/key
1
# etcdctl get test/key
(The last get prints nothing because the key has been deleted.)
Leases:
Create a lease with a 120s TTL
# etcdctl lease grant 120
lease 3f3575c45fa5ff26 granted with TTL(120s)
List leases
# etcdctl lease list
found 1 leases
3f3575c45fa5ff26
Put a key bound to the lease
# etcdctl put test/key hello --lease="3f3575c45fa5ff26"
OK
Check the lease's remaining time and its attached keys
# etcdctl lease timetolive 3f3575c45fa5ff26 --keys
lease 3f3575c45fa5ff26 granted with TTL(120s), remaining(46s), attached keys([test/key])
List the keys that still exist
# etcdctl get --prefix ""
test/key
hello
After the lease expires, check again: the key has been deleted automatically
# etcdctl lease timetolive 3f3575c45fa5ff26 --keys
lease 3f3575c45fa5ff26 already expired
# etcdctl get --prefix ""
Renewing a lease:
As before, create a lease and bind a key to it
# etcdctl lease grant 30
lease 3f3575c45fa5ff2c granted with TTL(30s)
# etcdctl put test/key hello --lease="3f3575c45fa5ff2c"
OK
Renew it (keep-alive)
# etcdctl lease keep-alive 3f3575c45fa5ff2c
lease 3f3575c45fa5ff2c keepalived with TTL(30)
lease 3f3575c45fa5ff2c keepalived with TTL(30)
lease 3f3575c45fa5ff2c keepalived with TTL(30)
lease 3f3575c45fa5ff2c keepalived with TTL(30)
In another terminal window, check the lease and the key
# etcdctl lease timetolive 3f3575c45fa5ff2c --keys
lease 3f3575c45fa5ff2c granted with TTL(30s), remaining(23s), attached keys([test/key])
# etcdctl get --prefix ""
test/key
hello
Neither has expired: each keep-alive resets the lease to its full TTL.
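To pin down the lease behavior shown above, here is a toy, single-process model. LeaseStore and its methods are hypothetical names, not the etcd client API. It uses a fake clock counted in whole seconds. Keys bound to a lease disappear once the TTL passes without renewal, and each keep-alive resets the countdown to the full TTL. Real etcd expires leases on the server; this only mimics what you observe from etcdctl.

```go
package main

import "fmt"

// LeaseStore is a toy model of etcd lease behavior (not the etcd API).
// Time is a fake clock in seconds, passed in explicitly for determinism.
type LeaseStore struct {
	kv      map[string]string
	leaseOf map[string]int64 // key -> lease ID it is bound to
	expiry  map[int64]int64  // lease ID -> expiry time (seconds)
	ttl     map[int64]int64  // lease ID -> TTL (seconds)
	nextID  int64
}

func NewLeaseStore() *LeaseStore {
	return &LeaseStore{
		kv:      map[string]string{},
		leaseOf: map[string]int64{},
		expiry:  map[int64]int64{},
		ttl:     map[int64]int64{},
	}
}

// Grant creates a lease that expires ttl seconds after now (cf. `etcdctl lease grant`).
func (s *LeaseStore) Grant(now, ttl int64) int64 {
	s.nextID++
	s.expiry[s.nextID] = now + ttl
	s.ttl[s.nextID] = ttl
	return s.nextID
}

// Put stores a key bound to a lease (cf. `etcdctl put k v --lease=ID`).
func (s *LeaseStore) Put(key, val string, lease int64) {
	s.kv[key] = val
	s.leaseOf[key] = lease
}

// expire deletes every lease whose expiry time has been reached,
// together with all keys bound to it.
func (s *LeaseStore) expire(now int64) {
	for id, exp := range s.expiry {
		if now < exp {
			continue
		}
		delete(s.expiry, id)
		delete(s.ttl, id)
		for k, l := range s.leaseOf {
			if l == id {
				delete(s.kv, k)
				delete(s.leaseOf, k)
			}
		}
	}
}

// KeepAlive resets a live lease to its full TTL from now; an expired lease cannot be renewed.
func (s *LeaseStore) KeepAlive(now, lease int64) bool {
	s.expire(now)
	if _, ok := s.expiry[lease]; !ok {
		return false
	}
	s.expiry[lease] = now + s.ttl[lease]
	return true
}

// Get expires overdue leases first, then reads the key.
func (s *LeaseStore) Get(now int64, key string) (string, bool) {
	s.expire(now)
	v, ok := s.kv[key]
	return v, ok
}

func main() {
	// Without renewal: the key is gone once its 30s TTL has elapsed.
	s := NewLeaseStore()
	a := s.Grant(0, 30)
	s.Put("test/a", "hello", a)
	_, ok := s.Get(31, "test/a")
	fmt.Println("test/a present at t=31s:", ok)

	// With a keep-alive every 10s, the key outlives the original TTL.
	s2 := NewLeaseStore()
	b := s2.Grant(0, 30)
	s2.Put("test/b", "hello", b)
	for t := int64(10); t <= 60; t += 10 {
		s2.KeepAlive(t, b)
	}
	_, ok = s2.Get(65, "test/b")
	fmt.Println("test/b present at t=65s:", ok)
}
```

This prints false for test/a and true for test/b. That is the same contrast as the two etcdctl experiments above.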
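Golang + gRPC + etcd service discovery: the complete hands-on
First, the GitHub repository: GitHub repository
Code directory: /go_server/src/lib/discovery/
Here is the overall flow.
The server registers its service with etcd, i.e. it writes this instance's information into etcd.
On the client side, roughly:
- Fetch the list of server addresses from etcd, watch the list for changes, and keep it up to date.
- Feed the address list into the address list of gRPC's resolver.ClientConn.
- gRPC establishes the connections and distributes requests according to the load-balancing policy.
The module consists of 7 files:
- config.go: configuration constants (in the config subpackage).
- discovery.go: initialization.
- register.go: service registration.
- resolver.go: resolves the service addresses registered in etcd and sets up gRPC load balancing.
- util.go: shared helpers.
- wrapper.go: call wrappers exposed to other packages.
- ctx.go: context helpers that set the call timeout.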
config.go
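package config

import "time"

// etcd
const (
	Timeout        = 15 * time.Second // dial timeout of the etcd client
	Expires        = 10               // lease TTL, in seconds
	TickerInterval = 5                // how often registration is checked, in seconds

	// scheme
	Scheme = "etcd"
	// prefix format of keys stored in etcd: /scheme/authority/endpoint/
	DirFormat = "/%s/%s/%s/"
	// target format required by a custom gRPC resolver: scheme://authority/endpoint
	// scheme selects the resolver (the resolution strategy), authority is used
	// here for the environment, and endpoint is the service name to resolve
	TargetFormat = "%s://%s/%s"
)

// server name
const (
	GreetServer = "greet_server"
)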
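Here is a quick sketch of how these format strings fit together, using the values from the test run later in this article. The helpers serviceKey, target and addrFromKey are hypothetical names. register.go and resolver.go inline the same fmt.Sprintf and strings.TrimPrefix calls.

```go
package main

import (
	"fmt"
	"strings"
)

// Copies of the constants from config.go.
const (
	Scheme       = "etcd"
	DirFormat    = "/%s/%s/%s/"
	TargetFormat = "%s://%s/%s"
)

// serviceKey builds the etcd key a server registers under:
// the /scheme/env/name/ prefix followed by the server's ip:port.
func serviceKey(env, name, host string) string {
	return fmt.Sprintf(DirFormat, Scheme, env, name) + host
}

// target builds the gRPC dial target scheme://env/name; gRPC uses the
// scheme part to look up the registered resolver.
func target(env, name string) string {
	return fmt.Sprintf(TargetFormat, Scheme, env, name)
}

// addrFromKey recovers ip:port from a registered key by trimming the prefix.
func addrFromKey(env, name, key string) string {
	return strings.TrimPrefix(key, fmt.Sprintf(DirFormat, Scheme, env, name))
}

func main() {
	key := serviceKey("test", "greet_server", "192.168.31.71:52963")
	fmt.Println(key)                                      // /etcd/test/greet_server/192.168.31.71:52963
	fmt.Println(target("test", "greet_server"))           // etcd://test/greet_server
	fmt.Println(addrFromKey("test", "greet_server", key)) // 192.168.31.71:52963
}
```

The trailing slash in DirFormat matters. A prefix query on /etcd/test/greet_server/ cannot accidentally match the keys of a service named, say, greet_server2.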
discovery.go
package discovery

import (
	"fmt"
	"go_server/src/lib/discovery/config"
	"go_server/src/lib/logger"
	"strings"

	"go.etcd.io/etcd/clientv3"
)

var (
	client *clientv3.Client
)

// Init creates the shared etcd client. etcdAddr may list several
// endpoints separated by ";".
func Init(etcdAddr string) error {
	var err error
	if client == nil {
		// build the etcd client
		client, err = clientv3.New(clientv3.Config{
			Endpoints:   strings.Split(etcdAddr, ";"),
			DialTimeout: config.Timeout,
		})
		if err != nil {
			logger.Error("failed to connect to etcd: %s\n", err)
			fmt.Printf("failed to connect to etcd: %s\n", err)
			return err
		}
	}
	return nil
}
register.go
package discovery

import (
	"context"
	"errors"
	"fmt"
	"go_server/src/lib/discovery/config"
	"os"
	"os/signal"
	"syscall"
	"time"

	"go.etcd.io/etcd/clientv3"
)

// Service is the object a server uses to register itself
type Service struct {
	Name string // service name
	Host string // {ip}:{port}
	Env  string // environment the service belongs to
	Key  string // key stored in etcd
}

var service *Service

// register checks every TickerInterval seconds whether this instance's key
// exists in etcd, and (re)registers it if not, e.g. after the lease expired
func (s *Service) register() error {
	if s.Env == "" {
		return errors.New("env is empty")
	}
	s.Key = fmt.Sprintf(config.DirFormat, config.Scheme, s.Env, s.Name) + s.Host
	ticker := time.NewTicker(time.Second * time.Duration(config.TickerInterval))
	go func() {
		for {
			resp, err := client.Get(context.Background(), s.Key)
			if err != nil {
				fmt.Printf("failed to get service address: %s\n", err)
			} else if resp.Count == 0 { // not registered (yet)
				err = s.keepAlive()
				if err != nil {
					fmt.Printf("failed to keep registration alive: %s\n", err)
				}
			}
			<-ticker.C
		}
	}()
	return nil
}

// keepAlive creates a lease, binds the key to it, and keeps renewing it
func (s *Service) keepAlive() error {
	// create a lease with a TTL of config.Expires seconds
	leaseResp, err := client.Grant(context.Background(), config.Expires)
	if err != nil {
		fmt.Printf("failed to create lease: %s\n", err)
		return err
	}
	// register the service address in etcd under that lease
	_, err = client.Put(context.Background(), s.Key, s.Host, clientv3.WithLease(leaseResp.ID))
	if err != nil {
		fmt.Printf("failed to register service: %s\n", err)
		return err
	}
	// keep renewing the lease
	ch, err := client.KeepAlive(context.Background(), leaseResp.ID)
	if err != nil {
		fmt.Printf("failed to renew lease: %s\n", err)
		return err
	}
	// drain the channel returned by KeepAlive; `for range` stops when the
	// channel is closed, whereas `for { <-ch }` would spin on a closed channel
	go func() {
		for range ch {
		}
	}()
	return nil
}

// unRegister removes this instance's key from etcd
func (s *Service) unRegister() {
	if client != nil {
		_, _ = client.Delete(context.Background(), s.Key)
	}
}

// WaitForClose blocks until a termination signal arrives, deregisters the
// service and exits. SIGKILL is not listed because it cannot be caught.
func WaitForClose() {
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP, syscall.SIGQUIT)
	sig := <-ch
	if service != nil {
		service.unRegister()
	}
	if i, ok := sig.(syscall.Signal); ok {
		os.Exit(int(i))
	} else {
		os.Exit(0)
	}
}
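One detail of draining the KeepAlive channel is worth a standalone demonstration. As I understand the clientv3 documentation, the returned channel is closed once keep-alive stops, for example when the lease can no longer be renewed or the context is canceled. In Go, receiving from a closed channel returns a zero value immediately. A bare for { <-ch } loop would therefore spin forever after that point, while for range ch simply exits. The sketch uses a plain chan int as a hypothetical stand-in for the real response channel.

```go
package main

import "fmt"

// drain consumes values until the channel is closed and returns how many it saw.
// `for range` exits on close; a bare `for { <-ch }` would keep receiving zero
// values from the closed channel and burn a CPU core.
func drain(ch <-chan int) int {
	n := 0
	for range ch {
		n++
	}
	return n
}

func main() {
	ch := make(chan int)
	done := make(chan int)
	go func() { done <- drain(ch) }()

	// stand-ins for keep-alive responses; the sender closes the channel when it stops
	for i := 0; i < 3; i++ {
		ch <- i
	}
	close(ch)
	fmt.Println("responses drained:", <-done) // responses drained: 3
}
```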
resolver.go
package discovery

import (
	"context"
	"fmt"
	"go_server/src/lib/discovery/config"
	"strings"

	"go.etcd.io/etcd/clientv3"
	"google.golang.org/grpc"
	"google.golang.org/grpc/resolver"
)

// EtcdResolver resolves service addresses from etcd for gRPC
type EtcdResolver struct {
	dir        string
	clientConn resolver.ClientConn
}

// Resolver registers the etcd resolver and dials env/name through it
func Resolver(env string, name string) *grpc.ClientConn {
	// register the etcd resolver
	r := &EtcdResolver{}
	resolver.Register(r)
	target := fmt.Sprintf(config.TargetFormat, r.Scheme(), env, name)
	// connect to the servers with round-robin load balancing;
	// grpc.Dial calls r.Build() synchronously
	dialOpts := []grpc.DialOption{
		// gRPC's built-in round-robin balancer (newer gRPC releases deprecate
		// WithBalancerName in favor of grpc.WithDefaultServiceConfig)
		grpc.WithBalancerName("round_robin"),
		grpc.WithInsecure(),
		grpc.WithDefaultCallOptions(
			grpc.MaxCallRecvMsgSize(1024 * 1024 * 16),
		),
	}
	conn, err := grpc.Dial(target, dialOpts...)
	if err != nil {
		fmt.Println("failed to connect to server:", err)
	}
	return conn
}

func (r *EtcdResolver) Scheme() string {
	return config.Scheme
}

// Build creates the resolver; grpc.Dial() calls it synchronously
func (r *EtcdResolver) Build(target resolver.Target, clientConn resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
	r.clientConn = clientConn
	r.dir = fmt.Sprintf(config.DirFormat, target.Scheme, target.Authority, target.Endpoint)
	go r.watch()
	return r, nil
}

// watch keeps the list of service addresses under the r.dir prefix in sync with etcd
func (r *EtcdResolver) watch() {
	// load the initial address list
	var addrList []resolver.Address
	watchOpts := []clientv3.OpOption{clientv3.WithPrefix()}
	resp, err := client.Get(context.Background(), r.dir, clientv3.WithPrefix())
	if err != nil {
		fmt.Println("failed to get service address list:", err)
	} else {
		for i := range resp.Kvs {
			addr := strings.TrimPrefix(string(resp.Kvs[i].Key), r.dir)
			fmt.Println(addr)
			addrList = append(addrList, resolver.Address{Addr: addr})
		}
		// watch from the revision right after this snapshot, so that changes
		// made between Get and Watch are not missed
		watchOpts = append(watchOpts, clientv3.WithRev(resp.Header.Revision+1))
	}
	r.clientConn.NewAddress(addrList)
	// watch the address list for changes
	rch := client.Watch(context.Background(), r.dir, watchOpts...)
	for n := range rch {
		for _, ev := range n.Events {
			addr := strings.TrimPrefix(string(ev.Kv.Key), r.dir)
			switch ev.Type {
			case clientv3.EventTypePut:
				if !exists(addrList, addr) {
					addrList = append(addrList, resolver.Address{Addr: addr})
					r.clientConn.NewAddress(addrList)
				}
			case clientv3.EventTypeDelete:
				if s, ok := remove(addrList, addr); ok {
					addrList = s
					r.clientConn.NewAddress(addrList)
				}
			}
		}
	}
}

func exists(l []resolver.Address, addr string) bool {
	for i := range l {
		if l[i].Addr == addr {
			return true
		}
	}
	return false
}

func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
	for i := range s {
		if s[i].Addr == addr {
			s[i] = s[len(s)-1]
			return s[:len(s)-1], true
		}
	}
	return nil, false
}

// Close is a no-op
func (r *EtcdResolver) Close() {}

// ResolveNow is a no-op; updates arrive through watch()
func (r *EtcdResolver) ResolveNow(_ resolver.ResolveNowOptions) {}
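The bookkeeping inside watch() can be pulled out into a pure function, which makes it easy to test on its own. This sketch makes simplifying assumptions. The hypothetical Event type carries an already-trimmed address instead of a clientv3 event, and addresses are plain strings rather than resolver.Address. Unlike remove() above, apply builds a new slice instead of swapping elements in place. As a precaution, this means a slice that was already handed to gRPC is never modified afterwards.

```go
package main

import "fmt"

// EventType mirrors the two etcd watch event kinds the resolver handles.
type EventType int

const (
	Put EventType = iota
	Delete
)

// Event is a simplified, hypothetical watch event: the address is already trimmed from the key.
type Event struct {
	Type EventType
	Addr string
}

// apply returns the address list with ev applied, never modifying the input.
// Put appends an address that is not yet present; Delete drops it.
// The bool reports whether anything changed, i.e. whether to push an update to gRPC.
func apply(addrs []string, ev Event) ([]string, bool) {
	idx := -1
	for i, a := range addrs {
		if a == ev.Addr {
			idx = i
			break
		}
	}
	switch {
	case ev.Type == Put && idx < 0:
		out := make([]string, 0, len(addrs)+1)
		out = append(out, addrs...)
		return append(out, ev.Addr), true
	case ev.Type == Delete && idx >= 0:
		out := make([]string, 0, len(addrs)-1)
		out = append(out, addrs[:idx]...)
		return append(out, addrs[idx+1:]...), true
	}
	return addrs, false
}

func main() {
	var addrs []string
	events := []Event{
		{Put, "192.168.31.71:52963"},
		{Put, "192.168.31.71:52969"},
		{Put, "192.168.31.71:52963"}, // duplicate Put: ignored
		{Delete, "192.168.31.71:52963"},
	}
	for _, ev := range events {
		var changed bool
		addrs, changed = apply(addrs, ev)
		fmt.Println(changed, addrs)
	}
}
```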
util.go
package discovery

import (
	"fmt"
	"net"
)

// getIntranetIP returns the first non-loopback IPv4 address of this machine
func getIntranetIP() (ip string) {
	if addrs, err := net.InterfaceAddrs(); err == nil {
		for _, address := range addrs {
			if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
				if ipnet.IP.To4() != nil {
					ip = ipnet.IP.String()
					break
				}
			}
		}
	}
	return
}

// getListener listens on a free port chosen by the OS (port 0) on all
// interfaces, and returns the listener plus this machine's address as ip:port
func getListener() (listener net.Listener, host string, err error) {
	host = "0.0.0.0:0"
	listener, err = net.Listen("tcp", host)
	if err == nil {
		addr := listener.Addr().String()
		_, portString, _ := net.SplitHostPort(addr)
		host = fmt.Sprintf("%s:%s", getIntranetIP(), portString)
	}
	return
}
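getListener relies on a standard trick. It listens on port 0, lets the operating system pick a free port, and then reads the actual port back from the listener's address. Here is a stdlib-only sketch of just that step. listenAnyPort is a hypothetical name, and the sketch binds to 127.0.0.1 instead of 0.0.0.0 so it runs on any machine.

```go
package main

import (
	"fmt"
	"net"
)

// listenAnyPort listens on an OS-assigned free port on the loopback interface
// and returns the listener together with the port that was actually assigned.
func listenAnyPort() (net.Listener, string, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return nil, "", err
	}
	_, port, err := net.SplitHostPort(ln.Addr().String())
	if err != nil {
		ln.Close()
		return nil, "", err
	}
	return ln, port, nil
}

func main() {
	ln, port, err := listenAnyPort()
	if err != nil {
		fmt.Println("listen failed:", err)
		return
	}
	defer ln.Close()
	fmt.Println("assigned port:", port) // an OS-chosen port, never "0"
}
```

Letting the OS choose the port is what allows several servers to start on one machine without any port configuration. That is how the test run below registers three instances on different ports.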
wrapper.go
package discovery

import (
	"fmt"
	"go_server/src/lib/discovery/config"
	"go_server/src/lib/proto/greet"

	"google.golang.org/grpc"
)

// GreetRegister starts a Greet gRPC server on a free port and registers it in etcd
func GreetRegister(env string, server greet.GreetServer) error {
	listener, host, err := getListener()
	if err != nil {
		fmt.Println("failed to listen:", err)
		return err
	}
	fmt.Println("host:", host)
	srv := grpc.NewServer()
	// register the implementation before Serve: gRPC treats
	// RegisterService after Serve has started as a fatal error
	greet.RegisterGreetServer(srv, server)
	go srv.Serve(listener)
	service = &Service{Name: config.GreetServer, Host: host, Env: env}
	err = service.register()
	if err != nil {
		fmt.Println(err)
		return err
	}
	return nil
}

// GreetResolve returns a Greet client that discovers servers through etcd
func GreetResolve(env string) greet.GreetClient {
	return greet.NewGreetClient(Resolver(env, config.GreetServer))
}
ctx.go
package discovery

import (
	"context"
	"time"
)

// Context1s returns a context that times out after 1s
func Context1s() (ctx context.Context, cancel context.CancelFunc) {
	return context.WithTimeout(context.TODO(), time.Second)
}
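Context1s gives every client call a 1-second deadline. If the server is too slow, the call fails with a deadline error instead of hanging. Here is a stdlib-only sketch of that behavior. slowCall is a hypothetical stand-in for an RPC; a gRPC call likewise returns an error, with status code DeadlineExceeded, once the context's deadline passes.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// slowCall is a hypothetical stand-in for an RPC that needs `work` to finish;
// it gives up as soon as the context is done.
func slowCall(ctx context.Context, work time.Duration) error {
	select {
	case <-time.After(work):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// callWithTimeout follows the client's pattern: create a deadline-bound
// context, make one call, then release the context with cancel.
func callWithTimeout(timeout, work time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return slowCall(ctx, work)
}

// timedOut reports whether err was caused by the context deadline.
func timedOut(err error) bool {
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println("fast call error:", callWithTimeout(time.Second, 10*time.Millisecond))
	fmt.Println("slow call timed out:", timedOut(callWithTimeout(50*time.Millisecond, time.Second)))
}
```

Calling cancel right after each call, as the client below does, releases the context's timer early instead of waiting for the deadline.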
Let's test it; the test files are also in the GitHub repo.
Here are a test proto, a server and a client:
greet.proto
syntax = "proto3";

option go_package = "src/lib/proto/greet";

service Greet {
  rpc Morning(GreetRequest) returns (GreetResponse) {}
  rpc Night(GreetRequest) returns (GreetResponse) {}
}

message GreetRequest {
  string name = 1;
}

message GreetResponse {
  string message = 1;
  string from = 2;
}
server main.go
package main

import (
	"context"
	"flag"
	"fmt"
	"go_server/src/lib/discovery"
	proto "go_server/src/lib/proto/greet"
)

var (
	Flag     = flag.String("flag", "a", "label returned in the From field of responses")
	EtcdAddr = flag.String("EtcdAddr", "127.0.0.1:2379", "etcd address to register with")
	Env      = flag.String("Env", "test", "env")
)

// GreetServer implements the Greet RPC service
type GreetServer struct{}

func (gs *GreetServer) Morning(ctx context.Context, req *proto.GreetRequest) (*proto.GreetResponse, error) {
	fmt.Printf("Morning called: %s\n", req.Name)
	return &proto.GreetResponse{
		Message: "Good morning, " + req.Name,
		From:    *Flag,
	}, nil
}

func (gs *GreetServer) Night(ctx context.Context, req *proto.GreetRequest) (*proto.GreetResponse, error) {
	fmt.Printf("Night called: %s\n", req.Name)
	return &proto.GreetResponse{
		Message: "Good night, " + req.Name,
		From:    *Flag,
	}, nil
}

func main() {
	flag.Parse()
	err := discovery.Init(*EtcdAddr)
	if err != nil {
		fmt.Println(err)
		return
	}
	err = discovery.GreetRegister(*Env, &GreetServer{})
	if err != nil {
		fmt.Println(err)
		return
	}
	discovery.WaitForClose()
}
client main.go
package main

import (
	"flag"
	"fmt"
	"go_server/src/lib/discovery"
	proto "go_server/src/lib/proto/greet"
	"time"
)

var (
	EtcdAddr = flag.String("EtcdAddr", "127.0.0.1:2379", "etcd address to resolve services from")
	Env      = flag.String("Env", "test", "env")
)

func main() {
	flag.Parse()
	err := discovery.Init(*EtcdAddr)
	if err != nil {
		fmt.Println(err)
		return
	}
	c := discovery.GreetResolve(*Env)
	// call Morning and Night once per second, each call with its own 1s timeout
	ticker := time.NewTicker(1 * time.Second)
	for range ticker.C {
		fmt.Println("Calling Morning...")
		ctx, cancel := discovery.Context1s()
		resp1, err := c.Morning(
			ctx,
			&proto.GreetRequest{Name: "Jinfeng"},
		)
		cancel()
		if err != nil {
			fmt.Println("Morning call failed:", err)
			return
		}
		fmt.Printf("Morning response: %s, from: %s\n", resp1.Message, resp1.From)
		fmt.Println("Calling Night...")
		ctx, cancel = discovery.Context1s()
		resp2, err := c.Night(
			ctx,
			&proto.GreetRequest{Name: "Jinfeng"},
		)
		cancel()
		if err != nil {
			fmt.Println("Night call failed:", err)
			return
		}
		fmt.Printf("Night response: %s, from: %s\n", resp2.Message, resp2.From)
	}
}
Let's run it. Start 3 servers (with -flag set to a, b and c), and etcd shows 3 registered instances:
# etcdctl get --prefix ""
/etcd/test/greet_server/192.168.31.71:52963
192.168.31.71:52963
/etcd/test/greet_server/192.168.31.71:52969
192.168.31.71:52969
/etcd/test/greet_server/192.168.31.71:52973
192.168.31.71:52973
Client calls:
➜ client git:(main) ✗ go run .
192.168.31.71:52963
192.168.31.71:52969
192.168.31.71:52973
Calling Morning...
Morning response: Good morning, Jinfeng, from: c
Calling Night...
Night response: Good night, Jinfeng, from: a
Calling Morning...
Morning response: Good morning, Jinfeng, from: b
Calling Night...
Night response: Good night, Jinfeng, from: c
Calling Morning...
Morning response: Good morning, Jinfeng, from: a
Calling Night...
Night response: Good night, Jinfeng, from: b
Calling Morning...
Morning response: Good morning, Jinfeng, from: c
After shutting down one server (c):
Morning response: Good morning, Jinfeng, from: a
Calling Night...
Night response: Good night, Jinfeng, from: b
Calling Morning...
Morning response: Good morning, Jinfeng, from: a
Calling Night...
Night response: Good night, Jinfeng, from: b
Calling Morning...
Morning response: Good morning, Jinfeng, from: a
Calling Night...
Night response: Good night, Jinfeng, from: b
After restarting it:
Morning response: Good morning, Jinfeng, from: a
Calling Night...
Night response: Good night, Jinfeng, from: b
Calling Morning...
Morning response: Good morning, Jinfeng, from: c
Calling Night...
Night response: Good night, Jinfeng, from: a
Calling Morning...
Morning response: Good morning, Jinfeng, from: b
Calling Night...
Night response: Good night, Jinfeng, from: c
Calling Morning...
Morning response: Good morning, Jinfeng, from: a
Calling Night...
Night response: Good night, Jinfeng, from: b
For now, load balancing is just gRPC's built-in round-robin. When I have time, I'll add consistent hashing and other methods!
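Consistent hashing isn't implemented in this article yet. For reference, here is a minimal sketch of the common technique: a hash ring with virtual nodes, using CRC32 from the Go standard library. Ring, NewRing and Get are hypothetical names, not part of gRPC, and wiring a ring like this into a gRPC client is left out. The main function checks the property that makes the technique attractive for service discovery: when a server disappears, only the keys that lived on that server move.

```go
package main

import (
	"fmt"
	"hash/crc32"
	"sort"
	"strconv"
)

// Ring is a minimal consistent-hash ring: each server is placed on the ring
// `replicas` times, and a key belongs to the first virtual node at or after
// the key's hash, wrapping around at the end.
type Ring struct {
	hashes []uint32          // sorted virtual-node hashes
	owner  map[uint32]string // virtual-node hash -> server address
}

func NewRing(replicas int, servers ...string) *Ring {
	r := &Ring{owner: make(map[uint32]string)}
	for _, s := range servers {
		for i := 0; i < replicas; i++ {
			h := crc32.ChecksumIEEE([]byte(strconv.Itoa(i) + "#" + s))
			if _, dup := r.owner[h]; dup {
				continue // rare hash collision: keep the first owner
			}
			r.hashes = append(r.hashes, h)
			r.owner[h] = s
		}
	}
	sort.Slice(r.hashes, func(i, j int) bool { return r.hashes[i] < r.hashes[j] })
	return r
}

// Get returns the server responsible for key, or "" if the ring is empty.
func (r *Ring) Get(key string) string {
	if len(r.hashes) == 0 {
		return ""
	}
	h := crc32.ChecksumIEEE([]byte(key))
	i := sort.Search(len(r.hashes), func(i int) bool { return r.hashes[i] >= h })
	if i == len(r.hashes) {
		i = 0 // past the last virtual node: wrap around to the first
	}
	return r.owner[r.hashes[i]]
}

func main() {
	servers := []string{"192.168.31.71:52963", "192.168.31.71:52969", "192.168.31.71:52973"}
	full := NewRing(50, servers...)
	reduced := NewRing(50, servers[:2]...) // the third server has gone away

	moved, badMoves := 0, 0
	for u := 0; u < 1000; u++ {
		key := "user-" + strconv.Itoa(u)
		before, after := full.Get(key), reduced.Get(key)
		if before != after {
			moved++
			if before != servers[2] {
				badMoves++ // a key left a server that is still alive
			}
		}
	}
	fmt.Println("keys moved:", moved, "of 1000; moves off surviving servers:", badMoves)
}
```

With round-robin, any request can land on any server. With a ring like this, the same key (for example a user ID) keeps going to the same server, which helps when servers hold per-user caches.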
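Service discovery is now in place, so next we can build a simple recommender system and get the whole flow running end to end!
The plan is to start with a recommender that only does simple recall (candidate retrieval), and then gradually optimize the whole system.
Let's go, folks!
Original article link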