天天看點

Spark權威指南(中文版)----第20章 流處理基礎

Spark The Definitive Guide(Spark權威指南) 中文版。本書詳細介紹了Spark2.x版本的各個子產品,目前市面上最好的Spark2.x學習書籍!!!

掃碼關注公衆号:登峰大資料,閱讀中文Spark權威指南(完整版),系統學習Spark大資料架構!

如果您覺得作者翻譯的内容有幫助,請分享給更多人。您的分享,是作者翻譯的動力

Spark權威指南(中文版)----第20章 流處理基礎

流處理是不斷地合并新資料以計算結果的行為。在流進行中,輸入資料是無界的,沒有預定的開始或結束。它隻是形成一系列到達流處理系統的事件(例如,信用卡交易、網站點選或來自物聯網(IoT)裝置的傳感器讀數)。然後,使用者應用程式可以計算這個事件流上的各種查詢(例如,跟蹤每種事件類型的運作計數,或者将它們聚合到每小時的視窗中)。應用程式将在運作時輸出結果的多個版本,或者可能在外部“接收器”系統(如鍵值存儲)中使其保持最新。

當然,我們可以将流處理與批處理進行比較,批進行中的計算是在固定輸入資料集上運作的。通常,這可能是資料倉庫中的大型資料集,其中包含來自應用程式的所有曆史事件(例如,過去一個月的所有網站通路資料或傳感器讀數)。批處理也需要一個查詢來計算,類似于流處理,但是隻計算一次結果。

雖然流處理和批處理聽起來不同,但在實踐中,它們常常需要一起工作。為了處理這些需求,結構化流(Struct Streaming)從一開始就設計了與Spark的其餘部分(包括批處理應用程式)輕松地互操作的功能。實際上,結構化流開發人員創造了術語“連續應用程式”來捕獲端到端應用程式,這些應用程式由流、批處理和互動式作業組成,這些作業都在相同的資料上工作,以傳遞最終産品。結構化流的重點是使以端到端方式建構此類應用程式變得簡單,而不是僅僅處理每條記錄的流級處理。

20.1.流處理場景

我們将流處理定義為無界資料集的增量處理,但這是激發用例的一種奇怪的方式。在我們讨論流處理的優點和缺點之前,讓我們解釋一下為什麼要使用流處理。我們将描述六個來自底層流處理系統的需求不同的常見用例。

1、通知和預警

最明顯的流用例可能是通知和預警。給定一系列事件,如果發生某種事件或一系列事件,應該觸發通知或警報。這并不一定意味着自主或預先程式設計的決策;警報還可以用于将需要采取的某些操作通知對應的人員。一個例子可能是向一個中心的員工發出警報,他們需要從倉庫的某個位置擷取某個物品并将其發送給客戶。在這兩種情況下,通知都需要快速進行。

2、實時報表

許多公司使用流處理系統來運作任何員工都可以檢視的實時儀表闆。我們使用這些訓示闆來監控平台的總體使用情況、系統負載、正常運作時間,甚至在新特性推出時對它們的使用情況。

3、增量ETL

最常見的流應用程式之一是減少公司在将資訊重新存儲到資料倉庫時必須忍受的延遲。Spark批處理作業通常用于提取、轉換和加載(ETL)工作負載,這些工作負載将原始資料轉換為Parquet之類的結構化格式,以支援高效查詢。使用結構化流,這些作業可以在幾秒鐘内合并新資料,使使用者能夠在下遊更快地查詢資料。在這個用例中,以容錯的方式精确地處理一次資料是至關重要的: 我們不希望在輸入資料到達倉庫之前丢失任何資料,也不希望加載相同的資料兩次。此外,流系統需要以事務的方式更新資料倉庫,以免将在其上運作的查詢與部分寫入的資料混淆。

4、實時更新資料

流系統經常用于計算由另一個應用程式互動提供服務的資料。例如,像谷歌analytics這樣的web分析産品可能會持續跟蹤每個頁面的通路次數,并使用流媒體系統來保持這些通路次數的最新。當使用者與産品的UI互動時,這個web應用程式查詢最新的計數。支援這個用例需要流系統能夠以同步的方式對鍵值存儲(或其他服務系統)執行增量更新,而且通常這些更新是事務性的,就像ETL中的情況一樣,避免破壞應用程式中的資料。

5、實時決策

流系統上的實時決策包括分析新輸入并使用業務邏輯自動響應它們。一個示例用例是,銀行希望根據客戶的近期曆史自動驗證客戶信用卡上的新交易是否代表欺詐,如果費用是預先确定的欺詐金額門檻值,則拒絕該交易。這個決策需要在處理每個事務時實時做出,這樣開發人員就可以在流系統中實作這個業務邏輯,并針對事務流運作它。這種類型的應用程式可能需要維護每個使用者的大量狀态,以跟蹤其目前的支出模式,并自動将此狀态與每個新事務進行比較。

6、線上機器學習

實時決策用例的一個密切的派生是線上機器學習。在這個場景中,您可能希望在多個使用者的流資料和曆史資料的組合上訓練模型。一個例子可能比前面提到的信用卡交易用例更複雜:與其根據一個客戶的行為對寫死的規則作出反應,公司可能希望不斷地從所有客戶的行為更新模型,并根據它測試每個交易。對于流處理系統來說,這是最具挑戰性的用例,因為它需要跨多個客戶的聚合、針對靜态資料集的連接配接、與機器學習庫的內建以及低延遲響應時間。

20.2.流處理的優點

現在我們已經看到了流的一些用例,讓我們來明确流處理的一些優勢。在大多數情況下,批處理對于大多數用例來說更易于了解、排除故障和編寫應用程式。此外,批處理資料的能力允許比許多流系統更高的資料處理吞吐量。然而,流處理在兩種情況下是必不可少的。首先,流處理可以降低延遲:當您的應用程式需要快速響應(以分鐘、秒或毫秒為機關)時,您将需要一個流系統,該系統可以将狀态儲存在記憶體中,以獲得可接受的性能。我們描述的許多決策和預警用例都屬于這個陣營。其次,流處理在更新結果方面也比重複批處理作業更有效,因為它會自動增加計算量。例如,如果我們希望計算過去24小時内的web流量統計資料,那麼一個天真實作的批處理作業可能在每次運作時掃描所有資料,總是處理24小時的資料。相反,流媒體系統可以記住以前計算的狀态,隻計算新資料。例如,如果您告訴流媒體系統每小時更新您的報告,那麼它每次隻需要處理1小時的資料(自上次報告以來的新資料)。在批處理系統中,您必須手動實作這種增量計算,以獲得相同的性能,這将導緻流系統自動為您提供大量額外的工作。

20.3.流處理的挑戰

我們讨論了流處理的動機和優點,但是您可能知道,天下沒有免費的午餐。讓我們讨論在流上操作的一些挑戰。為了示範這個例子,讓我們假設我們的應用程式從傳感器(例如汽車内部)接收輸入消息,傳感器在不同的時間報告它的值。然後我們希望在這個流中搜尋特定的值,或特定的值模式。一個具體的挑戰是輸入記錄可能無序地到達我們的應用程式:例如,由于延遲和重新傳輸,我們可能會按順序收到以下更新序列,其中time字段顯示實際測量值的時間:

{value: 1, time: "2017-04-07T00:00:00"}              {value: 2, time: "2017-04-07T01:00:00"}              {value: 5, time: "2017-04-07T02:00:00"}              {value: 10, time: "2017-04-07T01:30:00"}              {value: 7, time: "2017-04-07T03:00:00"}
           

在任何資料處理系統中,我們都可以構造邏輯來執行一些基于接收單個值“5”的操作。在流媒體系統中,我們還可以快速響應這個單獨的事件。然而,如果您隻想根據接收到的特定值序列(比如2、10、5)觸發某個操作,那麼事情就變得更複雜了。在批處理的情況下,這并不特别困難,因為我們可以簡單地按時間字段對所有事件進行排序,進而看到10确實在2到5之間。然而,這對于流處理系統來說比較困難,原因是流處理系統将單獨接收每個事件,并且需要在事件之間跟蹤一些狀态,以記住2和5個事件,并認識到10個事件介于兩者之間。需要記住流上的這種狀态會帶來更多的挑戰。例如,如果您有大量的資料量(例如,數百萬個傳感器流),而狀态本身是大量的,該怎麼辦?如果系統中的機器發生故障,丢失一些狀态,該怎麼辦?如果負載不平衡,一台機器運作緩慢怎麼辦? 當對某些事件的分析“完成”時(例如,2-10-5模式沒有發生),您的應用程式如何向下遊消費者發出信号? 它應該等待固定的時間還是無限期地記住某個狀态? 當您想部署流應用程式時,可能會遇到所有這些挑戰和其他挑戰,比如生成系統事務的輸入和輸出。總而言之,我們在前一段和其他幾段中所描述的挑戰如下:

  • 基于應用程式時間戳(也稱為事件時間)處理無序資料
  • 維持大量的狀态
  • 支援高資料吞吐量
  • 盡管機器出現故障,但每個事件僅處理一次
  • 處理負載不平衡和掉隊
  • 以低延遲響應事件
  • 與其他存儲系統中的外部資料連接配接
  • 确定在新事件到達時如何更新輸出接收器
  • 以事務方式将資料寫入輸出系統
  • 在運作時更新應用程式的業務邏輯

這些主題都是大型流處理系統研究和開發的活躍領域。為了了解不同的流處理系統是如何應對這些挑戰的,我們描述了一些常見的設計概念。

20.4.流處理設計中的要點

為了支援我們描述的流處理挑戰,包括高吞吐量、低延遲和無序資料,有多種方法可以設計流處理系統。我們在這裡描述最常見的設計選項,然後在下一節描述Spark的選擇。

20.4.1.每次一條記錄 API與聲明式API的比較

設計流API最簡單的方法是将每個事件傳遞給應用程式,并讓它使用定制代碼進行響應。這是許多早期流系統(如Apache Storm)實作的方法,當應用程式需要完全控制資料處理時,它具有重要的地位。提供這種"每次一條記錄"API的流隻是給使用者一個“管道”集合,以便将它們連接配接到一個應用程式中。然而,這些系統的缺點是,我們前面描述的大多數複雜因素,如維護狀态,都是由應用程式單獨控制的。例如,對于"每次一條記錄"API,使用者負責在更長的時間内跟蹤狀态,在一段時間後删除狀态以清空空間,并在失敗後以不同的方式響應重複的事件。正确地為這些系統程式設計是相當具有挑戰性的。重要的是,底層api需要具有深入專業知識的人員來開發和維護。

是以,許多較新的流系統提供聲明性api,應用程式在其中指定要計算什麼,而不是如何計算以響應每個新事件,以及如何從失敗中恢複。例如,Spark的原始DStreams API提供了基于流上的map、reduce和filter等操作的功能性API。在内部,DStream API自動跟蹤每個操作符處理了多少資料,可靠地儲存了任何相關狀态,并在需要時從故障中恢複計算。谷歌Dataflow和Apache Kafka Streams等系統提供了類似的功能性api。Spark的結構化流實際上進一步擴充了這一概念,從功能操作切換到關系操作(類似sql),這些操作無需程式設計即可實作更豐富的自動優化執行。

20.4.2.事件時間與處理時間

對于具有聲明性api的系統,第二個問題是系統本身是否支援事件時間。事件時間是基于插入到源記錄中的時間戳來處理資料的概念,而不是基于流應用程式接收記錄的時間(稱為處理時間)。特别是在使用事件時間時,記錄可能會無序地到達系統,并且不同的源之間也可能不同步(對于相同的事件時間,一些記錄可能比其他記錄晚到達)。如果您的應用程式從可能會延遲的遠端資料源(如行動電話或物聯網裝置)收集資料,事件時間處理非常重要:如果沒有事件時間處理,當某些資料延遲時,您将錯過重要的處理模式。相反,如果應用程式隻處理本地事件(例如,在同一資料中心中生成的事件),則可能不需要複雜的事件處理。

在使用事件時間時,多個問題成為應用程式之間的常見問題,包括以一種允許系統合并延遲事件的方式跟蹤狀态,以及确定在事件時間中為給定時間視窗輸出結果(例如:當系統可能已經接收到所有的輸入到那一時間點)。正因為如此,許多聲明性系統(包括結構化流)的所有api中都內建了對事件時間的“原生”支援,是以可以在整個程式中自動處理這些問題。

20.4.3.連續處理及微批處理

您經常看到的最終設計決策是關于連續執行還是微批處理執行之間的選擇。在基于連續處理的系統中,系統中的每個節點都在不斷地偵聽來自其他節點的消息,并向其子節點輸出新的更新。例如,假設您的應用程式在多個輸入流上實作了map-reduce計算。在一個連續處理系統中,每個實作map的節點将逐個從輸入源讀取記錄,計算其在這些記錄上的函數,并将它們發送到适當的reducer。reducer程式将在獲得新記錄時更新其狀态。關鍵思想是,這發生在每個單獨的記錄上,如圖20-1所示。

Spark權威指南(中文版)----第20章 流處理基礎

連續處理的優點是,當總輸入速率相對較低時,它提供了盡可能低的延遲,因為每個節點都會立即響應一條新消息。然而,連續處理系統通常具有較低的最大吞吐量,因為它們對每條記錄都會産生大量的開銷(例如,調用作業系統向下遊節點發送資料包)。

相比之下,微批處理系統等待積累小批輸入資料(比如500毫秒),然後使用分布式任務集合并行處理每個批處理,類似于在Spark中執行批處理作業。微批處理系統通常可以實作每個節點的高吞吐量,因為它們利用了與批處理系統相同的優化(例如,向量化處理),并且不産生任何額外的每個記錄開銷,如圖20-2所示。

Spark權威指南(中文版)----第20章 流處理基礎

是以,它們需要更少的節點來處理相同的資料速率。微批處理系統還可以使用動态負載平衡技術來處理不斷變化的工作負載(例如,增加或減少任務數量)。然而,缺點是由于等待累積微批處理而導緻更高的基礎延遲。在實踐中,流應用程式的規模大到需要分發它們的計算,是以傾向于優先考慮吞吐量,是以Spark傳統上實作了微批處理。然而,在結構化流中,有一種積極的開發工作也支援同一API下的連續處理模式。

在這兩種執行模式之間進行選擇時,您應該記住的主要因素是所需的延遲和總操作成本(TCO)。根據應用程式的不同,微批處理系統可以輕松地将延遲從100毫秒延遲到每秒。在這種情況下,它們通常需要更少的節點來實作相同的吞吐量,進而降低操作成本(包括由于更少的節點故障而降低的維護成本)。對于低得多的延遲,您應該考慮使用連續處理系統,或者使用微批處理系統與快速服務層結合來提供低延遲查詢(例如,将資料輸出到MySQL或Apache Cassandra,在那裡可以在毫秒内将資料送達客戶機)。

20.5.Spark’s Streaming APIs

我們介紹了一些用于流處理的進階設計方法,但是到目前為止,我們還沒有詳細讨論Spark的api。Spark包括兩個流api,如本章開頭所讨論的。Spark流中的早期DStream API是完全面向微批處理的。它有一個聲明性的(基于函數的)API,但是不支援事件時間。新的結構化流API添加了更進階别的優化、事件時間和對連續處理的支援。

20.5.1.The DStream API

Spark最初的DStream API自2012年首次釋出以來,就被廣泛用于流處理。例如,在Datanami 2016年的調查中,DStreams是使用最廣泛的處理引擎。由于Spark的進階API接口和簡單的精确一次語義,許多公司在生産中大規模使用和操作Spark流。Spark流還支援與RDD代碼的互動,比如與靜态資料的連接配接。操作Spark流并不比操作普通Spark叢集困難多少。然而,DStreams API有幾個限制。首先,它完全基于Java/Python對象和函數,而不是資料架構和資料集中更豐富的結構化表概念。這限制了引擎執行優化的機會。其次,API純粹基于處理時間來處理事件時間操作,應用程式需要自己實作它們。最後,DStreams隻能以微批處理方式進行操作,并在其API的某些部分公開微批處理的持續時間,是以很難支援其他執行模式。

20.5.2.Structured Streaming

結構化流是基于Spark的結構化API建構的進階流API。它适用于所有運作結構化處理的環境,包括Scala、Java、Python、R和SQL。與DStreams一樣,它也是一個基于進階操作的聲明性API,但是通過在本書前一部分介紹的結構化資料模型的基礎上進行建構,結構化流可以自動執行更多類型的優化。然而,與DStreams不同,結構化流具有對事件時間資料的原生支援(其所有視窗操作符都自動支援它)。從Apache Spark 2.2開始,系統隻在微批處理模型中運作,但Databricks的Spark團隊已經宣布了一項名為持續處理的工作,以添加持續執行模式。這應該成為Spark 2.3中使用者的一個選項。

可以使用Apache Spark輕松建構端到端連續應用程式,Apache Spark結合了流、批處理和互動式查詢。例如,結構化流不使用獨立于DataFrames的API:您隻需編寫一個普通的DataFrame(或SQL)計算并在流上啟動它。當資料到達時,結構化流将以增量方式自動更新此計算的結果。在編寫端到端資料應用程式時,這是一個主要的幫助:開發人員不需要維護批處理代碼的單獨流版本(可能針對不同的執行系統),而且可能會導緻這兩個版本的代碼不同步。另一個例子是,結構化流可以将資料輸出到Spark SQL可用的标準接收器,例如Parquet表,進而可以輕松地從另一個Spark應用程式查詢流狀态。在Apache Spark的未來版本中,我們期望越來越多的項目元件與結構化流內建,包括MLlib中的線上學習算法。一般來說,結構化流是Spark streams的DStream API的一種更易于使用和性能更高的改進,是以在本書中我們将隻關注這個新API。許多概念,例如用轉換圖建構計算,也适用于DStreams,但是我們将對這些概念的闡述留給其他書籍。

20.6.結論

本章介紹了了解流處理所需的基本概念和思想。本章介紹的設計方法應該闡明如何評估給定應用程式的流系統。您還應該能夠輕松地了解DStreams和結構化流的作者所做的權衡,以及在使用結構化流時直接支援DataFrame程式的原因:不需要複制應用程式邏輯。在接下來的章節中,我們将深入研究結構化流來了解如何使用它。

繼續閱讀