類似於 mapPartitions,除此之外還會攜帶分區的索引值。(Similar to mapPartitions, but the function additionally receives the index of the partition being processed.)
- Java 版本:
package transformations;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
/**
 * Demonstrates {@code mapPartitionsWithIndex}: like {@code mapPartitions},
 * except the user function also receives the index of the partition it is
 * processing.
 *
 * @Author yqq
 * @Date 2021/12/09 20:45
 * @Version 1.0
 */
public class MapPartitionsWithIndexTest {
    public static void main(String[] args) {
        JavaSparkContext context = new JavaSparkContext(
                new SparkConf()
                        .setMaster("local")
                        .setAppName("mappartitionswithindex")
        );
        context.setLogLevel("Error");
        // 6 elements spread over 3 partitions, so each partition sees 2 values.
        JavaRDD<String> rdd = context.parallelize(Arrays.asList("a", "b", "c", "e", "f", "g"), 3);
        JavaRDD<String> rdd1 = rdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
            @Override
            public Iterator<String> call(Integer index, Iterator<String> values) throws Exception {
                // BUG FIX: the list must be local to this call. The original code
                // captured a single list from main() that was shared across all
                // partitions, so each later partition's iterator re-emitted every
                // earlier partition's records, duplicating output on collect().
                List<String> partitionRecords = new ArrayList<>();
                while (values.hasNext()) {
                    partitionRecords.add("partition:" + index + "\t" + "value:" + values.next());
                }
                return partitionRecords.iterator();
            }
        }, false);
        rdd1.collect().forEach(System.out::println);
        // Release local Spark resources before the JVM exits.
        context.stop();
    }
}
package transformation
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer
/**
 * Demonstrates `mapPartitionsWithIndex`: like `mapPartitions`, except the
 * supplied function also receives the index of the current partition.
 *
 * @Author yqq
 * @Date 2021/12/09 21:11
 * @Version 1.0
 */
object MapPartitionsWithIndexTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("mappartitionswithindex")
    val context = new SparkContext(conf)
    context.setLogLevel("Error")
    // 6 elements over 3 partitions: each partition handles 2 values.
    val data = Array[String]("a", "b", "c", "e", "f", "g")
    context
      .parallelize(data, 3)
      .mapPartitionsWithIndex { (index, records) =>
        // Tag every record with the index of the partition it belongs to;
        // mapping the iterator directly avoids a per-partition buffer while
        // producing exactly the same strings.
        records.map(value => s"partition:$index,value:$value")
      }
      .foreach(println)
  }
}