
Big Data Platform Operations: Spark

Big Data Series: Operations (on a self-built big data platform)

(9) Spark Operations

  1. Open a Linux shell and start the spark-shell terminal, then submit the process information of the started program as text in the answer box (see the note after the transcript below).
[root@master ~]# spark-shell
20/03/31 21:31:45 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://master:4040
Spark context available as 'sc' (master = local[*], app id = local-1585661525987).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.5
      /_/
         
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_171)
Type in expressions to have them evaluated.
Type :help for more information.

scala>
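
A note on the process information asked for above: while spark-shell is running, the JVM it starts can be listed from a second terminal on the same node, for example with the JDK's jps command (the shell typically shows up as a SparkSubmit process) or with ps -ef | grep spark. This assumes the JDK tools are on the PATH; the exact output depends on the environment.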

           
  2. After starting spark-shell, load the data "1,2,3,4,5,6,7,8,9,10" in Scala, multiply the data by 2 and find the doubled values that are divisible by 3, then use the toDebugString method to inspect the RDD lineage.
scala> val num = sc.parallelize(1 to 10)
num: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val doublenum = num.map(_*2)
doublenum: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:25

scala> val threenum = doublenum.filter(_%3==0)
threenum: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at filter at <console>:25

scala> threenum.collect
res0: Array[Int] = Array(6, 12, 18)                                             

scala> threenum.toDebugString
res1: String = (2) MapPartitionsRDD[2] at filter at <console>:25 []
 |  MapPartitionsRDD[1] at map at <console>:25 []
 |  ParallelCollectionRDD[0] at parallelize at <console>:24 []
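
The same pipeline can also be written as a single chained expression; a minimal sketch, reusing the SparkContext sc from the session above:

// parallelize -> double every element -> keep the multiples of 3
val chained = sc.parallelize(1 to 10).map(_ * 2).filter(_ % 3 == 0)
chained.collect()                 // Array(6, 12, 18), as above
println(chained.toDebugString)    // same three-stage lineage: filter <- map <- parallelize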

           
  3. After starting spark-shell, load the Key-Value data ("A",1),("B",2),("C",3),("A",4),("B",5),("C",4),("A",3),("A",9),("B",4),("D",5) in Scala, sort the data in ascending order by key, and group it by key.
scala> val kv = sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5),("C",4),("A",3),("A",9),("B",4),("D",5)))
kv: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> val kv1 = kv.sortByKey().collect
kv1: Array[(String, Int)] = Array((A,1), (A,4), (A,3), (A,9), (B,2), (B,5), (B,4), (C,3), (C,4), (D,5))
scala> kv.groupByKey().collect
res5: Array[(String, Iterable[Int])] = Array((B,CompactBuffer(2, 5, 4)), (D,CompactBuffer(5)), (A,CompactBuffer(1, 4, 3, 9)), (C,CompactBuffer(3, 4)))
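
If the grouped result itself should also come back in ascending key order, the two operations can be combined; a minimal sketch, reusing kv from above:

// group the values per key, then sort the groups by key before collecting
kv.groupByKey().sortByKey().collect()   // keys now arrive in order A, B, C, D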

           
  4. After starting spark-shell, load the Key-Value data ("A",1),("B",3),("C",5),("D",4),("B",7),("C",4),("E",5),("A",8),("B",4),("D",5) in Scala, sort the data in ascending order by key, and sum the values of identical keys.
scala> val kv2 = sc.parallelize(List(("A",1),("B",3),("C",5),("D",4),("B",7),("C",4),("E",5),("A",8),("B",4),("D",5)))
kv2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[9] at parallelize at <console>:24

scala> val kv3 = kv2.sortByKey().collect
kv3: Array[(String, Int)] = Array((A,1), (A,8), (B,3), (B,7), (B,4), (C,5), (C,4), (D,4), (D,5), (E,5))
scala> kv2.reduceByKey(_+_).collect
res8: Array[(String, Int)] = Array((B,14), (D,9), (A,9), (C,9), (E,5))

           
  5. After starting spark-shell, load the Key-Value data ("A",4),("A",2),("C",3),("A",4),("B",5),("C",3),("A",4) in Scala, deduplicate it by key, and use the toDebugString method to inspect the RDD lineage.
scala> val kv4 = sc.parallelize(List(("A",4),("A",2),("C",3),("A",4),("B",5),("C",3),("A",4)))
kv4: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[15] at parallelize at <console>:24

scala> kv4.distinct.collect
res14: Array[(String, Int)] = Array((A,4), (B,5), (C,3), (A,2))

scala> kv4.toDebugString
res15: String = (2) ParallelCollectionRDD[15] at parallelize at <console>:24 []
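
Note that distinct removes duplicate (key, value) pairs, which is why both (A,4) and (A,2) survive above. If the intent is to keep only one record per key, one option (an alternative sketch, not the original answer) is reduceByKey:

// keep a single, arbitrary value per key
kv4.reduceByKey((kept, _) => kept).collect()   // one entry each for A, B and C; which value is kept is not guaranteed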

           
  6. After starting spark-shell, load the two sets of Key-Value data ("A",1),("B",2),("C",3),("A",4),("B",5) and ("A",1),("B",2),("C",3),("A",4),("B",5) in Scala, and JOIN the two sets on their keys.
scala> val kv5 = sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5)))
kv5: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[19] at parallelize at <console>:24

scala> val kv6 = sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5)))
kv6: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[20] at parallelize at <console>:24

scala> kv5.join(kv6).collect
res16: Array[(String, (Int, Int))] = Array((B,(2,2)), (B,(2,5)), (B,(5,2)), (B,(5,5)), (A,(1,1)), (A,(1,4)), (A,(4,1)), (A,(4,4)), (C,(3,3)))

           
  7. Log in to spark-shell, define i with the value 1 and sum with the value 0, use a while loop to add up the numbers from 1 to 100, and finally print sum with Scala's standard output function.
scala> var i = 1
i: Int = 1
scala> var sum = 0
sum: Int = 0
scala> while(i<=100){
     | sum += i
     | i=i+1
     | }
scala> println(sum)
5050

           
  8. Log in to spark-shell, define i with the value 1 and sum with the value 0, use a for loop to add up the numbers from 1 to 100, and finally print sum with Scala's standard output function.
scala> var i = 1
i: Int = 1
scala> var sum = 0
sum: Int = 0
scala> for(i<-1 to 100){
     | sum += i
     | }
scala> println(sum)
5050

           
  9. Log in to spark-shell, define the variables i and sum, initialize i to 1 and sum to 0 with a step of 3, use a while loop to add up the numbers from 1 to 2018, and finally print sum with Scala's standard output function.
scala> var i = 1
i: Int = 1

scala> var sum = 0
sum: Int = 0

scala> while(i<=2018){
     | sum+=i
     | i=i+3
     | }

scala> println(sum)
679057
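
The figure can be cross-checked without a loop, since Scala ranges support a step; a minimal sketch:

// 1 + 4 + 7 + ... + 2017, stepping by 3
println((1 to 2018 by 3).sum)   // 679057, matching the while loop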

           
  10. Every functional language provides the two functions map and flatMap. map, as the name suggests, takes a function and applies it to every element of a collection, returning the processed results. The only way flatMap differs from map is that the function passed to flatMap must return a collection (such as a List), so that the flat step can then flatten the results into a single collection (a side-by-side comparison is sketched after example (2) below).

    (1) Log in to spark-shell, define a list, then use the map function to multiply every element of the list by 2. Use the values (1,2,3,4,5,6,7,8,9).

scala> import scala.math._
import scala.math._
scala> val nums=List(1,2,3,4,5,6,7,8,9)
nums: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9)
scala> nums.map(x=>x*2)
res23: List[Int] = List(2, 4, 6, 8, 10, 12, 14, 16, 18)

           

(2) Log in to spark-shell, define a list, then use the flatMap function to turn the list into a List of Char values converted to uppercase. Use the values ("Hadoop","Spark","Java").

scala> import scala.math._
import scala.math._
scala> val data = List("Hadoop","Spark","Java")
data: List[String] = List(Hadoop, Spark, Java)

scala> data.flatMap(_.toUpperCase)
res24: List[Char] = List(H, A, D, O, O, P, S, P, A, R, K, J, A, V, A)
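
To make the difference described in the introduction concrete: map keeps one output element per input element, while flatMap flattens each returned collection into the overall result. A minimal comparison, reusing the data list from above:

// map: each String stays a single element, so the result is a List[String]
data.map(_.toUpperCase)       // List(HADOOP, SPARK, JAVA)
// flatMap: each String is treated as a sequence of Char and flattened
data.flatMap(_.toUpperCase)   // List(H, A, D, O, O, P, S, P, A, R, K, J, A, V, A)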

           
  11. Log in to the master node of the big data cloud host and, in the root directory, create a file abc.txt with the following content:

    hadoop hive

    solr redis

    kafka hadoop

    storm flume

    sqoop docker

    spark spark

    hadoop spark

    elasticsearch hbase

    hadoop hive

    spark hive

    hadoop spark

    Then log in to spark-shell; first count the number of lines in abc.txt, then count the words in the document and sort the results in ascending order by the words' first letters, and finally count the number of rows in the result.

scala> val words = sc.textFile("file:///root/abc.txt").count
words: Long = 11
scala> val words = sc.textFile("file:///root/abc.txt").flatMap(_.split("\\W+")).map(x=>(x,1)).reduceByKey(_+_).sortByKey().collect
words: Array[(String, Int)] = Array((docker,1), (elasticsearch,1), (flume,1), (hadoop,5), (hbase,1), (hive,3), (kafka,1), (redis,1), (solr,1), (spark,5), (sqoop,1), (storm,1))
scala> val words = sc.textFile("file:///root/abc.txt").flatMap(_.split("\\W+")).map(x=>(x,1)).reduceByKey(_+_).sortByKey().count
words: Long = 12

           
Note: abc.txt must not contain blank lines, because blank lines would also be counted.
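
If blank lines cannot be ruled out, one way to guard against them (a sketch, not part of the original answer) is to drop the empty tokens that splitting a blank line produces:

// an empty line splits into an empty token; filter it out before counting
val tokens = sc.textFile("file:///root/abc.txt").flatMap(_.split("\\W+")).filter(_.nonEmpty)
val counts = tokens.map(w => (w, 1)).reduceByKey(_ + _).sortByKey()
counts.count()   // still 12 distinct words for the file above
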
  12. Log in to spark-shell, define a List, and use Spark's built-in functions to deduplicate the List.
scala> val data = sc.parallelize(List(1,2,3,4,5,1,2,3,4,5,3,2))
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[68] at parallelize at <console>:27

scala> data.distinct.collect
res25: Array[Int] = Array(4, 2, 1, 3, 5)

           
  13. Log in to the spark-shell interactive shell. Import the following data:

    2017-01-01 a

    2017-01-01 b

    2017-01-01 c

    2017-01-02 a

    2017-01-02 b

    2017-01-02 d

    2017-01-03 b

    2017-01-03 e

    2017-01-03 f

    As the imported data shows, three new users (a, b, c) appear on 2017-01-01, one new user (d) on 2017-01-02, and two new users (e, f) on 2017-01-03. Use Spark to count the number of newly added users per date and display the result.

scala> val rdd1 = spark.sparkContext.parallelize(Array(("2017-01-01","a"),("2017-01-01","b"),("2017-01-01","c"),("2017-01-02","a"),("2017-01-02","b"),("2017-01-02","d"),("2017-01-03","b"),("2017-01-03","e"),("2017-01-03","f")))
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[72] at parallelize at <console>:26
scala> val rdd2 = rdd1.map(kv=>(kv._2,kv._1))
rdd2: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[73] at map at <console>:30

scala> val rdd3 = rdd2.groupByKey()
rdd3: org.apache.spark.rdd.RDD[(String, Iterable[String])] = ShuffledRDD[74] at groupByKey at <console>:28

scala> val rdd4 = rdd3.map(kv=>(kv._2.min,1))
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[75] at map at <console>:30

scala> rdd4.countByKey().foreach(println)
(2017-01-03,2)
(2017-01-01,3)
(2017-01-02,1)
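
The idea behind the steps above: a user's earliest date is the date on which that user first appeared, so counting users by their minimum date yields the number of new users per day. The same result can be reached without materializing every date per user, by using reduceByKey to keep only the minimum; a minimal sketch, reusing rdd1 from above:

// (user, date) -> keep each user's earliest date -> count users per date
val userFirstDate = rdd1.map(kv => (kv._2, kv._1)).reduceByKey((d1, d2) => if (d1 < d2) d1 else d2)
userFirstDate.map(kv => (kv._2, 1)).countByKey().foreach(println)
// counts: 2017-01-01 -> 3, 2017-01-02 -> 1, 2017-01-03 -> 2 (print order may vary)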

           
  14. Log in to the spark-shell interactive shell. Define a function that compares the two variables passed to it and returns the larger one.

    Define a function named MaxNum that takes parameters and returns a value; it compares the two Int arguments passed in and returns the larger one. After defining it, verify the function with 23876567 and 23786576.

scala> def MaxNum(x:Int,y:Int):Int = if(x>y) x else y
MaxNum: (x: Int, y: Int)Int

scala> MaxNum(23876567,23786576)
res27: Int = 23876567
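
For reference, the standard library's math.max performs the same comparison and can be used to cross-check the function:

math.max(23876567, 23786576)   // 23876567, same as MaxNum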

           
  15. Log in to the spark-shell interactive shell. Define a one-dimensional integer array AA=[3,4,3,2,44,3,22,231,4,5,2,345,2,2,11,124,35,349,34] and a custom function sum() that, given the array name and the array length, sums the elements AA[i] for odd i.
scala> def sum(aa:Array[Int],n:Int):Int={
     | var he=0
     | var i=0
     | for(i<-0 until(n,2))
     | he=he+aa(i)
     | he
     | }
sum: (aa: Array[Int], n: Int)Int

scala> var aa = Array(3,4,3,2,44,3,22,231,4,5,2,345,2,2,11,124,35,349,34)
aa: Array[Int] = Array(3, 4, 3, 2, 44, 3, 22, 231, 4, 5, 2, 345, 2, 2, 11, 124, 35, 349, 34)
scala> aa.length
res37: Int = 19
scala> sum(aa,19)
res28: Int = 160
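
The loop above visits indices 0, 2, 4, …, i.e. the odd positions when counting from 1, which is how the result 160 is obtained. The same sum can be written without an explicit index loop; a minimal sketch, reusing aa from above:

// sum the elements at indices 0, 2, 4, ... of the array
aa.indices.filter(_ % 2 == 0).map(i => aa(i)).sum   // 160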

           

Thanks to 先電雲 for providing the question bank.

Thanks to the Apache open-source projects for their technology and support.

Thanks to the three bloggers 抛物線, mn525520, and 菜鳥一枚2019 for their related blog posts.
