大資料學習之 MapReduce基礎及第一個程式 WordCount
- MapReduce
-
- 定義
- 優缺點
- MapReduce程式設計思想
-
- MapReduce程序
- 編碼規範
- Hadoop序列化
-
- 為什麼要序列化?
- 什麼是序列化?
- 為什麼不用Java的序列化?
- 為什麼序列化對Hadoop很重要?
- 常用資料序列化類型
- MapReduce工作流程
-
- MapTask工作機制
- ReduceTask工作機制
- shuffle流程詳解
- WordCount代碼(帶有Partitoner分區操作)
-
- pom.xml
- WordcountDriver
- WordcountMapper
- WordcountReducer
- WordcountPartitoner(分區)
MapReduce
英語四六級
定義
MapReduce是一個分布式運算程式的程式設計架構,是使用者開發“基于hadoop的資料分析應用”的核心架構。
MapReduce核心功能是将使用者編寫的業務邏輯代碼和自帶預設元件整合成為一個完整的分布式運算程式,并發運作在一個hadoop叢集上。
優缺點
優點:
- MapReduce易于程式設計。它簡單的實作一些接口,就可以完成一個分布式程式,這個分布式程式可以分布到大量廉價的PC機器上運作。也就是說你寫一個分布式程式,跟寫一個簡單的串行程式是一模一樣的。就是因為這個特點使得MapReduce程式設計變得非常流行。
- 良好的擴充性。當你的計算資源不能得到滿足的時候,你可以通過簡單的增加機器來擴充它的計算能力。
- 高容錯性。MapReduce設計的初衷就是使程式能夠部署在廉價的PC機器上,這就要求它具有很高的容錯性。
- 适合PB級以上的海量資料的離線處理。重點是PB級别和離線處理,說明它适合資料量巨大的離線處理而不适合線上處理。比如像毫秒級别的傳回一個結果,MapReduce很難做到。
缺點:
MapReduce不擅長做實時計算、流式計算、DAG(有向圖)計算。
實時計算:MapReduce無法像Mysql一樣,在毫秒或者秒級内傳回結果。
流式計算:流式計算的輸入資料是動态的,而MapReduce的輸入資料集是靜态的,不能動态變化。這是因為MapReduce自身的設計特點決定了資料源必須是靜态的。
DAG(有向圖)計算:多個應用程式存在依賴關系,後一個應用程式的輸入為前一個的輸出。在這種情況下,MapReduce并不是不能做,而是使用後,每個MapReduce作業的輸出結果都會寫入到磁盤,會造成大量的磁盤IO,導緻性能非常的低下。
MapReduce程式設計思想
分布式的運算往往需要分成至少兩個階段;
第一個階段為maptask,是并發執行個體,完全并行執行,互不相幹;
第二個階段為reducetask,也是并發執行個體,互不相幹,但是處理的資料依賴于上一階段的maptask并發執行個體的輸出;
MapReduce程式設計模型隻能包含一個map階段和一個reduce階段,如果使用者的業務邏輯非常複雜,那就隻能寫多個MapReduce程式,串行運作。
MapReduce程序
一個完整的MapReduce程式在分布式運作時有三類執行個體程序:
1)MrAppMaster:負責整個程式的過程排程及狀态協調。
2)MapTask:負責map階段的整個資料處理流程。
3)ReduceTask:負責reduce階段的整個資料處理流程。
編碼規範
使用者編寫的程式分成三個部分:Mapper,Reducer,Driver(送出運作mr程式的用戶端)
1)Mapper階段
(1)使用者自定義的Mapper要繼承自己的父類
(2)Mapper的輸入資料是KV對的形式(KV的類型可自定義)
(3)Mapper中的業務邏輯寫在map()方法中
(4)Mapper的輸出資料是KV對的形式(KV的類型可自定義)
(5)map()方法(maptask程序)對每一個<K,V>調用一次
2)Reducer階段
(1)使用者自定義的Reducer要繼承自己的父類
(2)Reducer的輸入資料類型對應Mapper的輸出資料類型,也是KV
(3)Reducer的業務邏輯寫在reduce()方法中
(4)Reducetask程序對每一組相同k的<k,v>組調用一次reduce()方法
3)Driver階段
整個程式需要一個Drvier來進行送出,送出的是一個描述了各種必要資訊的job對象
Hadoop序列化
為什麼要序列化?
一般來說,“活的”對象隻生存在記憶體裡,關機斷電就沒有了。而且“活的”對象隻能由本地的程序使用,不能被發送到網絡上的另外一台計算機。 然而序列化可以存儲“活的”對象,可以将“活的”對象發送到遠端計算機。
什麼是序列化?
序列化就是把記憶體中的對象,轉換成位元組序列(或其他資料傳輸協定)以便于存儲(持久化)和網絡傳輸。
反序列化就是将自己序列(或其他資料傳輸協定)或者是硬碟的持久化資料轉換為記憶體的對象。
為什麼不用Java的序列化?
Java的序列化是一個重量級序列化架構(Serializable),一個對象被序列化後,會附帶很多額外的資訊(各種校驗資訊,header,繼承體系等),不便于在網絡中高效傳輸。是以,hadoop自己開發了一套序列化機制(Writable),精簡、高效。
為什麼序列化對Hadoop很重要?
因為Hadoop在叢集之間進行通訊或者RPC調用的時候,需要序列化,而且要求序列化要快,且體積要小,占用帶寬要小。是以必須了解Hadoop的序列化機制。
序列化和反序列化在分布式資料處理領域經常出現:程序通信和永久存儲。
然而Hadoop中各個節點的通信是通過**遠端調用(RPC)**實作的,那麼RPC序列化要求具有以下特點:
1)緊湊:緊湊的格式能讓我們充分利用網絡帶寬,而帶寬是資料中心最稀缺的資
2)快速:程序通信形成了分布式系統的骨架,是以需要盡量減少序列化和反序列化的性能開銷,這是基本的;
3)可擴充:協定為了滿足新的需求變化,是以控制用戶端和伺服器過程中,需要直接引進相應的協定,這些是新協定,原序列化方式能支援新的協定封包;
4)互操作:能支援不同語言寫的用戶端和服務端進行互動;
常用資料序列化類型
常用的資料類型對應的hadoop資料序列化類型
Java類型 | Hadoop Writable類型 |
---|---|
boolean | BooleanWritable |
byte | ByteWritable |
int | IntWritable |
float | FloatWritable |
long | LongWritable |
double | DoubleWritable |
string | Text |
map | MapWritable |
array | ArrayWritable |
也可以bean對象實作序列化接口(Writable),實作序列化
MapReduce工作流程
MapTask工作機制
- read階段:MapTask通過編寫的RecordReader,從輸入的InputSplit中解析出一個個key/value。
- map階段:該節點主要是将解析出的key/value交給編寫的map()函數處理,并産生一系列的key/value
- collect收集階段:在使用者編寫的map()函數中,當資料處理完成後,一般會調用OutPutCollect.collect()輸出結果。在該函數内部,會生成一個key/value分區(調用Partitioner),并寫入一個環形緩沖區中。
- spill階段:即“溢寫”,當環形緩沖區滿後,MapReduce會将資料寫到本地磁盤上,生成一個臨時檔案。需要注意的是,将資料寫入本地磁盤之前,先要對資料進行一次本地排序,并在必要時對資料進行合并、壓縮等操作。
- Combine階段:當所有資料處理完成後,MapTask對所有臨時檔案進行一次合并,以確定最終隻會生成一個資料檔案。
ReduceTask工作機制
- Copy階段:ReduceTask從各個MapTask上遠端拷貝一片資料,并針對某一片資料,如果其大小超過一定門檻值,則寫到磁盤上,否則直接放到記憶體中。
- Merge階段:在遠端拷貝資料的同時,ReduceTask啟動了兩個背景線程對記憶體和磁盤上的檔案進行合并,以防止記憶體使用過多或磁盤上檔案過多。
- Sort階段:按照MapReduce語義,使用者編寫reduce()函數輸入資料是按key進行聚集的一組資料。為了将key相同的資料聚在一起,Hadoop采用了基于排序的政策。由于各個MapTask已經實作對自己的處理結果進行了局部排序,是以,ReduceTask隻需對所有資料進行一次歸并排序即可。
- Reduce階段:reduce()函數将計算結果寫到HDFS上。
shuffle流程詳解
- maptask收集map方法輸出的kv對(key,value),放到記憶體緩沖區中。
- 從記憶體緩沖區中不斷溢出本地磁盤檔案,可能會溢出多個檔案
- 多個溢出檔案會被合并成大的溢出檔案
- 在溢出及合并過程中,都要調用partitioner進行分區和針對key進行排序。
- reducetask根據自己的分區号,去各個maptask機器上取得相應的結果分區資料。
- reducetask根據分區号,去各個maptask機器上相應的結果分區的資料。(如果reducetask隻有一個,則所有分區都寫入那一個reducetask)
WordCount代碼(帶有Partitoner分區操作)
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>hdfs</groupId>
<artifactId>hdfs</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.8.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.8.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.8.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.8.4</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.10</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.7</version>
</dependency>
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
WordcountDriver
package Wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.Text;
import overwrite.FileInputFormat;
import java.io.IOException;
/**
* @author wt
*/
public class WordcountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//args = new String [] {"D:\\data\\data\\","D:\\data\\wordcount"};
args = new String [] {"D:\\data\\web\\","D:\\data\\wordcount"};
//1.擷取配置資訊
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
//設定加載路徑
job.setJarByClass(WordcountDriver.class);
//設定map和Reduce
job.setMapperClass(WordcountMapper.class);
job.setReducerClass(WordcountReducer.class);
//設定map輸出
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//設定Reduce輸出
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//設定Partitioner分區
job.setPartitionerClass(WordcountPartitoner.class);
job.setNumReduceTasks(2);
// 判斷output檔案夾是否存在,如果存在則删除
Path path = new Path(args[1]);
FileSystem fileSystem = path.getFileSystem(configuration);
if (fileSystem.exists(path)) {
fileSystem.delete(path, true);
}
//設定輸入和輸出的路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//送出
boolean result = job.waitForCompletion(true);
System.out.println(result?0:1);
}
}
WordcountMapper
package Wordcount;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* @author wt
*/
@Slf4j
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
/**
*
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void setup(Context context) throws IOException, InterruptedException {
try {
log.info("重寫setup"+context.getMapperClass().toString());
System.out.println("重寫setup"+context.getMapperClass().toString());
} catch (ClassNotFoundException e) {
log.error(""+e);
}
}
/**
*
* @param key
* @param value
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//擷取資料
String line = new String(value.getBytes(),0,value.getLength(),"GBK");
//String line = value.toString();
//根據\t切割資料
//String[] words = line.split("\t");
String[] words = line.split("\\w+");
//根據空格切割
for(String word : words){
context.write(new Text(word),new IntWritable(1));
}
}
/**
*
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
System.out.println("重寫cleanup");
}
}
WordcountReducer
package Wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @author wt
*/
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
/**
*
* @param key
* @param values
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//累計求和
int sum = 0;
for (IntWritable conunt:values){
sum += conunt.get();
}
//輸出
context.write(key,new IntWritable(sum));
}
}
WordcountPartitoner(分區)
package Wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
*
*/
public class WordcountPartitoner extends Partitioner<Text, IntWritable> {
/**
*
* @param text
* @param intWritable
* @param i
* @return
*/
@Override
public int getPartition(Text text, IntWritable intWritable, int i) {
//擷取單詞
String firWord="1";
if(text.toString().length() > 2){
firWord = text.toString().substring(0, 1);
}
int result = firWord.hashCode();
// 2 根據奇數偶數分區
if (result % 2 == 0) {
return 0;
}else {
return 1;
}
}
}
英語四六級
企業網:永久可查,四六級委員會終身備案。中國500強企業、銀行、國企等在四六級委員會都有對接,有背景會員号,可以很容易調出學生檔案。原版真紙,三大防僞,可過機器。