天天看点

Scala 高级算子

==> mapPartitionsWithIndex

    ---> 定义: def mapPartitionsWithIndex[U](f:(Int, Iterator[T]) => Iterator[U], preserversPartitioning: Boolean = false)

    ---> 作用: 对 RDD 每个分区进行操作,带有分区号

    ---> 示例:输出分区号和内容

<code>// 创建一个RDD</code>

<code>val</code> <code>rdd</code><code>1</code> <code>=</code> <code>sc.parallelize(List(</code><code>1</code><code>,</code><code>2</code><code>,</code><code>3</code><code>,</code><code>4</code><code>,</code><code>5</code><code>,</code><code>6</code><code>,</code><code>7</code><code>,</code><code>8</code><code>,</code><code>9</code><code>))</code>

<code>// 创建一个函数,作为 f 的值</code>

<code>def</code> <code>func(index</code><code>:</code><code>Int, iter</code><code>:</code><code>Iterator[Int])</code><code>:</code><code>Iterator[String] </code><code>=</code> <code>{</code>

<code>    </code><code>iter.toList.map(x</code><code>=</code><code>&gt;</code><code>"[PartID: "</code> <code>+ index + </code><code>", value= "</code> <code>+ x + </code><code>"]"</code><code>).iterator</code>

<code>}</code>

<code>// 调用</code>

<code>rdd</code><code>1</code><code>.mapPartitionsWithIndex(func).colect</code>

<code>// 结果</code>

<code>res</code><code>15</code><code>:</code> <code>Array[String] </code><code>=</code> <code>Array([PartitionID</code><code>:</code> <code>0</code><code>,value</code><code>=</code><code>1</code><code>], [PartitionID</code><code>:</code> <code>0</code><code>,value</code><code>=</code><code>2</code><code>], [PartitionID</code><code>:</code> <code>0</code><code>,value</code><code>=</code><code>3</code><code>], [PartitionID</code><code>:</code> <code>0</code><code>,value</code><code>=</code><code>4</code><code>], </code>

<code>                             </code><code>[PartitionID</code><code>:</code> <code>1</code><code>,value</code><code>=</code><code>5</code><code>], [PartitionID</code><code>:</code> <code>1</code><code>,value</code><code>=</code><code>6</code><code>], [PartitionID</code><code>:</code> <code>1</code><code>,value</code><code>=</code><code>7</code><code>], [PartitionID</code><code>:</code> <code>1</code><code>,value</code><code>=</code><code>8</code><code>], [PartitionID</code><code>:</code> <code>1</code><code>,value</code><code>=</code><code>9</code><code>])</code>

==&gt; aggregate

    ---&gt; 定义:def aggregate[U: ClassTag](zeroValue: U)(seqOp:(U, T) =&gt; U, combOp: (U, U) =&gt; U): U

        ---- (zeroValue: U)            初始值

        ---- seqOp:(U, T) =&gt; U    局部操作

        ---- combOp:(U, U) =&gt; U        全局操作

    ---&gt; 作用:先对局部进行操作,再对全局进行操作

    ---&gt; 示例:

<code>// 求两个分区最大值的和,初始值为0</code>

<code>rdd</code><code>1</code><code>.aggregate(</code><code>0</code><code>)(math.max(</code><code>_</code><code>,</code><code>_</code><code>), </code><code>_</code><code>+</code><code>_</code><code>)</code>

<code>// 结果为:res16: Int = 13</code>

==&gt; aggregateByKey

    ---&gt; 定义:

    ---&gt; 作用:对 key-value 格式 的数据进行 aggregate 操作

<code>// 准备一个 key-value 格式的 RDD</code>

<code>val</code> <code>parRDD </code><code>=</code> <code>sc.parallelize(List((</code><code>"cat"</code><code>, </code><code>2</code><code>),(</code><code>"cat"</code><code>, </code><code>5</code><code>),(</code><code>"mouse"</code><code>, </code><code>4</code><code>),(</code><code>"cat"</code><code>, </code><code>12</code><code>),(</code><code>"dog"</code><code>, </code><code>12</code><code>),(</code><code>"mouse"</code><code>, </code><code>2</code><code>)), </code><code>2</code><code>)</code>

<code>// 计算每个分区中的动物最多的个数求和</code>

<code>parRDD.aggregateByKey(</code><code>0</code><code>)(math.max(</code><code>_</code><code>, </code><code>_</code><code>), </code><code>_</code><code>+</code><code>_</code><code>)</code>

<code>// 结果为:  Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))</code>

<code>// 计算每种动物的总数量</code>

<code>parRDD.aggregateByKey(</code><code>0</code><code>)(</code><code>_</code><code>+</code><code>_</code><code>, </code><code>_</code><code>+</code><code>_</code><code>).collect        </code><code>// 方法一</code>

<code>parRDD.reduceByKey(</code><code>_</code><code>+</code><code>_</code><code>).collect</code>

==&gt; coalesce 与 repartition    

    ---&gt; 作用:将 RDD 中的分区进行重分区

    ---&gt; 区别: coalesce 默认不会进行 shuffle(false)

                        repartition 会进行 shuffle(true), 会将数据真正通过网络进行重分区

<code>// 定义一个 RDD </code>

<code>val</code> <code>rdd </code><code>=</code> <code>sc.parallelize(List(</code><code>1</code><code>,</code><code>2</code><code>,</code><code>3</code><code>,</code><code>4</code><code>,</code><code>5</code><code>,</code><code>6</code><code>,</code><code>7</code><code>,</code><code>8</code><code>), </code><code>2</code><code>)</code>

<code>// 显示分区中的分区号和分区号中的内容</code>

<code>// 查看 rdd 中的分区情况</code>

<code>rdd.mapPartitionsWithIndex(func).collect</code>

<code>// 结果为: Array[String] = Array(</code>

<code>// [PartID: 0, value= 1], [PartID: 0, value= 2], [PartID: 0, value= 3], [PartID: 0, value= 4], </code>

<code>// [PartID: 1, value= 5], [PartID: 1, value= 6], [PartID: 1, value= 7], [PartID: 1, value= 8])</code>

<code>// 使用 repartition 将分区数改为3</code>

<code>val</code> <code>rdd</code><code>2</code> <code>=</code> <code>rdd</code><code>1</code><code>.repartition(</code><code>3</code><code>)</code>

<code>val</code> <code>rdd</code><code>3</code> <code>=</code> <code>rdd</code><code>1</code><code>.coalesce(</code><code>3</code><code>, </code><code>true</code><code>)</code>

<code>// 查看rdd2 与rdd3 的分区情况</code>

<code>rdd</code><code>2</code><code>.mapPartitionsWithIndex(func).collect</code>

<code>rdd</code><code>3</code><code>.mapPartitionsWithIndex(func).collect</code>

<code>// 结果为:Array[String] = Array(</code>

<code>// [PartID: 0, value= 3], [PartID: 0, value= 6], </code>

<code>// [PartID: 1, value= 1], [PartID: 1, value= 4], [PartID: 1, value= 7], </code>

<code>// [PartID: 2, value= 2], [PartID: 2, value= 5])</code>

本文转自 菜鸟的征程 51CTO博客,原文链接:http://blog.51cto.com/songqinglong/2082613