本文主要介绍hadoop运行经典MapReduce(MapReduce1)任务过程涉及的实体及其工作原理。
涉及实体
- 客户端:提交MapReduce作业;
- jobtracker:它协调在集群上运行的所有作业(分配给tasktracker),是java应用程序,一个集群只有一个jobtracker。
- tasktracker:运行tasktracker分配的任务并定时向jobtracker汇报进度,是Java应用程序,一个集群有多个tasktracker。
- 分布式文件系统:用来在其他实体间共享作业文件,一般是hdfs。
作业运行流程:
hadoop运行mapReduce作业整体流程如下图所示。
1. 作业提交
job的submit()方法创建一个内部的JobSubmiter实例,并调用其submitJobInternal()方法。
job的submit()方法执行后在调用job.waitForCompletion(),每秒轮询作业进度,并打印至控制台。
JobSubmiter所实现作业提交过程如下:
① 向jobtracker请求一个新的作业id;
② 检查作业输出目录是否定义和是否已经存在,若存在则不提交作业,并抛异常;
③ 计算作业输入分片;
④ 将作业运行需要的资源复制到jobtracker集群中一个作业id命名的目录下;
⑤ 告知jobtracker作业准备就绪,jobtraker将该作业该放入一个内部作业队列中,并交由作业调度器(job scheduler)进行调度。
2. 作业初始化
① 作业被调度执行时对作业进行初始化,创建一个表示正在运行作业的对象,用于封装任务和记录信息,以便跟踪任务状态和进度。
② 创建任务列表,每个输入分片创建一个map任务。reduce任务由mapred.reduce.tasks属性决定。同时还会创建两个任务:作业创建和作业清理任务。两个任务均在taskTracker中执行。作业创建任务在map任务运行前执行,用来创建作业,创建输出目录和临时工作空间;作业清理任务在reduce任务完成之后执行的,用于清理临时工作空间。
3. 任务分配
tasktracker会定期向jobtracker发送心跳(heartbeat),表明tasktracker的存活状态,同时也是二者的信息通道。作为心跳的一部分,tasktracker会指明它是否已经准备好运行新的任务,如果是,jobtracker会为它分配一个任务(使用心跳的返回值与tasktracker进行通信)。
4. 任务执行
① 从hdfs文件系统中将作业JAR包复制到tasktracker本地,同时将程序要读取的文件资源复制到tasktracker本地;
② tasktracker为任务新建一个本地工作目录,并把jar包解压到该文件夹下;
③ tasktracker新建一个TaskRunner实例来运行该任务。
④ TaskRunner 启动一个新的JVM来运行该job的每个任务(可以避免因map或reduce任务内部异常导致tasktracker崩溃或挂起),该进程每隔几秒钟便告知父进程(tasktracker)进度,直到任务完成。
5. 任务进度和状态更新
TaskRunner启动的子进程定时向tasktracker汇报进度,tasktracker通过heartbeat将任务状态发送至jobtracker,jobtracker进行汇总形成job状态的全局视图,客户端通过job的getStatus方法可以查询当前状态全局视图。如下图所示。
6. 作业完成
jobtracker收到job最后一个任务完成的通知后,便把作业的状态设置为“成功”,waitForCompletion()方法返回。最后jobtracker和tasktracker清空工作状态和中间输出等。
局限性
大型的 Hadoop 集群显现出了由单个 JobTracker 导致的可伸缩性瓶颈。