天天看點

為什麼說流處理即未來?

​作者|Stephan Ewen

整理|秦江傑

本文整理自 Flink 創始公司 Ververica  聯合創始人兼 CTO - Stephan Ewen 在 Flink Forward China 2018 上的演講《Stream Processing takes on Everything》。

這個演講主題看似比較激進:流處了解決所有問題。很多人對于 Flink 可能還停留在最初的認知,覺得 Flink 是一個流處理引擎,實際上 Flink 可以做很多其他的工作,比如批處理、應用程式。在這個演講中,Stephan 首先會簡單說明他對 Flink 功能的觀點,然後深入介紹一個特定領域的應用和事件處理場景。這個場景乍看起來不是一個流處理的使用場景,但是在 Stephan 看來,它實際上就是一個很有趣的流處理使用場景。

Flink社群專刊下載下傳位址

為什麼說流處理即未來?

上圖對為什麼流處理可以處理一切作出诠釋,将資料看做流是一個自然而又十分強大的想法。大部分資料的産生過程都是随時間生成的流,比如一個 Petabyte 的資料不會憑空産生。這些資料通常都是一些事件的積累,比如支付、将商品放入購物車,網頁浏覽,傳感器采樣輸出。

基于資料是流的想法,我們對資料處理可以有相應的了解。比如将過去的曆史資料看做是一個截止到某一時刻的有限的流,或是将一個實時處理應用看成是從某一個時刻開始處理未來到達的資料。可能在未來某個時刻它會停止,那麼它就變成了處理從開始時刻到停止時刻的有限資料的批處理。當然,它也有可能一直運作下去,不斷處理新到達的資料。這個對資料的重要了解方式非常強大,基于這一了解,Flink 可以支援整個資料處理範疇内的所有場景。

為什麼說流處理即未來?

最廣為人知的 Flink 使用場景是流分析、連續處理(或者說漸進式處理),這些場景中 Flink 實時或者近實時的處理資料,或者采集之前提到的曆史資料并且連續的對這些事件進行計算。曉偉在之前的演講中提到一個非常好的例子來說明怎麼樣通過對 Flink 進行一些優化,進而可以針對有限資料集做一些特别的處理,這使得 Flink 能夠很好的支援批處理的場景,從性能上來說能夠與最先進的批處理引擎相媲美。而在這根軸的另一頭,是我今天的演講将要說明的場景 – 事件驅動的應用。這類應用普遍存在于任何服務或者微服務的架構中。這類應用接收各類事件(可能是 RPC 調用、HTTP 請求),并且對這些事件作出一些響應,比如把商品放進購物車,或者加入社交網絡中的某個群組。

為什麼說流處理即未來?

在我進一步展開今天的演講之前,我想先對社群在 Flink 的傳統領域(實時分析、連續處理)近期所做的工作做一個介紹。Flink 1.7 在 2018 年 11 月 30 日已經釋出。在 Flink 1.7 中為典型的流處理場景加入了一些非常有趣的功能。比如我個人非常感興趣的在流式 SQL 中帶時間版本的 Join。一個基本想法是有兩個不同的流,其中一個流被定義為随時間變化的參照表,另一個是與參照表進行 Join 的事件流。比如事件流是一個訂單流,參照表是不斷被更新的匯率,而每個訂單需要使用最新的匯率來進行換算,并将換算的結果輸出到結果表。這個例子在标準的 SQL 當中實際上并不容易表達,但在我們對 Streaming SQL 做了一點小的擴充以後,這個邏輯表達變得非常簡單,我們發現這樣的表達有非常多的應用場景。

另一個在流處理領域十分強大的新功能是将複雜事件處理(CEP)和 SQL 相結合。CEP 應用觀察事件模式。比如某個 CEP 應用觀察股市,當有兩個上漲後緊跟一個下跌時,這個應用可能做些交易。再比如一個觀察溫度計的應用,當它發現有溫度計在兩個超過 90 攝氏度的讀數之後的兩分鐘裡沒有任何操作,可能會進行一些操作。與 SQL 的結合使這類邏輯的表達也變得非常簡單。

第三個 Flink 1.7 中做了很多工作的功能是 Schema 更新。這個功能和基于流的應用緊密相關。就像你可以對資料庫進行資料 Schema 更新一樣,你可以修改 Flink 表中列的類型或者重新寫一個列。

另外我想簡單介紹的是流處理技術不僅僅是簡單對資料進行計算,這還包括了很多與外部系統進行事務互動。流處理引擎需要在采用不同協定的系統之間以事務的方式移動資料,并保證計算過程和資料的一緻性。這一部分功能也是在 Flink 1.7 中得到了增強。

為什麼說流處理即未來?

以上我對 Flink 1.7 的新功能向大家做了簡單總結。下面讓我們來看看今天我演講的主要部分,也就是利用 Flink 來搭建應用和服務。我将說明為什麼流處理是一個搭建應用和服務或者微服務的有趣技術。

為什麼說流處理即未來?

我将從左邊這個高度簡化的圖說起,我們一會兒将聊一些其中的細節。首先我們來看一個了解應用簡單的視角。如左圖所示,一個應用可以是一個 Container,一個 Spring 應用,或者 Java 應用、Ruby 應用,等等。這個應用從諸如 RPC,HTTP 等管道接收請求,然後依據請求進行資料庫變更。這個應用也可能調用另一個微服務并進行下一步的處理。我們可以非常自然的想到進入到應用的這些請求可以看做是個事件組成的序列,是以我們可以把它們看做是事件流。可能這些事件被緩存在消息隊列中,而應用會從消息隊列中消費這些事件進行處理,當應用需要響應一個請求時,它将結果輸出到另一個消息隊列,而請求發送方可以從這個消息隊列中消費得到所發送請求的響應。在這張圖中我們已經可以看到一些有趣的不同。

第一個不同是在這張圖中應用和資料庫不再是分開的兩個實體,而是被一個有狀态的流處理應用所代替。是以在流處理應用的架構中,不再有應用和資料庫的連接配接了,它們被放到了一起。這個做法有利有弊,但其中有些好處是非常重要的。首先是性能上的好處是明顯的,因為應用不再需要和資料庫進行互動,處理可以基于記憶體中的變量進行。其次這種做法有很好并且很簡單的一緻性。

為什麼說流處理即未來?

這張圖被簡化了很多,實際上我們通常會有很多個應用,而不是一個被隔離的應用,很多情況下你的應用會更符合這張圖。系統中有個接收請求的接口,然後請求被發送到第一個應用,可能會再被發到另一個應用,然後得到相應。在圖中有些應用會消費中間結果的流。這張圖已經展示了為什麼流處理是更适合比較複雜的微服務場景的技術。因為很多時候系統中不會有一個直接接收使用者請求并直接響應的服務,通常來說一個微服務需要跟其他微服務通信。這正如在流處理的架構中不同應用在建立輸出流,同時基于衍生出的流再建立并輸出新的流。

為什麼說流處理即未來?

到目前為止,我們看到的内容多少還比較直覺。而對基于流處理技術的微服務架構而言,人們最常問的一個問題是如何保證事務性?如果系統中使用的是資料庫,通常來說都會有非常成熟複雜的資料校驗和事務模型。這也是資料庫在過去許多年中十分成功的原因。開始一個事務,對資料做一些操作,送出或者撤銷一個事務。這個機制使得資料完整性得到了保證(一緻性,持久性等等)。

那麼在流進行中我們怎麼做到同樣的事情呢?作為一個優秀的流處理引擎,Flink 支援了恰好一次語義,保證了每個事件隻會被處理一遍。但是這依然對某些操作有限制,這也成為了使用流處理應用的一個障礙。我們通過一個非常簡單流處理應用例子來看我們可以做一些什麼擴充來解決這個問題。我們會看到,解決辦法其實出奇的簡單。

為什麼說流處理即未來?

讓我們以這個教科書式的事務為例子來看一下事務性應用的過程。這個系統維護了賬戶和其中存款餘額的資訊。這樣的資訊可能是銀行或者線上支付系統的場景中用到的。假設我們想要處理類似下面的事務:如果賬戶 A 中的餘額大于 100,那麼從賬戶 A 中轉賬 50 元到賬戶 B。這是個非常簡單的兩個賬戶之間進行轉賬的例子。

資料庫對于這樣的事務已經有了一個核心的範式,也就是原子性,一緻性,隔離性和持久性(ACID)。這是能夠讓使用者放心使用事務的幾個基本保證。有了他們,使用者不用擔心錢在轉賬過程中會丢失或者其他問題。讓我們用這個例子來放到流處理應用中,來讓流處理應用也能提供和資料相同的 ACID 支援:

原子性要求一個轉賬要不就完全完成,也就是說轉賬金額從一個賬戶減少,并增加到另一個賬戶,要不就兩個賬戶的餘額都沒有變化。而不會隻有一個賬戶餘額改變。否則的話錢就會憑空減少或者憑空增加。

一緻性和隔離性是說如果有很多使用者同時想要進行轉賬,那麼這些轉賬行為之間應該互不幹擾,每個轉賬行為應該被獨立的完成,并且完成後每個賬戶的餘額應該是正确的。也就是說如果兩個使用者同時操作同一個賬戶,系統不應該出錯。

持久性指的是如果一個操作已經完成,那麼這個操作的結果會被妥善的儲存而不會丢失。

我們假設持久性已經被滿足。一個流處理器有狀态,這個狀态會被 checkpoint,是以流處理器的狀态是可恢複的。也就是說隻要我們完成了一個修改,并且這個修改被 checkpoint 了,那麼這個修改就是持久化的。

為什麼說流處理即未來?

讓我們來看看另外三個例子。設想一下,如果我們用流處理應用來實作這樣一個轉賬系統會發生什麼。我們先把問題簡化一些,假設轉賬不需要有條件,僅僅是将 50 元從賬戶 A 轉到賬戶,也就是說賬戶 A 的餘額減少 50 元而賬戶 B 的餘額增加 50 元。我們的系統是一個分布式的并行系統,而不是一個單機系統。簡單起見我們假設系統中隻有兩台機器,這兩台機器可以是不同的實體機或者是在 YARN 或者 Kubernetes 上不同的容器。總之它們是兩個不同的流處理器執行個體,資料分布在這兩個流處理器上。我們假設賬戶 A 的資料由其中一台機器維護,而賬戶 B 的資料有另一台機器維護。

現在我們要做個轉賬,将 50 元從賬戶 A 轉移到賬戶 B,我們把這個請求放進隊列中,然後這個轉賬請求被分解為對賬戶 A 和 B 分别進行操作,并且根據鍵将這兩個操作路由到維護賬戶 A 和維護賬戶 B 的這兩台機器上,這兩台機器分别根據要求對賬戶 A 和賬戶 B 的餘額進行改動。這并不是事務操作,而隻是兩個獨立無意義的改動。一旦我們将轉賬的請求改的稍微複雜一些就會發現問題。

為什麼說流處理即未來?

下面我們假設轉賬是有條件的,我們隻想在賬戶 A 的餘額足夠的情況下才進行轉賬,這樣就已經有些不太對了。如果我們還是像之前那樣操作,将這個轉賬請求分别發送給維護賬戶 A 和 B 的兩台機器,如果 A 沒有足夠的餘額,那麼 A 的餘額不會發生變化,而 B 的餘額可能已經被改動了。我們就違反了一緻性的要求。

我們看到我們需要首先以某種方式統一做出是否需要更改餘額的決定,如果這個統一的決定中餘額需要被修改,我們再進行修改餘額的操作。是以我們先給維護 A 的餘額的機器發送一個請求,讓它檢視 A 的餘額。我們也可以對 B 做同樣的事情,但是這個例子裡面我們不關心 B 的餘額。然後我們把所有這樣的條件檢查的請求彙總起來去檢驗條件是否滿足。因為 Flink 這樣的流處理器支援疊代,如果滿足轉賬條件,我們可以把這個餘額改動的操作放進疊代的回報流當中來告訴對應的節點來進行餘額修改。反之如果條件不滿足,那麼餘額改動的操作将不會被放進回報流。這個例子裡面,通過這種方式我們可以正确的進行轉賬操作。從某種角度上來說我們實作了原子性,基于一個條件我們可以進行全部的餘額修改,或者不進行任何餘額修改。這部分依然還是比較直覺的,更大的困難是在于如何做到并發請求的隔離性。

為什麼說流處理即未來?

假設我們的系統沒有變,但是系統中有多個并發的請求。我們在之前的演講中已經知道,這樣的并發可能達到每秒鐘幾十億條。如圖,我們的系統可能從兩個流中同時接受請求。如果這兩個請求同時到達,我們像之前那樣将每個請求拆分成多個請求,首先檢查餘額條件,然後進行餘額操作。然而我們發現這會帶來問題。管理賬戶 A 的機器會首先檢查 A 的餘額是否大于 50,然後又會檢查 A 的餘額是否大于 100,因為兩個條件都滿足,是以兩筆轉賬操作都會進行,但實際上賬戶 A 上的餘額可能無法同時完成兩筆轉賬,而隻能完成 50 元或者 100 元的轉賬中的一筆。這裡我們需要進一步思考怎麼樣來處理并發的請求,我們不能隻是簡單地并發處理請求,這會違反事務的保證。從某種角度來說,這是整個資料庫事務的核心。資料庫的專家們花了一些時間提供了不同解決方案,有的方案比較簡單,有的則很複雜。但所有的方案都不是那麼容易,尤其是在分布式系統當中。

在流進行中怎麼解決這個問題呢?直覺上講,如果我們能夠讓所有的事務都按照順序依次發生,那麼問題就解決了,這也被成為可序列化的特性。但是我們當然不希望所有的請求都被依次順序處理,這與我們使用分布式系統的初衷相違背。是以我們需要保證這些請求最後的産生的影響看起來是按照順序發生的,也就是一個請求産生的影響是基于前一個請求産生影響的基礎之上的。換句話說也就是一個事務的修改需要在前一個事務的所有修改都完成後才能進行。這種希望一件事在另一件事之後發生的要求看起來很熟悉,這似乎是我們以前在流進行中曾經遇到過的問題。是的,這聽上去像是事件時間。用高度簡化的方式來解釋,如果所有的請求都在不同的事件時間産生,即使由于種種原因他們到達處理器的時間是亂序的,流處理器依然會根據他們的事件時間來對他們進行處理。流處理器會使得所有的事件的影響看上去都是按順序發生的。按事件時間處理是 Flink 已經支援的功能。

為什麼說流處理即未來?

那麼詳細說來,我們到底怎麼解決這個一緻性問題呢?假設我們有并行的請求輸入并行的事務請求,這些請求讀取某些表中的記錄,然後修改某些表中的記錄。我們首先需要做的是把這些事務請求根據事件時間順序擺放。這些請求的事務時間不能夠相同,但是他們之間的時間也需要足夠接近,這是因為在事件時間的處理過程中會引入一定的延遲,我們需要保證所處理的事件時間在向前推進。是以第一步是定義事務執行的順序,也就是說需要有一個聰明的算法來為每個事務制定事件時間。

在圖上,假設這三個事務的事件時間分别是 T+2, T 和 T+1。那麼第二個事務的影響需要在第一和第三個事務之前。不同的事務所做的修改是不同的,每個事務都會産生不同的操作請求來修改狀态。我們現在需要将對通路每個行和狀态的事件進行排序,保證他們的通路是符合事件時間順序的。這也意味着那些互相之間沒有關系的事務之間自然也沒有了任何影響。比如這裡的第三個事務請求,它與前兩個事務之間沒有通路共同的狀态,是以它的事件時間排序與前兩個事務也互相獨立。而目前兩個事務之間的操作的到達順序與事件時間不符時,Flink 則會依據它們的事件時間進行排序後再處理。

必須承認,這樣說還是進行了一些簡化,我們還需要做一些事情來保證高效執行,但是總體原則上來說,這就是全部的設計。除此之外我們并不需要更多其他東西。

為什麼說流處理即未來?

為了實作這個設計,我們引入了一種聰明的分布式事件時間配置設定機制。這裡的事件時間是邏輯時間,它并不需要有什麼現實意義,比如它不需要是真實的時鐘。使用 Flink 的亂序處理能力,并且使用 Flink 疊代計算的功能來進行某些前提條件的檢查。這些就是我們建構一個支援事務的流處理器的要素。

為什麼說流處理即未來?

我們實際上已經完成了這個工作,稱之為流式賬簿(Streaming Ledger),這是個在 Apache Flink 上很小的庫。它基于流處理器做到了滿足 ACID 的多鍵事務性操作。我相信這是個非常有趣的進化。流處理器一開始基本上沒有任何保障,然後類似 Storm 的系統增加了至少一次的保證。但顯然至少一次依然不夠好。然後我們看到了恰好一次的語義,這是一個大的進步,但這隻是對于單行操作的恰好一次語義,這與鍵值庫很類似。而支援多行恰好一次或者多行事務操作将流處理器提升到了一個可以解決傳統意義上關系型資料庫所應用場景的階段。

為什麼說流處理即未來?

Streaming Ledger 的實作方式是允許使用者定義一些表和對這些表進行修改的函數。

Streaming Ledger 會運作這些函數和表,所有的這些一起編譯成一個 Apache Flink 的有向無環圖(DAG)。Streaming Ledger 會注入所有事務時間配置設定的邏輯,以此來保證所有事務的一緻性。

為什麼說流處理即未來?

搭建這樣一個庫并不難,難的是讓它高性能的運作。讓我們來看看它的性能。這些性能測試是幾個月之前的,我們并沒有做什麼特别的優化,我們隻是想看看一些最簡單的方法能夠有什麼樣的性能表現。而實際性能表現看起來相當不錯。如果你看這些性能條形成的階梯跨度,随着流處理器數量的增長,性能的增長相當線性。

在事務設計中,沒有任何協同或者鎖參與其中。這隻是流處理,将事件流推入系統,緩存一小段時間來做一些亂序處理,然後做一些本地狀态更新。在這個方案中,沒有什麼特别代價高昂的操作。在圖中性能增長似乎超過了線性,我想這主要是因為 JAVA 的 JVM 當中 GC 的工作原因導緻的。在 32 個節點的情況下我們每秒可以處理大約兩百萬個事務。為了與資料庫性能測試進行對比,通常當你看資料庫的性能測試時,你會看到類似讀寫操作比的說明,比如 10% 的更新操作。而我們的測試使用的是 100% 的更新操作,而每個寫操作至少更新在不同分區上的 4 行資料,我們的表的大小大約是兩億行。即便沒有任何優化,這個方案的性能也非常不錯。

另一個在事務性能中有趣的問題是當更新的操作對象是一個比較小的集合時的性能。如果事務之間沒有沖突,并發的事務處理是一個容易的事情。如果所有的事務都獨立進行而互不幹擾,那這個不是什麼難題,任何系統應該都能很好的解決這樣的問題。

當所有的事務都開始操作同一些行時,事情開始變得更有趣了,你需要隔離不同的修改來保證一緻性。是以我們開始比較一個隻讀的程式、一個又讀又寫但是沒有寫沖突的程式和一個又讀又寫并有中等程度寫沖突的程式這三者之間的性能。你可以看到性能表現相當穩定。這就像是一個樂觀的并發沖突控制,表現很不錯。那如果我們真的想要針對這類系統的阿喀琉斯之踵進行考驗,也就是反複的更新同一個小集合中的鍵。

在傳統資料庫中,這種情況下可能會出現反複重試,反複失敗再重試,這是一種我們總想避免的糟糕情況。是的,我們的确需要付出性能代價,這很自然,因為如果你的表中有幾行資料每個人都想更新,那麼你的系統就失去了并發性,這本身就是個問題。但是這種情況下,系統并沒崩潰,它仍然在穩定的處理請求,雖然失去了一些并發性,但是請求依然能夠被處理。這是因為我們沒有沖突重試的機制,你可以認為我們有一個基于亂序處理天然的沖突避免的機制,這是一種非常穩定和強大的技術。

為什麼說流處理即未來?

我們還嘗試了在跨地域分布的情況下的性能表現。比如我們在美國、巴西,歐洲,日本和澳洲各設定了一個 Flink 叢集。也就是說我們有個全球分布的系統。如果你在使用一個關系型資料庫,那麼你會付出相當高昂的性能代價,因為通信的延遲變得相當高。跨大洲的資訊互動比在同一個資料中心甚至同一個機架上的資訊互動要産生大得多的延遲。

但是有趣的是,流處理的方式對延遲并不是十分敏感,延遲對性能有所影響,但是相比其它很多方案,延遲對流處理的影響要小得多。是以,在這樣的全球分布式環境中執行分布式程式,的确會有更差的性能,部分原因也是因為跨大洲的通信帶寬不如統一資料中心裡的帶寬,但是性能表現依然不差。

實際上,你可以拿它當做一個跨地域的資料庫,同時仍然能夠在一個大概 10 個節點的叢集上獲得每秒幾十萬條事務的處理能力。在這個測試中我們隻用了 10 個節點,每個大洲兩個節點。是以 10 個節點可以帶來全球分布的每秒 20 萬事務的處理能力。我認為這是很有趣的結果,這是因為這個方案對延遲并不敏感。

為什麼說流處理即未來?

我已經說了很多利用流處理來實作事務性的應用。可能聽起來這是個很自然的想法,從某種角度上來說的确是這樣。但是它的确需要一些很複雜的機制來作為支撐。它需要一個連續處理而非微批處理的能力,需要能夠做疊代,需要複雜的基于事件時間處理亂序處理。為了更好地性能,它需要靈活的狀态抽象和異步 checkpoint 機制。這些是真正困難的事情。這些不是由 Ledger Streaming 庫實作的,而是 Apache Flink 實作的,是以即使對這類事務性的應用而言,Apache Flink 也是真正的中流砥柱。

為什麼說流處理即未來?

至此,我們可以說流處理不僅僅支援連續處理、流式分析、批處理或者事件驅動的處理,你也可以用它做事務性的處理。當然,前提是你有一個足夠強大的流處理引擎。這就是我演講的全部内容。