天天看點

使用Mapreduce案例編寫用于統計文本中單詞出現的次數的案例、mapreduce本地運作等,Combiner使用及其相關的知識,流量統計案例和流量總和以及流量排序案例,自定義Partitioner

工程結構:

使用Mapreduce案例編寫用于統計文本中單詞出現的次數的案例、mapreduce本地運作等,Combiner使用及其相關的知識,流量統計案例和流量總和以及流量排序案例,自定義Partitioner

在整個案例過程中,代碼如下:

wordcountmapper的代碼如下:

package cn.toto.bigdata.mr.wc;

import java.io.ioexception;

import org.apache.hadoop.io.intwritable;

import org.apache.hadoop.io.longwritable;

import org.apache.hadoop.io.text;

import org.apache.hadoop.mapreduce.mapper;

/**

 * 這裡的mapper是hadoop-mapreduce-client-core-2.8.0.jar中的内容

 * mapper<keyin, valuein, keyout, valueout>

 * keyin     :是指架構讀取到的資料的key的類型,在預設的inputformat下,讀到的key是一行文本的起始偏移量,是以key的類型是long

 * valuein   :是指架構讀取到的資料的value的類型,在預設的inputformat下,讀到的value是一行文本的内容,是以value的類型是string

 * keyout    :是指使用者自定義邏輯方法傳回的資料中key的類型,由使用者業務邏輯決定,在此wordcount程式中,我們輸出的key是單詞,是以是string

 * valueout  :是指使用者自定義邏輯方法傳回的資料中value的類型,由使用者業務邏輯決定,在此wordcount程式中,我們輸出的value是單詞的數量,是以是integer

 *

 * 但是,string,long等jdk中自帶的資料類型,在序列化是,效率比較低,hadoop為了提高序列化效率,自定義了一套序列化架構,

 * 是以,在hadoop的程式中,如果該資料需要進行序列化(寫磁盤,或者網絡傳輸),就一定要用實作了hadoop序列化架構的資料類型

 * long       ----> longwritable

 * string     ----> text

 * integer    ----> intwritable

 * null       ----> nullwritable

 */

public class wordcountmapper extends mapper<longwritable, text, text, intwritable> {

         /**

          * 這就是mapreduce架構中一個主體運作程式maptask所要調用的使用者業務邏輯方法

          * maptask會驅動inputformat去讀取資料(keyin,valuein),每讀到一個kv對,就傳入這個使用者寫的map方法中調用一次

          * 在預設的inputformat實作中,此處的一個key就是一行的起始偏移量,value就是一行的内容

          */

         @override

         protected void map(longwritable key, text value, mapper<longwritable, text, text, intwritable>.context context)

                            throws ioexception, interruptedexception {

                   string line = value.tostring();

                   string[] words = line.split(" ");

                   for (string word : words) {

                            context.write(new text(word), new intwritable(1));

                   }

         }

}

wordcountreducer的代碼如下:

import org.apache.hadoop.mapreduce.reducer;

public class wordcountreducer extends reducer<text, intwritable, text, intwritable> {

         /** reducetask在調我們寫的reduce方法

                   reducetask應該收到了前一階段(map階段)中所有maptask輸出的資料中的一部分

                   (資料的key.hashcode%reducetask數==本reductask号)

                   reducetask将這些收到kv資料拿來處理時,是這樣調用我們的reduce方法的:

                          先将自己收到的所有的kv對按照k分組(根據k是否相同)

                          将某一組kv中的第一個kv中的k傳給reduce方法的key變量,把這一組kv中所有的v用一個疊代器傳給reduce方法的變量values

                    */

         protected void reduce(text key, iterable<intwritable> values,

                            reducer<text, intwritable, text, intwritable>.context context) throws ioexception, interruptedexception {

                   int count = 0;

                   for(intwritable v : values) {

                            count += v.get();

                   context.write(key, new intwritable(count));

wordcountdriver的代碼如下:

import org.apache.hadoop.conf.configuration;

import org.apache.hadoop.fs.path;

import org.apache.hadoop.mapreduce.job;

import org.apache.hadoop.mapreduce.lib.input.fileinputformat;

import org.apache.hadoop.mapreduce.lib.input.textinputformat;

import org.apache.hadoop.mapreduce.lib.output.fileoutputformat;

import org.apache.hadoop.mapreduce.lib.output.textoutputformat;

 * 本類是用戶端用來指定wordcount job程式運作時所需要的很多參數:

 * 比如,指定用哪個元件作為資料讀取器、資料結果輸出器

 *     指定用哪個類作為map階段的業務邏輯類,哪個類作為reduce階段的業務邏輯類

 *     指定wordcount job程式的jar包所在路徑

 *     ....

 *     運作前準備工作

 *     1、将目前的工程導出成wordcount.jar

 *     2、準備/home/toto/software/wordcount/input/a.txt 其中a.txt中的内容類似:

 *                  the true

                nobility is

                in being

                superior to

                your previous

                self guess

      3、将 /home/toto/software/wordcount通過 hadoop fs -put wordcount /wordcount 上傳到hdfs中

 *    

 *     以及其他各種需要的參數

 *     hadoop jar wordcount.jar cn.toto.bigdata.mr.wc.wordcountdriver

 *     上面的指令等同:

 *     java -cp wordcount.jar cn.toto.bigdata.mr.wc.wordcountdriver

 *     上面的含義是通過hadoop jar将hadoop classpath的jar都拷貝到應用中,并且指定執行cn.toto.bigdata.mr.wc.wordcountdriver

 *     

 *     最後檢視結果的方式是:hadoop fs -cat /wordcount/output/part-r-00000,通過這個指令可以檢視檢視到

public class wordcountdriver {

   public static void main(string[] args) throws exception {

      configuration conf = new configuration();

      conf.set("fs.defaultfs", "hdfs://hadoop:9000");

      /*conf.set("mapreduce.framework.name", "yarn");

      conf.set("yarn.resourcemanager.hostname", "mini1");*/

      job job = job.getinstance(conf);

      //告訴架構,我們的程式所在jar包的路徑

      // job.setjar("c:/wordcount.jar");

      job.setjarbyclass(wordcountdriver.class);

      //告訴架構,我們的程式所用的mapper類和reducer類

      job.setmapperclass(wordcountmapper.class);

      job.setreducerclass(wordcountreducer.class);

      //告訴架構,我們的mapperreducer輸出的資料類型

      job.setmapoutputkeyclass(text.class);

      job.setmapoutputvalueclass(intwritable.class);

      job.setoutputkeyclass(text.class);

      job.setoutputvalueclass(intwritable.class);

      // 告訴架構,我們的資料讀取、結果輸出所用的format元件

      // textinputformat是mapreduce架構中内置的一種讀取文本檔案的輸入元件

      job.setinputformatclass(textinputformat.class);

      job.setoutputformatclass(textoutputformat.class);

      // 告訴架構,我們要處理的檔案在哪個路徑下,注意若hdfs中已經有了/wordcount/input/這個檔案,說明

      fileinputformat.setinputpaths(job, new path("/wordcount/input/"));

      // 告訴架構,我們的處理結果要輸出到哪裡去

      fileoutputformat.setoutputpath(job, new path("/wordcount/output/"));深

      boolean res = job.waitforcompletion(true);

      system.exit(res?0:1);

   }

運作前的準備工作:

運作前準備工作

1、将目前的工程導出成wordcount.jar

2、準備/home/toto/software/wordcount/input/a.txt 其中a.txt中的内容類似:

                   the true

   3、将 /home/toto/software/wordcount通過 hadoop fs -put wordcount /wordcount 上傳到hdfs中

最後,可以執行的指令是:

hadoop jar wordcount.jar cn.toto.bigdata.mr.wc.wordcountdriver

執行後的效果如下:

使用Mapreduce案例編寫用于統計文本中單詞出現的次數的案例、mapreduce本地運作等,Combiner使用及其相關的知識,流量統計案例和流量總和以及流量排序案例,自定義Partitioner
使用Mapreduce案例編寫用于統計文本中單詞出現的次數的案例、mapreduce本地運作等,Combiner使用及其相關的知識,流量統計案例和流量總和以及流量排序案例,自定義Partitioner

b:使用wordcount本地運作,并且使用combiner的案例(主要改變是在wordcountdriver中),代碼如下:

準備工作:

在e盤下準備e:/wordcount/input/a.txt,其中的内容如下:

右鍵運作上面的代碼,進入:

e:\wordcount\output\part-r-00000中看結果,結果内容如下:

經過上面的所有步驟之後,程式已經編寫完成

總結:

(1)combiner是mr程式中mapper和reducer之外的一種元件

(2)combiner元件的父類就是reducer

(3)combiner和reducer的差別在于運作的位置:

combiner是在每一個maptask所在的節點運作

reducer是接收全局所有mapper的輸出結果;

(4) combiner的意義就是對每一個maptask的輸出進行局部彙總,以減小網絡傳輸量

具體實作步驟:

    1、 自定義一個combiner繼承reducer,重寫reduce方法

    2、 在job中設定: job.setcombinerclass(customcombiner.class)

(5) combiner能夠應用的前提是不能影響最終的業務邏輯

而且,combiner的輸出kv應該跟reducer的輸入kv類型要對應起來

<a target="_blank"></a>

combiner的使用要非常謹慎

因為combiner在mapreduce過程中可能調用也肯能不調用,可能調一次也可能調多次

是以:combiner使用的原則是:有或沒有都不能影響業務邏輯

===============================================================================

 流量統計和自定義類實作序列化案例:

運作條件模拟:

1、配置環境變量為hadoop_home=e:\learntempfolder\hadoop-2.7.3

 2、從csdn資源上下載下傳支援win10版本的:e:\learntempfolder\hadoop-2.7.3\bin\winutils.exe 和 e:\learntempfolder\hadoop-2.7.3\bin\hadoop.dll

界面效果如下:

使用Mapreduce案例編寫用于統計文本中單詞出現的次數的案例、mapreduce本地運作等,Combiner使用及其相關的知識,流量統計案例和流量總和以及流量排序案例,自定義Partitioner

3、準備要處理的資料:

使用Mapreduce案例編寫用于統計文本中單詞出現的次數的案例、mapreduce本地運作等,Combiner使用及其相關的知識,流量統計案例和流量總和以及流量排序案例,自定義Partitioner

http_20130313143750.dat 資料檔案的具體内容如:

4、先運作flowsum(右鍵執行java程式)

運作生成的檔案為e:\learntempfolder\flow\output\part-r-00000,内容如下:

5、運作flowsumsort(注意不要删除上面的part-r-00000)

運作後産生的檔案内容是:

當然,我們也可以一次性求和并運算出結果輸出到指定的檔案目錄中,代碼如下:

到"e:/flow/sortout/"目錄下,檢視結果:

使用Mapreduce案例編寫用于統計文本中單詞出現的次數的案例、mapreduce本地運作等,Combiner使用及其相關的知識,流量統計案例和流量總和以及流量排序案例,自定義Partitioner

即:

6、為不同的手機号設定分區,讓不同的手機号在不同的檔案中。方法如下:

a:下面是自定義分區,自定分區的代碼如下:

b:測試一下自定義分區:

c:運作所需的準備:

資料檔案:

使用Mapreduce案例編寫用于統計文本中單詞出現的次數的案例、mapreduce本地運作等,Combiner使用及其相關的知識,流量統計案例和流量總和以及流量排序案例,自定義Partitioner

檔案内容如下:

運作後的結果如下:

使用Mapreduce案例編寫用于統計文本中單詞出現的次數的案例、mapreduce本地運作等,Combiner使用及其相關的知識,流量統計案例和流量總和以及流量排序案例,自定義Partitioner

part-r-00001中内容:

使用Mapreduce案例編寫用于統計文本中單詞出現的次數的案例、mapreduce本地運作等,Combiner使用及其相關的知識,流量統計案例和流量總和以及流量排序案例,自定義Partitioner
使用Mapreduce案例編寫用于統計文本中單詞出現的次數的案例、mapreduce本地運作等,Combiner使用及其相關的知識,流量統計案例和流量總和以及流量排序案例,自定義Partitioner
使用Mapreduce案例編寫用于統計文本中單詞出現的次數的案例、mapreduce本地運作等,Combiner使用及其相關的知識,流量統計案例和流量總和以及流量排序案例,自定義Partitioner

等等