<code>package</code> <code>com.mzsx.hadoop;</code>
<code>import</code> <code>java.io.IOException;</code>
<code>import</code> <code>java.util.Random;</code>
<code>import</code> <code>java.util.StringTokenizer;</code>
<code>import</code> <code>org.apache.hadoop.conf.Configuration;</code>
<code>import</code> <code>org.apache.hadoop.fs.FileSystem;</code>
<code>import</code> <code>org.apache.hadoop.fs.Path;</code>
<code>import</code> <code>org.apache.hadoop.io.IntWritable;</code>
<code>import</code> <code>org.apache.hadoop.io.Text;</code>
<code>import</code> <code>org.apache.hadoop.io.WritableComparable;</code>
<code>import</code> <code>org.apache.hadoop.mapreduce.Job;</code>
<code>import</code> <code>org.apache.hadoop.mapreduce.Mapper;</code>
<code>import</code> <code>org.apache.hadoop.mapreduce.Reducer;</code>
<code>import</code> <code>org.apache.hadoop.mapreduce.lib.input.FileInputFormat;</code>
<code>import</code> <code>org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;</code>
<code>public</code> <code>class</code> <code>MySortWordCount {</code>
<code> </code><code>public</code> <code>static</code> <code>class</code> <code>MyMapper </code><code>extends</code>
<code> </code><code>Mapper<Object, Text, Text, IntWritable> {</code>
<code> </code><code>private</code> <code>final</code> <code>static</code> <code>IntWritable one = </code><code>new</code> <code>IntWritable(</code><code>1</code><code>);</code><code>// 類似于int類型</code>
<code> </code><code>private</code> <code>Text word = </code><code>new</code> <code>Text(); </code><code>// 可以了解成String類型</code>
<code> </code><code>public</code> <code>void</code> <code>map(Object key, Text value, Context context)</code>
<code> </code><code>throws</code> <code>IOException, InterruptedException {</code>
<code> </code><code>System.err.println(key + </code><code>","</code> <code>+ value);</code>
<code> </code><code>// 預設情況下即根據空格分隔字元串</code>
<code> </code><code>String tmp=value.toString();</code>
<code> </code><code>tmp=tmp.replace(</code><code>'\''</code><code>, </code><code>' '</code><code>);</code>
<code> </code><code>tmp=tmp.replace(</code><code>'.'</code><code>, </code><code>' '</code><code>);</code>
<code> </code><code>tmp=tmp.replace(</code><code>','</code><code>, </code><code>' '</code><code>);</code>
<code> </code><code>tmp=tmp.replace(</code><code>':'</code><code>, </code><code>' '</code><code>);</code>
<code> </code><code>tmp=tmp.replace(</code><code>'!'</code><code>, </code><code>' '</code><code>);</code>
<code> </code><code>tmp=tmp.replace(</code><code>';'</code><code>, </code><code>' '</code><code>);</code>
<code> </code><code>tmp=tmp.replace(</code><code>'?'</code><code>, </code><code>' '</code><code>);</code>
<code> </code><code>tmp=tmp.replace(</code><code>'`'</code><code>, </code><code>' '</code><code>);</code>
<code> </code><code>tmp=tmp.replace(</code><code>'"'</code><code>, </code><code>' '</code><code>);</code>
<code> </code><code>tmp=tmp.replace(</code><code>'&'</code><code>, </code><code>' '</code><code>);</code>
<code> </code><code>tmp=tmp.replace(</code><code>'('</code><code>, </code><code>' '</code><code>);</code>
<code> </code><code>tmp=tmp.replace(</code><code>')'</code><code>, </code><code>' '</code><code>);</code>
<code> </code><code>tmp=tmp.replace(</code><code>'-'</code><code>, </code><code>' '</code><code>);</code>
<code> </code><code>StringTokenizer itr = </code><code>new</code> <code>StringTokenizer(tmp);</code>
<code> </code><code>while</code> <code>(itr.hasMoreTokens()) {</code>
<code> </code><code>word.set(itr.nextToken());</code>
<code> </code><code>context.write(word, one);</code>
<code> </code><code>}</code>
<code> </code><code>};</code>
<code> </code><code>}</code>
<code> </code><code>// Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT></code>
<code> </code><code>public</code> <code>static</code> <code>class</code> <code>MyReducer </code><code>extends</code>
<code> </code><code>Reducer<Text, IntWritable, Text, IntWritable> {</code>
<code> </code><code>private</code> <code>IntWritable result = </code><code>new</code> <code>IntWritable();</code>
<code> </code><code>protected</code> <code>void</code> <code>reduce(Text key, Iterable<IntWritable> values,</code>
<code> </code><code>Context context) </code><code>throws</code> <code>IOException, InterruptedException {</code>
<code> </code><code>System.err.println(key + </code><code>","</code> <code>+ values);</code>
<code> </code><code>int</code> <code>sum = </code><code>0</code><code>;</code>
<code> </code><code>for</code> <code>(IntWritable val : values) {</code>
<code> </code><code>sum += val.get();</code>
<code> </code><code>result.set(sum);</code>
<code> </code><code>;</code>
<code> </code><code>context.write(key, result);</code><code>// 這是最後結果</code>
<code> </code>
<code> </code><code>public</code> <code>static</code> <code>class</code> <code>SortMapper </code><code>extends</code> <code>Mapper<Object, Text, IntWritable,Text>{</code>
<code> </code>
<code> </code><code>public</code> <code>void</code> <code>map(Object key, Text value, Context context) </code><code>throws</code> <code>IOException, InterruptedException {</code>
<code> </code>
<code> </code><code>IntWritable times = </code><code>new</code> <code>IntWritable(</code><code>1</code><code>);</code>
<code> </code><code>Text password = </code><code>new</code> <code>Text();</code>
<code> </code><code>String eachline=value.toString();</code>
<code> </code><code>String[] eachterm =eachline.split(</code><code>"\t"</code><code>);</code>
<code> </code><code>password.set(eachterm[</code><code>0</code><code>]);</code>
<code> </code><code>times.set(Integer.parseInt(eachterm[</code><code>1</code><code>]));</code>
<code> </code><code>context.write(times,password);</code>
<code> </code>
<code> </code><code>}</code>
<code> </code><code>}</code>
<code> </code>
<code> </code><code>public</code> <code>static</code> <code>class</code> <code>SortReducer </code><code>extends</code> <code>Reducer<IntWritable,Text,IntWritable,Text> {</code>
<code> </code><code>private</code> <code>Text password = </code><code>new</code> <code>Text();</code>
<code> </code><code>public</code> <code>void</code> <code>reduce(IntWritable key,Iterable<Text> values, Context context) </code><code>throws</code> <code>IOException, InterruptedException {</code>
<code> </code><code>for</code> <code>(Text val : values) {</code>
<code> </code><code>password.set(val);</code>
<code> </code><code>context.write(key,password);</code>
<code> </code><code>private</code> <code>static</code> <code>class</code> <code>IntDecreasingComparator </code><code>extends</code> <code>IntWritable.Comparator {</code>
<code> </code><code>public</code> <code>int</code> <code>compare(WritableComparable a, WritableComparable b) {</code>
<code> </code><code>//return -super.compare(a, b);</code>
<code> </code><code>return</code> <code>super</code><code>.compare(a, b);</code>
<code> </code><code>}</code>
<code> </code>
<code> </code><code>public</code> <code>int</code> <code>compare(</code><code>byte</code><code>[] b1, </code><code>int</code> <code>s1, </code><code>int</code> <code>l1, </code><code>byte</code><code>[] b2, </code><code>int</code> <code>s2, </code><code>int</code> <code>l2) {</code>
<code> </code><code>//return -super.compare(b1, s1, l1, b2, s2, l2);</code>
<code> </code><code>return</code> <code>super</code><code>.compare(b1, s1, l1, b2, s2, l2);</code>
<code> </code><code>public</code> <code>static</code> <code>void</code> <code>main(String[] args) </code><code>throws</code> <code>Exception {</code>
<code> </code><code>// 聲明配置資訊</code>
<code> </code><code>Configuration conf = </code><code>new</code> <code>Configuration();</code>
<code> </code><code>// 聲明Job</code>
<code> </code><code>Job job = </code><code>new</code> <code>Job(conf, </code><code>"Word Count"</code><code>);</code>
<code> </code><code>// 設定工作類</code>
<code> </code><code>job.setJarByClass(MySortWordCount.</code><code>class</code><code>);</code>
<code> </code><code>// 設定mapper類</code>
<code> </code><code>job.setMapperClass(MyMapper.</code><code>class</code><code>);</code>
<code> </code><code>// 可選</code>
<code> </code><code>job.setCombinerClass(MyReducer.</code><code>class</code><code>);</code>
<code> </code><code>// 設定合并計算類</code>
<code> </code><code>job.setReducerClass(MyReducer.</code><code>class</code><code>);</code>
<code> </code><code>// 設定key為String類型</code>
<code> </code><code>job.setOutputKeyClass(Text.</code><code>class</code><code>);</code>
<code> </code><code>// 設定value為int類型</code>
<code> </code><code>job.setOutputValueClass(IntWritable.</code><code>class</code><code>);</code>
<code> </code><code>//job.setInputFormatClass(KeyValueTextInputFormat.class);</code>
<code> </code><code>// 設定或是接收輸入輸出</code>
<code> </code><code>/*FileInputFormat.setInputPaths(job, new Path("/user/root/aoman.txt"));</code>
<code> </code><code>FileOutputFormat.setOutputPath(job, new Path("/user/root/r3"));</code>
<code> </code><code>// 執行</code>
<code> </code><code>System.exit(job.waitForCompletion(true) ? 0 : 1);*/</code>
<code> </code><code>//定義一個臨時目錄,先将詞頻統計任務的輸出結果寫到臨時目錄中, 下一個排序任務以臨時目錄為輸入目錄。</code>
<code> </code><code>FileInputFormat.addInputPath(job, </code><code>new</code> <code>Path(</code><code>"/user/root/aoman.txt"</code><code>));</code>
<code> </code><code>Path tempDir = </code><code>new</code> <code>Path(</code><code>"MySortWordCount-temp-"</code> <code>+ Integer.toString(</code><code>new</code> <code>Random().nextInt(Integer.MAX_VALUE)));</code>
<code> </code><code>FileOutputFormat.setOutputPath(job, tempDir);</code>
<code> </code><code>if</code><code>(job.waitForCompletion(</code><code>true</code><code>))</code>
<code> </code><code>{</code>
<code> </code><code>Job sortJob = </code><code>new</code> <code>Job(conf, </code><code>"csdnsort"</code><code>);</code>
<code> </code><code>sortJob.setJarByClass(MySortWordCount.</code><code>class</code><code>);</code>
<code> </code><code>FileInputFormat.addInputPath(sortJob, tempDir);</code>
<code> </code><code>sortJob.setMapperClass(SortMapper.</code><code>class</code><code>);</code>
<code> </code><code>FileOutputFormat.setOutputPath(sortJob, </code><code>new</code> <code>Path(</code><code>"/user/root/sort1"</code><code>));</code>
<code> </code><code>sortJob.setOutputKeyClass(IntWritable.</code><code>class</code><code>);</code>
<code> </code><code>sortJob.setOutputValueClass(Text.</code><code>class</code><code>);</code>
<code> </code><code>sortJob.setSortComparatorClass(IntDecreasingComparator.</code><code>class</code><code>);</code>
<code> </code>
<code> </code><code>FileSystem.get(conf).deleteOnExit(tempDir);</code>
<code> </code><code>System.exit(sortJob.waitForCompletion(</code><code>true</code><code>) ? </code><code>0</code> <code>: </code><code>1</code><code>);</code>
<code> </code><code>System.exit(job.waitForCompletion(</code><code>true</code><code>) ? </code><code>0</code> <code>: </code><code>1</code><code>);</code>
<code>}</code>
版權聲明:原創作品,如需轉載,請注明出處。否則将追究法律責任
本文轉自 夢朝思夕 51CTO部落格,原文連結:http://blog.51cto.com/qiangmzsx/1404661