天天看点

详解MapReduce中的五大编程模型

前言

我们上一节讲了关于

MapReduce

中的应用场景和架构分析,最后还使用了一个

CountWord

Demo

来进行演示,关于

MapReduce

的具体操作。如果还不了解的朋友可以看看上篇文章:初识MapReduce的应用场景(附JAVA和Python代码)

接下来,我们会讲解关于

MapReduce

的编程模型,这篇文章的主要目的就是讲清楚

Mapreduce

的编程模型有多少种,它们之间是怎么协调合作的,会尽量从源码的角度来解析,最后就是讲解不同的语言是如何调用

Hadoop

中的

Mapreduce

API

的。

目录

  • MapReduce 编程模型的框架
  • 五种编程模型的详解
    • InputFormat
    • OutPutFormat
    • Mapper
    • Reducer
    • Partitioner
  • Hadoop Streaming 的详解
  • 总结

MapReduce 编程模型的框架

我们先来看一张图,关于

MapReduce

的编程模型

详解MapReduce中的五大编程模型
  • 用户程序层

用户程序层是指用户用编写好的代码来调用

MapReduce

的接口层。

  • 工具层
    • Job control 是为了监控

      Hadoop

      中的

      MapReduce

      向集群提交复杂的作业任务,提交了任务到集群中后,形成的任务是一个有向图。每一个任务都有两个方法

      submit()

      waitForCompletion(boolean)

      submit()

      方法是向集群中提交作业,然后立即返回,

      waitForCompletion(boolean)

      就是等待集群中的作业是否已经完成了,如果完成了,得到的结果可以当作下个任务的输入。
    • chain Mapper

      chain Reducer

      的这个模块,是为了用户编写链式作业,形式类似于

      Map + Reduce Map *

      ,表达的意思就是只有一个

      Reduce

      ,在

      Reduce

      的前后可以有多个

      Map

    • Hadoop Streaming

      支持的是脚本语言,例Python、PHP等来调用

      Hadoop

      的底层接口,

      Hadoop Pipes

      支持的是

      C++

      来调用。
  • 编程接口层,这一层是全部由

    Java

    语言来实现的,如果是

    Java

    来开发的话,那么可以直接使用这一层。

详解五种编程模型

InputFormat

作用

对输入进入

MapReduce

的文件进行规范处理,主要包括

InputSplit

RecordReader

两个部分。

TextOutputFormat

是默认的文件输入格式。

详解MapReduce中的五大编程模型

InputSplit

这个是指对输入的文件进行逻辑切割,切割成一对对

Key-Value

值。有两个参数可以定义

InputSplit

的块大小,分别是

mapred.max.split.size

(记为

minSize

)和

mapred.min.split.size

(记为

maxSize

)。

RecordReader

是指作业在

InputSplit

中切割完成后,输出

Key-Value

对,再由

RecordReader

进行读取到一个个

Mapper

文件中。如果没有特殊定义,一个

Mapper

文件的大小就是由

Hadoop

block_size

决定的,

Hadoop 1.x

中的

block_size

64M

,在

Hadoop 2.x

中的

block_size

的大小就是

128M

切割块的大小

Hadoop2.x

以上的版本中,一个

splitSize

的计算公式为

splitSize = max\{minSize,min\{maxSize, blockSize\}\}
           

OutputFormat

作用

对输出的文件进行规范处理,主要的工作有两个部分,一个是检查输出的目录是否已经存在,如果存在的话就会报错,另一个是输出最终结果的文件到文件系统中,

TextOutputFormat

是默认的输出格式。

详解MapReduce中的五大编程模型

OutputCommiter

OutputCommiter

的作用有六点:

  • 作业(

    job

    )的初始化
//进行作业的初始化,建立临时目录。
//如果初始化成功,那么作业就会进入到 Running 的状态
public abstract void setupJob(JobContext var1) throws IOException;
           
  • 作业运行结束后就删除作业
//如果这个job完成之后,就会删除掉这个job。
//例如删除掉临时的目录,然后会宣布这个job处于以下的三种状态之一,SUCCEDED/FAILED/KILLED
@Deprecated
    public void cleanupJob(JobContext jobContext) throws IOException {
    }
           
  • 初始化

    Task

//初始化Task的操作有建立Task的临时目录
public abstract void setupTask(TaskAttemptContext var1) throws IOException;
           
  • 检查是否提交

    Task

    的结果
//检查是否需要提交Task,为的是Task不需要提交的时候提交出去
public abstract boolean needsTaskCommit(TaskAttemptContext var1) throws IOException;
           
  • 提交

    Task

//任务结束的时候,需要提交任务
public abstract void commitTask(TaskAttemptContext var1) throws IOException;
           
  • 回退

    Task

//如果Task处于KILLED或者FAILED的状态,这Task就会进行删除掉临时的目录
//如果这个目录删除不了(例如出现了异常后,处于被锁定的状态),另一个同样的Task会被执行
//然后使用同样的attempt-id去把这个临时目录给删除掉,也就说,一定会把临时目录给删除干净
 public abstract void abortTask(TaskAttemptContext var1) throws IOException;

           

处理Task Side-Effect File

Hadoop

中有一种特殊的文件和特殊的操作,那就是

Side-Eddect File

,这个文件的存在是为了解决某一个

Task

因为网络或者是机器性能的原因导致的运行时间过长,从而导致拖慢了整体作业的进度,所以会为每一个任务在另一个节点上再运行一个子任务,然后选择两者中处理得到的结果最快的那个任务为最终结果,这个时候为了避免文件都输入在同一个文件中,所以就把备胎任务输出的文件取作为

Side-Effect File

RecordWriter

这个是指输出

KEY-VALUE

对到文件中。

Mapper和Reducer

详解Mapper

InputFormat

为每一个

InputSplit

生成一个

map

任务,

mapper

的实现是通过

job

中的

setMapperClass(Class)

方法来配置写好的

map

类,如这样

//设置要执行的mapper类
job.setMapperClass(WordMapper.class);
           

其内部是调用了

map(WritableComparable, Writable, Context)

这个方法来为每一个键值对写入到

InputSplit

,程序会调用

cleanup(Context)

方法来执行清理任务,清理掉不需要使用到的中间值。

关于输入的键值对类型不需要和输出的键值对类型一样,而且输入的键值对可以映射到0个或者多个键值对。通过调用

context.write(WritableComparable, Writable)

来收集输出的键值对。程序使用

Counter

来统计键值对的数量,

Mapper

中的输出被排序后,就会被划分到每个

Reducer

中,分块的总数目和一个作业的

reduce

任务的数目是一样的。

需要多少个Mapper任务

关于一个机器节点适合多少个

map

任务,官方的文档的建议是,一个节点有

10

个到

100

个任务是最好的,如果是

cpu

低消耗的话,

300

个也是可以的,最合理的一个map任务是需要运行超过

1

分钟。

详解Reducer

Reducer

任务的话就是将

Mapper

中输出的结果进行统计合并后,输出到文件系统中。

用户可以自定义

Reducer

的数量,使用

Job.setNumReduceTasks(int)

这个方法。

在调用

Reducer

的话,使用的是

Job.setReducerClass(Class)

方法,内部调用的是

reduce(WritableComparable, Iterable<Writable>, Context)

这个方法,最后,程序会调用

cleanup(Context)

来进行清理工作。如这样:

//设置要执行的reduce类
job.setReducerClass(WordReduce.class);
           

Reducer

实际上是分三个阶段,分别是

Shuffle

Sort

Secondary Sort

shuffle

这个阶段是指

Reducer

的输入阶段,系统会为每一个

Reduce

任务去获取所有的分块,通过的是

HTTP

的方式

sort

这个阶段是指在输入

Reducer

阶段的值进行分组,

sort

shuffle

是同时进行的,可以这么理解,一边在输入的时候,同时在一边排序。

Secondary Sort

这个阶段不是必需的,只有在中间过程中对

key

的排序和在

reduce

的输入之前对

key

的排序规则不同的时候,才会启动这个过程,可以通过的是

Job.setSortComparatorClass(Class)

来指定一个

Comparator

进行排序,然后再结合

Job.setGroupingComparatorClass(Class)

来进行分组,最后可以实现二次排序。

在整个

reduce

中的输出是没有排序

需要多少个 Reducer 任务

建议是

0.95

或者是

1.75

*

mapred.tasktracker.reduce.tasks.maximum

。如果是

0.95

的话,那么就可以在

mapper

任务结束时,立马就可以启动

Reducer

任务。如果是

1.75

的话,那么运行的快的节点就可以在

map

任务完成的时候先计算一轮,然后等到其他的节点完成的时候就可以计算第二轮了。当然,

Reduce

任务的个数不是越多就越好的,个数多会增加系统的开销,但是可以在提升负载均衡,从而降低由于失败而带来的负面影响。

Partitioner

这个模块用来划分键值空间,控制的是

map

任务中的

key

值分割的分区,默认使用的算法是哈希函数,

HashPartitioner

是默认的

Partitioner

总结

这篇文章主要就是讲了

MapReduce

的框架模型,分别是分为用户程序层、工具层、编程接口层这三层,在编程接口层主要有五种编程模型,分别是

InputFomat

MapperReduce

Partitioner

OnputFomat

Reducer

。主要是偏理论,代码的参考例子可以参考官方的例子:WordCount_v2.0

这是

MapReduce

系列的第二篇,接下来的一篇会详细写关于

MapReduce

的作业配置和环境,结合一些面试题的汇总,所以接下来的这篇还是干货满满的,期待着就好了。

更多干货,欢迎关注我的公众号:spacedong

详解MapReduce中的五大编程模型

继续阅读