==> mapPartitionsWithIndex
---> 定义: def mapPartitionsWithIndex[U](f:(Int, Iterator[T]) => Iterator[U], preserversPartitioning: Boolean = false)
---> 作用: 对 RDD 每个分区进行操作,带有分区号
---> 示例:输出分区号和内容
<code>// 创建一个RDD</code>
<code>val</code> <code>rdd</code><code>1</code> <code>=</code> <code>sc.parallelize(List(</code><code>1</code><code>,</code><code>2</code><code>,</code><code>3</code><code>,</code><code>4</code><code>,</code><code>5</code><code>,</code><code>6</code><code>,</code><code>7</code><code>,</code><code>8</code><code>,</code><code>9</code><code>))</code>
<code>// 创建一个函数,作为 f 的值</code>
<code>def</code> <code>func(index</code><code>:</code><code>Int, iter</code><code>:</code><code>Iterator[Int])</code><code>:</code><code>Iterator[String] </code><code>=</code> <code>{</code>
<code> </code><code>iter.toList.map(x</code><code>=</code><code>></code><code>"[PartID: "</code> <code>+ index + </code><code>", value= "</code> <code>+ x + </code><code>"]"</code><code>).iterator</code>
<code>}</code>
<code>// 调用</code>
<code>rdd</code><code>1</code><code>.mapPartitionsWithIndex(func).colect</code>
<code>// 结果</code>
<code>res</code><code>15</code><code>:</code> <code>Array[String] </code><code>=</code> <code>Array([PartitionID</code><code>:</code> <code>0</code><code>,value</code><code>=</code><code>1</code><code>], [PartitionID</code><code>:</code> <code>0</code><code>,value</code><code>=</code><code>2</code><code>], [PartitionID</code><code>:</code> <code>0</code><code>,value</code><code>=</code><code>3</code><code>], [PartitionID</code><code>:</code> <code>0</code><code>,value</code><code>=</code><code>4</code><code>], </code>
<code> </code><code>[PartitionID</code><code>:</code> <code>1</code><code>,value</code><code>=</code><code>5</code><code>], [PartitionID</code><code>:</code> <code>1</code><code>,value</code><code>=</code><code>6</code><code>], [PartitionID</code><code>:</code> <code>1</code><code>,value</code><code>=</code><code>7</code><code>], [PartitionID</code><code>:</code> <code>1</code><code>,value</code><code>=</code><code>8</code><code>], [PartitionID</code><code>:</code> <code>1</code><code>,value</code><code>=</code><code>9</code><code>])</code>
==> aggregate
---> 定义:def aggregate[U: ClassTag](zeroValue: U)(seqOp:(U, T) => U, combOp: (U, U) => U): U
---- (zeroValue: U) 初始值
---- seqOp:(U, T) => U 局部操作
---- combOp:(U, U) => U 全局操作
---> 作用:先对局部进行操作,再对全局进行操作
---> 示例:
<code>// 求两个分区最大值的和,初始值为0</code>
<code>rdd</code><code>1</code><code>.aggregate(</code><code>0</code><code>)(math.max(</code><code>_</code><code>,</code><code>_</code><code>), </code><code>_</code><code>+</code><code>_</code><code>)</code>
<code>// 结果为:res16: Int = 13</code>
==> aggregateByKey
---> 定义:
---> 作用:对 key-value 格式 的数据进行 aggregate 操作
<code>// 准备一个 key-value 格式的 RDD</code>
<code>val</code> <code>parRDD </code><code>=</code> <code>sc.parallelize(List((</code><code>"cat"</code><code>, </code><code>2</code><code>),(</code><code>"cat"</code><code>, </code><code>5</code><code>),(</code><code>"mouse"</code><code>, </code><code>4</code><code>),(</code><code>"cat"</code><code>, </code><code>12</code><code>),(</code><code>"dog"</code><code>, </code><code>12</code><code>),(</code><code>"mouse"</code><code>, </code><code>2</code><code>)), </code><code>2</code><code>)</code>
<code>// 计算每个分区中的动物最多的个数求和</code>
<code>parRDD.aggregateByKey(</code><code>0</code><code>)(math.max(</code><code>_</code><code>, </code><code>_</code><code>), </code><code>_</code><code>+</code><code>_</code><code>)</code>
<code>// 结果为: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))</code>
<code>// 计算每种动物的总数量</code>
<code>parRDD.aggregateByKey(</code><code>0</code><code>)(</code><code>_</code><code>+</code><code>_</code><code>, </code><code>_</code><code>+</code><code>_</code><code>).collect </code><code>// 方法一</code>
<code>parRDD.reduceByKey(</code><code>_</code><code>+</code><code>_</code><code>).collect</code>
==> coalesce 与 repartition
---> 作用:将 RDD 中的分区进行重分区
---> 区别: coalesce 默认不会进行 shuffle(false)
repartition 会进行 shuffle(true), 会将数据真正通过网络进行重分区
<code>// 定义一个 RDD </code>
<code>val</code> <code>rdd </code><code>=</code> <code>sc.parallelize(List(</code><code>1</code><code>,</code><code>2</code><code>,</code><code>3</code><code>,</code><code>4</code><code>,</code><code>5</code><code>,</code><code>6</code><code>,</code><code>7</code><code>,</code><code>8</code><code>), </code><code>2</code><code>)</code>
<code>// 显示分区中的分区号和分区号中的内容</code>
<code>// 查看 rdd 中的分区情况</code>
<code>rdd.mapPartitionsWithIndex(func).collect</code>
<code>// 结果为: Array[String] = Array(</code>
<code>// [PartID: 0, value= 1], [PartID: 0, value= 2], [PartID: 0, value= 3], [PartID: 0, value= 4], </code>
<code>// [PartID: 1, value= 5], [PartID: 1, value= 6], [PartID: 1, value= 7], [PartID: 1, value= 8])</code>
<code>// 使用 repartition 将分区数改为3</code>
<code>val</code> <code>rdd</code><code>2</code> <code>=</code> <code>rdd</code><code>1</code><code>.repartition(</code><code>3</code><code>)</code>
<code>val</code> <code>rdd</code><code>3</code> <code>=</code> <code>rdd</code><code>1</code><code>.coalesce(</code><code>3</code><code>, </code><code>true</code><code>)</code>
<code>// 查看rdd2 与rdd3 的分区情况</code>
<code>rdd</code><code>2</code><code>.mapPartitionsWithIndex(func).collect</code>
<code>rdd</code><code>3</code><code>.mapPartitionsWithIndex(func).collect</code>
<code>// 结果为:Array[String] = Array(</code>
<code>// [PartID: 0, value= 3], [PartID: 0, value= 6], </code>
<code>// [PartID: 1, value= 1], [PartID: 1, value= 4], [PartID: 1, value= 7], </code>
<code>// [PartID: 2, value= 2], [PartID: 2, value= 5])</code>
本文转自 菜鸟的征程 51CTO博客,原文链接:http://blog.51cto.com/songqinglong/2082613