天天看點

2021年前100名Apache Spark面試問題和解答

  Apache Spark面試問題答案

  一, Spark Driver在spark應用程式中的作用是什麼?

  Spark驅動程式是定義知識RDD的轉換和操作并向主伺服器送出請求的程式。Spark驅動程式是在機器的主節點上運作的程式,它聲明對知識RDD的轉換和操作。

  簡單來說,Spark中的驅動程式建立SparkContext,連接配接到給定的Spark Master。它将RDD圖表聯合提供給Master,無論獨立叢集管理器在哪裡運作。

  二, Apache Spark叢集中的工作節點是什麼?

  Apache Spark遵循主/從架構,具有一個主或驅動程式程序以及多個從屬或工作程序

  1. master是運作main()程式的驅動程式,其中建立了spark上下文。然後,它與叢集管理器互動以安排作業執行并執行任務。

  2.工作程式由可以并行運作的程序組成,以執行驅動程式安排的任務。這些過程稱為執行程式。

  每當用戶端運作應用程式代碼時,驅動程式都會執行個體化Spark Context,将轉換和操作轉換為執行的邏輯DAG。然後将此邏輯DAG轉換為實體執行計劃,然後将其細分為較小的實體執行單元。然後,驅動程式與叢集管理器互動,以協商執行應用程式代碼任務所需的資源。然後,叢集管理器與每個工作節點互動,以了解每個節點中運作的執行程式的數量。

  工作節點/執行者的角色:

  1.執行應用程式代碼的資料處理

  2.讀取資料并将資料寫入外部源

  3.将計算結果存儲在記憶體或磁盤中。

  執行程式在Spark應用程式的整個生命周期中運作。這是執行者的靜态配置設定。使用者還可以決定運作任務需要多少執行程式,具體取決于工作負載。這是執行者的動态配置設定。

  在執行任務之前,執行程式通過叢集管理器向驅動程式注冊,以便驅動程式知道有多少執行程式正在運作以執行計劃任務。然後,執行程式通過叢集管理器開始執行工作節點排程的任務。

  每當任何工作節點發生故障時,需要執行的任務将自動配置設定給任何其他工作節點

  三,為什麼變換在Spark中變得懶散?

  每當在Apache Spark中執行轉換操作時,它都會被懶惰地評估。在執行操作之前不會執行。Apache Spark隻是将變換操作的條目添加到計算的DAG(有向無環圖),這是一個沒有循環的有向有限圖。在該DAG中,所有操作都被分類到不同的階段,在單個階段中沒有資料的混亂。

  通過這種方式,Spark可以通過完整地檢視DAG來優化執行,并将适當的結果傳回給驅動程式。

  例如,考慮HDFS中的1TB日志檔案,其中包含錯誤,警告和其他資訊。以下是驅動程式中正在執行的操作:

  1. 建立此日志檔案的RDD

  2.對此RDD執行flatmap()操作,以根據制表符分隔符拆分日志檔案中的資料。

  3.執行filter()操作以提取僅包含錯誤消息的資料

  4.執行first()操作以僅擷取第一條錯誤消息。

  如果熱切評估上述驅動程式中的所有轉換,那麼整個日志檔案将被加載到記憶體中,檔案中的所有資料将根據頁籤進行拆分,現在要麼需要在某處寫入FlatMap的輸出或将其儲存在記憶中。Spark需要等到執行下一個操作,并且資源被阻塞以進行即将進行的操作。除此之外,每個操作spark都需要掃描所有記錄,比如FlatMap處理所有記錄然後再次在過濾操作中處理它們。

  另一方面,如果所有轉換都被懶惰地評估,Spark将整體檢視DAG并為應用程式準備執行計劃,現在該計劃将被優化,操作将被合并/合并到階段然後執行将開始。Spark建立的優化計劃可提高作業效率和整體吞吐量。

  通過Spark中的這種惰性評估,驅動程式和叢集之間的切換次數也減少了,進而節省了記憶體中的時間和資源,并且計算速度也有所提高。

  四,我可以在沒有Hadoop的情況下運作Apache Spark嗎?

  是的,Apache Spark可以在沒有Hadoop,獨立或在雲中運作。Spark不需要Hadoop叢集就可以工作。Spark還可以讀取并處理來自其他檔案系統的資料。HDFS隻是Spark支援的檔案系統之一。

  Spark沒有任何存儲層,是以它依賴于分布式存儲系統之一,用于分布式計算,如HDFS,Cassandra等。

  但是,在Hadoop(HDFS(用于存儲)+ YARN(資料總管))上運作Spark有很多優點,但這不是強制性要求。Spark是一種用于分布式計算的。在這種情況下,資料分布在計算機上,Hadoop的分布式檔案系統HDFS用于存儲不适合記憶體的資料。

  使用Hadoop和Spark的另一個原因是它們都是開源的,并且與其他資料存儲系統相比,它們可以相當容易地互相內建。

  五,.在Spark中解釋累加器。

  這個讨論是繼續問題,命名Apache Spark中可用的兩種類型的共享變量。

  累加器介紹:

  Accumulator是Apache Spark中的共享變量,用于聚合群集中的資訊。換句話說,将來自工作節點的資訊/值聚合回驅動程式。(我們将在下面的會議中看到)為什麼累加器:當我們在map(),filter()等操作中使用函數時,這些函數可以使用驅動程式中這些函數作用域外定義的變量。當我們将任務送出到叢集時,叢集上運作的每個任務都會獲得這些變量的新副本,并且這些變量的更新不會傳播回驅動程式。累加器降低了此限制。用例 :累加器最常見的用途之一是計算作業執行期間發生的事件以進行調試。意思是數不了。輸入檔案中的空白行,沒有。在會話期間來自網絡的錯誤資料包,在奧運會資料分析期間,我們必須找到我們在SQL查詢中所說的年齡(年齡!='NA'),簡短地查找錯誤/損壞的記錄。例子 :scala> val record=spark.read.textFile("/home/hdadmin/wc-data-blanklines.txt")record: org.apache.spark.sql.Dataset[String]=[value: string]

scala> val emptylines=sc.accumulator(0)warning: there were two deprecation warnings; re-run with -deprecation for detailsemptylines: org.apache.spark.Accumulator[Int]=0

scala> val processdata=record.flatMap(x=> {if(x=="") emptylines +=1 x.split(" ") })

processdata: org.apache.spark.sql.Dataset[String]=[value: string]scala> processdata.collect16/12/02 20:55:15 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes輸出:res0:Array [String]=Array(DataFlair,提供,教育訓練,開,切,邊緣,技術。,“”,DataFlair,是,上司,教育訓練,提供者,我們,有,訓練,1000s, ,候選人,教育訓練,焦點,實踐,方面,哪些,工業,需要,而不是理論,知識。,“”,DataFlair,幫助,組織,解決,BigData,問題。, “”,Javadoc,是一個工具,用于生成API,文檔,HTML,格式,文檔,注釋,内容,源代碼,代碼。,它,可以,隻下載下傳,僅作為,部分,Java,2,SDK。,To,see,documentation,generated,by,Javadoc,tool ,, go,to,J2SE,1.5.0,API,Documentation。,“”,Javadoc,常見問題解答, - ,這,常見問題,涵蓋,在哪裡,到,下載下傳,Javadoc,工具,如何,到,找到,清單,已知,錯誤和功能,reque ...scala> println(“空行數:”+ emptylines.value)空行數:10程式的解釋和結論:在上面的例子中,我們建立了一個Accumulator [Int] 'emptylines'在這裡,我們想找到沒有。我們處理過程中的空白行。之後,我們應用flatMap()轉換來處理我們的資料,但我們想要找出沒有。空行(空白行)是以在flatMap()函數中,如果我們遇到任何空行,累加器空行增加1,否則我們按空格分割行。之後,我們檢查輸出和否。的空白行。我們通過調用sc.accumulator(0)來建立具有初始值的累加器,通過調用sc.accumulator(0)即spark Context.accumulator(初始值),其中傳回類型為initalValue {org.apache.spark.Accumulator [T],其中T為initalValue]最後,我們調用累加器上的value()屬性來通路它的值。請注意,工作節點上的任務不能通路累加器的value屬性,是以在任務的上下文中,累加器是隻寫變量。accumulator的value()屬性僅在驅動程式中可用。我們也可以算不上。在變換/動作的幫助下,我們需要一個額外的操作,但是在累加器的幫助下,我們可以計算一下。我們加載/處理資料時的空行(或更廣泛的事件)。

  六, Driver程式在Spark Application中的作用是什麼?

  驅動程式負責在叢集上啟動各種并行操作。驅動程式包含應用程式的main()函數。它是運作使用者代碼的過程,使用者代碼又建立SparkContext對象,建立RDD并在RDD上執行轉換和操作操作。驅動程式通過SparkContext對象通路Apache Spark,該對象表示與計算叢集的連接配接(從Spark 2.0開始,我們可以通過SparkSession通路SparkContext對象)。驅動程式負責将使用者程式轉換為稱為任務的實體執行單元。它還定義了叢集上的分布式資料集,我們可以對資料集(轉換和操作)應用不同的操作。Spark程式建立一個名為Directed Acyclic graph的邏輯計劃,當驅動程式運作時,該計劃由驅動程式轉換為實體執行計劃。

  七,如何識别給定的操作是程式中的Transformation / Action?

  為了識别操作,需要檢視操作的傳回類型。

  如果操作在這種情況下傳回一個新的RDD,則操作是“轉換”如果操作傳回除RDD之外的任何其他類型,則操作為“Action”

  是以,Transformation從現有RDD(前一個)構造新的RDD,而Action根據應用的轉換計算結果,并将結果傳回給驅動程式或将其儲存到外部存儲

  八,命名Apache Spark中可用的兩種類型的共享變量。

  Apache Spark中有兩種類型的共享變量:

  (1)累加器:用于聚合資訊。

  (2)廣播變量:有效地配置設定大值。

  當我們将函數傳遞給Spark時,比如說filter(),這個函數可以使用在函數外部但在Driver程式中定義的變量,但是當我們将任務送出給Cluster時,每個工作節點都獲得一個新的變量副本并更新從這些變量不會傳播回Driver程式。

  累加器和廣播變量用于消除上述缺點(即我們可以将更新的值恢複到我們的驅動程式)

  九,使用Apache Spark時開發人員常見的錯誤是什麼?

  1)DAG的管理- 人們經常在DAG控制中犯錯誤。始終嘗試使用reducebykey而不是groupbykey。ReduceByKey和GroupByKey可以執行幾乎相似的功能,但GroupByKey包含大資料。是以,盡量使用ReduceByKey。始終盡量減少地圖的側面。盡量不要在分區中浪費更多時間。盡量不要随便洗牌。盡量遠離Skews和分區。

  2)保持随機塊的所需大小

  十,預設情況下,Apache Spark中的RDD中建立了多少個分區?

  預設情況下,Spark為檔案的每個塊建立一個分區(對于HDFS)HDFS塊的預設塊大小為64 MB(Hadoop版本1)/ 128 MB(Hadoop版本2)。但是,可以明确指定要建立的分區數。例1:沒有指定分區val rdd1=sc.textFile("/home/hdadmin/wc-data.txt")例2:以下代碼建立了10個分區的RDD,因為我們指定了no。分區。val rdd1=sc.textFile("/home/hdadmin/wc-data.txt", 10)可以通過以下方式查詢分區數:rdd1.partitions.lengthORrdd1.getNumPartitions最佳情況是我們應該按照以下方式制作RDD:Cluster中的核心數量=否。分區

  十一,.為什麼我們需要壓縮以及支援的不同壓縮格式是什麼?

  在Big Data中,當我們使用壓縮時,它可以節省存儲空間并減少網絡開銷。可以在将資料寫入HDFS時指定壓縮編碼(Hadoop格式)人們也可以讀取壓縮資料,因為我們也可以使用壓縮編解碼器。以下是BigData中不同的壓縮格式支援: gzip lzo bzip2 Zlib * Snappy

  十二,解釋過濾器轉換。

  Apache Spark中的filter()轉換将函數作為輸入。它傳回一個RDD,它隻有通過輸入函數中提到的條件的元素。示例:val rdd1=sc.parallelize(List(10,20,40,60))val rdd2=rdd2.filter(x=> x !=10)println(rdd2.collect())産量10

  十三,如何在互動式shell中啟動和停止scala?

  在Scala中啟動互動式shell的指令:

  >>>> bin / spark-shell

  首先進入spark目錄即

  hdadmin@ubuntu:~$ cd spark-1.6.1-bin-hadoop2.6/

  hdadmin@ubuntu:~/spark-1.6.1-bin-hadoop2.6$ bin/spark-shell

  shisi

  -------------------------------------------------- -------------------------------------------------- --------------------------

  在Scala中停止互動式shell的指令:

  scala>Press (Ctrl+D)

  可以看到以下消息

  scala> Stopping spark context.

  十四,解釋sortByKey()操作

  > sortByKey()是一種轉換。

  >它傳回按鍵排序的RDD。

  >排序可以在(1)升序OR(2)降序OR(3)自定義排序中完成

  從:

  http://data-flair.training/blogs/rdd-transformations-actions-apis-apache-spark/#212_SortByKey

  他們将适用于範圍内具有隐式排序[K]的任何鍵類型K. 對于所有标準基元類型,已經存在排序對象。使用者還可以為自定義類型定義自己的排序,或覆寫預設排序。将使用最近範圍内的隐式排序。

  當調用 (K,V)資料集

  (其中k為Ordered)時,傳回按鍵按升序或降序排序的(K,V)對資料集,如升序參數中所指定。

  示例:

  

  val rdd1=sc.parallelize(Seq(("India",91),("USA",1),("Brazil",55),("Greece",30),("China",86),("Sweden",46),("Turkey",90),("Nepal",977)))

  val rdd2=rdd1.sortByKey()

  rdd2.collect();

  輸出:

  數組[(String,Int)]=(數組(巴西,55),(中國,86),(希臘,30),(印度,91),(尼泊爾,977),(瑞典,46),(火雞,90),(美國,1)

  val rdd2=rdd1.sortByKey(false)

  Array [(String,Int)]=(Array(USA,1),(Turkey,90),(Sweden,46),(Nepal,977),(India,91),(Greece,30),(中國,86),(巴西,55)

  十五,解釋Spark中的distnct(),union(),intersection()和substract()轉換

  union()轉換

  最簡單的設定操作。rdd1.union(rdd2),它輸出一個包含兩個來源資料的RDD。如果輸入RDD中存在重複項,則union()轉換的輸出也将包含重複項,可以使用distinct()進行修複。

  例

  val u1=sc.parallelize(List("c","c","p","m","t"))

  val u2=sc.parallelize(List("c","m","k"))

  val result=u1.union(u2)

  result.foreach{println}

  c

  p

  m

  t

  k

  十六,在apache spark中解釋foreach()操作

  > foreach()操作是一個動作。

  >它不會傳回任何值。

  >它對RDD的每個元素執行輸入功能。

  來自:http:

  //data-flair.training/blogs/rdd-transformations-actions-apis-apache-spark/#39_Foreach

  它在RDD中的每個項目上執行該功能。它适用于編寫資料庫或釋出到Web服務。它為每個資料項執行參數減少功能。

  例:

  val mydata=Array(1,2,3,4,5,6,7,8,9,10)

  val rdd1=sc.parallelize(mydata)

  rdd1.foreach{x=>println(x)}

  OR

  rdd1.foreach{println}

  1

  2

  3

  4

  5

  6

  7

  8

  9

  10

  十七,Apache Spark中的groupByKey vs reduceByKey

  在對(K,V)對的資料集應用groupByKey()時,資料根據另一個RDD中的鍵值K進行混洗。在這種轉變中,許多不必要的資料通過網絡傳輸。

  Spark提供了将資料存儲到單個執行程式機器上的資料多于記憶體中資料時儲存到磁盤的功能。

  val data=spark.sparkContext.parallelize(Array(('k',5),('s',3),('s',4),('p',7),('p',5),('t',8),('k',6)),3)

  val group=data.groupByKey().collect()

  group.foreach(println)

  在對資料集(K,V)應用reduceByKey時,在對資料進行混合之前,組合具有相同密鑰的同一機器上的對。

  val words=Array("one","two","two","four","five","six","six","eight","nine","ten")

  val data=spark.sparkContext.parallelize(words).map(w=> (w,1)).reduceByKey(_+_)

  data.foreach(println)

  十八,.解釋mapPartitions()和mapPartitionsWithIndex()

  Mappartitions是一種類似于Map的轉換。

  在Map中,函數應用于RDD的每個元素,并傳回結果RDD的每個其他元素。對于mapPartitions,該函數将應用于RDD的每個分區,而不是每個元素,并傳回結果RDD的多個元素。在mapPartitions轉換中,性能得到改善,因為在地圖轉換中消除了每個元素的對象建立。

  由于mapPartitions轉換适用于每個分區,是以它将字元串或int值的疊代器作為分區的輸入。

  請考慮以下示例:

  val data=sc.parallelize(List(1,2,3,4,5,6,7,8), 2)

  Map:

  def sumfuncmap(numbers : Int) : Int=

  {

  var sum=1

  return sum + numbers

  }

  data.map(sumfuncmap).collect

  returns Array[Int]=Array(2, 3, 4, 5, 6, 7, 8, 9) //Applied to each and every element

  MapPartitions:

  def sumfuncpartition(numbers : Iterator[Int]) : Iterator[Int]=

  while(numbers.hasNext)

  sum=sum + numbers.next()

  return Iterator(sum)

  data.mapPartitions(sumfuncpartition).collect

  returns

  Array[Int]=Array(11, 27) // Applied to each and every element partition-wise

  MapPartitionsWithIndex類似于mapPartitions,除了它還需要一個參數作為輸入,它是分區的索引。

  十九,Apache Spark中的Map是什麼?

  Map是應用于RDD中每個元素的轉換,它提供了一個新的RDD作為結果。在Map轉換中,使用者定義的業務邏輯将應用于RDD中的所有元素。它類似于FlatMap,但與可以産生0,1或多個輸出的FlatMap不同,Map隻能産生一對一的輸出。 映射操作将長度為N的RDD轉換為另一個長度為N的RDD。

  A -------> a

  B -------> b

  C -------> c

  Map Operation

  映射轉換不會将資料從一個分區變為多個分區。它将使操作保持狹窄

  二十,Apache Spark中的FlatMap是什麼?

  FlatMap是Apache Spark中的轉換操作,用于從現有RDD 建立RDD。它需要RDD中的一個元素,并且可以根據業務邏輯生成0,1或多個輸出。它類似于Map操作,但Map産生一對一輸出。如果我們對長度為N的RDD執行Map操作,則輸出RDD的長度也為N.但對于FlatMap操作,輸出RDD可以根據業務邏輯的不同長度

  X ------ A x ----------- a

  Y ------ B y ----------- b,c

  Z ----- -C z ----------- d,e,f

  地圖操作FlatMap操作

  我們也可以說flatMap将長度為N的RDD轉換為N個集合的集合,然後将其展平為單個RDD結果。

  如果我們觀察下面的示例data1 RDD,它是Map操作的輸出,具有與資料RDD相同的元素,

  但是data2 RDD沒有相同數量的元素。我們還可以在這裡觀察data2 RDD是data1 RDD的平坦輸出

  pranshu @ pranshu-virtual-machine:?$ cat pk.txt

  1 2 3 4

  5 6 7 8 9

  10 11 12

  13 14 15 16 17

  18 19 20

  scala> val data=sc.textFile(“/ home / pranshu / pk.txt”)

  17/05/17 07:08:20 WARN SizeEstimator:無法檢查是否設定了UseCompressedOops; 假設是

  資料:org.apache.spark.rdd.RDD [String]=/home/pranshu/pk.txt MapPartitionsRDD [1] at textFile at :24

  scala> data.collect

  res0:Array [String]=Array(1 2 3 4,5 6 7 8 9,10 11 12,13 14 15 16 17,18 19 20)

  斯卡拉>

  scala> val data1=data.map(line=> line.split(“”))

  data1:org.apache.spark.rdd.RDD [Array [String]]=MapPartitionsRDD [2] at map at :26

  scala> val data2=data.flatMap(line=> line.split(“”))

  data2:org.apache.spark.rdd.RDD [String]=在mapMap at 的MapPartitionsRDD [3]:26

  scala> data1.collect

  res1:Array [Array [String]]=Array(數組(1,2,3,4),數組(5,6,7,8,9 ),數組(10,11,12),數組(13,14,15,16,17),數組(18,19,20))

  scala> data2.collect

  res2:Array [String]=數組(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18, 19,20)

  二十一,在Spark中解釋fold()操作。

  fold()是一個動作。它是廣泛的操作(即跨多個分區的shuffle資料并輸出單個值)它将函數作為輸入,具有兩個相同類型的參數,并輸出單個輸入類型的值。它類似于reduce,但還有一個參數'ZERO VALUE'(比如初始值),它将在每個分區的初始調用中使用。

  def fold(zeroValue:T)(op:(T,T)?T):T

  使用給定的關聯函數和中性“零值”聚合每個分區的元素,然後聚合所有分區的結果。函數op(t1,t2)允許修改t1并将其作為結果值傳回以避免對象配置設定; 但是,它不應該修改t2。

  這與在Scala等函數式語言中為非分布式集合實作的折疊操作有所不同。該折疊操作可以單獨地應用于分區,然後将這些結果折疊成最終結果,而不是以某種定義的順序将折疊順序地應用于每個元素。對于不可交換的函數,結果可能與應用于非分布式集合的折疊的結果不同。

  zeroValue:op運算符的每個分區的累積結果的初始值,以及組合的初始值來自op運算符的不同分區 - 這通常是中性元素(例如,清單連接配接為Nil或0為總結)

  操作:用于在分區内累積結果并組合來自不同分區的結果的運算符

  val rdd1=sc.parallelize(List(1,2,3,4,5),3)

  rdd1.fold(5)(_+_)

  Int=35

  val rdd1=sc.parallelize(List(1,2,3,4,5))

  Int=25

  rdd1.fold(3)(_+_)

  Int=27

  二十二,解釋API createOrReplaceTempView()

  它的基本資料集功能。它位于org.apache.spark.sql下def createOrReplaceTempView(viewName:String):Unit使用給定名稱建立臨時視圖。此臨時視圖的生命周期與用于建立此資料集的SparkSession相關聯。

  scala> val df=spark.read.csv("/home/hdadmin/titanic_data.txt")

  df: org.apache.spark.sql.DataFrame=[_c0: string, _c1: string ... 9 more fields]

scala> df.printSchema

  root

  |-- _c0: string (nullable=true)

  |-- _c1: string (nullable=true)

  |-- _c2: string (nullable=true)

  |-- _c3: string (nullable=true)

  |-- _c4: string (nullable=true)

  |-- _c5: string (nullable=true)

  |-- _c6: string (nullable=true)

  |-- _c7: string (nullable=true)

  |-- _c8: string (nullable=true)

  |-- _c9: string (nullable=true)

  |-- _c10: string (nullable=true)

scala> df.show

  +---+---+---+--------------------+-------+-----------+--------------------+-------+-----------------+-----+------+

  |_c0|_c1|_c2| _c3| _c4| _c5| _c6| _c7| _c8| _c9| _c10|

  | 1|1st| 1|Allen, Miss Elisa...|29.0000|Southampton| St Louis, MO| B-5| 24160 L221| 2|female|

  | 2|1st| 0|Allison, Miss Hel...| 2.0000|Southampton|Montreal, PQ / Ch...| C26| null| null|female|

  | 3|1st| 0|Allison, Mr Hudso...|30.0000|Southampton|Montreal, PQ / Ch...| C26| null|(135)| male|

  | 4|1st| 0|Allison, Mrs Huds...|25.0000|Southampton|Montreal, PQ / Ch...| C26| null| null|female|

  | 5|1st| 1|Allison, Master H...| 0.9167|Southampton|Montreal, PQ / Ch...| C22| null| 11| male|

  | 6|1st| 1| Anderson, Mr Harry|47.0000|Southampton| New York, NY| E-12| null| 3| male|

  | 7|1st| 1|Andrews, Miss Kor...|63.0000|Southampton| Hudson, NY| D-7| 13502 L77| 10|female|

  | 8|1st| 0|Andrews, Mr Thoma...|39.0000|Southampton| Belfast, NI| A-36| null| null| male|

  | 9|1st| 1|Appleton, Mrs Edw...|58.0000|Southampton| Bayside, Queens, NY| C-101| null| 2|female|

  | 10|1st| 0|Artagaveytia, Mr ...|71.0000| Cherbourg| Montevideo, Uruguay| null| null| (22)| male|

  | 11|1st| 0|Astor, Colonel Jo...|47.0000| Cherbourg| New York, NY| null|17754 L224 10s 6d|(124)| male|

  | 12|1st| 1|Astor, Mrs John J...|19.0000| Cherbourg| New York, NY| null|17754 L224 10s 6d| 4|female|

  | 13|1st| 1|Aubert, Mrs Leont...| NA| Cherbourg| Paris, France| B-35| 17477 L69 6s| 9|female|

  | 14|1st| 1|Barkworth, Mr Alg...| NA|Southampton| Hessle, Yorks| A-23| null| B| male|

  | 15|1st| 0| Baumann, Mr John D.| NA|Southampton| New York, NY| null| null| null| male|

  | 16|1st| 1|Baxter, Mrs James...|50.0000| Cherbourg| Montreal, PQ|B-58/60| null| 6|female|

  | 17|1st| 0|Baxter, Mr Quigg ...|24.0000| Cherbourg| Montreal, PQ|B-58/60| null| null| male|

  | 18|1st| 0| Beattie, Mr Thomson|36.0000| Cherbourg| Winnipeg, MN| C-6| null| null| male|

  | 19|1st| 1|Beckwith, Mr Rich...|37.0000|Southampton| New York, NY| D-35| null| 5| male|

  | 20|1st| 1|Beckwith, Mrs Ric...|47.0000|Southampton| New York, NY| D-35| null| 5|female|

  only showing top 20 rows

scala> df.createOrReplaceTempView("titanicdata")

  二十三,解釋Apache Spark中的values()操作

  values()是一種轉換。它僅傳回值的RDD。

  val rdd1=sc.parallelize(Seq((2,4),(3,6),(4,8),(5,10),(6,12),(7,14),(8,16),(9,18),(10,20)))

  val rdd2=rdd1.values

  rdd2.collect

  Array[Int]=Array(4, 6, 8, 10, 12, 14, 16, 18, 20)

  示例2:資料集中的值重複

  val rdd1=sc.parallelize(Seq((2,4),(3,6),(4,8),(2,6),(4,12),(5,10),(5,40),(10,40)))

  val rdd2=rdd1.keys

  val rdd3=rdd1.values

  rdd3.collect

  Array[Int]=Array(2, 3, 4, 2, 4, 5, 5, 10)

  Array[Int]=Array(4, 6, 8, 6, 12, 10, 40, 40)

  二十四,解釋Apache spark中的keys()操作。

  keys()是一種轉換。它傳回一個密鑰的RDD。val rdd1=sc.parallelize(Seq((2,4),(3,6),(4,8),(5,10),(6,12),(7,14),(8,16),(9,18),(10,20)))val rdd2=rdd1.keysrdd2.collect輸出:Array[Int]=Array(2, 3, 4, 5, 6, 7, 8, 9, 10)

  示例2 :(重複鍵 - 資料集中存在重複鍵)

  二十五,在Spark中解釋textFile與fullTextFile

  兩者都是org.apache.spark.SparkContext的方法。文本檔案() :def textFile(path:String,minPartitions:Int=defaultMinPartitions):RDD [String]從HDFS讀取文本檔案,本地檔案系統(在所有節點上都可用)或任何Hadoop支援的檔案系統URI,并将其作為字元串的RDD傳回例如sc.textFile(“/ home / hdadmin / wc-data.txt”)是以它将建立RDD,其中每個單獨的行都是一個元素。每個人都知道textFile的用法。wholeTextFiles():def wholeTextFiles(path:String,minPartitions:Int=defaultMinPartitions):RDD [(String,String)]從HDFS讀取文本檔案目錄,本地檔案系統(在所有節點上都可用)或任何支援Hadoop的檔案系統URI。而不是建立基本RDD,wholeTextFile()傳回pairRDD。例如,目錄中的檔案很少,是以通過使用wholeTextFile()方法,它建立了帶有檔案名的對RDD,路徑為鍵,值為整個檔案為字元串val myfilerdd=sc.wholeTextFiles("/home/hdadmin/MyFiles")val keyrdd=myfilerdd.keyskeyrdd.collectval filerdd=myfilerdd.valuesfilerdd.collect輸出:Array [String]=Array(檔案:/home/hdadmin/MyFiles/JavaSparkPi.java,檔案:/home/hdadmin/MyFiles/sumnumber.txt,檔案:/home/hdadmin/MyFiles/JavaHdfsLR.java,檔案: /home/hdadmin/MyFiles/JavaPageRank.java,檔案:/home/hdadmin/MyFiles/JavaLogQuery.java,檔案:/home/hdadmin/MyFiles/wc-data.txt,檔案:/ home / hdadmin / MyFiles / nosum。文本)Array [String]=Array(“/ 根據一個或多個貢獻者許可協定許可給Apache Software Foundation(ASF)。有關版權所有權的其他資訊,請參閱随此工作分發的NOTICE檔案。 ASF許可此根據Apache許可證2.0版(“許可證”)向您送出;除非符合許可,否則您不得使用此檔案。您可以在以下位置擷取許可副本:

http://www.apache.org/licenses/LICENSE-2.0

除非适用法律要求或書面同意,否則根據許可證分發的軟體按“原樣”分發,不附帶任何明示或暗示的擔保或條件。有關權限和 的特定語言,請參閱許可證。

  二十六,解釋Spark中的cogroup()操作

  >這是一個轉變。

  >它位于

  org.apache.spark.rdd.PairRDDFunctions包中

  def cogroup [W1,W2,W3](other1:RDD [(K,W1)],other2:RDD [(K,W2)],other3:RDD [(K,W3)]):RDD [(K,( Iterable [V],Iterable [W1],Iterable [W2],Iterable [W3]))]

  對于this或other1或other2或other3中的每個鍵k,傳回包含元組的結果RDD,該元組具有該鍵,other1,other2和other3中該鍵的值清單。

  val myrdd1=sc.parallelize(List((1,"spark"),(2,"HDFS"),(3,"Hive"),(4,"Flink"),(6,"HBase")))

  val myrdd2=sc.parallelize(List((4,"RealTime"),(5,"Kafka"),(6,"NOSQL"),(1,"stream"),(1,"MLlib")))

  val result=myrdd1.cogroup(myrdd2)

  result.collect

  Array [(Int,(Iterable [String],Iterable [String]))]=

  Array((4,(CompactBuffer(Flink),CompactBuffer(RealTime))),

  (1,(CompactBuffer(spark),CompactBuffer( stream,MLlib))),

  (6,(CompactBuffer(HBase),CompactBuffer(NOSQL))),

  (3,(CompactBuffer(Hive),CompactBuffer())),

  (5,(CompactBuffer(),CompactBuffer(Kafka) )),

  (2,(CompactBuffer(HDFS),CompactBuffer())))

  二十七,解釋Apache Spark中的pipe()操作

  這是一種轉變。def pipe(command:String):RDD [String] 将由管道元素建立的RDD傳回給分叉的外部程序。通常,Spark使用Scala,Java和Python來編寫程式。但是,如果這還不夠,并且想要管道(注入)用其他語言(如'R')編寫的資料,Spark會以pipe()方式的形式提供一般機制Spark在RDD上提供了pipe()方法。使用Spark的pipe()方法,可以編寫RDD的轉換,可以将RDD中的每個元素從标準輸入讀取為String。它可以将結果作為String寫入标準輸出。

  二十八,.解釋Spark coalesce()操作

  >這是一種轉變。

  org.apache.spark.rdd.ShuffledRDD包中

  DEF聚結(numPartitions:中等,洗牌:布爾=假,partitionCoalescer:選項[PartitionCoalescer]=Option.empty)(隐式ORD:訂購[(K,C)]=NULL):RDD [(K,C)]

  傳回一個縮減為numPartitions分區的新RDD。

  這會導緻較窄的依賴性,例如,如果從1000個分區到100個分區,則不會進行随機播放,而是100個新分區中的每個分區将聲明10個目前分區。

  但是,如果你正在進行激烈的合并,例如對numPartitions=1,這可能導緻你的計算發生在比你想要的更少的節點上(例如,在numPartitions=1的情況下,一個節點)。為避免這種情況,您可以傳遞shuffle=true。這将添加一個shuffle步驟,但意味着目前的上遊分區将并行執行(無論目前分區是什麼)。

  注意:使用shuffle=true,您實際上可以合并到更大數量的分區。如果您有少量分區(例如100),這可能會使一些分區異常大,這很有用。調用coalesce(1000,shuffle=true)将導緻1000個分區,并使用散列分區程式分發資料。

  //data-flair.training/blogs/rdd-transformations-actions-apis-apache-spark/#214_Coalesce

  它會更改存儲資料的分區數。它将原始分區與新數量的分區相結合,是以可以減少分區數量。它是重新分區的優化版本,允許資料移動,但前提是您要減少RDD分區的數量。過濾大型資料集後,它可以更有效地運作操作。

  val myrdd1=sc.parallelize(1 to 1000, 15)

  myrdd1.partitions.length

  val myrdd2=myrdd1.coalesce(5,false)

  myrdd2.partitions.length

  Int=5

  Int=15

  二十九,解釋Spark中的repartition()操作

  repartition()是一種轉變。

  >此函數更改參數numPartitions(numPartitions:Int)中提到的分區數

  >它位于包

  org.apache.spark.rdd.ShuffledRDD中

  def repartition(numPartitions:Int)(隐式ord:Ordering [(K,C)]=null):RDD [(K,C)]

  傳回一個具有正好numPartitions分區的新RDD。

  可以增加或減少此RDD中的并行度。在内部,它使用shuffle重新配置設定資料。

  如果要減少此RDD中的分區數,請考慮使用coalesce,這可以避免執行shuffle。

  來自:

  http://data-flair.training/blogs/rdd-transformations-actions-apis-apache-spark/

  重新分區将重新調整RDD中的資料,以生成您請求的最終分區數。它可以減少或增加整個網絡中的分區數量和資料。

  val rdd1=sc.parallelize(1 to 100, 3)

  rdd1.getNumPartitions

  val rdd2=rdd1.repartition(6)

  rdd2.getNumPartitions

  Int=3

  Int=6

  三十,.解釋Apache Spark中的fullOuterJoin()操作

  >這是轉型。

  def fullOuterJoin [W](其他:RDD [(K,W)]):RDD [(K,(Option [V],Option [W]))]

  執行此和其他的完全外部聯接。

  對于此中的每個元素(k,v),得到的RDD将包含所有對(k,(Some(v),Some(w)))用于其他w,

  或對(k,(Some(v)) ,無)))如果其他元素沒有密鑰k。

  類似地,對于其他元素(k,w),得到的RDD将包含所有對(k,(Some(v),Some(w)))中的v,

  或者對(k,(None,一些(w)))如果其中沒有元素具有密鑰k。

  使用現有的分區程式/并行級别對生成的RDD進行散列分區。

  val frdd1=sc.parallelize(Seq(("Spark",35),("Hive",23),("Spark",45),("HBase",89)))

  val frdd2=sc.parallelize(Seq(("Spark",74),("Flume",12),("Hive",14),("Kafka",25)))

  val fullouterjoinrdd=frdd1.fullOuterJoin(frdd2)

  fullouterjoinrdd.collect

  Array [(String,(Option [Int],Option [Int]))]=Array((Spark,(Some(35),Some(74))),(Spark,(Some(45),Some( 74))),(Kafka,(無,有些(25))),(Flume,(無,有些(12))),(Hive,(Some(23),Some(14))),(HBase, (一些(89),無)))

  三十一. Expain Spark leftOuterJoin()和rightOuterJoin()操作

  > leftOuterJoin()和rightOuterJoin()都是轉換。

  >兩者都在

  leftOuterJoin():

  def leftOuterJoin [W](其他:RDD [(K,W)]):RDD [(K,(V,Option [W]))]

  執行此和其他的左外連接配接。對于此中的每個元素(k,v),得到的RDD将包含w中的所有對(k,(v,Some(w))),或者對(k,(v,None))如果不包含其他元素有關鍵k。使用現有分區程式/并行級别對輸出進行散列分區。

  leftOuterJoin()在兩個RDD之間執行連接配接,其中鍵必須存在于第一個RDD中

  val rdd1=sc.parallelize(Seq(("m",55),("m",56),("e",57),("e",58),("s",59),("s",54)))

  val rdd2=sc.parallelize(Seq(("m",60),("m",65),("s",61),("s",62),("h",63),("h",64)))

  val leftjoinrdd=rdd1.leftOuterJoin(rdd2)

  leftjoinrdd.collect

  Array [(String,(Int,Option [Int]))]=Array((s,(59,Some(61))),(s,(59,Some(62))),(s,( 54,Some(61))),(s,(54,Some(62))),(e,(57,None)),(e,(58,None)),(m,(55,Some( 60))),(m,(55,Some(65))),(m,(56,Some(60))),(m,(56,Some(65))))

  rightOuterJoin():

  def rightOuterJoin [W](其他:RDD [(K,W)]):RDD [(K,(Option [V],W))]

  執行此和其他的右外連接配接。對于其他元素(k,w),得到的RDD将包含所有對(k,(Some(v),w))的v,或者對(k,(None,w))如果沒有其中的元素有關鍵k。使用現有的分區程式/并行級别對生成的RDD進行散列分區。

  它執行兩個RDD之間的連接配接,其中密鑰必須存在于其他RDD中

  val rightjoinrdd=rdd1.rightOuterJoin(rdd2)

  rightjoinrdd.collect

  Array [(String,(Option [Int],Int))]=Array((s,(Some(59),61)),(s,(Some(59),62)),(s,(Some(( 54),61)),(s,(Some(54),62)),(h,(None,63)),(h,(None,64)),(m,(Some(55),60 )),(m,(Some(55),65)),(m,(Some(56),60)),(m,(Some(56),65)))

  三十二,解釋Spark join()操作

  > join()是轉型。

  >它在包

  org.apache.spark.rdd.pairRDDFunction

  def join [W](其他:RDD [(K,W)]):RDD [(K,(V,W))]固定連結

  傳回包含所有元素對的RDD,其中包含比對鍵和其他元素。

  每對元素将作為(k,(v1,v2))元組傳回,其中(k,v1)在此,而(k,v2)在其他元素中。在整個群集中執行散列連接配接。

  它正在連接配接兩個資料集。當調用類型(K,V)和(K,W)的資料集時,傳回(K,(V,W))對的資料集以及每個鍵的所有元素對。通過leftOuterJoin,rightOuterJoin和fullOuterJoin支援外連接配接。

  例1:

  val joinrdd=rdd1.join(rdd2)

  joinrdd.collect

  Array [(String,(Int,Int))]=Array((m,(54,60)),(m,(54,65)),(m,(56,60)),(m,(56 ,65)),(s,(59,61)),(s,(59,62)),(s,(54,61)),(s,(54,62)))

  例2:

  val myrdd1=sc.parallelize(Seq((1,2),(3,4),(3,6)))

  val myrdd2=sc.parallelize(Seq((3,9)))

  val myjoinedrdd=myrdd1.join(myrdd2)

  myjoinedrdd.collect

  數組[(Int,(Int,Int))]=數組((3,(4,9)),(3,(6,9)))

  三十三,解釋top()和takeOrdered()操作

  top()和takeOrdered()都是動作。兩者都傳回基于預設排序或基于使用者提供的自定義排序的RDD元素。def top(num: Int)(implicit ord: Ordering[T]): Array[T]傳回此RDD中的前k個(最大)元素,由指定的隐式Ordering [T]定義并維護排序。這與takeOrdered相反。def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]傳回此RDD中的第一個k(最小)元素,由指定的隐式Ordering [T]定義并維護排序。這與top相反。示例:val myrdd1=sc.parallelize(List(5,7,9,13,51,89))myrdd1.top(3)myrdd1.takeOrdered(3)myrdd1.top(3)輸出:Array[Int]=Array(89, 51, 13)Array[Int]=Array(5, 7, 9)Array[Int]=Array(89, 51, 13)

  三十四,解釋Spark中的first()操作

  >這是一個動作。

  >它傳回RDD的第一個元素。

  val rdd1=sc.textFile("/home/hdadmin/wc-data.txt")

  rdd1.count

  rdd1.first

  長:20

  字元串:DataFlair是領先的技術教育訓練提供商

  三十五,.解釋Apache Spark中的sum(),max(),min()操作

  sum():

  >它将RDD中的值相加。

  >它是一個包

  org.apache.spark.rdd.DoubleRDDFunctions。

  >它的傳回類型是Double

  val rdd1=sc.parallelize(1 to 20)

  rdd1.sum

  Double=210.0

  max():

  >它從隐式排序(元素順序)定義的RDD元素傳回一個最大值

  >它是一個包org.apache.spark.rdd

  val rdd1=sc.parallelize(List(1,5,9,0,23,56,99,87))

  rdd1.max

  Int=99

  min():

  >它從隐式排序(元素順序)定義的RDD元素傳回一個min值

  rdd1.min

  Int=0

  三十六,.解釋Apache Spark RDD中的countByValue()操作

  這是一個動作它傳回RDD中每個唯一值的計數作為本地Map(作為Map to driver program)(value,countofvalues)對必須小心使用此API,因為它将值傳回給驅動程式,是以它僅适用于較小的值。例:val rdd1=sc.parallelize(Seq(("HR",5),("RD",4),("ADMIN",5),("SALES",4),("SER",6),("MAN",8)))rdd1.countByValue輸出:scala.collection.Map [(String,Int),Long]=Map((HR,5) - > 1,(RD,4) - > 1,(SALES,4) - > 1,(ADMIN,5 ) - > 1,(MAN,8) - > 1,(SER,6) - > 1)val rdd2=sc.parallelize{Seq(10,4,3,3)}rdd2.countByValue輸出:scala.collection.Map [Int,Long]=Map(4 - > 1,3 - > 2,10 - > 1)

  三十七,.解釋Spark中的lookup()操作

  >這是一個動作

  >它傳回

二手QQ轉讓

RDD中鍵值'key'的值清單

  val rdd1=sc.parallelize(Seq(("Spark",78),("Hive",95),("spark",15),("HBase",25),("spark",39),("BigData",78),("spark",49)))

  rdd1.lookup("spark")

  rdd1.lookup("Hive")

  rdd1.lookup("BigData")

  Seq [Int]=WrappedArray(15,39,49)

  Seq [Int]=WrappedArray(95)

  Seq [Int]=WrappedArray(78)

  三十八,解釋Spark countByKey()操作

  >它是一個動作操作

  >傳回(key,noofkeycount)對。

  它計算RDD的值,該值由每個不同鍵的兩個元件元組組成。它實際上計算每個鍵的元素數,并将結果作為(鍵,計數)對的清單傳回給主鍵。

  rdd1.countByKey

  scala.collection.Map [String,Long]=Map(Hive - > 1,BigData - > 1,HBase - > 1,spark - > 3,Spark - > 1)

  三十九,解釋Spark saveAsTextFile()操作

  它将RDD的内容寫入文本檔案,或使用字元串表示将RDD儲存為檔案路徑目錄中的文本檔案。

  四十,解釋reduceByKey()Spark操作

  > reduceByKey()是對pairRDD(包含Key / Value)進行轉換的轉換。

  > PairRDD包含元組,是以我們需要傳遞元組上的運算符而不是每個元素。

  >它使用關聯reduce函數将值與相同的鍵合并。

  >它是廣泛的操作,因為資料混洗可能發生在多個分區上。

  >它在跨分區發送資料之前在本地合并資料,以優化資料混洗。

  >它将函數作為一個輸入,它有兩個相同類型的參數(與同一個鍵相關的值)和一個輸入類型的元素輸出(值)

  >我們可以說它有三個重載函數:

  reduceBykey(function)

  reduceByKey(功能,配置設定數量)

  reduceBykey(partitioner,function)

  它使用關聯reduce函數,它合并每個鍵的值。它隻能與鍵值對中的Rdd一起使用。它是一種廣泛的操作,可以從多個分區/分區中混洗資料并建立另一個RDD。它使用關聯函數在本地合并資料,以優化資料混洗。組合的結果(例如,和)與值的類型相同,并且當從不同分區組合時的操作也與在分區内組合值時的操作相同。

  val rdd1=sc.parallelize(Seq(5,10),(5,15),(4,8),(4,12),(5,20),(10,50)))

  val rdd2=rdd1.reduceByKey((x,y)=>x+y)

  rdd2.collect()

  數組[(Int,Int)]=數組((4,20),(10,50),(5,45)

  四十一,解釋Spark中的reduce()操作

  > reduce()是一個動作。它是寬操作(即跨越多個分區的随機資料并輸出單個值)

  >它将函數作為具有兩個相同類型參數的輸入,并輸出單個輸入類型的值。

  >即将RDD的元素組合在一起。

  示例1:

  val rdd1=sc.parallelize(1到100)

  val rdd2=rdd1.reduce((x,y)=> x + y)

  val rdd2=rdd1.reduce(_ + _)

  rdd2:Int=5050

  示例2:

  val rdd1=sc.parallelize(1到5)

  val rdd2=rdd1.reduce(_ * _)

  rdd2:Int=120

  四十二,在Spark RDD中解釋動作count()

  count()是Apache Spark RDD操作中的一個操作count()傳回RDD中的元素數。示例:val rdd1=sc.parallelize(List(10,20,30,40))println(rdd1.count())輸出:4它傳回RDD中的多個元素或項目。是以,它基本上計算資料集中存在的項目數,并在計數後傳回一個數字。

  四十三.解釋Spark map()轉換

  > map()轉換将函數作為輸入,并将該函數應用于RDD中的每個元素。

  >函數的輸出将是每個輸入元素的新元素(值)。

  防爆。

  val rdd1=sc.parallelize(List(10,20,30,40))

  val rdd2=rdd1.map(x=> x * x)

  println(rdd2.collect()。mkString(“,”))

  四十四,解釋Apache Spark中的flatMap()轉換

  當想要為每個輸入元素生成多個元素(值)時,使用flatMap()。與map()一樣,flatMap()也将函數作為輸入。函數的輸出是我們可以疊代的元素的List。(即函數可以為每個輸入元素傳回0或更多元素)簡單地使用flatMap()将輸入行(字元串)拆分為單詞。

  val fm1=sc.parallelize(List("Good Morning", "Data Flair", "Spark Batch"))

  val fm2=fm1.flatMap(y=> y.split(" "))

  fm2.foreach{println}

  輸出如下:

  Good

  Morning

  Data

  Flair

  Spark

  Batch

  四十五,Apache Spark有哪些限制?

  在,Apache Spark被認為是行業廣泛使用的下一代Gen Big資料工具。但Apache Spark存在一定的局限性。他們是:

  Apache Spark的局限性:

  1.無檔案管理系統

  Apache Spark依賴于其他平台,如Hadoop或其他基于雲的平台檔案管理系統。這是Apache Spark的主要問題之一。

  2.延遲

  使用Apache Spark時,它具有更高的延遲。

  3.不支援實時處理

  在Spark Streaming中,到達的實時資料流被分成預定義間隔的批次,每批資料被視為Spark Resilient Distributed Database(RDD)。然後使用map,reduce,join等操作處理這些RDD。這些操作的結果是批量傳回的。是以,它不是實時處理,但Spark接近實時資料的實時處理。微批處理在Spark Streaming中進行。

  4.手動優化

  手動優化是優化Spark作業所必需的。此外,它适用于特定資料集。如果我們想要在Spark中進行分區和緩存是正确的,我們需要手動控制。

  少一點。算法

  Spark MLlib在Tanimoto距離等許多可用算法方面落後。

  6.視窗标準

  Spark不支援基于記錄的視窗标準。它隻有基于時間的視窗标準。

  7.疊代處理

  在Spark中,資料分批疊代,每次疊代都是單獨排程和執行的。

  8.

  當我們想要經濟高效地處理大資料時,昂貴記憶體容量可能成為瓶頸,因為在記憶體中儲存資料非常昂貴。此時記憶體消耗非常高,并且不以使用者友好的方式處理。Spark的成本非常高,因為Apache Spark需要大量的RAM才能在記憶體中運作。

  四十六,什麼是Spark SQL?

  Spark SQL是一個Spark接口,用于處理結構化和半結構化資料(定義字段即表格的資料)。它提供了一個名為DataFrame和DataSet的抽象層,我們可以輕松處理資料。可以說DataFrame就像關系資料庫中的表。Spark SQL可以以Parquets,JSON,Hive等各種結構化和半結構化格式讀寫資料。在Spark應用程式中使用SparkSQL是使用它的最佳方式。這使我們能夠加載資料并使用SQL進行查詢。我們也可以将它與Python,Java或Scala中的 “正常”程式代碼結合起來。

  四十七,解釋Spark SQL緩存和解除

  當我們嘗試在另一個使用者使用該表時解凍Spark SQL中的表時會發生什麼?因為我們可以在Spark SQL JDBC伺服器中的多個使用者之間使用共享緩存表。

  四十八,解釋Spark流媒體

  rk Streaming

  資料流定義為以無界序列的形式連續到達的資料。為了進一步處理,Streaming将連續流動的輸入資料分離為離散單元。它是一種低延遲處理和分析流資料。

  在2013年,Apache Spark Streaming被添加到Apache Spark。通過Streaming,我們可以對實時資料流進行容錯,可擴充的流處理。從許多來源,如Kafka,Apache Flume,Amazon Kinesis或TCP套接字,可以進行資料攝取。此外,通過使用複雜算法,可以進行處理。用進階函數表示,例如map,reduce,join和window。最後,處理後的資料可以推送到檔案系統,資料庫和實時儀表闆。

  在内部,通過Spark流,接收實時輸入資料流并将其分成批次。然後,這些批次由Spark引擎處理,以批量生成最終結果流。

  Discretized Stream或簡稱Spark DStream是它的基本抽象。這也代表了分成小批量的資料流。DStreams建構于Spark的核心資料抽象Spark RDD之上。Streaming可以與Spark MLlib和Spark SQL等任何其他Apache Spark元件內建。

  四十九,解釋Spark Streaming

  Spark Streaming

  五十,在Apache Spark Streaming中解釋DStream中的不同轉換

  Apache Spark Streaming中DStream中的不同轉換是:

  1- map(func) - 通過函數func傳遞源DStream的每個元素來傳回一個新的DStream。

  2- flatMap(func) - 與map類似,但每個輸入項可以映射到0個或更多輸出項。

  3- filter(func) - 通過僅選擇func傳回true的源DStream的記錄來傳回新的DStream。

  4- repartition(numPartitions) - 通過建立更多或更少的分區來更改此DStream中的并行度級别。

  5- union(otherStream) - 傳回一個新的DStream,它包含源DStream和

  otherDStream中元素的并集。

  6- 計數() -傳回單元素的一個新的DSTREAM RDDS通過計數在源DSTREAM的每個RDD元件的數量。

  7- reduce(func) - 通過使用函數func(它接受兩個參數并傳回一個)聚合源DStream的每個RDD中的元素,傳回單元素RDD的新DStream。

  8- countByValue() - 當在類型為K的元素的DStream上調用時,傳回(K,Long)對的新DStream,其中每個鍵的值是其在源DStream的每個RDD中的頻率。

  9- reduceByKey(func,[numTasks]) - 當在(K,V)對的DStream上調用時,傳回一個(K,V)對的新DStream,其中使用給定的reduce函數聚合每個鍵的值。

  10- join(otherStream,[numTasks]) - 當在(K,V)和(K,W)對的兩個DStream上調用時,傳回一個新的DStream(K,(V,W))對與所有對每個鍵的元素。

  11- cogroup(otherStream,[numTasks]) - 當在(K,V)和(K,W)對的DStream上調用時,傳回(K,Seq [V],Seq [W])元組的新DStream。

  12- transform(func) - 通過将RDD-to-RDD函數應用于源DStream的每個RDD來傳回一個新的DStream。

  13- updateStateByKey(func) - 傳回一個新的“狀态”DStream,其中通過在密鑰的先前狀态和密鑰的新值上應用給定函數來更新每個密鑰的狀态。

  希望以上這些多大家有所幫助,能夠幫得到您說明我的努力是沒有白費的,最後,希望大家多多關注下,更多精彩的文章帶給大家!