
Spark Transformation Operators -> zip, zipWithIndex

1. zip

zip combines the elements of two RDDs (K-V or non-K-V format) into a single K-V-format RDD. The two RDDs must have the same number of partitions and the same number of elements in each partition; a sketch of the failure cases follows the two examples below.

1. Java
package transformations;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

/**
 * @Author yqq
 * @Date 2021/12/10 00:04
 * @Version 1.0
 */
public class ZipAndZipWithIndexTest {
    public static void main(String[] args) {
        JavaSparkContext context = new JavaSparkContext(
                new SparkConf()
                        .setMaster("local")
                        .setAppName("zipandzipwithindex")
        );
        context.setLogLevel("ERROR");
        // Two RDDs with the same number of partitions and elements per partition.
        JavaRDD<String> rdd = context.parallelize(Arrays.asList("a", "b", "c", "d"));
        JavaRDD<Integer> rdd1 = context.parallelize(Arrays.asList(100, 200, 300, 400));
        // zip pairs the elements position by position into a (K, V) RDD.
        JavaPairRDD<String, Integer> zip = rdd.zip(rdd1);
        zip.foreach(e -> System.out.println(e));
    }
}
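With setMaster("local") the data lands in a single partition, so this should print the pairs in order: (a,100), (b,200), (c,300), (d,400). With more partitions the pairing is still positional, but the print order of foreach is not guaranteed.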

2. Scala

package transformation

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Author yqq
 * @Date 2021/12/10 00:19
 * @Version 1.0
 */
object ZipAndZipWithIndexTest {
  def main(args: Array[String]): Unit = {
    val context = new SparkContext(
      new SparkConf()
        .setAppName("ZipAndZipWithIndex")
        .setMaster("local")
    )
    context.setLogLevel("ERROR")
    // Two RDDs with matching partition and per-partition element counts.
    val rdd = context.parallelize(Array[String]("a", "b", "c", "d"))
    val rdd1 = context.parallelize(Array[Int](1, 2, 3, 4))
    // Pair the elements position by position and print each (K, V) tuple.
    rdd.zip(rdd1).foreach(println)
  }
}
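For completeness, a minimal sketch of the failure cases, reusing the same context (the names left and right are illustrative):

val left = context.parallelize(Seq("a", "b", "c"), 1)
val right = context.parallelize(Seq(1, 2, 3, 4), 1)
// Same partition count but unequal element counts: zip itself is lazy, so
// the job only fails at action time, with "Can only zip RDDs with same
// number of elements in each partition".
left.zip(right).collect()
// If the partition counts differ instead, the action fails with
// "Can't zip RDDs with unequal numbers of partitions".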

2. zipWithIndex

This function pairs each element of the RDD with that element's index within the RDD (indices start at 0), producing a (K, V)-format RDD. A note on its cost follows the examples below.

1. Java
package transformations;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

/**
 * @Author yqq
 * @Date 2021/12/10 00:04
 * @Version 1.0
 */
public class ZipAndZipWithIndexTest {
    public static void main(String[] args) {
        JavaSparkContext context = new JavaSparkContext(
                new SparkConf()
                        .setMaster("local")
                        .setAppName("zipandzipwithindex")
        );
        context.setLogLevel("ERROR");
        JavaRDD<String> rdd = context.parallelize(Arrays.asList("a", "b", "c", "d"));
        // Pair each element with its 0-based position in the RDD.
        rdd.zipWithIndex().foreach(e -> System.out.println(e));
    }
}
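As with zip, a single local partition should print (a,0), (b,1), (c,2), (d,3). Note that the index is a Long, so the result is a JavaPairRDD<String, Long>.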
2. Scala

package transformation

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Author yqq
 * @Date 2021/12/10 00:19
 * @Version 1.0
 */
object ZipAndZipWithIndexTest {
  def main(args: Array[String]): Unit = {
    val context = new SparkContext(
      new SparkConf()
        .setAppName("ZipAndZipWithIndex")
        .setMaster("local")
    )
    context.setLogLevel("ERROR")
    val rdd = context.parallelize(Array[String]("a", "b", "c", "d"))
    // Pair each element with its 0-based position in the RDD.
    rdd.zipWithIndex().foreach(println)
  }
}
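One performance detail worth knowing: when the RDD has more than one partition, zipWithIndex first runs a separate Spark job to count the elements of each partition so that the indices stay globally consecutive. If the ids only need to be unique, not consecutive, zipWithUniqueId assigns them without that extra job. A minimal Scala sketch, assuming the same context as above:

val data = context.parallelize(Array("a", "b", "c", "d"), 2)
// Consecutive indices 0..3; requires an extra job to count partition sizes.
data.zipWithIndex().foreach(println)
// Unique but non-consecutive ids (partition k gets k, k+n, k+2n, ...);
// no counting job is needed.
data.zipWithUniqueId().foreach(println)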