天天看點

hadoop中mapreduce的常用類(二)

雲智慧(北京)科技有限公司陳鑫

NullWritable 

不想輸出的時候,把它當做key。NullWritable是Writable的一個特殊類,序列化的長度為0,實作方法為空實作,不從資料流中讀資料,也不寫入資料,隻充當占位符,如在MapReduce中,如果你不需要使用鍵或值,你就可以将鍵或值聲明為NullWritable,NullWritable是一個不可變的單執行個體類型。

FileInputFormat繼承于InputFormat

InputFormat的作用:

驗證輸入規範;

切分輸入檔案為InputSpilts;

提供RecordReader來收集InputSplit中的輸入記錄,給Mapper進行執行。

RecordReader

将面向位元組的InputSplit轉換為面向記錄的視圖,供Mapper或者Reducer使用運作。是以假定處理記錄的責任界限,為任務呈現key-value。

SequenceFile:

SequenceFile是包含二進制kv的扁平檔案(序列化)。它提供Writer、Reader、Sorter來進行寫、讀、排序功能。基于CompressionType,SequenceFile有三種對于kv的壓縮方式:

  Writer:不壓縮records;

  RecordCompressWriter:隻壓縮values;

  BlockCompressWriter:   壓縮records,keys和values都被分開壓縮在block中,block的大小可以配置;

壓縮方式由合适的CompressionCodec指定。推薦使用此類的靜态方法createWriter來選擇格式。Reader作為橋接可以讀取以上任何一種壓縮格式。

CompressionCodec:

封裝了關于流式壓縮/解壓縮的相關方法。

Mapper

Mapper 将輸入的kv對映射成中間資料kv對集合。Maps 将輸入記錄轉變為中間記錄,其中被轉化後的記錄不必和輸入記錄類型相同。一個給定的輸入對可以映射為0或者多個輸出對。

在MRJob執行過程中,MapReduce架構根據提前指定的InputFormat(輸入格式對象)産生InputSplit(輸入分片),而每個InputSplit将會由一個map任務處理。

總起來講,Mapper實作類通過JobConfigurable.configure(JobConf)方法傳入JobConf對象來初始化,然後在每個map任務中調用map(WritableComparable,Writable,OutputCollector,Reporter)方法處理InputSplit的每個kv對。MR應用可以覆寫Closeable.close方法去處理一些必須的清理工作。

輸出對不一定和輸入對類型相同。一個給定的輸入對可能映射成0或者很多的輸出對。輸出對是架構通過調用OutputCollector.colect(WritableComparable,Writable)得到。

MR應用可以使用Reporter彙報進度,設定應用層級的狀态資訊,更新計數器或者隻是顯示應用處于運作狀态等。

所有和給定的輸出key關聯的中間資料都會随後被架構分組處理,并傳給Reducer處理以産生最終的輸出。使用者可以通過JobConf.setOutputKeyComparatorClass(Class)指定一個Comparator控制分組處理過程。

Mapper輸出都被排序後根據Reducer數量進行分區,分區數量等于reduce任務數量。使用者可以通過實作自定義的Partitioner來控制哪些keys(記錄)到哪個Reducer中去。

此外,使用者還可以指定一個Combiner,調用JobConf.setCombinerClass(Class)來實作。這個可以來對map輸出做本地的聚合,有助于減少從mapper到reducer的資料量。

經過排序的中間輸出資料通常以一種簡單的格式(key-len,key,value-len,value)存儲在SequenceFile中。應用可以決定是否或者怎樣被壓縮以及壓縮格式,可以通過JobConf來指定CompressionCodec.

如果job沒有reducer,那麼mapper的輸出結果會不經過分組排序,直接寫進FileSystem.

Map數

通常map數由輸入資料總大小決定,也就是所有輸入檔案的blocks數目決定。

每個節點并行的運作的map數正常在10到100個。由于Map任務初始化本身需要一段時間是以map運作時間至少在1分鐘為好。

如此,如果有10T的資料檔案,每個block大小128M,最大使用為82000map數,除非使用setNumMapTasks(int)(這個方法僅僅對MR架構提供一個建議值)将map數值設定到更高。

Reducer

Reducer根據key将中間資料集合處理合并為更小的資料結果集。

使用者可以通過JobConf.setNumReduceTasks(int)設定作業的reducer數目。

整體而言,Reducer實作類通過JobConfigurable.configure(JobConf)方法将JobConf對象傳入,并為Job設定和初始化Reducer。MR架構調用 reduce(WritableComparable, Iterator, OutputCollector,Reporter) 來處理以key被分組的輸入資料。應用可以覆寫Closeable.close()處理必要的清理操作。

Reducer由三個主要階段組成:shuffle,sort,reduce。

  shuffle

輸入到Reducer的輸入資料是Mapper已經排過序的資料.在shuffle階段,根據partition算法擷取相關的mapper位址,并通過Http協定将mapper的相應輸出資料由reducer拉取到reducer機器上處理。

  sort

架構在這個階段會根據key對reducer的輸入進行分組(因為不同的mapper輸出的資料中可能含有相同的key)。

shuffle和sort是同時進行的,同時reducer仍然在拉取map的輸出。

  Secondary Sort

如果對中間資料key進行分組的規則和在處理化簡階段前對key分組規則不一緻時,可以通過JobConf.setOutputValueGroupingComparator(Class)設定一個Comparator。因為中間資料的分組政策是通過JobConf.setOutputKeyComparatorClass(Class) 設定的,可以控制中間資料根據哪些key進行分組。而JobConf.setOutputValueGroupingComparator(Class)則可用于在資料連接配接情況下對value進行二次排序。

Reduce(化簡)

這個階段架構循環調用 reduce(WritableComparable, Iterator, OutputCollector,Reporter) 方法處理被分組的每個kv對。

reduce 任務一般通過OutputCollector.collect(WritableComparable, Writable)将輸出資料寫入檔案系統FileSystem。應用可以使用Reporter彙報作業執行進度、設定應用層級的狀态資訊并更新計數器(Counter),或者隻是提示作業在運作。

注意,Reducer的輸出不會再進行排序。

Reducer數目

合适的reducer數目可以這樣估算:(節點數目mapred.tasktracker.reduce.tasks.maximum)乘以0.95 或乘以1.75。因子為0.95時,當所有map任務完成時所有reducer可以立即啟動,并開始從map機器上拉取資料。因子為1.75時,最快的一些節點将完成第一輪reduce處理,此時架構開始啟動第二輪reduce任務,這樣可以達到比較好的作業負載均衡。提高reduce數目會增加架構的運作負擔,但有利于提升作業的負載均衡并降低失敗的成本。上述的因子使用最好在作業執行時架構仍然有reduce槽為前提,畢竟架構還需要對作業進行可能的推測執行和失敗任務的處理。

不使用Reducer

如果不需要進行化簡處理,可以将reduce數目設為0。這種情況下,map的輸出會直接寫入到檔案系統。輸出路徑通過setOutputPath(Path)指定。架構在寫入資料到檔案系統之前不再對map結果進行排序。

Partitioner

Partitioner對資料按照key進行分區,進而控制map的輸出傳輸到哪個reducer上。預設的Partitioner算法是hash(哈希。分區數目由作業的reducer數目決定。HashPartitioner是預設的Partitioner。

Reporter

Reporter為MR應用提供了進度報告、應用狀态資訊設定,和計數器(Counter)更新等功能.

Mapper和Reducer實作可以使用Reporter彙報進度或者提示作業在正常運作。在一些場景下,應用在處理一些特殊的kv對時耗費了過多時間,這個可能會因為架構假定任務逾時而強制停止了這些作業。為避免該情況,可以設定mapred.task.timeout為一個比較高的值或者将其設定為0以避免逾時發生。

應用也可以使用Reporter來更新計數(Counter)。

OutputCollector

OutputCollector是MR架構提供的通用工具來收集Mapper或者Reducer輸出資料(中間資料或者最終結果資料)。

HadoopMapReduce提供了一些經常使用的mapper、reducer和partioner的實作類供我們進行學習。

以上有關configuration和job的部分在新的API中有所改變,簡單說就是在Mapper和Reducer中引入了MapContext和ReduceContext,它們封裝了configuration和outputcollector,以及reporter。

繼續閱讀