天天看點

MillWheel: Google是如何在事件流處理上做到exactly one semantic的

作者:大資料與人工智能分享

Google在2013年發了一篇非常重要的paper,來教大家Google是如何在stream processing (事件流處理)方面做到exactly once semantic的,叫MillWheel。這個實作并不是最早做到exactly once的(可能trident會稍微早一點),但是這裡面通過low watermark和per key storage這兩個概念來做絕對是創新。原文在這裡:https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/41378.pdf

為了做到exactly once semantic,我們得先介紹幾個概念

  • Persistent storage:在儲存事件流處理完的結果之時,得有一個系統可以永久儲存這些處理完的結果,因為很多處理完的結果是需要被再次調用的
  • Low watermark:事件流是從世界各地發往Google的伺服器的,是以事件流在各個資料中心之間的傳輸是有延時的。 是以MillWheel需要你提供一個時間區間,所有的資料應該都在這個時間區間裡面到達你的事件流處理器
  • Duplicate prevention:每一行來到的重複的資料都會被删除

每一行的MillWheel資料可以被了解成三個值:key,value和timestamp。在這裡lower watermark是根據每個發送過來的時間戳計算的

MillWheel: Google是如何在事件流處理上做到exactly one semantic的

資料處理作為一個整體可以根據使用者提供的DAG(無回路圖)來決定這些事件流的内容被如何處理,這樣使用者可以疊加各種各樣的計算方式。比如我在一個不知名高逼格問答網站上面點選了一個廣告,這個點選可以根據廣告id,廣告主id,使用者id來做聚合,但是處理邏輯是分開的,但是都可以在MillWheel的架構下面執行。

MillWheel: Google是如何在事件流處理上做到exactly one semantic的

一個特别傻的用MillWheel做的某問答網站收錢系統

具體MillWheel架構給每一行資料提供的保證是:每一行處理的資料都會根據每一個key做一個checkpoint,而且每一行資料隻提供一次。我們接下來看一看具體MillWheel是怎麼做到這個保證的,以及我們可以如何利用這個保證。

既然我們要根據每一個key來做checkpointing,那麼每一行資料都得有一個可以把key從資料裡面讀取出來的邏輯。Google内部有很多deserialization protocol,比如protobuf,會被用來做key的讀取。這裡key要看具體你需要處理的業務邏輯是什麼,假設是一個廣告系統的話廣告主的ID或者點選廣告的使用者的ID都是一個合理的key。

MillWheel: Google是如何在事件流處理上做到exactly one semantic的

在提供一個key之後,MillWheel還會提供一個per key persistent storage,讓你來更好處理你的業務邏輯。比如我需要給廣告主提供的是每個廣告每五分鐘多少次點選,但是使用者我不想每一個使用者都存那麼多東西,那可能每一個使用者的點選我隻要存hyperloglog就可以了,隻要看他最近有沒有很多點選來判斷他是不是機器人,這個點選是否有效。

MillWheel: Google是如何在事件流處理上做到exactly one semantic的

當然,并不是每一個廣告的點選都會被送到系統裡面:在擷取key的時候我們也會擷取這一行資料的時間戳,這個時間戳會被用來計算low watermark。low watermark的定義是現在已經到達MillWheel但是還沒有被處理的資料的時間戳,但是這個時間戳不會超過一個使用者定義的上線、不能比當下的時間晚太多。是以隻要超過使用者定義的時間的時間範圍的資料,就是遲到的資料,遲到的資料的狀态是不會被存到記憶體裡面的。這個設計的厲害之處在于,如果資料處理一直很快,且所有的消息都沒有遲到,那麼low watermark會很接近現實的時間。如果資料出現遲到,再遲也不會超越使用者設定的上限。

因為這裡有一個low watermark的概念,那麼我們就得確定有一個服務可以計算low watermark。MillWheel的設計是每一個事件流處理器會回報自己的最老還沒處理的資料的時間戳,然後Injector會從每一個處理器收集最遲的時間戳。要收集最遲的時間戳因為每一個處理器的watermark應該都是一樣且應該是最保守的。

MillWheel: Google是如何在事件流處理上做到exactly one semantic的

每次處理每一行的資料的時候,隻要過了low watermark,MillWheel需要做下面這些事情:

  • 檢查這個資料是不是重複了
  • 處理使用者提供的邏輯
  • 所有的state存到資料庫裡
  • 告訴發送資料的伺服器已經處理完畢
  • 伺服器發送後面的資料發送給處理器

這裡發送端是會給每一行unique ID的,然後接收端根據這個unique ID去做dedup。有的時候為了優化速度,可以一次性從伺服器拿很多行資料一起處理。

這裡還有幾個比較複雜的狀況我們需要考慮。比如輸出資料也是需要checkpoint的,不然的話有可能在同一個時間區間輸出兩個截然不同的資料,因為之前聚合結束的state沒有被存下來。通過checkpoint輸出,整個資料處理直接變成了一個idempotent的服務。當然本身有些資料處理就是idempotent的,那麼這個時候可以省略dedup,或者先broadcast給下遊這個輸出再checkpoint。

這裡還有一個需要注意的地方是每一個key必須隻有一個writer,而且每個key的state在儲存的時候必須是atomic的,不然是沒有辦法保證每一個key的state是consistent的。

MillWheel: Google是如何在事件流處理上做到exactly one semantic的

MillWheel裡面大概所有的流程

論文後面主要讨論的是在deploy了之後效果如何以及一些edge case,我這裡摘幾個比較有意思的

MillWheel: Google是如何在事件流處理上做到exactly one semantic的

low watermark的計算是有延時的。整體來說再快的資料處理可能還是有兩秒左右的延遲

MillWheel: Google是如何在事件流處理上做到exactly one semantic的

延遲不會因為增加了機器的數量就減少,因為出現的慢的伺服器可能性更大

最後我再講兩句為什呢這個問題很重要。在一個網際網路廣告公司裡面,各種各樣的事件是要根據資料流來收錢的。Google要拿着廣告的點選去跟其他公司收錢,YouTube要拿着它的video view去跟其他公司收錢。在這些情況下,收對錢是一件非常重要的事情:你要是錢收少了,公司遭受損失;你要是錢收多了,你的資料會跟第三方做稽核的公司資料出現出入,會出現非常嚴重的商譽問題。是以這個資料流處理在這方面不能多不能少,最好每一行隻處理一次隻收一次錢。

這時候你可能想退一步,說為了處理收錢這個問題,我能不能直接每天或者每小時做一次dedup,然後再回複給使用者說你的廣告拿到了多少點選,我要收你多少錢。這裡還有一個問題是在很多情況下收錢這件事情是實時彙報給廣告商的,因為廣告商最好是是有能力随時看到自己的廣告到底效果如何,然後可以選擇增加或者減少預算。甚至在某些特定的情況下(比如Superbowl或者黑色星期五),廣告商其實是本着“我今天就是要花這麼多錢,哪個平台上面撒出去我是不管的”,那這個時候實時的reporting就變得尤其的重要。

繼續閱讀