天天看点

spark scala RDD

1、创建RDD

  1. 从外部数据源创建
  2. 从父RDD创建
  3. 使用makeRDD() 和 parallelize() 这两个函数创建

1.1 从外部数据源进行创建(HDFS,HBASE等)

Cassandra、Amazon S3,spark 支持的文本文件、SequeceFile和任何hadoop InputFormat格式的文件

# textFile(hdfs_file_path)
val input = sc.textFile("/spark/train_new.data").foreach(println)
[[email protected] data]# hadoop fs -ls /spark/train_new.data
-rw-r--r--   3 root supergroup      81439 2018-01-15 16:22 /spark/train_new.data
           

1.2 从父RDD 创建

val words = input.flatMap{ x=>
    x.split("\t")
}
           

1.3 并行化集合makeRDD()函数和parallelize() 函数,这两个函数是sparkContext 自带的函数

scala> val words_1 = Array("three","one","three","two","four","five","one","five")
scala> sc.parallelize(words_1).foreach(println(_))
three
one
three
two
four
five
one
five
           

1.4 makeRDD 和 parallelize 这两者的区别:

# 先看两个函数的源码
def parallelize[T:ClassTag](
    seq:Seq[T],numSlice:Int=defaultParallelism):RDD[T]
def makeRDD[T:ClassTag](
    seq:Seq[T],numSlice:Int=defaultParallelism):RDD[T]
def makeRDD[T:ClassTag](seq:Seq[(T,Seq[String])]).RDD[T]
           

makeRDD 有两种方式

makeRDD 第一种方法和parallelize 完全一样。接受的参数类型:序列(数组,列表,字符串,以及任意可迭代对象等)

makeRDD 第二种方法,接受参数类型seq(Int,seq(String))

val seq3 = List((,List("ab","cd","de")),(,List("abc","bcd")),(,List("abcde","bcdef",)))
sc.makeRDD(seq3).foreach(println(_))
# 输出:
(,List(ab, cd, de))
(,List(abc, bcd))
(,List(abcde, bcdef, ))
val seq2 = List((,List("ab","cd","de")),(,List("abc","bcd")),(,List("abcde","bcdef")))
sc.makeRDD(seq2).foreach(println(_))  # 这里只输出最前面位置信息。注意seq2 和 seq3 的区别
# 输出



# 两个序列嵌套,外层序列中元素数据类型Int,而且只有1个元素;内层序列中所有元素的数据类型是String类型
# 注意下面两段代码的区别
scala> sc.makeRDD(Array((,List("a","b")),(,List("c","d")),(,List("e")))).foreach(println(_))
# 输出:



scala> sc.makeRDD(Array((,,List("a","b")),(,,List("c","d")),(,,List("e")))).foreach(println(_))
(,,List(a, b))
(,,List(e))
(,,List(c, d))
           

2、RDD 算子,使用scala语言实现,下一篇准备使用python实现

  1. transformation 类
  2. action 类

2.1 transformation RDD 算子

2.1.1 flatMap{} 实现的功能:输出,input 和 output 是一对多的输出。如:input是一个句子,output输出将这个句子拆分成单个单词进行输出,这个例子适用于wordcount

input:how do you do

output

how

do

you

do

# 数据源:  
[root@master tmp]# hadoop fs -text /test
how do you do
#代码示例:
scala> sc.textFile("hdfs://master:9000/test").foreach(println())
how do you do
scala> sc.textFile("hdfs://master:9000/test").flatMap{ x=> x.split(" ")}.foreach(println())
how
do
you
do
split(" ") 以空格分开,和python一样这样会将一个字符串变成一个列表。
           

2..1.2 reduceByKey 先按key进行分组,默认情况下将相同的key所对应的value 进行相加。相当于map combine。

在本地将相同的key合并在传给reduce,减少网络I/O

input:

key1,val1

key1,val2

key2,val3

key2,val4

output:

key1,(val1+val2)

key2,(val3+val4)

# 代码示例:
sc.textFile("hdfs://master:9000/test").flatMap{ x=> x.split(" ")}.map{ x=> (x,)}.reduceByKey((x,y) => x + y).foreach(println(_))
(how,)
(you,)
(do,)

# 下面是简写 reduceByKey 默认做相加操作:

sc.textFile("hdfs://master:9000/test").flatMap{ x=> x.split(" ")}.map{ x=> (x,)}.reduceByKey(_+_).foreach(println(_))
(how,)
(you,)
(do,)
# 对于reduceByKey可以多看几个示例比较容易弄懂   
scala> sc.parallelize(List((,),(,),(,),(,))).reduceByKey((x,y) => x + y).foreach(println(_))
(,)
(,)
scala> sc.parallelize(List((,),(,),(,),(,))).reduceByKey(_+_).foreach(println(_))
(,)
(,)
scala> sc.parallelize(List((,),(,),(,),(,))).reduceByKey((x,y) => x * y).foreach(println(_))
(,)
(,)
           

2.1.2.1 reduceByKey(fun) 传递自定义函数,下面给一个完整的代码示例

基于命令行终端的形式进行开发,先搭建环境

工作目录/data/project/spark/spark_workstation/

cd /data/project/spark/spark_workstation/
mkdir lib project src target -pv
mkdir src/main/scala/spark/example -pv
# 将spark中所有的jar包复制到lib目录下
[root@master spark_workstation]# cp /data/spark/jars/* lib
[root@master spark_workstation]# cat build.sbt 
name :="reduceByKey"
version :="2.2.1"
scalaVersion :="2.11.8" # 注意scala的版本最好是这个否则会报很多意想不到的错误。
# 搭建编译scala 编译环境,这个过程时间比较长,需要连接外网,自动下载依赖包,可以使用阿里云的镜像。参考:
https://www.cnblogs.com/jasondan/p/build-spark-with-aliyun-maven.html
# 自动构建scala 开发spark 环境,自动下载所需要的依赖包
[root@master spark_workstation]# sbt compile 
# scala 源代码目录 :/data/project/spark/spark_workstation/src/main/scala/spark/example
cat Test_reduceByKey

package spark.example

import org.apache.spark._
import SparkContext._

object Test_reduceByKey {
    def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("TEST")
        val sc = new SparkContext(conf)
        val a = List((,),(,),(,),(,),(,),(,))
        val inputRDD = sc.parallelize(a)
        inputRDD.reduceByKey(sum).collect().foreach(println(_))
        }
    def sum( a:Int, b:Int) : Int = {
        var sum:Int = 
        sum = a + b
        return sum
    }
}

# 编译
[root@master spark_workstation]# sbt package
# 生jar包
[root@master spark_workstation]# ll target/scala-2.11/
reducebykey_2-.jar # 这个目录下自动生成这个jar包
# 提交spark集群运行
/data/spark/bin/spark-submit --master yarn-cluster \
    --num-executors  \
    --executor-memory '1024m' \
    --executor-cores  \
    --class spark.example.Test_reduceByKey ./target/scala-2.11/reducebykey_2.11-2.2.1.jar
# 注意--class 后面跟的类名一定要是上面代码定义的类别(object Test_reduceByKey)
# 输出:
(,)
(,)
(,)
(,)
           

2.1.3 groupByKey 按key分组,将相同key对应的value值添加至一个sequence中。如果要对这个sequence做聚合操作,最好使用reduceByKey

groupByKey() 不能自定义函数,只能通过map来应用自定义的函数。而reduceByKey(fun) 这里可以直接给定自定义的函数名

# input:
key1,val1
key1,val2
key3,val3
key2,val4
key2,val5
key3,val6
key1,val7
key2,val8
# output:
key1,[val1,val2,val3,val7]
key3,[val3,val6]
key2,[val4,val5,val8]

# 输出不按key排序,也不一定是按key输入的顺序输出。
val words = Array("one","two","three","one","two","one","five","three")
val wordPairssRDD = sc.parallelize(words).map(word => (word,))
val wordCountsWithGroupByKey = wordPairssRDD.groupByKey().collect()
wordCountsWithGroupByKey: Array[(String, Iterable[Int])] = Array((two,CompactBuffer(, )),(one,CompactBuffer(, , )), (three,CompactBuffer(, )), (five,CompactBuffer()))
           

2.1.4 groupByKey 和 reduceByKey 区别使用图示说明

spark scala RDD

reduceByKey spark 在每个分区移动数据之前,对相同的key做聚合操作(如把同一key对应的value值进行相加),这样可以减小移动数据的代价,类似于MapReduce中的combiner

groupByKey 不会对相同的key做聚合操作

2.1.5 map 实现输出功能,input和output 实现一对一的输出,map同时可以使用自定义函数。

代码示例,使用自定义函数
自定义函数:
scala> def print_format(s:String) : String = {
     | val run = "string value is : " + s
     | return run
     | }

scala> sc.makeRDD(Array("abc","cde","efg")).map(x=> print_format(x)).foreach(println(_))
#输出:
string value is : cde
string value is : abc
string value is : efg
           

2.1.6 filter 对输入和输出进行过滤

# 示例,数据源:
[[email protected] example]# hadoop fs -text /test
    hadoop  
    spark   
    python  
    scala   
    java    
    c++     
    c++     
# 想要实现的功能:过滤第3列小于或等于10,第2列是C++的项
sc.textFile("hdfs://master:9000/test").filter{x=>
    val fileds = x.split("\t")
    fileds().toInt > 
    }.map { x=>
    val fileds = x.split("\t")
    fileds().toString + "\t" + fileds().toString + "\t" + fileds().toString
    }.filter {x =>
    val fileds = x.split("\t")
    fileds().toString != "c++"
    }.foreach(println(_))

# 输出:
    scala   
    java    
           

2.1.7 mapPartitions(fun) 算子 和map类似,只是map(fun) 对fun调用的次数取决于RDD中元素的个数,而mapPartitions对fun调用的次数取决于partition分区的个数,比map明细要高效很多

# 实验说明
input 输入数据有个分区[,,],[,,]
def mapPartitions[U](f: (Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
preservesPartitioning 这个参数表示新生成的RDD是否与原RDD保持同一的分区,默认是false
mapPartitions 
输入:迭代器(Iterator),列表,数组和字符串可以转换成迭代器
输出:也是迭代器(Iterator)

[root@master example]# cat test_mapPartitions.scala 
package spark.example

import scala.Iterator
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object Test_mapPartitions {
    def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("Test mapPartitions")
        val spark = new SparkContext(conf)
        val input = spark.parallelize(List(,,,,,),)
        val result = input.mapPartitions(partition => Iterator(sumOfEveryPartition(partition)))
        // 传入参数是partition(可迭代对象)返回的也是一个可迭代对象
        result.collect().foreach(println(_))
    }
    def sumOfEveryPartition(input: Iterator[Int]): Int = {
        var total = 0
        input.foreach { elem =>
            total += elem // 每个分区的所有元素相加,返回最终的和
        }
        return total //可以省略return这个关键字
    }
}
# 输出结果:
    // = (++)
   // = (++)

#在看一个有关mapPartitions 算子的实例
# rdd1 分两个区 [1,2],[3,4,5]
# 目前还不清楚为什么不是[1,2,3],[4,5]  
var rdd1 = sc.makeRDD( to ,)
var rdd3 = rdd1.mapPartitions { x=> {
    var result = List[Int]()
    var i = 
    # x 是一个Iterator(可迭代对象)。
    # x.hasNext 判断这个可迭代对象中是否有元素
    # x.next() 从左往右开始弹出,类似于python list.pop()
    while(x.hasNext) {
      i += x.next()}
      # List.::(3) 表示向列表中添加一个元素3,放在第一个位置
      # var a = List(2,3)
      # a.::(1) 输出:List[Int] = List(1, 2, 3),但是a并没有变化,还是有两个元素1和2
      # val r = a.::(1).iterator r是一个可迭代对象,里面有3个元素1,2,3
      # val a = result.::(i).iterator  a是一个可迭代对象,里面有1个元素3
      result.::(i).iterator # 每个分区所有的元素求和所得的值添加至可迭代对象中
      }
}
rdd3.collect()
输出:
Array[Int] = Array(, )
           

2.1.8 mapValues() 算子,类似于map的功能,只是这个算子只适用于k-v 这种键值对。主要实现的功能:key保持不动,对values进行修改

val a = sc.parallelize(List("dog","tiger","lion","cat","panther","eagle"),)
# "<" + _ + ">" === "<" + input + ">"
a.mapValues("<" + _ + ">").collect().foreach(print(_))
输出: 
(,<dog>)(,<tiger>)(,<lion>)(,<cat>)(,<panther>)(,<eagle>)
           

2.1.9 mapPartitionsWithIndex 算子,类似于mapPartitions算子,只是多了一个Index 参数

def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]

var rdd1 = sc.makeRDD( to ,)

var rdd2 = rdd1.mapPartitionsWithIndex { (x,iter) => {
    var result = List[String]()
    var i = 
    while(iter.hasNext) {
        i += iter.next()}
    result.::(x + "|" + i).iterator
    }
}
rdd2.collect().foreach(println(_))
# 输出:
|
|
           

2.1.10 spark 2.2.1 没有 mapWith 这个算子

2.2 RDD action 算子

2.2.1 foreach(println(_)):可以随时输出RDD 中的内容,一般用在RDD之间转换的时候,观察RDD内容的变化情况

2.2.2 collect 算子,返回RDD 中所有的元素,按照输入的顺序返回,不排序

scala> val b = sc.parallelize(List((,),(,),(,),(,),(,),(,),(,))).collect()
        b: Array[(Int, Int)] = Array((,), (,), (,), (,), (,), (,), (,))
           

2.2.3 reduce 聚合,求一个序列中所有元素之和,先将序列中的第一个和第二个元素相加所得的值和第3个数相加,一直进行到最后一个元素

scala> val c = sc.parallelize( to )
# 可以简写 reduce 默认做相加操作
scala> c.reduce(_+_)
res53: Int = 
scala> c.reduce((x,y) => x + y)
res54: Int = 

reduce 内部计算过程:
     +  = 
     +  = 
     +  = 
     +  = 
     +  = 
     +  = 
     +  = 
     +  = 
     +  = 
           

2.2.4 countByValue() 算子统计value的个数,返回(count(value),value)

scala> sc.parallelize(Array("hello", "world", "hello", "china", "hello")).countByValue()
res26: scala.collection.Map[String,Long] = Map(hello -> , world -> , china -> )
           

继续阅读