天天看點

Hadoop專業解決方案-第3章:MapReduce處理資料3.1 了解MapReduce3.2第一個Mapreduce應用3.3設計MapReduce的實作第一個Mapreduce應用設計MapReduce的實作

前言:非常感謝團隊的努力,最新的章節終于有了成果,因為自己的懶惰,好久沒有最新的進展了,感謝群裡兄弟的努力。

群名稱是hadoop專業解決方案群

 313702010

本章主要内容:

了解mapreduce基本原理

了解mapreduce應用的執行

了解mapreduce應用的設計

截止到目前,我們已經知道hadoop如何存儲資料,但hadoop不僅僅是一個高可用

的,規模巨大的資料存儲引擎,它的另一個主要特點是可以将資料存儲與處理相結合。

hadoop的核心處理子產品是mapreduce,也是目前最流行的大資料處理架構之一。

注意:組合子是一個利用程式片段建構程式片段的函數。組合子有助于在更高的抽象層次上進行程式設計,并且能夠使你從實作當中區分出政策。在函數式程式設計中,由于組合子可以直接支援一級公民,使用它們可以自動的建構大多數程式。

mapreduce架構的靈感來源于這些概念。在2004年google使用它進行分布式計算,處理分布在多個計算機叢集上大資料集。從此,該架構被廣泛用于許多軟體平台,現在是hadoop生态系統中不可缺少的一部分。

 mapreduce被用于解決大規模計算問題,它經過特殊設計可以運作在普通的硬體裝置上。它根據分而治之的原則——輸入的資料集被切分成獨立的塊,同時被mapper子產品處理。另外,map執行與資料是典型的本地協作(在第四章在讨論資料本地性時将會了解更多)。mapreduce會對所有map的輸出結果排序,它們将作為reduce的一種輸入。

圖3-1  mapper與reducer的功能

mapreduce架構的主要任務(根據使用者提供的代碼)是統籌所有任務的協調執行。

   現在我們對mapreduce有了一些了解,下面讓我們進一步看看mapreduce作業是如何執行的。

任何存儲在hadoop中的資料(hdfs與hbase),甚至是存儲在hadoop外的數

   圖3-2從較高的層次展示mapreduce執行架構。

圖3-2 高層次hadoop執行架構

下面介紹mapreduce執行管道線的主要元件。

  driver:這是主程式,用來初始化mapreduce job。它定義了job的個性化配置,并且标注了所有的元件(包括輸入輸出格式,mapper與reducer,使用結合器,使用定制的分片器等等)。driver也可以獲得job執行的狀态。

  context:driver,mapper與reducer在不同的階段被執行,一般情況下是在多台節點上執行。context對象(如圖3-2所示)在mapreduce執行的任何階段都可以被使用。它為交換需要的系統與job内部資訊提供一種友善的機制。要注意context協調隻發生在mapreduce job 開始後合适的階段(driver,map或者reduce)。這意味着在一個mapper中設定的值不可以在另一個mapper中使用(即使另一個mapper在第一個mapper完成後開始),但是在任何reducer中都是有效的。

  input data:為mapreduce任務準備的最初存儲資料。這些資料可以在hdfs,hbase,或者其他的倉庫中。一般情況下,input data 是非常大的,幾十個g或者更多。

inputformat:如何對輸入資料進行讀取和切分

  。inputformat類确定input data中資料輸入哪個任務的inputsplit,并且提供一個生成recordreader的工廠方法,這個對象主要是讀取inputsplit指定的檔案。hadoop提供了一些inputformat類,在第四章提供了如何自定義inputformat的執行個體。inputformat直接被job的driver調用來決定map任務執行的數目與地點(根據inputsplit)。

  inputsplit:inputsplit确定一個在mapreduce中map任務的作業單元。處理一個資料集的mapreduce程式由幾個(也可能是幾百個)map任務組成。inputformat(直接被job driver調用)确定在map階段中map任務的數目。每個map任務操作一個單獨的inputsplit。完成inputsplits的計算後,mapreduce架構會再合适的節點啟動期望數目的map任務。

  recorreader:inputsplit确定map任務的工作機,但沒有描述如何獲得該資料。recordreader類是真正從資料源讀取資料的類(在map 任務中),并将資料轉化為設和map執行的key/value對,并将他們傳遞給map方法。recordreader由inputformat定義。在第四章提供了如何實作自定義recordreader的執行個體。

  mapper:mapper負責在mapreduce程式中第一個階段使用者自定義作業的執行。從實作的角度看,mapper實作負責将輸入資料轉化成一些列的key/value對(k1,v1),這些鍵值對将被用于單個map的執行。一般情況下mapp會将輸入的鍵值對轉化為另一種輸出鍵值對(k2,v2),這些輸出鍵值對将會作為reduce階段shuffle與sort階段的輸入。一個新的mapper執行個體在每個map任務的單獨的jvm實體中被執行個體化, 這些map任務構成所有作業輸出的一部分。獨立的mapper是不會提供任何與其他mapper通信的機制。這一點保證每個map任務的可靠性僅僅由本地節點的可靠性決定。

  partition:由所有獨立的mapper産生的中間資料(k2,v2)的子集會被配置設定到一個reducer上執行。這些子集(或者partitions)會作為reduce任務的輸入。具備相同鍵的數值會被一個reduce處理,而不會考慮他們有哪個mapper産生。這樣的結果是,所有的map節點必須判斷産生的中間資料将有哪個reducer執行。partitioner類決定特定的key/value對将由哪個reducer執行。預設的partitioner會為每個key計算一個哈希值,并根據這個值作為配置設定的依據。第四章提供了如何實作自定義的partitioner的執行個體。

  shuffle:在hadoop叢集中,每個節點可能會執行某個job的幾個map任務。一旦至少有一個map函數執行完成,産生的中間輸出就會根據key值進行分片,并将由map産生的的分片分發至需要它們的reducer。将map的輸出傳遞到reducer的過程叫做shuffling。

  sort:每個reduce任務負責處理與部分key值相對應的value。中間key/value資料集,在被傳遞給reducer前會由hadoop架構自動排序,組裝成(k2,{v2,v2,…})的形式。

  reducer:reducer負責執行由使用者提供的用于完成某個作業第二階段任務的代碼。對于配置設定到某個reducer中的每個key,reducer的reduce()方法都會被調用一次。這個方法接收一個key值,由疊代器周遊與它綁定在一起的所有value值,并無序的傳回與這個key值相關的value值。一般情況下,reducer将輸入的key/value轉化成輸出鍵值對(k3,v3)。

  outputformat:job的輸出(job的輸出可以由reducer産生,若沒有reducer也可由map産生)記錄的方式有outputformat控制。outputformat負責确定輸出資料的位址,由recordwriter負責将資料結果寫入。第四章介紹了如何實作自定義的outputformat。

  recordwriter:recordwriter定義每條output記錄如何寫入。

下面将介紹mapreduce執行時兩個可選的元件(圖3-2中并沒有展現)

  combiner:這是一個可以優化mapreduce job執行的可選執行步驟。如何選擇後,combiner運作在mapper執行後,reduce執行前。combiner的執行個體會運作在每個map任務中與部分reduce任務中。combiner接收由mapper執行個體輸出的所有資料作為輸入,并且嘗試将具有相同key值的value整合,以此來減少key值的存儲空間和減少必須存儲的(實際上不必須)key值的數目。combiner的輸出會被排序并發送給reducer。

  distribute cache:另一個常用與mapreduce job中的工具是distribute cache。這個元件可以使得叢集中所有節點共享資料。distribute cache可以是能夠被所有任務都能可獲得的共享庫,包含key/value對的全局查找檔案,jar檔案(或者是archives)包含可執行代碼等等。該工具會将這些檔案拷貝至實際執行任務的節點,并使它們可以在本地使用。第四章将提供執行個體展示如何在mapreduce執行時,使distributed cache與本地代碼相結合。

mapreduce最重要的一個特點是它完全隐藏了管理一個大規模分布式叢集,與在不

同節點上協同執行任務的複雜性。開發者的程式設計模型非常簡單——隻需要實作mapper、reducer的功能,driver,使它們像一個單獨的job運作在一起和配置一些必要的參數。所有的使用者代碼都會擺打包進一個jar檔案(事實上,mapreduce架構可以在多個jar檔案上進行操作),這個檔案會被送出到mapreduce叢集用于執行。

一旦job jar檔案送出到叢集中,mapreduce架構就已經準備好所有事情。它可以在

隻有一台節點的叢集,也可以是擁有幾百台節點的叢集中,掌控分布式代碼執行的各個方面。

   mapreduce架構為應用開發提供了一下支援:

  scheduling:架構可以保證來自多個job的多個任務在一個叢集上執行。不同的排程政策提供了不同的排程政策,有“先來先服務”的排程政策,也有保證讓來自不同使用者的job公平共享叢集執行資源的排程政策。排程的另一個方面是推測執行,它是由mapreduce的一個優化。如果jobtracker發現某個任務花費太長時間執行,它就會啟動另一個執行個體執行相同的任務(選擇不同的tasktracker)。推測執行的原理是某台節點非預期的緩慢不會降低任務的執行速度。推測執行預設是開啟的,同樣也可以通過将參數mapred.map.tasks.speculative.execution 與mapred.reduce.tasks.speculative.execution設定為false來分别禁用map與reduce任務。

  error and fault hadling:為保證在一個錯誤和故障是常态的環境中完成job執行,jobtracker會嘗試重新執行失敗的任務。(在第五章,你講學習更多關于如何編寫可靠的mapreduce應用)

在圖3-3中,hadoop的mapreduce架構使用一種非常簡單的協調機制。job的driver

使用inputformat完成隔離map的執行(基于資料切分),初始化一個用來與jobtracker通信的job用戶端和送出将要執行的job。一旦job被送出,job用戶端會從jobtracker端擷取job運作狀态,并等待它運作完成。jobtracker會為每一個split建立一個map任務,和幾個reducer任務。(reduce任務的個數決定與job的配置)。

任務的實際執行由運作在每個節點上的tasktracker負責,tasktracker啟動map作業,

并周期性向jobtracker發送“心跳”。心跳資訊具備雙重職能——首先會告訴jobtracker本節點的tasktracker是活着的,其次是作為一種通信通道。心跳資訊中會包含tasktracker什麼時候可以執行一個新的任務。

圖3-3 mapreduce 執行

首先,它會通過将jar檔案拷貝到本地檔案系統完成job的本地化。同時還會拷貝一

當jobtracker收到提示某個job的最後一個任務執行完畢,就會将job的狀态标記為“completed”。job用戶端也會通過周期性拉去job狀态,發現job的已經執行完成。

job.getconfiguration().setint(job.jvm_num_tasks_to_run, int)來将該值設定為大于1的數。

清單3-1展示了一個單詞計數的mapreduce作業的非常簡單的應用

清單3-1:hadoop 單詞計數應用

importjava.io.ioexception;

importjava.util.iterator;

importjava.util.stringtokenizer;

importorg.apache.hadoop.conf.configuration;

importorg.apache.hadoop.conf.configured;

importorg.apache.hadoop.fs.path;

importorg.apache.hadoop.io.intwritable;

importorg.apache.hadoop.io.longwritable;

importorg.apache.hadoop.io.text;

importorg.apache.hadoop.mapreduce.job;

importorg.apache.hadoop.mapreduce.mapper;

importorg.apache.hadoop.mapreduce.reducer;

importorg.apache.hadoop.mapreduce.lib.input.textinputformat;

importorg.apache.hadoop.mapreduce.lib.output.textoutputformat;

importorg.apache.hadoop.util.tool;

importorg.apache.hadoop.util.toolrunner;

public class wordcount extends configured implements tool{

public static class map extends mapper<longwritable, text, text, intwritable> {

private final staticintwritable one = newintwritable(1);

privatetext word = newtext();

@override

public voidmap(longwritable key, text value, context context)

throwsioexception, interruptedexception {

string line = value.tostring();

stringtokenizer tokenizer = newstringtokenizer(line);

while(tokenizer.hasmoretokens()) {

word.set(tokenizer.nexttoken());

context.write(word, one);

}

public static class reduce extends reducer<text, intwritable, text, intwritable>{

public void reduce(text key, iterable<intwritable> val, context context)

int sum = 0;

iterator<intwritable> values = val.iterator();

while(values.hasnext()) {

sum += values.next().get();

context.write(key, newintwritable(sum));

public int run(string[] args) throws exception {

configuration conf = newconfiguration();

job job = newjob(conf, "word count");

job.setjarbyclass(wordcount.class);

// set up the input

job.setinputformatclass(textinputformat.class);

textinputformat.addinputpath(job,newpath(args[0]));

// mapper

job.setmapperclass(map.class);

// reducer

job.setreducerclass(reduce.class);

// output

job.setoutputformatclass(textoutputformat.class);

job.setoutputkeyclass(text.class);

job.setoutputvalueclass(intwritable.class);

textoutputformat.setoutputpath(job,newpath(args[1]));

//execute

booleanres = job.waitforcompletion(true);

if(res)

return0;

else

return-1;

public static voidmain(string[] args) throwsexception {

intres = toolrunner.run(newwordcount(), args);

system.exit(res);

注意:hadoop提供了兩個版本的mapreduce api-新的(包含在org.apche.hadoop.mapreduce包裡)和舊的(包含在orp.apache.hadoop.mapred包裡)。這本書裡。我們隻用新的api。

這個應用有兩個内部類-map和reduce。分别繼承了hadoop的mapper類和reducer類。

mapper類有三個主要方法(可以重寫):setup(),cleanup()和map()(僅在這裡應用的一個),setup()和cleanup()方法在mapper的整個循環中隻被調用一次-分别是在mapper執行的開頭和結尾。setup()方法被用來實作mapper的初始化(例如,閱讀共享資源,連接配接hbase表等),然而cleanup()被用來清除mapper資源,可選擇的是,如果mapper實作關聯矩陣和計數器,就寫出資訊。

在map函數中實作了mapper的業務功能(就是,特定程式的邏輯能力)。通常,給定一個鍵/值對,該方法處理并産生(使用context對象)一個或多個鍵/值對。一個context對象傳遞給這個方法允許map方法擷取執行環境的附加資訊,報告他的執行狀況。需要注意的是一個map函數不讀取資料。它被喚醒(基于“hollywood原則”)。每次閱讀器讀取(和可選地解析)一個新的記錄,它就調用(基于“好萊塢原則”)傳遞給它的資料(通過context對象)的閱讀器。如果你很疑惑,看一看下面的清單3-2的mapper基類中額外的方法(沒有廣泛應用)。

注意:好萊塢原則-不用聯系我們,我們聯系你。是一個有用的軟體開發技術,其中一個對象(或元素)的初始條件和持續的生命周期是被環境所決定的。而不是對象本身。這個原則通常用于必須符合現有架構的限制實作一個類/元件。

清單3-2:mapper基類中run方法

/**

* expert users can override this method for more complete control over the

* execution of the mapper.

* @paramcontext

* @throwsioexception

*/

public voidrun(context context) throwsioexception, interruptedexception {

setup(context);

while(context.nextkeyvalue()) {

map(context.getcurrentkey(),context.getcurrentvalue(), context);

cleanup(context);

使用eclipse開發hadoop代碼是相當簡單的,假設你的eclipse執行個體是用maven配置的,首先為你的應用建構一個maven項目。因為沒有hadoop maven的原型,從“簡單的”maven項目開始,手動的添加pom.xml,類似于清單3-3所示。

清單3-3:hadoop2.0中的pom.xml

<project xmlns="http://maven.apache.org/pom/4.0.0"

xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"

xsi:schemalocation="http://maven.apache.org/pom/4.0.0

http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelversion>4.0.0</modelversion>

<groupid>com.nokia.lib</groupid>

<artifactid>nokia-cgnr-sparse</artifactid>

<version>0.0.1-snapshot</version>

<name>cgnr-sparse</name>

<properties>

<hadoop.version>2.0.0-mr1-cdh4.1.0</hadoop.version>

<hadoop.common.version>2.0.0-cdh4.1.0</hadoop.common.version>

<hbase.version>0.92.1-cdh4.1.0</hbase.version>

</properties>

<repositories>

<repository>

<id>cdh releases and rcs repositories</id>

<url>https://repository.cloudera.com/content/groups/cdhreleases-rcs</url>

</repository>

</repositories>

<build>

<plugins>

<plugin>

<groupid>org.apache.maven.plugins</groupid>

<artifactid>maven-compiler-plugin</artifactid>

<version>2.3.2</version>

<configuration>

<source>1.6</source>

<target>1.6</target>

</configuration>

</plugin>

</plugins>

</build>

<dependencies>

<dependency>

<groupid>org.apache.hadoop</groupid>

<artifactid>hadoop-core</artifactid>

<version>${hadoop.version}</version>

<scope>provided</scope>

</dependency>

<groupid>org.apache.hbase</groupid>

<artifactid>hbase</artifactid>

<version>${hbase.version}</version>

<artifactid>hadoop-common</artifactid>

<version>${hadoop.common.version}</version>

<groupid>junit</groupid>

<artifactid>junit</artifactid>

<version>4.10</version>

</dependencies>

</project>

注意:存在很多版本的hadoop,包括不同版本的cloudera版本(cdh3和cdh4),hortonworks版本,mapreduce版本,amazon的emr等等。他們中一些是相容的,一些不是相容的。你需要使用不同的maven pom檔案在特定的運作時間建構一個定向的可執行目标。此外,目前hadoop隻支援java版本6。是以這裡需要使用maven編譯器插件,以確定使用正确的版本。

清單3-3中的pom檔案是cloudera cdh4.1版本中的(注意包含在cloudera庫中的pom檔案)。它包含建立hadoop mapreduce作業所必須的一組最小依賴關系,hadoop-core和hadoop-common。除此之外,如果在一個應用中使用hbase,必須包含一個hbase依賴項。也包含支援基本單元測試的junit。同時注意提供的所有hadoop相關的依賴項是指定的。也就意味着他們不會包含在maven産生的最終的jar檔案中。

一旦建立了eclipse maven項目,所有實作mapreduce的代碼都會加入到這個項目中。eclipse負責加載所需的庫和編譯java代碼等等。

現在你已經知道如何編寫一個mapreduce作業,接下來我們将學習如何執行。你可以利用maven install指令産生一個包含所有需要的代碼的jar檔案。一旦jar檔案被建立,你可以利用ftp上傳到叢集的邊節點。然後利用下面的清單3-4中的指令執行它。

清單3-4:hadoop執行指令

hadoop jar your.jar mainclass inputpath outputpath

hadoop提供了一些java伺服器頁面(jsps),使你能夠可視化執行mapreduce。mapreduce的管理jsp可以使你能夠檢視叢集的整體狀态和特殊作業執行的細節。圖3-4中的mapreduce的管理頁面展示了叢集了所有狀态,以及目前運作、完成以及失敗作業的清單。每一個工作清單(運作、完成和失敗)都是可以點選的,可以使你擷取關于作業執行的額外資訊。

圖3-4:mapreduce管理首頁

圖3-5中的作業詳細頁面提供了關于執行的(動态)資訊。當jobtracker接受作業時,這個頁面就開始存在,一直跟蹤在執行過程中的所有變化。你可以在事後對它進行分析工作的執行過程。這個頁面包含以下四個主要部分(第四個沒有顯示在圖3-5中):

第一部分(頁面的頂部)顯示了關于作業的所有合并後的資訊。包括作業名稱,使用者,送出到的主機,開始和結束時間等等。

第二部分包含了關于給定作業的mapper和reducer的彙總資訊。它告訴這個作業有多少個mapper和reducer。根據他們的狀态進行分割-挂起,運作,完成和死亡。

第三部分顯示作業的計數器(對計數器的深入讨論,請看第四章),根據名稱空間進行了分割。因為在這個示例實作沒有使用自定義,對它隻使用标準的計數器。

第四部分提供了很好的直方圖詳述了mapper和reducer的執行。

作業詳細頁面提供了更多的資訊(通過“超連結”),可以更深入的幫你分析作業的執行。這些頁面在第五章讨論建構可靠的mapreduce應用程式時進行了詳細的讨論。

接下來,我們來看一看mapreduce應用的設計。

圖3-5:wordcount作業頁面

如前所述,mapreduce的功能主要來自于它的簡單性。除了準備輸入資料之外,程式員隻需要操作mapper和reducer。現實中的很多問題都可以利用這種方法解決。

在大多數情況下, mapreduce可以作為一個通用的并行執行架構,充分利用資料的本地性。但是這種簡單性是有代價的,設計者必須從以特定的方式組合在一起的一小部分元件的角度決定如何表達他的業務問題。

注意:盡管很多著作描述了mapreduce的api的使用,很少描述用實際的方法來設計一個mapreduce應用。

重新制定mapreduce的初始問題,通常有必要回答以下問題:

1、    怎麼把一個大問題分解成多個小任務?更具體地說,你怎麼分解問題以至于這些小任務可以并行執行?

2、    你會選擇哪對key/value作為每個任務的輸出/輸出?

3、    你如何彙總計算所需要的所有資料?更具體地說, 你怎麼安排處理的方式,使所有必要的計算中的資料都同時在記憶體中?

我們要認識到,很多算法不能很容易地表示為一個單一的mapreduce作業。它往往需要把複雜的算法分解成一系列的作業,把其中一個作業的資料輸出成為下一個作業的輸入。

本節将會探讨在幾個設計不同的實際mapreduce應用問題的例子(從簡單到複雜)。所有的例子都會被描述為以下形式:

對問題簡單的描述

mapreduce作業的描述,包括:

²  mapper的描述

²  reducer的描述

在簡單的例子中,源資料被組織為一組獨立的記錄。結果可以以任何順序指定。這些類型的問題(”尴尬的并行“),需要以相對獨立的方式對每個資料元素進行相同的處理-換句話說,就是不需要合并或者聚合各自的結果。一個簡單的例子就是處理幾千個pdf檔案,提取一些關鍵的文本,放入到csv檔案中,然後導入到資料庫中。

這種情況下的mapreduce實施是非常簡單的――唯一需要的就是mapper,單獨的處理每個記錄然後輸出結果。在這個例子中,mapreduce控制mappers的分布,提供排程和錯誤處理的所有支援。下面的例子展示了如何設計這種類型的應用程式。

雖然不是經常作為hadoop-related問題讨論,但是圖像處理應用在mapreduce範例中是非常合适的。假設有一個人臉識别算法的應用,需要一個圖像,識别一系列想要的特性,并産生一組識别結果。再假設需要在百萬圖檔上做人臉識别。如果所有的圖檔以序列檔案的形式存放在hadoop中,那麼你可以用一個簡單的map作業就可以實作并行處理。在這個例子中,輸入的key/value是imageid/image,輸出的key/value是imageid/可特征識别清單。此外,一組可特征識别必須分布到所有的mapper(例如,利用分布式緩存)。

表3-1展示這個例子中mapreduce作業的實施

表3-1:人臉識别作業

mapper

在這個作業中,mapper首先以可識别特征集進行初始化,對于每一個圖像,一個map函數通過它的圖像本身,以及可識别的清單來調用的人臉識别算法。識别的結果連同原來imageid一起從map中輸出。

result

這個作業執行的結果是所有包含在原始圖檔中識别出來的圖檔。

注意:要實作完全獨立的mappers/reducers。在mapreduce應用中的每一個mapper/reducer需要建立獨自的輸出檔案。這意味着,人臉識别的作業的執行結果将是一組檔案(相同目錄下的),每一個包含了各自mapper的輸出。如果需要把他們放入到一個單個的檔案中。必須在人臉識别作業中添加一個單獨的reducer。這個reducer是非常簡單的。因為在這個例子中,每一個作為reduce的輸入的key隻有一個單獨的value(這裡假設圖像的id是唯一的),reducer隻是把輸入的key/value直接寫入到輸出檔案。我們要知道在這個例子中盡管一個reducer極其簡單,但是這種額外的作業明顯的增加了作業的整體運作時間。這是因為額外的reducer分為shuffle和sort(不單單在map作業中出現),當圖像的數量非常大時,将花費大量的時間。

接下來看一個比較複雜的例子,map執行的結果必須在一起分組(也就是,以某種方式排序)。很多實際的應用(包括過濾,解析、資料轉換,總結等)可以用這種mapreduce作業解決。

4、    怎麼把一個大問題分解成多個小任務?更具體地說,你怎麼分解問題以至于這些小任務可以并行執行?

5、    你會選擇哪對key/value作為每個任務的輸出/輸出?

6、    你如何彙總計算所需要的所有資料?更具體地說, 你怎麼安排處理的方式,使所有必要的計算中的資料都同時在記憶體中?

這種情況的一個例子就是建構反向索引。這種類型的問題需要所有的mapreduce步驟進行執行,需要shuffle和sort把所有的結果集合在一起。下面的例子展示了如何設計這種類型的應用。

在計算機科學中,反向索引是一個資料架構,用來存放了從内容(例如單詞或者數字)到它在一個文檔或一組文檔裡的位置的映射,如表3-6所示。反向索引的目的是實作快速的全文搜尋,在文檔增加的時候增加處理成本為代價,反向索引式的資料結構是典型搜尋引擎的關鍵部分,優化了查找某些單詞出現的文檔的速度。

文檔

id

title

content

1

popular

football is popular in us

2

common sport

soccer is commonly played in europe

3

national sport

cricket is played all over india

...

反向索引

term

value

document

sport

common

national

football

is

表3-6:反向索引

要建立反向索引,可以把每個文檔(或者文檔裡行)給mapper。mapper可以解析出文檔裡的多個單詞,然後輸出[單詞,詞頻]鍵值對。reducer可以隻是一個識别,輸出清單或者可以執行每個單詞的一些統計彙總的功能。

注釋在第九章你将學會更多關于如何利用hbase來存儲倒排的索引。

表3-2裡展示了這個例子中mapreduce作業的實作。

表3-2 反向索引的計算

處理階段

描述

作業中,mapper的任務是建構一個包含一個單詞索引的獨特的記錄和描述在文檔裡單詞出現的資訊。它讀取每個輸入的文檔,解析,然後為文檔裡的每一個獨特的單詞建立一個索引描述符。該描述符包含文檔的id,文檔裡索引出現的次數,和任何附件的資訊(比如從文檔的開頭索引位置的偏移量) ,每一個是以描述符被寫出。

shuffle和sort

mapreduce的shuffle和sort過程會把所有的記錄都按照索引值排序,確定reducer接受到所有相同key值的索引。

reducer

這項工作中,reducer的作用是建構一個反向索引結構。根據系統的要求,可能有一個或多個reducer。reducer得到所有給定索引的描述符,并生成一個索引記錄,并寫入到指定的索引存儲。

該作業執行的結果是一組原始文檔的反向索引。

更多複雜的mapreduce應用需要将來自多個擷取的資料(就是說連接配接資料)進行處理。

在疊代式的mapreduce應用中,在循環中通常一個或者多個mapreduce作業會被執行。也就是說,這樣的應用程式要麼使用一個内部實作疊代邏輯的驅動程式來實作,并在疊代循環中調用需要的mapreduce作業,要麼使用是在一次循環中運作mapreduce作業的一個外部腳本和檢查轉換的标準。(另一個選擇方式是使用工作流引擎,第六章至第八章細述了hadoop下的工作流引擎apache oozie)。使用一個驅動程式執行疊代邏輯提供了一個更靈活的解決方案,使你能夠利用内部變量和java的全部功能去實作疊代和轉換的檢查。

         疊代算法的典型例子是解決一個線性方程組。接下來你将看到如何利用mapreduce設計這樣一個算法。

解決線性方程組示例

許多實際的問題都可以表示為解決線性方程類似的問題。至少可以歸納到這種方法裡。

1、  可以利用線性方程解決優化的問題

2、  近似問題(例如,多項式曲線)

注解:這個例子是在gene kalmens(一個在nokia的同僚)幫助下完成的。

要有效的解決線性方程問題,問題的大小是非常重要的---大緻有成百上千或者更多變量是個挑戰。這種情況下,另一種選擇是使用t位元組内容的超級計算機,或者使用允許零散計算的算法,而不需要把完整的矩陣放到記憶體中。遵循這些要求的這類算法是可以提供相近解決方案的疊代方法。它的性能和疊代的次數有關,在要求的精度内找到一個答案是可行的。

這些類型的算法中,當系數矩陣正确的時候,共轭梯度(cg)方法提供了最好的性能。下面是線性方程組的基本方程:

ax=b

使用共轭梯度,可以實作最速下降法,适用于如下定義的rn中的二次曲面。

f(x) = 1512'> xtax −xtb, x 15鈭?/m:t>'> rn

在端點方向上每一步都改善了解向量。對于前面步驟中的所有向量,每一步得到的向量都共轭與a。

cg算法有以下步驟:

1、  選擇初始向量x0,為了簡單起見,可以設定為0.

2、  計算初始的殘差向量r0(例如,r0=b=ax0).

3、  選擇初始的查找方向p0-r0.

4、  循環如下:

a)         計算系數:ak=(rktrk)/ (pktapk).

b)         找到下一個x的近似值:xk+1=xk+akpk。

c)         計算新的殘差向量:rk+1= rk+1+akapk。

d)         如果abs(rk+1) 的在公差内,則結束循環。

e)         計算出标量(scalar),推斷下一個搜尋方向:bk=(rk+1trk+1)/(rktrk)。

f)          推斷下一個搜尋方向:pk+1=r k+1+b kp k。

5、循環結束

結果就是x k+1。

在這個算法實作中唯一“昂貴”的操作是殘差向量的計算(步驟2和4c)。因為此步驟需要矩陣向量乘。用mapreduce可以很容的實作這個操作。

確定你有兩個hbase表-----一個存放矩陣a,另一個存放所有的向量。如果矩陣a是稀疏矩陣,一個合理的hbase資料模型如下:

1、  每個表的行代表一個矩陣的行。

2、  給定矩陣的行中的所有元素都存儲在一個單獨的列族,列名稱對應于給定矩陣元素的列。

雖然對于實作向量乘法,矩陣的列不需要明确的列元素。但是如有有必要設定或更新單個元素的時候,這個表的布局可能會很友善。

一個表示向量的合理的hbase表如下所示:

1、  每個表的行代表單個的向量。

2、  給定向量的所有元素都存儲在單獨的列族,列名稱對應用一個向量索引,列值對應于用于索引的矢量值。

雖然從技術角度上講,存儲不同行值的作為向量索引的向量可以使用不同的表。提出的這個模型可以使讀寫向量非常的快(單行讀寫),同時減少了打開hbase連接配接的數量。

hbase表合理的設計,使mapreduce矩陣向量的實作變得相當簡單。一個mapper就能完成任務。矩陣向量乘的mapreduce作業如表3-7所示。

表3-7:矩陣向量乘法作業

作業中,mapper第一次初始化為向量的值,對于矩陣的每一行,計算出源向量和矩陣的行的乘向量。結果值(不為0)存儲在結果向量的索引r中。

在這個實作中,mapreduce的dirver執行了先前描述的算法,每次相乘,都需要調用矩陣向量乘的mapreduce作業。

雖然這裡描述的算法實作非常簡單和直接。為了能使cg(共轭梯度)能夠使用,必須滿足下列條件:

1、  矩陣a必須是正定矩陣。它提供了帶有一個極值點的凸表面。那意味着這種方法會收斂于任何選擇的初始向量x0。

2、  矩陣a必須是對稱的,這樣可以確定在每一步的過程中都存在與a正交的向量。

如果矩陣a不是對稱的,通過下面的代替初始方程可以變成對稱的。

atax=atb

ata是對稱和正定的。是以上面描述的算法可以使用。使用初始算法,在計算新的系數矩陣ata時,會造成很大的性能損失。另外,這種方法的收斂也會受到影響,因為k(ata)=k(a)2。

幸運的是,在前面你可以選擇不計算ata。而是修改前面算法中步驟2,4a,4c,如下所示:

步驟2-計算atax0,用兩個矩陣向量乘:at(ax0)。

步驟4a-計算分母pktatapk,注意它和(apk)2是相等的,此步計算的實質上是矩陣向量積和本身内積的結果。

步驟4c-和步驟2類似,計算兩個矩陣向量乘:atapk。

是以,整體算法實作的第一步必須檢查矩陣是否是對稱的。如果是,原先的算法就可以使用,如果不是,就要使用改進的算法。

除了第一個作業之外,整個算法的實作需要更多的mapreduce作業-矩陣轉置。矩陣轉置的mapreduce作業如表3-8所示:

表3-8:矩陣轉置作業

作業中,對于矩陣的每一行(r),每個元素(r,j)都将作為矩陣元素(j,r)寫入到結果矩陣裡面。

注意,在這個例子中,算法的轉換準則也是算法本身的重要組成部分。在接下來的例子中,使用了hadoop特有的技術進行轉換标準的計算。

stranding divalent link的例子(滞留二價連接配接)

一個相當普遍的mapper問題就是滞留二價連結問題

注釋 這個例子是在一個nokia的同僚dmitry mikhelson的幫助下準備的。

如果兩個連接配接的鍊路是通過一個bivalent節點連接配接,則被稱為二價的。二價節點是一個隻有兩個連接配接的節點,例如,在圖3-9中,節點n6,n7,n8和n9是二價節點,鍊路l5,l6,l7,l8和l9也是二價的。二價的退化情況是鍊路l4。

圖3-9:二價鍊路的例子

如圖3-9所示,計算二價鍊路延伸的算法看起來非常簡單-在兩個非二價節點之間的任何延伸的鍊路是稱為二價鍊路的延伸。

要實作該算法,假定:

1、  一個節點描述為擁有鍵值ni 15-nni'> 的对象n,比如,节点n1可以被描述为 15nn1'> ,n2为 15nn2'> 。

2、  鍊路被描述為一個擁有鍵值li~ 15lli'> 的对象l,比如,链路l1可以被描述ll1,l2为ll2等等。一个链接对象包含引用它开始和结束的节点( 15nni'> , 15nnj'> )。

3、  也引進一個對象類型的鍊路(s)或節點(ln),可以有任何鍵值,可以包含節點或者一個或多個鍊路。

4、  在定義一個類型-一個連結的鍊。這種資料類型包含在串中的一個有連結的鍊路清單。

當有了這些,這個二價鍊路strand的算法看起來如下:

1、  建構一個局部的二價串集合

2、  循環以下步驟:

a)         合并局部串(strands)

b)         如果沒有可以結合的局部串,跳出循環。

3、結束循環

實際的實作過程包含兩個mapreduce作業。一個是準備初始的strands(串),另一個(在循環中執行)是合并局部的strands(串)。在這個例子中。串的合并是作為mapreduce作業執行的一部分完成的。是以,這些作業(不是driver)知道在執行過程中合并了多少個局部strand。幸運的是,hadoop在驅動和mapreduce執行中間提供了一個簡單的機制-計數器。

注釋:hadoop提供輕量級的對象(計數器)去收集和作業相關的權值/統計資訊。這些東西在mapreduce作業中的任何地方是可以設定和通路的。

這個例子的第一個mapreduce作業的實作如表3-9所示。

表3-9 無二價節點作業的清除

作業中mapper的任務是建立不屬于源記錄( 15nni鍜?/m:t>>lli'> )的 15lnni'> 记录。它读取每一个输如的记录,然后查看对象类型。如果是有键值ni的节点,一个新类型的 15lnni'> 会被输出。如果是一个链接,所有的邻接节点(ni和nj)会从链接中提取出来,两个记录( 15lnni鍜?/m:t>lnnj'> )會寫出來。

mapreduce的shuffle和sort會把基于節點key值的所有記錄進行排序,確定每個節點相鄰的所有鍊路隻被一個單獨的reducer處理,對于指定的key值,每個reducer将會同時得到所有的ln記錄。

作業中,reducer的任務是删除無二價性的節點,然後建立部分的鍊路的連結。對于給定的節點key值,reducer讀取所有的ln記錄然後把他們存儲到記憶體中。如果這個節點的鍊路的數量是2,那麼這就是個2價節點,一個新的鍊(合并這兩個節點)會寫入到輸出檔案中(例如,看l5,l6和l7,l8鍊路對)。如果鍊路的數量不是2(他可能是一個終結點或是多個連結的節點),那麼它就是非二價節點。對于這種類型的節點,一個包含唯一一個鍊路的特殊的串被建立,連結到這個非二價節點(例如,l4或l5),這種節點的鍊的數量同連結到這個節點的鍊路是相等的。

結果

這個作業的執行結果包括部分二價節點的記錄,一些這樣的節點可以重複(例如,l4,可以出現兩次-從過程n4到n5)。

這個例子的第二個mapreduce作業的實作如表3-10所示

表3-10 :合并部分strands作業

作業中,mapper的任務是把擁有相同鍊路的strand放到一塊。對于他讀取的每一個strand,你産生一些key/value鍵值對。key的值是在strand中鍊路的key。strands的value值是strands本身。

mapreduce的shuffle和sort會把把所有來自終鍊路鍵值的記錄進行排序,確定相同鍊路id的所有strand記錄同時來之相同的reducer。

作業中,reducer的任務是把擁有相同鍊路id的strand進行合并。對于每一個鍊路id,所有的strands被載入到記憶體中,然後按照strand的類型進行處理:

如果兩個strands包含相同的鍊路,産生的strand(鍊)是完整的,可以直接寫入到最終結果目錄。

否則生成的鍊(strand(鍊中)包含所有獨特的鍊)被建立,然後寫入到輸出檔案中做進一步處理。這種情況下,要被處理的計數器增加。

該作業的執行結果是在一個單獨的目錄中包含了所有完整的鍊。

這裡給出的例子就是開始潛在的使用mapreduce來解決現實世界的問題。接下來,仔細看看那些情況下是使用mapreduce是合理的,哪些是不合适的。

如上讨論的那樣,mapreduce是在大量資料的情況下解決簡單問題的技術,而且必須以并行的方式處理(最好是多台機器)。這個概念的整體思路是在現實的時間架構上對大規模資料集進行計算。

另外,mapreduce可以用來并行密集型計算,并不是和資料量相關,而是和整體的計算時間(一般是“高度并行”情況下的計算)有關。

為了能使mapreduce可以應用,下面必須符合:

1、  要運作的計算必須可以組合,它指的是必須能對資料集下的小資料集進行計算。然後對部分結果合并。

2、  資料集的大小要足夠大(或者計算時間要足夠長),當基礎設施  為獨立的計算和合并結果不會對整體性能造成影響。

3、  計算主要取決于于正在處理的資料集。用hbase可以額外添加小的資料集。分布式緩存或者一些其他的技術。

然而,當資料集必須能随機的被通路去執行操作(例如,如果一個給定的資料集記錄必須加上額外的記錄來執行操作),在這種情境中,mapreduce是不适用的。然後在這種情況下,可以運作額外的mapreduce作業來為計算“準備”資料。

另外一些不适用mapreduce的問題是遞歸問題(例如,斐波那契問題)。在這種情況下,mapreduce不适用是因為目前value值的計算需要前一個的知識。這就意味着你不能把它們分解成為可以單獨運作的子計算(sub computation)。

如果一個資料足夠的小,小到可以放到一個機器的記憶體裡,作為一個獨立的應用程式可能會處理的更快。在這種情況下,使用mapreduce,會使執行變得不必要的複雜,通常會更慢。

注意,(keep it in mind),雖然一大類的算法不能直接應用在mapreduce的實施上。但是對于同樣的基本問題,往往存在可以通過利用mapreduce解決的替代解決方案。這種情況下,使用mapreduce通常是有利的,因為mapreduce是在有豐富的hadoop生态系統中執行的(支援更容易的改進的實施),并與其它應用程式的內建。

最後 你應該記住mapreduce本質上是一個批處理實作。決不能用于線上計算(比如線上使用者請求的實時計算)。

當你設計mapreduce應用的時候,下面列舉的是需要注意和避免的。

1、  擁有過多的mapper會造成排程和基礎設施的開銷,在極端情況下,甚至會殺死一個jobtracker。另外,過多的mapper通常會提高整體資源的使用率(因為建立過多的jvm)和執行時間(因為執行slot的數量是有限的)。

2、  mapper太少會導緻叢集不能充分利用,給一些節點(實作運作mapper的節點)造成過度負載。此外,在有大型map任務情況下,重試和推測執行的情況會變得非常昂貴的代價且會花費更長的時間。

3、  大量小型的mapper會造成大量的尋求,shuffle map輸出給reducer的結果時。當把map的輸出結果傳遞給reducer時,它也會造成過多的連接配接。

1、  除了排程和基礎設施的開銷外,大量的reducer會建立太多的輸出檔案(記住,每個reducer建立自己的輸出檔案),對namenode有負面的影響。當有其他作業利用該mapreduce作業的結果時,它會變得更為複雜。

2、  太少的reducer和太少的mapper一樣,造成同樣的負面影響-不能充分利用叢集和非常昂貴(代價)的回調。(retry)

1、  計數器在跟蹤少量的,重要的,全局的資訊是适用的(在chapter 5了解更多關于使用計數器的詳情)。他們絕對不是隻是整合非常細粒度統計的應用程式。

2、  計數器的代價非常高,因為jobtracker在應用程式的整個持續時間内,必須維持每個map/reduce任務的每一個計數器。

1、  盡量避免在map和reduce方法中添加新的類的執行個體。這些方法在執行過程中會循環執行多次。也就是說類的建立和處理将增加執行的時間,為垃圾收集器增加額外的工作。比較好的方法是在相應的set()方法中建立大量的中間類,然後重寫map和reduce方法。

2、  不要用分布式緩存來移動大數量的工件或者非常大的工件(每個百兆位元組)。分布式緩存的設計是用來分布小部分中等大小的工件,幾兆到幾十兆大小。

3、  處理少量的資料時,不要建立成百上千個小作業式的工作流。

4、  不直接從reducer或者mapper直接寫入使用者自定義的檔案。hadoop中目前實作檔案寫的功能是單線程的(從第二章擷取更多細節),這意味着當多個mapper/reducer試圖寫檔案時,這個執行将被序列化。

5、  不要建立這樣的mapreduce功能,掃描一個hbase表來建立一個新的hbase表(或者寫入同樣的表中)  。tableinputformat是為基于具有時間敏感性的表掃描的hbase和mapreduce的實作。  另一方面,hbase寫功能會因為hbase表的分割而産生一定的寫延遲。 結果是region伺服器會挂掉,然後你會失去一些資料。最好的解決方案是把作業分割成兩個作業。一個掃描表并想hdfs中寫入中間結果。另一個從hdfs讀取資料并寫入到hbase中。  

本章節讨論了mapreduce架構,了解了mapreduce的整體架構和mapreduce管道的執行方式。也學習了如何設計mapreduce應用和mapreduce遇到的問題的類型。最後學習了怎麼編寫和執行一個簡單的mapreduce應用以及在執行過程中都發生了什麼。

現在你知道了如何設計一個mapreduce應用、編寫和執行一個簡單的應用。第四章考察了mapreduce管道中不同元件的定制方法。使您能夠更好的利用mapreduce環境。