天天看點

分布式ID生成器及redis,etcd分布式鎖

分布式id生成器

有時我們需要能夠生成類似MySQL自增ID這樣不斷增大,同時又不會重複的id。以支援業務中的高并發場景。比較典型的,電商促銷時,短時間内會有大量的訂單湧入到系統,比如每秒10w+。明星出軌時,會有大量熱情的粉絲發微網誌以表心意,同樣會在短時間内産生大量的消息。

在插入資料庫之前,我們需要給這些消息、訂單先打上一個ID,然後再插入到我們的資料庫。對這個id的要求是希望其中能帶有一些時間資訊,這樣即使我們後端的系統對消息進行了分庫分表,也能夠以時間順序對這些消息進行排序。

Twitter的snowflake算法是這種場景下的一個典型解法。先來看看snowflake是怎麼一回事:

分布式ID生成器及redis,etcd分布式鎖

snowflake中的比特位分布

首先确定我們的數值是64位,int64類型,被劃分為四部分,不含開頭的第一個bit,因為這個bit是符号位。用41位來表示收到請求時的時間戳,機關為毫秒,然後五位來表示資料中心的id,然後再五位來表示機器的執行個體id,最後是12位的循環自增id(到達1111,1111,1111後會歸0)。

這樣的機制可以支援我們在同一台機器上,同一毫秒内産生

2 ^ 12 = 4096

條消息。一秒共409.6萬條消息。從值域上來講完全夠用了。

資料中心加上執行個體id共有10位,可以支援我們每資料中心部署32台機器,所有資料中心共1024台執行個體。

表示

timestamp

的41位,可以支援我們使用69年。當然,我們的時間毫秒計數不會真的從1970年開始記,那樣我們的系統跑到

2039/9/7 23:47:35

就不能用了,是以這裡的

timestamp

實際上隻是相對于某個時間的增量,比如我們的系統上線是2018-08-01,那麼我們可以把這個timestamp當作是從

2018-08-01 00:00:00.000

的偏移量。

worker_id配置設定

timestamp

datacenter_id

worker_id

sequence_id

這四個字段中,

timestamp

sequence_id

是由程式在運作期生成的。但

datacenter_id

worker_id

需要我們在部署階段就能夠擷取得到,并且一旦程式啟動之後,就是不可更改的了(想想,如果可以随意更改,可能被不慎修改,造成最終生成的id有沖突)。

一般不同資料中心的機器,會提供對應的擷取資料中心id的API,是以

datacenter_id

我們可以在部署階段輕松地擷取到。而worker_id是我們邏輯上給機器配置設定的一個id,這個要怎麼辦呢?比較簡單的想法是由能夠提供這種自增id功能的工具來支援,比如MySQL:

mysql> insert into a (ip) values("10.1.2.101");
Query OK, 1 row affected (0.00 sec)

mysql> select last_insert_id();
+------------------+
| last_insert_id() |
+------------------+
|                2 |
+------------------+
1 row in set (0.00 sec)
           

從MySQL中擷取到

worker_id

之後,就把這個

worker_id

直接持久化到本地,以避免每次上線時都需要擷取新的

worker_id

。讓單執行個體的

worker_id

可以始終保持不變。

當然,使用MySQL相當于給我們簡單的id生成服務增加了一個外部依賴。依賴越多,我們的服務的可運維性就越差。

考慮到叢集中即使有單個id生成服務的執行個體挂了,也就是損失一段時間的一部分id,是以我們也可以更簡單暴力一些,把

worker_id

直接寫在worker的配置中,上線時,由部署腳本完成

worker_id

字段替換。

開源執行個體

标準snowflake實作

github.com/bwmarrin/snowflake

是一個相當輕量化的snowflake的Go實作。其文檔對各位使用的定義見圖 6-2所示。

分布式ID生成器及redis,etcd分布式鎖

圖 6-2 snowflake庫

和标準的snowflake完全一緻。使用上比較簡單:

package main

import (
    "fmt"
    "os"

    "github.com/bwmarrin/snowflake"
)

func main() {
    n, err := snowflake.NewNode(1)
    if err != nil {
        println(err)
        os.Exit(1)
    }

    for i := 0; i < 3; i++ {
        id := n.Generate()
        fmt.Println("id", id)
        fmt.Println(
            "node: ", id.Node(),
            "step: ", id.Step(),
            "time: ", id.Time(),
            "\n",
        )
    }
}
           

當然,這個庫也給我們留好了定制的後路,其中預留了一些可定制字段:

// Epoch is set to the twitter snowflake epoch of Nov 04 2010 01:42:54 UTC
    // You may customize this to set a different epoch for your application.
    Epoch int64 = 1288834974657

    // Number of bits to use for Node
    // Remember, you have a total 22 bits to share between Node/Step
    NodeBits uint8 = 10

    // Number of bits to use for Step
    // Remember, you have a total 22 bits to share between Node/Step
    StepBits uint8 = 12
           

Epoch

就是本節開頭講的起始時間,

NodeBits

指的是機器編号的位長,

StepBits

指的是自增序列的位長。

sonyflake

sonyflake是Sony公司的一個開源項目,基本思路和snowflake差不多,不過位配置設定上稍有不同,見圖 6-3:

分布式ID生成器及redis,etcd分布式鎖

圖 6-3 sonyflake

這裡的時間隻用了39個bit,但時間的機關變成了10ms,是以理論上比41位表示的時間還要久(174年)。

Sequence ID

和之前的定義一緻,

Machine ID

其實就是節點id。

sonyflake

與衆不同的地方在于其在啟動階段的配置參數:

func NewSonyflake(st Settings) *Sonyflake
           

Settings

資料結構如下:

type Settings struct {
    StartTime      time.Time
    MachineID      func() (uint16, error)
    CheckMachineID func(uint16) bool
}
           

StartTime

選項和我們之前的

Epoch

差不多,如果不設定的話,預設是從

2014-09-01 00:00:00 +0000 UTC

開始。

MachineID

可以由使用者自定義的函數,如果使用者不定義的話,會預設将本機IP的低16位作為

machine id

CheckMachineID

是由使用者提供的檢查

MachineID

是否沖突的函數。這裡的設計還是比較巧妙的,如果有另外的中心化存儲并支援檢查重複的存儲,那我們就可以按照自己的想法随意定制這個檢查

MachineID

是否沖突的邏輯。如果公司有現成的Redis叢集,那麼我們可以很輕松地用Redis的集合類型來檢查沖突。

redis 127.0.0.1:6379> SADD base64_encoding_of_last16bits MzI0Mgo=
(integer) 1
redis 127.0.0.1:6379> SADD base64_encoding_of_last16bits MzI0Mgo=
(integer) 0
           

使用起來也比較簡單,有一些邏輯簡單的函數就略去實作了:

package main

import (
    "fmt"
    "os"
    "time"

    "github.com/sony/sonyflake"
)

func getMachineID() (uint16, error) {
    var machineID uint16
    var err error
    machineID = readMachineIDFromLocalFile()
    if machineID == 0 {
        machineID, err = generateMachineID()
        if err != nil {
            return 0, err
        }
    }

    return machineID, nil
}

func checkMachineID(machineID uint16) bool {
    saddResult, err := saddMachineIDToRedisSet()
    if err != nil || saddResult == 0 {
        return true
    }

    err := saveMachineIDToLocalFile(machineID)
    if err != nil {
        return true
    }

    return false
}

func main() {
    t, _ := time.Parse("2006-01-02", "2018-01-01")
    settings := sonyflake.Settings{
        StartTime:      t,
        MachineID:      getMachineID,
        CheckMachineID: checkMachineID,
    }

    sf := sonyflake.NewSonyflake(settings)
    id, err := sf.NextID()
    if err != nil {
        fmt.Println(err)
        os.Exit(1)
    }

    fmt.Println(id)
}
           

分布式鎖

在單機程式并發或并行修改全局變量時,需要對修改行為加鎖以創造臨界區。為什麼需要加鎖呢?我們看看在不加鎖的情況下并發計數會發生什麼情況:

package main

import (
    "sync"
)

// 全局變量
var counter int

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
        defer wg.Done()
            counter++
        }()
    }

    wg.Wait()
    println(counter)
}
           

多次運作會得到不同的結果:

❯❯❯ go run local_lock.go
945
❯❯❯ go run local_lock.go
937
❯❯❯ go run local_lock.go
959
           

基于Redis的setnx

在分布式場景下,我們也需要這種“搶占”的邏輯,這時候怎麼辦呢?我們可以使用Redis提供的

setnx

指令:

package main

import (
	"fmt"
	"time"

	"github.com/go-redis/redis"
	"github.com/gofrs/uuid"
)

// 聲明一個全局的rdb變量
var rdb *redis.Client

// 初始化連接配接
func initClient() (err error) {
	rdb = redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "zhy1996", // no password set
		DB:       0,         // use default DB
	})

	_, err = rdb.Ping().Result()
	if err != nil {
		return err
	}
	return nil
}

var unlock_lua = `
if redis.call("get",KEYS[1]) == ARGV[1] then
	return redis.call("del",KEYS[1])
else
	return 0
end
`

func Lock(key, value string, expiration time.Duration) (bool, error) {
	is, err := rdb.SetNX(key, value, expiration).Result()
	if err != nil {
		return false, fmt.Errorf("redis setnx failed")
	}
	return is, nil
}

func UnLock(key, value string) (bool, error) {
	res, err := rdb.Eval(unlock_lua, []string{key}, value).Result()
	if err != nil {
		return false, err
	}
	v, ok := res.(int64)
	if !ok {
		return false, fmt.Errorf("lua script return is not int")
	}
	if v == 0 {
		return false, nil
	}
	return true, nil
}

func main() {
	err := initClient()
	if err != nil {
		fmt.Println(err)
	}

	ul, _ := uuid.NewV4()
	value := ul.String()

	for i := 0; i < 10; i++ {
		is, err := Lock("lock_1", value, time.Second)
		if err != nil {
			fmt.Println(err)
			return
		}
		fmt.Println("是否拿到鎖:", is)
	}

	res, err := UnLock("lock_1", value)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("解鎖:", res)
}
           

看看運作結果:

是否拿到鎖: true
是否拿到鎖: false
是否拿到鎖: false
是否拿到鎖: false
是否拿到鎖: false
是否拿到鎖: false
是否拿到鎖: false
是否拿到鎖: false
是否拿到鎖: false
是否拿到鎖: false
解鎖: true
           

通過代碼和執行結果可以看到,我們遠端調用

setnx

實際上和單機的trylock非常相似,如果擷取鎖失敗,那麼相關的任務邏輯就不應該繼續向前執行。

setnx

很适合在高并發場景下,用來争搶一些“唯一”的資源。比如交易撮合系統中賣家發起訂單,而多個買家會對其進行并發争搶。這種場景我們沒有辦法依賴具體的時間來判斷先後,因為不管是使用者裝置的時間,還是分布式場景下的各台機器的時間,都是沒有辦法在合并後保證正确的時序的。哪怕是我們同一個機房的叢集,不同的機器的系統時間可能也會有細微的差别。

是以,我們需要依賴于這些請求到達Redis節點的順序來做正确的搶鎖操作。如果使用者的網絡環境比較差,那也隻能自求多福了。

基于etcd

etcd是分布式系統中,功能上與ZooKeeper類似的元件,這兩年越來越火了。上面基于ZooKeeper我們實作了分布式阻塞鎖,基于etcd,也可以實作類似的功能:

package main

import (
    "log"

    "github.com/zieckey/etcdsync"
)

func main() {
    m, err := etcdsync.New("/lock", 10, []string{"http://127.0.0.1:2379"})
    if m == nil || err != nil {
        log.Printf("etcdsync.New failed")
        return
    }
    err = m.Lock()
    if err != nil {
        log.Printf("etcdsync.Lock failed")
        return
    }

    log.Printf("etcdsync.Lock OK")
    log.Printf("Get the lock. Do something here.")

    err = m.Unlock()
    if err != nil {
        log.Printf("etcdsync.Unlock failed")
    } else {
        log.Printf("etcdsync.Unlock OK")
    }
}
           
  1. 先檢查

    /lock

    路徑下是否有值,如果有值,說明鎖已經被别人搶了
  2. 如果沒有值,那麼寫入自己的值。寫入成功傳回,說明加鎖成功。寫入時如果節點被其它節點寫入過了,那麼會導緻加鎖失敗,這時候到 3
  3. watch

    /lock

    下的事件,此時陷入阻塞
  4. /lock

    路徑下發生事件時,目前程序被喚醒。檢查發生的事件是否是删除事件(說明鎖被持有者主動unlock),或者過期事件(說明鎖過期失效)。如果是的話,那麼回到 1,走搶鎖流程。