天天看點

Hapdoop的一個Mapreduce示例代碼--統計單詞個數有排序功能

<code>package</code> <code>com.mzsx.hadoop;</code>

<code>import</code> <code>java.io.IOException;</code>

<code>import</code> <code>java.util.Random;</code>

<code>import</code> <code>java.util.StringTokenizer;</code>

<code>import</code> <code>org.apache.hadoop.conf.Configuration;</code>

<code>import</code> <code>org.apache.hadoop.fs.FileSystem;</code>

<code>import</code> <code>org.apache.hadoop.fs.Path;</code>

<code>import</code> <code>org.apache.hadoop.io.IntWritable;</code>

<code>import</code> <code>org.apache.hadoop.io.Text;</code>

<code>import</code> <code>org.apache.hadoop.io.WritableComparable;</code>

<code>import</code> <code>org.apache.hadoop.mapreduce.Job;</code>

<code>import</code> <code>org.apache.hadoop.mapreduce.Mapper;</code>

<code>import</code> <code>org.apache.hadoop.mapreduce.Reducer;</code>

<code>import</code> <code>org.apache.hadoop.mapreduce.lib.input.FileInputFormat;</code>

<code>import</code> <code>org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;</code>

<code>public</code> <code>class</code> <code>MySortWordCount {</code>

<code>    </code><code>public</code> <code>static</code> <code>class</code> <code>MyMapper </code><code>extends</code>

<code>            </code><code>Mapper&lt;Object, Text, Text, IntWritable&gt; {</code>

<code>        </code><code>private</code> <code>final</code> <code>static</code> <code>IntWritable one = </code><code>new</code> <code>IntWritable(</code><code>1</code><code>);</code><code>// 類似于int類型</code>

<code>        </code><code>private</code> <code>Text word = </code><code>new</code> <code>Text(); </code><code>// 可以了解成String類型</code>

<code>        </code><code>public</code> <code>void</code> <code>map(Object key, Text value, Context context)</code>

<code>                </code><code>throws</code> <code>IOException, InterruptedException {</code>

<code>            </code><code>System.err.println(key + </code><code>","</code> <code>+ value);</code>

<code>            </code><code>// 預設情況下即根據空格分隔字元串</code>

<code>            </code><code>String tmp=value.toString();</code>

<code>            </code><code>tmp=tmp.replace(</code><code>'\''</code><code>, </code><code>' '</code><code>);</code>

<code>            </code><code>tmp=tmp.replace(</code><code>'.'</code><code>, </code><code>' '</code><code>);</code>

<code>            </code><code>tmp=tmp.replace(</code><code>','</code><code>, </code><code>' '</code><code>);</code>

<code>            </code><code>tmp=tmp.replace(</code><code>':'</code><code>, </code><code>' '</code><code>);</code>

<code>            </code><code>tmp=tmp.replace(</code><code>'!'</code><code>, </code><code>' '</code><code>);</code>

<code>            </code><code>tmp=tmp.replace(</code><code>';'</code><code>, </code><code>' '</code><code>);</code>

<code>            </code><code>tmp=tmp.replace(</code><code>'?'</code><code>, </code><code>' '</code><code>);</code>

<code>            </code><code>tmp=tmp.replace(</code><code>'`'</code><code>, </code><code>' '</code><code>);</code>

<code>            </code><code>tmp=tmp.replace(</code><code>'"'</code><code>, </code><code>' '</code><code>);</code>

<code>            </code><code>tmp=tmp.replace(</code><code>'&amp;'</code><code>, </code><code>' '</code><code>);</code>

<code>            </code><code>tmp=tmp.replace(</code><code>'('</code><code>, </code><code>' '</code><code>);</code>

<code>            </code><code>tmp=tmp.replace(</code><code>')'</code><code>, </code><code>' '</code><code>);</code>

<code>            </code><code>tmp=tmp.replace(</code><code>'-'</code><code>, </code><code>' '</code><code>);</code>

<code>            </code><code>StringTokenizer itr = </code><code>new</code> <code>StringTokenizer(tmp);</code>

<code>            </code><code>while</code> <code>(itr.hasMoreTokens()) {</code>

<code>                </code><code>word.set(itr.nextToken());</code>

<code>                </code><code>context.write(word, one);</code>

<code>            </code><code>}</code>

<code>        </code><code>};</code>

<code>    </code><code>}</code>

<code>    </code><code>// Reducer&lt;KEYIN, VALUEIN, KEYOUT, VALUEOUT&gt;</code>

<code>    </code><code>public</code> <code>static</code> <code>class</code> <code>MyReducer </code><code>extends</code>

<code>            </code><code>Reducer&lt;Text, IntWritable, Text, IntWritable&gt; {</code>

<code>        </code><code>private</code> <code>IntWritable result = </code><code>new</code> <code>IntWritable();</code>

<code>        </code><code>protected</code> <code>void</code> <code>reduce(Text key, Iterable&lt;IntWritable&gt; values,</code>

<code>                </code><code>Context context) </code><code>throws</code> <code>IOException, InterruptedException {</code>

<code>            </code><code>System.err.println(key + </code><code>","</code> <code>+ values);</code>

<code>            </code><code>int</code> <code>sum = </code><code>0</code><code>;</code>

<code>            </code><code>for</code> <code>(IntWritable val : values) {</code>

<code>                </code><code>sum += val.get();</code>

<code>            </code><code>result.set(sum);</code>

<code>            </code><code>;</code>

<code>            </code><code>context.write(key, result);</code><code>// 這是最後結果</code>

<code>      </code> 

<code>    </code><code>public</code> <code>static</code> <code>class</code> <code>SortMapper </code><code>extends</code> <code>Mapper&lt;Object, Text, IntWritable,Text&gt;{</code>

<code>          </code> 

<code>        </code><code>public</code> <code>void</code> <code>map(Object key, Text value, Context context) </code><code>throws</code> <code>IOException, InterruptedException {</code>

<code>             </code> 

<code>            </code><code>IntWritable times = </code><code>new</code> <code>IntWritable(</code><code>1</code><code>);</code>

<code>            </code><code>Text password = </code><code>new</code> <code>Text();</code>

<code>            </code><code>String eachline=value.toString();</code>

<code>            </code><code>String[] eachterm =eachline.split(</code><code>"\t"</code><code>);</code>

<code>            </code><code>password.set(eachterm[</code><code>0</code><code>]);</code>

<code>            </code><code>times.set(Integer.parseInt(eachterm[</code><code>1</code><code>]));</code>

<code>            </code><code>context.write(times,password);</code>

<code>              </code> 

<code>        </code><code>}</code>

<code>      </code><code>}</code>

<code>         </code> 

<code>      </code><code>public</code> <code>static</code> <code>class</code> <code>SortReducer </code><code>extends</code> <code>Reducer&lt;IntWritable,Text,IntWritable,Text&gt; {</code>

<code>          </code><code>private</code> <code>Text password = </code><code>new</code> <code>Text();</code>

<code>          </code><code>public</code> <code>void</code> <code>reduce(IntWritable key,Iterable&lt;Text&gt; values, Context context) </code><code>throws</code> <code>IOException, InterruptedException {</code>

<code>            </code><code>for</code> <code>(Text val : values) {</code>

<code>                </code><code>password.set(val);</code>

<code>                </code><code>context.write(key,password);</code>

<code>      </code><code>private</code> <code>static</code> <code>class</code> <code>IntDecreasingComparator </code><code>extends</code> <code>IntWritable.Comparator {</code>

<code>          </code><code>public</code> <code>int</code> <code>compare(WritableComparable a, WritableComparable b) {</code>

<code>            </code><code>//return -super.compare(a, b);</code>

<code>              </code><code>return</code> <code>super</code><code>.compare(a, b);</code>

<code>          </code><code>}</code>

<code>            </code> 

<code>          </code><code>public</code> <code>int</code> <code>compare(</code><code>byte</code><code>[] b1, </code><code>int</code> <code>s1, </code><code>int</code> <code>l1, </code><code>byte</code><code>[] b2, </code><code>int</code> <code>s2, </code><code>int</code> <code>l2) {</code>

<code>              </code><code>//return -super.compare(b1, s1, l1, b2, s2, l2);</code>

<code>              </code><code>return</code> <code>super</code><code>.compare(b1, s1, l1, b2, s2, l2);</code>

<code>    </code><code>public</code> <code>static</code> <code>void</code> <code>main(String[] args) </code><code>throws</code> <code>Exception {</code>

<code>        </code><code>// 聲明配置資訊</code>

<code>        </code><code>Configuration conf = </code><code>new</code> <code>Configuration();</code>

<code>        </code><code>// 聲明Job</code>

<code>        </code><code>Job job = </code><code>new</code> <code>Job(conf, </code><code>"Word Count"</code><code>);</code>

<code>        </code><code>// 設定工作類</code>

<code>        </code><code>job.setJarByClass(MySortWordCount.</code><code>class</code><code>);</code>

<code>        </code><code>// 設定mapper類</code>

<code>        </code><code>job.setMapperClass(MyMapper.</code><code>class</code><code>);</code>

<code>        </code><code>// 可選</code>

<code>        </code><code>job.setCombinerClass(MyReducer.</code><code>class</code><code>);</code>

<code>        </code><code>// 設定合并計算類</code>

<code>        </code><code>job.setReducerClass(MyReducer.</code><code>class</code><code>);</code>

<code>        </code><code>// 設定key為String類型</code>

<code>        </code><code>job.setOutputKeyClass(Text.</code><code>class</code><code>);</code>

<code>        </code><code>// 設定value為int類型</code>

<code>        </code><code>job.setOutputValueClass(IntWritable.</code><code>class</code><code>);</code>

<code>        </code><code>//job.setInputFormatClass(KeyValueTextInputFormat.class);</code>

<code>        </code><code>// 設定或是接收輸入輸出</code>

<code>        </code><code>/*FileInputFormat.setInputPaths(job, new Path("/user/root/aoman.txt"));</code>

<code>        </code><code>FileOutputFormat.setOutputPath(job, new Path("/user/root/r3"));</code>

<code>        </code><code>// 執行</code>

<code>        </code><code>System.exit(job.waitForCompletion(true) ? 0 : 1);*/</code>

<code>        </code><code>//定義一個臨時目錄,先将詞頻統計任務的輸出結果寫到臨時目錄中, 下一個排序任務以臨時目錄為輸入目錄。</code>

<code>        </code><code>FileInputFormat.addInputPath(job, </code><code>new</code> <code>Path(</code><code>"/user/root/aoman.txt"</code><code>));</code>

<code>        </code><code>Path tempDir = </code><code>new</code> <code>Path(</code><code>"MySortWordCount-temp-"</code> <code>+ Integer.toString(</code><code>new</code> <code>Random().nextInt(Integer.MAX_VALUE)));</code>

<code>        </code><code>FileOutputFormat.setOutputPath(job, tempDir);</code>

<code>        </code><code>if</code><code>(job.waitForCompletion(</code><code>true</code><code>))</code>

<code>        </code><code>{</code>

<code>            </code><code>Job sortJob = </code><code>new</code> <code>Job(conf, </code><code>"csdnsort"</code><code>);</code>

<code>            </code><code>sortJob.setJarByClass(MySortWordCount.</code><code>class</code><code>);</code>

<code>            </code><code>FileInputFormat.addInputPath(sortJob, tempDir);</code>

<code>            </code><code>sortJob.setMapperClass(SortMapper.</code><code>class</code><code>);</code>

<code>            </code><code>FileOutputFormat.setOutputPath(sortJob, </code><code>new</code> <code>Path(</code><code>"/user/root/sort1"</code><code>));</code>

<code>            </code><code>sortJob.setOutputKeyClass(IntWritable.</code><code>class</code><code>);</code>

<code>            </code><code>sortJob.setOutputValueClass(Text.</code><code>class</code><code>);</code>

<code>            </code><code>sortJob.setSortComparatorClass(IntDecreasingComparator.</code><code>class</code><code>);</code>

<code>   </code> 

<code>            </code><code>FileSystem.get(conf).deleteOnExit(tempDir);</code>

<code>            </code><code>System.exit(sortJob.waitForCompletion(</code><code>true</code><code>) ? </code><code>0</code> <code>: </code><code>1</code><code>);</code>

<code>        </code><code>System.exit(job.waitForCompletion(</code><code>true</code><code>) ? </code><code>0</code> <code>: </code><code>1</code><code>);</code>

<code>}</code>

版權聲明:原創作品,如需轉載,請注明出處。否則将追究法律責任

本文轉自 夢朝思夕 51CTO部落格,原文連結:http://blog.51cto.com/qiangmzsx/1404661