11. MapReduce Examples: Custom MapReduce Output Format
Experiment Principles
1. Output formats: the key/value pairs handed to the OutputCollector are written to output files, and the way they are written is controlled by the output format. OutputFormat works much like the InputFormat class described earlier; the OutputFormat implementations that Hadoop provides write files to the local disk or to HDFS. With no explicit configuration, the results are written as multiple part-000* files, with as many output files as there are reduce tasks, and the content format of those files cannot be chosen freely. Each reducer writes its results to a separate file in a common output directory; these files are typically named part-nnnnn, where nnnnn is the partition id associated with that reduce task. The output directory is set with FileOutputFormat.setOutputPath(). The output format used by a particular MapReduce job is selected with the setOutputFormat() method of the job's JobConf object (in the new mapreduce API used in the code below, Job.setOutputFormatClass()). The output formats that Hadoop already provides are described below:
Hadoop ships with several OutputFormat implementations for writing files. The basic (default) one is TextOutputFormat, which writes data to a text file as one key/value pair per line. A later MapReduce job can then simply read that data back in with the KeyValueTextInputFormat class, and the files are also easy for people to read. A format better suited for passing data between MapReduce jobs is SequenceFileOutputFormat, which rapidly serializes arbitrary data types to a file; the corresponding SequenceFileInputFormat deserializes the file back into the same types and presents the data to the next Mapper in the same form in which the previous Reducer emitted it. NullOutputFormat produces no output files and discards any key/value pairs passed to it through the OutputCollector; it is useful when you write your own output files explicitly inside reduce() and do not want the Hadoop framework to emit additional empty output files.
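As a minimal sketch of switching the output format (class and path names here are illustrative, not part of this lab), a pass-through job can be told to write SequenceFiles instead of plain text like this:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SeqFileOutputDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "seqfile-output-demo");
        job.setJarByClass(SeqFileOutputDemo.class);
        // Identity mapper/reducer: TextInputFormat supplies (LongWritable, Text) pairs.
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        // TextOutputFormat is the default; request SequenceFile output instead.
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path("/demo/in"));
        FileOutputFormat.setOutputPath(job, new Path("/demo/out"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}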
RecordWriter: much as InputFormat reads individual records through a RecordReader, the OutputFormat class is a factory for RecordWriter objects, which write the individual records to the files, as if the OutputFormat itself were doing the writing.
2. As with InputFormat, there are special cases, for example when the reduce stage needs to produce multiple output files, where the formats Hadoop provides out of the box (TextOutputFormat, SequenceFileOutputFormat, NullOutputFormat, and so on) cannot meet our needs, and we must define a custom output format. As with a custom input format, a custom output format can follow the steps below (a condensed skeleton is shown after the list):
(1) Define a class that extends OutputFormat; in practice, extending FileOutputFormat is usually enough;
(2) Implement its getRecordWriter method so that it returns a RecordWriter;
(3) Define a class that extends RecordWriter and implement its write method, which writes each <key,value> pair to the output file.
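A condensed, hypothetical skeleton of these three steps (names are illustrative; the complete implementation used in this lab appears in step 6 of the walkthrough below):

import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyOutputFormat extends FileOutputFormat<Text, Text> {          // step (1)
    @Override
    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {                      // step (2)
        // For brevity this writes straight to the job output path; the full code in
        // step 6 writes to the task work path obtained from the OutputCommitter.
        Path file = new Path(getOutputPath(job), "custom-output.txt");
        final FSDataOutputStream out = file.getFileSystem(job.getConfiguration()).create(file, false);
        return new RecordWriter<Text, Text>() {                              // step (3)
            @Override
            public void write(Text key, Text value) throws IOException {
                out.writeBytes(key + ":" + value + "\n");
            }
            @Override
            public void close(TaskAttemptContext context) throws IOException {
                out.close();
            }
        };
    }
}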
Steps:
1. Create the data file with comma-separated fields (the mapper below expects four fields per line and groups records on the fourth field).
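The exact contents of cat_group1 are not reproduced here; each line is assumed to hold four comma-separated fields, with the fourth field (a 0/1 flag in this lab's data) later used as the output key, for example (placeholder values):

field1,field2,field3,0
field1,field2,field3,1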
2. Create the directory /data/mapreduce12 on the local machine:
mkdir -p /data/mapreduce12
3. Upload the data file to the virtual machine (into /data/mapreduce12).
4. Upload and unpack the hadoop2lib archive.
5. Create the /mymapreduce12/in directory on HDFS, then import the cat_group1 file from the local /data/mapreduce12 directory into /mymapreduce12/in on HDFS:
hadoop fs -mkdir -p /mymapreduce12/in
hadoop fs -put /data/mapreduce12/cat_group1 /mymapreduce12/in
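Optionally, verify that the file is now on HDFS:

hadoop fs -ls /mymapreduce12/in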
6. Write the Java code in IDEA.
package mapreduce11;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
// A multi-file output format: subclasses decide, per record, which file the
// record goes to by implementing generateFileNameForKeyValue().
public abstract class MyMultipleOutputFormat<K extends WritableComparable<?>, V extends Writable> extends FileOutputFormat<K, V> {
    private MultiRecordWriter writer = null;

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException {
        if (writer == null) {
            writer = new MultiRecordWriter(job, getTaskOutputPath(job));
        }
        return writer;
    }

    // Resolve the directory this task attempt should write into: the committer's
    // work path when a FileOutputCommitter is used, otherwise the job output path.
    private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
        Path workPath = null;
        OutputCommitter committer = super.getOutputCommitter(conf);
        if (committer instanceof FileOutputCommitter) {
            workPath = ((FileOutputCommitter) committer).getWorkPath();
        } else {
            Path outputPath = super.getOutputPath(conf);
            if (outputPath == null) {
                throw new IOException("Undefined job output-path");
            }
            workPath = outputPath;
        }
        return workPath;
    }

    // Subclasses return the file name (relative to the task output path) that a
    // given key/value pair should be written to.
    protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf);
    // Writes one key/value pair per line, separated by keyValueSeparator
    // (modelled on Hadoop's TextOutputFormat.LineRecordWriter).
    protected static class LineRecordWriter<K, V> extends RecordWriter<K, V> {
        private static final String utf8 = "UTF-8";
        private static final byte[] newline;
        static {
            try {
                newline = "\n".getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        protected DataOutputStream out;
        private final byte[] keyValueSeparator;

        public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
            this.out = out;
            try {
                this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        public LineRecordWriter(DataOutputStream out) {
            this(out, ":");
        }

        // Text objects are written as raw bytes; anything else falls back to toString().
        private void writeObject(Object o) throws IOException {
            if (o instanceof Text) {
                Text to = (Text) o;
                out.write(to.getBytes(), 0, to.getLength());
            } else {
                out.write(o.toString().getBytes(utf8));
            }
        }

        public synchronized void write(K key, V value) throws IOException {
            boolean nullKey = key == null || key instanceof NullWritable;
            boolean nullValue = value == null || value instanceof NullWritable;
            if (nullKey && nullValue) {
                return;
            }
            if (!nullKey) {
                writeObject(key);
            }
            if (!(nullKey || nullValue)) {
                out.write(keyValueSeparator);
            }
            if (!nullValue) {
                writeObject(value);
            }
            out.write(newline);
        }

        public synchronized void close(TaskAttemptContext context) throws IOException {
            out.close();
        }
    }
    // Lazily creates one LineRecordWriter per distinct file name and routes each
    // record to the writer chosen by generateFileNameForKeyValue().
    public class MultiRecordWriter extends RecordWriter<K, V> {
        private HashMap<String, RecordWriter<K, V>> recordWriters = null;
        private TaskAttemptContext job = null;
        private Path workPath = null;

        public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
            super();
            this.job = job;
            this.workPath = workPath;
            recordWriters = new HashMap<String, RecordWriter<K, V>>();
        }

        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();
            while (values.hasNext()) {
                values.next().close(context);
            }
            this.recordWriters.clear();
        }

        public void write(K key, V value) throws IOException, InterruptedException {
            String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());
            RecordWriter<K, V> rw = this.recordWriters.get(baseName);
            if (rw == null) {
                rw = getBaseRecordWriter(job, baseName);
                this.recordWriters.put(baseName, rw);
            }
            rw.write(key, value);
        }

        // Create the underlying writer for one output file, honouring the job's
        // output-compression settings.
        private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)
                throws IOException, InterruptedException {
            Configuration conf = job.getConfiguration();
            boolean isCompressed = getCompressOutput(job);
            String keyValueSeparator = ":";
            RecordWriter<K, V> recordWriter = null;
            if (isCompressed) {
                Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
                Path file = new Path(workPath, baseName + codec.getDefaultExtension());
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);
            } else {
                Path file = new Path(workPath, baseName);
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
            }
            return recordWriter;
        }
    }
}
package mapreduce11;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FileOutputMR {
    // Splits each comma-separated input line into four fields; emits the fourth
    // field as the key and the first three fields (space-joined) as the value.
    public static class TokenizerMapper extends Mapper<Object, Text, Text, Text> {
        private Text val = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String str[] = value.toString().split(",");
            val.set(str[0] + " " + str[1] + " " + str[2]);
            context.write(new Text(str[3]), val);
        }
    }

    // Pass-through reducer: writes every value unchanged under its key.
    public static class IntSumReducer extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                context.write(key, val);
            }
        }
    }

    // Each distinct key gets its own output file named "<key>.txt".
    public static class AlphabetOutputFormat extends MyMultipleOutputFormat<Text, Text> {
        @Override
        protected String generateFileNameForKeyValue(Text key, Text value, Configuration conf) {
            return key + ".txt";
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "FileOutputMR");
        job.setJarByClass(FileOutputMR.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(AlphabetOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://192.168.149.10:9000/mymapreduce12/in/cat_group1"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.149.10:9000/mymapreduce12/out"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
7. Copy the jar files from the hadoop2lib directory into the project's hadoop2lib directory and add them to the project's Build Path.
8. Copy the log4j.properties file into the project's source/resources directory so that logging is configured.
9. Run the job and check the results on HDFS:

hadoop fs -ls /mymapreduce12/out
hadoop fs -cat /mymapreduce12/out/0.txt
hadoop fs -cat /mymapreduce12/out/1.txt