天天看点

Spark RDD算子(一) parallelize,makeRDD,textFile,filter,map,flatMapparallelizemakeRDDtextFilefiltermapflatMap

Spark RDD算子(一)

  • parallelize
    • scala版本
    • java版本
  • makeRDD
  • textFile
    • scala版本
    • java版本
  • filter
    • scala版本
    • java版本
  • map
    • scala版本
    • java版本
  • flatMap
    • scala版本
    • java版本

parallelize

通过调用SparkContext的parallelize方法,在一个已经存在的Scala集合上创建的(一个Seq对象)。集合的对象将会被拷贝,创建出一个可以被并行操作的分布式数据集

scala版本

def parallelize[T](seq: Seq[T],numSlices: Int)(implicit arg0: ClassTag[T]): RDD[T]

  • 第一个参数是一个seq集合
  • 第二个参数是分区数,可省略
  • 返回值是一个RDD
scala> sc.parallelize(List("hello","world","hello","scala"))
res0: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:25
           

java版本

def parallelize[T](list: java.util.List[T]): JavaRDD[T] =parallelize(list, sc.defaultParallelism)

  • 第一个参数为List集合
  • 第二参数为分区数,可省略
  • 返回值为JavaRDD[T]类型
public static void main(String[] args) {
    SparkConf  conf = new SparkConf().setMaster("local[*]").setAppName("parallelizeJava");
    JavaSparkContext sc = new JavaSparkContext(conf);
    // Arrays.asList该方法是将数组转化成List集合的方法
    List<String> list = Arrays.asList("hello java","hello scala","hello world");
    JavaRDD<String> rdd1 = sc.parallelize(list);
}
           

makeRDD

def makeRDD[T](seq: Seq[(T, Seq[String])],numSlices: Int)(implicit arg0: ClassTag[T]): RDD[T]

  • scala版本的才有makeRDD
  • 该方法的底层调用parallelize(seq, numSlices)实现

textFile

从外部存储中读取数据来创建 RDD

scala版本

def textFile(path: String,minPartitions: Int): org.apache.spark.rdd.RDD[String]

  • 第一个参数为文件路径
  • 第二个参数指定最小分区数
  • 返回值为RDD[String]
// 本地文件
val rdd1:RDD[String] = sc.textFile("file:///f:/a.txt")
// hdfs文件系统
val rdd2:RDD[String] = sc.textFile("hdfs://hadoop4:9000/user/a.txt")
           

java版本

// 本地文件
JavaRDD<String> rdd1 = sc.textFile("f:/a.txt");
// hdfs文件系统
JavaRDD<String> rdd2 = sc.textFile("hdfs://192.168.233.133:9000/user/a.txt");
           

textFile支持分区,支持模式匹配,例如把某目录下的所有txt文件转换成RDD

filter

filter用于将数据按照某一规则过滤,返回满足条件的内容

scala版本

例:将List(1,2,3,4,5,6,7,8,9,10)中的偶数输出

val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))
 rdd1.filter(_%2==0).collect.foreach(println)
           
Spark RDD算子(一) parallelize,makeRDD,textFile,filter,map,flatMapparallelizemakeRDDtextFilefiltermapflatMap

java版本

// 获取rdd
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
// rdd的filter方法参数需要一个函数,需要重写该函数的call方法
JavaRDD<Integer> filterRDD = rdd1.filter(new Function<Integer, Boolean>() {
    @Override
    public Boolean call(Integer v1) throws Exception {
        return v1 % 2 == 0;
    }
});
List<Integer> collect = filterRDD.collect();
// 输出
for (Integer i : collect) {
    System.out.println(i);
}
           
Spark RDD算子(一) parallelize,makeRDD,textFile,filter,map,flatMapparallelizemakeRDDtextFilefiltermapflatMap

map

map() 接收一个函数,把这个函数用于集合中的每个元素,将函数的返回值作为结果RDD

scala版本

例:List(“hello”,“scala”,“java”,“world”),将list中每个元素变为(k,v)形式,k为元素本身,v为该元素长度,如:(scala,5)

val rdd1 = sc.parallelize(List("hello","scala","java","world"))
rdd1.map(x=>(x,x.size)).collect.foreach(println)
           
Spark RDD算子(一) parallelize,makeRDD,textFile,filter,map,flatMapparallelizemakeRDDtextFilefiltermapflatMap

java版本

JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("hello", "scala", "java", "world"));
JavaRDD<Tuple2<String, Integer>> mapRDD = rdd2.map(new Function<String, Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Integer> call(String v1) throws Exception {
        return new Tuple2<>(v1, v1.length());
    }
});
List<Tuple2<String, Integer>> collect1 = mapRDD.collect();
for (Tuple2<String, Integer> tuple2 : collect1) {
    System.out.println(tuple2);
}
           
Spark RDD算子(一) parallelize,makeRDD,textFile,filter,map,flatMapparallelizemakeRDDtextFilefiltermapflatMap

flatMap

flatMap的函数应用于每一个元素,对于每一个元素返回的是多个元素组成的迭代器

scala版本

例:将数据切分为单词

val rdd1 = sc.parallelize(List("hello scala","hello java","hello world"))
rdd1.flatMap(x=>x.split(" ")).collect.foreach(println)
           
Spark RDD算子(一) parallelize,makeRDD,textFile,filter,map,flatMapparallelizemakeRDDtextFilefiltermapflatMap

java版本

spark2.0以上,对flatMap的方法有所修改,2.0版本以下重写call的返回值是Iteratable类型,2.0以上是Iterator类型

  • spark 2.0 以下
JavaRDD<String> rdd3 = sc.parallelize(Arrays.asList("hello scala", "hello java", "hello world"));
JavaRDD<String> flatMapRDD = rdd3.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterable<String> call(String s) throws Exception {
        String[] strings = s.split(" ");
        return Arrays.asList(strings);
    }
});
           
  • spark 2.0 以上
JavaRDD<String> rdd3 = sc.parallelize(Arrays.asList("hello scala", "hello java", "hello world"));
JavaRDD<String> flatMapRDD = rdd3.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterator<String> call(String s) throws Exception {
        String[] strings = s.split(" ");
        return Arrays.asList(strings).iterator();
    }
});
List<String> collect2 = flatMapRDD.collect();
for (String s : collect2) {
    System.out.println(s);
}
           
Spark RDD算子(一) parallelize,makeRDD,textFile,filter,map,flatMapparallelizemakeRDDtextFilefiltermapflatMap

继续阅读