
Running Hadoop WordCount V1.0 and V2.0 in IDEA

Table of Contents

    • 1. WordCount V1.0
    • 2. WordCount V2.0
    • 3. Pitfalls

First, set up a single-node Hadoop 3.1.2 installation on Windows.

Run IDEA as Administrator.

Add the Maven dependencies. Although hadoop-client already includes hadoop-mapreduce-client-jobclient, if the latter is not declared separately, the job logs will not be printed to the IDEA console:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.1.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
    <version>3.1.2</version>
</dependency>
           

Add log4j.properties to the resources folder:

log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%m%n
           

Copy hdfs-site.xml and core-site.xml into the resources folder.
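
For reference, a minimal core-site.xml for this kind of single-node setup might look like the sketch below; the fs.defaultFS value (hdfs://localhost:9000) is an assumption and has to match the NameNode address of your own installation:

<configuration>
    <!-- Assumed NameNode address for a local single-node setup; adjust to your environment -->
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>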

Project structure


1. WordCount V1.0

Mapper:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper1 extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);

    private Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Read one line of input
        String line = value.toString();
        // Split it on whitespace
        StringTokenizer stringTokenizer = new StringTokenizer(line);
        // Emit <word, 1> for every token
        while(stringTokenizer.hasMoreTokens()){
            word.set(stringTokenizer.nextToken());
            context.write(word, one);
        }
    }
}
           

Reducer:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer1 extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum the counts collected for this key
        int sum = 0;
        for(IntWritable intWritable : values){
            sum += intWritable.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
           

Driver for WordCount V1.0. With the paths hard-coded it can be run directly in IDEA; the paths can also be supplied as program arguments.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount1 {

    public static void main( String[] args ) {
        // Loads hdfs-site.xml and core-site.xml from the classpath (resources folder)
        Configuration conf = new Configuration();

        try{
            Job job = Job.getInstance(conf,"WordCount V1.0");

            job.setJarByClass(WordCount1.class);

            job.setMapperClass(WordCountMapper1.class);
            job.setCombinerClass(WordCountReducer1.class);
            job.setReducerClass(WordCountReducer1.class);

            // Output key/value types of the job; a single setting works here because
            // the mapper and the reducer emit the same types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            // HDFS paths
            FileInputFormat.addInputPath(job, new Path("/hdfsTest/input"));
            FileOutputFormat.setOutputPath(job, new Path("/hdfsTest/output"));

            // Paths passed as program arguments
            //FileInputFormat.addInputPath(job, new Path(args[0]));
            //FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // Local Windows directories
            //FileInputFormat.setInputPaths(job, "D:\\hadoop-test\\input");
            //FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop-test\\output"));

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
           

It can also be packaged into a jar and run with arguments; the fully qualified class name (including the package) must be given:

hadoop jar mapreduce-test-1.0-SNAPSHOT.jar com.shpun.wordcount1.WordCount1 /hdfsTest/input /hdfsTest/output
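
As a quick illustration of what the job produces (sample data borrowed from the official MapReduce tutorial, not from this post's /hdfsTest/input), an input file containing

Hello World Bye World
Hello Hadoop Goodbye Hadoop

yields the following <word, count> pairs in the output directory:

Bye	1
Goodbye	1
Hadoop	2
Hello	2
World	2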
           

2. WordCount V2.0

Mapper:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper2 extends Mapper<LongWritable, Text, Text, IntWritable> {

    enum MapperCounterEnums{
        INPUT_WORDS
    }

    private static final IntWritable one = new IntWritable(1);

    private Text word = new Text();

    // Whether matching is case-sensitive
    private boolean caseSensitive;
    // Patterns to strip from the input
    private Set<String> patternsToSkip = new HashSet<>();

    private Configuration conf;
    private BufferedReader bufferedReader;

    /**
     * Called exactly once by the MapReduce framework, before any map() call, to
     * initialize variables and resources.
     * 1. Reads wordcount.case.sensitive from the configuration into caseSensitive.
     * 2. Reads wordcount.skip.patterns; if true, every cache file is added to the filter set.
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        conf = context.getConfiguration();
        // Whether words are matched case-sensitively
        caseSensitive = conf.getBoolean("wordcount.case.sensitive",true);
        // Whether skip patterns should be applied
        if(conf.getBoolean("wordcount.skip.patterns", false)){
            // Read the cache files that were added in the driver's main()
            URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
            for(URI patternsURI : patternsURIs){
                Path patternsPath = new Path(patternsURI.getPath());
                String fileName = patternsPath.getName();
                parseSkipFile(fileName);
            }
        }
    }

    /**
     * Reads the named file line by line and adds each line to the set of patterns to skip.
     * @param fileName
     */
    private void parseSkipFile(String fileName){
       try{
           bufferedReader = new BufferedReader(new FileReader(fileName));
           String patternLine;
           // Add every line of the file as a pattern
           while((patternLine = bufferedReader.readLine()) != null){
               patternsToSkip.add(patternLine);
           }
       }catch (IOException e){
           e.printStackTrace();
       }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = (caseSensitive) ? value.toString() : value.toString().toLowerCase();

        // Strip every skip pattern from the line
        for(String pattern : patternsToSkip){
            line = line.replaceAll(pattern,"");
        }

        StringTokenizer stringTokenizer = new StringTokenizer(line);
        while(stringTokenizer.hasMoreTokens()){
            word.set(stringTokenizer.nextToken());
            context.write(word, one);

            // Custom counter: the enum class name is the counter group,
            // the enum constant is the counter name
            Counter counter = context.getCounter(MapperCounterEnums.class.getName(),MapperCounterEnums.INPUT_WORDS.toString());
            counter.increment(1);
        }
    }
}
           

Reducer:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer2 extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum the counts collected for this key
        int sum = 0;
        for(IntWritable value : values){
            sum += value.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
           

Driver for WordCount V2.0:

import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount2 {

    public static void main( String[] args ) {
        // Loads hdfs-site.xml and core-site.xml from the classpath
        Configuration conf = new Configuration();

        try{
            // Parse the generic Hadoop options (e.g. -D...) and keep the remaining arguments
            GenericOptionsParser genericOptionsParser = new GenericOptionsParser(conf,args);
            String[] remainingArgs = genericOptionsParser.getRemainingArgs();

            // For the command $ bin/hadoop jar wc.jar WordCount2 /user/joe/wordcount/input /user/joe/wordcount/output
            // genericOptionsParser.getRemainingArgs() returns
            // /user/joe/wordcount/input /user/joe/wordcount/output

            // For the command $ bin/hadoop jar wc.jar WordCount2 -Dwordcount.case.sensitive=false /user/joe/wordcount/input /user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt
            // -Dwordcount.case.sensitive=false turns off case sensitivity, and
            // genericOptionsParser.getRemainingArgs() returns
            // /user/joe/wordcount/input /user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt
            if(remainingArgs.length != 2 && remainingArgs.length != 4){
                System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
                System.exit(2);
            }

            Job job = Job.getInstance(conf,"WordCount V2.0");
            job.setJarByClass(WordCount2.class);

            job.setMapperClass(WordCountMapper2.class);
            job.setCombinerClass(WordCountReducer2.class);
            job.setReducerClass(WordCountReducer2.class);

            // Output key/value types of the job; a single setting works here because
            // the mapper and the reducer emit the same types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            List<String> otherArgList = new ArrayList<>();
            for(int i = 0;i < remainingArgs.length;i++){
                if("-skip".equals(remainingArgs[i])){
                    // Get the URI of the skip-pattern file (scheme://authority/path form)
                    URI patternURI = new Path(remainingArgs[++i]).toUri();
                    // Add it to the distributed cache
                    job.addCacheFile(patternURI);
                    // Enable pattern filtering in the mapper
                    job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
                }else{
                    otherArgList.add(remainingArgs[i]);
                }
            }

            FileInputFormat.addInputPath(job, new Path(otherArgList.get(0)));
            FileOutputFormat.setOutputPath(job, new Path(otherArgList.get(1)));

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
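
The INPUT_WORDS counter defined in the mapper is reported alongside the built-in counters when the job finishes. To read it programmatically, the driver's final line can be unpacked as in the sketch below (an illustration, not part of the original code; it assumes the driver and WordCountMapper2 are in the same package and that org.apache.hadoop.mapreduce.Counter is imported):

            // Sketch: replaces System.exit(job.waitForCompletion(true) ? 0 : 1);
            boolean success = job.waitForCompletion(true);

            // The enum class name is the counter group, the enum constant is the counter name
            Counter inputWords = job.getCounters().findCounter(
                    WordCountMapper2.MapperCounterEnums.class.getName(),
                    WordCountMapper2.MapperCounterEnums.INPUT_WORDS.toString());
            System.out.println("INPUT_WORDS = " + inputWords.getValue());

            System.exit(success ? 0 : 1);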
           

To pass arguments in IDEA, enter them in Program arguments, separated by spaces.
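
For example, a Program arguments line for WordCount V2.0 could be (reusing the HDFS paths from the jar commands below):

-Dwordcount.case.sensitive=false /hdfsTest/input /hdfsTest/output -skip /hdfsTest/skip/skipInput.txt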


It can also be packaged into a jar and run:

hadoop jar mapreduce-test-1.0-SNAPSHOT.jar com.shpun.wordcount2.WordCount2 /hdfsTest/input /hdfsTest/output
hadoop jar mapreduce-test-1.0-SNAPSHOT.jar com.shpun.wordcount2.WordCount2 -Dwordcount.case.sensitive=false /hdfsTest/input /hdfsTest/output
hadoop jar mapreduce-test-1.0-SNAPSHOT.jar com.shpun.wordcount2.WordCount2 -Dwordcount.case.sensitive=false /hdfsTest/input /hdfsTest/output -skip /hdfsTest/skip/skipInput.txt
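
Each line of the skip file is treated as a regular expression and removed from the input with replaceAll, so a skipInput.txt along the lines of the patterns.txt in the official tutorial could contain:

\.
\,
\!
to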
           

3. Pitfalls

  • In WordCount V1.0, when switching between the commented-out HDFS and Windows paths while running in IDEA, run Maven Clean and then Compile, followed by Rebuild Project. Otherwise the path type is misidentified: the HDFS path gets treated as a local path, and the Windows path as an HDFS path.

References:

hadoop MapReduce Tutorial

A first look at MapReduce application scenarios (with Java and Python code)

Compiling WordCount yourself: compiles but errors at runtime

Detailed annotations of the official MapReduce example code
