Spark2.3.0 版本: Spark2.3.0 建立RDD Spark的核心概念是彈性分布式資料集(RDD),RDD是一個可容錯、可并行操作的分布式元素集合。有兩種方法可以建立RDD對象:
- 在驅動程式中并行化操作集合對象來建立RDD
- 從外部存儲系統中引用資料集(如:共享檔案系統、HDFS、HBase或者其他Hadoop支援的資料源)。
https://note.youdao.com/md/preview/preview.html?file=%2Fyws%2Fapi%2Fpersonal%2Ffile%2FWEB71f1c5fe9b41b34b6ba029832e8674d1%3Fmethod%3Ddownload%26read%3Dtrue#1-%E5%B9%B6%E8%A1%8C%E5%8C%96%E9%9B%86%E5%90%88 1. 并行化集合
通過在驅動程式中的現有集合上調用
JavaSparkContext
的
parallelize
方法建立并行化集合(Parallelized collections)。集合的元素被複制以形成可以并行操作的分布式資料集。 例如,下面是如何建立一個包含數字1到5的并行化集合:
Java版本:
List<Integer> list = Arrays.asList(1,2,3,4,5);
JavaRDD<Integer> rdd = sc.parallelize(list);
Scala版本:
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
Python版本:
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
RDD一旦建立,分布式資料集(distData)可以并行操作。 例如,我們可以調用
distData.reduce((a,b) - > a + b)
來實作對清單元素求和。 我們稍後介紹分布式資料集的操作。
并行化集合的一個重要參數是分區(partition)數,将分布式資料集分割成多少分區。Spark叢集中每個分區運作一個任務(task)。典型場景下,一般為每個CPU配置設定2-4個分區。但通常而言,Spark會根據你叢集的狀況,自動設定分區數。當然,你可以給
parallelize
方法傳遞第二個參數來手動設定分區數(如:
sc.parallelize(data, 10)
)。注意:Spark代碼裡有些地方仍然使用分片(slice)這個術語(分區的同義詞),主要為了保持向後相容。
https://note.youdao.com/md/preview/preview.html?file=%2Fyws%2Fapi%2Fpersonal%2Ffile%2FWEB71f1c5fe9b41b34b6ba029832e8674d1%3Fmethod%3Ddownload%26read%3Dtrue#2-%E5%A4%96%E9%83%A8%E6%95%B0%E6%8D%AE%E9%9B%86 2. 外部資料集
Spark可以從Hadoop支援的任何存儲資料源建立分布式資料集,包括本地檔案系統,HDFS,Cassandra,HBase,Amazon S3等。Spark可以支援文本檔案,SequenceFiles以及任何其他Hadoop 輸入格式。
文本檔案RDD可以使用
SparkContext
textFile
方法建立。該方法根據URL擷取檔案(機器上的本地路徑,或
hdfs://
,
s3n://
等等). 下面是一個示例調用:
JavaRDD<String> distFile = sc.textFile("data.txt");
scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26
>>> distFile = sc.textFile("data.txt")
一旦建立完成,distFiile 就能做資料集操作。例如,我們可以用下面的方式使用 map 和 reduce 操作将所有行的長度相加:
distFile.map(s -> s.length()).reduce((a, b) -> a + b);
Spark讀檔案時一些注意事項:
(1) 如果使用本地檔案系統路徑,在所有工作節點上該檔案必須都能用相同的路徑通路到。要麼複制檔案到所有的工作節點,要麼使用網絡的方式共享檔案系統。
(2) Spark 所有基于檔案的輸入方法,包括
textFile
,能很好地支援檔案目錄,壓縮過的檔案和通配符。例如,你可以使用:
textFile("/my/directory")
textFile("/my/directory/*.txt")
textFile("/my/directory/*.gz")
(3)
textFile
方法也可以選擇第二個可選參數來控制檔案分區(partitions)數目,預設情況下,Spark為每一個檔案塊建立一個分區(HDFS中分塊大小預設為128MB),你也可以通過傳遞一個較大數值來請求更多分區。 注意的是,分區數目不能少于分塊數目。
除了文本檔案,Spark的Java API還支援其他幾種資料格式:
(1) JavaSparkContext.wholeTextFiles可以讀取包含多個小文本檔案的目錄,并将它們以(檔案名,内容)鍵值對傳回。 這與textFile相反,textFile将在每個檔案中每行傳回一條記錄。
JavaPairRDD<String, String> rdd = sc.wholeTextFiles("/home/xiaosi/wholeText");
List<Tuple2<String, String>> list = rdd.collect();
for (Tuple2<?, ?> tuple : list) {
System.out.println(tuple._1() + ": " + tuple._2());
}
(2) 對于SequenceFiles,可以使用SparkContext的sequenceFile [K,V]方法,其中K和V是檔案中的鍵和值的類型。 這些應該是Hadoop的Writable接口的子類,如IntWritable和Text。
(3) 對于其他Hadoop InputFormats,您可以使用JavaSparkContext.hadoopRDD方法,該方法采用任意JobConf和輸入格式類,鍵類和值類。 将這些設定與使用輸入源的Hadoop作業相同。 您還可以使用基于“新”MapReduce API(org.apache.hadoop.mapreduce)的InputFormats的JavaSparkContext.newAPIHadoopRDD。
(4) JavaRDD.saveAsObjectFile 和 SparkContext.objectFile 支援儲存一個RDD,儲存格式是一個簡單的 Java 對象序列化格式。這是一種效率不高的專有格式,如 Avro,它提供了簡單的方法來儲存任何一個 RDD。
原文:
http://spark.apache.org/docs/latest/programming-guide.html#resilient-distributed-datasets-rdds