大数据学习之 MapReduce基础及第一个程序 WordCount
- MapReduce
-
- 定义
- 优缺点
- MapReduce编程思想
-
- MapReduce进程
- 编码规范
- Hadoop序列化
-
- 为什么要序列化?
- 什么是序列化?
- 为什么不用Java的序列化?
- 为什么序列化对Hadoop很重要?
- 常用数据序列化类型
- MapReduce工作流程
-
- MapTask工作机制
- ReduceTask工作机制
- shuffle流程详解
- WordCount代码(带有Partitoner分区操作)
-
- pom.xml
- WordcountDriver
- WordcountMapper
- WordcountReducer
- WordcountPartitoner(分区)
MapReduce
英语四六级
定义
MapReduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架。
MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成为一个完整的分布式运算程序,并发运行在一个hadoop集群上。
优缺点
优点:
- MapReduce易于编程。它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得MapReduce编程变得非常流行。
- 良好的扩展性。当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。
- 高容错性。MapReduce设计的初衷就是使程序能够部署在廉价的PC机器上,这就要求它具有很高的容错性。
- 适合PB级以上的海量数据的离线处理。重点是PB级别和离线处理,说明它适合数据量巨大的离线处理而不适合在线处理。比如像毫秒级别的返回一个结果,MapReduce很难做到。
缺点:
MapReduce不擅长做实时计算、流式计算、DAG(有向图)计算。
实时计算:MapReduce无法像Mysql一样,在毫秒或者秒级内返回结果。
流式计算:流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,不能动态变化。这是因为MapReduce自身的设计特点决定了数据源必须是静态的。
DAG(有向图)计算:多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。
MapReduce编程思想
分布式的运算往往需要分成至少两个阶段;
第一个阶段为maptask,是并发实例,完全并行执行,互不相干;
第二个阶段为reducetask,也是并发实例,互不相干,但是处理的数据依赖于上一阶段的maptask并发实例的输出;
MapReduce编程模型只能包含一个map阶段和一个reduce阶段,如果用户的业务逻辑非常复杂,那就只能写多个MapReduce程序,串行运行。
MapReduce进程
一个完整的MapReduce程序在分布式运行时有三类实例进程:
1)MrAppMaster:负责整个程序的过程调度及状态协调。
2)MapTask:负责map阶段的整个数据处理流程。
3)ReduceTask:负责reduce阶段的整个数据处理流程。
编码规范
用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行mr程序的客户端)
1)Mapper阶段
(1)用户自定义的Mapper要继承自己的父类
(2)Mapper的输入数据是KV对的形式(KV的类型可自定义)
(3)Mapper中的业务逻辑写在map()方法中
(4)Mapper的输出数据是KV对的形式(KV的类型可自定义)
(5)map()方法(maptask进程)对每一个<K,V>调用一次
2)Reducer阶段
(1)用户自定义的Reducer要继承自己的父类
(2)Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
(3)Reducer的业务逻辑写在reduce()方法中
(4)Reducetask进程对每一组相同k的<k,v>组调用一次reduce()方法
3)Driver阶段
整个程序需要一个Drvier来进行提交,提交的是一个描述了各种必要信息的job对象
Hadoop序列化
为什么要序列化?
一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。 然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机。
什么是序列化?
序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储(持久化)和网络传输。
反序列化就是将自己序列(或其他数据传输协议)或者是硬盘的持久化数据转换为内存的对象。
为什么不用Java的序列化?
Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系等),不便于在网络中高效传输。所以,hadoop自己开发了一套序列化机制(Writable),精简、高效。
为什么序列化对Hadoop很重要?
因为Hadoop在集群之间进行通讯或者RPC调用的时候,需要序列化,而且要求序列化要快,且体积要小,占用带宽要小。所以必须理解Hadoop的序列化机制。
序列化和反序列化在分布式数据处理领域经常出现:进程通信和永久存储。
然而Hadoop中各个节点的通信是通过**远程调用(RPC)**实现的,那么RPC序列化要求具有以下特点:
1)紧凑:紧凑的格式能让我们充分利用网络带宽,而带宽是数据中心最稀缺的资
2)快速:进程通信形成了分布式系统的骨架,所以需要尽量减少序列化和反序列化的性能开销,这是基本的;
3)可扩展:协议为了满足新的需求变化,所以控制客户端和服务器过程中,需要直接引进相应的协议,这些是新协议,原序列化方式能支持新的协议报文;
4)互操作:能支持不同语言写的客户端和服务端进行交互;
常用数据序列化类型
常用的数据类型对应的hadoop数据序列化类型
Java类型 | Hadoop Writable类型 |
---|---|
boolean | BooleanWritable |
byte | ByteWritable |
int | IntWritable |
float | FloatWritable |
long | LongWritable |
double | DoubleWritable |
string | Text |
map | MapWritable |
array | ArrayWritable |
也可以bean对象实现序列化接口(Writable),实现序列化
MapReduce工作流程
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsICM38FdsYkRGZkRG9lcvx2bjxiNx8VZ6l2cs0TPnllN1ITY0FjMMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnLxYTO4QTNzkTMzATMxkTMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
MapTask工作机制
- read阶段:MapTask通过编写的RecordReader,从输入的InputSplit中解析出一个个key/value。
- map阶段:该节点主要是将解析出的key/value交给编写的map()函数处理,并产生一系列的key/value
- collect收集阶段:在用户编写的map()函数中,当数据处理完成后,一般会调用OutPutCollect.collect()输出结果。在该函数内部,会生成一个key/value分区(调用Partitioner),并写入一个环形缓冲区中。
- spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
- Combine阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
ReduceTask工作机制
- Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
- Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
- Sort阶段:按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。
- Reduce阶段:reduce()函数将计算结果写到HDFS上。
shuffle流程详解
- maptask收集map方法输出的kv对(key,value),放到内存缓冲区中。
- 从内存缓冲区中不断溢出本地磁盘文件,可能会溢出多个文件
- 多个溢出文件会被合并成大的溢出文件
- 在溢出及合并过程中,都要调用partitioner进行分区和针对key进行排序。
- reducetask根据自己的分区号,去各个maptask机器上取得相应的结果分区数据。
- reducetask根据分区号,去各个maptask机器上相应的结果分区的数据。(如果reducetask只有一个,则所有分区都写入那一个reducetask)
WordCount代码(带有Partitoner分区操作)
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>hdfs</groupId>
<artifactId>hdfs</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.8.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.8.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.8.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.8.4</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.10</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.7</version>
</dependency>
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
WordcountDriver
package Wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.Text;
import overwrite.FileInputFormat;
import java.io.IOException;
/**
* @author wt
*/
public class WordcountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//args = new String [] {"D:\\data\\data\\","D:\\data\\wordcount"};
args = new String [] {"D:\\data\\web\\","D:\\data\\wordcount"};
//1.获取配置信息
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
//设置加载路径
job.setJarByClass(WordcountDriver.class);
//设置map和Reduce
job.setMapperClass(WordcountMapper.class);
job.setReducerClass(WordcountReducer.class);
//设置map输出
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//设置Reduce输出
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置Partitioner分区
job.setPartitionerClass(WordcountPartitoner.class);
job.setNumReduceTasks(2);
// 判断output文件夹是否存在,如果存在则删除
Path path = new Path(args[1]);
FileSystem fileSystem = path.getFileSystem(configuration);
if (fileSystem.exists(path)) {
fileSystem.delete(path, true);
}
//设置输入和输出的路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//提交
boolean result = job.waitForCompletion(true);
System.out.println(result?0:1);
}
}
WordcountMapper
package Wordcount;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* @author wt
*/
@Slf4j
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
/**
*
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void setup(Context context) throws IOException, InterruptedException {
try {
log.info("重写setup"+context.getMapperClass().toString());
System.out.println("重写setup"+context.getMapperClass().toString());
} catch (ClassNotFoundException e) {
log.error(""+e);
}
}
/**
*
* @param key
* @param value
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//获取数据
String line = new String(value.getBytes(),0,value.getLength(),"GBK");
//String line = value.toString();
//根据\t切割数据
//String[] words = line.split("\t");
String[] words = line.split("\\w+");
//根据空格切割
for(String word : words){
context.write(new Text(word),new IntWritable(1));
}
}
/**
*
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
System.out.println("重写cleanup");
}
}
WordcountReducer
package Wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @author wt
*/
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
/**
*
* @param key
* @param values
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//累计求和
int sum = 0;
for (IntWritable conunt:values){
sum += conunt.get();
}
//输出
context.write(key,new IntWritable(sum));
}
}
WordcountPartitoner(分区)
package Wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
*
*/
public class WordcountPartitoner extends Partitioner<Text, IntWritable> {
/**
*
* @param text
* @param intWritable
* @param i
* @return
*/
@Override
public int getPartition(Text text, IntWritable intWritable, int i) {
//获取单词
String firWord="1";
if(text.toString().length() > 2){
firWord = text.toString().substring(0, 1);
}
int result = firWord.hashCode();
// 2 根据奇数偶数分区
if (result % 2 == 0) {
return 0;
}else {
return 1;
}
}
}
英语四六级
企业网:永久可查,四六级委员会终身备案。中国500强企业、银行、国企等在四六级委员会都有对接,有后台会员号,可以很容易调出学生档案。原版真纸,三大防伪,可过机器。