天天看点

Hadoop OutputFormat浅析

在hadoop中,outputformat和inputformat是相对应的两个东西。相比于inputformat,outputformat似乎没有那么多细节。inputformat涉及到对输入数据的解析和划分,继而影响到map任务的数目,以及map任务的调度(见《hadoop inputformat浅析》)。而outputformat似乎像其字面意思那样,仅仅是完成对输出数据的格式化。

对于输出数据的格式化,这个应该没什么值得多说的。根据需要,outputformat爱把输出写成什么格式就写成什么格式、爱把输出写到数据库就写到数据库、爱把输出通过网络发给其他服务就发给其他服务...

不过,outputformat所做的事情其实并不限于此。outputformat类包含如下三个方法:recordwriter getrecordwriter(taskattemptcontext context);void checkoutputspecs(jobcontext context);outputcommitter getoutputcommitter(taskattemptcontext context);

其中:checkoutputspecs是在jobclient提交job之前被调用的(在使用inputfomat进行输入数据划分之前),用于检测job的输出路径。比如,fileoutputformat通过这个方法来确认在job开始之前,job的output路径并不存在,然后该方法又会重新创建这个output路径。这样一来,就能确保job结束后,output路径下的东西就是且仅是该job输出的。

getrecordwriter用于返回一个recordwriter的实例,reduce任务在执行的时候就是利用这个实例来输出key/value的。(如果job不需要reduce,那么map任务会直接使用这个实例来进行输出。)

recordwriter有如下两个方法:

void write(k key, v value);void close(taskattemptcontext context);前者负责将reduce输出的key/value写成特定的格式,后者负责对输出做最后的确认并关闭输出。前面提到的outputformat的字面含义,其实就是由这个recordwriter来实现的。

而第三个方法,getoutputcommitter则用于返回一个outputcommitter的实例。(在hadoop-0.20中,mapreduce有两套api。getoutputcommitter是在newapi中才提供的,oldapi里面并没有。不过oldapi同样有outputcommtter这个东西,只是不能通过outputformat来定制而已。)

outputcommitter用于控制job的输出环境,它有下面几个方法:void setupjob(jobcontext jobcontext);void commitjob(jobcontext jobcontext);void abortjob(jobcontext jobcontext, jobstatus.state state);void setuptask(taskattemptcontext taskcontext);boolean needstaskcommit(taskattemptcontext taskcontext);void committask(taskattemptcontext taskcontext);void aborttask(taskattemptcontext taskcontext);

job开始被执行之前,框架会调用outputcommitter.setupjob()为job创建一个输出路径;

如果job成功完成,框架会调用outputcommitter.commitjob()提交job的输出;

如果job失败,框架会调用outputcommitter.abortjob()撤销job的输出;

对应于job下的每一个task,同样牵涉创建、提交和撤销三个动作,分别由outputcommitter.setuptask()、outputcommitter.committask()、outputcommitter.aborttask()来完成。而一个task可能没有输出,从而也就不需要提交,这个可以通过outputcommitter.needstaskcommit()来判断;

具体outputcommitter的这些方法里面完成了什么样的操作,这是由具体的outputcommitter来定制的,可以任意去实现。比如,fileoutputcommitter完成了如下操作:

setupjob - mkdir ${mapred.output.dir}/_temporarycommitjob - touch ${mapred.output.dir}/_success && rm -r ${mapred.output.dir}/_temporaryabortjob - rm -r ${mapred.output.dir}/_temporarysetuptask - <nothing>needstaskcommit - test -d ${mapred.output.dir}/_temporary/_${taskattemptid}committask - mv ${mapred.output.dir}/_temporary/_${taskattemptid}/* ${mapred.output.dir}/aborttask - rm -r ${mapred.output.dir}/_temporary/_${taskattemptid}

(注意,上面这些路径都是hdfs上的,不是某个tasktracker本地机器上的。)

其中的逻辑是:job执行的时候,task的输出放到output路径下的_temporary目录的以taskattemptid命名的子目录中。只有当task成功了,相应的输出才会被提交到output路径下。而只有当整个job都成功了,才会在output路径下放置_success文件。_success文件的存在表明了output路径下的输出信息是正确且完整的;而如果_success文件不存在,output下的信息也依然是正确的(这已经由committask保证了),但是不一定是完整的(可能只包含部分reduce的输出)。

与之对应,fileoutputformat会让它所创建的recordwriter将输出写到${mapred.output.dir}/_temporary/_${taskattemptid}/下。当然,map和reduce任务也可以自己向这个路径put数据。

接下来就是到在哪里去执行这些方法的问题了。

一个job被提交到jobtracker后会生成若干的map和reduce任务,这些任务会被分派到tasktracker上。对于每一个task,tasktracker会使用一个子jvm来执行它们。那么对于task的setup/commit/abort这些操作,自然应该在执行task的子jvm里面去完成:

当一个task被关联到一个子jvm后,在任务初始化阶段,outputcommitter.setuptask()会被调用;

当一个任务执行成功完成了之后,脱离子jvm之前,outputcommitter.committask()会被调用。不过这里还有两个细节:1、需要先调用outputcommitter.needstaskcommit()来确定是否有输出需要提交;2、提交之前还有一个同步逻辑,需要由jobtracker同意提交后才能提交。因为hadoop有推测执行的逻辑,一个task可能在多个tasktracker上同时执行,但是它们之中最多只有一个能得到提交,否则可能导致结果的错乱;

当一个任务执行失败时,outputcommitter.aborttask()会被调用。这个调用很特殊,它不大可能在执行任务的子jvm里面完成。因为执行任务的子jvm里面跑的是用户提供的map/reduce代码,hadoop框架是无法保证这些代码的稳定性的,所以任务的失败往往伴随着子jvm的异常退出(这也就是为什么要用子jvm来执行map和reduce任务的原因,否则异常退出的可能就是整个框架了)。于是,对于失败的任务,jobtracker除了要考虑它的重试之外,还要为其生成一个cleanup任务。这个cleanup任务像普通的map和reduce任务一样,会被分派到tasktracker上去执行(不一定分派到之前执行该任务失败的那个tasktracker上,因为输出是在hdfs上,是全局的)。而它的执行逻辑主要就是调用outputcommitter.aborttask();

而对于job的setup/commit/abort,则显然不能使用上面的逻辑。

从时间上说,outputcommitter.setupjob()应该在所有map和reduce任务执行之前被调用、outputcommitter.commitjob()应该在所有map和reduce任务执行之后被调用、而outputcommitter.abortjob()应该在job确认失败之后被调用;

从地点上说,可能调用这些方法的地方无外乎jobclient、jobtracker、或tasktracker;

jobclient应该第一个被排除,因为job的执行并不依赖于jobclient。jobclient在提交完job之后就可以退出了,它的退出并不会影响job的继续执行(如果不退出则可以接收jobtracker的进度反馈)。所以,不可能依靠jobclient在job成功以后来调用outputcommitter.commitjob();

jobtracker呢?貌似是个合适的地方,因为jobtracker明确知道job的开始与结束、成功与失败。但是实际上还是不能由jobtracker来调用这些方法。就像前面说到的outputcommitter.aborttask()一样,既然jobtracker知道了task的失败,却不直接为它清理输出,而是通过生成一个对应的cleanup任务来完成清理工作。为什么要这样做呢?其实原因很简单,因为outputcommitter是独立于hadoop框架,可以由用户自己定制的。hadoop框架不能保证用户定制代码的稳定性,当然不能让它直接在jobtracker上执行。必须启动一个新的jvm来执行这些方法,那么正好tasktracker上已经有这样的逻辑了。

所以,对于job的setup/commit/abort,跟outputcommitter.aborttask()类似,jobtracker会生成对应的setup任务和cleanup任务。在初始化job的时期将job的setup任务分派给tasktracker,tasktracker执行这个setup任务所要做的事情就是调用outputcommitter.setupjob();在job结束时,job的cleanup任务将分派给tasktracker,tasktracker执行这个cleanup任务所要做的事情就是根据job的执行结果是成功或是失败,来调用outputcommitter.commitjob()或outputcommitter.abortjob()。

为了保证outputcommitter.setupjob()在所有map和reduce任务执行之前被调用,在jobtracker上,job的初始化被分成了两个步骤:一是为job生成一堆任务,二是将setup任务分派给tasktracker去执行,并等待它执行完成。在这之后,初始化才算完成,map和reduce任务才能得到分派。

可见,在job执行的过程中,除了我们关注的map和reduce任务之外,还会有一些隐藏的setup和cleanup任务。不过这些任务都有一个共同点,它们都可以是用户定制的。