天天看點

分布式并行計算MapReduce

這個作業的要求來自于:https://edu.cnblogs.com/campus/gzcc/GZCC-16SE2/homework/3319。

1.用自己的話闡明Hadoop平台上HDFS和MapReduce的功能、工作原理和工作過程。

HDFS

    功能:分布式檔案系統,用來存儲海量資料。

    工作原理和過程:HDFS是Hadoop的分布式檔案系統,HDFS中的檔案會預設存儲3份,存儲在不同的機器上,提供容錯機制,副本丢失或者當機的自動恢複。HDFS總體上采用Master/Slave的架構,整個HDFS架構由Client、NameNode、Secondary NameNode和DataNode構成。NameNode負責存儲整個叢集的中繼資料資訊,Client可以根據中繼資料資訊找到對應的檔案,DataNode負責資料的實際存儲。當一個檔案上傳到HDFS的時候,DataNode會按照Block為基本機關分布在各個DataNode中,而且為了保護資料的一緻性和容錯性,一般一份資料會在不同的DataNode上預設存儲三份。如下圖所示:

分布式并行計算MapReduce

MapReduce

    功能:并行處理架構,實作任務分解和排程。

    工作原理和過程:MapReduce的工作過程分成兩個階段,map階段和reduce階段。每個階段都有鍵值對作為輸入輸出,map函數和reduce函數的具體實作由程式員完成。MapReduce的架構也是采用Master/Slave的方式組織,如下圖所示。由四部分組成,分别為Client、JobTracker、TaskTracker以及Task。JobTracker主要負責資源監控和作業排程。JobTracker監控TaskTracker是否存活,任務執行的狀态以及資源的使用情況,并且把得到的資訊交給TaskSceduler。TaskSceduler根據每個TaskTracker的情況給配置設定響應的任務。TaskTracker會周期性通過heartbeats向JobTracker發送資源的使用情況,任務的執行狀況等資訊,同時會接收JobTracker的指令,TaskTracker把自己可支配的資源分成若幹個Slot,Task隻有拿到一個Slot資源才能執行任務。Task任務分成Map Task和Reduce Task兩種任務,都是由TaskTracker進行排程的。

分布式并行計算MapReduce

2.HDFS上運作MapReduce

mapper.py

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class Map extends Mapper<LongWritable, Text, Text,IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException {
        word.set(value.toString());
        context.write(word, one);
    }
}      
分布式并行計算MapReduce

reduce.py

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

public class Reduce extends Reducer<Text, IntWritable, Text,IntWritable> {
    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for(IntWritable intWritable : values){
            sum += intWritable.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
      
#!/usr/bin/env python
cd /home/hadoop/wc
sudo gedit reduce.py
#賦予權限
chmod a+x /home/hadoop/map.py
      
分布式并行計算MapReduce

本機上測試運作代碼:

echo "foo foo quux labs foo bar quux" | /home/hadoop/wc/mapper.py

echo "foo foo quux labs foo bar quux" | /home/hadoop/wc/mapper.py | sort -k1,1 | /home/hadoop/wc/reducer.p      
啟動Hadoop,HDFS, JobTracker, TaskTracker:

        
分布式并行計算MapReduce
分布式并行計算MapReduce
分布式并行計算MapReduce

放到HDFS上運作

下載下傳并上傳檔案到hdfs上:

#上傳檔案
cd  /home/hadoop/wc
wget http://www.gutenberg.org/files/5000/5000-8.txt
wget http://www.gutenberg.org/cache/epub/20417/pg20417.txt
 
#下載下傳檔案
cd /usr/hadoop/wc
hdfs dfs -put /home/hadoop/hadoop/gutenberg/*.txt /user/hadoop/input      

建立一個檔案5000-8.txt,運作結果如下:

分布式并行計算MapReduce
分布式并行計算MapReduce