- redis
2
10 月初,Redis 搞了個大新聞。别緊張,是個好消息:Redis 引入了名為 stream 的新資料類型和對應的指令,大概會在年底正式釋出到 4.x 版本中。像引入新資料類型這樣的變化在 Redis 的發展曆史上非常罕見,是以稱之為大新聞一點也不為過。至少很多介紹 Redis 的資料要跟着修訂了。
背景
按作者的介紹,stream 類型的想法深受 Kafka 的 stream 概念的影響,是以順理成章沿用了這個名字。當然這并不意味 Redis 将提供 Kafka stream 特性的替代品,它倆依舊是兩種泾渭分明的東西。Redis 的 stream 特性旨在填補 PubSub 和 Blocked list 機制間的空缺,解決這兩者不能解決的問題。
Redis 的 PubSub 可以用來實作簡單的訂閱機制。一個或多個 client 向 Redis 訂閱特定的頻道,當某個 client 向這個頻道釋出消息時,Redis 會把消息發送給訂閱該頻道的 client。需要注意的是,Redis 隻負責轉發消息,并不保證訂閱的 client 是否真正收到了消息,比如 client 可能正好挂掉了或者中間出了點網絡問題。在某些情況下,這種簡單的訂閱機制就夠用了;但在某些情況下,我們需要確定消息已經釋出出去,PubSub 就不能滿足要求。
一個替代的方案是采用 BLPOP 等指令,也即前文提到的 Blocked list。client 調用 BLPOP(或其他類似的指令),阻塞在特定的頻道上。如果有 client 釋出消息(在這裡,就是 rpush 新的值),被阻塞的 client 就會結束阻塞,得到新 rpush 進來的值。如果 Redis 沒法把新消息發送給 client,那麼這個消息會留在頻道裡。當 client 下次重新調用 BLPOP 時,就能拿回這個消息。這個方案聽起來不錯,至少它解決了確定消息釋出的問題。但你可能也想到了,能收到特定頻道的消息的隻有一個 client,因為隻要某個 client 接收了消息,消息就不再存在于頻道當中了。而 PubSub 是支援一對多發送消息的。另一個問題是,每個 client 隻能去擷取最新的消息,對于複雜的操作,BLPOP 等指令便無能為力了。
stream 就是為了解決以上問題才提出來的。
用法
遵循其他資料類型的慣例,操作 stream 類型的鍵的指令都以
X
開頭。
(由于 stream 特性尚未正式釋出,且部分特性還處于 TODO 狀态,下面内容肯定會有所變更。如果有改動,我會修訂這部分的内容)
添加操作
XADD key [MAXLEN [~] <count>] <ID or *> [field value] [field value] ...
stream 跟 hash 一樣,有 subkey 的概念。上面指令裡的 ID 就是指 subkey。一般情況下,你不需要指定 ID,僅需提供
*
來讓 Redis 生成一個 ID。Redis 生成的 ID 格式如下:
$ms-$seq
。其中
$ms
指目前的 13 位毫秒時間戳,
$seq
指給定 key 在目前毫秒時間戳下的序列号(從 0 開始),中間以
-
隔開。早前用的分隔符是
.
,後來考慮到
xx.yy
這種形式太容易錯當作浮點數了,是以改用
-
。如果 Redis 生成 ID 的時候,目前毫秒時間戳跟上一個 ID 的時間戳一樣,它會把序列号加一。假使伺服器發生時間回撥的情況,Redis 會沿用上一個 ID 的時間戳,隻是把序列号加一。實際上這種生成 ID 的機制并非為了記錄建立的時間,僅僅用于生成遞增的 ID。你也可以在調用時指定自己生成的 ID。
[field value] ...
這部分指定的是 stream key 對應 ID 的值。每個 ID 帶的 field 可以不同。
取長度操作
XLEN key
傳回長度,就是這樣。
讀取操作
XRANGE key start end [COUNT <n>]
XRANGE 傳回某個 stream 給定範圍内的 ID 所對應的值。你可以通過 COUNT 指定傳回的值的最大數目。
舉個例子,像這樣建立兩個 ID:
127.0.0.1:6379> xadd test * apple 1
1507383725597-0
127.0.0.1:6379> xadd test * binana 2
1507383735965-0
如下的 XRANGE 操作能夠傳回這兩個 ID 的值。
127.0.0.1:6379> xrange test 1507383725597-0 1507383735965-0
1) 1) 1507383725597-0
2) 1) "apple"
2) "1"
2) 1) 1507383735965-0
2) 1) "binana"
2) "2"
大多數情況下,你用到的是
-
和
+
這兩個特殊 ID 值,像這樣:
xrange test 1507383725597-0 +
。前者表示 ID 範圍的起始位置,後者表示 ID 範圍的末尾位置。
XREAD [BLOCK <milliseconds>] [COUNT <count>] [GROUP <groupname> <ttl>]
[RETRY <milliseconds> <ttl>] STREAMS key_1 key_2 ... key_N
ID_1 ID_2 ... ID_N
如果想同時讀取多個 stream 的值,需要用到 XREAD。XREAD 能夠傳回給定多個 stream 的某個 起始ID 之後的資料。我加粗了之後兩個字,因為跟 XRANGE 不同,XREAD 不傳回 起始ID 的值。你可以通過 COUNT 指定各個 stream 傳回的值的最大數目。
XREAD 的阻塞是可選的,你可以通過 BLOCK 參數去指定允許阻塞的時間。如果不指定,表示不阻塞,立刻傳回 nil。注意這一點跟 BLPOP 不同,BLPOP 一類的指令,預設是永久阻塞的。
XREAD 主要的參數是 STREAMS 後面的 key 和 起始ID 清單。key 和 起始ID 需要是一一對應的,有多少個 key 就要指定多少個 起始ID。跟 XRANGE 一樣,起始ID 也可以是
-
+
這樣的特殊值。注意由于
-
表示 ID 範圍的起始位置,而不是第一個 ID,是以用
-
可以擷取第一個 ID 的值。除此之外,起始ID 還可以是
$
,表示擷取指令執行之後的新增 ID 的值。顯然,
$
隻有跟 BLOCK 一起用才有意義。
RETRY/GROUP:尚未實作 TODO。
删除操作
stream 不支援“改”操作,是以“增删查改”還剩個“删”沒講。stream 沒有專門的删指令。還記得介紹 XADD 時展示的 MAXLEN 參數嗎?在 XADD 指令添加了新的 ID 之後,如果指令指定的 MAXLEN 超過了目前 stream 包含的 ID 的個數,Redis 會删除多出來的部分。
重新貼下 MAXLEN 的格式:
XADD MAXLEN [~] <count> ...
。count 決定了 MAXLEN 的值。如果 MAXLEN 和 count 之間沒有插入
~
,表示精确地保留
count
個 ID;如果插入了
~
,表示保留大約
count
個 ID。我會在“實作”這一節解釋所謂的“精确”和“大約”的差別。
用途
stream 很大程度上類似于 Blocked list,但是它的操作更加自由,不再受限于隻能讀取最新的值,也不再拘束于隻能讓單個 client 讀取值。跟 PubSub 相比,stream 允許 client 重新擷取釋出過的值,提供了更強的保障。
實作
Redis 把每個 stream 實作成以 ID 的值為 key 的字首樹,外加 length(目前的 ID 數)等中繼資料。考慮到預設生成的 ID 是毫秒時間戳+序列号,采用字首樹的形式可以節省下大量的空間。畢竟差幾千毫秒的兩個 ID,也會有前九位是完全相同的。另外字首樹還允許随機通路某個起始ID。
不過并非每個 ID 都是獨占一個節點。每當插入一個新的 ID 時,Redis 會先通路字首樹的最大的節點(畢竟 ID 是遞增的),如果這個節點不大于
STREAM_BYTES_PER_LISTPACK
(2048位元組),新的 ID 會被插入到這個節點裡面;否則才會建立新的節點。在查找一個 ID 時,Redis 會查找最後一個比該 ID 小的節點,然後從該節點往後周遊,直到找到該 ID 為止。在我看來,一個節點裡包含多個 ID 的設計,有利于 ID 周遊的操作。這種設計避免了在周遊時頻繁通路新的節點,更好地利用了 CPU 的本地緩存。
每個節點具有這樣的結構:
+--------------+---------+---------+--/--+---------+
| master_entry | entry_1 | entry_2 | ... | entry_N |
+--------------+---------+---------+--/--+---------+
其中
master_entry:
+-------+---------+------------+---------+--/--+---------+---------+
| count | deleted | num-fields | field_1 | field_2 | ... | field_N |
+-------+---------+------------+---------+--/--+---------+---------+
entry_x(SAMEFIELDS):
+-----+--------+-------+-/-+-------+
|flags|entry-id|value-1|...|
+-----+--------+-------+-/-+-------+
或者
+-----+--------+----------+-------+-------+-/-+---+
|flags|entry-id|num-fields|field_1|value_1|...|
+-----+--------+----------+-------+-------+-/-+---+
當節點被建立時,會以第一個插入的 ID 初始化
master_entry
的值。顯然,count 的初始值是 1,deleted 的初始值是 0,num-fields 等于該 ID 對應的 field 數目,後面的多個 field 則是該 ID 對應的 field 數。在插入
master_entry
之後,還會新增一個 entry 來記錄額外的 field 和每個 field 對應的 value。這個新增的 entry 的 entry-id 取 ID 跟字首樹節點的 key 的差。第一個 ID 的 entry-id 為 0,因為目前節點的 key 就是這個 ID,兩者不存在差異。之後每插入一個新的 ID,都會更新
master_entry
的 count 數,并插入對應的 entry。當然插入新 ID 的同時也不忘更新 length 等中繼資料。
前面提到,每個 ID 帶的 field 可以不同。但是在實際的使用中,每個 ID 帶的 field 基本是相同的。是以 Redis 做了個優化:如果新增的 ID 的 field 跟
master_entry
完全一樣,entry 裡面會設定一個名為 SAMEFIELDS 的 flags,并僅記錄 value 的值。除非新增 ID 的 field 跟
master_entry
有些不同,entry 裡面才會記錄新增 ID 的所有 field 和對應的 value。
最後說一下删除操作。由于
- stream 的删除操作,隻支援保留特定數目的 ID 數
- stream 會記錄全部的 ID 數(length)
- stream 的資料結構大體上是一個字首樹,字首樹的每個節點包含 count 個 ID
是以删除操作,就是
- 從前往後周遊,減去每個字首樹節點的 count,直到 length 等于 XADD 指定的 MAXLEN,或者減去下一個節點後剩下的 length 會小于 MAXLEN
- 如果減去某個節點後,剩下的 length 小于 MAXLEN,Redis 會周遊該節點,設定若幹個 entry 的 flags 為 DELETED 直到 length 等于 MAXLEN,更新 count 和 deleted 兩個域。
如果 XADD 指令指定的 MAXLEN 包含
~
,則表示大約保留 MAXLEN 個 ID。在這種情況下,Redis 隻會完成上面的第一步。換句話說,選擇“大約”能省下對某個節點進行周遊的時間。