常用API
Spark官方文檔
class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication=1)
用于控制RDD存儲。每個StorageLevel記錄:
是否使用記憶體,如果記憶體不足,是否将RDD放到磁盤上,是否以特定于java的序列化格式将資料儲存在記憶體中,以及是否在多個節點上複制RDD分區。還包含一些常用存儲級别的靜态常量,MEMORY_ONLY。由于資料總是在Python端序列化,是以所有常量都使用序列化格式。
class pyspark.Broadcast(sc=None, value=None, pickle_registry=None, path=None, sock_file=None)
使用
SparkContext.broadcast()
建立廣播變量。通過值
.value
通路值。
>>> from pyspark.context import SparkContext
>>> sc = SparkContext('local', 'test')
>>> b = sc.broadcast([1, 2, 3, 4, 5])
>>> b.value
[1, 2, 3, 4, 5]
>>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()
[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
>>> b.unpersist()
>>> large_broadcast = sc.broadcast(range(10000))
1.
destroy()
删除與此廣播變量相關的所有資料和中繼資料。一旦廣播變量被銷毀,就不能再使用它。此方法阻塞,直到删除完成。
2.
dump(value,f)
3.
load(file)
4.
load_from_path(path)
5.
unpersist(blocking=False)
删除執行器上此廣播的緩存副本。如果在調用後使用廣播,則需要将其重新發送給每個執行程式。
參數:
blocking-- 是否阻塞,直到完成非持久化
6.
property value
class pyspark.Accumulator(aid, value, accum_param)
可以累積的共享變量,具有可交換和可關聯的“add”操作。Spark叢集上的工作任務可以使用
+=operator
向累加器添加值,但是隻有驅動程式可以使用value通路它的值。來自worker的更新會自動傳播到driver。
雖然SparkContext支援基本資料類型(如int和float)的累加器,但是使用者也可以通過提供一個自定義的AccumulatorParam對象來為自定義類型定義累加器。以該子產品的doctest為例。
1.
add(term)
2.
property value
class pyspark.AccumulatorParam
定義如何累積給定類型的值的helper對象。
1.
addInPlace(value1, value2)
添加累加器資料類型的兩個值,傳回一個新值;為了提高效率,還可以在适當的地方更新value1并傳回它。
2.
zero(value)
為類型提供一個“零值”,在次元上與提供的值相容(例如,一個零向量)
class pyspark.MarshalSerializer
class pyspark.MarshalSerializer
使用Python的Marshal序列化對象。該序列化更快,但支援少量資料
1.
dumps(obj)
2.
loads(obj)
class pyspark.PickleSerializer
class pyspark.PickleSerializer
該序列化器支援幾乎所有Python對象,但可能不像其他專用的序列化器那麼快。
1.
dumps(obj)
2.
loads(obj)
class pyspark.StatusTracker(jtracker)
class pyspark.StatusTracker(jtracker)
用于監視job和stage progress的低級狀态報告api。
這些api有意提供非常弱的一緻性語義;這些api的使用者應該準備好處理空的/丢失的資訊。例如,作業的stage id可能是已知的,但是狀态API可能沒有關于這些stage細節的任何資訊,是以
getStageInfo
可能會為有效的stage id傳回None。
為了限制記憶體使用,這些api隻提供關于最近jobs/stages的資訊。這些api将為最後一個
spark.ui.retainedStages
和
spark.ui.retainedJobs
提供資訊。
1.
getActiveJobsIds()
傳回一個包含所有活躍jobs的id的數組
2.
getActiveStageIds()
傳回一個包含所有活躍stages的id的數組
3.
getJobIdsForGroup(jobGroup=None)
傳回特定作業group中所有已知作業的清單。如果jobGroup為None,則傳回所有與作業組無關的已知作業。
傳回的清單可能包含正在運作、失敗和已完成的作業,并且在此方法的不同調用中可能有所不同。此方法不保證其結果中元素的順序。
4.
getJobInfo(jobId)
傳回SparkJobInfo對象,如果找不到作業資訊或作業資訊已被垃圾收集,則傳回None。
5.
getStageInfo(stageId)
傳回SparkStageInfo對象,如果找不到作業資訊或作業資訊已被垃圾收集,則傳回None。
class pyspark.SparkJobInfo
class pyspark.SparkJobInfo
暴露有關Spark作業的資訊。
class pyspark.SparkStageInfo
class pyspark.SparkStageInfo
暴露有關Spark階段的資訊。
class pyspark.Profiler(ctx)
class pyspark.Profiler(ctx)
PySpark支援自定義分析器,這是為了允許使用不同的分析器,以及輸出到不同的格式,而不是在BasicProfiler中提供的。
自定義分析器必須定義或繼承以下方法:
- profile–将生成某種類型的系統配置檔案。
- stats–傳回收集到的統計資訊。
- dump–将概要檔案轉儲到路徑
- add --将概要檔案添加到現有的累積概要檔案
建立SparkContext時選擇profiler類
>>> from pyspark import SparkConf, SparkContext
>>> from pyspark import BasicProfiler
>>> class MyCustomProfiler(BasicProfiler):
... def show(self, id):
... print("My custom profiles for RDD:%s" % id)
...
>>> conf = SparkConf().set("spark.python.profile", "true")
>>> sc = SparkContext('local', 'test', conf=conf, profiler_cls=MyCustomProfiler)
>>> sc.parallelize(range(1000)).map(lambda x: 2 * x).take(10)
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
>>> sc.parallelize(range(1000)).count()
1000
>>> sc.show_profiles()
My custom profiles for RDD:1
My custom profiles for RDD:3
>>> sc.stop()
1.
dump(id, path)
将profile轉儲到path中,id是RDD id
2.
profile(func)
利用函數分析
3.
show(id)
列印profile狀态到輸出
4.
stats()
傳回收集到的分析狀态
class pyspark.BasicProfiler(ctx)
class pyspark.BasicProfiler(ctx)
BasicProfiler是預設的profiler,它是基于cProfile和累加器實作的
1.
profile(func)
運作并配置傳入的方法to_profile。傳回一個profile對象。
2.
stats()
傳回收集到的profiling統計資訊(pstats.Stats)
class pyspark.TaskContext
class pyspark.TaskContext
任務的上下文資訊,可以在執行過程中讀取或修改。要通路正在運作的任務的TaskContext通過
TaskContext.get()
。
1.
attemptNumber()
“這個任務已經嘗試了多少次了。第一次任務嘗試将被配置設定為嘗試号= 0,後續嘗試的嘗試号将不斷增加。
2.
classmethod get()
傳回目前活動的TaskContext。這可以在使用者函數内部調用,以通路有關正在運作的任務的上下文資訊。
注意:必須是called on worker,而不是driver。如果沒有初始化,則傳回None。
3.
getLocalProperty(key)
在driver的上遊設定一個本地屬性,如果它丢失,則不設定。
4.
partitionId()
此任務計算的RDD分區的ID。
5.
stageId()
此任務所屬的階段的ID。
6.
taskAttemptId()
此任務嘗試的唯一ID(在相同的SparkContext中,沒有兩個任務嘗試的嘗試id不同)。這大緻相當于Hadoop的TaskAttemptID。
class pyspark.RDDBarrier(rdd)
class pyspark.RDDBarrier(rdd)
将RDD包裝在barrier階段中,這迫使Spark一起啟動這個階段的任務。
RDDBarrier
執行個體是由
RDD.barrier()
建立的。
1.
mapPartitions(f, preservesPartitioning=False)
通過将一個函數應用到包裝好的RDD的每個分區,傳回一個新的RDD,其中的任務一起在barrier階段啟動。該接口與RDD.mapPartitions()相同。
class pyspark.BarrierTaskContext
class pyspark.BarrierTaskContext
class pyspark.BarrierTaskInfo(address)
class pyspark.BarrierTaskInfo(address)