從源碼層面,總結下Hadoop用戶端送出作業的流程:
1. 選擇使用分布式環境通信協定,還是本地調試通信協定
org.apache.hadoop.mapreduce.Job#connect
2. 上傳作業代碼jobjar, libjar等,從本地檔案系統到HDFS中去。
copyAndConfigureFiles(job, submitJobDir);
3. 拆分輸入檔案,生成splits
org.apache.hadoop.mapreduce.JobSubmitter#writeSplits()
a. 調用job.getInputFormat().getSplits()
b. 根據split size (通常就是HDFS block size), 将檔案拆分成多個邏輯上的Split。
每個Split要記錄它在邏輯檔案中的位元組起始位置, 和這個Split所在的HDFS chunk存儲在了哪些HDFS datanode上:
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIn5GcuUjY4YTY0YjZkdTNiFmMwUTNyQzYiVTYwQDNmBjY3QDMvwVbvNmLj5Wat4Wd5lGbh5iY1BXLn1WauU3bop3ZuFGat42YucWbp1iMhRXYvw1LcpDc0RHaiojIsJye.png)
c. 将所有Split寫入HDFS上的同一個全局檔案(<jobSubmitDir>/job.split)中。
這個全局檔案的頭部是:org.apache.hadoop.mapreduce.split.JobSplitWriter#SPLIT_FILE_HEADER ("SPL") + split version (1)
在這個全局檔案内部,為每個Split, 依次寫入 split serializer class name + 序列化了的業務資料,
org.apache.hadoop.mapreduce.split.JobSplitWriter#writeNewSplits
(
jobSubmitDir:
local: /tmp/hadoop/mapred/staging/<job id>
hdfs: /tmp/hadoop-yarn/staging/<user>/.staging/<job id>
)
d. 将所有Split的描述資訊寫入<jobSubmitDir>/job.splitmetainfo中。包括每個Split在job.split全局檔案中的偏移量、長度,存儲這個Split的datanode的位址
org.apache.hadoop.mapreduce.split.JobSplitWriter#writeJobSplitMetaInfo
4. 将用戶端記憶體中初始化了的作業配置job.configuration寫入HDFS <submitJobDir>/job.xml
5. 将作業包裝成org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext,調用org.apache.hadoop.yarn.client.api.impl.YarnClientImpl#submitApplication()送出作業給Yarn (走Hadoop RPC)。