Creating RDDs in Spark with parallelize, and the Difference Between map and flatMap

Spark has two commonly used methods for creating an RDD from a local collection: parallelize and parallelizePairs. parallelize produces a regular RDD, while parallelizePairs produces a key-value (KV) RDD.

package com.debug;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

public class CreateRDD1 {

	public static void main(String[] args) {
		SparkConf conf = new SparkConf();
		conf.setMaster("local");
		conf.setAppName("createRDD01");

		JavaSparkContext sc = new JavaSparkContext(conf);

		// parallelize turns a local Java collection into a regular RDD
		List<String> arr = Arrays.asList("a_1", "b_2", "c_3", "d_4", "e_5", "f_6");
		JavaRDD<String> rdd1 = sc.parallelize(arr);

		/*
		 * map: one output element per input element, e.g. "a_1" -> "a_1*"
		 * JavaRDD<String> res = rdd1.map(new Function<String, String>() {
		 *     public String call(String ch) throws Exception {
		 *         return ch + "*";
		 *     }
		 * });
		 */

		// flatMap: each input element maps to a collection, and the collections
		// are flattened into a single RDD, e.g. "a_1" -> "a", "1".
		// Note: this anonymous-class style matches the Spark 1.x Java API, where
		// FlatMapFunction.call returns an Iterable; in Spark 2.x+ it returns an
		// Iterator, so you would return Arrays.asList(ch.split("_")).iterator() instead.
		JavaRDD<String> res = rdd1.flatMap(new FlatMapFunction<String, String>() {

			public Iterable<String> call(String ch) throws Exception {
				return Arrays.asList(ch.split("_"));
			}
		});
		res.foreach(new VoidFunction<String>() {

			public void call(String content) throws Exception {
				System.out.println(content);
			}
		});

		sc.stop();
	}

}

Note the difference between map and flatMap here. map transforms each input element into exactly one output element, so the element count never changes; flatMap maps each input element to a whole collection of outputs and then flattens those collections into a single RDD. If each RDD element were a piece of fruit, map would be peeling it (the total count stays the same), while flatMap would be chopping it up (the total count usually grows).
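To make the contrast concrete, here is a minimal side-by-side sketch (the class name MapVsFlatMap is our own, and it assumes the same Spark 1.x Java API as the example above): running map and flatMap over the same two-element RDD yields two and four elements respectively.

package com.debug;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;

public class MapVsFlatMap {

	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setMaster("local").setAppName("mapVsFlatMap");
		JavaSparkContext sc = new JavaSparkContext(conf);

		JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a_1", "b_2"));

		// map: one output per input -> still 2 elements
		JavaRDD<String> mapped = rdd.map(new Function<String, String>() {
			public String call(String ch) throws Exception {
				return ch + "*";
			}
		});
		System.out.println(mapped.collect()); // [a_1*, b_2*]

		// flatMap: each input expands to a collection, then everything is flattened -> 4 elements
		JavaRDD<String> flat = rdd.flatMap(new FlatMapFunction<String, String>() {
			public Iterable<String> call(String ch) throws Exception {
				return Arrays.asList(ch.split("_"));
			}
		});
		System.out.println(flat.collect()); // [a, 1, b, 2]

		sc.stop();
	}
}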

KV-format RDDs also come up frequently in everyday work. As mentioned above, parallelizePairs creates a KV RDD; see the following demo.

package com.debug;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class CreateRDD2 {

	public static void main(String[] args) {
		
		SparkConf conf = new SparkConf();
		conf.setMaster("local");
		conf.setAppName("CreateRDD2");

		JavaSparkContext sc = new JavaSparkContext(conf);
		// each Tuple2 is one key-value pair; parallelizePairs takes a List<Tuple2<K, V>>
		List<Tuple2<String, Double>> arr2 = Arrays.asList(
				new Tuple2<String, Double>("u1", 20.01),
				new Tuple2<String, Double>("u2", 18.95), 
				new Tuple2<String, Double>("u3", 20.55),
				new Tuple2<String, Double>("u4", 20.12), 
				new Tuple2<String, Double>("u5", 100.11)
		);
		JavaPairRDD<String, Double> rdd2 = sc.parallelizePairs(arr2);

		rdd2.foreach(new VoidFunction<Tuple2<String, Double>>() {

			public void call(Tuple2<String, Double> tup) throws Exception {
				System.out.println(tup); // a Tuple2 prints as (key,value), e.g. (u1,20.01)
			}
		});

		sc.stop();

	}

}
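What makes the KV format worth the extra ceremony is the family of key-based operators it unlocks on JavaPairRDD, such as reduceByKey. The following is a minimal sketch, not part of the original demo (the class name PairRDDDemo and the sample data are our own); it sums the values per key, so the input deliberately repeats a key.

package com.debug;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;

import scala.Tuple2;

public class PairRDDDemo {

	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setMaster("local").setAppName("pairRDDDemo");
		JavaSparkContext sc = new JavaSparkContext(conf);

		// "u1" appears twice, so there is something to reduce
		JavaPairRDD<String, Double> rdd = sc.parallelizePairs(Arrays.asList(
				new Tuple2<String, Double>("u1", 20.01),
				new Tuple2<String, Double>("u1", 18.95),
				new Tuple2<String, Double>("u2", 20.55)));

		// reduceByKey merges all values that share a key with the given function
		JavaPairRDD<String, Double> sums = rdd.reduceByKey(
				new Function2<Double, Double, Double>() {
					public Double call(Double a, Double b) throws Exception {
						return a + b;
					}
				});

		System.out.println(sums.collect()); // e.g. [(u1,38.96), (u2,20.55)]

		sc.stop();
	}
}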