天天看點

讀Hadoop3.2源碼,深入了解java調用HDFS的常用操作和HDFS原理目錄一、java調用HDFS的常見操作二、深入了解HDFS寫檔案的流程和HDFS原理

本文将通過一個示範工程來快速上手java調用HDFS的常見操作。接下來以建立檔案為例,通過閱讀HDFS的源碼,一步步展開HDFS相關原理、理論知識的說明。

說明:本文檔基于最新版本Hadoop3.2.1

目錄

一、java調用HDFS的常見操作

1.1、示範環境搭建

1.2、操作HDFS

1.3、java檔案操作常用方法

二、深入了解HDFS寫檔案的流程和HDFS原理

2.1、Hadoop3.2.1 源碼下載下傳及介紹

2.2、檔案系統:FileSystem

2.3、HDFS體系結構:namenode、datanode、資料塊

2.4、如何通路阿裡雲OSS等檔案系統

2.5、檔案租約機制

2.6、RPC機制

2.7、HDFS用戶端寫流程總結

2.8、Hadoop3.x新特性:糾删碼

2.9 檔案透明加密處理和目錄樹

2.10、HDFS用戶端寫流程總結

首先我們搭建一個簡單的示範工程(示範工程使用的gradle,Maven項目也同樣添加以下依賴),本次使用的是Hadoop最新的3.2.1。

新增一個普通的java工程即可,過程略,添加hdfs相關依賴jar包

implementation ('org.apache.hadoop:hadoop-common:3.2.1')
implementation ('org.apache.hadoop:hadoop-hdfs:3.2.1')
implementation ('org.apache.hadoop:hadoop-mapreduce-client-core:3.2.1')
implementation ('org.apache.hadoop:hadoop-client:3.2.1')           

在實際運作過程中,可能會發現日志Jar包沖突問題,排除掉即可

exclude group:'org.slf4j',module: 'slf4j-log4j12'           

以建立檔案為例,代碼如下。可以看到java操作hdfs就是這麼簡單、絲滑,so easy!

public static void main(String[] args) throws IOException {
        // 配置對象
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "hdfs://172.22.28.202:9000");
        // HDFS檔案系統的操作對象
        FileSystem fileSystem = FileSystem.get(configuration);
        // 建立檔案。
        FSDataOutputStream outputStream =
            fileSystem.create(new Path("/hdfs/madashu/test"));
        // 寫入檔案内容
        outputStream.write("你好Hadoop,我是碼大叔".getBytes());
        outputStream.flush();
        IOUtils.closeStream(outputStream);
    }           

參照第2步檔案建立的操作,我們可以預定義好Configuration和FileSystem,然後提取出HDFSUtil的工具類出來。涉及到檔案方面的操作基本隻需要hadoop-common包下的

FileSystem

就足夠了,一些常用方法的說明:

//檔案是否存在
fileSystem.exists(new Path(fileName));
//建立目錄
fileSystem.mkdirs(new Path(directorName));
//删除目錄或檔案,第二個參數表示是否要遞歸删除
fileSystem.delete(new Path(name), true);
//擷取目前登入使用者在HDFS檔案系統中的Home目錄
fileSystem.getHomeDirectory();
//檔案重命名
fileSystem.rename(new Path(oldName), new Path(newName));
//讀取檔案,傳回的是FSDataInputStream
fileSystem.open(new Path(fileName));
//建立檔案,第二個參數表示檔案存在時是否覆寫
fileSystem.create(new Path(fileName), false);
//從本地目錄上傳檔案到HDFS
fileSystem.copyFromLocalFile(localPath, hdfsPath);
//擷取目錄下的檔案資訊,包含path,length,group,blocksize,permission等等
fileSystem.listStatus(new Path(directorName));
//釋放資源
fileSystem.close();
//設定HDFS資源權限,其中FsPermission可以設定user、group等
fileSystem.setPermission(new Path(resourceName), fsPermission);
//設定HDFS資源的Owner和group
fileSystem.setOwner(new Path(resourceName), ownerName, groupName);
//設定檔案的副本
fileSystem.setReplication(new Path(resourceName), count);           

檔案操作的方法比較多,本期我們以create方法為例,來通過閱讀源碼深入了解下hdfs寫檔案的流程和原理,代碼參見1.2 。

hadoop源碼位址:

https://github.com/apache/hadoop

,。

正常途徑下通路比較慢的同學(每次寫到這句話,都滿臉的憂傷和xx)也可以通過國内的清華大學開源軟體鏡像站來下載下傳,位址是

https://mirrors.tuna.tsinghua.edu.cn/apache/hadoop/common/hadoop-3.2.1/hadoop-3.2.1-src.tar.gz

下載下傳後我們可以看到這是一個maven工程,導入到idea等我們熟悉開發工具中即可。如果是使用VS需要編譯的小夥伴注意下,

讀Hadoop3.2源碼,深入了解java調用HDFS的常用操作和HDFS原理目錄一、java調用HDFS的常見操作二、深入了解HDFS寫檔案的流程和HDFS原理

目錄下有一個BUILDINDG.txt檔案,針對比較關鍵的幾個modules做了說明。

讀Hadoop3.2源碼,深入了解java調用HDFS的常用操作和HDFS原理目錄一、java調用HDFS的常見操作二、深入了解HDFS寫檔案的流程和HDFS原理

這裡面很多工程都是和打包相關的,有一個沒提到的“hadoop-cloud-storage-project”是和雲存儲相關的,比如我們熟悉的阿裡雲,AWS等。這次我們需要關注的是hadoop-hdfs-project,hadoop-hdfs-common-project。

代碼參見1.2,我們看到在操作hdfs之前首先需要根據配置檔案擷取檔案系統。

問題:

1、為什麼傳入的位址是“hdfs:”開頭的

2、為什麼要擷取檔案作業系統

我們直接進入get方法

public static FileSystem get(URI uri, Configuration conf) throws IOException {
    //擷取檔案的字首,即我們傳入的 hdfs:
    String scheme = uri.getScheme();
    // 為了便于閱讀,删除掉很多代碼
    // 從緩存中擷取
    return CACHE.get(uri, conf);
  }           

那麼緩存中存放了什麼呢?一層層深入代碼,首先會檢查檔案系統是否存在,不存在則建立檔案系統,最終将檔案系統存放在map中。

private static final Map<String, Class<? extends FileSystem>>
      SERVICE_FILE_SYSTEMS = new HashMap<>();           
public final class HdfsConstants {
  /**
   * URI Scheme for hdfs://namenode/ URIs.
   */
  public static final String HDFS_URI_SCHEME = "hdfs";           

我們再回過頭來打開

FileSystem

public abstract class FileSystem extends Configured implements Closeable, DelegationTokenIssuer            

可以看到FileSystem是一個抽象類,它有很多的子類即實作,比如DistributedFileSystem。是以這一步的操作實際是根據你輸入的字首,通過Java中SPI機制從Serviceloder中擷取所需的檔案作業系統。這裡我們還很驚喜地看到AliyunOSSFileSystem。Hadoop3.x中預設支援阿裡雲OSS對象存儲系統作為Hadoop相容的檔案系統。阿裡雲OSS是中國雲計算廠商第一個也是目前唯一一個被Hadoop官方版本支援的雲存儲系統。這是繼Docker支援阿裡雲存儲以後又一個更重大的裡程碑,這也表明主流開源社群對中國技術生态的認可。假如我們要使用阿裡雲的檔案系統,字首是什麼呢?翻看

AliyunOSSFileSystem

代碼

public String getScheme() {
    return "oss";
  }           

比如 oss://madashu/test。同樣如果需要使用亞馬遜的檔案系統,則字首是“abfs://”

根據1.2執行個體代碼,擷取到檔案作業系統後,就是建立檔案了,最終我們跟蹤到如下的方法

public abstract FSDataOutputStream create(Path f,
      FsPermission permission,
      boolean overwrite,
      int bufferSize,
      short replication,
      long blockSize,
      Progressable progress) throws IOException;           

參數說明:

  • Path:存放路徑
  • FsPermission:檔案權限
  • overwrite:當檔案存在時是否覆寫
  • bufferSize:用戶端的buffer大小
  • replication:檔案副本數
  • blockSize:塊大小
  • Progressable:檔案寫入的進度

這裡有2個參數:replication和blockSize,在解釋之前得先了解一下HDFS的體系結構

讀Hadoop3.2源碼,深入了解java調用HDFS的常用操作和HDFS原理目錄一、java調用HDFS的常見操作二、深入了解HDFS寫檔案的流程和HDFS原理

HDFS是一個主/從(Master/Slave)體系結構的分布式系統,将一個大檔案分成若幹塊儲存在不同伺服器的多個節點中,通過聯網讓使用者感覺像是在本地一樣檢視檔案。HDFS叢集擁有1個Namenode和n個Datanode,使用者可以通過HDFS用戶端和Namenode、Datanodes互動以通路檔案系統。

Namenode是HDFS的master節點,負責管理檔案系統的命名空間,即namespace,他維護這檔案系統樹及整棵樹内所有的檔案和目錄。這些資訊以命名空間鏡像檔案和編輯日志檔案兩個檔案持久化儲存在檔案磁盤上。namenode也留着每個檔案中各個塊所在的資料節點資訊,但是并不永久儲存塊的位置資訊,這些塊的位置資訊會在系統啟動時根據資料資訊節點建立。

Datanode是檔案系統的工作節點,它根據用戶端或namenode需要存儲并檢索資料塊,并且定期向nomenode發送所存儲的塊的清單。

Block是HDFS的最小存儲單元。預設大小:128M(HDFS 1.x中,預設64M),若檔案大小不足128M,則會單獨成為一個block。實質上就是Linux相應目錄下的普通檔案,名稱格式:blk_xxxxxxx。

HDFS塊為什麼這麼大呢?HDFS的塊比磁盤的塊大,主要是為了最小化尋址的開銷。如果塊足夠大,從磁盤傳輸資料的時間會明顯大于定位這個塊開始位置所需的時間。因而,傳輸一個由多個塊組成的大檔案的時間取決于磁盤傳輸速率。如果一個1MB的檔案存儲在一個128M的塊中時,檔案實際隻是用了1M的磁盤空間,而不是128M。

為了降低檔案丢失造成的錯誤,它會為每個小檔案複制多個副本(預設為三個),以此來實作多機器上的多使用者分享檔案和存儲。

第一個複本會随機選擇,但是不會選擇存儲過滿的節點。

第二個複本放在和第一個複本不同且随機選擇的機架上。

第三個和第二個放在同一個機架上的不同節點上。

剩餘的副本就完全随機節點了。

補充1:create方法還有最後一個參數:Progressable,主要是為了便于我們知悉檔案的寫入進度,使用方法如下:

FSDataOutputStream outputStream = fileSystem.create(new Path(targetDirector +   File.separator + fileName), new Progressable() {
        long fileCount = 0;
        @Override
        public void progress() {
        fileCount++;
        System.out.println("總進度:" + fileCount + "|" + fileSize + "|" + (fileCount / fileSize) * 100 + "%");
        }
    });           

補充2:在Hadoop3.2中namenode的預設端口配置發生變化:從50070改為9870

我們繼續往下扒代碼

@Override
  public FSDataOutputStream create(final Path f, final FsPermission permission,
      final EnumSet<CreateFlag> cflags, final int bufferSize,
      final short replication, final long blockSize,
      final Progressable progress, final ChecksumOpt checksumOpt)
      throws IOException {
    // 檔案操作統計,比如建立、删除、拷貝等等,以及操作次數
    statistics.incrementWriteOps(1);
    storageStatistics.incrementOpCounter(OpType.CREATE);
    // 建立檔案輸出流,采用了責任鍊的設計模式
    Path absF = fixRelativePart(f);
    return new FileSystemLinkResolver<FSDataOutputStream>() {
      @Override
      public FSDataOutputStream doCall(final Path p) throws IOException {
        final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
            cflags, replication, blockSize, progress, bufferSize,
            checksumOpt);
        return dfs.createWrappedOutputStream(dfsos, statistics);
      }
      @Override
      public FSDataOutputStream next(final FileSystem fs, final Path p)
          throws IOException {
        return fs.create(p, permission, cflags, bufferSize,
            replication, blockSize, progress, checksumOpt);
      }
    }.resolve(this, absF);
  }           

接下來再進入

FileSystemLinkResolver

類:

1、調用doCall 内部類 DFSClient的create方法,然後将DFSOutputStream包裝FSDataOutputStream

2、如果是符号連結檔案,則一層一層找到最底層的檔案。甚至能連接配接到其他的檔案系統的檔案,比如從HDFS檔案系統連接配接到阿裡雲OSS檔案系統、亞馬遜檔案系統等。

繼續跟蹤代碼,進入

DFSClient

public DFSOutputStream create(String src, FsPermission permission,
      EnumSet<CreateFlag> flag, boolean createParent, short replication,
      long blockSize, Progressable progress, int buffersize,
      ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes,
      String ecPolicyName) throws IOException {
    //檢查用戶端是否已經在運作了
    checkOpen();
    final FsPermission masked = applyUMask(permission);
    LOG.debug("{}: masked={}", src, masked);
    //建立檔案輸出流,和Namenode進行互動
    final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
        src, masked, flag, createParent, replication, blockSize, progress,
        dfsClientConf.createChecksum(checksumOpt),
        getFavoredNodesStr(favoredNodes), ecPolicyName);
    //更新檔案租約:也可以了解為token,保證不會發生寫檔案沖突。
    beginFileLease(result.getFileId(), result);
    return result;
  }           

我們看到最後一個beginFileLease操作,也就是擷取檔案租約。我們暫時先忽略檔案建立的過程,繼續往下翻和FileLease有關的代碼:

//如果是第一次,還是設定檔案租約。
  stat = FSDirWriteFileOp.startFile(this, iip, permissions, holder,
            clientMachine, flag, createParent, replication, blockSize, feInfo,
            toRemoveBlocks, shouldReplicate, ecPolicyName, logRetryCache);           
//設定檔案租約的方法見FSDirWriteFileOp
 fsn.leaseManager.addLease(
        newNode.getFileUnderConstructionFeature().getClientName(),
        newNode.getId());           

FileLease:檔案租約,HDFS給用戶端發放一個寫檔案操作的臨時許可證,隻有持有該證件者才允許操作此檔案,進而保證保證資料的一緻。

  • 每個用戶端使用者持有一個檔案租約。
  • 每個檔案租約内部包含有一個租約持有者資訊,還有租約對應的檔案Id清單,即目前租約持有者正在寫這些檔案Id對應的檔案。
  • 每個檔案租約内包含有一個最新近更新時間,最近更新時間将會決定此租約是否已過期。過期的租約會導緻租約持有者無法繼續執行寫資料到檔案中,除非進行租約的更新。

既然每個用戶端都有一個檔案租約,那麼HDFS如如何管理的呢?比如有些用戶端使用者寫某檔案後未及時關閉此檔案。這樣會導緻租約未釋放,進而造成其他使用者無法對此檔案進行寫操作。答案就是LeaseManager,運作在Active NameNode的服務中。它主要做2件事:

1、維護HDFS内部目前所有的租約,

2、定期釋放過期的租約對象。

補充:HDFS 隻允許對一個已打開的檔案順序寫入,或者在現有檔案的末尾追加資料。

接下來我們的代碼将進入

DFSOutputStream.newStreamForCreate()

方法

//調用namenode的檔案建立方法
 stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
              new EnumSetWritable<>(flag), createParent, replication,
              blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName)           

我們再次暫停一下,點選“這裡的namenode實際是

ClientProtocol

ClientProtocol is used by user code via the DistributedFileSystem class to communicate with the NameNode. User code can manipulate the directory namespace, as well as open/close file streams, etc.

ClientProtocol用來通過DistributedFileSystem類與NameNode通信。可以操作目錄名稱空間,以及打開/關閉檔案流等。

ClientProtocol

是一個接口,它的實作類有:

讀Hadoop3.2源碼,深入了解java調用HDFS的常用操作和HDFS原理目錄一、java調用HDFS的常見操作二、深入了解HDFS寫檔案的流程和HDFS原理

我們進入

NameNodeRpcServer.create()

@Override 
  public HdfsFileStatus create(String src, FsPermission masked,
      String clientName, EnumSetWritable<CreateFlag> flag,
      boolean createParent, short replication, long blockSize,
      CryptoProtocolVersion[] supportedVersions, String ecPolicyName)
      throws IOException {
    //确認namenode已啟動
    checkNNStartup();
    // 擷取服務端ip
    String clientMachine = getClientMachine();
    if (stateChangeLog.isDebugEnabled()) {
      stateChangeLog.debug("*DIR* NameNode.create: file "
          +src+" for "+clientName+" at "+clientMachine);
    }
    //檢查是否可以寫入。在生成上namenode正常也會進行HA,保證高可用。隻有主的才可以寫入,
    if (!checkPathLength(src)) {
      throw new IOException("create: Pathname too long.  Limit "
          + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
    }
    namesystem.checkOperation(OperationCategory.WRITE);
    CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null);
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return (HdfsFileStatus) cacheEntry.getPayload();
    }           

作為分布式檔案系統,少不了各個節點之間的通信和互動,比如client和namenode,namenode和datanode,是以需要這樣一套RPC(Remote Procedure CallProtocol,遠端過程調用協定)架構,允許程式像調用本地方法一樣調用遠端機器上應用程式提供的服務。Hadoop RPC并沒有采用JDK自帶的RMI,據說基于Google Protocol Buffer(簡稱Protobuf)來實作的。Hadoop的RPC和通用的RPC一樣,包含通信子產品、用戶端Stub程式、服務端Stub程式、請求程式、服務程式等。

讀Hadoop3.2源碼,深入了解java調用HDFS的常用操作和HDFS原理目錄一、java調用HDFS的常見操作二、深入了解HDFS寫檔案的流程和HDFS原理

Hadoop RCP 主要提供兩個接口

//構造一個用戶端代理對象,用于向伺服器發送RPC請求
public static <T>ProtocolProxy <T> getProxy/waitForProxy() 
// 為某個協定執行個體構造一個伺服器對象,用于處理用戶端發送的請求
public static Server RPC.Builder (Configuration).build()            

2.7、HAState:active、standby

HdfsFileStatus status = null;
    try {
      PermissionStatus perm = new PermissionStatus(getRemoteUser()
          .getShortUserName(), null, masked);
          // 開始建立檔案
      status = namesystem.startFile(src, perm, clientName, clientMachine,
          flag.get(), createParent, replication, blockSize, supportedVersions,
          ecPolicyName, cacheEntry != null);
    } finally {
      RetryCache.setState(cacheEntry, status != null, status);
    }

    metrics.incrFilesCreated();
    metrics.incrCreateFileOps();
    return status;
  }           
@Override
  // 報錯 
  public void checkOperation(final OperationCategory op)
      throws StandbyException {
    state.checkOperation(haContext, op);
  }           

在這個代碼裡有一個HA狀态的檢查,standby 隻能read,不能write。

public static final HAState ACTIVE_STATE = new ActiveState();
  public static final HAState STANDBY_STATE = new StandbyState();
  public static final HAState OBSERVER_STATE = new StandbyState(true);           

從Hadoop2開始,增加了對HDFS高可用(HA)的支援,配置了1對active-standby的namenode。當活動的namenode失效,備用的namenode能夠快速(幾十秒的時間)實作任務接管,因為最新的狀态存儲在記憶體中:包括最新的編輯日志條目和最新的資料塊映射資訊。實際觀察到的失效時間略長一點,需要1分鐘左右,這是因為系統需要保守确定活動的namenode是否真的失效了。假設活動的namenode和備用的namenode都失效了(人品爆發了),管理者依舊可以聲明一個備用namenode并實作冷啟動。

實際開發踩坑

在實際開發過程中,由于配置或者啟動順序的原因,倒是會經查遇到standby的問題,甚至發現master和slave兩個NameNode的狀态均為standby。比如啟動了hdfs再啟動zookeeper 導緻zookeeper的選舉機制zkfc(DFSZKFailoverController)沒有格式化 NameNode節點的自動切換機制沒有開啟 兩個NameNode都處于standby狀态(解決方案:先啟動zookeeper叢集:zkServer.sh start 再啟動hdfs叢集FSNamesystem)。

人工檢視namenode的方法

sudo -E -u hadoop /home/hadoop/bin/hdfs haadmin -getServiceState nn1           

private HdfsFileStatus startFileInt(String src,
      PermissionStatus permissions, String holder, String clientMachine,
      EnumSet<CreateFlag> flag, boolean createParent, short replication,
      long blockSize, CryptoProtocolVersion[] supportedVersions,
      String ecPolicyName, boolean logRetryCache) throws IOException

 //檢查備援政策:副本或者糾删碼
 boolean shouldReplicate = flag.contains(CreateFlag.SHOULD_REPLICATE);
 //檔案寫入鎖
writeLock(); 
 //根據檔案目錄字元串執行個體化目錄結構。比如/hdfs/madashu,在hdfs裡需要把目錄結構映射成對象
  iip = FSDirWriteFileOp.resolvePathForStartFile(
          dir, pc, src, flag, createParent);

 feInfo = FSDirEncryptionZoneOp.getFileEncryptionInfo(
              dir, iip, ezInfo);
// 添加到檔案目錄樹中:檢查檔案是否已經存在,是否可覆寫,檔案數量的限制,糾删碼格式存儲,擷取糾删碼政策,建立檔案節點等。           

這裡面出現了一個新的名詞:糾删碼,Erasure Coding,EC。前面章節我們提到了預設情況下,HDFS的資料塊都會儲存三個副本。副本提供了一種簡單而健壯的備援方式來最大化保證資料的可用性。資料的多副本同時可以盡量保證計算任務的本地化。但副本方式成本是較高的:預設情況下三副本方式會在存儲空間或其他資源(比如寫入資料時的網絡帶寬)中産生200%的開銷。對于較少通路的資料集(對叢集的I/O影響相對不大),它們的第二個或者第三個副本會比較少通路,但是仍會消耗相同的存儲空間。是以可以使用糾删碼來代替多副本的方式,它使用更少的存儲卻可以保證相同級别的容錯。在典型配置下,與三副本方式相比,EC可以将存儲成本降低約50%。但同樣他的使用也是需要一些代價的,一旦資料需要恢複,他會造成2大資源的消耗:

1、網絡帶寬的消耗,因為資料恢複需要去讀其他的資料塊和校驗塊

2、進行編碼,解碼計算需要消耗CPU資源

具體可參見

https://cloud.tencent.com/developer/article/1363388

2.9、檔案透明加密處理和目錄樹

目錄樹:

讀Hadoop3.2源碼,深入了解java調用HDFS的常用操作和HDFS原理目錄一、java調用HDFS的常見操作二、深入了解HDFS寫檔案的流程和HDFS原理
讀Hadoop3.2源碼,深入了解java調用HDFS的常用操作和HDFS原理目錄一、java調用HDFS的常見操作二、深入了解HDFS寫檔案的流程和HDFS原理

在2.8 的代碼中,還出現了目錄樹和檔案加密,這一塊就不做多講了。分享兩個相關的連結:

《HDFS檔案目錄詳解》

https://blog.csdn.net/baiye_xing/article/details/76268495

《HDFS資料加密空間--Encryption zone》

https://www.cnblogs.com/bianqi/p/12183761.html

以上源碼才完成了檔案建立過程,接下來還需要通過管道方式将檔案寫入datanode中去,後續有機會再和大家一些學習分享。

// 建立檔案。
    FSDataOutputStream outputStream =
            fileSystem.create(new Path("/hdfs/madashu/test"));
    // 寫入檔案内容
    outputStream.write("你好Hadoop,我是碼大叔".getBytes());
    outputStream.flush();
    IOUtils.closeStream(outputStream);           

以下文字來自于《Hadoop權威指南》一書,對HDFS用戶端寫流程進行了總結,作為本文的收尾,想大牛緻敬!

讀Hadoop3.2源碼,深入了解java調用HDFS的常用操作和HDFS原理目錄一、java調用HDFS的常見操作二、深入了解HDFS寫檔案的流程和HDFS原理

1、建立檔案

HDFS用戶端寫一個新的檔案時,會首先調用DistributedFileSystem.create()方法在HDFS檔案系統中建立一個新的空檔案。這個方法在底層會通過調用ClientProtocol.create()方法通知Namenode執行對應的操作,Namenode會首先在檔案系統目錄樹中的指定路徑下添加一個新的檔案,然後将建立新檔案的操作記錄到editlog 中。完ClientProtocol.create()調用後,DistributedFileSystem.create()方法就會傳回一個HdfsDataOutputStream對象,這個對象在底層包裝了一個DFSOutputStream對象,真正執行寫資料操作的其實是DFSOutputStream對象。

2、 建立資料流管道

擷取了 DFSOutputStream對彖後,HDFS用戶端就可以調用DFSOutputStream.write()方法來寫資料了。由于 DistributedFileSystem.create()方法隻是在檔案系統目錄樹中建立了一個空檔案,并沒有申請任何資料塊,是以DFSOutputStream 會首先調用 ClientProtocol.addBlock()向 Namenode 申請一個新的空資料塊,addBlock()方法會返冋一個LocatedBlock對象,這個對象儲存了存儲這個資料塊的所有資料節點的位置資訊。獲得了資料流管道中所有資料節點的資訊後,DFSOutputStream就可以建立資料流管道寫資料塊了。

3、通過資料流管道寫入資料

成功地建立資料流管道後,HDFS用戶端就可以向資料流管道寫資料了。寫入DFSOutputStream中的資料會先被緩存在資料流中,之後這些資料會被切分成一個個資料包(packet)通過資料流管道發送到所有資料節點。這裡的每個資料包都會按照上圖所示,通過資料流管道依次寫入資料節點的本地存儲。每個資料包都有個确認包,确認包會逆序通過資料流管道回到輸出流。輸出流在确認了所有資料節點已經寫入這個資料包之後,就會從對應的緩存隊列删除這個資料包。當用戶端寫滿一個資料塊之後,會調用addBlock()申請一個新的資料塊,然後循環執行上述操作。

4、關閉輸入流并送出檔案

當HDFS用戶端完成了整個檔案中所有資料塊的寫操作之後,就可以調用close()方法關閉輸出流,并調用ClientProtocol.completeO方法通知Namenode送出這個檔案中的所有資料塊,也就完成了整個檔案的寫入流程。

對于Datanode ,當Datanode成功地接受一個新的資料塊時,Datanode會通過

DatanodeProtocol.blockReceivedAndDeleted()方法向 Namenode 彙報,Namenode 會更新記憶體中的資料塊與資料節點的對應關系。

本文參考:

《Hadoop權威指南》

《Hadoop 2.X HDFS源碼剖析 》

https://www.cnblogs.com/joqk/p/3963101.html https://blog.csdn.net/androidlushangderen/article/details/52850349 http://blog.itpub.net/69908606/viewspace-2648472/