天天看點

Scala 進階算子

==> mapPartitionsWithIndex

    ---> 定義: def mapPartitionsWithIndex[U](f:(Int, Iterator[T]) => Iterator[U], preserversPartitioning: Boolean = false)

    ---> 作用: 對 RDD 每個分區進行操作,帶有分區号

    ---> 示例:輸出分區号和内容

<code>// 建立一個RDD</code>

<code>val</code> <code>rdd</code><code>1</code> <code>=</code> <code>sc.parallelize(List(</code><code>1</code><code>,</code><code>2</code><code>,</code><code>3</code><code>,</code><code>4</code><code>,</code><code>5</code><code>,</code><code>6</code><code>,</code><code>7</code><code>,</code><code>8</code><code>,</code><code>9</code><code>))</code>

<code>// 建立一個函數,作為 f 的值</code>

<code>def</code> <code>func(index</code><code>:</code><code>Int, iter</code><code>:</code><code>Iterator[Int])</code><code>:</code><code>Iterator[String] </code><code>=</code> <code>{</code>

<code>    </code><code>iter.toList.map(x</code><code>=</code><code>&gt;</code><code>"[PartID: "</code> <code>+ index + </code><code>", value= "</code> <code>+ x + </code><code>"]"</code><code>).iterator</code>

<code>}</code>

<code>// 調用</code>

<code>rdd</code><code>1</code><code>.mapPartitionsWithIndex(func).colect</code>

<code>// 結果</code>

<code>res</code><code>15</code><code>:</code> <code>Array[String] </code><code>=</code> <code>Array([PartitionID</code><code>:</code> <code>0</code><code>,value</code><code>=</code><code>1</code><code>], [PartitionID</code><code>:</code> <code>0</code><code>,value</code><code>=</code><code>2</code><code>], [PartitionID</code><code>:</code> <code>0</code><code>,value</code><code>=</code><code>3</code><code>], [PartitionID</code><code>:</code> <code>0</code><code>,value</code><code>=</code><code>4</code><code>], </code>

<code>                             </code><code>[PartitionID</code><code>:</code> <code>1</code><code>,value</code><code>=</code><code>5</code><code>], [PartitionID</code><code>:</code> <code>1</code><code>,value</code><code>=</code><code>6</code><code>], [PartitionID</code><code>:</code> <code>1</code><code>,value</code><code>=</code><code>7</code><code>], [PartitionID</code><code>:</code> <code>1</code><code>,value</code><code>=</code><code>8</code><code>], [PartitionID</code><code>:</code> <code>1</code><code>,value</code><code>=</code><code>9</code><code>])</code>

==&gt; aggregate

    ---&gt; 定義:def aggregate[U: ClassTag](zeroValue: U)(seqOp:(U, T) =&gt; U, combOp: (U, U) =&gt; U): U

        ---- (zeroValue: U)            初始值

        ---- seqOp:(U, T) =&gt; U    局部操作

        ---- combOp:(U, U) =&gt; U        全局操作

    ---&gt; 作用:先對局部進行操作,再對全局進行操作

    ---&gt; 示例:

<code>// 求兩個分區最大值的和,初始值為0</code>

<code>rdd</code><code>1</code><code>.aggregate(</code><code>0</code><code>)(math.max(</code><code>_</code><code>,</code><code>_</code><code>), </code><code>_</code><code>+</code><code>_</code><code>)</code>

<code>// 結果為:res16: Int = 13</code>

==&gt; aggregateByKey

    ---&gt; 定義:

    ---&gt; 作用:對 key-value 格式 的資料進行 aggregate 操作

<code>// 準備一個 key-value 格式的 RDD</code>

<code>val</code> <code>parRDD </code><code>=</code> <code>sc.parallelize(List((</code><code>"cat"</code><code>, </code><code>2</code><code>),(</code><code>"cat"</code><code>, </code><code>5</code><code>),(</code><code>"mouse"</code><code>, </code><code>4</code><code>),(</code><code>"cat"</code><code>, </code><code>12</code><code>),(</code><code>"dog"</code><code>, </code><code>12</code><code>),(</code><code>"mouse"</code><code>, </code><code>2</code><code>)), </code><code>2</code><code>)</code>

<code>// 計算每個分區中的動物最多的個數求和</code>

<code>parRDD.aggregateByKey(</code><code>0</code><code>)(math.max(</code><code>_</code><code>, </code><code>_</code><code>), </code><code>_</code><code>+</code><code>_</code><code>)</code>

<code>// 結果為:  Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))</code>

<code>// 計算每種動物的總數量</code>

<code>parRDD.aggregateByKey(</code><code>0</code><code>)(</code><code>_</code><code>+</code><code>_</code><code>, </code><code>_</code><code>+</code><code>_</code><code>).collect        </code><code>// 方法一</code>

<code>parRDD.reduceByKey(</code><code>_</code><code>+</code><code>_</code><code>).collect</code>

==&gt; coalesce 與 repartition    

    ---&gt; 作用:将 RDD 中的分區進行重分區

    ---&gt; 差別: coalesce 預設不會進行 shuffle(false)

                        repartition 會進行 shuffle(true), 會将資料真正通過網絡進行重分區

<code>// 定義一個 RDD </code>

<code>val</code> <code>rdd </code><code>=</code> <code>sc.parallelize(List(</code><code>1</code><code>,</code><code>2</code><code>,</code><code>3</code><code>,</code><code>4</code><code>,</code><code>5</code><code>,</code><code>6</code><code>,</code><code>7</code><code>,</code><code>8</code><code>), </code><code>2</code><code>)</code>

<code>// 顯示分區中的分區号和分區号中的内容</code>

<code>// 檢視 rdd 中的分區情況</code>

<code>rdd.mapPartitionsWithIndex(func).collect</code>

<code>// 結果為: Array[String] = Array(</code>

<code>// [PartID: 0, value= 1], [PartID: 0, value= 2], [PartID: 0, value= 3], [PartID: 0, value= 4], </code>

<code>// [PartID: 1, value= 5], [PartID: 1, value= 6], [PartID: 1, value= 7], [PartID: 1, value= 8])</code>

<code>// 使用 repartition 将分區數改為3</code>

<code>val</code> <code>rdd</code><code>2</code> <code>=</code> <code>rdd</code><code>1</code><code>.repartition(</code><code>3</code><code>)</code>

<code>val</code> <code>rdd</code><code>3</code> <code>=</code> <code>rdd</code><code>1</code><code>.coalesce(</code><code>3</code><code>, </code><code>true</code><code>)</code>

<code>// 檢視rdd2 與rdd3 的分區情況</code>

<code>rdd</code><code>2</code><code>.mapPartitionsWithIndex(func).collect</code>

<code>rdd</code><code>3</code><code>.mapPartitionsWithIndex(func).collect</code>

<code>// 結果為:Array[String] = Array(</code>

<code>// [PartID: 0, value= 3], [PartID: 0, value= 6], </code>

<code>// [PartID: 1, value= 1], [PartID: 1, value= 4], [PartID: 1, value= 7], </code>

<code>// [PartID: 2, value= 2], [PartID: 2, value= 5])</code>

本文轉自 菜鳥的征程 51CTO部落格,原文連結:http://blog.51cto.com/songqinglong/2082613