package cds.hadoop.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;
/**
* Mapper Task
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    /**
     * Emits a {@code <word, 1>} pair for every space-separated token on the input line.
     *
     * @param key     byte offset of this line within the input split (with the default
     *                TextInputFormat the key is an offset, not a line number)
     * @param value   one line of input text
     * @param context Hadoop context used to emit the intermediate pairs
     * @throws IOException          if emitting a record fails
     * @throws InterruptedException if the task is interrupted
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
            throws IOException, InterruptedException {
        // get the line
        String line = value.toString();
        // split the line into words on single-space boundaries
        String[] words = StringUtils.split(line, ' ');
        // output <word, 1>; the reducer will sum the 1s per word
        for (String word : words) {
            // fixed: original read `new LongWritable(L)`, a compile error — the
            // count literal is 1L
            context.write(new Text(word), new LongWritable(1L));
        }
    }
}
WordCountReducer.java
package cds.hadoop.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* Reducer Task
* */
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
// input is <key, <v1,v2,...,vn>>
protected void reduce(Text key, Iterable<LongWritable> values,
Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
// count the number that every word appears
long count = ;
for (LongWritable value : values) {
count += value.get();
}
// output is <word, count>, it's what we want
context.write(new Text(key), new LongWritable(count));
}
}
WordCountRunner.java
package cds.hadoop.wordcount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* describe a job:
* which class is mapper?
* which class is reducer?
* where is the input file?
* where to output?
*
* Then submit this job to Hadoop
* */
/**
 * Describes and submits the word-count job:
 * which class is the mapper, which is the reducer,
 * where the input lives, and where to write the output.
 */
public class WordCountRunner {

    /**
     * Configures the job and blocks until it finishes, exiting with a
     * non-zero status if the job fails.
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job wcjob = Job.getInstance(conf);
        // ship the jar containing this class to the cluster
        wcjob.setJarByClass(WordCountRunner.class);
        // set Mapper
        wcjob.setMapperClass(WordCountMapper.class);
        wcjob.setMapOutputKeyClass(Text.class);
        wcjob.setMapOutputValueClass(LongWritable.class);
        // set Reducer
        wcjob.setReducerClass(WordCountReducer.class);
        wcjob.setOutputKeyClass(Text.class);
        wcjob.setOutputValueClass(LongWritable.class);
        // where is the input file?
        // FileInputFormat is a component of Job with a default implementation,
        // so no custom InputFormat class is needed in this demo
        FileInputFormat.setInputPaths(wcjob, "hdfs://master:9000/wc/srcData");
        // where to output? (the output directory must not already exist)
        FileOutputFormat.setOutputPath(wcjob, new Path("hdfs://master:9000/wc/output"));
        // fixed: the original discarded the completion flag — propagate job
        // success/failure through the process exit code
        boolean ret = wcjob.waitForCompletion(true);
        System.exit(ret ? 0 : 1);
    }
}
Output and results:
-- ::, WARN [main] util.NativeCodeLoader (NativeCodeLoader.java:<clinit>()) - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-- ::, INFO [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated()) - session.id is deprecated. Instead, use dfs.metrics.session-id
-- ::, INFO [main] jvm.JvmMetrics (JvmMetrics.java:init()) - Initializing JVM Metrics with processName=JobTracker, sessionId=
-- ::, WARN [main] mapreduce.JobResourceUploader (JobResourceUploader.java:uploadFiles()) - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
-- ::, WARN [main] mapreduce.JobResourceUploader (JobResourceUploader.java:uploadFiles()) - No job jar file set. User classes may not be found. See Job or Job#setJar(String).
-- ::, INFO [main] input.FileInputFormat (FileInputFormat.java:listStatus()) - Total input paths to process :
-- ::, INFO [main] mapreduce.JobSubmitter (JobSubmitter.java:submitJobInternal()) - number of splits:
-- ::, INFO [main] mapreduce.JobSubmitter (JobSubmitter.java:printTokens()) - Submitting tokens for job: job_local1963603364_0001
-- ::, INFO [main] mapreduce.Job (Job.java:submit()) - The url to track the job: http://localhost:/
-- ::, INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob()) - Running job: job_local1963603364_0001
-- ::, INFO [Thread-] mapred.LocalJobRunner (LocalJobRunner.java:createOutputCommitter()) - OutputCommitter set in config null
-- ::, INFO [Thread-] output.FileOutputCommitter (FileOutputCommitter.java:<init>()) - File Output Committer Algorithm version is
-- ::, INFO [Thread-] mapred.LocalJobRunner (LocalJobRunner.java:createOutputCommitter()) - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
-- ::, INFO [Thread-] mapred.LocalJobRunner (LocalJobRunner.java:runTasks()) - Waiting for map tasks
-- ::, INFO [LocalJobRunner Map Task Executor #] mapred.LocalJobRunner (LocalJobRunner.java:run()) - Starting task: attempt_local1963603364_0001_m_000000_0
-- ::, INFO [LocalJobRunner Map Task Executor #] output.FileOutputCommitter (FileOutputCommitter.java:<init>()) - File Output Committer Algorithm version is
-- ::, INFO [LocalJobRunner Map Task Executor #] mapred.Task (Task.java:initialize()) - Using ResourceCalculatorProcessTree : [ ]
-- ::, INFO [LocalJobRunner Map Task Executor #] mapred.MapTask (MapTask.java:runNewMapper()) - Processing split: hdfs://master:/wc/srcData/testText.txt:+
-- ::, INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob()) - Job job_local1963603364_0001 running in uber mode : false
-- ::, INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob()) - map % reduce %
-- ::, INFO [LocalJobRunner Map Task Executor #] mapred.MapTask (MapTask.java:setEquator()) - (EQUATOR) kvi ()
-- ::, INFO [LocalJobRunner Map Task Executor #] mapred.MapTask (MapTask.java:init()) - mapreduce.task.io.sort.mb:
-- ::, INFO [LocalJobRunner Map Task Executor #] mapred.MapTask (MapTask.java:init()) - soft limit at
-- ::, INFO [LocalJobRunner Map Task Executor #] mapred.MapTask (MapTask.java:init()) - bufstart = ; bufvoid =
-- ::, INFO [LocalJobRunner Map Task Executor #] mapred.MapTask (MapTask.java:init()) - kvstart = ; length =
-- ::, INFO [LocalJobRunner Map Task Executor #] mapred.MapTask (MapTask.java:createSortingCollector()) - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
-- ::, INFO [LocalJobRunner Map Task Executor #] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate()) -
-- ::, INFO [LocalJobRunner Map Task Executor #] mapred.MapTask (MapTask.java:flush()) - Starting flush of map output
-- ::, INFO [LocalJobRunner Map Task Executor #] mapred.MapTask (MapTask.java:flush()) - Spilling map output
-- ::, INFO [LocalJobRunner Map Task Executor #] mapred.MapTask (MapTask.java:flush()) - bufstart = ; bufend = ; bufvoid =
-- ::, INFO [LocalJobRunner Map Task Executor #] mapred.MapTask (MapTask.java:flush()) - kvstart = (); kvend = (); length = /
-- ::, INFO [LocalJobRunner Map Task Executor #] mapred.MapTask (MapTask.java:sortAndSpill()) - Finished spill
-- ::, INFO [LocalJobRunner Map Task Executor #] mapred.Task (Task.java:done()) - Task:attempt_local1963603364_0001_m_000000_0 is done. And is in the process of committing
-- ::, INFO [LocalJobRunner Map Task Executor #] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate()) - map
-- ::, INFO [LocalJobRunner Map Task Executor #] mapred.Task (Task.java:sendDone()) - Task 'attempt_local1963603364_0001_m_000000_0' done.
-- ::, INFO [LocalJobRunner Map Task Executor #] mapred.LocalJobRunner (LocalJobRunner.java:run()) - Finishing task: attempt_local1963603364_0001_m_000000_0
-- ::, INFO [Thread-] mapred.LocalJobRunner (LocalJobRunner.java:runTasks()) - map task executor complete.
-- ::, INFO [Thread-] mapred.LocalJobRunner (LocalJobRunner.java:runTasks()) - Waiting for reduce tasks
-- ::, INFO [pool--thread-] mapred.LocalJobRunner (LocalJobRunner.java:run()) - Starting task: attempt_local1963603364_0001_r_000000_0
-- ::, INFO [pool--thread-] output.FileOutputCommitter (FileOutputCommitter.java:<init>()) - File Output Committer Algorithm version is
-- ::, INFO [pool--thread-] mapred.Task (Task.java:initialize()) - Using ResourceCalculatorProcessTree : [ ]
-- ::, INFO [pool--thread-] mapred.ReduceTask (ReduceTask.java:run()) - Using ShuffleConsumerPlugin: [email protected]
-- ::, INFO [pool--thread-] reduce.MergeManagerImpl (MergeManagerImpl.java:<init>()) - MergerManager: memoryLimit=, maxSingleShuffleLimit=, mergeThreshold=, ioSortFactor=, memToMemMergeOutputsThreshold=
-- ::, INFO [EventFetcher for fetching Map Completion Events] reduce.EventFetcher (EventFetcher.java:run()) - attempt_local1963603364_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
-- ::, INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob()) - map % reduce %
-- ::, INFO [localfetcher#] reduce.LocalFetcher (LocalFetcher.java:copyMapOutput()) - localfetcher# about to shuffle output of map attempt_local1963603364_0001_m_000000_0 decomp: len: to MEMORY
-- ::, INFO [localfetcher#] reduce.InMemoryMapOutput (InMemoryMapOutput.java:shuffle()) - Read bytes from map-output for attempt_local1963603364_0001_m_000000_0
-- ::, INFO [localfetcher#] reduce.MergeManagerImpl (MergeManagerImpl.java:closeInMemoryFile()) - closeInMemoryFile -> map-output of size: , inMemoryMapOutputs.size() -> , commitMemory -> , usedMemory ->
-- ::, INFO [EventFetcher for fetching Map Completion Events] reduce.EventFetcher (EventFetcher.java:run()) - EventFetcher is interrupted.. Returning
-- ::, INFO [pool--thread-] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate()) - / copied.
-- ::, INFO [pool--thread-] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge()) - finalMerge called with in-memory map-outputs and on-disk map-outputs
-- ::, INFO [pool--thread-] mapred.Merger (Merger.java:merge()) - Merging sorted segments
-- ::, INFO [pool--thread-] mapred.Merger (Merger.java:merge()) - Down to the last merge-pass, with segments left of total size: bytes
-- ::, INFO [pool--thread-] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge()) - Merged segments, bytes to disk to satisfy reduce memory limit
-- ::, INFO [pool--thread-] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge()) - Merging files, bytes from disk
-- ::, INFO [pool--thread-] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge()) - Merging segments, bytes from memory into reduce
-- ::, INFO [pool--thread-] mapred.Merger (Merger.java:merge()) - Merging sorted segments
-- ::, INFO [pool--thread-] mapred.Merger (Merger.java:merge()) - Down to the last merge-pass, with segments left of total size: bytes
-- ::, INFO [pool--thread-] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate()) - / copied.
-- ::, INFO [pool--thread-] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated()) - mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
-- ::, INFO [pool--thread-] mapred.Task (Task.java:done()) - Task:attempt_local1963603364_0001_r_000000_0 is done. And is in the process of committing
-- ::, INFO [pool--thread-] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate()) - / copied.
-- ::, INFO [pool--thread-] mapred.Task (Task.java:commit()) - Task attempt_local1963603364_0001_r_000000_0 is allowed to commit now
-- ::, INFO [pool--thread-] output.FileOutputCommitter (FileOutputCommitter.java:commitTask()) - Saved output of task 'attempt_local1963603364_0001_r_000000_0' to hdfs://master:/wc/output/_temporary//task_local1963603364_0001_r_000000
-- ::, INFO [pool--thread-] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate()) - reduce > reduce
-- ::, INFO [pool--thread-] mapred.Task (Task.java:sendDone()) - Task 'attempt_local1963603364_0001_r_000000_0' done.
-- ::, INFO [pool--thread-] mapred.LocalJobRunner (LocalJobRunner.java:run()) - Finishing task: attempt_local1963603364_0001_r_000000_0
-- ::, INFO [Thread-] mapred.LocalJobRunner (LocalJobRunner.java:runTasks()) - reduce task executor complete.
-- ::, INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob()) - map % reduce %
-- ::, INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob()) - Job job_local1963603364_0001 completed successfully
-- ::, INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob()) - Counters:
File System Counters
FILE: Number of bytes read=
FILE: Number of bytes written=
FILE: Number of read operations=
FILE: Number of large read operations=
FILE: Number of write operations=
HDFS: Number of bytes read=
HDFS: Number of bytes written=
HDFS: Number of read operations=
HDFS: Number of large read operations=
HDFS: Number of write operations=
Map-Reduce Framework
Map input records=
Map output records=
Map output bytes=
Map output materialized bytes=
Input split bytes=
Combine input records=
Combine output records=
Reduce input groups=
Reduce shuffle bytes=
Reduce input records=
Reduce output records=
Spilled Records=
Shuffled Maps =
Failed Shuffles=
Merged Map outputs=
GC time elapsed (ms)=
Total committed heap usage (bytes)=
Shuffle Errors
BAD_ID=
CONNECTION=
IO_ERROR=
WRONG_LENGTH=
WRONG_MAP=
WRONG_REDUCE=
File Input Format Counters
Bytes Read=
File Output Format Counters
Bytes Written=