分布式id生成器
有時我們需要能夠生成類似MySQL自增ID這樣不斷增大,同時又不會重複的id。以支援業務中的高并發場景。比較典型的,電商促銷時,短時間内會有大量的訂單湧入到系統,比如每秒10w+。明星出軌時,會有大量熱情的粉絲發微網誌以表心意,同樣會在短時間内産生大量的消息。
在插入資料庫之前,我們需要給這些消息、訂單先打上一個ID,然後再插入到我們的資料庫。對這個id的要求是希望其中能帶有一些時間資訊,這樣即使我們後端的系統對消息進行了分庫分表,也能夠以時間順序對這些消息進行排序。
Twitter的snowflake算法是這種場景下的一個典型解法。先來看看snowflake是怎麼一回事:

snowflake中的比特位分布
首先确定我們的數值是64位,int64類型,被劃分為四部分,不含開頭的第一個bit,因為這個bit是符号位。用41位來表示收到請求時的時間戳,機關為毫秒,然後五位來表示資料中心的id,然後再五位來表示機器的執行個體id,最後是12位的循環自增id(到達1111,1111,1111後會歸0)。
這樣的機制可以支援我們在同一台機器上,同一毫秒内産生
2 ^ 12 = 4096
條消息。一秒共409.6萬條消息。從值域上來講完全夠用了。
資料中心加上執行個體id共有10位,可以支援我們每資料中心部署32台機器,所有資料中心共1024台執行個體。
表示
timestamp
的41位,可以支援我們使用69年。當然,我們的時間毫秒計數不會真的從1970年開始記,那樣我們的系統跑到
2039/9/7 23:47:35
就不能用了,是以這裡的
timestamp
實際上隻是相對于某個時間的增量,比如我們的系統上線是2018-08-01,那麼我們可以把這個timestamp當作是從
2018-08-01 00:00:00.000
的偏移量。
worker_id配置設定
timestamp
,
datacenter_id
worker_id
和
sequence_id
這四個字段中,
timestamp
sequence_id
是由程式在運作期生成的。但
datacenter_id
worker_id
需要我們在部署階段就能夠擷取得到,并且一旦程式啟動之後,就是不可更改的了(想想,如果可以随意更改,可能被不慎修改,造成最終生成的id有沖突)。
一般不同資料中心的機器,會提供對應的擷取資料中心id的API,是以
datacenter_id
我們可以在部署階段輕松地擷取到。而worker_id是我們邏輯上給機器配置設定的一個id,這個要怎麼辦呢?比較簡單的想法是由能夠提供這種自增id功能的工具來支援,比如MySQL:
mysql> insert into a (ip) values("10.1.2.101");
Query OK, 1 row affected (0.00 sec)
mysql> select last_insert_id();
+------------------+
| last_insert_id() |
+------------------+
| 2 |
+------------------+
1 row in set (0.00 sec)
從MySQL中擷取到
worker_id
之後,就把這個
worker_id
直接持久化到本地,以避免每次上線時都需要擷取新的
worker_id
。讓單執行個體的
worker_id
可以始終保持不變。
當然,使用MySQL相當于給我們簡單的id生成服務增加了一個外部依賴。依賴越多,我們的服務的可運維性就越差。
考慮到叢集中即使有單個id生成服務的執行個體挂了,也就是損失一段時間的一部分id,是以我們也可以更簡單暴力一些,把
worker_id
直接寫在worker的配置中,上線時,由部署腳本完成
worker_id
字段替換。
開源執行個體
标準snowflake實作
github.com/bwmarrin/snowflake
是一個相當輕量化的snowflake的Go實作。其文檔對各位使用的定義見圖 6-2所示。
圖 6-2 snowflake庫
和标準的snowflake完全一緻。使用上比較簡單:
package main
import (
"fmt"
"os"
"github.com/bwmarrin/snowflake"
)
func main() {
n, err := snowflake.NewNode(1)
if err != nil {
println(err)
os.Exit(1)
}
for i := 0; i < 3; i++ {
id := n.Generate()
fmt.Println("id", id)
fmt.Println(
"node: ", id.Node(),
"step: ", id.Step(),
"time: ", id.Time(),
"\n",
)
}
}
當然,這個庫也給我們留好了定制的後路,其中預留了一些可定制字段:
// Epoch is set to the twitter snowflake epoch of Nov 04 2010 01:42:54 UTC
// You may customize this to set a different epoch for your application.
Epoch int64 = 1288834974657
// Number of bits to use for Node
// Remember, you have a total 22 bits to share between Node/Step
NodeBits uint8 = 10
// Number of bits to use for Step
// Remember, you have a total 22 bits to share between Node/Step
StepBits uint8 = 12
Epoch
就是本節開頭講的起始時間,
NodeBits
指的是機器編号的位長,
StepBits
指的是自增序列的位長。
sonyflake
sonyflake是Sony公司的一個開源項目,基本思路和snowflake差不多,不過位配置設定上稍有不同,見圖 6-3:
圖 6-3 sonyflake
這裡的時間隻用了39個bit,但時間的機關變成了10ms,是以理論上比41位表示的時間還要久(174年)。
Sequence ID
和之前的定義一緻,
Machine ID
其實就是節點id。
sonyflake
與衆不同的地方在于其在啟動階段的配置參數:
func NewSonyflake(st Settings) *Sonyflake
Settings
資料結構如下:
type Settings struct {
StartTime time.Time
MachineID func() (uint16, error)
CheckMachineID func(uint16) bool
}
StartTime
選項和我們之前的
Epoch
差不多,如果不設定的話,預設是從
2014-09-01 00:00:00 +0000 UTC
開始。
MachineID
可以由使用者自定義的函數,如果使用者不定義的話,會預設将本機IP的低16位作為
machine id
。
CheckMachineID
是由使用者提供的檢查
MachineID
是否沖突的函數。這裡的設計還是比較巧妙的,如果有另外的中心化存儲并支援檢查重複的存儲,那我們就可以按照自己的想法随意定制這個檢查
MachineID
是否沖突的邏輯。如果公司有現成的Redis叢集,那麼我們可以很輕松地用Redis的集合類型來檢查沖突。
redis 127.0.0.1:6379> SADD base64_encoding_of_last16bits MzI0Mgo=
(integer) 1
redis 127.0.0.1:6379> SADD base64_encoding_of_last16bits MzI0Mgo=
(integer) 0
使用起來也比較簡單,有一些邏輯簡單的函數就略去實作了:
package main
import (
"fmt"
"os"
"time"
"github.com/sony/sonyflake"
)
func getMachineID() (uint16, error) {
var machineID uint16
var err error
machineID = readMachineIDFromLocalFile()
if machineID == 0 {
machineID, err = generateMachineID()
if err != nil {
return 0, err
}
}
return machineID, nil
}
func checkMachineID(machineID uint16) bool {
saddResult, err := saddMachineIDToRedisSet()
if err != nil || saddResult == 0 {
return true
}
err := saveMachineIDToLocalFile(machineID)
if err != nil {
return true
}
return false
}
func main() {
t, _ := time.Parse("2006-01-02", "2018-01-01")
settings := sonyflake.Settings{
StartTime: t,
MachineID: getMachineID,
CheckMachineID: checkMachineID,
}
sf := sonyflake.NewSonyflake(settings)
id, err := sf.NextID()
if err != nil {
fmt.Println(err)
os.Exit(1)
}
fmt.Println(id)
}
分布式鎖
在單機程式并發或并行修改全局變量時,需要對修改行為加鎖以創造臨界區。為什麼需要加鎖呢?我們看看在不加鎖的情況下并發計數會發生什麼情況:
package main
import (
"sync"
)
// 全局變量
var counter int
func main() {
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter++
}()
}
wg.Wait()
println(counter)
}
多次運作會得到不同的結果:
❯❯❯ go run local_lock.go
945
❯❯❯ go run local_lock.go
937
❯❯❯ go run local_lock.go
959
基于Redis的setnx
在分布式場景下,我們也需要這種“搶占”的邏輯,這時候怎麼辦呢?我們可以使用Redis提供的
setnx
指令:
package main
import (
"fmt"
"time"
"github.com/go-redis/redis"
"github.com/gofrs/uuid"
)
// 聲明一個全局的rdb變量
var rdb *redis.Client
// 初始化連接配接
func initClient() (err error) {
rdb = redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "zhy1996", // no password set
DB: 0, // use default DB
})
_, err = rdb.Ping().Result()
if err != nil {
return err
}
return nil
}
var unlock_lua = `
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
`
func Lock(key, value string, expiration time.Duration) (bool, error) {
is, err := rdb.SetNX(key, value, expiration).Result()
if err != nil {
return false, fmt.Errorf("redis setnx failed")
}
return is, nil
}
func UnLock(key, value string) (bool, error) {
res, err := rdb.Eval(unlock_lua, []string{key}, value).Result()
if err != nil {
return false, err
}
v, ok := res.(int64)
if !ok {
return false, fmt.Errorf("lua script return is not int")
}
if v == 0 {
return false, nil
}
return true, nil
}
func main() {
err := initClient()
if err != nil {
fmt.Println(err)
}
ul, _ := uuid.NewV4()
value := ul.String()
for i := 0; i < 10; i++ {
is, err := Lock("lock_1", value, time.Second)
if err != nil {
fmt.Println(err)
return
}
fmt.Println("是否拿到鎖:", is)
}
res, err := UnLock("lock_1", value)
if err != nil {
fmt.Println(err)
return
}
fmt.Println("解鎖:", res)
}
看看運作結果:
是否拿到鎖: true
是否拿到鎖: false
是否拿到鎖: false
是否拿到鎖: false
是否拿到鎖: false
是否拿到鎖: false
是否拿到鎖: false
是否拿到鎖: false
是否拿到鎖: false
是否拿到鎖: false
解鎖: true
通過代碼和執行結果可以看到,我們遠端調用
setnx
實際上和單機的trylock非常相似,如果擷取鎖失敗,那麼相關的任務邏輯就不應該繼續向前執行。
setnx
很适合在高并發場景下,用來争搶一些“唯一”的資源。比如交易撮合系統中賣家發起訂單,而多個買家會對其進行并發争搶。這種場景我們沒有辦法依賴具體的時間來判斷先後,因為不管是使用者裝置的時間,還是分布式場景下的各台機器的時間,都是沒有辦法在合并後保證正确的時序的。哪怕是我們同一個機房的叢集,不同的機器的系統時間可能也會有細微的差别。
是以,我們需要依賴于這些請求到達Redis節點的順序來做正确的搶鎖操作。如果使用者的網絡環境比較差,那也隻能自求多福了。
基于etcd
etcd是分布式系統中,功能上與ZooKeeper類似的元件,這兩年越來越火了。上面基于ZooKeeper我們實作了分布式阻塞鎖,基于etcd,也可以實作類似的功能:
package main
import (
"log"
"github.com/zieckey/etcdsync"
)
func main() {
m, err := etcdsync.New("/lock", 10, []string{"http://127.0.0.1:2379"})
if m == nil || err != nil {
log.Printf("etcdsync.New failed")
return
}
err = m.Lock()
if err != nil {
log.Printf("etcdsync.Lock failed")
return
}
log.Printf("etcdsync.Lock OK")
log.Printf("Get the lock. Do something here.")
err = m.Unlock()
if err != nil {
log.Printf("etcdsync.Unlock failed")
} else {
log.Printf("etcdsync.Unlock OK")
}
}
- 先檢查
路徑下是否有值,如果有值,說明鎖已經被别人搶了/lock
- 如果沒有值,那麼寫入自己的值。寫入成功傳回,說明加鎖成功。寫入時如果節點被其它節點寫入過了,那麼會導緻加鎖失敗,這時候到 3
- watch
下的事件,此時陷入阻塞/lock
- 當
路徑下發生事件時,目前程序被喚醒。檢查發生的事件是否是删除事件(說明鎖被持有者主動unlock),或者過期事件(說明鎖過期失效)。如果是的話,那麼回到 1,走搶鎖流程。/lock