
Common Spark Transformation Operators (Part 1)

parallelize

Converts an existing collection into an RDD.

/** Distribute a local Scala collection to form an RDD.
 *
 * @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
 * to parallelize and before the first action on the RDD, the resultant RDD will reflect the
 * modified collection. Pass a copy of the argument to avoid this.
 * @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
 * RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
 * @param seq Scala collection to distribute
 * @param numSlices number of partitions to divide the collection into
 * @return RDD representing distributed collection
 */
// The number of partitions can be specified; if omitted, defaultParallelism is used (in local mode this defaults to the number of cores on the machine)
def parallelize[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  assertNotStopped()
  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
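The first @note above is worth emphasizing: parallelize is lazy and does not copy the data up front, so mutating a mutable collection before the first action changes what the RDD sees. A minimal sketch of that behaviour (assuming a SparkContext named sc is already in scope):

import scala.collection.mutable.ArrayBuffer

val buf = ArrayBuffer(1, 2, 3)
val lazyRdd = sc.parallelize(buf)         // no job runs and no copy is taken yet
buf += 4                                  // mutate before the first action
println(lazyRdd.collect.toBuffer)         // per the note, expected to show ArrayBuffer(1, 2, 3, 4)
val frozen = sc.parallelize(buf.toList)   // pass a copy to freeze the contents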
           

Scala version

println("======================= parallelize-1 ===========================")
val data: RDD[Int] = sc.parallelize(1 to 10)
println(s"分區數量為:${data.getNumPartitions}")
println(s"原始資料為:${data.collect.toBuffer}")

println("======================= parallelize-2 ===========================")
val message: RDD[String] = sc.parallelize(List("hello world", "hello spark", "hello scala"), 2)
println(s"分區數量為:${message.getNumPartitions}")
println(s"原始資料為:${message.collect.toBuffer}")
           

Output

(run result shown as a screenshot)
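One more detail from the scaladoc above: for an empty dataset, emptyRDD is preferred over parallelize(Seq()). A minimal sketch (sc assumed in scope) of the difference:

val empty1 = sc.emptyRDD[Int]              // an RDD with no partitions at all
val empty2 = sc.parallelize(Seq[Int]())    // an RDD[Int] with defaultParallelism empty partitions
println(empty1.getNumPartitions)           // 0
println(empty2.getNumPartitions)           // defaultParallelism, every partition empty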

makeRDD

Converts an existing collection into an RDD.

/** Distribute a local Scala collection to form an RDD.
 *
 * This method is identical to `parallelize`.
 * @param seq Scala collection to distribute
 * @param numSlices number of partitions to divide the collection into
 * @return RDD representing distributed collection
 */
// The first makeRDD overload: it simply delegates to parallelize
def makeRDD[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  parallelize(seq, numSlices)
}

/**
 * Distribute a local Scala collection to form an RDD, with one or more
 * location preferences (hostnames of Spark nodes) for each object.
 * Create a new partition for each collection item.
 * @param seq list of tuples of data and location preferences (hostnames of Spark nodes)
 * @return RDD representing data partitioned according to location preferences
 */
// The second makeRDD overload: attaches location preferences to the data; the partition count cannot be chosen and is fixed at math.max(seq.size, 1)
def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {
  assertNotStopped()
  val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
  new ParallelCollectionRDD[T](this, seq.map(_._1), math.max(seq.size, 1), indexToPrefs)
}
           

Scala version

println("======================= makeRDD-1 ===========================")
val data: RDD[Int] = sc.makeRDD(1 to 10)
println(s"分區數量為:${data.getNumPartitions}")
println(s"原始資料為:${data.collect.toBuffer}")

println("======================= makeRDD-2 ===========================")
val message: RDD[String] = sc.makeRDD(List("hello world", "hello spark", "hello scala"), 2)
println(s"分區數量為:${message.getNumPartitions}")
println(s"原始資料為:${message.collect.toBuffer}")

println("======================= makeRDD-3 ===========================")
val info: RDD[Int] = sc.makeRDD(List((1, List("aa", "bb")), (2, List("cc", "dd")), (3, List("ee", "ff"))))
println(s"分區數量為:${info.getNumPartitions}")
println(s"原始資料為:${info.collect.toBuffer}")
println(s"第一分區的資料為:${info.preferredLocations(info.partitions(0))}")
           

Output

(run result shown as a screenshot)
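To spell out the behaviour of the second overload: it creates one partition per element of seq, and each element's Seq[String] becomes that partition's preferred locations. A minimal sketch (sc assumed in scope; the hostnames are made up for illustration):

val located = sc.makeRDD(Seq((1, Seq("host-a")), (2, Seq("host-b"))))
println(located.getNumPartitions)          // 2, i.e. math.max(seq.size, 1)
located.partitions.foreach { p =>
  println(s"partition ${p.index} -> ${located.preferredLocations(p)}")
}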

textFile

Creates an RDD by reading data from an external source.

/**
 * Read a text file from HDFS, a local file system (available on all nodes), or any
 * Hadoop-supported file system URI, and return it as an RDD of Strings.
 * @param path path to the text file on a supported file system
 * @param minPartitions suggested minimum number of partitions for the resulting RDD
 * @return RDD of lines of the text file
 */
// Two parameters, one of which has a default value: minPartitions defaults to defaultMinPartitions = math.min(defaultParallelism, 2), so the default is at most 2
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}
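For reference, the default value of minPartitions comes from SparkContext.defaultMinPartitions, which is capped at 2 rather than using the full defaultParallelism; it is also only a suggestion, since the actual number of partitions is decided by the Hadoop InputFormat's splits:

def defaultMinPartitions: Int = math.min(defaultParallelism, 2)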
           

Scala version

println("======================= textFile-1 ===========================")
val data: RDD[String] = sc.textFile("src/main/data/textFile.txt")
println(s"分區數量為:${data.getNumPartitions}")
println(s"原始資料為:${data.collect.toBuffer}")

println("======================= textFile-2 ===========================")
val message: RDD[String] = sc.textFile("src/main/data/textFile.txt", 3)
println(s"分區數量為:${message.getNumPartitions}")
println(s"原始資料為:${message.collect.toBuffer}")
           

Output

(run result shown as a screenshot)
