工程結構:
在整個案例過程中,代碼如下:
wordcountmapper的代碼如下:
package cn.toto.bigdata.mr.wc;
import java.io.ioexception;
import org.apache.hadoop.io.intwritable;
import org.apache.hadoop.io.longwritable;
import org.apache.hadoop.io.text;
import org.apache.hadoop.mapreduce.mapper;
/**
* 這裡的mapper是hadoop-mapreduce-client-core-2.8.0.jar中的内容
* mapper<keyin, valuein, keyout, valueout>
* keyin :是指架構讀取到的資料的key的類型,在預設的inputformat下,讀到的key是一行文本的起始偏移量,是以key的類型是long
* valuein :是指架構讀取到的資料的value的類型,在預設的inputformat下,讀到的value是一行文本的内容,是以value的類型是string
* keyout :是指使用者自定義邏輯方法傳回的資料中key的類型,由使用者業務邏輯決定,在此wordcount程式中,我們輸出的key是單詞,是以是string
* valueout :是指使用者自定義邏輯方法傳回的資料中value的類型,由使用者業務邏輯決定,在此wordcount程式中,我們輸出的value是單詞的數量,是以是integer
*
* 但是,string,long等jdk中自帶的資料類型,在序列化是,效率比較低,hadoop為了提高序列化效率,自定義了一套序列化架構,
* 是以,在hadoop的程式中,如果該資料需要進行序列化(寫磁盤,或者網絡傳輸),就一定要用實作了hadoop序列化架構的資料類型
* long ----> longwritable
* string ----> text
* integer ----> intwritable
* null ----> nullwritable
*/
public class wordcountmapper extends mapper<longwritable, text, text, intwritable> {
/**
* 這就是mapreduce架構中一個主體運作程式maptask所要調用的使用者業務邏輯方法
* maptask會驅動inputformat去讀取資料(keyin,valuein),每讀到一個kv對,就傳入這個使用者寫的map方法中調用一次
* 在預設的inputformat實作中,此處的一個key就是一行的起始偏移量,value就是一行的内容
*/
@override
protected void map(longwritable key, text value, mapper<longwritable, text, text, intwritable>.context context)
throws ioexception, interruptedexception {
string line = value.tostring();
string[] words = line.split(" ");
for (string word : words) {
context.write(new text(word), new intwritable(1));
}
}
}
wordcountreducer的代碼如下:
import org.apache.hadoop.mapreduce.reducer;
public class wordcountreducer extends reducer<text, intwritable, text, intwritable> {
/** reducetask在調我們寫的reduce方法
reducetask應該收到了前一階段(map階段)中所有maptask輸出的資料中的一部分
(資料的key.hashcode%reducetask數==本reductask号)
reducetask将這些收到kv資料拿來處理時,是這樣調用我們的reduce方法的:
先将自己收到的所有的kv對按照k分組(根據k是否相同)
将某一組kv中的第一個kv中的k傳給reduce方法的key變量,把這一組kv中所有的v用一個疊代器傳給reduce方法的變量values
*/
protected void reduce(text key, iterable<intwritable> values,
reducer<text, intwritable, text, intwritable>.context context) throws ioexception, interruptedexception {
int count = 0;
for(intwritable v : values) {
count += v.get();
context.write(key, new intwritable(count));
wordcountdriver的代碼如下:
import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.fs.path;
import org.apache.hadoop.mapreduce.job;
import org.apache.hadoop.mapreduce.lib.input.fileinputformat;
import org.apache.hadoop.mapreduce.lib.input.textinputformat;
import org.apache.hadoop.mapreduce.lib.output.fileoutputformat;
import org.apache.hadoop.mapreduce.lib.output.textoutputformat;
* 本類是用戶端用來指定wordcount job程式運作時所需要的很多參數:
* 比如,指定用哪個元件作為資料讀取器、資料結果輸出器
* 指定用哪個類作為map階段的業務邏輯類,哪個類作為reduce階段的業務邏輯類
* 指定wordcount job程式的jar包所在路徑
* ....
* 運作前準備工作
* 1、将目前的工程導出成wordcount.jar
* 2、準備/home/toto/software/wordcount/input/a.txt 其中a.txt中的内容類似:
* the true
nobility is
in being
superior to
your previous
self guess
3、将 /home/toto/software/wordcount通過 hadoop fs -put wordcount /wordcount 上傳到hdfs中
*
* 以及其他各種需要的參數
* hadoop jar wordcount.jar cn.toto.bigdata.mr.wc.wordcountdriver
* 上面的指令等同:
* java -cp wordcount.jar cn.toto.bigdata.mr.wc.wordcountdriver
* 上面的含義是通過hadoop jar将hadoop classpath的jar都拷貝到應用中,并且指定執行cn.toto.bigdata.mr.wc.wordcountdriver
*
* 最後檢視結果的方式是:hadoop fs -cat /wordcount/output/part-r-00000,通過這個指令可以檢視檢視到
public class wordcountdriver {
public static void main(string[] args) throws exception {
configuration conf = new configuration();
conf.set("fs.defaultfs", "hdfs://hadoop:9000");
/*conf.set("mapreduce.framework.name", "yarn");
conf.set("yarn.resourcemanager.hostname", "mini1");*/
job job = job.getinstance(conf);
//告訴架構,我們的程式所在jar包的路徑
// job.setjar("c:/wordcount.jar");
job.setjarbyclass(wordcountdriver.class);
//告訴架構,我們的程式所用的mapper類和reducer類
job.setmapperclass(wordcountmapper.class);
job.setreducerclass(wordcountreducer.class);
//告訴架構,我們的mapperreducer輸出的資料類型
job.setmapoutputkeyclass(text.class);
job.setmapoutputvalueclass(intwritable.class);
job.setoutputkeyclass(text.class);
job.setoutputvalueclass(intwritable.class);
// 告訴架構,我們的資料讀取、結果輸出所用的format元件
// textinputformat是mapreduce架構中内置的一種讀取文本檔案的輸入元件
job.setinputformatclass(textinputformat.class);
job.setoutputformatclass(textoutputformat.class);
// 告訴架構,我們要處理的檔案在哪個路徑下,注意若hdfs中已經有了/wordcount/input/這個檔案,說明
fileinputformat.setinputpaths(job, new path("/wordcount/input/"));
// 告訴架構,我們的處理結果要輸出到哪裡去
fileoutputformat.setoutputpath(job, new path("/wordcount/output/"));深
boolean res = job.waitforcompletion(true);
system.exit(res?0:1);
}
運作前的準備工作:
運作前準備工作
1、将目前的工程導出成wordcount.jar
2、準備/home/toto/software/wordcount/input/a.txt 其中a.txt中的内容類似:
the true
3、将 /home/toto/software/wordcount通過 hadoop fs -put wordcount /wordcount 上傳到hdfs中
最後,可以執行的指令是:
hadoop jar wordcount.jar cn.toto.bigdata.mr.wc.wordcountdriver
執行後的效果如下:
b:使用wordcount本地運作,并且使用combiner的案例(主要改變是在wordcountdriver中),代碼如下:
準備工作:
在e盤下準備e:/wordcount/input/a.txt,其中的内容如下:
右鍵運作上面的代碼,進入:
e:\wordcount\output\part-r-00000中看結果,結果内容如下:
經過上面的所有步驟之後,程式已經編寫完成
總結:
(1)combiner是mr程式中mapper和reducer之外的一種元件
(2)combiner元件的父類就是reducer
(3)combiner和reducer的差別在于運作的位置:
combiner是在每一個maptask所在的節點運作
reducer是接收全局所有mapper的輸出結果;
(4) combiner的意義就是對每一個maptask的輸出進行局部彙總,以減小網絡傳輸量
具體實作步驟:
1、 自定義一個combiner繼承reducer,重寫reduce方法
2、 在job中設定: job.setcombinerclass(customcombiner.class)
(5) combiner能夠應用的前提是不能影響最終的業務邏輯
而且,combiner的輸出kv應該跟reducer的輸入kv類型要對應起來
<a target="_blank"></a>
combiner的使用要非常謹慎
因為combiner在mapreduce過程中可能調用也肯能不調用,可能調一次也可能調多次
是以:combiner使用的原則是:有或沒有都不能影響業務邏輯
===============================================================================
流量統計和自定義類實作序列化案例:
運作條件模拟:
1、配置環境變量為hadoop_home=e:\learntempfolder\hadoop-2.7.3
2、從csdn資源上下載下傳支援win10版本的:e:\learntempfolder\hadoop-2.7.3\bin\winutils.exe 和 e:\learntempfolder\hadoop-2.7.3\bin\hadoop.dll
界面效果如下:
3、準備要處理的資料:
http_20130313143750.dat 資料檔案的具體内容如:
4、先運作flowsum(右鍵執行java程式)
運作生成的檔案為e:\learntempfolder\flow\output\part-r-00000,内容如下:
5、運作flowsumsort(注意不要删除上面的part-r-00000)
運作後産生的檔案内容是:
當然,我們也可以一次性求和并運算出結果輸出到指定的檔案目錄中,代碼如下:
到"e:/flow/sortout/"目錄下,檢視結果:
即:
6、為不同的手機号設定分區,讓不同的手機号在不同的檔案中。方法如下:
a:下面是自定義分區,自定分區的代碼如下:
b:測試一下自定義分區:
c:運作所需的準備:
資料檔案:
檔案内容如下:
運作後的結果如下:
part-r-00001中内容:
等等