1.zip
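Combines the elements of two RDDs (key-value or non-key-value) into a single key-value RDD by pairing them position by position. The two RDDs must have the same number of partitions and the same number of elements in each partition.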
- java
package transformations;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

/**
 * @Author yqq
 * @Date 2021/12/10 00:04
 * @Version 1.0
 */
public class ZipAndZipWithIndexTest {
    public static void main(String[] args) {
        JavaSparkContext context = new JavaSparkContext(
                new SparkConf()
                        .setMaster("local")
                        .setAppName("zipandzipwithindex")
        );
        context.setLogLevel("Error");
        JavaRDD<String> rdd = context.parallelize(Arrays.asList("a", "b", "c", "d"));
        JavaRDD<Integer> rdd1 = context.parallelize(Arrays.asList(100, 200, 300, 400));
        // Pair the elements of the two RDDs by position: rdd supplies the keys, rdd1 the values.
        JavaPairRDD<String, Integer> zip = rdd.zip(rdd1);
        zip.foreach(e -> System.out.println(e));
    }
}

- scala
package transformation

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Author yqq
 * @Date 2021/12/10 00:19
 * @Version 1.0
 */
object ZipAndZipWithIndexTest {
  def main(args: Array[String]): Unit = {
    val context = new SparkContext(
      new SparkConf()
        .setAppName("ZipAndZipWithIndex")
        .setMaster("local")
    )
    context.setLogLevel("Error")
    val rdd = context.parallelize(Array[String]("a", "b", "c", "c"))
    val rdd1 = context.parallelize(Array[Int](1, 2, 3, 4))
    // Pair the elements of the two RDDs by position and print each (K, V) tuple.
    rdd.zip(rdd1).foreach(println)
  }
}
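The partition constraint on zip can be illustrated with a small plain-Java model that does not use Spark at all. This is only a sketch of the semantics: the class `ZipSemanticsSketch`, its `zip` helper, and the use of nested lists to stand in for partitions are all hypothetical, and the exception types are choices made for this model rather than the ones Spark raises.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Plain-Java model of zip semantics (hypothetical helper, not Spark's implementation).
class ZipSemanticsSketch {

    // Each inner list stands for one partition of an RDD.
    static <K, V> List<Map.Entry<K, V>> zip(List<List<K>> left, List<List<V>> right) {
        // Both sides must have the same number of partitions.
        if (left.size() != right.size()) {
            throw new IllegalArgumentException(
                    "partition counts differ: " + left.size() + " vs " + right.size());
        }
        List<Map.Entry<K, V>> pairs = new ArrayList<>();
        for (int p = 0; p < left.size(); p++) {
            List<K> keys = left.get(p);
            List<V> values = right.get(p);
            // Matching partitions must hold the same number of elements.
            if (keys.size() != values.size()) {
                throw new IllegalStateException(
                        "partition " + p + " sizes differ: " + keys.size() + " vs " + values.size());
            }
            // Pair elements by position within the partition.
            for (int i = 0; i < keys.size(); i++) {
                pairs.add(new SimpleEntry<>(keys.get(i), values.get(i)));
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<List<String>> letters = Arrays.asList(
                Arrays.asList("a", "b"), Arrays.asList("c", "d"));
        List<List<Integer>> numbers = Arrays.asList(
                Arrays.asList(100, 200), Arrays.asList(300, 400));
        System.out.println(zip(letters, numbers)); // prints [a=100, b=200, c=300, d=400]
    }
}
```

In this model, passing partitions such as `[["a"]]` and `[[1, 2]]` fails because the element counts of the matching partitions differ, which mirrors the requirement stated above.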
2.zipWithIndex
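This function pairs each element of the RDD with its index in the RDD (starting from 0), producing (K, V) pairs in which the element is the key and the index is the value.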
- java
package transformations;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

/**
 * @Author yqq
 * @Date 2021/12/10 00:04
 * @Version 1.0
 */
public class ZipAndZipWithIndexTest {
    public static void main(String[] args) {
        JavaSparkContext context = new JavaSparkContext(
                new SparkConf()
                        .setMaster("local")
                        .setAppName("zipandzipwithindex")
        );
        context.setLogLevel("Error");
        JavaRDD<String> rdd = context.parallelize(Arrays.asList("a", "b", "c", "d"));
        // Pair each element with its index, starting from 0, and print each (K, V) tuple.
        rdd.zipWithIndex().foreach(e -> System.out.println(e));
    }
}
- scala
package transformation

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Author yqq
 * @Date 2021/12/10 00:19
 * @Version 1.0
 */
object ZipAndZipWithIndexTest {
  def main(args: Array[String]): Unit = {
    val context = new SparkContext(
      new SparkConf()
        .setAppName("ZipAndZipWithIndex")
        .setMaster("local")
    )
    context.setLogLevel("Error")
    val rdd = context.parallelize(Array[String]("a", "b", "c", "c"))
    // Pair each element with its index, starting from 0, and print each (K, V) tuple.
    rdd.zipWithIndex().foreach(println)
  }
}
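The index assignment described for zipWithIndex can also be modeled in plain Java without Spark. This is a sketch under stated assumptions: the class `ZipWithIndexSketch` and its `zipWithIndex` helper are hypothetical, nested lists stand in for partitions, and the model simply numbers elements partition by partition, in order within each partition.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Plain-Java model of zipWithIndex semantics (hypothetical helper, not Spark's implementation).
class ZipWithIndexSketch {

    // Each inner list stands for one partition of an RDD.
    static <T> List<Map.Entry<T, Long>> zipWithIndex(List<List<T>> partitions) {
        List<Map.Entry<T, Long>> pairs = new ArrayList<>();
        long index = 0L; // indices start at 0 and keep counting across partitions
        for (List<T> partition : partitions) {
            for (T element : partition) {
                // The element becomes the key, its running index the value.
                pairs.add(new SimpleEntry<T, Long>(element, index));
                index++;
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<List<String>> letters = Arrays.asList(
                Arrays.asList("a", "b"), Arrays.asList("c", "d"));
        System.out.println(zipWithIndex(letters)); // prints [a=0, b=1, c=2, d=3]
    }
}
```

The index is modeled as a `long` because the Java API returns the index as a `Long` value in the resulting pair RDD.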