天天看點

☀️大資料面試題及答案 (轉載)

大資料面試題及答案

1 kafka的message包括哪些資訊

2 怎麼檢視kafka的offset

3 hadoop的shuffle過程

4 spark叢集運算的模式

5 HDFS讀寫資料的過程

6 RDD中reduceBykey與groupByKey哪個性能好,為什麼?

7 spark2.0的了解

8 rdd 怎麼分區寬依賴和窄依賴

9 spark streaming 讀取kafka資料的兩種方式

10 kafka的資料存在記憶體還是磁盤

11 怎麼解決kafka的資料丢失

12 fsimage和edit的差別?

13 列舉幾個配置檔案優化?

14 datanode 首次加入 cluster 的時候,如果 log 報告不相容檔案版本,那需要namenode執行格式化操作,這樣處理的原因是?

15 MapReduce 中排序發生在哪幾個階段?這些排序是否可以避免?為什麼?

16 hadoop的優化?

17 設計題

18 有 10 個檔案,每個檔案 1G,每個檔案的每一行存放的都是使用者的 query,每個檔案的query 都可能重複。要求你按照 query 的頻度排序。還是典型的 TOP K 算法

19 在2.5億個整數中找出不重複的整數,注,記憶體不足以容納這2.5億個整數。

20 騰訊面試題:給40億個不重複的 unsigned int 的整數,沒排過序的,然後再給一個數,如何快速判斷這個數是否在那 40 億個數當中?

21 怎麼在海量資料中找出重複次數最多的一個?

22 上千萬或上億資料(有重複),統計其中出現次數最多的錢 N 個資料。

23 一個文本檔案,大約有一萬行,每行一個詞,要求統計出其中最頻繁出現的前 10 個詞,給出思想,給出時間複雜度分析。

24 100w 個數中找出最大的 100 個數。

25 有一千萬條短信,有重複,以文本檔案的形式儲存,一行一條,有重複。請用 5 分鐘時間,找出重複出現最多的前 10 條。

一個Kafka的Message由一個固定長度的header和一個變長的消息體body組成

header部分由一個位元組的magic(檔案格式)和四個位元組的CRC32(用于判斷body消息體是否正常)構成。當magic的值為1的時候,會在magic和crc32之間多一個位元組的資料:attributes(儲存一些相關屬性,比如是否壓縮、壓縮格式等等);如果magic的值為0,那麼不存在attributes屬性

body是由N個位元組構成的一個消息體,包含了具體的key/value消息

0.9版本以上,可以用最新的Consumer client 用戶端,有consumer.seekToEnd() / consumer.position() 可以用于得到目前最新的offset:

Map端會處理輸入資料并産生中間結果,這個中間結果會寫到本地磁盤,而不是HDFS。每個Map的輸出會先寫到記憶體緩沖區中,當寫入的資料達到設定的門檻值時,系統将會啟動一個線程将緩沖區的資料寫到磁盤,這個過程叫做spill。

  在spill寫入之前,會先進行二次排序,首先根據資料所屬的partition進行排序,然後每個partition中的資料再按key來排序。partition的目是将記錄劃分到不同的Reducer上去,以期望能夠達到負載均衡,以後的Reducer就會根據partition來讀取自己對應的資料。接着運作combiner(如果設定了的話),combiner的本質也是一個Reducer,其目的是對将要寫入到磁盤上的檔案先進行一次處理,這樣,寫入到磁盤的資料量就會減少。最後将資料寫到本地磁盤産生spill檔案(spill檔案儲存在{mapred.local.dir}指定的目錄中,Map任務結束後就會被删除)。

最後,每個Map任務可能産生多個spill檔案,在每個Map任務完成前,會通過多路歸并算法将這些spill檔案歸并成一個檔案。至此,Map的shuffle過程就結束了。

Reduce端的shuffle主要包括三個階段,copy、sort(merge)和reduce。

  首先要将Map端産生的輸出檔案拷貝到Reduce端,但每個Reducer如何知道自己應該處理哪些資料呢?因為Map端進行partition的時候,實際上就相當于指定了每個Reducer要處理的資料(partition就對應了Reducer),是以Reducer在拷貝資料的時候隻需拷貝與自己對應的partition中的資料即可。每個Reducer會處理一個或者多個partition,但需要先将自己對應的partition中的資料從每個Map的輸出結果中拷貝過來。

  接下來就是sort階段,也成為merge階段,因為這個階段的主要工作是執行了歸并排序。從Map端拷貝到Reduce端的資料都是有序的,是以很适合歸并排序。最終在Reduce端生成一個較大的檔案作為Reduce的輸入。

最後就是Reduce過程了,在這個過程中産生了最終的輸出結果,并将其寫到HDFS上。

Spark 有很多種模式,最簡單就是單機本地模式,還有單機僞分布式模式,複雜的則運作在叢集中,目前能很好的運作在 Yarn和 Mesos 中,當然 Spark 還有自帶的 Standalone 模式,對于大多數情況 Standalone 模式就足夠了,如果企業已經有 Yarn 或者 Mesos 環境,也是很友善部署的。

standalone(叢集模式):典型的Mater/slave模式,不過也能看出Master是有單點故障的;Spark支援ZooKeeper來實作 HA

on yarn(叢集模式): 運作在 yarn 資料總管架構之上,由 yarn 負責資源管理,Spark 負責任務排程和計算

on mesos(叢集模式): 運作在 mesos 資料總管架構之上,由 mesos 負責資源管理,Spark 負責任務排程和計算

on cloud(叢集模式):比如 AWS 的 EC2,使用這個模式能很友善的通路 Amazon的 S3;Spark 支援多種分布式存儲系統:HDFS 和 S3

1、跟namenode通信查詢中繼資料,找到檔案塊所在的datanode伺服器

2、挑選一台datanode(就近原則,然後随機)伺服器,請求建立socket流

3、datanode開始發送資料(從磁盤裡面讀取資料放入流,以packet為機關來做校驗)

4、用戶端以packet為機關接收,現在本地緩存,然後寫入目标檔案

寫:

1、根namenode通信請求上傳檔案,namenode檢查目标檔案是否已存在,父目錄是否存在

2、namenode傳回是否可以上傳

3、client請求第一個 block該傳輸到哪些datanode伺服器上

4、namenode傳回3個datanode伺服器ABC

5、client請求3台dn中的一台A上傳資料(本質上是一個RPC調用,建立pipeline),A收到請求會繼續調用B,然後B調用C,将真個pipeline建立完成,逐級傳回用戶端

6、client開始往A上傳第一個block(先從磁盤讀取資料放到一個本地記憶體緩存),以packet為機關,A收到一個packet就會傳給B,B傳給C;A每傳一個packet會放入一個應答隊列等待應答

7、當一個block傳輸完成之後,client再次請求namenode上傳第二個block的伺服器。

寬依賴:父RDD的分區被子RDD的多個分區使用 例如 groupByKey、reduceByKey、sortByKey等操作會産生寬依賴,會産生shuffle

窄依賴:父RDD的每個分區都隻被子RDD的一個分區使用 例如map、filter、union等操作會産生窄依賴

這兩種方式分别是:

使用Kafka的高層次Consumer API來實作。receiver從Kafka中擷取的資料都存儲在Spark Executor的記憶體中,然後Spark Streaming啟動的job會去處理那些資料。然而,在預設的配置下,這種方式可能會因為底層的失敗而丢失資料。如果要啟用高可靠機制,讓資料零丢失,就必須啟用Spark Streaming的預寫日志機制(Write Ahead Log,WAL)。該機制會同步地将接收到的Kafka資料寫入分布式檔案系統(比如HDFS)上的預寫日志中。是以,即使底層節點出現了失敗,也可以使用預寫日志中的資料進行恢複。

Spark1.3中引入Direct方式,用來替代掉使用Receiver接收資料,這種方式會周期性地查詢Kafka,獲得每個topic+partition的最新的offset,進而定義每個batch的offset的範圍。當處理資料的job啟動時,就會使用Kafka的簡單consumer api來擷取Kafka指定offset範圍的資料。

Kafka最核心的思想是使用磁盤,而不是使用記憶體,可能所有人都會認為,記憶體的速度一定比磁盤快,我也不例外。在看了Kafka的設計思想,查閱了相應資料再加上自己的測試後,發現磁盤的順序讀寫速度和記憶體持平。

而且Linux對于磁盤的讀寫優化也比較多,包括read-ahead和write-behind,磁盤緩存等。如果在記憶體做這些操作的時候,一個是JAVA對象的記憶體開銷很大,另一個是随着堆記憶體資料的增多,JAVA的GC時間會變得很長,使用磁盤操作有以下幾個好處:

磁盤緩存由Linux系統維護,減少了程式員的不少工作。

磁盤順序讀寫速度超過記憶體随機讀寫。

JVM的GC效率低,記憶體占用大。使用磁盤可以避免這一問題。

系統冷啟動後,磁盤緩存依然可用。

producer端:

宏觀上看保證資料的可靠安全性,肯定是依據分區數做好資料備份,設立副本數。

broker端:

topic設定多分區,分區自适應所在機器,為了讓各分區均勻分布在所在的broker中,分區數要大于broker數。

分區是kafka進行并行讀寫的機關,是提升kafka速度的關鍵。

Consumer端

consumer端丢失消息的情形比較簡單:如果在消息處理完成前就送出了offset,那麼就有可能造成資料的丢失。由于Kafka consumer預設是自動送出位移的,是以在背景送出位移前一定要保證消息被正常處理了,是以不建議采用很重的處理邏輯,如果處理耗時很長,則建議把邏輯放到另一個線程中去做。為了避免資料丢失,現給出兩點建議:

enable.auto.commit=false 關閉自動送出位移

在消息被完整處理之後再手動送出位移

mapred.tasktracker.map.tasks.maximum 2 mapred.tasktracker.reduce.tasks.maximum 2 b、調整心跳間隔:叢集規模小于 300 時,心跳間隔為 300 毫秒 mapreduce.jobtracker.heartbeat.interval.min 心跳時間 mapred.heartbeats.in.second 叢集每增加多少節點,時間增加下面的值 mapreduce.jobtracker.heartbeat.scaling.factor 叢集每增加上面的個數,心跳增多少 c、啟動帶外心跳 mapreduce.tasktracker.outofband.heartbeat 預設是 false d、配置多塊磁盤 mapreduce.local.dir e、配置 RPC hander 數目 mapred.job.tracker.handler.count 預設是 10,可以改成 50,根據機器的能力 f、配置 HTTP 線程數目 tasktracker.http.threads 預設是 40,可以改成 100 根據機器的能力 g、選擇合适的壓縮方式,以 snappy 為例: mapred.compress.map.output true mapred.map.output.compression.codec org.apache.hadoop.io.compress.SnappyCodec

A、某個使用者某天通路某個URL的次數

B、某個URL某天被通路的總次數

實時思路是:使用Logstash + Kafka + Spark-streaming + Redis + 報表展示平台

離線的思路是:Logstash + Kafka + Elasticsearch + Spark-streaming + 關系型資料庫

A、B、資料在進入到Spark-streaming 中進行過濾,把符合要求的資料儲存到Redis中

1.最高位為 0

2.最高位為 1

并将這兩類分别寫入到兩個檔案中,其中一個檔案中數的個數<=20 億,而另一個>=20 億(這相當于折半了); 與要查找的數的最高位比較并接着進入相應的檔案再查找 再然後把這個檔案為又分成兩類:

1.次最高位為 0

2.次最高位為 1

并将這兩類分别寫入到兩個檔案中,其中一個檔案中數的個數<=10 億,而另一個>=10 億(這相當于折半了); 與要查找的數的次最高位比較并接着進入相應的檔案再查找。

以此類推,就可以找到了,而且時間複雜度為 O(logn),方案 2 完。

3)附:這裡,再簡單介紹下,位圖方法: 使用位圖法判斷整形數組是否存在重複 ,判斷集合中存在重複是常見程式設計任務之一,當集合中資料量比較大時我們通常希望少進行幾次掃描,這時雙重循環法就不可取了。

位圖法比較适合于這種情況,它的做法是按照集合中最大元素 max 建立一個長度為 max+1的新數組,然後再次掃描原數組,遇到幾就給新數組的第幾位置上 1,如遇到 5 就給新數組的第六個元素置 1,這樣下次再遇到 5 想置位時發現新數組的第六個元素已經是 1 了,這說明這次的資料肯定和以前的資料存在着重複。這 種給新數組初始化時置零其後置一的做法類似于位圖的處理方法故稱位圖法。它的運算次數最壞的情況為 2N。如果已知數組的最大值即能事先給新數組定長的話效 率還能提高一倍。

繼續閱讀