
RDD -- Action Operators

Action Operators

Action operators trigger the submission of a Spark job. When shooting a film, the director shouts "Action!" to mean "start"; in the same spirit, these operators are what actually start a job. A minimal sketch contrasting a lazy transformation with an action follows the list below.

reduce

collect

count

first

take

takeSample

takeOrdered

saveAsTextFile

saveAsSequenceFile

saveAsObjectFile

countByKey

foreach
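
A minimal sketch of the difference, assuming a SparkContext named sc is already available (as in the examples that follow):

// map is a transformation: it only builds the lineage, nothing runs yet
val doubled = sc.parallelize(1 to 100).map(_ * 2)
// collect is an action: this line actually submits a Spark job
val result = doubled.collect()
println(result.length) // 100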

reduce

Source code

/**
   * Reduces the elements of this RDD using the specified commutative and
   * associative binary operator.
   */
  def reduce(f: (T, T) => T): T = withScope {
    val cleanF = sc.clean(f)
    val reducePartition: Iterator[T] => Option[T] = iter => {
      if (iter.hasNext) {
        Some(iter.reduceLeft(cleanF))
      } else {
        None
      }
    }
    var jobResult: Option[T] = None
    val mergeResult = (index: Int, taskResult: Option[T]) => {
      if (taskResult.isDefined) {
        jobResult = jobResult match {
          case Some(value) => Some(f(value, taskResult.get))
          case None => taskResult
        }
      }
    }
    sc.runJob(this, reducePartition, mergeResult)
    // Get the final result out of our Option, or throw an exception if the RDD was empty
    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
  }
           

The line sc.runJob(this, reducePartition, mergeResult) is the essential difference between action operators and transformation operators: it submits a computation job to Spark.

eg:

val a = sc.parallelize(1 to 100)
    // sum
    val sum = a.reduce(_ + _)
    println(sum)
    //5050
    // maximum value
    println(a.reduce((number1, number2) => math.max(number1, number2)))
    // 100
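
As the doc comment in the source notes, the function passed to reduce must be commutative and associative, because per-partition results are merged in no guaranteed order. A small illustrative sketch (the non-associative result below is not deterministic and depends on partitioning):

val nums = sc.parallelize(1 to 100, 4)
// Addition is commutative and associative, so the result is always 5050
println(nums.reduce(_ + _))
// Subtraction is neither, so the result may change with the number of
// partitions and the merge order -- avoid such functions with reduce
println(nums.reduce(_ - _))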
           

count

Source code

/**
   * Return the number of elements in the RDD.
   */
  def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
           

eg:

val a = sc.parallelize(1 to 100)
    val b = a.map(x => (x.toString.length.toString, x))
    println(b.count()) // 100
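
The source above simply computes each partition's iterator size and sums the per-partition counts on the driver. A rough equivalent of that idea, sketched with mapPartitions (for illustration only; the 25/25/25/25 split assumes 4 evenly filled partitions):

val a = sc.parallelize(1 to 100, 4)
// Count each partition's elements, then sum on the driver --
// roughly what count() does via Utils.getIteratorSize
val perPartition = a.mapPartitions(it => Iterator(it.size.toLong)).collect()
println(perPartition.mkString(", ")) // 25, 25, 25, 25
println(perPartition.sum)            // 100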
           

first

Source code

/**
   * Return the first element in this RDD.
   */
  def first(): T = withScope {
    take(1) match {
      case Array(t) => t
      case _ => throw new UnsupportedOperationException("empty collection")
    }
  }
           

eg:

val a = sc.parallelize(1 to 100)
    val b = a.map(x => (x.toString.length.toString, x))
    println(b.first()) //(1,1)
    println(a.first()) // 1
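
Because first() is just take(1) plus a pattern match, calling it on an empty RDD falls into the case _ branch of the source above and throws an UnsupportedOperationException. A small sketch:

val empty = sc.parallelize(Seq.empty[Int])
// take(1) returns an empty array here, so first() throws
// UnsupportedOperationException("empty collection")
try {
  empty.first()
} catch {
  case e: UnsupportedOperationException => println(e.getMessage) // empty collection
}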
           

take

take: returns the first n elements of the RDD as an array.

Source code

/**
   * Take the first num elements of the RDD. It works by first scanning one partition, and use the
   * results from that partition to estimate the number of additional partitions needed to satisfy
   * the limit.
   *
   * @note This method should only be used if the resulting array is expected to be small, as
   * all the data is loaded into the driver's memory.
   *
   * @note Due to complications in the internal implementation, this method will raise
   * an exception if called on an RDD of `Nothing` or `Null`.
   */
  def take(num: Int): Array[T] = withScope {
    val scaleUpFactor = Math.max(conf.getInt("spark.rdd.limit.scaleUpFactor", 4), 2)
    if (num == 0) {
      new Array[T](0)
    } else {
      val buf = new ArrayBuffer[T]
      val totalParts = this.partitions.length
      var partsScanned = 0
      while (buf.size < num && partsScanned < totalParts) {
        // The number of partitions to try in this iteration. It is ok for this number to be
        // greater than totalParts because we actually cap it at totalParts in runJob.
        var numPartsToTry = 1L
        val left = num - buf.size
        if (partsScanned > 0) {
          // If we didn't find any rows after the previous iteration, quadruple and retry.
          // Otherwise, interpolate the number of partitions we need to try, but overestimate
          // it by 50%. We also cap the estimation in the end.
          if (buf.isEmpty) {
            numPartsToTry = partsScanned * scaleUpFactor
          } else {
            // As left > 0, numPartsToTry is always >= 1
            numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
            numPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor)
          }
        }

        val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
        val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)

        res.foreach(buf ++= _.take(num - buf.size))
        partsScanned += p.size
      }

      buf.toArray
    }
  }
           

eg:

val a = sc.parallelize(1 to 100)
    val b = a.map(x => (x.toString.length.toString, x))
    val topk = a.take(10)
    topk.foreach(println)
    //1
    //2
    //3
    //4
    //5
    //6
    //7
    //8
    //9
    //10
    val topkb = b.take(10)
    topkb.foreach(println)
    //(1,1)
    //(1,2)
    //(1,3)
    //(1,4)
    //(1,5)
    //(1,6)
    //(1,7)
    //(1,8)
    //(1,9)
    //(2,10)
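
As the source shows, take returns elements in partition order and scans one partition first, only reading further partitions while it still needs elements, so it never has to touch the whole RDD. A small sketch (10 partitions chosen just for illustration):

// Even with 10 partitions, take(3) only needs to scan the first one
val many = sc.parallelize(1 to 100, 10)
println(many.take(3).mkString(", ")) // 1, 2, 3
// take(0) short-circuits and returns an empty array without running a job
println(many.take(0).length) // 0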
           

takeOrdered

takeOrdered: sorts the elements using the RDD's implicit Ordering[T] and returns the first n of them as an array.

Source code

/**
   * Returns the first k (smallest) elements from this RDD as defined by the specified
   * implicit Ordering[T] and maintains the ordering. This does the opposite of [[top]].
   * For example:
   * {{{
   *   sc.parallelize(Seq(10, 4, 2, 12, 3)).takeOrdered(1)
   *   // returns Array(2)
   *
   *   sc.parallelize(Seq(2, 3, 4, 5, 6)).takeOrdered(2)
   *   // returns Array(2, 3)
   * }}}
   *
   * @note This method should only be used if the resulting array is expected to be small, as
   * all the data is loaded into the driver's memory.
   *
   * @param num k, the number of elements to return
   * @param ord the implicit ordering for T
   * @return an array of top elements
   */
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
    if (num == 0) {
      Array.empty
    } else {
      val mapRDDs = mapPartitions { items =>
        // Priority keeps the largest elements, so let's reverse the ordering.
        val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
        queue ++= collectionUtils.takeOrdered(items, num)(ord)
        Iterator.single(queue)
      }
      if (mapRDDs.partitions.length == 0) {
        Array.empty
      } else {
        mapRDDs.reduce { (queue1, queue2) =>
          queue1 ++= queue2
          queue1
        }.toArray.sorted(ord)
      }
    }
  }
           

eg:

val a = sc.parallelize(1 to 100)
    val b = a.map(x => (x.toString.length.toString, x))
    val takeOrder = b.map(x => KeyValue(x._1, x._2)).takeOrdered(12)
    takeOrder.foreach(println)
    //KeyValue(3,100)
    //KeyValue(2,99)
    //KeyValue(2,98)
    //KeyValue(2,97)
    //KeyValue(2,96)
    //KeyValue(2,95)
    //KeyValue(2,94)
    //KeyValue(2,93)
    //KeyValue(2,92)
    //KeyValue(2,91)
    //KeyValue(2,90)
    //KeyValue(2,89)

    // Custom ordering: a KeyValue with a larger number compares as "smaller",
    // so takeOrdered(12) returns the twelve largest numbers (descending order)
    case class KeyValue(len: String, number: Int) extends Ordered[KeyValue] with Serializable {
      override def compare(that: KeyValue): Int = that.number.compareTo(this.number)
    }
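
Instead of defining a case class that extends Ordered, the same descending result can be obtained by passing an explicit Ordering to takeOrdered; and, as the source comment notes, top does the opposite of takeOrdered and returns the largest elements. A brief sketch:

val a = sc.parallelize(1 to 100)
val b = a.map(x => (x.toString.length.toString, x))
// Order the (length, number) tuples by the number, descending
val largest = b.takeOrdered(3)(Ordering.by[(String, Int), Int](_._2).reverse)
largest.foreach(println)
// (3,100)
// (2,99)
// (2,98)
// top uses the natural ascending ordering and returns the largest elements
println(a.top(3).mkString(", ")) // 100, 99, 98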

           

