我们通过下面这个天气数据处理的例子来说明hadoop的运行原理.
假设我们需要处理一批有关天气的数据,其格式如下:
按照ascii码存储,每行一条记录
每一行字符从0开始计数,第15个到第18个字符为年
第25个到第29个字符为温度,其中第25位是符号+/-
0067011990999991950051507+0000+
0043011990999991950051512+0022+
0043011990999991950051518-0011+
0043012650999991949032412+0111+
0043012650999991949032418+0078+
0067011990999991937051507+0001+
0043011990999991937051512-0002+
0043011990999991945051518+0001+
0043012650999991945032412+0002+
0043012650999991945032418+0078+
现在需要统计出每年的最高温度。
map-reduce主要包括两个步骤:map和reduce
每一步都有key-value对作为输入和输出:
map阶段的key-value对的格式是由输入的格式所决定的,如果是默认的textinputformat,则每行作为一个记录进程处理,其中key为此行的开头相对于文件的起始位置,value就是此行的字符文本
map阶段的输出的key-value对的格式必须同reduce阶段的输入key-value对的格式相对应
对于上面的例子,在map过程,输入的key-value对如下:
(0, 0067011990999991950051507+0000+)
(33, 0043011990999991950051512+0022+)
(66, 0043011990999991950051518-0011+)
(99, 0043012650999991949032412+0111+)
(132, 0043012650999991949032418+0078+)
(165, 0067011990999991937051507+0001+)
(198, 0043011990999991937051512-0002+)
(231, 0043011990999991945051518+0001+)
(264, 0043012650999991945032412+0002+)
(297, 0043012650999991945032418+0078+)
在map过程中,通过对每一行字符串的解析,得到年-温度的key-value对作为输出:
(1950, 0)
(1950, 22)
(1950, -11)
(1949, 111)
(1949, 78)
(1937, 1)
(1937, -2)
(1945, 1)
(1945, 2)
(1945, 78)
在reduce过程,将map过程中的输出,按照相同的key将value放到同一个列表中作为reduce的输入
(1950, [0, 22, –11])
(1949, [111, 78])
(1937, [1, -2])
(1945, [1, 2, 78])
在reduce过程中,在列表中选择出最大的温度,将年-最大温度的key-value作为输出:
其逻辑过程可用如下图表示:
下图大概描述了map-reduce的job运行的基本原理:
下面我们讨论jobconf,其有很多的项可以进行配置:
setinputformat:设置map的输入格式,默认为textinputformat,key为longwritable, value为text
setnummaptasks:设置map任务的个数,此设置通常不起作用,map任务的个数取决于输入的数据所能分成的input split的个数
setmapperclass:设置mapper,默认为identitymapper
setmaprunnerclass:设置maprunner, map task是由maprunner运行的,默认为maprunnable,其功能为读取input split的一个个record,依次调用mapper的map函数
setmapoutputkeyclass和setmapoutputvalueclass:设置mapper的输出的key-value对的格式
setoutputkeyclass和setoutputvalueclass:设置reducer的输出的key-value对的格式
setpartitionerclass和setnumreducetasks:设置partitioner,默认为hashpartitioner,其根据key的hash值来决定进入哪个partition,每个partition被一个reduce task处理,所以partition的个数等于reduce task的个数
setreducerclass:设置reducer,默认为identityreducer
setoutputformat:设置任务的输出格式,默认为textoutputformat
fileinputformat.addinputpath:设置输入文件的路径,可以使一个文件,一个路径,一个通配符。可以被调用多次添加多个路径
fileoutputformat.setoutputpath:设置输出文件的路径,在job运行前此路径不应该存在
当然不用所有的都设置,由上面的例子,可以编写map-reduce程序如下:
public class maxtemperature {
public static void main(string[] args) throws ioexception {
if (args.length != 2) {
system.err.println(“usage: maxtemperature <input path> <output path>”);
system.exit(-1);
}
jobconf conf = new jobconf(maxtemperature.class);
conf.setjobname(“max temperature”);
fileinputformat.addinputpath(conf, new path(args[0]));
fileoutputformat.setoutputpath(conf, new path(args[1]));
conf.setmapperclass(maxtemperaturemapper.class);
conf.setreducerclass(maxtemperaturereducer.class);
conf.setoutputkeyclass(text.class);
conf.setoutputvalueclass(intwritable.class);
jobclient.runjob(conf);
}
}
map-reduce的处理过程主要涉及以下四个部分:
客户端client:用于提交map-reduce任务job
jobtracker:协调整个job的运行,其为一个java进程,其main class为jobtracker
tasktracker:运行此job的task,处理input split,其为一个java进程,其main class为tasktracker
hdfs:hadoop分布式文件系统,用于在各个进程间共享job相关的文件
jobclient.runjob()创建一个新的jobclient实例,调用其submitjob()函数。
向jobtracker请求一个新的job id
检测此job的output配置
计算此job的input splits
将job运行所需的资源拷贝到jobtracker的文件系统中的文件夹中,包括job jar文件,job.xml配置文件,input splits
通知jobtracker此job已经可以运行了
提交任务后,runjob每隔一秒钟轮询一次job的进度,将进度返回到命令行,直到任务运行完毕。
当jobtracker收到submitjob调用的时候,将此任务放到一个队列中,job调度器将从队列中获取任务并初始化任务。
初始化首先创建一个对象来封装job运行的tasks, status以及progress。
在创建task之前,job调度器首先从共享文件系统中获得jobclient计算出的input splits。
其为每个input split创建一个map task。
每个task被分配一个id。
tasktracker周期性的向jobtracker发送heartbeat。
在heartbeat中,tasktracker告知jobtracker其已经准备运行一个新的task,jobtracker将分配给其一个task。
在jobtracker为tasktracker选择一个task之前,jobtracker必须首先按照优先级选择一个job,在最高优先级的job中选择一个task。
tasktracker有固定数量的位置来运行map task或者reduce task。
默认的调度器对待map task优先于reduce task
当选择reduce task的时候,jobtracker并不在多个task之间进行选择,而是直接取下一个,因为reduce task没有数据本地化的概念。
tasktracker被分配了一个task,下面便要运行此task。
首先,tasktracker将此job的jar从共享文件系统中拷贝到tasktracker的文件系统中。
tasktracker从distributed cache中将job运行所需要的文件拷贝到本地磁盘。
其次,其为每个task创建一个本地的工作目录,将jar解压缩到文件目录中。
其三,其创建一个taskrunner来运行task。
taskrunner创建一个新的jvm来运行task。
被创建的child jvm和tasktracker通信来报告运行进度。
maprunnable从input split中读取一个个的record,然后依次调用mapper的map函数,将结果输出。
map的输出并不是直接写入硬盘,而是将其写入缓存memory buffer。
当buffer中数据的到达一定的大小,一个背景线程将数据开始写入硬盘。
在写入硬盘之前,内存中的数据通过partitioner分成多个partition。
在同一个partition中,背景线程会将数据按照key在内存中排序。
每次从内存向硬盘flush数据,都生成一个新的spill文件。
当此task结束之前,所有的spill文件被合并为一个整的被partition的而且排好序的文件。
reducer可以通过http协议请求map的输出文件,tracker.http.threads可以设置http服务线程数。
当map task结束后,其通知tasktracker,tasktracker通知jobtracker。
对于一个job,jobtracker知道tasktracer和map输出的对应关系。
reducer中一个线程周期性的向jobtracker请求map输出的位置,直到其取得了所有的map输出。
reduce task需要其对应的partition的所有的map输出。
reduce task中的copy过程即当每个map task结束的时候就开始拷贝输出,因为不同的map task完成时间不同。
reduce task中有多个copy线程,可以并行拷贝map输出。
当很多map输出拷贝到reduce task后,一个背景线程将其合并为一个大的排好序的文件。
当所有的map输出都拷贝到reduce task后,进入sort过程,将所有的map输出合并为大的排好序的文件。
最后进入reduce过程,调用reducer的reduce函数,处理排好序的输出的每个key,最后的结果写入hdfs。
当jobtracker获得最后一个task的运行成功的报告后,将job得状态改为成功。
当jobclient从jobtracker轮询的时候,发现此job已经成功结束,则向用户打印消息,从runjob函数中返回。