
MapReduce: Implementing WordCount in Eclipse

WordCountMapper.java

package cds.hadoop.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

/**
 * Mapper Task
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    /**
     * key: the byte offset at which this line starts in the input file
     * (not a line number); value: one line of input
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // get the line
        String line = value.toString();

        // split the line into words
        String[] words = StringUtils.split(line, ' ');

        // emit <word, 1> for every word occurrence
        for (String word : words) {
            context.write(new Text(word), new LongWritable(1L));
        }

    }

}
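Allocating a fresh Text and LongWritable for every record works, but the usual Hadoop idiom is to reuse Writable instances across map() calls, since context.write() serializes the pair into the output buffer immediately. A minimal sketch of that variant, with the same package and imports as above (the field names outKey and ONE are my own):

// Same mapper, with the common Writable-reuse idiom applied.
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    // Reused across map() calls; context.write() copies the serialized
    // bytes right away, so mutating these afterwards is safe.
    private final Text outKey = new Text();
    private static final LongWritable ONE = new LongWritable(1L);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String word : StringUtils.split(value.toString(), ' ')) {
            outKey.set(word);
            context.write(outKey, ONE);
        }
    }
}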

WordCountReducer.java

package cds.hadoop.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer Task
 */
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    // input is <key, [v1, v2, ..., vn]>: every count emitted for this word
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // sum the counts emitted for this word
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }

        // emit <word, count>, the final result for this word
        context.write(key, new LongWritable(count));

    }

}
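Because this reduce step is a plain sum, which is commutative and associative, the same class can also serve as a combiner that pre-aggregates map output before the shuffle. If desired, one extra line in the job setup shown next is enough (optional; a sketch, not part of the original runner):

// Optional: pre-aggregate <word, 1> pairs on the map side before the shuffle.
wcjob.setCombinerClass(WordCountReducer.class);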

WordCountRunner.java

package cds.hadoop.wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Describe the job:
 * which class is the mapper?
 * which class is the reducer?
 * where is the input?
 * where should the output go?
 * 
 * Then submit the job to Hadoop.
 */
public class WordCountRunner {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job wcjob = Job.getInstance(conf);

        wcjob.setJarByClass(WordCountRunner.class);

        // set Mapper
        wcjob.setMapperClass(WordCountMapper.class);
        wcjob.setMapOutputKeyClass(Text.class);
        wcjob.setMapOutputValueClass(LongWritable.class);

        // set Reducer
        wcjob.setReducerClass(WordCountReducer.class);
        wcjob.setOutputKeyClass(Text.class);
        wcjob.setOutputValueClass(LongWritable.class);

        // where is the input?
        // The input format is a pluggable component of the job; the default
        // (TextInputFormat, which yields one line per record) is exactly what
        // WordCount needs, so we don't configure our own here.
        FileInputFormat.setInputPaths(wcjob, "hdfs://master:9000/wc/srcData");

        // where to output?
        FileOutputFormat.setOutputPath(wcjob, new Path("hdfs://master:9000/wc/output"));

        // submit the job, wait for it to finish, and report failure via the exit code
        boolean success = wcjob.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }

}
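Hard-coding the HDFS paths is fine for a demo, but the log below even warns: "Implement the Tool interface and execute your application with ToolRunner". A minimal sketch of that variant, with the input and output paths taken from the command line (the class name WordCountTool is my own):

package cds.hadoop.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountTool extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // getConf() carries any generic options ToolRunner parsed (-D, -fs, ...)
        Job job = Job.getInstance(getConf());
        job.setJarByClass(WordCountTool.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // map output types match the final output types,
        // so one pair of setters covers both stages
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new WordCountTool(), args));
    }
}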

Output and results:

(Console log trimmed to the key lines; timestamps and numeric counter values are omitted.)

WARN  [main] util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
WARN  [main] mapreduce.JobResourceUploader - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
WARN  [main] mapreduce.JobResourceUploader - No job jar file set. User classes may not be found. See Job or Job#setJar(String).
INFO  [main] mapreduce.JobSubmitter - Submitting tokens for job: job_local1963603364_0001
INFO  [main] mapreduce.Job - Running job: job_local1963603364_0001
INFO  [main] mapreduce.Job - Job job_local1963603364_0001 running in uber mode : false
INFO  mapred.LocalJobRunner - Starting task: attempt_local1963603364_0001_m_000000_0
INFO  mapred.MapTask - Processing split: hdfs://master:/wc/srcData/testText.txt
INFO  mapred.Task - Task 'attempt_local1963603364_0001_m_000000_0' done.
INFO  mapred.LocalJobRunner - map task executor complete.
INFO  mapred.LocalJobRunner - Starting task: attempt_local1963603364_0001_r_000000_0
INFO  output.FileOutputCommitter - Saved output of task 'attempt_local1963603364_0001_r_000000_0' to hdfs://master:/wc/output/_temporary//task_local1963603364_0001_r_000000
INFO  mapred.Task - Task 'attempt_local1963603364_0001_r_000000_0' done.
INFO  mapred.LocalJobRunner - reduce task executor complete.
INFO  mapreduce.Job - Job job_local1963603364_0001 completed successfully
INFO  mapreduce.Job - Counters: File System Counters, Map-Reduce Framework, Shuffle Errors, File Input Format Counters, File Output Format Counters (values omitted)
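With the default single reduce task, the word counts land in one file under hdfs://master:9000/wc/output (Hadoop names it part-r-00000 by convention), one tab-separated <word, count> pair per line; it can be inspected with hdfs dfs -cat.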
