前言
Hadoop在2.6.0版本中引入了一個新特性異構存儲.異構存儲關鍵在于異構2個字.異構存儲可以根據各個存儲媒體讀寫特性的不同發揮各自的優勢.一個很适用的場景就是上篇文章提到的冷熱資料的存儲.針對冷資料,采用容量大的,讀寫性能不高的存儲媒體存儲,比如最普通的Disk磁盤.而對于熱資料而言,可以采用SSD的方式進行存儲,這樣就能保證高效的讀性能,在速率上甚至能做到十倍于或百倍于普通磁盤讀寫的速度.換句話說,HDFS的異構存儲特性的出現使得我們不需要搭建2套獨立的叢集來存放冷熱2類資料,在一套叢集内就能完成.是以這個功能特性還是有非常大的實用意義的.本文就帶大家了解HDFS的異構存儲分為哪幾種類型,存儲政策如何,HDFS如何做到智能化的異構存儲.
異構存儲類型
上文提到了多次的異構這個名詞,那麼到底異構存儲分為了種類型呢,這裡列舉一下HDFS中所聲明的Storage Type.
- RAM_DISK
- SSD
- DISK
- ARCHIVE
HDFS中是定義了這4種類型,SSD,DISK一看就知道是什麼意思,這裡看一下其餘的2個,RAM_DISK,其實就是Memory記憶體,而ARCHIVE并沒有特指哪種存儲媒體,主要的指的是高密度存儲資料的媒體來解決資料量的容量擴增的問題.這4類是被定義在了StorageType類中:
public enum StorageType {
// sorted by the speed of the storage types, from fast to slow
RAM_DISK(true),
SSD(false),
DISK(false),
ARCHIVE(false);
...
旁邊的true或者false代表的是此類存儲類型是否是transient特性的.transient的意思是指轉瞬即逝的,并非持久化的.在HDFS中,如果沒有主動聲明資料目錄存儲類型的,預設都是DISK.
Defines the types of supported storage media. The default storage
medium is assumed to be DISK.
這4類存儲媒體之間一個很多的性能差別就在于讀寫速度,從上到下依次減慢,是以從冷熱資料的處理來看,将資料存在記憶體中或是SSD中會是不錯的選擇,而冷資料則存放與DISK和ARCHIVE類型的媒體中會更好.是以HDFS中冷熱資料檔案目錄的StorageType的設定将會顯得非常的重要.那麼如何讓HDFS知道叢集中哪些資料存儲目錄是具體哪種類型的存儲媒體呢,這裡需要配置的主動聲明,HDFS可沒有做自動檢測識别的功能.在配置屬性dfs.datanode.data.dir中進行本地對應存儲目錄的設定,同時帶上一個存儲類型标簽,聲明此目錄用的是哪種類型的存儲媒體,例子如下:
[SSD]file:///grid/dn/ssd0
如果目錄前沒有帶上[SSD]/[DISK]/[ARCHIVE]/[RAM_DISK]這4種中的任何一種,則預設是DISK類型.下面是一張存儲媒體結構圖
異構存儲原理
了解完了異構存儲的多種存儲媒體之後,我們有必要了解一下HDFS的異構存儲的實作原理.在這裡會結合部分HDFS源碼進行闡述.概況性的總結為3小點:
- DataNode通過心跳彙報自身資料存儲目錄的StorageType給NameNode,
- 随後NameNode進行彙總并更新叢集内各個節點的存儲類型情況
- 待複制檔案根據自身設定的存儲政策資訊向NameNode請求擁有此類型存儲媒體的DataNode作為候選節點
從以上3點來看,本質原理并不複雜.下面結合部分源碼,來一步步追蹤内部的過程細節.
DataNode存儲目錄彙報
首先是資料存儲目錄的解析與心跳彙報過程.在FsDatasetImpl的構造函數中對dataDir進行了存儲目錄的解析,生成了StorageType的List清單.
/**
* An FSDataset has a directory where it loads its data files.
*/
FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf
) throws IOException {
...
String[] dataDirs = conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
Collection<StorageLocation> dataLocations = DataNode.getStorageLocations(conf);
List<VolumeFailureInfo> volumeFailureInfos = getInitialVolumeFailureInfos(
dataLocations, storage);
...
真正調用的是DataNode的getStorageLocations方法.
public static List<StorageLocation> getStorageLocations(Configuration conf) {
// 擷取dfs.datanode.data.dir配置中多個目錄位址字元串
Collection<String> rawLocations =
conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY);
List<StorageLocation> locations =
new ArrayList<StorageLocation>(rawLocations.size());
for(String locationString : rawLocations) {
final StorageLocation location;
try {
// 解析為對應的StorageLocation
location = StorageLocation.parse(locationString);
} catch (IOException ioe) {
LOG.error("Failed to initialize storage directory " + locationString
+ ". Exception details: " + ioe);
// Ignore the exception.
continue;
} catch (SecurityException se) {
LOG.error("Failed to initialize storage directory " + locationString
+ ". Exception details: " + se);
// Ignore the exception.
continue;
}
// 将解析好的StorageLocation加入到清單中
locations.add(location);
}
return locations;
}
當然我們最關心的過程就是如何解析配置并最終得到對應存儲類型的過程,就是下面這行操作所執行的内容
location = StorageLocation.parse(locationString);
進入到StorageLocation方法,查閱解析方法
public static StorageLocation parse(String rawLocation)
throws IOException, SecurityException {
// 采用正則比對的方式的方式進行解析
Matcher matcher = regex.matcher(rawLocation);
StorageType storageType = StorageType.DEFAULT;
String location = rawLocation;
if (matcher.matches()) {
String classString = matcher.group(1);
location = matcher.group(2);
if (!classString.isEmpty()) {
storageType =
StorageType.valueOf(StringUtils.toUpperCase(classString));
}
}
return new StorageLocation(storageType, new Path(location).toUri());
}
這裡的StorageType.DEFAULT就是DISK,在StorageType中定義的
public static final StorageType DEFAULT = DISK;
後續這些解析好的存儲目錄以及對應的存儲媒體類型會被加入到storageMap中.
private void addVolume(Collection<StorageLocation> dataLocations,
Storage.StorageDirectory sd) throws IOException {
final File dir = sd.getCurrentDir();
final StorageType storageType =
getStorageTypeFromLocations(dataLocations, sd.getRoot());
...
synchronized (this) {
volumeMap.addAll(tempVolumeMap);
storageMap.put(sd.getStorageUuid(),
new DatanodeStorage(sd.getStorageUuid(),
DatanodeStorage.State.NORMAL,
storageType));
...
}
這個storageMap存儲了具體存儲目錄到具體存儲類型的映射關系,可以說是非常細粒度的.更重要的是,這個資訊會被DataNode組織成StorageReport通過心跳的形式上報給NameNode.于是就來到了第一階段的下半過程.
public StorageReport[] getStorageReports(String bpid)
throws IOException {
List<StorageReport> reports;
synchronized (statsLock) {
List<FsVolumeImpl> curVolumes = getVolumes();
reports = new ArrayList<>(curVolumes.size());
for (FsVolumeImpl volume : curVolumes) {
try (FsVolumeReference ref = volume.obtainReference()) {
StorageReport sr = new StorageReport(volume.toDatanodeStorage(),
false,
volume.getCapacity(),
volume.getDfsUsed(),
volume.getAvailable(),
volume.getBlockPoolUsed(bpid));
reports.add(sr);
} catch (ClosedChannelException e) {
continue;
}
}
}
return reports.toArray(new StorageReport[reports.size()]);
}
以上是StorageReport的組織過程.最終被BPServiceActor的sendHeartBeat調用,發送給了NameNode.
HeartbeatResponse sendHeartBeat() throws IOException {
// 擷取存儲類型情況報告資訊
StorageReport[] reports =
dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());
if (LOG.isDebugEnabled()) {
LOG.debug("Sending heartbeat with " + reports.length +
" storage reports from service actor: " + this);
}
// 擷取壞磁盤資料資訊
VolumeFailureSummary volumeFailureSummary = dn.getFSDataset()
.getVolumeFailureSummary();
int numFailedVolumes = volumeFailureSummary != null ?
volumeFailureSummary.getFailedStorageLocations().length : 0;
// 還有DataNode自身的存儲容量資訊,最後發送給了NameNode
return bpNamenode.sendHeartbeat(bpRegistration,
reports,
dn.getFSDataset().getCacheCapacity(),
dn.getFSDataset().getCacheUsed(),
dn.getXmitsInProgress(),
dn.getXceiverCount(),
numFailedVolumes,
volumeFailureSummary);
}
存儲心跳資訊的更新處理
在這裡來到了第二階段的心跳處理過程.在DatanodeManager的handleHeartbeat中進行了處理
/** Handle heartbeat from datanodes. */
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
StorageReport[] reports, final String blockPoolId,
long cacheCapacity, long cacheUsed, int xceiverCount,
int maxTransfers, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) throws IOException {
synchronized (heartbeatManager) {
synchronized (datanodeMap) {
DatanodeDescriptor nodeinfo = null;
...
heartbeatManager.updateHeartbeat(nodeinfo, reports,
cacheCapacity, cacheUsed,
xceiverCount, failedVolumes,
volumeFailureSummary);
...
最終在heartbeatManager中會更新到具體的DatanodeDescription的updateHeartbeatState方法,裡面就會更新Storage的資訊
/**
* process datanode heartbeat or stats initialization.
*/
public void updateHeartbeatState(StorageReport[] reports, long cacheCapacity,
long cacheUsed, int xceiverCount, int volFailures,
VolumeFailureSummary volumeFailureSummary) {
...
for (StorageReport report : reports) {
DatanodeStorageInfo storage = updateStorage(report.getStorage());
if (checkFailedStorages) {
failedStorageInfos.remove(storage);
}
storage.receivedHeartbeat(report);
totalCapacity += report.getCapacity();
totalRemaining += report.getRemaining();
totalBlockPoolUsed += report.getBlockPoolUsed();
totalDfsUsed += report.getDfsUsed();
}
rollBlocksScheduled(getLastUpdateMonotonic());
...
目标存儲媒體類型節點的請求
各個DataNode心跳資訊都更新完畢之後,就會有目标存儲媒體需求的待複制檔案塊向NameNode進行請求.比如在FSNamesystem的getAdditionDatanode中就有這裡的處理
/** @see ClientProtocol#getAdditionalDatanode */
LocatedBlock getAdditionalDatanode(String src, long fileId,
final ExtendedBlock blk, final DatanodeInfo[] existings,
final String[] storageIDs,
final Set<Node> excludes,
final int numAdditionalNodes, final String clientName
) throws IOException {
...
final INodeFile file = checkLease(src, clientName, inode, fileId);
clientMachine = file.getFileUnderConstructionFeature().getClientMachine();
clientnode = blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
preferredblocksize = file.getPreferredBlockSize();
// 擷取待複制檔案的存儲政策Id,對應的就是存儲政策資訊類型
storagePolicyID = file.getStoragePolicyID();
//find datanode storages
final DatanodeManager dm = blockManager.getDatanodeManager();
// 擷取已存在的節點中存儲目錄清單資訊
chosen = Arrays.asList(dm.getDatanodeStorageInfos(existings, storageIDs));
} finally {
readUnlock();
}
...
// choose new datanodes.
// 然後進行滿足需求節點的選擇
final DatanodeStorageInfo[] targets = blockManager.chooseTarget4AdditionalDatanode(
src, numAdditionalNodes, clientnode, chosen,
excludes, preferredblocksize, storagePolicyID);
final LocatedBlock lb = new LocatedBlock(blk, targets);
blockManager.setBlockToken(lb, AccessMode.COPY);
return lb;
}
然後目标存儲節點資訊就被設到了具體Block塊的資訊中了.這裡的一個target也就是DatanodeStorageInfo,代表的就是DataNode中的一個dataDir存儲目錄.上述代碼中具體blockManager如何根據給定的候選DatanodeStorageInfo存儲目錄和存儲政策來選擇出目标節點,那就是下一節将要重點闡述的StoragePolicy存儲媒體選擇政策的内容了.本節最後給出HDFS的異構存儲過程調用的簡單的流程圖
BlockStoragePolicy存儲類型選擇政策
與Block放置政策類似,對于資料的媒體存儲同樣有對應若幹種的政策選擇.對于一個完整的存儲類型選擇政策,有如下的基本資訊定義:
/**
* A block storage policy describes how to select the storage types
* for the replicas of a block.
*/
@InterfaceAudience.Private
public class BlockStoragePolicy {
public static final Logger LOG = LoggerFactory.getLogger(BlockStoragePolicy
.class);
/** A 4-bit policy ID 政策唯一辨別Id*/
private final byte id;
/** Policy name 政策名稱 */
private final String name;
/** The storage types to store the replicas of a new block. */
/** 對于一個新的block的一系列的存儲副本塊的可選存儲類型資訊組 **/
private final StorageType[] storageTypes;
/** The fallback storage type for block creation. */
/** 對于第一個建立的block塊的fallback情況時的可選存儲類型 **/
private final StorageType[] creationFallbacks;
/** The fallback storage type for replication. */
/** 對于的block塊的其餘副本的fallback情況時的可選存儲類型 **/
private final StorageType[] replicationFallbacks;
/**
* Whether the policy is inherited during file creation.
* If set then the policy cannot be changed after file creation.
*/
// 是否繼承祖先目錄資訊的政策資訊當建立檔案的時候,用于主動設定 Policy的時候
private boolean copyOnCreateFile;
...
這裡出現了fallback 的情況,什麼叫做fallback的情況呢
目前存儲類型不可用的時候,退一級所選擇使用的存儲類型
相應的邏輯代碼
public List<StorageType> chooseStorageTypes(final short replication,
final Iterable<StorageType> chosen,
final EnumSet<StorageType> unavailables,
final boolean isNewBlock) {
...
for(int i = storageTypes.size() - 1; i >= 0; i--) {
// replace/remove unavailable storage types.
// 擷取目前徐亞的存儲類型
final StorageType t = storageTypes.get(i);
// 如果目前的存儲類型是在不可用的存儲類型清單中,選擇fallback的情況
if (unavailables.contains(t)) {
// 根據是否是新的block塊還是普通的replica選擇相應的fallBack的storage type
final StorageType fallback = isNewBlock?
getCreationFallback(unavailables)
: getReplicationFallback(unavailables);
if (fallback == null) {
removed.add(storageTypes.remove(i));
} else {
storageTypes.set(i, fallback);
}
}
}
...
在getFallback方法中會選取第一個滿足條件的fallback的storage type.
private static StorageType getFallback(EnumSet<StorageType> unavailables,
StorageType[] fallbacks) {
for(StorageType fb : fallbacks) {
// 如果找到滿足條件的storage type,立即傳回
if (!unavailables.contains(fb)) {
return fb;
}
}
return null;
}
當然這些都隻是單一的存儲類型選擇政策.HDFS在使用的時候也不是直接new一個StoragePolicy對象的方式直接調用,而是從BlockStoragePolicySuite政策集合中進行擷取的.
BlockStoragePolicySuite存儲類型政策集合
BlockStoragePolicySuite的官方定義就是
A collection of block storage policies.
在此類内部,定義了6種政策,不僅僅分為冷熱資料2種.
- Hot - for both storage and compute. The data that is popular and still being used for processing will stay in this policy. When a block is hot, all replicas are stored in DISK.
- Cold - only for storage with limited compute. The data that is no longer being used, or data that needs to be archived is moved from hot storage to cold storage. When a block is cold, all replicas are stored in ARCHIVE.
- Warm - partially hot and partially cold. When a block is warm, some of its replicas are stored in DISK and the remaining replicas are stored in ARCHIVE.
- All_SSD - for storing all replicas in SSD.
- One_SSD - for storing one of the replicas in SSD. The remaining replicas are stored in DISK.
- Lazy_Persist - for writing blocks with single replica in memory. The replica is first written in RAM_DISK and then it is lazily persisted in DISK.
在這6種政策中,前3類政策和後3種政策可以看作是2大類.前者從冷熱資料的角度劃分出了3小類的Policy.而後面3者則根據SSD盤的和記憶體存放作為差別特征政策被單獨劃分了出來.政策倒是劃分出來了,但是這些不同的政策之間的主要差別在于哪裡呢,答案就是候選存儲類型組.
在建立BlockStoragePolicySuite的時候,對這些政策都進行了構造
public static BlockStoragePolicySuite createDefaultSuite() {
final BlockStoragePolicy[] policies =
new BlockStoragePolicy[1 << ID_BIT_LENGTH];
final byte lazyPersistId = HdfsConstants.MEMORY_STORAGE_POLICY_ID;
policies[lazyPersistId] = new BlockStoragePolicy(lazyPersistId,
HdfsConstants.MEMORY_STORAGE_POLICY_NAME,
new StorageType[]{StorageType.RAM_DISK, StorageType.DISK},
new StorageType[]{StorageType.DISK},
new StorageType[]{StorageType.DISK},
true); // Cannot be changed on regular files, but inherited.
final byte allssdId = HdfsConstants.ALLSSD_STORAGE_POLICY_ID;
policies[allssdId] = new BlockStoragePolicy(allssdId,
HdfsConstants.ALLSSD_STORAGE_POLICY_NAME,
new StorageType[]{StorageType.SSD},
new StorageType[]{StorageType.DISK},
new StorageType[]{StorageType.DISK});
final byte onessdId = HdfsConstants.ONESSD_STORAGE_POLICY_ID;
policies[onessdId] = new BlockStoragePolicy(onessdId,
HdfsConstants.ONESSD_STORAGE_POLICY_NAME,
new StorageType[]{StorageType.SSD, StorageType.DISK},
new StorageType[]{StorageType.SSD, StorageType.DISK},
new StorageType[]{StorageType.SSD, StorageType.DISK});
final byte hotId = HdfsConstants.HOT_STORAGE_POLICY_ID;
policies[hotId] = new BlockStoragePolicy(hotId,
HdfsConstants.HOT_STORAGE_POLICY_NAME,
new StorageType[]{StorageType.DISK}, StorageType.EMPTY_ARRAY,
new StorageType[]{StorageType.ARCHIVE});
final byte warmId = HdfsConstants.WARM_STORAGE_POLICY_ID;
policies[warmId] = new BlockStoragePolicy(warmId,
HdfsConstants.WARM_STORAGE_POLICY_NAME,
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE},
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE},
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE});
final byte coldId = HdfsConstants.COLD_STORAGE_POLICY_ID;
policies[coldId] = new BlockStoragePolicy(coldId,
HdfsConstants.COLD_STORAGE_POLICY_NAME,
new StorageType[]{StorageType.ARCHIVE}, StorageType.EMPTY_ARRAY,
StorageType.EMPTY_ARRAY);
return new BlockStoragePolicySuite(hotId, policies);
}
在這些政策對象的參數中,第三個參數是最起決定性作用的,因為第三個參數會被用來傳回給副本block作為候選存儲類型.在storageTypes參數中,有時可能隻有1個參數,例如ALLSSD政策隻有
new StorageType[]{StorageType.SSD}
而ONESSD卻有2個
new StorageType[]{StorageType.SSD, StorageType.DISK}
這裡面其實是有一定原因的.因為block有多副本的機制,每個政策要為所有的副本都傳回相應的Storage Type,如果副本數超過候選Storage Type數組怎麼處理,答案在下面這個方法中
public List<StorageType> chooseStorageTypes(final short replication) {
final List<StorageType> types = new LinkedList<StorageType>();
int i = 0, j = 0;
// Do not return transient storage types. We will not have accurate
// usage information for transient types.
// 從前往後依次比對存儲類型到對應的副本下标中
for (;i < replication && j < storageTypes.length; ++j) {
if (!storageTypes[j].isTransient()) {
types.add(storageTypes[j]);
++i;
}
}
// 擷取最後一個存儲類型,統一作為多餘副本的存儲類型
final StorageType last = storageTypes[storageTypes.length - 1];
if (!last.isTransient()) {
for (; i < replication; i++) {
types.add(last);
}
}
return types;
}
這樣的話,ONESSD就必然隻有1個block副本是此類型的,而ALLSSD則将會全部是SSD的存儲.下面給出存儲政策集合的結構圖
上述政策中有一個政策比較有意思的是LAZY_PERSIST,先将資料寫到記憶體中,然後在持久化,不知道性能如何,大家可以試試此政策.
BlockStoragePolicy存儲政策的調用
分析完BlockStoragePolicy的種類之後,我們看看HDFS在哪些地方設定了這些政策.
首先,我們要知道HDFS的預設Policy是哪種
@VisibleForTesting
public static BlockStoragePolicySuite createDefaultSuite() {
...
return new BlockStoragePolicySuite(hotId, policies);
}
...
public BlockStoragePolicySuite(byte defaultPolicyID,
BlockStoragePolicy[] policies) {
this.defaultPolicyID = defaultPolicyID;
this.policies = policies;
}
如上所示,就是HOT的政策,把叢集中的資料都看成是經常通路的資料.然後進一步檢視getPolicy的方法調用,如下圖
我們以方法chooseTarget4NewBlock為例子,追蹤一下上遊的調用過程.
public DatanodeStorageInfo[] chooseTarget4NewBlock(final String src,
final int numOfReplicas, final Node client,
final Set<Node> excludedNodes,
final long blocksize,
final List<String> favoredNodes,
final byte storagePolicyID) throws IOException {
List<DatanodeDescriptor> favoredDatanodeDescriptors =
getDatanodeDescriptors(favoredNodes);
final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(storagePolicyID);
...
在父方法中擷取了storagePolicyID政策ID,往上追蹤,來到了FSNamesystem的getNewBlockTargets方法
DatanodeStorageInfo[] getNewBlockTargets(String src, long fileId,
String clientName, ExtendedBlock previous, Set<Node> excludedNodes,
List<String> favoredNodes, LocatedBlock[] onRetryBlock) throws IOException {
...
replication = pendingFile.getFileReplication();
storagePolicyID = pendingFile.getStoragePolicyID();
} finally {
readUnlock();
}
if (clientNode == null) {
clientNode = getClientNode(clientMachine);
}
// choose targets for the new block to be allocated.
return getBlockManager().chooseTarget4NewBlock(
src, replication, clientNode, excludedNodes, blockSize, favoredNodes,
storagePolicyID);
}
于是我們看到storagePolicyID是從INodeFile中擷取而來的.這與上文中目标節點請求的過程類似,都有從File中擷取政策Id的動作.那麼新的問題又來了,INodeFile中的StoragePolicyID從何而來呢,有一下2種途徑
- 通過RPC接口主動設定
- 沒有主動設定的ID會繼承父目錄的政策,如果父目錄還是沒有,則會設定ID_UNSPECIFIED,繼而會用DEFAULT Storage Policy進行替代,源碼如下:
總的過程調用圖如下public byte getStoragePolicyID() { byte id = getLocalStoragePolicyID(); if (id == ID_UNSPECIFIED) { return this.getParent() != null ? this.getParent().getStoragePolicyID() : id; } return id; }
HDFS Storagepolicies政策的使用
在文章的最後介紹幾個關于Storage Policy的幾個使用指令,幫助大家真正學會運用這個強大的特性.輸入hdfs storagepolicies -help,你會得到一下3大操作指令
$ hdfs storagepolicies -help
[-listPolicies]
List all the existing block storage policies.
[-setStoragePolicy -path <path> -policy <policy>]
Set the storage policy to a file/directory.
<path> The path of the file/directory to set storage policy
<policy> The name of the block storage policy
[-getStoragePolicy -path <path>]
Get the storage policy of a file/directory.
<path> The path of the file/directory for getting the storage policy
1個設定指令,2個擷取指令,最簡單的使用方法是事先劃分好冷熱資料存儲目錄,設定好對應的Storage Policy,然後後續相應的程式在對應分類目錄下寫資料,自動繼承父目錄的存儲政策.在較新版的Hadoop釋出版本中增加了資料遷移工具.此工具的重要用途在于他會掃描HDFS上的檔案,判斷檔案是否滿足其内部設定的存儲政策,如果不滿足,就會重新遷移資料到目标存儲類型節點上.使用方式如下
$ hdfs mover -help
Usage: hdfs mover [-p <files/dirs> | -f <local file>]
-p <files/dirs> a space separated list of HDFS files/dirs to migrate.
-f <local file> a local file containing a list of HDFS files/dirs to migrate.
總結
參考連結