天天看點

python實戰spark(五)常用API

常用API

​​Spark官方文檔​​

class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication=1)

用于控制RDD存儲。每個StorageLevel記錄:

是否使用記憶體,如果記憶體不足,是否将RDD放到磁盤上,是否以特定于java的序列化格式将資料儲存在記憶體中,以及是否在多個節點上複制RDD分區。還包含一些常用存儲級别的靜态常量,MEMORY_ONLY。由于資料總是在Python端序列化,是以所有常量都使用序列化格式。

class pyspark.Broadcast(sc=None, value=None, pickle_registry=None, path=None, sock_file=None)

使用​

​SparkContext.broadcast()​

​​建立廣播變量。通過值​

​.value​

​通路值。

>>> from pyspark.context import SparkContext
>>> sc = SparkContext('local', 'test')
>>> b = sc.broadcast([1, 2, 3, 4, 5])
>>> b.value
[1, 2, 3, 4, 5]
>>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()
[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
>>> b.unpersist()      
>>> large_broadcast = sc.broadcast(range(10000))      

1.​

​destroy()​

​​ 删除與此廣播變量相關的所有資料和中繼資料。一旦廣播變量被銷毀,就不能再使用它。此方法阻塞,直到删除完成。

2.​

​dump(value,f)​

​ 3.​

​load(file)​

​ 4.​

​load_from_path(path)​

​ 5.​

​unpersist(blocking=False)​

​ 删除執行器上此廣播的緩存副本。如果在調用後使用廣播,則需要将其重新發送給每個執行程式。

參數:

blocking-- 是否阻塞,直到完成非持久化

6.​

​property value​

class pyspark.Accumulator(aid, value, accum_param)

可以累積的共享變量,具有可交換和可關聯的“add”操作。Spark叢集上的工作任務可以使用​

​+=operator​

​​向累加器添加值,但是隻有驅動程式可以使用value通路它的值。來自worker的更新會自動傳播到driver。

雖然SparkContext支援基本資料類型(如int和float)的累加器,但是使用者也可以通過提供一個自定義的AccumulatorParam對象來為自定義類型定義累加器。以該子產品的doctest為例。

1.​​

​add(term)​

​​ 2.​

​property value​

class pyspark.AccumulatorParam

定義如何累積給定類型的值的helper對象。

1.​​

​addInPlace(value1, value2)​

​​添加累加器資料類型的兩個值,傳回一個新值;為了提高效率,還可以在适當的地方更新value1并傳回它。

2.​​

​zero(value)​

​為類型提供一個“零值”,在次元上與提供的值相容(例如,一個零向量)

​class pyspark.MarshalSerializer​

使用Python的Marshal序列化對象。該序列化更快,但支援少量資料

1.​​

​dumps(obj)​

​​ 2.​

​loads(obj)​

​class pyspark.PickleSerializer​

該序列化器支援幾乎所有Python對象,但可能不像其他專用的序列化器那麼快。

1.​​

​dumps(obj)​

​​ 2.​

​loads(obj)​

​class pyspark.StatusTracker(jtracker)​

用于監視job和stage progress的低級狀态報告api。

這些api有意提供非常弱的一緻性語義;這些api的使用者應該準備好處理空的/丢失的資訊。例如,作業的stage id可能是已知的,但是狀态API可能沒有關于這些stage細節的任何資訊,是以​​

​getStageInfo​

​​可能會為有效的stage id傳回None。

為了限制記憶體使用,這些api隻提供關于最近jobs/stages的資訊。這些api将為最後一個​​

​spark.ui.retainedStages​

​​和​

​spark.ui.retainedJobs​

​​提供資訊。

1.​​

​getActiveJobsIds()​

​​ 傳回一個包含所有活躍jobs的id的數組

2.​

​getActiveStageIds()​

​ 傳回一個包含所有活躍stages的id的數組

3.​

​getJobIdsForGroup(jobGroup=None)​

​ 傳回特定作業group中所有已知作業的清單。如果jobGroup為None,則傳回所有與作業組無關的已知作業。

傳回的清單可能包含正在運作、失敗和已完成的作業,并且在此方法的不同調用中可能有所不同。此方法不保證其結果中元素的順序。

4.​

​getJobInfo(jobId)​

​ 傳回SparkJobInfo對象,如果找不到作業資訊或作業資訊已被垃圾收集,則傳回None。

5.​

​getStageInfo(stageId)​

​ 傳回SparkStageInfo對象,如果找不到作業資訊或作業資訊已被垃圾收集,則傳回None。

​class pyspark.SparkJobInfo​

暴露有關Spark作業的資訊。

​class pyspark.SparkStageInfo​

暴露有關Spark階段的資訊。

​class pyspark.Profiler(ctx)​

PySpark支援自定義分析器,這是為了允許使用不同的分析器,以及輸出到不同的格式,而不是在BasicProfiler中提供的。

自定義分析器必須定義或繼承以下方法:

  • profile–将生成某種類型的系統配置檔案。
  • stats–傳回收集到的統計資訊。
  • dump–将概要檔案轉儲到路徑
  • add --将概要檔案添加到現有的累積概要檔案

建立SparkContext時選擇profiler類

>>> from pyspark import SparkConf, SparkContext
>>> from pyspark import BasicProfiler
>>> class MyCustomProfiler(BasicProfiler):
...     def show(self, id):
...         print("My custom profiles for RDD:%s" % id)
...
>>> conf = SparkConf().set("spark.python.profile", "true")
>>> sc = SparkContext('local', 'test', conf=conf, profiler_cls=MyCustomProfiler)
>>> sc.parallelize(range(1000)).map(lambda x: 2 * x).take(10)
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
>>> sc.parallelize(range(1000)).count()
1000
>>> sc.show_profiles()
My custom profiles for RDD:1
My custom profiles for RDD:3
>>> sc.stop()      

1.​

​dump(id, path)​

​​ 将profile轉儲到path中,id是RDD id

2.​

​profile(func)​

​ 利用函數分析

3.​

​show(id)​

​ 列印profile狀态到輸出

4.​

​stats()​

​ 傳回收集到的分析狀态

​class pyspark.BasicProfiler(ctx)​

BasicProfiler是預設的profiler,它是基于cProfile和累加器實作的

1.​​

​profile(func)​

​​ 運作并配置傳入的方法to_profile。傳回一個profile對象。

2.​

​stats()​

​ 傳回收集到的profiling統計資訊(pstats.Stats)

​class pyspark.TaskContext​

任務的上下文資訊,可以在執行過程中讀取或修改。要通路正在運作的任務的TaskContext通過​

​TaskContext.get()​

​​。

1.​​

​attemptNumber()​

​​ “這個任務已經嘗試了多少次了。第一次任務嘗試将被配置設定為嘗試号= 0,後續嘗試的嘗試号将不斷增加。

2.​

​classmethod get()​

​ 傳回目前活動的TaskContext。這可以在使用者函數内部調用,以通路有關正在運作的任務的上下文資訊。

注意:必須是called on worker,而不是driver。如果沒有初始化,則傳回None。

3.​

​getLocalProperty(key)​

​ 在driver的上遊設定一個本地屬性,如果它丢失,則不設定。

4.​

​partitionId()​

​ 此任務計算的RDD分區的ID。

5.​

​stageId()​

​ 此任務所屬的階段的ID。

6.​

​taskAttemptId()​

​ 此任務嘗試的唯一ID(在相同的SparkContext中,沒有兩個任務嘗試的嘗試id不同)。這大緻相當于Hadoop的TaskAttemptID。

​class pyspark.RDDBarrier(rdd)​

将RDD包裝在barrier階段中,這迫使Spark一起啟動這個階段的任務。​

​RDDBarrier​

​​執行個體是由​

​RDD.barrier()​

​​建立的。

1.​​

​mapPartitions(f, preservesPartitioning=False)​

​ 通過将一個函數應用到包裝好的RDD的每個分區,傳回一個新的RDD,其中的任務一起在barrier階段啟動。該接口與RDD.mapPartitions()相同。

​class pyspark.BarrierTaskContext​

​class pyspark.BarrierTaskInfo(address)​

繼續閱讀