天天看點

《Hadoop實戰第2版》——3.4節Hadoop流

3.4 hadoop流

hadoop流提供了一個api,允許使用者使用任何腳本語言寫map函數或reduce函數。hadoop流的關鍵是,它使用unix标準流作為程式與hadoop之間的接口。是以,任何程式隻要可以從标準輸入流中讀取資料并且可以寫入資料到标準輸出流,那麼就可以通過hadoop流使用其他語言編寫mapreduce程式的map函數或reduce函數。

舉個最簡單的例子(本例的運作環境:ubuntu,hadoop-0.20.2):

從這個例子中可以看到,hadoop流引入的包是hadoop-0.20.2-streaming.jar,并且具有如下指令:

-input 指明輸入檔案路徑

-output 指明輸出檔案路徑

-mapper 指定map函數

-reducer 指定reduce函數

hadoop流的操作還有其他參數,後面會一一列出。

3.4.1 hadoop流的工作原理

先來看hadoop流的工作原理。在上例中,map和reduce都是linux内的可執行檔案,更重要的是,它們接受的都是标準輸入(stdin),輸出的都是标準輸出(stdout)。如果大家熟悉linux,那麼對它們一定不會陌生。執行上一節中的示例程式的過程如下所示。

程式的輸入與wordcount程式是一樣的,具體如下:

wc指令用來統計檔案中的行數、單詞數與位元組數,可以看到,這個結果是正确的。

hadoop流的工作原理并不複雜,其中map的工作原理如圖3-4所示(reduce與其相同)。

《Hadoop實戰第2版》——3.4節Hadoop流

當一個可執行檔案作為mapper時,每一個map任務會以一個獨立的程序啟動這個可執行檔案,然後在map任務運作時,會把輸入切分成行提供給可執行檔案,并作為它的标準輸入(stdin)内容。當可執行檔案運作出結果時,map從标準輸出(stdout)中收集資料,并将其轉化為對,作為map的輸出。

reduce與map相同,如果可執行檔案做reducer時,reduce任務會啟動這個可執行檔案,并且将對轉化為行作為這個可執行檔案的标準輸入(stdin)。然後reduce會收集這個可執行檔案的标準輸出(stdout)的内容。并把每一行轉化為對,作為reduce的輸出。

map與reduce将輸出轉化為對的預設方法是:将每行的第一個tab符号(制表符)之前的内容作為key,之後的内容作為value。如果沒有tab符号,那麼這一行的所有内容會作為key,而value值為null。當然這是可以更改的。

值得一提的是,可以使用java類作為map,而用一個可執行程式作為reduce;或使用java類作為reduce,而用可執行程式作為map。例如:

3.4.2 hadoop流的指令

hadoop流提供自己的流指令選項及一個通用的指令選項,用于設定hadoop流任務。首先介紹一下流指令。

hadoop流指令選項

《Hadoop實戰第2版》——3.4節Hadoop流

表3-1所示的hadoop流指令中,必選的4個很好了解,分别用于指定輸入/輸出檔案的位置及map/reduce函數。在其他的可選指令中,這裡我們隻解釋常用的幾個。

-file

-file指令用于将檔案加入到hadoop的job中。上面的例子中,cat和wc都是linux系統中的指令,而在hadoop流的使用中,往往需要使用自己寫的檔案(作為map函數或reduce函數)。一般而言,這些檔案是hadoop叢集中的機器上沒有的,這時就需要使用hadoop流中的-file指令将這個可執行檔案加入到hadoop的job中。

這個指令用來加入combiner程式。

這兩個指令用來設定輸入輸出檔案的處理方法,這兩個指令後面的參數必須是java類。

hadoop流通用的指令選項

hadoop流的通用指令用來配置hadoop流的job。需要注意的是,如果使用這部配置設定置,就必須将其置于流指令配置之前,否則指令會失敗。這裡簡要列出指令清單(如表3-2所示),供大家參考。

《Hadoop實戰第2版》——3.4節Hadoop流

3.4.3 兩個例子

從上面的内容可以知道,hadoop流的api是一個擴充性非常強的架構,它與程式相連的部分隻有資料,是以可以接受任何适用于unix标準輸入/輸出的腳本語言,比如bash、php、ruby、python等。

下面舉兩個非常簡單的例子來進一步說明它的特性。

bash

mapreduce架構是一個非常适合在大規模的非結構化資料中查找資料的程式設計模型,grep就是這種類型的一個例子。

在linux中,grep指令用來在一個或多個檔案中查找某個字元模式(這個字元模式可以代表字元串,多用正規表達式表示)。

下面嘗試在如下的資料中查找帶有hadoop字元串的行,如下所示。

輸入檔案為:

顯然,這個結果是正确的。

python

對于python來說,情況有些特殊。因為python是可以編譯為jar包的,如果将程式編譯為jar包,那麼就可以采用運作jar包的方式來運作了。

不過,同樣也可以用流的方式運作python程式。請看如下代碼:

注意其中的aggregate是hadoop提供的一個包,它提供一個reduce函數和一個combine函數。這個函數實作一些簡單的類似求和、取最大值最小值等的功能。