天天看點

《Spark大資料分析:核心概念、技術及實踐》Spark Core

本節書摘來自華章出版社《spark大資料分析:核心概念、技術及實踐》一書中的第1章,第節,作者穆罕默德·古勒(mohammed guller)更多章節内容可以通路雲栖社群“華章計算機”公衆号檢視。

spark core

spark是大資料領域最活躍的開源項目,甚至比hadoop還要熱門。如第1章所述,它被認為是hadoop的繼任者。spark的使用率大幅增長。很多組織正在用spark取代hadoop。

從概念上看,spark類似于hadoop,它們都用于處理大資料。它們都能用商用硬體以很低的成本處理大資料。然而,相比于hadoop,spark有很多的優勢,這些将在本章進行介紹。

本章主要介紹spark core,這也是spark生态系統的基礎。我們首先概述spark core,然後介紹spark的總體架構和應用程式運作時的情況。spark core的程式設計接口也會一并介紹。

3.1 概述

spark是一個基于記憶體的用于處理、分析大資料的叢集計算架構。它提供了一套簡單的程式設計接口,進而使得應用程式開發者友善使用叢集節點的cpu、記憶體、存儲資源來處理大資料。

3.1.1 主要特點

spark的主要特點如下:

使用友善

快速

通用

可擴充

容錯

spark提供了比mapreduce更簡單的程式設計模型。使用spark開發分布式的資料處理應用程式比用mapreduce簡單多了。

spark針對開發大資料應用程式提供了豐富的api。它提供了80多個用于處理資料的操作符。而且,spark提供了比hadoop mapreduce更易讀的api。相比之下,hadoop mapreduce隻有兩個操作符,map和reduce。hadoop要求任何問題都必須能夠分解為一系列的map作業和reduce作業。然而,有些算法卻難以隻用map和reduce來描述。相比于hadoop

mapreduce,使用spark提供的操作符來處理複雜的資料顯得更加簡單。

而且,使用spark可以寫出比用hadoop mapreduce更簡潔的代碼。用hadoop map-reduce需要寫大量的子產品代碼。同樣的資料處理算法,用hadoop

mapreduce實作需要50行,而用spark隻需要10不到。有了豐富易讀的api,消除了子產品代碼,開發者的生産力大幅提升。相對于使用hadoop,使用spark開發者的生産力會有5~10倍的提升。

spark要比hadoop快上若幹個數量級。如果資料都加載在記憶體中,它能快上數百倍,哪怕資料無法完全載入記憶體,spark也能快上數十倍。

尤其是在處理大資料集的時候,速度顯得至關重要。如果一個處理資料的作業要花費數天或小時,那麼它将拖慢決策的速度,進而降低資料的價值。反之,如果同樣的處理能提速十倍乃至百倍,它将會創造更多的機會。它甚至可能開創出前所未有的新資料驅動應用程式。

spark比hadoop快的原因有兩方面。一方面,它可以使用基于記憶體的叢集計算。另一方面,它實作了更先進的執行引擎。

得益于基于記憶體的叢集計算,spark的性能有了數量級的提升。相比于從硬碟讀取資料,采用從記憶體讀取資料的方式,獲得的順序讀取吞吐量要大100倍。換句話說,從記憶體讀取資料要比從硬碟快100倍。當應用程式隻讀取和處理少量資料時,記憶體和硬碟之間讀取速度的差距并不太明顯。然而,一旦資料量達到太位元組級别,i/o延遲(資料從硬碟載入記憶體所花費的時間)就會顯著影響作業執行時間。

spark允許應用程式利用記憶體緩存資料。這能減少磁盤i/o。一個基于mapreduce的資料處理流水線可能包含多個作業。每個作業都需要從硬碟載入資料,處理它,而後再寫入硬碟中。而且,一個使用mapreduce實作的複雜資料處理應用程式可能需要反複從硬碟讀取資料,寫入資料。由于spark允許利用記憶體緩存資料,是以使用spark實作的同樣的應用程式隻需要從硬碟讀取資料一次即可。一旦資料緩存在記憶體中,接下來的每一個操作都可以直接操作緩存的資料。就像前面說的一樣,spark可以減少i/o延遲,這樣就能顯著減少作業總的執行時間。

需要注意的是,spark不會自動将輸入資料緩存在記憶體中。一個普遍的誤解是,一旦無法把輸入資料完全載入記憶體,那麼spark将無法使用。這并不正确。spark可以在叢集上處理太位元組級的資料,哪怕叢集的總記憶體隻有僅僅100gb。在資料處理流水線上何時緩存和緩存哪部分資料完全由應用程式決定。實際上,如果資料處理應用程式隻使用一次資料,那麼它完全不需要緩存資料。

spark比hadoop mapreduce快的第二個原因是它擁有更先進的作業執行引擎。spark和hadoop一樣都将一個作業轉化為由若幹個階段構成的有向無環圖(dag)。如果你不熟悉圖論,這裡簡單介紹下。圖是一個由頂點構成的集合,這些頂點由邊相連。有向圖指的是那些邊有方向的圖。無環圖指的是不存在環路的圖。dag指的就是不存在環路的有向圖。換句話說,在dag中不存在一條起點和終點都是同一個頂點的通路。第11章将對圖進行更詳細的介紹。

hadoop mapreduce對任意一個作業都會建立由map和reduce兩個階段構成的有向無環圖。如果一個複雜的資料處理算法用mapreduce實作,可能需要劃分成多個作業,而後按順序執行。這種設計導緻hadoop

mapreduce無法做任何的優化。

與之相反,spark并沒有迫使開發者在實作資料處理算法的時候将其劃分成多個作業。spark中的dag可以包含任意個階段。一個簡單的作業可能隻有一個階段,而一個複雜的作業可能會有多個階段。這使得spark可以做些hadoop無法實作的優化。spark可以一次執行一個包含多階段的複雜作業。因為它擁有所有階段的資訊,是以可以進行優化。舉例來說,它可以減少磁盤i/o和資料shuffle操作的時間。資料的shuffle操作通常會涉及網絡間的資料傳輸,并且會增加應用程式的執行時間。

spark為各種類型的資料處理作業提供一個統一的內建平台。它可以用于批處理、互動分析、流處理、機器學習和圖計算。相比之比,hadoop mapreduce隻适合批處理。是以一個使用hadoop

mapreduce的開發者為了能做流處理和圖計算隻能使用其他的架構。

對于不同類型的資料處理作業使用不同的架構,帶來了很多問題。首先,開發者不得不學習各種架構,每種架構的接口都不相同。這降低了開發者的生産力。其次,每種架構都相對獨立。是以,資料也必須複制多份,存放在不同的地方。類似地,代碼也必須重複多份,存放在多個地方。比如,你想使用hadoop mapreduce處理曆史資料,同時使用storm(一個流處理架構)處理流式資料,二者采用同樣的算法,那麼你不得不維護兩份相同的代碼,一份是hadoop mapreduce的,一份是storm的。最後,同時使用多個架構帶來了運維上的麻煩。你得為每一個架構建立并維護一個單獨的叢集。要知道維護多個叢集可比維護一個困難多了。

spark自帶了一系列的庫,用于批處理、互動分析、流處理、機器學習和圖計算。使用spark,可以使用單一架構來建立一個包含多個不同類型任務的資料處理流水線。進而,再也沒有必要為了多個不同類型的資料處理任務而學習不同架構或者部署單獨的叢集了。使用spark有助于降低運維的困難度,減少代碼和資料的重複。

有意思的是,越來越多流行的應用和庫開始內建到spark中或添加了對spark的支援,而它們一開始是使用hadoop作為其執行引擎的。比如apache mahout(一個建構于hadoop之上的機器學習庫)正在內建到spark中。到了2014年4月,mahout的開發者已經放棄了hadoop并且不再添加新的基于mapreduce的機器學習算法了。

同樣地,hive(見第1章)的開發者也正在開發一個運作在spark上的版本。pig(一個可以用腳本語言來建立資料處理流水線的資料分析平台)同樣支援spark作為它的執行引擎。cascading(一個用于開發hadoop資料應用程式的應用開發平台)也添加了對spark的支援。

可拓展

spark是可擴充的。spark叢集的資料處理能力可以通過增加更多叢集節點的方式得以提升。你可以從一個小叢集開始,随着資料量的增加,逐漸增加更多的計算能力。這相當經濟。

而且,spark的這個特性對于應用程式來說是透明的。當你往spark叢集增加節點的時候無須改動任何代碼。

spark是可容錯的。一個由數百個節點構成的叢集中,每個節點在任何一天故障的可能性都很高。硬碟損壞或其他硬體問題都有可能導緻節點不可用。spark能自動處理叢集中的節點故障。一個節點故障可能會導緻性能下降但不會導緻應用無法運作。

既然spark能自動處理節點故障,應用程式的開發者就不必在應用中處理這樣的異常情況了,這簡化了應用程式的代碼。

3.1.2 理想的應用程式

就像前面讨論的那樣,spark是一個通用架構,它用于各種大資料應用中。然而,對于一個理想的大資料應用程式而言,速度是相當重要的。使用疊代資料處理算法的應用和互動分析都是這樣的典型應用。

疊代算法

疊代算法是指那些在同樣資料上疊代多次的資料處理算法。使用這類算法的應用包括機器學習和圖處理應用。這些應用都在同樣的資料上疊代數十次乃至數百次算法。對于這類應用,spark是理想的選擇。

spark記憶體計算的特性使得在spark上面執行這些疊代算法比較快。由于spark允許應用在記憶體中緩存資料,是以一個疊代算法哪怕需要疊代100次,也隻需要在第一次疊代的時候從硬碟讀取資料,接下來的疊代都從記憶體中讀取。而從記憶體中讀取資料比從硬碟要快100倍,是以在spark上運作這些應用能快上一個數量級。

互動分析

互動式資料分析涉及互動式地探索資料。舉例來說,對于一個巨型資料集,在觸發一個可能需要花費數小時的長時間運作的批處理作業之前,先進行彙總分析是很有用的。類似地,一個商業分析師可能想要使用bi或資料可視化工具來進行互動分析。在這種場景下,使用者會在同一個資料集上執行多個查詢。spark就提供了這樣一個用于大資料互動分析的理想平台。

spark适用于互動分析的理由還是它的記憶體計算特性。應用程式可以緩存資料,進而使得資料能夠在記憶體中進行互動分析。第一個查詢請求從硬碟讀取資料,但是接下來的一連串請求都從記憶體中讀取緩存資料。查詢記憶體中的資料要比硬碟中的資料快上一個數量級。當資料緩存在記憶體中的時候,一個查詢請求可能隻需要花費數秒,而在硬碟中則需要不止一個小時。

3.2 總體架構

一個spark應用包括5個重要部分:驅動程式、叢集管理者、worker、執行者、任務(見圖3-1)。

圖3-1 高層spark架構

3.2.1 worker

worker為spark應用提供cpu、記憶體和存儲資源。worker把spark應用當成分布式程序在叢集節點上執行。

3.2.2 叢集管理者

spark使用叢集管理者來獲得執行作業所需要的叢集資源。顧名思義,叢集管理者管理叢集中worker節點的計算資源。它能跨應用從底層排程叢集資源。它可以讓多個應用分享叢集資源并且運作在同一個worker節點上。

spark目前支援三種叢集管理者:單獨模式、mesos模式、yarn模式。mesos模式和yarn模式都允許在同一個worker節點上同時運作spark應用和hadoop應用。第10章将詳細介紹叢集管理者。

3.2.3 驅動程式

驅動程式是一個把spark當成庫使用的應用。它提供資料處理的代碼,spark将在worker節點上執行這些代碼。一個驅動程式可以在spark叢集上啟動一個或多個作業。

3.2.4 執行者

執行者是一個jvm程序,對于一個應用由spark在每一個worker上建立。它可以多線程的方式并發執行應用代碼。它也可以把資料緩存在記憶體或硬碟中。

執行者的生命周期和建立它的應用一樣。一旦spark應用結束,那麼為它建立的執行者也将壽終正寝。

3.2.5 任務

任務是spark發送給執行者的最小工作單元。它運作在worker節點上執行者的一個線程中。每一個任務都執行一些計算,然後将結果傳回給驅動程式,或者分區以用于shuffle操作。

spark為每一個資料分區建立一個任務。一個執行者可以并發執行一個或多個任務。任務數量由分區的數量決定。更多的分區意味着将有更多的任務并行處理資料。

3.3 應用運作

本節主要描述資料處理代碼是怎麼在spark叢集中執行的。

3.3.1 術語

先來看看幾個術語的定義。

shuffle操作。shuffle操作是指在叢集節點上對資料進行重新配置設定。這是一個耗時操作,因為它涉及在網絡間傳輸資料。需要注意的是,shuffle操作不是對資料進行随機重新配置設定,它按照某些标準将資料分成不同的集合。每一個集合就是一個新的分區。

作業。作業是一系列計算的集合,spark執行這些計算并将結果傳回給驅動程式。作業本質上就是在spark叢集上運作資料處理算法。一個應用程式可以發起多個作業。本章稍後将會介紹作業是怎麼執行的。

階段。一個階段由若幹個任務構成。spark将一個作業分解為一個由若幹個階段構成的dag,每一個階段依賴于其他階段。舉個例子,把一個作業分解為階段0和階段1兩個階段。隻有當階段0完成之後,才可以開始階段1。spark利用shuffle邊界将任務分成不同的階段。不要求shuffle操作的任務屬于同一階段。隻有在開始一個新階段時,任務才需要輸入資料是經過shuffle操作的。

3.3.2 應用運作過程

有了上面的這些定義,我們就可以描述一個spark應用在叢集節點上并行處理資料的過程。當一個spark應用開始運作的時候,spark會連接配接叢集管理者,擷取在worker節點上的執行者資源。就像前面所說的,spark應用把一個資料處理算法當成一個作業送出。spark将這個作業分解成由若幹個階段構成的dag。然後,spark在執行者上排程這些階段的運作,排程操作由叢集管理者提供的底層排程器實作。執行者并行地運作spark送出的任務。

每一個spark應用都有一組其自己的位于worker節點上的執行者。這樣的設計有諸多好處。首先,不同應用中的任務由于運作在不同jvm之上,使得它們之間互相隔離。一個應用程式中的錯誤任務并不會讓其他應用崩潰。其次,排程任務變得輕而易舉。spark一次隻需要排程歸屬于同一應用的任務。它不用處理這樣一種複雜情況,其中排程的多個任務屬于多個并發執行的不同應用。

然而,這種設計也有不足之處。由于不同應用在不同的jvm程序中運作,是以它們之間就不太友善共享資料。即使它們可能在同一個worker節點上運作,它們也隻能通過讀寫磁盤的方式共享資料。就像前面所說的,讀寫磁盤是耗時的操作。是以,應用間通過磁盤共享資料,将會遇到性能問題。

3.4 資料源

spark本質上是一個使用叢集節點進行大資料集處理的計算架構。與資料庫不同,它并沒有存儲系統,但是它可以搭配外部存儲系統使用。spark一般都配合能存儲大量資料的分布式存儲系統使用。

spark支援多種資料源。spark應用程式可以使用的資料來源包括hdfs、hbase、cassandra、amazon

s3,或者其他支援hadoop的資料源。任何hadoop支援的資料源都可以被spark core使用。spark上的庫spark sql還支援更多資料源。第7章将會介紹spark-sql。

相容支援hadoop的資料源是相當重要的。許多組織都已經在hadoop上面投入了大量的精力。在hdfs或其他支援hadoop的資料存儲系統上都存儲着大量的資料。使用spark并不需要将這些資料遷移到其他存儲系統。而且,将hadoop mapreduce替換成spark并不需要另起爐竈,這是比較輕松的。如果現有的hadoop叢集正在執行mapreduce作業,也可以同時在上面運作spark應用。可以把現有的mapreduce作業轉化成spark作業。或者,也可以保留現有的mapreduce應用程式,不做更改,使用spark運作新的應用程式。

由于spark core原生支援hadoop相容的存儲系統,是以額外的資料源都能很友善地添加進來。比如,人們已經為spark編寫好了各種資料源的連接配接器,包括cassandra、mongodb、couchdb和其他流行的資料源。

spark也支援本地檔案系統。spark應用程式可以讀寫本地檔案系統上的資料。如果資料可以從本地檔案讀取并在單機上處理,那麼沒必要使用spark。盡管如此,spark的這個特性使得它便于開發應用和調試,并且易學。

3.5 api

應用可以通過使用spark提供的庫獲得spark叢集計算的能力。這些庫都是用scala編寫的。但是spark提供了各種語言的api。在本書編寫之際,spark api提供了如下語言的支援:scala、java、python和r。可以使用上面的任何語言來開發spark應用。也有其他語言(比如clojure)的非官方支援。

spark api主要由兩個抽象部件sparkcontext和彈性分布式資料集(rdd)構成。應用程式通過這兩個部件和spark進行互動。應用程式可以連接配接到spark叢集并使用相關資源。接下來會介紹這兩個抽象部件,然後詳細介紹rdd。

3.5.1 sparkcontext

sparkcontext是一個在spark庫中定義的類。它是spark庫的入口點。它表示與spark叢集的一個連接配接。使用spark api建立的其他一些重要對象都依賴于它。

每個spark應用程式都必須建立一個sparkcontext類執行個體。目前,每個spark應用程式隻能擁有一個激活的sparkcontext類執行個體。如果要建立一個新的執行個體,那麼在此之前必須讓目前激活的類執行個體失活。

sparkcontext有多個構造函數。最簡單的一個不需要任何參數。一個sparkcontext類執行個體可以用如下代碼建立。

在這種情況下,sparkcontext的配置資訊都從系統屬性中擷取,比如spark master的位址、應用名稱等。也可以建立一個sparkconf類執行個體,然後把它作為sparkcontext的參數進而設定配置資訊。sparkconf 是spark庫中定義的一個類。通過這種方式可以像下面這樣設定各種spark配置資訊。

sparkconf為設定諸如spark master這樣的常用配置資訊都提供了對應的顯式方法。此外,它還提供了一個通用的方法用于設定配置資訊,它使用鍵-值對進行設定。sparkcontext和sparkconf可以使用的參數将在第4章進行詳細介紹。

在本章接下來的例子中會繼續使用上面建立的變量sc。

3.5.2 rdd

彈性分布式資料集(rdd)表示一個關于分區資料元素的集合,可以在其上進行并行操作。它是spark的主要資料抽象概念。它是spark庫中定義的一個抽象類。

從概念上看,除了可以用于表示分布式資料集和支援惰性操作的特性外,rdd類似于spark的集合。惰性操作将在本章稍後部分詳細介紹。

下面分别簡要描述rdd的特點。

不可變性

rdd是一種不可變的資料結構。一旦建立,它将不可以在原地修改。基本上,一個修改rdd的操作都會傳回一個新的rdd。

分片

rdd表示的是一組資料的分區。這些分區分布在多個叢集節點上。然而,當spark在單個節點運作時,所有的分區資料都會在目前節點上。

spark存儲rdd的分區和資料集實體分區之間關系的映射關系。rdd是各個分布式資料源之中資料的一個抽象,它通常表示分布在多個叢集節點上的分區資料。比如hdfs将資料分片或分塊分散存儲在叢集中。預設情況下,一個rdd分區對應一個hdfs檔案分片。其他的分布式資料源(比如cassandra)同樣也将資料分片分散存儲在叢集多個節點上。然而,一個rdd對應多個cassandra分片。

容錯性

rdd為可容錯的。rdd代表了分散在叢集中多個節點的資料,但是任何一個節點都有可能出故障。誠如之前所說的,一個節點出故障的可能性和叢集節點數量成正比。叢集越大,在任何一個節點它出故障的可能性就越高。

rdd會自動處理節點出故障的情況。當一個節點出故障時,該節點上存儲的資料将無法被通路。此時,spark會在其他節點上重建丢失的rdd分區資料。spark存儲每一個rdd的血統資訊。通過這些血統資訊,spark可以恢複rdd的部分資訊,當節點出故障的時候,它甚至可以恢複整個rdd。

接口

需要着重指出的是,rdd是一個處理資料的接口。在spark庫中它定義為一個抽象類。rdd為多種資料源提供了一個處理資料的統一接口,包括hdfs、hbase、cassandra等。這個接口同樣可以用于處理存儲于多個節點記憶體中的資料。

spark為不同資料源提供了各自具體的實作類,比如hadooprdd、parallelcollection-rdd、jdbcrdd和cassandrardd。它們都支援基礎的rdd接口。

強類型

rdd類有一個參數用于表示類型,這使得rdd可以表示不同類型的資料。rdd可以表示同一類型資料的分布式集合,包括integer、long、float、string或者應用開發者自己定義的類型。而且,一個應用總會使用某種類型的rdd,包括integer、long、float、double、string或自定義類型。

駐留在記憶體中

之前已經提及了spark的記憶體叢集計算特性。rdd類提供一套支援記憶體計算的api。spark允許rdd在記憶體中緩存或長期駐留。就像之前所說的,對一個緩存在記憶體中的rdd進行操作比操作沒緩存的rdd要快很多。

3.5.3 建立rdd

由于rdd是一個抽象類,是以無法直接建立一個rdd的類執行個體。sparkcontext類提供了一個工廠方法用來建立rdd實作類的類執行個體。rdd也可以通過由其他rdd執行轉換操作得到。就像之前所說的,rdd是不可變的。任何一個對rdd的修改操作都将傳回一個代表修改後資料的新rdd。

本節總結了幾種建立rdd的常見方法。在下面的示例代碼中,sc是一個sparkcontext的類執行個體。之前的章節已經介紹了怎麼建立它。

parallelize

這個方法用于從本地scala集合建立rdd執行個體。它會對scala集合中的資料重新分區、重新分布,然後傳回一個代表這些資料的rdd。這個方法很少用在生産上,但是使用它有助于學習spark。

textfile

textfile方法用于從文本檔案建立rdd執行個體。它可以從多種來源讀取資料,包括單個檔案、本地同一目錄下的多個檔案、hdfs、amazon s3,或其他hadoop支援的存儲系統。這個方法傳回一個rdd,這個rdd代表的資料集每個元素都是一個字元串,每一個字元串代表輸入檔案中的一行。

上面的代碼表示從存儲于hdfs上的一個檔案或者目錄建立rdd執行個體。

textfile方法也可以讀取壓縮檔案中的資料。而且,它的參數中可以存在通配符,用于從一個目錄中讀取多個檔案。下面是一個例子。

textfile的第二個參數是一個可選參數,它用于指定分區的個數。預設情況下,spark為每一個檔案分塊建立一個分區。可以設定成一個更大的數字進而提高并行化程度,但是設定成一個小于檔案分塊數的數字是不可以的。

wholetextfiles

這個方法讀取目錄下的所有文本檔案,然後傳回一個由鍵值型rdd。傳回rdd中的每一個鍵值對對應一個檔案。鍵為檔案路徑,對應的值為該檔案的内容。這個方法可以從多種來源讀取資料,包括本地檔案系統、hdfs、amazon s3,或者其他hadoop支援的存儲系統。

sequencefile

sequencefile方法從sequencefile檔案中擷取鍵值對資料,這些sequencefile檔案可以存儲于本地檔案系統、hdfs或者其他hadoop支援的存儲系統。這個方法傳回一個鍵值對型rdd執行個體。當使用這個方法的時候,不僅需要提供檔案名,還需要提供檔案中資料鍵和值各自的類型。

3.5.4 rdd操作

spark應用使用rdd類或其繼承類中定義的方法來處理資料。這些方法也稱為操作。既然scala中可以把一個方法當成操作符使用,那麼rdd中的方法有時也稱為操作符。

spark的美好之處就在于同樣一個rdd方法既可以處理幾位元組的資料也可以處理pb級的資料。而且spark應用可以使用同樣的方法去處理資料,無論它是存儲于本地還是存儲于一個分布式存儲系統。這樣的靈活性使得開發者可以在單機上開發、調試、測試spark應用,然後不用改動任何代碼就可以将它部署到一個大叢集上。

rdd操作可以歸為兩類:轉換和行動。轉換将會建立一個新的rdd執行個體。行動則會将結果傳回給驅動程式。

轉換

轉換指的是在原rdd執行個體上進行計算,而後建立一個新的rdd執行個體。本節将介紹一些常見的轉換操作。

從概念上看,rdd轉換操作的類似于scala集合上的方法。主要的差別在于scala集合方法操作的資料是在單機記憶體中的,而rdd的轉換操作可以處理分布在叢集各個節點上的資料。另外一個重要的差別是,rdd轉換操作是惰性的,而scala集合方法不是。本章餘下部分會詳細介紹這些内容。

map

map方法是一個高階方法,它把一個函數作為它的參數,并把這個函數作用在原rdd的每個元素上,進而建立一個新rdd執行個體。這個作為參數的函數擁有一個參數并傳回一個值。

filter

filter方法是一個高階方法,它把一個布爾函數作為它的參數,并把這個函數作用在原rdd的每個元素上,進而建立一個新rdd執行個體。一個布爾函數隻有一個參數作為輸入,傳回true或false。filter方法傳回一個新的rdd執行個體,這個rdd執行個體代表的資料集由布爾函數傳回true的元素構成。是以,新rdd執行個體代表的資料集是原rdd的子集。

flatmap

flatmap方法是一個高階方法,它把一個函數作為它的參數,這個函數處理原rdd中每個元素傳回一個序列。扁平化這個序列的集合得到一個資料集,flatmap方法傳回的rdd就代表這個資料集。

mappartitions

mappartitions是一個高階方法,它使你可以以分區的粒度來處理資料。相比于一次處理一個元素,mappartitions一次處理處理一個分區,每個分區被當成一個疊代器。mappartitions方法的函數參數把疊代器作為輸入,傳回另外一個疊代器作為輸出。map-partitions将自定義函數參數作用于每一個分區上,進而傳回一個新rdd執行個體。

union

union方法把一個rdd執行個體作為輸入,傳回一個新rdd執行個體,這個新rdd執行個體的資料集是原rdd和輸入rdd的合集。

intersection

intersection方法把一個rdd執行個體作為輸入,傳回一個新rdd執行個體,這個新rdd執行個體代表的資料集是原rdd和輸入rdd的交集。

這是另外一個例子。

subtract

subtract方法把一個rdd執行個體作為輸入,傳回一個新rdd執行個體,這個新rdd執行個體代表的資料集由那些存在于原rdd執行個體中但不在輸入rdd執行個體中的元素構成。

distinct

rdd執行個體上的distinct方法傳回一個新rdd執行個體,這個新rdd執行個體的資料集由原rdd的資料集去重後得到。

cartesian

cartesian方法把一個rdd執行個體作為輸入,傳回一個新rdd執行個體,這個新rdd執行個體的資料集由原rdd和輸入rdd的所有元素的笛卡兒積構成。傳回的rdd執行個體的每一個元素都是一個有序二進制組,每一個有序二進制組的第一個元素來自原rdd,第二個元素來自輸入rdd。元素的個數等于原rdd的元素個數乘以輸入rdd的元素個數。

這個方法類似于sql中的join操作。

zip

zip方法把一個rdd執行個體作為輸入,傳回一個新rdd執行個體,這個新rdd執行個體的每一個元素是一個二進制組,二進制組的第一個元素來自原rdd,第二個元素來自輸入rdd。和cartesian方法不同的是,zip方法傳回的rdd的元素個數于原rdd的元素個數。原rdd的元素個數和輸入rdd的相同。進一步地說,原rdd和輸入rdd不僅有相同的分區數,每個分區還有相同的元素個數。

zipwithindex

zipwithindex方法傳回一個新rdd執行個體,這個新rdd執行個體的每個元素都是由原rdd元素及其下标構成的二進制組。

groupby

groupby是一個高階方法,它将原rdd中的元素按照使用者定義的标準分組進而組成一個rdd。它把一個函數作為它的參數,這個函數為原rdd中的每一個元素生成一個鍵。groupby把這個函數作用在原rdd的每一個元素上,然後傳回一個由二進制組構成的新rdd執行個體,每個二進制組的第一個元素是函數生成的鍵,第二個元素是對應這個鍵的所有原rdd元素的集合。其中,鍵和原rdd元素的對應關系由那個作為參數的函數決定。

需要注意的是,groupby是一個費時操作,因為它可能需要對資料做shuffle操作。

假設有一個csv檔案,檔案的内容為公司客戶的姓名、年齡、性别和郵編。下面的示例代碼示範了按照郵編将客戶分組。

keyby

keyby方法與groupby方法相類似。它是一個高階方法,把一個函數作為參數,這個函數為原rdd中的每一個元素生成一個鍵。keyby方法把這個函數作用在原rdd的每一個元素上,然後傳回一個由二進制組構成的新rdd執行個體,每個二進制組的第一個元素是函數生成的鍵,第二個元素是對應這個鍵的原rdd元素。其中,鍵和原rdd元素的對應關系由那個作為參數的函數決定。傳回的rdd執行個體的元素個數和原rdd的相同。

groupby和keyby的差別在于傳回rdd執行個體的元素上。雖然都是二進制組,但是 groupby傳回的二進制組中的第二個元素是一個集合,而keyby的是單個值。

sortby

sortby是一個高階方法,它将原rdd中的元素進行排序後組成一個新的rdd執行個體傳回。它擁有兩個參數。第一個參數是一個函數,這個函數将為原rdd的每一個元素生成一個鍵。第二個參數用來指明是升序還是降序排列。

下面是另一個示例。

pipe

pipe方法可以讓你建立子程序來運作一段外部程式,然後捕獲它的輸出作為字元串,用這些字元串構成rdd執行個體傳回。

randomsplit

randomsplit方法将原rdd分解成一個rdd數組。它的參數是分解的權重。

coalesce

coalesce方法用于減少rdd的分區數量。它把分區數作為參數,傳回分區數等于這個參數的rdd執行個體。

使用coalesce方法時需要小心,因為減少了rdd的分區數也就意味着降低了spark的并行能力。它通常用于合并小分區。舉例來說,在執行filter操作之後,rdd可能會有很多小分區。在這種情況下,減少分區數能提升性能。

repartition

repartition方法把一個整數作為參數,傳回分區數等于這個參數的rdd執行個體。它有助于提高spark的并行能力。它會重新分布資料,是以它是一個耗時操作。

coalesce和repartition方法看起來一樣,但是前者用于減少rdd中的分區,後者用于增加rdd中的分區。

sample

sample方法傳回原rdd資料集的一個抽樣子集。它擁有三個參數。第一個參數指定是有放回抽樣還是無放回抽樣。第二個參數指定抽樣比例。第三個參數是可選的,指定抽樣的随機數種子。

鍵值對型rdd的轉換

除了上面介紹的rdd轉換之外,針對鍵值對型rdd還支援其他的一些轉換。下面将介紹隻能作用于鍵值對型rdd的常用轉換操作。

keys

keys方法傳回隻由原rdd中的鍵構成的rdd。

values

values方法傳回隻由原rdd中的值構成的rdd。

mapvalues

mapvalues是一個高階方法,它把一個函數作為它的參數,并把這個函數作用在原rdd的每個值上。它傳回一個由鍵值對構成的rdd。它和map方法類似,不同點在于它把作為參數的函數作用在原rdd的值上,是以原rdd的鍵都沒有變。傳回的rdd和原rdd擁有相同的鍵。

join

join方法把一個鍵值對型rdd作為參數輸入,而後在原rdd和輸入rdd上做内連接配接操作。它傳回一個由二進制組構成的rdd。二進制組的第一個元素是原rdd和輸入rdd都有的鍵,第二個元素是一個元組,這個元組由原rdd和輸入rdd中鍵對應的值構成。

leftouterjoin

leftouterjoin方法把一個鍵值對型rdd作為參數輸入,而後在原rdd和輸入rdd之間做左連接配接操作。它傳回一個由鍵值對構成的rdd。鍵值對的第一個元素是原rdd中的鍵,第二個元素是一個元組,這個元組由原rdd中鍵對應的值和輸入rdd中的可選值構成。可選值用option類型表示。

rightouterjoin

rightouterjoin方法把一個鍵值對型rdd作為參數輸入,而後在原rdd和輸入rdd之間做右連接配接操作。它傳回一個由鍵值對構成的rdd。鍵值對的第一個元素是輸入rdd中的鍵,第二個元素是一個元組,這個元組由原rdd中的可選值和輸入rdd中鍵對應的值構成。可選值用option類型表示。

fullouterjoin

fullouterjoin方法把一個鍵值對型rdd作為參數輸入,而後在原rdd和輸入rdd之間做全連接配接操作。它傳回一個由鍵值對構成的rdd。

samplebykey

samplebykey通過在鍵上抽樣傳回原rdd的一個子集。它把對每個鍵的抽樣比例作為輸入參數,傳回原rdd的一個抽樣。

subtractbykey

subtractbykey方法把一個鍵值對型rdd作為輸入參數,傳回一個鍵值對rdd,這個鍵值對rdd的鍵都是隻存在原rdd中但是不存在于輸入rdd中。

groupbykey

groupbykey方法傳回一個由二進制組構成的rdd,二進制組的第一個元素是原rdd的鍵,第二個元素是一個集合,集合由該鍵對應的所有值構成。它類似于上面介紹過的group-by方法。二者的差別在于groupby是一個高階方法,它的參數是一個函數,這個函數為原rdd的每一個元素生成一個鍵。groupbykey方法作用于rdd的每一個鍵值對上,故不需要一個生成鍵的函數作為輸入參數。

應當盡量避免使用groupbykey。它是一個耗時操作,因為它可能會對資料進行shuffle操作。在大多數情況下,都有不使用groupbykey的更好的替代方案。

reducebykey

reducebykey是一個高階方法,它把一個滿足結合律的二進制操作符當作輸入參數。它把這個操作符作用于有相同鍵的值上。

一個二進制操作符把兩個值當作輸入參數,傳回一個值。一個滿足結合律的二進制操作符傳回同樣的結果,但是它不關心操作數的分組情況。

reducebykey方法可以用于對同一鍵對應的值進行彙總操作。比如它可以用于對同一鍵對應的值進行求和,求乘積,求最小值,求最大值。

對于基于鍵的彙總操作、合并操作,reducebykey比groupbykey更合适。

操作

操作指的是那些傳回值給驅動程式的rdd方法。本節介紹一些rdd中常用的操作。

collect

collect方法傳回一個數組,這個數組由原rdd中的元素構成。在使用這個方法的時候需要小心,因為它把在worker節點的資料移給了驅動程式。如果操作一個有大資料集的rdd,它有可能會導緻驅動程式崩潰。

count

count方法傳回原rdd中元素的個數。

countbyvalue

countbyvalue方法傳回原rdd中每個元素的個數。它傳回是一個map類執行個體,其中,鍵為元素的值,值為該元素的個數。

first

first方法傳回原rdd中的第一個元素。

max

max方法傳回rdd中最大的元素。

min

min方法傳回rdd中最小的元素。

take

take方法的輸入參數為一個整數n,它傳回一個由原rdd中前n個元素構成的rdd。

takeordered

takeordered方法的輸入參數為一個整數n,它傳回一個由原rdd中前n小的元素構成的rdd。

top

top方法的輸入參數為一個整數n,它傳回一個由原rdd中前n大的元素構成的rdd。

fold

fold是一個高階方法,用于對原rdd的元素做彙總操作,彙總的時候使用一個自定義的初值和一個滿足結合律的二進制操作符。它首先在每一個rdd的分區中進行彙總,然後再彙總這些結果。

初值的取值取決于rdd中的元素類型和彙總操作的目的。比如,給定一個元素為整數的rdd,為了計算這個rdd中所有元素的和,初值取為0。相反,給定一個元素為整數的rdd,為了計算這個rdd中所有元素的乘積,初值則應取為1。

reduce

reduce是一個高階方法,用于對原rdd的元素做彙總操作,彙總的時候使用一個滿足結合律和交換律的二進制操作符。它類似于fold方法,然而,它并不需要初值。

鍵值對型rdd上的操作

鍵值對rdd上有一些額外的操作,我們在下面進行介紹。

countbykey

countbykey方法用于統計原rdd每個鍵的個數。它傳回一個map類執行個體,其中,鍵為原rdd中的鍵,值為個數。

lookup

lookup方法的輸入參數為一個鍵,傳回一個序列,這個序列的元素為原rdd中這個鍵對應的值。

數值型rdd上的操作

如果rdd的元素類型為integer、long、float或double,則這樣的rdd為數值型rdd。這類rdd還有一些對于統計分析十分有用的額外操作,下面将介紹一些常用的行動。

mean

mean方法傳回原rdd中元素的平均值。

stdev

stdev方法傳回原rdd中元素的标準差。

sum

sum方法傳回原rdd中所有元素的和。

variance

variance方法傳回原rdd中元素的方差。

3.5.5 儲存rdd

一般來說,資料處理完畢後,結果會儲存在硬碟上。spark允許開發者将rdd儲存在任何hadoop支援的存儲系統中。儲存在硬碟上的rdd可以被其他spark應用或hadoop應用使用。

本節介紹将rdd儲存成檔案的常用方法。

saveastextfile

saveastextfile方法将原rdd中的元素儲存在指定目錄中,這個目錄位于任何hadoop支援的存儲系統中。每一個rdd中的元素都用字元串表示并另存為文本中的一行。

saveasobjectfile

saveasobjectfile方法将原rdd中的元素序列化成java對象,存儲在指定目錄中。

saveassequencefile

saveassequencefile方法将鍵值對型rdd以sequencefile的格式儲存。鍵值對型rdd也可以以文本的格式儲存,隻須使用saveastextfile方法即可。

需要注意的是,上面的方法都把一個目錄的名字作為輸入參數,然後在這個目錄為每個rdd分區建立一個檔案。這種設計不僅高效而且可容錯。因為每一個分區被存成一個檔案,是以spark在儲存rdd的時候可以啟動多個任務,并行執行,将資料寫入檔案系統中。這樣也保證了寫入資料的過程是可容錯的。一旦有一個将分區寫入檔案的任務失敗了,spark可以再啟動一個任務,重寫剛才失敗任務建立的檔案。

3.6 惰性操作

rdd的建立和轉換方法都是惰性操作。當應用調用一個傳回rdd的方法的時候,spark并不會立即執行運算。比如,當你使用sparkcontext的textfile方法從hdfs中讀取檔案時,spark并不會馬上從硬碟中讀取檔案。類似地,rdd轉換操作(它會傳回新rdd)也是惰性的。spark會記錄作用于rdd上的轉換操作。

讓我們考慮如下示例代碼。

上面三行代碼看起來很快就會執行完,哪怕textfile方法讀取的是一個包含了10tb資料的檔案。這其中的原因是當你調用textfile方法時,它并沒有真正讀取檔案。類似地,filter方法也沒有立即周遊原rdd中的每一個元素。

spark僅僅記錄了這個rdd是怎麼建立的,在它上面做轉換操作會建立怎樣的子rdd等資訊。spark為每一個rdd維護其各自的血統資訊。在需要的時候,spark利用這些資訊建立rdd或重建rdd。

如果rdd的建立和轉換都是惰性操作,那麼spark什麼時候才真正讀取資料和做轉換操作的計算呢?下面将會解答這個問題。

觸發計算的操作

當spark應用調用操作方法或者儲存rdd至存儲系統的時候,rdd的轉換計算才真正執行。儲存rdd至存儲系統也被視為一種操作,盡管它并沒有向驅動程式傳回值。

當spark應用調用rdd的操作方法或者儲存rdd的時候,它觸發了spark中的連鎖反應。當調用操作方法的時候,spark會嘗試建立作為調用者的rdd。如果這個rdd是從檔案中建立的,那麼spark會在worker節點上讀取檔案至記憶體中。如果這個rdd是通過其他rdd的轉換得到的子rdd,spark會嘗試建立其父rdd。這個過程會一直持續下去,直到spark找到根rdd。然後spark就會真正執行這些生成rdd所必需的轉換計算,進而生成作為調用者的rdd。最後,執行操作方法所需的計算,将生成的結果傳回給驅動程式。

惰性轉換使得spark可以高效地執行rdd計算。直到spark應用需要操作結果時才進行計算,spark可以利用這一點優化rdd的操作。這使得操作流水線化,而且還避免了在網絡間不必要的資料傳輸。

3.7 緩存

除了将資料駐留在記憶體中以外,緩存在rdd中也扮演了另外一個重要的角色。就像之前所說的,建立rdd有兩種方式,從存儲系統中讀取資料或者應用其他現存rdd的轉換操作。預設情況下,當一個rdd的操作方法被調用時,spark會根據它的父rdd來建立這個rdd,這有可能導緻父rdd的建立。如此往複,這個過程一直持續到spark找到根rdd,而後spark通過從過存儲系統讀取資料的方式建立根rdd。操作方法被調用一次,上面說的過程就會執行一遍。每次調用操作方法,spark都會周遊這個調用者rdd的血統樹,執行所有的轉換操作來建立它。

考慮下面的例子。

盡管上面的代碼隻調用了一次textfile方法,但是日志檔案會被從硬碟中讀取兩次。這是因為調用了兩次操作方法count。在調用errorlogs.count時,日志檔案第一次被讀取,調用warninglogs.count時,日志檔案被再次讀取。這隻是個簡單的例子,現實世界中的應用會有更多的各種轉換和操作。

如果一個rdd緩存了,spark會執行到目前為止的所有轉換操作并為這個rdd建立一個檢查點。具體來說,這隻會在第一次在一個緩存的rdd上調用某操作的時候發生。類似于轉換方法,緩存方法也是惰性的。

如果一個應用緩存了rdd,spark并不是立即執行計算并把它存儲在記憶體中。spark隻有在第一次在緩存的rdd上調用某操作的時候才會将rdd物化在記憶體中。而且這第一次操作并不會從中受益,後續的操作才會從緩存中受益。因為它們不需要再執行從存儲系統中讀取資料開始的一系列操作。它們通常都運作得快多了。還有,那些隻使用一次資料的應用使用緩存也不會有任何好處。隻有那些需要對同樣資料做多次疊代的應用才能從緩存中受益。

如果一個應用把rdd緩存在記憶體中,spark實際上是把它存儲在每個worker節點上執行者的記憶體中了。每個執行者把它所計算的rdd分區緩存在記憶體中。

3.7.1 rdd的緩存方法

rdd類提供了兩種緩存方法:cache和persist。

cache

cache方法把rdd存儲在叢集中執行者的記憶體中。它實際上是将rdd物化在記憶體中。

下面的例子展示了怎麼利用緩存優化上面的例子。

persist

persist是一個通用版的cache方法。它把rdd存儲在記憶體中或者硬碟上或者二者皆有。它的輸入參數是存儲等級,這是一個可選參數。如果調用persist方法而沒有提供參數,那麼它的行為類似于cache方法。

persist方法支援下列常見的存儲選項。

memory_only:當一個應用把 memory_only作為參數調用persist方法時,spark會将rdd分區采用反序列化java對象的方式存儲在worker節點的記憶體中。如果一個rdd分區無法完全載入worker節點的記憶體中,那麼它将在需要時才計算。

disk_only:如果把disk_only作為參數調用persist方法,spark會物化rdd分區,把它們存儲在每一個worker節點的本地檔案系統中。這個參數可以用于緩存中間的rdd,這樣接下來的一系列操作就沒必要從根rdd開始計算了。

memory_and_disk:這種情況下,spark會盡可能地把rdd分區存儲在記憶體中,如果有剩餘,就把剩餘的分區存儲在硬碟上。

memory_only_ser:這種情況下,spark會采用序列化java對象的方式将rdd分區存儲在記憶體中。一個序列化的java對象會消耗更少的記憶體,但是讀取是cpu密集型的操作。這個參數是在記憶體消耗和cpu使用之間做的一個妥協。

memory_and_disk_ser:spark會盡可能地以序列化java對象的方式将rdd分區存儲在記憶體中。如果有剩餘,則剩餘的分區會存儲在硬碟上。

3.7.2 rdd緩存是可容錯的

在分布式環境中可容錯性是相當重要的。之前我們就已經知道了當節點出故障的時候spark是怎麼自動把計算作業轉移到其他節點的。spark的rdd機制同樣也是可容錯的。

即使一個緩存rdd的節點出故障了,spark應用也不會崩潰。spark會在另外節點上自動重新建立、緩存出故障的節點中存儲的分區。spark利用rdd的血統資訊來重新計算丢失的緩存分區。

3.7.3 緩存記憶體管理

spark采用lru算法來自動管理緩存占用的記憶體。隻有在必要時,spark才會從緩存占用的記憶體中移除老的rdd分區。而且,rdd還提供了名為unpersist的方法。應用可以調用這個方法來從緩存占用的記憶體中手動移除rdd分區。

3.8 spark作業

rdd上的轉換、操作和緩存方法構成了spark應用的基礎。從本質上說,rdd描述了spark程式設計模型。既然我們介紹過了程式設計模型,那麼接下來我們介紹在spark應用中這些是怎麼結合在一起的。

作業指的是spark将要執行的一些計算,它們将操作的結果傳回給驅動程式。一個應用可以發起一個或多個作業。通過調用rdd的操作方法可以發起一個作業。也就是說,一個操作方法會觸發一個作業。如果一個操作是從未緩存的rdd或未緩存rdd的後代rdd發起的,spark将會從存儲系統中讀取資料,從此開始作業。如果一個操作是從緩存過的rdd或者緩存過的rdd的後代rdd發起的,那麼spark就會從那個緩存過的rdd開始作業。接下來,spark會按照操作方法的要求執行必要的轉換操作來建立rdd。最後,執行操作所需的計算,一旦結果出來後,便将它傳回給驅動程式。

當一個應用調用rdd的操作方法時,spark會建立由若幹個階段構成的dag。spark根據shuffle邊界來将不同任務劃分成不同的階段。不需要shuffle操作的任務被劃分到同一個階段。那些輸入資料是已經做過shuffle操作的任務将開始一個新的階段。

一個階段可以由一個或者多個任務構成。spark把任務送出給執行者,執行者将并行執行任務。在節點間排程任務的依據是資料分布情況。如果一個節點在處理任務時失效了,spark會把這個任務送出給其他節點。

3.9 共享變量

spark使用的架構是無共享的。資料分布在叢集的各個節點上,每個節點都有自己的cpu、記憶體和存儲資源。沒有全局的記憶體空間用于任務間共享。驅動程式和任務之間通過消息共享資料。

舉例來說,如果一個rdd操作的函數參數是驅動程式中變量的引用,spark會将這個變量的副本以及任務一起發送給執行者。每個任務都有一份變量的副本并把它當成隻讀變量使用。任何對這個變量的更新都隻存在任務的内部,改動并不會回傳給驅動程式。而且spark會把這個變量在每一個階段的開始發送給worker節點。

對于一些應用而言,這種預設行為是低效的。在一個實際的使用場景中,驅動程式在作業的任務間共享了一個巨大的查找表。而這個作業由多個階段構成。預設情況下,spark會自動将這個變量及其相關任務發送給每個執行者。然而,spark會在每個階段做這件事。如果這個查找表存儲了100mb的資料,并且這個作業涉及10個階段,那麼spark就會給每個worker節點發送10次100mb的相同資料。

另外一個使用場景是在每個運作在不同節點上的任務中需要更新全局變量。預設情況下,任務中對變量的更新是不會回傳給驅動程式的。

spark通過共享變量的概念來滿足這些使用場景的需求。

3.9.1 廣播變量

廣播變量的使用使得spark應用可以有效地在驅動程式和執行作業的任務之間共享資料。spark隻會給worker節點發送一次廣播變量,并且将它反序列化成隻讀變量存儲在執行者的記憶體中。而且,spark采用一種更高效的算法來釋出廣播變量。

注意,如果一個作業由多個階段構成,且階段中的任務使用同一個驅動程式的變量,那麼使用廣播變量是十分有用的。如果你不想在開始執行每個任務之前反序列化變量,使用廣播變量也是有益的。預設情況下,spark會将傳輸過來的變量以序列化的形式緩存在執行者的記憶體中,在開始執行任務之前再反序列化它。

sparkcontext 類提供了一個叫作broadcast的方法用于建立廣播變量。它把一個待廣播的變量作為參數,傳回一個broadcast類執行個體。一個任務必須使用broadcast對象的value方法才可以擷取廣播變量的值。

考慮這樣一個應用,它根據電商交易資訊生成交易詳情。在現實世界的應用中會有一張顧客表、一張商品表和一張交易表。為了簡化起見,我們直接用一些簡單的資料結構來代替這些表作為輸入資料。

使用廣播變量使得我們可以高效地實作顧客資料、商品資料和交易資料之間的連接配接。我們可以通過使用rdd api來實作連接配接操作,但是這會在網絡間對顧客資料、商品資料和交易資料做shuffle操作。使用廣播變量,我們使得spark隻将顧客資料和商品資料發送給每個節點一次,并且用簡單的map操作來代替耗時的join操作。

3.9.2 累加器

累加器是隻增變量,它可以被運作在不同節點上的任務更改并且被驅動程式讀取。它可以用于計數器和聚合操作。spark提供了數值類型的累加器,也支援建立自定義類型的累加器。

sparkcontext類提供了一個叫作accumulator的方法用于建立累加器變量。它有兩個參數。第一個參數是累加器的初值,第二個是在spark ui中顯示的名字,這是一個可選參數。它傳回一個accumulator類執行個體。這個類執行個體為操作累加器變量提供操作符。任務隻能采用add方法或者+=操作符來增加累加器變量的值。隻有驅動程式可以通過value方法來擷取累加器的值。

考慮這樣一個應用,它需要從顧客表中過濾出不合法的顧客并計數。在現實世界的應用中,我們會從硬碟中讀取資料并将過濾後的資料寫入到硬碟中的另外一個檔案。為簡化起見,我們跳過讀寫硬碟的部分。

在使用累加器的時候需要注意,轉換操作期間對累加器的更新無法保證恰好隻有一次。如果一個任務或一個階段重複執行,每一個任務的更新操作就會多次執行。

而且,對累加器的更新操作并不是在rdd的操作方法被調用時才執行的。rdd的轉換操作是惰性的,轉換操作中對累加器的更新并不會立即執行。是以,如果驅動程式在操作方法被調用之前就使用累加器的值,那麼它将得到一個錯誤的值。

3.10 總結

spark是一個快速、可擴充、可容錯且基于記憶體的叢集計算架構。一個spark應用可以比hadoop應用快上100倍。

spark不但快速而且它能很友善地使用mapreduce。通過不同語言(包括java、python、scala和r)的易讀的api,它可以友善地開發分布式大資料應用。使用spark開發者的生産力可以有5~10倍的提升。

而且spark為各種資料處理任務提供了統一的平台。它是一個通用的架構,可以被各種大資料應用使用。對于疊代式資料分析或者使用疊代算法的應用而言,它是一個理想的平台。

spark的程式設計模型基于一個叫作rdd的抽象概念。從概念上看,rdd類似于scala中的集合。它表示的資料就是一組分區的集合,這些分區分布在叢集的節點上。它還為處理資料提供一些函數式的方法。

繼續閱讀