天天看点

oozie fork多mapreduce任务并行处理示例

<workflow-app name="test7" xmlns="uri:oozie:workflow:0.4">
    <start to="firstjob"/>
    <action name="firstjob">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
              <job-xml>/shareScripts/xxmapred-site.xml</job-xml>
            <configuration>
                <property>
                    <name>mapreduce.job.map.class</name>
                    <value>com.besttone.hbase.demo.Identity$IdentityMapper</value>
                </property>
                <property>
                    <name>mapreduce.job.reduce.class</name>
                    <value>com.besttone.hbase.demo.Identity$IdentityReducer</value>
                </property>
                <property>
                    <name>mapreduce.input.fileinputformat.inputdir</name>
                    <value>${inputDir}</value>
                </property>
                <property>
                    <name>mapreduce.output.fileoutputformat.outputdir</name>
                    <value>/user/${wf:user()}/${wf:id()}/temp1</value>
                </property>
                <property>
                    <name>mapred.mapper.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapred.reducer.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapreduce.job.reduces</name>
                    <value>1</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="fork"/>
        <error to="kill"/>
    </action>
    <fork name='fork'>
        <path start='secondjob' />
        <path start='thirdjob' />
    </fork>
    <action name="secondjob">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
              <job-xml>/shareScripts/xxmapred-site.xml</job-xml>
            <configuration>
                <property>
                    <name>mapreduce.job.map.class</name>
                    <value>com.besttone.hbase.demo.Identity$IdentityMapper</value>
                </property>
                <property>
                    <name>mapreduce.job.reduce.class</name>
                    <value>com.besttone.hbase.demo.Identity$IdentityReducer</value>
                </property>
                <property>
                    <name>mapreduce.input.fileinputformat.inputdir</name>
                    <value>/user/${wf:user()}/${wf:id()}/temp1</value>
                </property>
                <property>
                    <name>mapreduce.output.fileoutputformat.outputdir</name>
                    <value>/user/${wf:user()}/${wf:id()}/temp2</value>
                </property>
                <property>
                    <name>mapred.mapper.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapred.reducer.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapreduce.job.reduces</name>
                    <value>1</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="join"/>
        <error to="kill"/>
    </action>
    <action name="thirdjob">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
              <job-xml>/shareScripts/xxmapred-site.xml</job-xml>
            <configuration>
                <property>
                    <name>mapreduce.job.map.class</name>
                    <value>com.besttone.hbase.demo.Identity$IdentityMapper</value>
                </property>
                <property>
                    <name>mapreduce.job.reduce.class</name>
                    <value>com.besttone.hbase.demo.Identity$IdentityReducer</value>
                </property>
                <property>
                    <name>mapreduce.input.fileinputformat.inputdir</name>
                    <value>/user/${wf:user()}/${wf:id()}/temp1</value>
                </property>
                <property>
                    <name>mapreduce.output.fileoutputformat.outputdir</name>
                    <value>/user/${wf:user()}/${wf:id()}/temp3</value>
                </property>
                <property>
                    <name>mapred.mapper.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapred.reducer.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapreduce.job.reduces</name>
                    <value>1</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="join"/>
        <error to="kill"/>
    </action>
    <join name='join' to='finalejob'/>
    <action name="finalejob">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                  <delete path="${nameNode}${outputDir}"/>
            </prepare>
              <job-xml>/shareScripts/xxmapred-site.xml</job-xml>
            <configuration>
                <property>
                    <name>mapreduce.job.map.class</name>
                    <value>com.besttone.hbase.demo.WordCount$TokenizerMapper</value>
                </property>
                <property>
                    <name>mapreduce.job.reduce.class</name>
                    <value>com.besttone.hbase.demo.WordCount$IntSumReducer</value>
                </property>
                <property>
                    <name>mapreduce.job.combine.class</name>
                    <value>com.besttone.hbase.demo.WordCount$IntSumReducer</value>
                </property>
                <property>
                    <name>mapreduce.job.output.key.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapreduce.job.output.value.class</name>
                    <value>org.apache.hadoop.io.IntWritable</value>
                </property>
                <property>
                    <name>mapreduce.input.fileinputformat.inputdir</name>
                    <value>/user/${wf:user()}/${wf:id()}/temp2,/user/${wf:user()}/${wf:id()}/temp3</value>
                </property>
                <property>
                    <name>mapreduce.output.fileoutputformat.outputdir</name>
                    <value>${outputDir}</value>
                </property>
                <property>
                    <name>mapred.mapper.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapred.reducer.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapreduce.job.reduces</name>
                    <value>1</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="end"/>
        <error to="kill"/>
    </action>
    <kill name="kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
           

wordcount.jar 中包含有上面配置中用到的mapper和reducer类