These are my study notes for Chapter 4 of Hadoop Beginner's Guide, which teaches MapReduce programming techniques by analyzing a UFO sighting dataset. The ufo.tsv file can no longer be downloaded from the link given in the book, but searching for the file name online turns it up. The book's scripts are written in Ruby; I rewrote them in Python. The book also uses the old MapReduce API, and since I have Hadoop 2.6.0 installed I ported the programs to the new API. In hindsight this was a useful exercise: it forced me to understand the programs first, and to search for whatever API details I didn't know. The API documentation is at http://hadoop.apache.org/docs/r2.6.0/api/index.html.
The UFO sighting file

No. | Field | Description
1 | Sighting date | When the UFO was seen
2 | Recorded date | When the sighting was recorded
3 | Location | Where the UFO was seen
4 | Shape | Shape of the object, e.g. diamond
5 | Duration | How long the sighting lasted
6 | Description | Free-text description of the sighting
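Each record is a single tab-separated line, so a script only needs split("\t") to get at the six fields. A minimal sketch (the sample line is invented, not taken from the dataset):

#!/usr/bin/python
# split one ufo.tsv-style record into its six tab-separated fields
line = "19950515\t19950528\tPhoenix, AZ\tdiamond\t5 min\tBright lights over the city."
fields = line.split("\t")
print len(fields)  # 6 for a well-formed record
print fields[3]    # the Shape field: "diamond"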
Counting records and fields with streaming

This is straightforward; it is essentially WordCount.
wcmapper.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""a python script for hadoop streaming map"""
import sys

def map(input):
    for line in input:
        line = line.strip()
        words = line.split()
        for word in words:
            print '%s\t%s' % (word, 1)

def main():
    map(sys.stdin)

if __name__ == "__main__":
    main()
wcreducer.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""a python script for hadoop streaming reduce"""
import sys

def reduce(input):
    # streaming sorts map output by key, so identical words arrive consecutively
    current_word = None
    current_count = 0
    word = None
    for line in input:
        line = line.strip()
        word, count = line.split('\t', 1)
        try:
            count = int(count)
        except ValueError:
            continue
        if current_word == word:
            current_count += count
        else:
            if current_word:
                print '%s\t%s' % (current_word, current_count)
            current_count = count
            current_word = word
    if current_word:
        print '%s\t%s' % (current_word, current_count)

def main():
    reduce(sys.stdin)

if __name__ == "__main__":
    main()
summarymapper.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""a python script for hadoop streaming map"""
import sys

def map(input):
    for line in input:
        print "total\t1"
        line = line.strip()
        words = line.split("\t")
        if len(words) != 6:
            print "badline\t1"
        else:
            # split() never yields None, so test for non-empty fields instead
            if words[0]:
                print "sighted\t1"
            if words[1]:
                print "recorded\t1"
            if words[2]:
                print "location\t1"
            if words[3]:
                print "shape\t1"
            if words[4]:
                print "duration\t1"
            if words[5]:
                print "description\t1"

def main():
    map(sys.stdin)

if __name__ == "__main__":
    main()
Commands used
cat ufo.tsv | ./summarymapper.py | sort | ./wcreducer.py > output.txt
hdfs dfs -copyFromLocal ufo.tsv /user/hadoop/ufo/ufo.tsv
hadoop jar /home/hadoop/cloud/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar -file summarymapper.py -mapper summarymapper.py -file wcreducer.py -reducer wcreducer.py -input /user/hadoop/ufo/ufo.tsv -output /user/hadoop/ufo/out
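When the streaming job finishes, the result can be read straight from HDFS (assuming the default output file name for a single-reducer streaming job):

hdfs dfs -cat /user/hadoop/ufo/out/part-00000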
Grouping duration statistics by shape

The first script, shpemapper.py, just counts shapes and is again essentially WordCount. The scripts shapetimemapper.py and shapetimereducer.py implement the grouped statistics. Compared with shpemapper.py, shapetimemapper.py does one extra thing: besides identifying the shape (words[3]) as before, it uses regular expressions to extract the duration text (words[4]) and convert it to an integer number of seconds, so the output line changes from print shape + "\t1" to print shape + "\t" + str(time). shapetimereducer.py then computes min, max, total, and mean for each shape.
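To see what the two regular expressions in shapetimemapper.py below actually extract, here is a small standalone test; the duration strings are made-up examples:

#!/usr/bin/python
# -*- coding: utf-8 -*-
# standalone demo of the duration-parsing patterns used in shapetimemapper.py
import re

pattern1 = re.compile(r'\d* ?((min)|(sec))')  # digits, optional space, then the unit
pattern2 = re.compile(r'\d*')                 # leading digits of pattern1's match

for duration in ['5 min', '20 sec', '2min', 'unknown']:
    match = pattern1.match(duration)
    if match is not None:
        time = int(pattern2.match(match.group()).group() or 0)
        if match.group(1) == 'min':
            time *= 60
        print '%s -> %d seconds' % (duration, time)
    else:
        print '%s -> no match' % duration

Running it prints 300, 20, and 120 seconds for the first three strings, and "no match" for the last.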
shpemapper.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""a python script for hadoop streaming map"""
import sys

def map(input):
    for line in input:
        line = line.strip()
        words = line.split("\t")
        if len(words) == 6:
            shape = words[3].strip()
            if len(shape) > 0:
                print shape + "\t1"

def main():
    map(sys.stdin)

if __name__ == "__main__":
    main()
shapetimemapper.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""a python script for hadoop streaming map"""
import sys
import re

def map(input):
    # a duration looks like "5 min" or "20 sec"; group(1) captures the unit
    pattern1 = re.compile(r'\d* ?((min)|(sec))')
    # the leading digits of whatever pattern1 matched
    pattern2 = re.compile(r'\d*')
    for line in input:
        line = line.strip()
        words = line.split("\t")
        if len(words) == 6:
            shape = words[3].strip()
            duration = words[4].strip()
            # split() never returns None; test for non-empty strings instead
            if shape and duration:
                match = pattern1.match(duration)
                if match != None:
                    time = pattern2.match(match.group())
                    unit = match.group(1)
                    try:
                        time = int(time.group())
                    except ValueError:
                        #print '??? : ' + duration
                        time = 0
                    if unit == 'min':
                        time = time * 60
                    print shape + '\t' + str(time)

def main():
    map(sys.stdin)

if __name__ == "__main__":
    main()
shapetimereducer.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""a python script for hadoop streaming reduce"""
import sys

def reduce(input):
    current = None
    minv = 0
    maxv = 0
    total = 0
    count = 0
    for line in input:
        line = line.strip()
        word, time = line.split('\t')
        time = int(time)
        if word == current:
            count += 1
            total += time
            if time < minv:
                minv = time
            if time > maxv:
                maxv = time
        else:
            if current != None:
                # float() avoids Python 2 integer division when computing the mean
                print current + '\t' + str(minv) + ' ' + str(maxv) + ' ' + str(float(total) / count)
            current = word
            count = 1
            total = time
            minv = time
            maxv = time
    if current != None:
        print current + '\t' + str(minv) + ' ' + str(maxv) + ' ' + str(float(total) / count)

def main():
    reduce(sys.stdin)

if __name__ == "__main__":
    main()
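As with the summary job, the whole pipeline can be tested locally before submitting it to Hadoop:

cat ufo.tsv | ./shapetimemapper.py | sort | ./shapetimereducer.py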
ChainMapper

ChainMapper chains several Mappers together, a bit like Filters in the servlet world. This example uses two Mappers: the first validates each record, the second counts sightings by Location. Mappers are added with the ChainMapper.addMapper method. Be sure to call job.setJarByClass(ufo.UFORecordValidationMapper.class), otherwise the class will not be found at runtime. Note also that the old and new APIs differ here; the differences are described in comments in the code.
UFORecordValidationMapper.java
package ufo;
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
/*
  Old API: extend org.apache.hadoop.mapred.MapReduceBase and implement the
  interface org.apache.hadoop.mapred.Mapper<K1, V1, K2, V2>.
  New API: extend org.apache.hadoop.mapreduce.Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.
  Old API: the third and fourth parameters of map() are an OutputCollector and a Reporter.
  New API: the third parameter of map() is a Context, which combines the two.
*/
public class UFORecordValidationMapper extends Mapper<LongWritable, Text, LongWritable, Text>
{
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        String line = value.toString();
        if (validate(line))
            context.write(key, value);
    }

    private boolean validate(String str)
    {
        String[] words = str.split("\t");
        return words.length == 6;
    }
}
UFOLocation.java
package ufo;
import java.io.*;
import java.util.Iterator;
import java.util.regex.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.chain.*;
import org.apache.hadoop.mapreduce.lib.reduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class UFOLocation
{
    public static class MapClass extends Mapper<Object, Text, Text, LongWritable>
    {
        private final static LongWritable one = new LongWritable(1);
        // two letters (the state abbreviation) followed by non-letters at the end of the field
        private static Pattern locationPattern = Pattern.compile("[a-zA-Z]{2}[^a-zA-Z]*$");

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        {
            String line = value.toString();
            String[] fields = line.split("\t");
            String location = fields[2].trim();
            if (location.length() >= 2)
            {
                Matcher matcher = locationPattern.matcher(location);
                if (matcher.find())
                {
                    int start = matcher.start();
                    String state = location.substring(start, start + 2);
                    context.write(new Text(state.toUpperCase()), one);
                }
            }
        }
    }

    /*
      New API: the driver is built around org.apache.hadoop.mapreduce.Job, which holds all
      configuration; waitForCompletion(boolean) submits the job and waits for it to finish.
      Old API: the driver used org.apache.hadoop.mapred.JobConf(Configuration, Class) for
      configuration and org.apache.hadoop.mapred.JobClient.runJob(JobConf) to submit the job.
    */
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "UFOLocation");
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        Configuration mapAConf = new Configuration(false);
        ChainMapper.addMapper(job, UFORecordValidationMapper.class, LongWritable.class, Text.class,
                LongWritable.class, Text.class, mapAConf);
        Configuration mapBConf = new Configuration(false);
        ChainMapper.addMapper(job, MapClass.class, LongWritable.class, Text.class,
                Text.class, LongWritable.class, mapBConf);
        job.setJarByClass(ufo.UFORecordValidationMapper.class);
        job.setMapperClass(ChainMapper.class);
        job.setCombinerClass(LongSumReducer.class);
        job.setReducerClass(LongSumReducer.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
DistributedCache

DistributedCache shares files across the cluster. Sharing a file takes two steps: first, register the file in main() with DistributedCache.addCacheFile; second, read it back in the Mapper, here by loading it into a HashMap in the Mapper's setup method. The example uses a states.txt file that maps state abbreviations to full names.
states.txt
AL Alabama
AK Alaska
AZ Arizona
AR Arkansas
CA California
UFOLocation2.java
package ufo;
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.regex.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.chain.*;
import org.apache.hadoop.mapreduce.lib.reduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.filecache.DistributedCache;
public class UFOLocation2
{
    public static class MapClass extends Mapper<Object, Text, Text, LongWritable>
    {
        public final static String LINK_STATES_TXT = "__Link_statestxt__";
        private final static LongWritable one = new LongWritable(1);
        private static Pattern locationPattern = Pattern.compile("[a-zA-Z]{2}[^a-zA-Z]*$");
        private Map<String, String> stateNames;

        @Override
        public void setup(Context context) throws IOException, InterruptedException
        {
            try
            {
                setupStateMap();
            } catch (IOException e) {
                System.err.println("Error reading state file.");
                System.exit(1);
            }
        }

        private void setupStateMap() throws IOException
        {
            Map<String, String> states = new HashMap<String, String>();
            // the symlink created by DistributedCache appears in the task's working directory
            BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(MapClass.LINK_STATES_TXT)));
            String line = reader.readLine();
            while (line != null)
            {
                String[] split = line.split("\t");
                states.put(split[0], split[1]);
                line = reader.readLine();
            }
            reader.close();
            stateNames = states;
        }

        private String lookupState(String state)
        {
            String fullName = stateNames.get(state);
            return fullName == null ? "Other" : fullName;
        }

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        {
            String line = value.toString();
            String[] fields = line.split("\t");
            String location = fields[2].trim();
            if (location.length() >= 2)
            {
                Matcher matcher = locationPattern.matcher(location);
                if (matcher.find())
                {
                    int start = matcher.start();
                    String state = location.substring(start, start + 2);
                    String fullName = lookupState(state.toUpperCase());
                    context.write(new Text(fullName), one);
                }
            }
        }
    }

    /*
      With the new API, DistributedCache files are accessed through a symbolic link.
    */
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "UFOLocation");
        String cacheFilePath = "/user/hadoop/ufo/states.txt";
        Path inPath = new Path(cacheFilePath);
        // the part after '#' is the name of the symbolic link
        String inPathLink = inPath.toUri().toString() + "#" + MapClass.LINK_STATES_TXT;
        DistributedCache.addCacheFile(new URI(inPathLink), job.getConfiguration());
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        Configuration mapAConf = new Configuration(false);
        ChainMapper.addMapper(job, UFORecordValidationMapper.class, LongWritable.class, Text.class,
                LongWritable.class, Text.class, mapAConf);
        Configuration mapBConf = new Configuration(false);
        ChainMapper.addMapper(job, MapClass.class, LongWritable.class, Text.class,
                Text.class, LongWritable.class, mapBConf);
        job.setJarByClass(ufo.UFORecordValidationMapper.class);
        job.setMapperClass(ChainMapper.class);
        job.setCombinerClass(LongSumReducer.class);
        job.setReducerClass(LongSumReducer.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
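Before running the job, the lookup file must exist at the HDFS path main() expects:

hdfs dfs -copyFromLocal states.txt /user/hadoop/ufo/states.txt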
Context

The old API reported counters and status through Reporter; the new API does both through Context.

UFOCountingRecordValidationMapper.java

Define an enum LineCounters, obtain a counter with context.getCounter, and bump it with the counter's increment method. Status is set with context.setStatus. Counter totals are printed in the job's summary when it completes.
package ufo;
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
public class UFOCountingRecordValidationMapper extends Mapper<LongWritable, Text, LongWritable, Text>
{
    public enum LineCounters
    {
        BAD_LINES,
        TOO_MANY_TABS,
        TOO_FEW_TABS
    };

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        String line = value.toString();
        if (validate(line, context))
            context.write(key, value);
    }

    private boolean validate(String str, Context context)
    {
        String[] words = str.split("\t");
        if (words.length != 6)
        {
            if (words.length < 6)
            {
                Counter ct = context.getCounter(LineCounters.TOO_FEW_TABS);
                ct.increment(1);
            } else {
                Counter ct = context.getCounter(LineCounters.TOO_MANY_TABS);
                ct.increment(1);
            }
            Counter ct = context.getCounter(LineCounters.BAD_LINES);
            ct.increment(1);
            if (ct.getValue() % 10 == 0) {
                context.setStatus("Got 10 bad lines.");
                System.err.println("Read another 10 bad lines.");
            }
            return false;
        }
        return true;
    }
}
UFOLocation3.java

Copy UFOLocation2.java, rename the class to UFOLocation3, and change two places:

ChainMapper.addMapper(job, UFOCountingRecordValidationMapper.class, LongWritable.class, Text.class,
        LongWritable.class, Text.class, mapAConf);
job.setJarByClass(ufo.UFOCountingRecordValidationMapper.class);
Helper scripts for compiling and running

build.sh

Because the classes are in the ufo package, build.sh sits in the current directory and the Java sources live one level down, in ufo/ (see the layout sketch below).
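The expected directory layout, reconstructed from the javac command below:

.
├── build.sh
└── ufo
    ├── UFOCountingRecordValidationMapper.java
    ├── UFORecordValidationMapper.java
    ├── UFOLocation.java
    ├── UFOLocation2.java
    └── UFOLocation3.java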
#!/bin/sh
HADOOP_LIB_DIR=/home/hadoop/cloud/hadoop/share/hadoop
rm -f ./*.class
rm -f ./ufo.jar
javac -classpath $HADOOP_LIB_DIR/common/hadoop-common-2.6.0.jar:$HADOOP_LIB_DIR/common/lib/commons-cli-1.2.jar:$HADOOP_LIB_DIR/common/lib/hadoop-annotations-2.6.0.jar:$HADOOP_LIB_DIR/mapreduce/hadoop-mapreduce-client-core-2.6.0.jar -d . ufo/UFORecordValidationMapper.java ufo/UFOLocation.java ufo/UFOLocation2.java ufo/UFOCountingRecordValidationMapper.java ufo/UFOLocation3.java
#package
jar -cvf ufo.jar ./ufo/*.class
run.sh

To run a different example, change ufo.UFOLocation to ufo.UFOLocation2 or ufo.UFOLocation3.
#!/bin/sh
hdfs dfs -rm -r -f /user/hadoop/ufo/out001
hadoop jar ufo.jar ufo.UFOLocation /user/hadoop/ufo/ufo.tsv /user/hadoop/ufo/out001
hdfs dfs -cat /user/hadoop/ufo/out001/part-r-00000