天天看点

Spark Transformation算子->mapPartitionsWithIndex

类似于 mapPartitions,除此之外还会携带分区的索引值。

  1. java
package transformations;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * @Author yqq
 * @Date 2021/12/09 20:45
 * @Version 1.0
 */
public class MapPartitionsWithIndexTest {
    /**
     * Demonstrates {@code mapPartitionsWithIndex}: like {@code mapPartitions},
     * except the supplied function also receives the index of the partition
     * it is processing.
     */
    public static void main(String[] args) {
        JavaSparkContext context = new JavaSparkContext(
                new SparkConf()
                        .setMaster("local")
                        .setAppName("mappartitionswithindex")
        );
        context.setLogLevel("Error");
        // 3 partitions -> the function below is invoked once per partition.
        JavaRDD<String> rdd = context.parallelize(Arrays.asList("a", "b", "c", "e", "f", "g"), 3);
        JavaRDD<String> rdd1 = rdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
            @Override
            public Iterator<String> call(Integer v1, Iterator<String> v2) throws Exception {
                // Accumulate into a list LOCAL to this call. The original code
                // captured a single list created in the driver, so every
                // partition returned the elements gathered by all previously
                // processed partitions as well, producing duplicated output
                // (and the pattern breaks entirely on a real cluster, where
                // the closure is serialized per task).
                List<String> partitionValues = new ArrayList<>();
                while (v2.hasNext())
                    partitionValues.add("partition:" + v1 + "\t" + "value:" + v2.next());
                return partitionValues.iterator();
            }
        }, false);
        rdd1.collect().forEach(e -> System.out.println(e));
        // Release the local Spark context before exiting.
        context.stop();
    }
}
package transformation

import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ListBuffer

/**
 * @Author yqq
 * @Date 2021/12/09 21:11
 * @Version 1.0
 */
object MapPartitionsWithIndexTest {
  /**
   * Demonstrates mapPartitionsWithIndex: like mapPartitions, except the
   * function is also handed the index of the partition it is processing.
   */
  def main(args: Array[String]): Unit = {
    val context = new SparkContext(
      new SparkConf()
        .setMaster("local")
        .setAppName("mappartitionswithindex")
    )
    context.setLogLevel("Error")
    val data = Array[String]("a", "b", "c", "e", "f", "g")
    context.parallelize(data, 3)
      .mapPartitionsWithIndex { (index, ite) =>
        // Tag every element with the index of the partition it lives in.
        ite.map(value => s"partition:$index,value:$value")
      }
      .foreach(println)
  }
}