本文翻譯自Spark(http://spark.apache.org)的官方文檔。由于Spark更新較快,部分API已經過時,本文僅供參考,請以相應版本的官方文檔和運作時的提示為準。
概述
從高層次上來看,每一個Spark應用都包含一個驅動程式,用于執行使用者的main函數以及在叢集上運作各種并行操作。Spark提供的主要抽象是彈性分布式資料集(RDD),這是一個包含諸多元素、被劃分到不同節點上進行并行處理的資料集合。RDD通過打開HDFS(或其他hadoop支援的檔案系統)上的一個檔案、在驅動程式中打開一個已有的Scala集合或由其他RDD轉換操作得到。使用者可以要求Spark将RDD持久化到記憶體中,這樣就可以有效地在并行操作中複用。另外,在節點發生錯誤時RDD可以自動恢複。
Spark提供的另一個抽象是可以在并行操作中使用的共享變量。在預設情況下,當Spark将一個函數轉化成許多任務在不同的節點上運作的時候,對于所有在函數中使用的變量,每一個任務都會得到一個副本。有時,某一個變量需要在任務之間或任務與驅動程式之間共享。Spark支援兩種共享變量:廣播變量,用來将一個值緩存到所有節點的記憶體中;累加器,隻能用于累加,比如計數器和求和。
這篇指南将展示這些特性在Spark支援的語言中是如何使用的(本文隻翻譯了Python部分)。如果你打開了Spark的互動指令行——bin/spark-shell的Scala指令行或bin/pyspark的Python指令行都可以——那麼這篇文章你學習起來将是很容易的。
連接配接Spark
Spark1.3.0隻支援Python2.6或更高的版本(但不支援Python3)。它使用了标準的CPython解釋器,是以諸如NumPy一類的C庫也是可以使用的。
通過Spark目錄下的bin/spark-submit腳本你可以在Python中運作Spark應用。這個腳本會載入Spark的Java/Scala庫然後讓你将應用送出到叢集中。你可以執行bin/pyspark來打開Python的互動指令行。
如果你希望通路HDFS上的資料,你需要為你使用的HDFS版本建立一個PySpark連接配接。常見的HDFS版本标簽都已經列在了這個第三方發行版頁面。
最後,你需要将一些Spark的類import到你的程式中。加入如下這行:
from pyspark import SparkContext, SparkConf
初始化Spark
在一個Spark程式中要做的第一件事就是建立一個SparkContext對象來告訴Spark如何連接配接一個叢集。為了建立SparkContext,你首先需要建立一個SparkConf對象,這個對象會包含你的應用的一些相關資訊。
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
appName參數是在叢集UI上顯示的你的應用的名稱。master是一個Spark、Mesos或YARN叢集的URL,如果你在本地運作那麼這個參數應該是特殊的”local”字元串。在實際使用中,當你在叢集中運作你的程式,你一般不會把master參數寫死在代碼中,而是通過用spark-submit運作程式來獲得這個參數。但是,在本地測試以及單元測試時,你仍需要自行傳入”local”來運作Spark程式。
使用指令行
在PySpark指令行中,一個特殊的內建在解釋器裡的SparkContext變量已經建立好了,變量名叫做sc。建立你自己的SparkContext不會起作用。你可以通過使用—master指令行參數來設定這個上下文連接配接的master主機,你也可以通過—py-files參數傳遞一個用逗号隔開的清單來将Python的.zip、.egg或.py檔案添加到運作時路徑中。你還可以通過—package參數傳遞一個用逗号隔開的maven清單來給這個指令行會話添加依賴(比如Spark的包)。任何額外的包含依賴包的倉庫(比如SonaType)都可以通過傳給—repositorys參數來添加進去。Spark包的所有Python依賴(列在這個包的requirements.txt檔案中)在必要時都必須通過pip手動安裝。
比如,使用四核來運作bin/pyspark應當輸入這個指令:
$ ./bin/pyspark --master local[4]
又比如,把code.py檔案添加到搜尋路徑中(為了能夠import在程式中),應當使用這條指令:
$ ./bin/pyspark --master local[4] --py-files code.py
想要了解指令行選項的完整資訊請執行pyspark --help指令。在這些場景下,pyspark會觸發一個更通用的spark-submit腳本
在IPython這個加強的Python解釋器中運作PySpark也是可行的。PySpark可以在1.0.0或更高版本的IPython上運作。為了使用IPython,必須在運作bin/pyspark時将PYSPARK_DRIVER_PYTHON變量設定為ipython,就像這樣:
$ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark
你還可以通過設定PYSPARK_DRIVER_PYTHON_OPTS來自省定制ipython。比如,在運作IPython Notebook時開啟PyLab圖形支援應該使用這條指令:
$ PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook --pylab
inline" ./bin/pyspark
彈性分布式資料集(RDD)
Spark是以RDD概念為中心運作的。RDD是一個容錯的、可以被并行操作的元素集合。建立一個RDD有兩個方法:在你的驅動程式中并行化一個已經存在的集合;從外部存儲系統中引用一個資料集,這個存儲系統可以是一個共享檔案系統,比如HDFS、HBase或任意提供了Hadoop輸入格式的資料來源。
并行化集合
并行化集合是通過在驅動程式中一個現有的疊代器或集合上調用SparkContext的parallelize方法建立的。為了建立一個能夠并行操作的分布資料集,集合中的元素都會被拷貝。比如,以下語句建立了一個包含1到5的并行化集合:
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
分布資料集(distData)被建立起來之後,就可以進行并行操作了。比如,我們可以調用disData.reduce(lambda a, b: a+b)來對元素進行疊加。在後文中我們會描述分布資料集上支援的操作。
并行集合的一個重要參數是将資料集劃分成分片的數量。對每一個分片,Spark會在叢集中運作一個對應的任務。 典型情況下,叢集中的每一個CPU将對應運作2-4個分片。一般情況下,Spark會根據目前叢集的情況自行設定分片數量。但是,你也可以通過将第二個參 數傳遞給parallelize方法(比如sc.parallelize(data, 10))來手動确定分片數量。注意:有些代碼中會使用切片(slice,分片的同義詞)這個術語來保持向下相容性。
外部資料集
PySpark可以通過Hadoop支援的外部資料源(包括本地檔案系統、HDFS、 Cassandra、HBase、 亞馬遜S3等等)建立分布資料集。Spark支援文本檔案、 序列檔案以及其他任何 Hadoop輸入格式檔案。
通過文本檔案建立RDD要使用SparkContext的textFile方法。這個方法會使用一個檔案的URI(或本地檔案路徑,hdfs://、s3n://這樣的URI等等)然後讀入這個檔案建立一個文本行的集合。以下是一個例子:
>>> distFile = sc.textFile("data.txt")
建立完成後distFile上就可以調用資料集操作了。比如,我們可以調用map和reduce操作來疊加所有文本行的長度,代碼如下:
distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)
在Spark中讀入檔案時有幾點要注意:
如果使用了本地檔案路徑時,要保證在worker節點上這個檔案也能夠通過這個路徑通路。這點可以通過将這個檔案拷貝到所有worker上或者使用網絡挂載的共享檔案系統來解決。
包括textFile在内的所有基于檔案的Spark讀入方法,都支援将檔案夾、壓縮檔案、包含通配符的路徑作為參數。比如,以下代碼都是合法的:
textFile("/my/directory")
textFile("/my/directory/*.txt")
textFile("/my/directory/*.gz")
textFile方法也可以傳入第二個可選參數來控制檔案的分片數量。預設情況下,Spark會為檔案的每一個塊(在HDFS中塊的大小預設是64MB) 建立一個分片。但是你也可以通過傳入一個更大的值來要求Spark建立更多的分片。注意,分片的數量絕不能小于檔案塊的數量。
除了文本檔案之外,Spark的Python API還支援多種其他資料格式:
SparkContext.wholeTextFiles能夠讀入包含多個小文本檔案的目錄,然後為每一個檔案傳回一個(檔案名,内容)對。這是與textFile方法為每一個文本行傳回一條記錄相對應的。
RDD.saveAsPickleFile和SparkContext.pickleFile支援将RDD以串行化的Python對象格式存儲起來。串行化的過程中會以預設10個一批的數量批量處理。
序列檔案和其他Hadoop輸入輸出格式。
注意 這個特性目前仍處于試驗階段,被标記為Experimental,目前隻适用于進階使用者。這個特性在未來可能會被基于Spark SQL的讀寫支援所取代,因為Spark SQL是更好的方式。
可寫類型支援
PySpark序列檔案支援利用Java作為中介載入一個鍵值對RDD,将可寫類型轉化成Java的基本類型,然後使用 Pyrolite将java結果對象串行化。當将一個鍵值對RDD儲存到一個序列檔案中時PySpark将會運作上述過程的相反過程。首先将Python對象反串行化成Java對象,然後轉化成可寫類型。以下可寫類型會自動轉換:
| 可寫類型 | Python類型 | | ———————- | ————- | | Text | unicode str| | IntWritable | int | | FloatWritable | float | | DoubleWritable | float | | BooleanWritable | bool | | BytesWritable | bytearray | | NullWritable | None | | MapWritable | dict |
數組是不能自動轉換的。使用者需要在讀寫時指定ArrayWritable的子類型.在讀入的時候,預設的轉換器會把自定義的ArrayWritable子 類型轉化成Java的Object[],之後串行化成Python的元組。為了獲得Python的array.array類型來使用主要類型的數組,使用者 需要自行指定轉換器。
儲存和讀取序列檔案
和文本檔案類似,序列檔案可以通過指定路徑來儲存與讀取。鍵值類型都可以自行指定,但是對于标準可寫類型可以不指定。
>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x ))
>>> rdd.saveAsSequenceFile("path/to/file")
>>> sorted(sc.sequenceFile("path/to/file").collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa')]
儲存和讀取其他Hadoop輸入輸出格式
PySpark同樣支援寫入和讀出其他Hadoop輸入輸出格式,包括’新’和’舊’兩種Hadoop MapReduce API。如果有必要,一個Hadoop配置可以以Python字典的形式傳入。以下是一個例子,使用了Elasticsearch ESInputFormat:
$ SPARK_CLASSPATH=/path/to/elasticsearch-hadoop.jar ./bin/pyspark
>>> conf = {"es.resource" : "index/type"} # assume Elasticsearch is running on localhost defaults
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\
"org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf)
>>> rdd.first() # the result is a MapWritable that is converted to a Python dict
(u'Elasticsearch ID',
{u'field1': True,
u'field2': u'Some Text',
u'field3': 12345})
注意,如果這個讀入格式僅僅依賴于一個Hadoop配置和/或輸入路徑,而且鍵值類型都可以根據前面的表格直接轉換,那麼剛才提到的這種方法非常合适。
如果你有一些自定義的序列化二進制資料(比如從Cassandra/HBase中讀取資料),那麼你需要首先在Scala/Java端将這些資料轉化成可以被Pyrolite的串行化器處理的資料類型。一個轉換器特質已經提供好了。簡單地拓展這個特質同時在convert方法中實作你自己的轉換代碼即可。記住,要確定這個類以及通路你的輸入格式所需的依賴都被打到了Spark作業包中,并且確定這個包已經包含到了PySpark的classpath中。
這裡有一些通過自定義轉換器來使用Cassandra/HBase輸入輸出格式的Python樣例和轉換器樣例。
RDD操作
RDD支援兩類操作:轉化操作,用于從已有的資料集轉化産生新的資料集;啟動操作,用于在計算結束後向驅動程式傳回結果。舉個例子,map是一個轉化操作,可以将資料集中每一個元素傳給一個函數,同時将計算結果作為一個新的RDD傳回。另一方面,reduce操作是一個啟動操作,能夠使用某些函數來聚集計算RDD中所有的元素,并且向驅動程式傳回最終結果(同時還有一個并行的reduceByKey操作可以傳回一個分布資料集)。
在Spark所有的轉化操作都是惰性求值的,就是說它們并不會立刻真的計算出結果。相反,它們僅僅是記錄下了轉換操作的操作對象(比如:一個檔案)。隻有當一個啟動操作被執行,要向驅動程式傳回結果時,轉化操作才會真的開始計算。這樣的設計使得Spark運作更加高效——比如,我們會發覺由map操作産生的資料集将會在reduce操作中用到,之後僅僅是傳回了reduce的最終的結果而不是map産生的龐大資料集。
在預設情況下,每一個由轉化操作得到的RDD都會在每次執行啟動操作時重新計算生成。但是,你也可以通過調用persist(或cache)方法來将RDD持久化到記憶體中,這樣Spark就可以在下次使用這個資料集時快速獲得。Spark同樣提供了對将RDD持久化到硬碟上或在多個節點間複制的支援。
基本操作
為了示範RDD的基本操作,請看以下的簡單程式:
lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)
第一行定義了一個由外部檔案産生的基本RDD。這個資料集不是從記憶體中載入的也不是由其他操作産生的;lines僅僅是一個指向檔案的指針。第二行将lineLengths定義為map操作的結果。再強調一次,由于惰性求值的緣故,lineLengths并不會被立即計算得到。最後,我們運作了reduce操作,這是一個啟動操作。從這個操作開始,Spark将計算過程劃分成許多任務并在多機上運作,每台機器運作自己部分的map操作和reduce操作,最終将自己部分的運算結果傳回給驅動程式。
如果我們希望以後重複使用lineLengths,隻需在reduce前加入下面這行代碼:
lineLengths.persist()
這條代碼将使得lineLengths在第一次計算生成之後儲存在記憶體中。
向Spark傳遞函數
Spark的API嚴重依賴于向驅動程式傳遞函數作為參數。有三種推薦的方法來傳遞函數作為參數。
Lambda表達式,簡單的函數可以直接寫成一個lambda表達式(lambda表達式不支援多語句函數和無傳回值的語句)。
對于代碼很長的函數,在Spark的函數調用中在本地用def定義。
子產品中的頂級函數。
比如,傳遞一個無法轉化為lambda表達式長函數,可以像以下代碼這樣:
"MyScript.py"""
if __name__ == "__main__":
def myFunc(s):
words = s.split(" ")
return len(words)
sc = SparkContext(...)
sc.textFile("file.txt").map(myFunc)
值得指出的是,也可以傳遞類執行個體中方法的引用(與單例對象相反),這種傳遞方法會将整個對象傳遞過去。比如,考慮以下代碼:
class MyClass(object):
def func(self, s):
return s
def doStuff(self, rdd):
return rdd.map(self.func)
在這裡,如果我們建立了一個新的MyClass對象,然後對它調用doStuff方法,map會用到這個對象中func方法的引用,是以整個對象都需要傳遞到叢集中。
還有另一種相似的寫法,通路外層對象的資料域會傳遞整個對象的引用:
class MyClass(object):
def __init__(self):
self.field = "Hello"
def doStuff(self, rdd):
return rdd.map(lambda s: self.field + x)
此類問題最簡單的避免方法就是,使用一個本地變量緩存一份這個資料域的拷貝,直接通路這個資料域:
def doStuff(self, rdd):
field = self.field
return rdd.map(lambda s: field + x)
使用鍵值對
雖然大部分Spark的RDD操作都支援所有種類的對象,但是有少部分特殊的操作隻能作用于鍵值對類型的RDD。這類操作中最常見的就是分布的shuffle操作,比如将元素通過鍵來分組或聚集計算。
在Python中,這類操作一般都會使用Python内建的元組類型,比如(1, 2)。它們會先簡單地建立類似這樣的元組,然後調用你想要的操作。
比如,一下代碼對鍵值對調用了reduceByKey操作,來統計每一文本行在文本檔案中出現的次數:
lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
我們還可以使用counts.sortByKey(),比如,當我們想将這些鍵值對按照字母表順序排序,然後調用counts.collect()方法來将結果以對象清單的形式傳回。
轉化操作
下面的表格列出了Spark支援的常用轉化操作。欲知細節,請查閱RDD API文檔(Scala, Java, Python)和鍵值對RDD函數文檔(Scala, Java)。
轉化操作 | 作用 ————| —— map(func) | 傳回一個新的分布資料集,由原資料集元素經func處理後的結果組成 filter(func) | 傳回一個新的資料集,由傳給func傳回True的原資料集元素組成 flatMap(func) | 與map類似,但是每個傳入元素可能有0或多個傳回值,func可以傳回一個序列而不是一個值 mapParitions(func) | 類似map,但是RDD的每個分片都會分開獨立運作,是以func的參數和傳回值必須都是疊代器 mapParitionsWithIndex(func) | 類似mapParitions,但是func有兩個參數,第一個是分片的序号,第二個是疊代器。傳回值還是疊代器 sample(withReplacement, fraction, seed) | 使用提供的随機數種子取樣,然後替換或不替換 union(otherDataset) | 傳回新的資料集,包括原資料集和參數資料集的所有元素 intersection(otherDataset) | 傳回新資料集,是兩個集的交集 distinct([numTasks]) | 傳回新的集,包括原集中的不重複元素 groupByKey([numTasks]) | 當用于鍵值對RDD時傳回(鍵,值疊代器)對的資料集 aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 用于鍵值對RDD時傳回(K,U)對集,對每一個Key的value進行聚集計算 sortByKey([ascending], [numTasks])用于鍵值對RDD時會傳回RDD按鍵的順序排序,升降序由第一個參數決定 join(otherDataset, [numTasks]) | 用于鍵值對(K, V)和(K, W)RDD時傳回(K, (V, W))對RDD cogroup(otherDataset, [numTasks]) | 用于兩個鍵值對RDD時傳回 (K, (V疊代器, W疊代器))RDD cartesian(otherDataset) | 用于T和U類型RDD時傳回(T, U)對類型鍵值對RDD pipe(command, [envVars]) | 通過shell指令管道處理每個RDD分片 coalesce(numPartitions) | 把RDD的分片數量降低到參數大小 repartition(numPartitions) | 重新打亂RDD中元素順序并重新分片,數量由參數決定 repartitionAndSortWithinPartitions(partitioner) | 按照參數給定的分片器重新分片,同時每個分片内部按照鍵排序
啟動操作
下面的表格列出了Spark支援的部分常用啟動操作。欲知細節,請查閱RDD API文檔(Scala, Java, Python)和鍵值對RDD函數文檔(Scala, Java)。 (譯者注:這部分翻譯比較簡略,僅供簡單參考,具體細節請看文檔) 啟動操作 | 作用 ————| —— reduce(func) | 使用func進行聚集計算,func的參數是兩個,傳回值一個,兩次func運作應當是完全解耦的,這樣才能正确地并行運算 collect() | 向驅動程式傳回資料集的元素組成的數組 count() | 傳回資料集元素的數量 first() | 傳回資料集的第一個元素 take(n) | 傳回前n個元素組成的數組 takeSample(withReplacement, num, [seed]) | 傳回一個由原資料集中任意num個元素的suzuki,并且替換之 takeOrder(n, [ordering]) | 傳回排序後的前n個元素 saveAsTextFile(path) | 将資料集的元素寫成文本檔案 saveAsSequenceFile(path) | 将資料集的元素寫成序列檔案,這個API隻能用于Java和Scala程式 saveAsObjectFile(path) | 将資料集的元素使用Java的序列化特性寫到檔案中,這個API隻能用于Java和Scala程式 countByCount() | 隻能用于鍵值對RDD,傳回一個(K, int) hashmap,傳回每個key的出現次數 foreach(func) | 對資料集的每個元素執行func, 通常用于完成一些帶有副作用的函數,比如更新累加器(見下文)或與外部存儲互動等
RDD持久化
Spark的一個重要功能就是在将資料集持久化(或緩存)到記憶體中以便在多個操作 中重複使用。當我們持久化一個RDD是,每一個節點将這個RDD的每一個分片計算并儲存到記憶體中以便在下次對這個資料集(或者這個資料集衍生的資料集)的 計算中可以複用。這使得接下來的計算過程速度能夠加快(經常能加快超過十倍的速度)。緩存是加快疊代算法和快速互動過程速度的關鍵工具。
你可以通過調用persist或cache方法來标記一個想要持久化的RDD。在第一次被計算産生之後,它就會始終停留在節點的記憶體中。Spark的緩存是具有容錯性的——如果RDD的任意一個分片丢失了,Spark就會依照這個RDD産生的轉化過程自動重算一遍。
另外,每一個持久化的RDD都有一個可變的存儲級别,這個級别使得使用者可以改變RDD持久化的儲存位置。比如,你可以将資料集持久化到硬碟上,也可以将它以序列化的Java對象形式(節省空間)持久化到記憶體中,還可以将這個資料集在節點之間複制,或者使用Tachyon将它儲存到堆外。這些存儲級别都是通過向persist()傳遞一個StorageLevel對象(Scala, Java, Python)來設定的。存儲級别的所有種類請見下表:
注意:在Python中,儲存的對象永遠是通過Pickle庫序列化過的,是以設不設定序列化級别不會産生影響。
Spark還會在shuffle操作(比如reduceByKey)中自動儲存中間資料,即使使用者沒有調用persist。這是為了防止在shuffle過程中某個節點出錯而導緻的全盤重算。不過如果使用者打算複用某些結果RDD,我們仍然建議使用者對結果RDD手動調用persist,而不是依賴自動持久化機制。
應該選擇哪個存儲級别? Spark的存儲級别是為了提供記憶體使用與CPU效率之間的不同取舍平衡程度。我們建議使用者通過考慮以下流程來選擇合适的存儲級别:
如果你的RDD很适合預設的級别(MEMORY_ONLY),那麼久使用預設級别吧。這是CPU最高效運作的選擇,能夠讓RDD上的操作以最快速度運作。
否則,試試MEMORY_ONLY_SER選項并且選擇一個快的序列化庫來使對象的空間使用率更高,同時盡量保證通路速度足夠快。
不要往硬碟上持久化,除非重算資料集的過程代價确實很昂貴,或者這個過程過濾了巨量的資料。否則,重新計算分片有可能跟讀硬碟速度一樣快。
如果你希望快速的錯誤恢複(比如用Spark來處理web應用的請求),使用複制級别。所有的存儲級别都提供了重算丢失資料的完整容錯機制,但是複制一份副本能省去等待重算的時間。
在大記憶體或多應用的環境中,處于實驗中的OFF_HEAP模式有諸多優點:
這個模式允許多個執行者共享Tachyon中的同一個記憶體池
這個模式顯著降低了垃圾回收的花銷。
在某一個執行者個體崩潰之後緩存的資料不會丢失。
删除資料
Spark會自動監視每個節點的緩存使用同時使用LRU算法丢棄舊資料分片。如果你想手動删除某個RDD而不是等待它被自動删除,調用RDD.unpersist()方法。
共享變量
通常情況下,當一個函數傳遞給一個在遠端叢集節點上運作的Spark操作(比如map和reduce)時,Spark會對涉及到的變量的所有副本執行這個函數。這些變量會被複制到每個機器上,而且這個過程不會被回報給驅動程式。通常情況下,在任務之間讀寫共享變量是很低效的。但是,Spark仍然提供了有限的兩種共享變量類型用于常見的使用場景:廣播變量和累加器。
廣播變量
廣播變量允許程式員在每台機器上保持一個隻讀變量的緩存而不是将一個變量的拷貝傳遞給各個任務。它們可以被使用,比如,給每一個節點傳遞一份大輸入資料集的拷貝是很低效的。Spark試圖使用高效的廣播算法來分布廣播變量,以此來降低通信花銷。
可以通過SparkContext.broadcast(v)來從變量v建立一個廣播變量。這個廣播變量是v的一個包裝,同時它的值可以功過調用value方法來獲得。以下的代碼展示了這一點:
>>> broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.broadcast.Broadcast object at 0x102789f10>
>>> broadcastVar.value
[1, 2, 3]
在廣播變量被建立之後,在所有函數中都應當使用它來代替原來的變量v,這樣就可以保證v在節點之間隻被傳遞一次。另外,v變量在被廣播之後不應該再被修改了,這樣可以確定每一個節點上儲存的廣播變量的一緻性(如果這個變量後來又被傳輸給一個新的節點)。
累加器
累加器是在一個相關過程中隻能被”累加”的變量,對這個變量的操作可以有效地被并行化。它們可以被用于實作計數器(就像在MapReduce過程中)或求 和運算。Spark原生支援對數字類型的累加器,程式員也可以為其他新的類型添加支援。累加器被以一個名字建立之後,會在Spark的UI中顯示出來。這 有助于了解計算的累進過程(注意:目前Python中不支援這個特性)。
可以通過SparkContext.accumulator(v)來從變量v建立一個累加器。在叢集中運作的任務随後可以使用add方法或+=操作符(在Scala和Python中)來向這個累加器中累加值。但是,他們不能讀取累加器中的值。隻有驅動程式可以讀取累加器中的值,通過累加器的value方法。
以下的代碼展示了向一個累加器中累加數組元素的過程:
>>> accum = sc.accumulator(0)
Accumulator<id=0, value=0>
>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
10
這段代碼利用了累加器對int類型的内建支援,程式員可以通過繼承AccumulatorParam類來建立自己想要的類型支援。AccumulatorParam的接口提供了兩個方法:zero'用于為你的資料類型提供零值;'addInPlace'用于計算兩個值得和。比如,假設我們有一個Vector`類表示數學中的向量,我們可以這樣寫:
class VectorAccumulatorParam(AccumulatorParam):
def zero(self, initialValue):
return Vector.zeros(initialValue.size)
def addInPlace(self, v1, v2):
v1 += v2
return v1
# Then, create an Accumulator of this type:
vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())
累加器的更新操作隻會被運作一次,Spark提供了保證,每個任務中對累加器的更新操作都隻會被運作一次。比如,重新開機一個任務不會再次更新累加器。在轉化過程中,使用者應該留意每個任務的更新操作在任務或作業重新運算時是否被執行了超過一次。
累加器不會該别Spark的惰性求值模型。如果累加器在對RDD的操作中被更新了,它們的值隻會在啟動操作中作為RDD計算過程中的一部分被更新。是以,在一個懶惰的轉化操作中調用累加器的更新,并沒法保證會被及時運作。下面的代碼段展示了這一點:
accum = sc.accumulator(0)
data.map(lambda x => acc.add(x); f(x))
# Here, acc is still 0 because no actions have cause the `map` to be computed.
在叢集上部署
這個應用送出指南描述了一個應用被送出到叢集上的過程。簡而言之,隻要你把你的應用打成了JAR包(Java/Scala應用)或.py檔案的集合或.zip壓縮包(Python應用),bin/spark-submit腳本會将應用送出到任意支援的叢集管理器上。
單元測試
Spark對單元測試是友好的,可以與任何流行的單元測試架構相容。你隻需要在測試中建立一個SparkContext,并如前文所述将master的URL設為local,執行你的程式,最後調用SparkContext.stop()來終止運作。請確定你在finally塊或測試架構的tearDown方法中終止了上下文,因為Spark不支援兩個上下文在一個程式中同時運作。
從1.0之前版本的Spark遷移
Spark1.0當機了1.X系列Spark的核心API。現在版本中沒有标注”experimental”或是”developer API”的API在未來的版本中仍會被支援。對Python使用者來說唯一的變化就是組管理操作,比如groupByKey, cogroup, join, 它們的傳回值都從(鍵,值清單)對變成了(鍵, 值疊代器)對。
你還可以閱讀Spark Streaming, MLlib和GraphX的遷移指南。
還有什麼要做的
你可以在Spark的網站上看到更多的Spark樣例程式。另外,在examples目錄下還有許多樣例代碼(Scala, Java, Python)。你可以通過将類名稱傳給Spark的bin/run-example 腳本來運作Java和Scala語言樣例,舉例說明:
./bin/run-example SparkPi
對于Python例子,使用spark-submit腳本代替:
./bin/spark-submit examples/src/main/python/pi.py
為了給你優化代碼提供幫助,配置指南和調優指南提供了關于最佳實踐的一些資訊。確定你的資料儲存在以高效的格式儲存在記憶體中,這很重要。為了給你部署應用提供幫助,叢集模式概覽描述了許多内容,包括分布式操作和支援的叢集管理器。
最後,完整的API文檔在這裡。Scala版本 Java版本 Python版本
版權聲明:本文為CSDN部落客「weixin_33832340」的原創文章,遵循CC 4.0 BY-SA版權協定,轉載請附上原文出處連結及本聲明。
原文連結:https://blog.csdn.net/weixin_33832340/article/details/92208923