天天看點

如何做到“恰好一次”地傳遞數十億條消息

在分布式領域中存在着三種類型的消息投遞語義,分别是:最多一次(at-most-once)、至少一次(at-least-once)和恰好一次(exactly-once)。本文作者介紹了一個利用Kafka和RocksDB來建構的“恰好一次”消息去重系統的實作原理。

對任何一個資料流水線的唯一要求就是不能丢失資料。資料通常可以被延遲或重新排序,但不能丢失。

為了滿足這一要求,大多數的分布式系統都能夠保證“至少一次”的投遞消息技術。實作“至少一次”的投遞技術通常就是:“重試、重試、再重試”。在你收到消費者的确認消息之前,你永遠不要認為消息已經投遞過去。

但“至少一次”的投遞并不是使用者想要的。使用者希望消息被投遞一次,并且僅有一次。

然而,實作“恰好一次”的投遞需要完美的設計。每種投遞失敗的情況都必須認真考慮,并設計到架構中去,是以它不能在事後“挂到”現有的實作上去。即使這樣,“隻有一次”的投遞消息幾乎是不可能的。

在過去的三個月裡,我們建構了一個全新的去重系統,以便在面對各種故障時能讓系統盡可能實作“恰好一次”的投遞。

新系統能夠跟蹤舊系統100倍的消息數量,并且可靠性也得到了提高,而付出的代價卻隻有一點點。下面我們就開始介紹這個新系統。

問題所在

Segment内部的大部分系統都是通過重試、消息重新投遞、鎖定和兩階段送出來優雅地處理故障。但是,有一個特例,那就是将資料直接發送到公共API的用戶端程式。

用戶端(特别是移動用戶端)經常會發生網絡問題,有時候發送了資料,卻沒有收到API的響應。

想象一下,某天你乘坐公共汽車,在iPhone上使用HotelTonight軟體預訂房間。該應用程式将資料上傳到了Segment的伺服器上,但汽車突然進入了隧道并失去了網絡連接配接。你發送的某些資料在伺服器上已經被處理,但用戶端卻無法收到伺服器的響應消息。

在這種情況下,即使伺服器在技術上已經收到了這些确切的消息,但用戶端也會進行重試并将相同的消息重新發送給Segment的API。

從我們伺服器的統計資料來看,在四個星期的視窗時間内,大約有0.6%的消息似乎是我們已經收到過的重複消息。

這個錯誤率聽起來可能并不是很高。但是,對于一個能創造數十億美元效益的電子商務應用程式來說,0.6%的出入可能意味着盈利和數百萬美元損失之間的差别。

對消息進行去重

現在,我們認識到問題的症結了,我們必須删除發送到API的重複消息。但是,該怎麼做呢?

最簡單的思路就是使用針對任何類型的去重系統的進階API。在Python中,我們可以将其表示為:

def dedupe(stream): 

  for message in stream: 

    if has_seen(message.id):  

      discard(message) 

    else: 

      publish_and_commit(message)  

對于資料流中的每個消息,首先要把他的id(假設是唯一的)作為主鍵,檢查是否曾經見過這個特定的消息。如果以前見過這個消息,則丢棄它。如果沒有,則是新的,我們應重新釋出這個消息并以原子的方式送出消息。

為了避免存儲所有的消息,我們會設定“去重視窗”這個參數,這個參數定義了在消息過期之前key存儲的時長。隻要消息落在視窗時間之外,我們就認為它已過期失效。我們要保證在視窗時間内某個給定ID的消息隻發送一次。

這個行為很容易描述,但有兩個方面需要特别注意:讀/寫性能和正确性。

我們希望系統能夠低延遲和低成本的對通過流水線的數十億個事件進行去重。更重要的是,我們要確定所有的事件都能夠被持久化,以便可以從崩潰中恢複出來,并且不會輸出重複的消息。

架構

為了實作這一點,我們建立了一個“兩階段”架構,它讀入Kafka的資料,并且在四個星期的時間視窗内對接收到的所有事件進行去重。

如何做到“恰好一次”地傳遞數十億條消息

去重系統的進階架構圖

Kafka的拓撲結構

要了解其工作原理,首先看一下Kafka的流拓撲結構。所有傳入消息的API調用都将作為單獨的消息進行分離,并讀入到Kafka輸入主題(input topic)中。

首先,每個傳入的消息都有一個由用戶端生成的具有唯一性的messageId标記。在大多數情況下,這是一個UUIDv4(我們考慮切換到ksuids)。 如果用戶端不提供messageId,我們會在API層自動配置設定一個。

我們不使用矢量時鐘或序列号,因為我們希望能降低用戶端的複雜性。使用UUID可以讓任何人輕松地将資料發送到我們的API上來,因為幾乎所有的主要語言都支援它。

  "messageId": "ajs-65707fcf61352427e8f1666f0e7f6090", 

  "anonymousId": "e7bd0e18-57e9-4ef4-928a-4ccc0b189d18", 

  "timestamp": "2017-06-26T14:38:23.264Z", 

  "type": "page" 

}  

為了能夠将消息持久化,并能夠重新發送,一個個的消息被儲存到Kafka中。消息以messageId進行分區,這樣就可以保證具有相同messageId的消息能夠始終由同一個消費者處理。

這對于資料處理來說是一件很重要的事情。我們可以通過路由到正确的分區來查找鍵值,而不是在整個中央資料庫的數百億條消息中查找,這種方法極大地縮小了查找範圍。

去重“worker”(worker:勞工。譯者注,這裡表示的是某個程序。為防止引起歧義,下文将直接使用worker)是一個Go程式,它的功能是從Kafka輸入分區中讀入資料,檢查消息是否有重複,如果是新的消息,則發送到Kafka輸出主題中。

根據我們的經驗,worker和Kafka拓撲結構都非常容易掌握。我們無需使用一組遇到故障時需要切換到副本的龐大的Memcached執行個體。相反,我們隻需使用零協同的嵌入式RocksDB資料庫,并以非常低的成本來獲得持久化存儲。

RocksDB的worker程序

每一個worker都會在本地EBS硬碟上存放了一個RocksDB資料庫。RocksDB是由Facebook開發的嵌入式鍵值存儲系統,它的性能非常高。

每當從輸入主題中過來的消息被消費時,消費者通過查詢RocksDB來确定我們之前是否見過該事件的messageId。

如果RocksDB中不存在該消息,我們就将其添加到RocksDB中,然後将消息釋出到Kafka輸出主題。

如果消息已存在于RocksDB,則worker不會将其釋出到輸出主題,而是更新輸入分區的偏移,确認已處理過該消息。

性能

為了讓我們的資料庫獲得高性能,我們必須對過來的每個事件滿足三種查詢模式:

檢測随機key的存在性,這可能不存在于我們的資料庫中,但會在key空間中的任何地方找到。

高速寫入新的key

老化那些超出了“去重視窗”的舊的key

實際上,我們必須不斷地檢索整個資料庫,追加新的key,老化舊的key。在理想情況下,這些發生在同一資料模型中。

如何做到“恰好一次”地傳遞數十億條消息
如何做到“恰好一次”地傳遞數十億條消息

我們的資料庫必須滿足三種獨立的查詢模式

一般來說,這些性能大部分取決于我們資料庫的性能,是以應該了解一下RocksDB的内部機制來提高它的性能。

RocksDB是一個日志結構合并樹(log-structured-merge-tree, 簡稱LSM)資料庫,這意味着它會不斷地将新的key附加到磁盤上的預寫日志(write-ahead-log)中,并把排序過的key存放在記憶體中作為memtable的一部分。

如何做到“恰好一次”地傳遞數十億條消息

key存放在記憶體中作為memtable的一部分

寫入key是一個非常快速的過程。新的消息以追加的方式直接儲存到磁盤上,并且資料條目在記憶體中進行排序,以提供快速的搜尋和批量寫入。

每當寫入到memtable的條目達到一定數量時,這些條目就會被作為SSTable(排序的字元串表)持久化到磁盤上。由于字元串已經在記憶體中排過序了,是以可以将它們直接寫入磁盤。

如何做到“恰好一次”地傳遞數十億條消息

目前的memtable零級寫入磁盤

以下是在我們的生産日志中寫入的示例:

[JOB 40] Syncing log #655020 

[default] [JOB 40] Flushing memtable with next log file: 655022 

[default] [JOB 40] Level-0 flush table #655023: started 

[default] [JOB 40] Level-0 flush table #655023: 15153564 bytes OK 

[JOB 40] Try to delete WAL files size 12238598, prev total WAL file size 24346413, number of live WAL files 3.  

每個SSTable是不可變的,一旦建立,永遠不會改變。這是什麼寫入新的鍵這麼快的原因。無需更新檔案,無需寫入擴充。相反,在帶外壓縮階段,同一級别的多個SSTable可以合并成一個新的檔案。

如何做到“恰好一次”地傳遞數十億條消息

當在同一級别的SSTables壓縮時,它們的key會合并在一起,然後将新的檔案更新到下一個更高的級别。

看一下我們生産的日志,可以看到這些壓縮作業的示例。在這種情況下,作業41正在壓縮4個0級檔案,并将它們合并為單個較大的1級檔案。

/data/dedupe.db$ head -1000 LOG | grep "JOB 41" 

[JOB 41] Compacting 4@0 + 4@1 files to L1, score 1.00 

[default] [JOB 41] Generated table #655024: 1550991 keys, 69310820 bytes 

[default] [JOB 41] Generated table #655025: 1556181 keys, 69315779 bytes 

[default] [JOB 41] Generated table #655026: 797409 keys, 35651472 bytes 

[default] [JOB 41] Generated table #655027: 1612608 keys, 69391908 bytes 

[default] [JOB 41] Generated table #655028: 462217 keys, 19957191 bytes 

[default] [JOB 41] Compacted 4@0 + 4@1 files to L1 => 263627170 bytes  

壓縮完成後,新合并的SSTables将成為最終的資料庫記錄集,舊的SSTables将被取消連結。

如果我們登入到生産執行個體,我們可以看到正在更新的預寫日志以及正在寫入、讀取和合并的單個SSTable。

如何做到“恰好一次”地傳遞數十億條消息

日志和最近占用I/O的SSTable

下面生産的SSTable統計資料中,可以看到一共有四個“級别”的檔案,并且一個級别比一個級别的檔案大。

** Compaction Stats [default] ** 

Level    Files   Size(MB} Score Read(GB}  Rn(GB} Rnp1(GB} Write(GB} Wnew(GB} Moved(GB} W-Amp Rd(MB/s} Wr(MB/s} Comp(sec} Comp(cnt} Avg(sec} KeyIn KeyDrop 

---------------------------------------------------------------------------------------------------------------------------------------------------------- 

  L0      1/0      14.46   0.2      0.0     0.0      0.0       0.1      0.1       0.0   0.0      0.0     15.6         7         8    0.925       0      0 

  L1      4/0     194.95   0.8      0.5     0.1      0.4       0.5      0.1       0.0   4.7     20.9     20.8        26         2   12.764     12M     40 

  L2     48/0    2551.71   1.0      1.4     0.1      1.3       1.4      0.1       0.0  10.7     19.4     19.4        73         2   36.524     34M     14 

  L3    351/0   21735.77   0.8      2.0     0.1      1.9       1.9     -0.0       0.0  14.3     18.1     16.9       112         2   56.138     52M  3378K 

 Sum    404/0   24496.89   0.0      3.9     0.4      3.5       3.9      0.3       0.0  34.2     18.2     18.1       218        14   15.589     98M  3378K 

 Int      0/0       0.00   0.0      3.9     0.4      3.5       3.9      0.3       0.0  34.2     18.2     18.1       218        14   15.589     98M  3378K  

RocksDB儲存了索引和存儲在SSTable的特定SSTables的布隆過濾器,并将這些加載到記憶體中。通過查詢這些過濾器和索引可以找到特定的key,然後将完整的SSTable作為LRU基礎的一部分加載到記憶體中。

在絕大多數情況下,我們就可以看到新的消息了,這使得我們的去重系統成為教科書中的布隆過濾器案例。

布隆過濾器會告訴我們某個鍵“可能在集合中”,或者“絕對在集合中”。要做到這一點,布隆過濾器儲存了已經見過的任何元素的多種哈希函數的設定位。如果設定了散列函數的所有位,則過濾器将傳回消息“可能在集合中”。

如何做到“恰好一次”地傳遞數十億條消息

我們的集合包含{x,y,z},在布隆過濾器中查詢w,則布隆過濾器會傳回“不在集合中”,因為其中有一位沒有設定。

如果傳回“可能在集合中”,則RocksDB可以從SSTables中查詢到原始資料,以确定該項是否在該集合中實際存在。但在大多數情況下,我們不需查詢任何SSTables,因為過濾器将傳回“絕對不在集合”的響應。

在我們查詢RocksDB時,我們會為所有要查詢的相關的messageId發出一個MultiGet。基于性能考慮,我們會批量地釋出出去,以避免太多的并發鎖定操作。它還允許我們批量處理來自Kafka的資料,這是為了實作順序寫入,而不是随機寫入。

以上回答了為什麼讀/寫工作負載性能這麼好的問題,但仍然存在如何老化資料這個問題。

删除:按大小來限制,而不是按時間來限制

在我們的去重過程中,我們必須要确定是否要将我們的系統限制在嚴格的“去重視窗”内,或者是通過磁盤上的總資料庫大小來限制。

為了避免系統突然崩潰導緻去重系統接收到所有用戶端的消息,我們決定按照大小來限制接收到消息數量,而不是按照設定的時間視窗來限制。這允許我們為每個RocksDB執行個體設定最大的大小,以能夠處理突然的負載增加。但是其副作用是可能會将去重視窗降低到24小時以下。

我們會定期在RocksDB中老化舊的key,使其不會增長到無限大小。為此,我們根據序列号保留key的第二個索引,以便我們可以先删除最早接收到的key。

我們使用每個插入的key的序列号來删除對象,而不是使用RocksDB TTL(這需要在打開資料庫的時候設定一個固定的TTL值)來删除。

因為序列号是第二索引,是以我們可以快速地查詢,并将其标記為已删除。下面是根據序列号進行删除的示例代碼:

func (d *DB) delete(n int) error { 

        // open a connection to RocksDB 

        ro := rocksdb.NewDefaultReadOptions() 

        defer ro.Destroy() 

        // find our offset to seek through for writing deletes 

        hint, err := d.GetBytes(ro, []byte("seek_hint")) 

        if err != nil { 

                return err 

        } 

        it := d.NewIteratorCF(ro, d.seq) 

        defer it.Close() 

        // seek to the first key, this is a small 

        // optimization to ensure we don't use `.SeekToFirst()` 

        // since it has to skip through a lot of tombstones. 

        if len(hint) > 0 { 

                it.Seek(hint) 

        } else { 

                it.SeekToFirst() 

        seqs := make([][]byte, 0, n) 

        keys := make([][]byte, 0, n) 

        // look through our sequence numbers, counting up 

        // append any data keys that we find to our set to be 

        // deleted 

        for it.Valid() && len(seqs) < n { 

                k, v := it.Key(), it.Value() 

                key := make([]byte, len(k.Data())) 

                val := make([]byte, len(v.Data())) 

                copy(key, k.Data()) 

                copy(val, v.Data()) 

                seqs = append(seqs, key) 

                keys = append(keys, val) 

                it.Next() 

                k.Free() 

                v.Free() 

        wb := rocksdb.NewWriteBatch() 

        wo := rocksdb.NewDefaultWriteOptions() 

        defer wb.Destroy() 

        defer wo.Destroy() 

        // preserve next sequence to be deleted. 

        // this is an optimization so we can use `.Seek()` 

        // instead of letting `.SeekToFirst()` skip through lots of tombstones. 

        if len(seqs) > 0 { 

                hint, err := strconv.ParseUint(string(seqs[len(seqs)-1]), 10, 64) 

                if err != nil { 

                        return err 

                } 

                buf := []byte(strconv.FormatUint(hint+1, 10)) 

                wb.Put([]byte("seek_hint"), buf) 

        // we not only purge the keys, but the sequence numbers as well 

        for i := range seqs { 

                wb.DeleteCF(d.seq, seqs[i]) 

                wb.Delete(keys[i]) 

        // finally, we persist the deletions to our database 

        err = d.Write(wo, wb) 

        return it.Err() 

為了保證寫入速度,RocksDB不會立即傳回并删除一個鍵(記住,這些SSTable是不可變的!)。相反,RocksDB将添加一個“墓碑”,等到壓縮時再進行删除。是以,我們可以通過順序寫入來快速地老化,避免因為删除舊項而破壞記憶體資料。

確定正确性

我們已經讨論了如何確定數十億條消息投遞的速度、規模和低成本的搜尋。最後一個部分将講述各種故障情況下我們如何確定資料的正确性。

EBS快照和附件

為了確定RocksDB執行個體不會因為錯誤的代碼推送或潛在的EBS停機而損壞,我們會定期儲存每個硬碟驅動器的快照。雖然EBS已經在底層進行了複制,但是這一步可以防止資料庫受到某些底層機制的破壞。

如果我們想要啟用一個新執行個體,則可以先暫停消費者,将相關聯的EBS驅動器分開,然後重新附加到新的執行個體上去。隻要我們保證分區ID相同,重新配置設定磁盤是一個輕松的過程,而且也能保證資料的正确性。

如果worker發生崩潰,我們依靠RocksDB内置的預寫日志來確定不會丢失消息。消息不會從輸入主題送出,除非RocksDB已經将消息持久化在日志中。

讀取輸出主題

你可能會注意到,本文直到這裡都沒有提到“原子”步驟,以使我們能夠確定隻投遞一次消息。我們的worker有可能在任何時候崩潰,不如:寫入RocksDB時、釋出到輸出主題時,或确認輸入消息時。

我們需要一個原子的“送出”點,并覆寫所有這些獨立系統的事務。對于輸入的資料,需要某個“事實來源”:輸出主題。

如果去重worker因為某些原因發生崩潰,或者遇到Kafka的某個錯誤,則系統在重新啟動時,會首先查閱這個“事實來源”,輸出主題,來判斷事件是否已經釋出出去。

如果在輸出主題中找到消息,而不是RocksDB(反之亦然),則去重worker将進行必要的修複工作以保持資料庫和RocksDB之間的同步。實際上,我們使用輸出主題作為我們的預寫入日志和最終的事實來源,讓RocksDB進行檢查和校驗。

在生産環境中

我們的去重系統已經在生産運作了3個月,對其運作的結果我們感到非常滿意。我們有以下這些資料:

在RocksDB中,有1.5TB的key存儲在磁盤上

在老化舊的key之前,有一個四個星期的去重視窗

RocksDB執行個體中存儲了大約600億個key

通過去重系統的消息達到2000億條

該系統快速、高效、容錯性強,也非常容易了解。

特别是我們的v2版本系統相比舊的去重系統有很多優點。

以前我們将所有的key存儲在Memcached中,并使用Memcached的原子CAS(check-and-set)操作來設定key。 Memcached起到了送出點和“原子”地釋出key的作用。

雖然這個功能很好,但它需要有大量的記憶體來支撐所有的key。此外,我們必須能夠接受偶爾的Memcached故障,或者将用于高速記憶體故障切換的支出加倍。

Kafka/RocksDB的組合相比舊系統有如下幾個優勢:

資料存儲在磁盤上:在記憶體中儲存所有的key或完整的索引,其代價是非常昂貴的。通過将更多的資料轉移到磁盤,并利用多種不同級别的檔案和索引,能夠大幅削減成本。對于故障切換,我們能夠使用冷備(EBS),而不用運作其他的熱備執行個體。

分區:為了縮小key的搜尋範圍,避免在記憶體中加載太多的索引,我們需要保證某個消息能夠路由到正确的worker。在Kafka中對上遊進行分區可以對這些消息進行路由,進而更有效地緩存和查詢。

顯式地進行老化處理:在使用Memcached的時候,我們在每個key上設定一個TTL來标記是否逾時,然後依靠Memcached程序來對逾時的key進行處理。這使得我們在面對大量資料時,可能會耗盡記憶體,并且在丢棄大量逾時消息時,Memcached的CPU使用率會飙升。而通過讓用戶端來處理key的删除,使得我們可以通過縮短去重視窗來優雅地處理。

将Kafka作為事實來源:為了真正地避免對多個送出點進行消息去重,我們必須使用所有下遊消費者都常見的事實來源。使用Kafka作為“事實來源”是最合适的。在大多數失敗的情況下(除了Kafka失敗之外),消息要麼會被寫入Kafka,要麼不會。使用Kafka可以確定按順序投遞消息,并在多台計算機之間進行磁盤複制,而不需要在記憶體中保留大量的資料。

批量讀寫:通過Kafka和RocksDB的批量I/O調用,我們可以通過利用順序讀寫來獲得更好的性能。與之前在Memcached中使用的随機通路不同,我們能夠依靠磁盤的性能來達到更高的吞吐量,并隻在記憶體中保留索引。

總的來說,我們對自己建構的去重系統非常滿意。使用Kafka和RocksDB作為流媒體應用的原語開始變得越來越普遍。我們很高興能繼續在這些原語之上建構新的分布式應用程式。

本文作者:佚名

來源:51CTO