文章目錄
-
- 1. WordCount V1.0
- 2. WordCount V2.0
- 3. 坑
搭建 Hadoop 3.1.2 windows單節點安裝與使用
使用管理者身份運作IDEA
添加Maven依賴。雖然hadoop-client中已包含hadoop-mapreduce-client-jobclient,但若不單獨添加,IDEA控制台將不會列印日志
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>3.1.2</version>
</dependency>
添加log4j.properties到resource檔案夾中
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%m%n
將hdfs-site.xml和core-site.xml複製到resource檔案夾
項目結構
1. WordCount V1.0
map1
/**
 * Mapper for WordCount V1.0: tokenizes each input line on whitespace and
 * emits a (token, 1) pair for every token found.
 */
public class WordCountMapper1 extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Shared writable holding the constant count of one emitted per token.
    private static final IntWritable ONE = new IntWritable(1);
    // Reused output key to avoid allocating a new Text per token.
    private final Text outputKey = new Text();

    /**
     * Splits the incoming line into whitespace-delimited tokens and writes
     * each token with a count of 1.
     *
     * @param key     byte offset of the line within the input split
     * @param value   the line of text
     * @param context output collector for (word, 1) pairs
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        StringTokenizer tokens = new StringTokenizer(value.toString());
        while (tokens.hasMoreTokens()) {
            outputKey.set(tokens.nextToken());
            context.write(outputKey, ONE);
        }
    }
}
reduce1
/**
 * Reducer (and combiner) for WordCount V1.0: sums the per-word counts
 * produced by the mapper and emits (word, total).
 */
public class WordCountReducer1 extends Reducer<Text, IntWritable, Text, IntWritable> {
    // Reused output value holder.
    private final IntWritable total = new IntWritable();

    /**
     * Accumulates all counts observed for {@code key} and writes the sum.
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable partial : values) {
            count += partial.get();
        }
        total.set(count);
        context.write(key, total);
    }
}
WordCount V1.0,路徑寫死可直接在IDEA中run,也可以設定參數
/**
 * WordCount V1.0 driver: configures and submits a MapReduce job that counts
 * word occurrences under /hdfsTest/input, writing results to /hdfsTest/output.
 * Paths are hard-coded so the class can be run directly from the IDE;
 * commented alternatives show argument-based and local-Windows variants.
 */
public class WordCount1 {
    public static void main(String[] args) {
        // Picks up hdfs-site.xml / core-site.xml from the classpath.
        Configuration conf = new Configuration();
        try {
            Job job = Job.getInstance(conf, "WordCount V1.0");
            job.setJarByClass(WordCount1.class);
            job.setMapperClass(WordCountMapper1.class);
            // Summing is associative and commutative, so the reducer can
            // safely double as the combiner.
            job.setCombinerClass(WordCountReducer1.class);
            job.setReducerClass(WordCountReducer1.class);
            // Job-level output key/value types; usable here because mapper
            // and reducer emit the same types.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            // HDFS paths
            FileInputFormat.addInputPath(job, new Path("/hdfsTest/input"));
            FileOutputFormat.setOutputPath(job, new Path("/hdfsTest/output"));
            //FileInputFormat.addInputPath(job, new Path(args[0]));
            //FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // Local Windows paths
            //FileInputFormat.setInputPaths(job, "D:\\hadoop-test\\input");
            //FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop-test\\output"));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
            // Fail loudly: the original swallowed the exception, letting the
            // JVM exit with status 0 even when job submission failed.
            System.exit(1);
        }
    }
}
可打成jar包運作,使用參數時,需要指定類的完整限定名(含包名)
hadoop jar mapreduce-test-1.0-SNAPSHOT.jar com.shpun.wordcount1.WordCount1 /hdfsTest/input /hdfsTest/output
2. WordCount V2.0
map2
/**
 * Mapper for WordCount V2.0. Besides tokenizing and emitting (token, 1),
 * it supports case-insensitive counting (wordcount.case.sensitive) and
 * stripping configurable regex patterns loaded from distributed-cache
 * files (wordcount.skip.patterns), and tracks the number of emitted
 * words in a custom counter.
 */
public class WordCountMapper2 extends Mapper<LongWritable, Text, Text, IntWritable> {
    // The enum class name serves as the counter group; each constant is a counter.
    enum MapperCounterEnums {
        INPUT_WORDS
    }

    private static final IntWritable one = new IntWritable(1);
    private final Text word = new Text();
    // Whether matching is case sensitive (wordcount.case.sensitive, default true).
    private boolean caseSensitive;
    // Regex patterns removed from each line before tokenizing.
    private final Set<String> patternsToSkip = new HashSet<>();
    private Configuration conf;

    /**
     * Called exactly once by the framework before any map() invocation.
     * 1. Reads wordcount.case.sensitive into {@link #caseSensitive}.
     * 2. When wordcount.skip.patterns is true, loads every distributed-cache
     *    file (registered in the driver via job.addCacheFile) into the
     *    skip-pattern set.
     *
     * @param context task context providing the job configuration
     * @throws IOException          if cache-file metadata cannot be read
     * @throws InterruptedException if the task is interrupted
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        conf = context.getConfiguration();
        caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
        if (conf.getBoolean("wordcount.skip.patterns", false)) {
            URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
            for (URI patternsURI : patternsURIs) {
                Path patternsPath = new Path(patternsURI.getPath());
                String fileName = patternsPath.getName();
                parseSkipFile(fileName);
            }
        }
    }

    /**
     * Reads the localized cache file line by line and adds every line to the
     * skip-pattern set.
     *
     * @param fileName file name in the task's local working directory
     */
    private void parseSkipFile(String fileName) {
        // try-with-resources guarantees the reader is closed even on error;
        // the original kept it open in a field and leaked the file handle.
        try (BufferedReader reader = new BufferedReader(new FileReader(fileName))) {
            String patternLine;
            while ((patternLine = reader.readLine()) != null) {
                patternsToSkip.add(patternLine);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Normalizes case if requested, strips all skip patterns, then emits
     * (token, 1) per remaining whitespace-delimited token while incrementing
     * the INPUT_WORDS counter.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = caseSensitive ? value.toString() : value.toString().toLowerCase();
        // Each pattern is treated as a regular expression by replaceAll.
        for (String pattern : patternsToSkip) {
            line = line.replaceAll(pattern, "");
        }
        // Look the counter up once per map() call instead of once per token.
        Counter counter = context.getCounter(MapperCounterEnums.class.getName(),
                MapperCounterEnums.INPUT_WORDS.toString());
        StringTokenizer stringTokenizer = new StringTokenizer(line);
        while (stringTokenizer.hasMoreTokens()) {
            word.set(stringTokenizer.nextToken());
            context.write(word, one);
            counter.increment(1);
        }
    }
}
reduce2
/**
 * Reducer (and combiner) for WordCount V2.0: sums all counts received for a
 * word and emits (word, total).
 */
public class WordCountReducer2 extends Reducer<Text, IntWritable, Text, IntWritable> {
    // Reused output value holder.
    private final IntWritable total = new IntWritable();

    /**
     * Accumulates the counts for {@code key} and writes the sum.
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable partial : values) {
            count += partial.get();
        }
        total.set(count);
        context.write(key, total);
    }
}
WordCount V2.0
/**
 * WordCount V2.0 driver. Accepts generic Hadoop options plus:
 *   wordcount <in> <out> [-skip skipPatternFile]
 * -Dwordcount.case.sensitive=false disables case sensitivity;
 * -skip registers a pattern file in the distributed cache and enables
 * pattern filtering in the mapper.
 */
public class WordCount2 {
    public static void main(String[] args) {
        // Picks up hdfs-site.xml / core-site.xml from the classpath.
        Configuration conf = new Configuration();
        try {
            // Separates generic Hadoop options (-D...) from application arguments.
            GenericOptionsParser genericOptionsParser = new GenericOptionsParser(conf, args);
            String[] remainingArgs = genericOptionsParser.getRemainingArgs();
            // For: hadoop jar wc.jar WordCount2 /in /out
            //   remainingArgs = [/in, /out]
            // For: hadoop jar wc.jar WordCount2 -Dwordcount.case.sensitive=false /in /out -skip /patterns.txt
            //   remainingArgs = [/in, /out, -skip, /patterns.txt]
            if (remainingArgs.length != 2 && remainingArgs.length != 4) {
                System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
                System.exit(2);
            }
            Job job = Job.getInstance(conf, "WordCount V2.0");
            job.setJarByClass(WordCount2.class);
            job.setMapperClass(WordCountMapper2.class);
            // Summing is associative/commutative, so the reducer doubles as combiner.
            job.setCombinerClass(WordCountReducer2.class);
            job.setReducerClass(WordCountReducer2.class);
            // Output key/value types; usable because mapper and reducer agree.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            List<String> otherArgList = new ArrayList<>();
            for (int i = 0; i < remainingArgs.length; i++) {
                if ("-skip".equals(remainingArgs[i])) {
                    // Resolve the pattern file to a full URI (scheme://authority/path)
                    // and register it in the distributed cache.
                    URI patternURI = new Path(remainingArgs[++i]).toUri();
                    job.addCacheFile(patternURI);
                    // Tell the mapper to load and apply the skip patterns.
                    job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
                } else {
                    otherArgList.add(remainingArgs[i]);
                }
            }
            // Guard: "-skip" consumes arguments, so verify that both the input
            // and output paths survived parsing; the original threw an
            // IndexOutOfBoundsException on malformed argument orders.
            if (otherArgList.size() < 2) {
                System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
                System.exit(2);
            }
            FileInputFormat.addInputPath(job, new Path(otherArgList.get(0)));
            FileOutputFormat.setOutputPath(job, new Path(otherArgList.get(1)));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
            // Fail loudly: the original swallowed the exception and exited 0
            // even when the job could not be submitted.
            System.exit(1);
        }
    }
}
IDEA指派參數,在Program arguments中空格間隔開
可打成jar包運作
hadoop jar mapreduce-test-1.0-SNAPSHOT.jar com.shpun.wordcount2.WordCount2 /hdfsTest/input /hdfsTest/output
hadoop jar mapreduce-test-1.0-SNAPSHOT.jar com.shpun.wordcount2.WordCount2 -Dwordcount.case.sensitive=false /hdfsTest/input /hdfsTest/output
hadoop jar mapreduce-test-1.0-SNAPSHOT.jar com.shpun.wordcount2.WordCount2 -Dwordcount.case.sensitive=false /hdfsTest/input /hdfsTest/output -skip /hdfsTest/skip/skipInput.txt
3. 坑
- 在WordCount V1.0中,在IDEA上運作時,切換注釋的HDFS和Windows路徑時,Maven要進行Clean再Compile,然後再Rebuild Project。不然會出現路徑類型識别出錯。HDFS路徑識别成本地路徑,Windows路徑識别成HDFS路徑。
參考:
hadoop MapReduce Tutorial
初識MapReduce的應用場景(附JAVA和Python代碼)
自己編譯WordCount編譯通過執行報錯
官網MapReduce執行個體代碼詳細批注