天天看點

HDFS使用Backup Node、Checkpoint Node以及Standby Namenode進行checkpoint的機制詳解前言1. HA模式下的StandBy NameNode進行checkpoint以及image檔案的傳輸2. 使用Checkpoint Node和Backup Node進行checkpoint操作結語

前言

checkpoint,就是将某一個時間點的記憶體鏡像,完整地存放到磁盤的過程,比如,我們在StandBy NameNode上可以看到這樣的image檔案:

fsimage_0000000000992806947
fsimage_0000000000992806947.md5
           

fsimage_0000000000992806947說明這個檔案存放了txId為992806947和以前的全部記憶體操作的鏡像,隻要将這個image檔案load到記憶體,那麼NameNode就有了曆史上截止到992806947操作發生時候的記憶體鏡像。fsimage_0000000000992806947.md5是這個image檔案的md5校驗值。

image檔案與edit log檔案相配合,就可以讓NameNode在重新開機的時候恢複到關閉前的狀态,即NameNode啟目錄

    • 前言
  • 前言
  • 1. HA模式下的StandBy NameNode進行checkpoint以及image檔案的傳輸
    • 1.1 CheckpointerThread運作機制簡介
    • 1.2 checkpoint過程
    • 1.3 checkpoint完成以後,将fsimage檔案上傳到Active NameNode
  • 2. 使用Checkpoint Node和Backup Node進行checkpoint操作
    • 2.1 Checkpoint Node和Backup Node進行checkpoint操作的簡介
    • 2.2 Checkpoint Node和Backup Node的初始化
    • 2.3 使用Checkpointer線程與Active NameNode進行檔案同步
    • 2.4 Backup Node與Active NameNode進行基于JournalProtocol RPC協定進行實時edit操作同步
  • 結語
    • 使用預設的StandBy NameNode進行checkpoint以及image檔案的傳輸
      • checkpoint過程
        • 檔案傳輸過程
    • 使用Checkpoint Node和Backup Node進行checkpoint操作
      • 使用Checkpointer與Active NameNode進行檔案同步
      • Backup Node與Active NameNode進行基于JournalProtocol RPC協定進行實時edit操作同步

前言

checkpoint,就是将某一個時間點的記憶體鏡像,完整地存放到磁盤的過程,比如,我們在StandBy NameNode上可以看到這樣的image檔案:

fsimage_0000000000992806947
fsimage_0000000000992806947.md5
           

fsimage_0000000000992806947

說明這個檔案存放了txId為992806947以前(包含)的全部記憶體操作的鏡像,隻要将這個image檔案load到記憶體,那麼NameNode就有了曆史上截止到992806947操作發生時候的記憶體鏡像。

fsimage_0000000000992806947.md5

是這個image檔案的md5校驗值。

image檔案與edit log檔案相配合,就可以讓NameNode在重新開機的時候恢複到關閉前的狀态,即NameNode啟動的時候,會先讀取image檔案,加載某個時間點之前的記憶體鏡像,然後再讀取這個時間點以後的edit log檔案,進而實作對整個記憶體鏡像的恢複。即image檔案是一個全量檔案,而edit log是一個增量檔案,二者互相配置,來實作HDFS重新開機的時候的檔案系統的恢複。

注意,按照《Hadoop技術内幕:深入解析Hadoop Common和HDFS架設計與實作原理》一書中的分類:

1. 檔案系統的目錄樹關系,即目錄、檔案之間的樹形關系,以及檔案和塊之間的對應關系,我們稱為NameNode第一關系;

2. 塊與節點之間的對應關系,即根據塊找到存放該塊的DataNode,我們稱為NameNode第二關系;

這裡的image檔案隻包含NameNode第一關系,不會包含NameNode第二關系。NameNode第二關系是在NameNode啟動以後,由每一個DataNode定時向Active NameNode彙報的,即,DataNode會向Active NameNode報告自己所維護的塊資訊。這樣,當我們需要擷取某個塊的資料的時候,會先通過Active NameNode進行查詢,NameNode會将我們的請求重定向到對應的DataNode。将第二關系與NameNode脫鈎,個人感覺是非常好的設計方式,一方面解脫NameNode,無論DataNode如何橫向擴充,都不會影響NameNode的體量和效率;另外,這也避免同一進制資料在NameNode和DataNode兩處維護,如果兩方都維護,兩方的中繼資料不一緻的處理又是一個難題;

其實,在HA模式下,隻有Active NameNode會維護NameNode第二關系,Standby NameNode不會維護此關系。我在上一篇部落格《HDFS使用QJM(Quorum Journal Manager)實作的高可用性以及備份機制》中介紹過,Standby NameNode隻會通過重放機制,讓自己在記憶體中的NameNode第一關系與Active NameNode基本保持一緻。每個DataNode在向NameNode彙報自身的塊資訊的時候,請求同時發往Active NameNode和Standby NameNode,但是,通過權限檢查機制(

NameNodeHAContext.checkOperation()

),Standby NameNode會抛出異常

Operation category WRITE is not supported in state standby

,這是INFO級别的異常,可以忽略。

HDFS的checkpoint,有兩種實作方式:

1. 通過單獨的節點,即Checkpoint Node或者Backup Node完成;

2. HA模式下,通過Standby NameNode完成,完成以後通過http PUT方式,将生成的image上傳給Active NameNode;

下面,我們就對HDFS的兩種checkpoint方式進行代碼層的解析。

關于Checkpoint Node和Backup Node的簡單介紹,可以看hadoop的官方文檔;

1. HA模式下的StandBy NameNode進行checkpoint以及image檔案的傳輸

checkpoint的目的是為了定時将某個時間點或者操作點之前的全局記憶體鏡像集中為一個獨立檔案,進而在NameNode啟動的時候可以直接讀取該檔案實作系統中繼資料的恢複,如果沒有checkpoint操作,那麼HDFS需要維護數量龐大的edit log檔案,非常不利于系統的啟動速度和簡潔性。定時進行的checkpoint任務實際上與NameNode的核心工作無關,是以,HDFS将這個工作與Active NameNode隔離開,預設由StandBy NameNode負責,即,預設不配置的情況下,

dfs.ha.standby.checkpoints

配置項的值為true,表示使用StandBy NameNode來定時進行checkpoint操作。StandBy NameNode完成一次checkpoint操作生成image檔案以後,會通過http的PUT操作,将這個image檔案傳送給遠端的Active NameNode。

1.1 CheckpointerThread運作機制簡介

我們遵循預設設定,使用StandBy NameNode這種方式來進行checkpoint操作,那麼在NameNode啟動的時候,如果是Standby角色(實際上,HA模式下,兩台NameNode剛啟動的時候都會進入Standby狀态,然後由ZKFC決定哪台需要transition到Active),則會啟動一個叫做

StandbyCheckpointer.CheckpointerThread

的線程,我們來看NameNode以Standby身份啟動時候的代碼:

void startStandbyServices(final Configuration conf) throws IOException {
    //略
    editLogTailer = new EditLogTailer(this, conf);//建立一個editLogTailer線程,用來從遠端的QJM伺服器上拉取editlog,重演到自己到記憶體
    editLogTailer.start();
    if (standbyShouldCheckpoint) {
      standbyCheckpointer = new StandbyCheckpointer(conf, this);//獨立線程,用來不斷進行checkpoint操作
      standbyCheckpointer.start();
    }
  }
           

Standby模式啟動,會建立EditLogTailer負責從JournalNode上拉取edit log檔案并在記憶體中重演(具體細節參考我的部落格《HDFS使用QJM(Quorum Journal Manager)實作的高可用性以及備份機制》),然後,建立StandbyCheckpointer對象,負責進行checkpoint的定時操作。

StandbyCheckpointer的構造方法,實際上就是建立了一個CheckpointerThread線程:

public StandbyCheckpointer(Configuration conf, FSNamesystem ns)
      throws IOException {
    //略
    this.thread = new CheckpointerThread();//建立CheckpointerThread線程
    this.uploadThreadFactory = new ThreadFactoryBuilder().setDaemon(true)
        .setNameFormat("TransferFsImageUpload-%d").build();
    setNameNodeAddresses(conf);
  }
           

這個線程的任務,就是通過判斷距離上一次checkpoint操作的時間是否超過門檻值(

dfs.namenode.checkpoint.period

,預設3600s,即1個小時),以及目前沒有進行checkpoint操作的資料量是否超過門檻值(

dfs.namenode.checkpoint.txns

,預設1000000)來判斷是否應該進行checkpoint操作。

在完成了checkpoint操作,生成了對應的img檔案以後,會通過HTTP PUT操作,将這個檔案發送到Active NameNode。

我們一起來看checkpoint以及checkpoint完畢以後檔案的上傳過程。

StandbyCheckpointer.doCheckpoint()

方法在

StandbyCheckpointer.CheckpointerThread

線程中被調用,負責checkpoint和image檔案傳輸到Active NameNode的工作。

private void doCheckpoint() throws InterruptedException, IOException {
    ...略
    //對命名空間加cpLock鎖,這是StandBy Namenode專用鎖,防止checkpoint過程中發生了edit log重演
    namesystem.cpLockInterruptibly();
    try {
      assert namesystem.getEditLog().isOpenForRead() : //狀态檢查,預期情況下,StandBy NameNode所維護的editlog應該是處于open for read狀态,這是StandBy NameNode啟動以後的持續狀态
        "Standby Checkpointer should only attempt a checkpoint when " +
        "NN is in standby mode, but the edit logs are in an unexpected state";
      FSImage img = namesystem.getFSImage();
      long prevCheckpointTxId = img.getStorage().getMostRecentCheckpointTxId();//上一次checkpoint完成的位置
      long thisCheckpointTxId = img.getLastAppliedOrWrittenTxId();//目前最後一次操作的位置,即本次checkpoint需要到達的位置
      assert thisCheckpointTxId >= prevCheckpointTxId;
      if (thisCheckpointTxId == prevCheckpointTxId) {
        LOG.info("A checkpoint was triggered but the Standby Node has not " +
            "received any transactions since the last checkpoint at txid " +
            thisCheckpointTxId + ". Skipping...");
        return;
      }

      imageType = NameNodeFile.IMAGE;
      img.saveNamespace(namesystem, imageType, canceler);//進行checkpoint操作
      txid = img.getStorage().getMostRecentCheckpointTxId();
      assert txid == thisCheckpointTxId : "expected to save checkpoint at txid=" +
        thisCheckpointTxId + " but instead saved at txid=" + txid;

      // Save the legacy OIV image, if the output dir is defined.
      String outputDir = checkpointConf.getLegacyOivImageDir();
      if (outputDir != null && !outputDir.isEmpty()) {
        img.saveLegacyOIVImage(namesystem, outputDir, canceler);
      }
    } finally {
      namesystem.cpUnlock();
    }

    //采用異步方式,将目前的img檔案發送到遠端的Active NameNode
    ExecutorService executor =
        Executors.newSingleThreadExecutor(uploadThreadFactory);
    Future<Void> upload = executor.submit(new Callable<Void>() {
      @Override
      public Void call() throws IOException {
        TransferFsImage.uploadImageFromStorage(activeNNAddress, conf,
            namesystem.getFSImage().getStorage(), imageType, txid, canceler);
        return null;
      }
    });
    executor.shutdown();
    try {
      upload.get();
    } catch (InterruptedException e) {
      //異常
    }
  }
           

Standby NameNode在進行checkpoint操作之前,需要對整個namespace加鎖,這裡的鎖變量是cpLock(cp是checkpoint的簡稱)。FSNamesystem提供了兩把鎖,fsLock負責鎖定和保護整個namesystem,即,任何時候隻有擷取fsLock的線程能夠修改namesystem的結構,比如塊資訊、命名空間狀态等;而cpLock是專門給Standby NameNode使用的鎖,目的是防止checkpoint過程中發生edit log重演操作,cpLock不負責鎖定對block資訊的修改,這是因為Standby NameNode是無權維護和修改檔案到快資訊等的對應關系的,隻有Active NameNode會修改,是以無需對整個命名空間加鎖。是以,cpLock隻被用在edit log重演(代碼

EditLogTailer.doWork()

)和本節所講的checkpoint過程中。

回到

doCheckpoint()

方法,可以看到,checkpoint操作通過

img.saveNamespace(namesystem, imageType, canceler);

完成,而img檔案的傳輸,通過調用

TransferFsImage.uploadImageFromStorage(...)

完成。

1.2 checkpoint過程

跟蹤

img.saveNamespace(namesystem, imageType, canceler);

代碼,最終調用

FSImage.saveFSImageInAllDirs()

方法完成checkpoint,這個方法會生成image檔案,并存放到一個或者多個image存儲目錄,這是因為出于高可用考慮,我們是可以配置一個以上的img dir的,讓相同的img檔案存放在不同的磁盤上:

<property>
                <name>dfs.namenode.name.dir</name>
                <value>/data1/data/hadoop/namenode,/data2/data/hadoop/namenode</value>
        </property>
           
private synchronized void saveFSImageInAllDirs(FSNamesystem source,
      NameNodeFile nnf, long txid, Canceler canceler) throws IOException {
    ...略
    //擷取配置的img 目錄
    if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == ) {
      throw new IOException("No image directories available!");
    }
    ...略
    SaveNamespaceContext ctx = new SaveNamespaceContext(
        source, txid, canceler);//ctx中儲存了生成img檔案所需要的上下文資訊,最重要的,這個img檔案的end txId,即這個checkpoint的目标偏移位置

    try {
      List<Thread> saveThreads = new ArrayList<Thread>();
      // save images into current
      for (Iterator<StorageDirectory> it //對于每一個storage目錄,都建立一個獨立線程,同時進行img檔案的生成,提高生成效率
             = storage.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
        StorageDirectory sd = it.next();//
        FSImageSaver saver = new FSImageSaver(ctx, sd, nnf);
        Thread saveThread = new Thread(saver, saver.toString());
        saveThreads.add(saveThread);
        saveThread.start();
      }
      waitForThreads(saveThreads);//等待所有的saveThread完成存儲操作
      saveThreads.clear();
      storage.reportErrorsOnDirectories(ctx.getErrorSDs());
      ...略
      if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == ) {
        throw new IOException(
          "Failed to save in any storage directories while saving namespace.");
      }
     ...略
      //完成了img檔案的存取,實際上是存為一個中間檔案,以NameNodeFile.IMAGE_NEW開頭。
      //這時候就可以把這些中間檔案rename稱為最終正式檔案了
      最後一個參數false,代表不需要rename md5檔案,這是因為在FSImageSaver中生成臨時檔案的時候已經生成了最終的md5檔案
      renameCheckpoint(txid, NameNodeFile.IMAGE_NEW, nnf, false);
    } finally {  }
  }
           

通過

storage.dirIterator(NameNodeDirType.IMAGE)

擷取image檔案目錄的一個Iterator,然後逐個周遊這些目錄并将image檔案存放下來。對于多image目錄的情況,會為每一個目錄建立一個單獨線程,多目錄并行進行。但是,通過

waitForThreads(saveThreads);

,會同步等待所有目錄完成img存儲才會執行下一步,是以,盡管内部并行,但是

saveFSImageInAllDirs(...)

是一個同步方法,方法傳回時,肯定已經完成了将image檔案生成到所有目錄。

FSImageSaver類負責完成對把img檔案存放到某個img目錄,我們來看

FSImageSaver.saveFSImage(...)

方法:

/**
   * Save the contents of the FS image to the file.
   */
  void saveFSImage(SaveNamespaceContext context, StorageDirectory sd,
      NameNodeFile dstType) throws IOException {
    long txid = context.getTxId();//checkpoint的目标偏移位置
    File newFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid);
    File dstFile = NNStorage.getStorageFile(sd, dstType, txid);

    FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context);
    FSImageCompression compression = FSImageCompression.createCompression(conf);
    //按照壓縮配置,将img存入到以fsimage.ckpt開頭的中間檔案中
    saver.save(newFile, compression);
    MD5FileUtils.saveMD5File(dstFile, saver.getSavedDigest());//儲存md5檔案
    storage.setMostRecentCheckpointInfo(txid, Time.now());
  }
           

saveFSImage()

方法可以看到,生成image檔案的時候,并不是直接将記憶體鏡像dump到對應的image磁盤目錄,而是會産生一個以

fsimage.ckpt

開頭的中間檔案,如:

fsimage.ckpt_0000000000992806947

,然後生成對應的MD5校驗檔案,如:

fsimage.ckpt_0000000000992806947.md5

。當多目錄image檔案全部完成了中間檔案的生成,再調用

renameCheckpoint(...)

方法,将所有目錄的中間檔案rename為最終的格式為

fsimage_0000000000992806947

的檔案;

本文不去詳述如何将記憶體鏡像讀出、壓縮、序列化的細節,也不去詳述image檔案的結構,這涉及到NameNode的檔案系統管理,需要較長篇幅去講解。我會另起博文進行講解。

1.3 checkpoint完成以後,将fsimage檔案上傳到Active NameNode

如上文所講,

doCheckpoint()

操作包括了checkpoint和image檔案的傳輸,我們來看checkpoint完成以後,image檔案的傳輸。

doCheckpoint()

方法

//采用異步方式,将目前的img檔案發送到遠端的Active NameNode
    ExecutorService executor =
        Executors.newSingleThreadExecutor(uploadThreadFactory);
    Future<Void> upload = executor.submit(new Callable<Void>() {
      @Override
      public Void call() throws IOException {
        TransferFsImage.uploadImageFromStorage(activeNNAddress, conf,
            namesystem.getFSImage().getStorage(), imageType, txid, canceler);
        return null;
      }
    });
    executor.shutdown();
    try {
      upload.get();
    } catch (InterruptedException e) {
      //異常處理 略
    }
           

可以看到,通過ExecutorService建立了一個獨立線程負責檔案傳輸過程。這是因為image檔案一般較大,傳輸較為耗時,是以如果此時正好發生NameNode從standby到active的transition過程,這個transition會發生長時間阻塞,是以會建立一個單獨線程執行。

dfs.namenode.http-address

字首配置項設定了我們的NameNode的http位址:比如,我們在hdfs-site.xml中這樣配置:

<property>
                <name>dfs.namenode.http-address.datahdfsmaster.namenode1</name>
               <value>10.130.277.29:50070</value>
        </property>
           

此時,我們名字為

datahdfsmaster

的nameservice下的名字為

namenode1

的namenode的http位址是

10.130.277.29:50070

,這就是

namenode1

的http位址。

通過跟蹤

TransferFsImage.uploadImageFromStorage()

的代碼,我們可以非常詳細地看到根據Active NameNode的URI建構HttpURLConnection對象的建構,以及設定requestMethod為PUT、根據

dfs.image.transfer.chunksize

設定PUT請求的chunk size、設定http請求的逾時時間的全過程和細節,這裡也不再詳述;

2. 使用Checkpoint Node和Backup Node進行checkpoint操作

當然,如果我們認為這樣也不是很好,可能會影響StandBy NameNode的工作,那麼我們可以單獨起一個程序,專門負責checkpoint工作,即使用單獨的Checkpoint Node(以下簡稱CN)或者Backup Node(以下簡稱BN)負責checkpoint的定時工作。

注意,Checkpoint Node和Backup Node在某種程度上幾乎替代了Standby NameNode的功能,是以,在HA模式下,無法啟動Checkpoint Node和Backup Node。必須使用非HA模式才可以啟動并使用Checkpoint Node。

2.1 Checkpoint Node和Backup Node進行checkpoint操作的簡介

Checkpoint Node會通過單獨線程,定時從Active NameNode拉取edit log檔案拷貝到本地,并且将這些edit log在自己記憶體中進行重演。如果checkpoint條件具備,将進行checkpoint操作,檔案存入本地,并通過HTTP PUT的方式将fsimage檔案傳輸給Active NameNode。

Backup Node除了具有Checkpoint Node的上述所有功能外,還會通過RPC方式(屬于stream的方式,差別于單獨定時拷貝)實時接收Active NameNode的edit操作。是以,同Checkpoint Node相比,Backup Node的記憶體映象在時間差上幾乎與Active NameNode一緻。

下面,我們具體來介紹Checkpoint Node、Backup Node的edit log和fsimage的處理機制,以及Backup Node同Active NameNode之間基于RPC通信實作的實時edit log傳輸。

2.2 Checkpoint Node和Backup Node的初始化

我們從NameNode的啟動過程代碼可以看到,通過在NameNode啟動的時候設定啟動參數,可以将這個節點設定為Checkpoint Node或者Backup Node:

public static NameNode createNameNode(String argv[], Configuration conf)
      throws IOException {
    LOG.info("createNameNode " + Arrays.asList(argv));
    ...略
    StartupOption startOpt = parseArguments(argv);
    setStartupOption(conf, startOpt);
    switch (startOpt) {
      case FORMAT: {//NameNode 格式化
        ...略
      }
      case BACKUP://啟動一個 backup node
      case CHECKPOINT: {//啟動一個checkpoint node
        NamenodeRole role = startOpt.toNodeRole();
        DefaultMetricsSystem.initialize(role.toString().replace(" ", ""));
        return new BackupNode(conf, role);//無論是BACKUP還是CHECKPOINT,都使用BackupNode對象進行管理,唯一不同是角色不同,一個是NamenodeRole.BACKUP,一個是NamenodeRole.CHECKPOINT
      }
      ....略
      default: {
        DefaultMetricsSystem.initialize("NameNode");
        return new NameNode(conf);//正常啟動一個NameNode
      } } }
           

Checkpoint Node或者Backup Node的實作類都是BackupNode,Backup Node 是NameNode的子類,這說明Backup Node基本上與NameNode的功能一緻,所維護的中繼資料資訊相同,隻是有了定時checkpoint等功能,同時,Backup Node沒有對叢集中繼資料進行修改的權限。

NamenodeRole枚舉類型标記了NameNode節點的角色:

/**
   * Defines the NameNode role.
   */
  static public enum NamenodeRole {
    NAMENODE  ("NameNode"),//普通的NameNode節點,如Active NameNode或者Standby NameNode
    BACKUP    ("Backup Node"),//BACKUP Node
    CHECKPOINT("Checkpoint Node");//Checkpoint Node
    ...略
  }
           

Backup Node的初始化方法

initialize()

重寫了NameNode的

initialize()

方法,這個方法主要完成三項工作:

1. 握手:通過NameNodeProtocol協定的

versionRequest()

接口,與Active NameNode進行版本協商等工作,即握手;Active NameNode會響應自己的storage資訊,比如,namespaceId,clusterId, blockpoolId,BN/CN将這些資訊設定到自己的storage資訊中。在下面的RPC通信中,BN/CN會攜帶這些資訊,然後Active NameNode會對收到的CN/BN請求中攜帶的storage資訊進行校驗,隻有校驗一緻,才會成功傳回,否則,認為是異常請求;

2. 注冊:通過NameNodeProtocol協定的

registerSubordinateNamenode()

接口,向Active NameNode注冊自己。注冊請求中攜帶了在握手階段協商一緻的storage資訊、clusterId等資訊,Active NameNode作為服務端收到注冊請求以後會對這些資訊進行校驗。這樣,Active NameNode就可以在發生

startLogSegment()

等操作的時候,通過JournalProtocol協定的

startLogSegment()

接口告知BN,同時,當發生edit操作的時候,通過JournalProtocol接口的journal()接口,将這個操作發送給BN;

3. 線程啟動:

啟動

Checkpointer`線程,負責不斷從Active NameNode拉取edit log檔案;Checkpointer線程從啟動到開始運作,涉及到的資料傳輸和RPC調用過程如下:

這是BN初始化過程中的RPC調用圖:

HDFS使用Backup Node、Checkpoint Node以及Standby Namenode進行checkpoint的機制詳解前言1. HA模式下的StandBy NameNode進行checkpoint以及image檔案的傳輸2. 使用Checkpoint Node和Backup Node進行checkpoint操作結語

這是

BackupNode.initialize()

,握手、注冊以及線程啟動在這裡調用:

protected void initialize(Configuration conf) throws IOException {
    // Trash is disabled in BackupNameNode,
    // but should be turned back on if it ever becomes active.
    conf.setLong(CommonConfigurationKeys.FS_TRASH_INTERVAL_KEY, 
                 CommonConfigurationKeys.FS_TRASH_INTERVAL_DEFAULT);
    //通過handshake,擷取了遠端的Active Namenode的namespace資訊
    NamespaceInfo nsInfo = handshake(conf);
    super.initialize(conf);
    namesystem.setBlockPoolId(nsInfo.getBlockPoolID());

    if (false == namesystem.isInSafeMode()) {
      namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
    }

    // Backup node should never do lease recovery,
    // therefore lease hard limit should never expire.
    namesystem.leaseManager.setLeasePeriod(
        HdfsConstants.LEASE_SOFTLIMIT_PERIOD, Long.MAX_VALUE);

    // register with the active name-node 
    registerWith(nsInfo);
    // Checkpoint daemon should start after the rpc server started
    runCheckpointDaemon(conf);
    InetSocketAddress addr = getHttpAddress();
    if (addr != null) {
      conf.set(BN_HTTP_ADDRESS_NAME_KEY, NetUtils.getHostPortString(getHttpAddress()));
    }
  }
           

在完成了

handshake()

即namespaceId、cluster id等資訊等協商以後,CN/BN開始向ActiveNameNode注冊自己:

注冊的代碼為

BackupNode.registerWith()

private void registerWith(NamespaceInfo nsInfo) throws IOException {
    BackupImage bnImage = (BackupImage)getFSImage();
    NNStorage storage = bnImage.getStorage();
    // verify namespaceID
    if (storage.getNamespaceID() == ) { // new backup storage
      storage.setStorageInfo(nsInfo);
      storage.setBlockPoolID(nsInfo.getBlockPoolID());
      storage.setClusterID(nsInfo.getClusterID());
    } else {
      nsInfo.validateStorage(storage);
    }
    bnImage.initEditLog(StartupOption.REGULAR);
    setRegistration();
    NamenodeRegistration nnReg = null;
    while(!isStopRequested()) {
      try {
        //通過getRegistration()擷取請求體,發送給Active NameNode。由于請求體中攜帶了角色資訊,
          //遠端的Active NameNode會根據角色判斷是否加入到自己的output stream中。通過registerBackupNode.registerBackupNode()
         //可以看到,隻有backup的角色會被添加到輸出流中去
        nnReg = namenode.registerSubordinateNamenode(getRegistration());
        break;
      } catch(SocketTimeoutException e) {  // name-node is busy
       ...略
      }
    }

    ...略
    nnRpcAddress = nnReg.getAddress();
  }
           

我們跟蹤

registerWith()

方法,可以看到注冊實際上使用的是NamenodeProtocol協定的

registerSubordinateNamenode()

接口。Active Namenode收到注冊請求以後,調用

FSNamesystem.registerBackupNode()

來進行服務端對請求的處理與響應:

void registerBackupNode(NamenodeRegistration bnReg,
      NamenodeRegistration nnReg) throws IOException {
    writeLock();//擷取全局鎖
    try {
    ...略
    //隻有當它的角色是BACKUP的時候,才會加入到FSEditLog.journalSet中,是以,Active NameNode隻會向Backup節點發送自己的edit操作
      //隻接受BACKUP節點的注冊資訊
      if (bnReg.getRole() == NamenodeRole.BACKUP) {
        getFSImage().getEditLog().registerBackupNode(
            bnReg, nnReg);
      }
    } finally {
      writeUnlock();
    }
  }
           

從上述代碼可以看到,Active NameNode對對請求者的身份類型進行了判斷。BN和CN在這裡産生差別:

  • 如果身份是

    NamenodeRole.BACKUP

    ,那麼Active NameNode會把這個用戶端加入到自己的output stream中,這樣,所有的Active NameNode上的edit操作,都會通過JournalPrototol協定的

    journal()

    接口(Active NameNode是JournalPrototol協定用戶端,而Backup Node是JournalPrototol協定服務端),發送給這個Backup Node。Backup Node收到這些操作内容以後,會将這些操作在記憶體中進行replay,即重演,這樣,Backup Node的記憶體與Active NameNode的記憶體基本一緻。根據官方文檔,由于Backup Node可以通過這種stream的方式實時擷取Active NameNode上的操作,是以沒有必要從Active NameNode拷貝fsimage檔案和edit log檔案;
The Backup node does not need to download fsimage and edits files from the active NameNode in order to create a checkpoint, as would be required with a Checkpoint node or Secondary NameNode, since it already has an up-to-date state of the namespace state in memory. The Backup node checkpoint process is more efficient as it only needs to save the namespace into the local fsimage file and reset edits.
           
  • 而如果用戶端身份是

    NamenodeRole.CHECKPOINT

    ,就沒有這個待遇了,Checkpoint Node雖然也會像Backup Node一樣在啟動的時候注冊(上文說過,它們是同一個實作類),但是,注冊的時候,Active NameNode不會将其加入到output stream中,即Active NameNode是不會通過JournalPrototol協定向Checkpoint Node發送實時edit操作的,是以,Checkpoint Node隻能通過

    Checkpointer.java

    線程同步遠端Active NameNode上的檔案,而無法通過JournalPrototol實時接收Active NameNode的寫操作。Checkpoint Node隻能通過不斷從Active NameNode下載下傳fsimage檔案和edit log檔案,然後将這些檔案讀取到記憶體并定時生車功能checkpoint操作,然後,将checkpoint檔案上傳到Active NameNode。
    HDFS使用Backup Node、Checkpoint Node以及Standby Namenode進行checkpoint的機制詳解前言1. HA模式下的StandBy NameNode進行checkpoint以及image檔案的傳輸2. 使用Checkpoint Node和Backup Node進行checkpoint操作結語

是以,由于Backup Node與Checkpoint Node的fsimage和edit log的同步代碼完全相同,除此以外,就是Backup Node比Checkpoint Node多出的實時資料傳輸過程,是以,下文将隻針對Backup Node,分成兩節,分别講解fsimage和edit log的檔案傳輸,以及Backup Node基于RPC實作的與NameNode之間的edit操作的傳輸。

2.3 使用Checkpointer線程與Active NameNode進行檔案同步

BackupNode.initialize()

方法的第三個工作,就是通過調用

runCheckpointDaemon(...)

啟動一個

Checkpointer

的線程(注意,這不是上一節講到的Standby Namenode節點使用的

StandbyCheckpointer.CheckpointerThread

),這個線程負責從遠端的Active NameNode上讀取edit log 和image檔案存入本地:

/**
   * Start a backup node daemon.
   */
  private void runCheckpointDaemon(Configuration conf) throws IOException {
    checkpointManager = new Checkpointer(conf, this);
    checkpointManager.start();//線程啟動
  }
           

這是Checkpointer線程的run()方法:

@Override
  public void run() {
    ...略
    while(shouldRun) {
      try {
        long now = now();
        boolean shouldCheckpoint = false;
        if(now >= lastCheckpointTime + periodMSec) {//判斷是否達到checkpoint時間間隔要求
          shouldCheckpoint = true;
        } else {
          long txns = countUncheckpointedTxns();
          if(txns >= checkpointConf.getTxnCount())//判斷是否到達checkpoint資料量要求
            shouldCheckpoint = true;
        }
        if(shouldCheckpoint) {
          doCheckpoint();
          lastCheckpointTime = now;
        }
      } catch(IOException e) {
        ...略
      }
      try {
        Thread.sleep(periodMSec);  } catch(InterruptedException ie) {  }
    }
  }
           

run()

方法同Standby NameNode.

CheckpointerThread.run()

方法相似,也是會判斷是否已經滿足了checkpoint條件,包括距離上一次checkpoint操作的時間是否超過門檻值(

dfs.namenode.checkpoint.txns

,預設1000000),以及目前沒有進行checkpoint操作的資料量是否超過門檻值(

dfs.namenode.checkpoint.period

,預設3600s,即1個小時),如果任意條件滿足,則調用

doCheckpoint()

操作進行checkpoint:

void doCheckpoint() throws IOException {
    BackupImage bnImage = getFSImage();
    NNStorage bnStorage = bnImage.getStorage();

    long startTime = monotonicNow();
    bnImage.freezeNamespaceAtNextRoll();
    //服務端調用,檢視NameNodeRpcServer.startCheckpoint(),服務端會結束掉正在寫的sgement檔案,開啟一個新的segment檔案
    //用戶端調用,檢視
    NamenodeCommand cmd = 
      getRemoteNamenodeProxy().startCheckpoint(backupNode.getRegistration());
    CheckpointCommand cpCmd = null;
    //關于這些傳回值的含義,可以參考FSImage.startCheckpoint()方法
    switch(cmd.getAction()) {
    //If backup storage contains image that is newer than or incompatible with 
    // what the active name-node has, then the backup node should shutdown. wuchang
      case NamenodeProtocol.ACT_SHUTDOWN://如果發現backup node的image比namenode的更新,或者storage的版本不一緻,肯定更有問題,這時候backup node需要關閉
        shutdown();
        throw new IOException("Name-node " + backupNode.nnRpcAddress
                                           + " requested shutdown.");
      case NamenodeProtocol.ACT_CHECKPOINT://校驗通過
        cpCmd = (CheckpointCommand)cmd;
        break;
      default:
        throw new IOException("Unsupported NamenodeCommand: "+cmd.getAction());
    }

    //BackupImage.namenodeStartedLogSegment()如果正在發生,那麼如果處于frozen,則必須等待
    bnImage.waitUntilNamespaceFrozen();

    //檢視服務端調用 NameNodeRPCServer.startCheckpoint()
    CheckpointSignature sig = cpCmd.getSignature();

    // Make sure we're talking to the same NN!
    sig.validateStorageInfo(bnImage);

    long lastApplied = bnImage.getLastAppliedTxId();
    LOG.debug("Doing checkpoint. Last applied: " + lastApplied);
    RemoteEditLogManifest manifest = //從NameNode處擷取edit log的檔案清單清單
      getRemoteNamenodeProxy().getEditLogManifest(bnImage.getLastAppliedTxId() + );

    boolean needReloadImage = false;
    if (!manifest.getLogs().isEmpty()) {
      RemoteEditLog firstRemoteLog = manifest.getLogs().get();//擷取第一個遠端的edit log檔案
      // we don't have enough logs to roll forward using only logs. Need
      // to download and load the image.
      //如果從遠端擷取的edit log檔案的transaction 與自己目前最後一次已經擷取的log檔案的transaction存在gap,需要進行reload
      if (firstRemoteLog.getStartTxId() > lastApplied + ) {
        //sig.mostRecentCheckpointTxId存放了Active NameNode在最後一個checkpoint的位點
        LOG.info("Unable to roll forward using only logs. Downloading " +
            "image with txid " + sig.mostRecentCheckpointTxId);
        MD5Hash downloadedHash = TransferFsImage.downloadImageToStorage(
            backupNode.nnHttpAddress, sig.mostRecentCheckpointTxId, bnStorage,
            true);//從遠端擷取這個位點的image檔案
        bnImage.saveDigestAndRenameCheckpointImage(NameNodeFile.IMAGE,
            sig.mostRecentCheckpointTxId, downloadedHash);
        lastApplied = sig.mostRecentCheckpointTxId;//更新lastApplied id
        needReloadImage = true;
      }

      if (firstRemoteLog.getStartTxId() > lastApplied + ) {//在下載下傳了最新的image檔案以後,依然存在gap,則抛出異常
        throw new IOException("No logs to roll forward from " + lastApplied);
      }

      // get edits files
      for (RemoteEditLog log : manifest.getLogs()) {//依次下載下傳這些edit log檔案
        TransferFsImage.downloadEditsToStorage(
            backupNode.nnHttpAddress, log, bnStorage);
      }

      //剛才已經下載下傳了新的image檔案,是以需要将這個image檔案reload到記憶體
      if(needReloadImage) {
        LOG.info("Loading image with txid " + sig.mostRecentCheckpointTxId);
        File file = bnStorage.findImageFile(NameNodeFile.IMAGE,
            sig.mostRecentCheckpointTxId);
        bnImage.reloadFromImageFile(file, backupNode.getNamesystem());
      }
      rollForwardByApplyingLogs(manifest, bnImage, backupNode.getNamesystem());//将edit log應用到記憶體
    }

    long txid = bnImage.getLastAppliedTxId();

    backupNode.namesystem.writeLock();
    try {
      backupNode.namesystem.setImageLoaded();
      if(backupNode.namesystem.getBlocksTotal() > ) {
        backupNode.namesystem.setBlockTotal();
      }
      bnImage.saveFSImageInAllDirs(backupNode.getNamesystem(), txid);//将目前記憶體鏡像dump到fsimage檔案
      bnStorage.writeAll();
    } finally {
      backupNode.namesystem.writeUnlock();
    }

    //将image檔案上傳給active namenode
    if(cpCmd.needToReturnImage()) {
      TransferFsImage.uploadImageFromStorage(backupNode.nnHttpAddress, conf,
          bnStorage, NameNodeFile.IMAGE, txid);
    }

    getRemoteNamenodeProxy().endCheckpoint(backupNode.getRegistration(), sig);//結束checkpoint過程

    //隻有backup 節點需要進行converge操作,追趕txid到最新的狀态
    //如果是checkpoint node,沒有這種實時性需求,隻需要依靠fsimage檔案和edit log檔案拷貝就可以完成
    if (backupNode.getRole() == NamenodeRole.BACKUP) {
      bnImage.convergeJournalSpool(); //調用完畢以後,狀态成為IN_SYNC
    }
    backupNode.setRegistration(); // keep registration up to date

    long imageSize = bnImage.getStorage().getFsImageName(txid).length();
    LOG.info("Checkpoint completed in "
        + (monotonicNow() - startTime)/ + " seconds."
        + " New Image Size: " + imageSize);
  }
           

進行checkpoint之前,通過調用NameNodeProtocol協定的startCheckpoint()接口,告知NameNode,自己即将進行checkpoint操作。我們可以檢視服務端NameNodeRpcServer.startCheckpoint()得知Active NameNode接收到請求以後的調用流程,可以看到服務端在收到startCheckpoint()以後的處理流程:

  • 自我狀态校驗:判斷自己目前是不是Active NameNode(防止Standby NameNode成為了被checkpoint的節點);如果校驗失敗,則抛出異常;
  • 請求者身份校驗:判斷請求者的身份是否合法,即是否是

    NamenodeRole.CHECKPOINT

    或者

    NamenodeRole.BACKUP

    ,如果校驗失敗,則傳回一個shutdown指令,要求請求者停機;
  • 請求者的存儲資訊等版本校驗:對請求者BN/CN的storage資訊與自身的storage資訊(比如clusterId、namespaceId等)進行校驗,正常情況下,BN/CN在initialize()的handshake過程中會擷取到Active NameNode的storage資訊,是以,這個校驗會通過,是以校驗會通過。是以這一步是防止未經許可的BN/CN請求進行checkpoint;如果校驗失敗,則傳回一個shutdown指令,要求請求者停機;
  • 緩存的edit log進行flush操作:在我的上一篇部落格中講過NameNode通過RPC協定将自己的edit log實時發往JournalNode的過程,這個過程使用了雙緩存。那麼在收到了startCheckpoint()的請求以後,Active NameNode會進行緩存的重新整理,把緩存的未發送的資料發送到遠端Backup Node,然後關閉目前的edit log檔案,開始一個新的edit log檔案;這樣,最近關閉的檔案就可以被BN/CN的checkpoint請求到了。注意,BN/CN從Active NameNode請求讀取edit log檔案的時候,是不會請求處于in-progress狀态的檔案的。
  • 開始進行checkpoint操作:當完成了

    startCheckpoint()

    請求以後,如果校驗通過,就可以進行下一步的fsimage和edit log檔案的同步了。

在詳細介紹

doCheckpoint()

之前,通過以下調用關系圖,我們可以了解

doCheckpoint()

的基本工作流:

HDFS使用Backup Node、Checkpoint Node以及Standby Namenode進行checkpoint的機制詳解前言1. HA模式下的StandBy NameNode進行checkpoint以及image檔案的傳輸2. 使用Checkpoint Node和Backup Node進行checkpoint操作結語

從圖中可以看到,Checkpointer線程首先會通過RPC同遠端的NameNode進行适當溝通,比如,通過上文中講到的startCheckpoint()告知遠端的Active NameNode自己即将進行checkpoint操作,然後,還會通過getEditLogManifest()的RPC擷取遠端檔案清單,然後通過http的方式将清單中的檔案拷貝到自己本地,最後,通過endCheckpoint()的RPC調用,結束checkpoint過程。

由此可見,

doCheckpoint()

方法是整個線程的核心方法,涉及到流程複雜的與Active NameNode之間的通信,是以,我來詳細講解:

STEP 1. 記憶體namespace鎖定

鎖定的目的,是讓RPC streaming傳輸過來的實時的edit操作隻能寫入edit檔案,不可以load到記憶體。

在上文中我講到,BN除了通過這種方式實作fsimage和edit log檔案的同步以外,還會通過RPC stream的方式實時接收edit。這兩種方式互為補充,才使得BN的記憶體鏡像能夠與Active NameNode保持實時一緻。同時,任何時候,兩者隻有一個能夠将edit 操作在記憶體中重演,必須互斥。我們首先來看BackNode的狀态定義:

volatile BNState bnState;
  static enum BNState {
    DROP_UNTIL_NEXT_ROLL,// BN/CN初始啟動的狀态,隻要來一次寫操作,狀态就會切換到JOURNAL_ONLY
    JOURNAL_ONLY,//處于該狀态下,來自RPC Streaming的edit隻可以追加到edit log檔案,不可以應用到記憶體
    IN_SYNC;//處于該狀态下,來自RPC Streaming的edit會先在記憶體中重演,同時寫入到edit lot檔案
  }
           

通過

bnImage.freezeNamespaceAtNextRoll();

,将stopApplyingEditsOnNextRoll标記為置為true,這樣,通過RPC方式的JournalProtocol的startLogSegment()接口告知BN的時候,BN會将目前的狀态置為JOURNAL_ONLY,即不會将新的edit操作load到記憶體。

然後,通過

bnImage.waitUntilNamespaceFrozen();

,一緻等待狀态變為JOURNAL_ONLY,在這種狀态下,通過RPC stream傳過來的edits操作不會load到記憶體,進而達到互斥目的;

通過上面的鎖定和等待,目前狀态已經切換為JOURNAL_ONLY,是以,Checkpointer線程就可以從NameNode讀取fsimage檔案和edit log檔案,然後load到記憶體了;開始STEP 2;

STEP 2.請求檔案清單清單

确認記憶體已經被自己加鎖,則開始請求Active NameNode的fsimage和image檔案清單清單:

RemoteEditLogManifest manifest = //從NameNode處擷取edit log的檔案清單清單
      getRemoteNamenodeProxy().getEditLogManifest(bnImage.getLastAppliedTxId() + );
           

這是通過NameNodeProtocol的RPC協定的getEditLogManifest()接口完成的:

/**
   * Return a structure containing details about all edit logs
   * available to be fetched from the NameNode.
   * @param sinceTxId return only logs that contain transactions >= sinceTxId
   */
  @Idempotent
  public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
    throws IOException;
           

參數sinceTxId規定了這些edit log的起始位置,從代碼中可以看到,這個請求參數設定為

bnImage.getLastAppliedTxId() + 1

,即,Backup Node希望請求到的edit log能夠剛好從自己記憶體中的最大的txId的下一個值開始;

STEP 3. 根據檔案清單請求edit log檔案并load到記憶體

通過檔案清單清單擷取檔案清單,遠端Active NameNode真正傳回給我們的檔案清單的起始txId,也許并不一定就是我們所請求的sinceTxId。這時候會進行判斷判斷,如果Active NameNode傳回的edit log檔案的最小的txId大于自己請求的sinceTxId,中間存在空隙,此時,這些傳回的edit是不能夠apply到記憶體中的,因為txId必須嚴格連續遞增,不可丢失任何一個。我們看Active NameNode端在對于BN的getEditLogManifest()請求的處理

NameNodeRpcServer.getEditLogManifest() -> FSEditLog.getEditLogManifest() -> JournalSet.getRemoteEditLogs() -> FileJournalManager->getRemoteEditLogs()

public List<RemoteEditLog> getRemoteEditLogs(long firstTxId,
      boolean inProgressOk) throws IOException {
    File currentDir = sd.getCurrentDir();//擷取目前這個Journal對應的存放EditLog的目錄
    List<EditLogFile> allLogFiles = matchEditLogs(currentDir);//通過正則比對擷取候選的EditLogFile
    List<RemoteEditLog> ret = Lists.newArrayListWithCapacity(
        allLogFiles.size());
    for (EditLogFile elf : allLogFiles) {
      ...略
      if (elf.getFirstTxId() >= firstTxId) {//如果目前的segment的第一個txid大于所請求的txId,則加入到傳回結果中
        ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId,
            elf.isInProgress()));
      } else if (elf.getFirstTxId() < firstTxId && firstTxId <= elf.getLastTxId()) {
        //如果目前的segment檔案的startTxId和endTxId位于請求的txId前後,則segment也是滿足條件的
        ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId,
            elf.isInProgress()));
      }
    }

    Collections.sort(ret);//對edit log檔案排序
    return ret;
  }
           

從上述代碼可以看到用戶端如何根據

getEditLogManifest()

中的sinceTxId參數決定傳回哪些檔案。

從其執行邏輯可以看到,可能存在這種情況,伺服器端傳回的edit log中最小的txId的值大于請求的sinceTxId,比如,Active NameNode目前的fsimage檔案和edits檔案是這樣的:

fsimage_0000000000000001300
fsimage_0000000000000001300.md5
edits_0000000000000001301-
edits_0000000000000001401-
edits_inprogress_0000000000000001501
           

BN請求的edit log的sinceTxId為

0000000000000001200

,此時就隻能傳回

edits_0000000000000001301-0000000000000001400

edits_0000000000000001401-0000000000000001500

,即最小的txId值大于請求的sinceTxId。遇到這種情況,BN的處理方式為直接将Active NameNode的fsimage檔案

fsimage_0000000000000001300

下載下傳下來,然後實作txid的完全連續一緻,這相當于将Active NameNode的fsimage和edit log進行了一次完全的複制。fsimage複制過來以後,通過

bnImage.reloadFromImageFile(file, backupNode.getNamesystem());

将fsimage load到記憶體,然後才能開始把請求過來的edit log load到記憶體。當然,如果将Active NameNode的fsimage檔案下載下傳下來發現txId依然無法實作連續,隻能抛出異常了。完成了以上操作,就通過

rollForwardByApplyingLogs(manifest, bnImage, backupNode.getNamesystem());

将edit log檔案 load到記憶體。

STEP 4.checkpoint檔案的生成并上傳給Active NameNode

經過以上步驟,BN完成了記憶體中namesystem與Active NameNode的一緻。此時,就開始進行checkpoint了,把記憶體的整個鏡像dump成fsimage檔案。然後,把這個checkpoint檔案上傳到Active NameNode。

STEP 5.Backup Node對處于打開狀态的edit log的彙集

我們知道,Backup Node除了有Checkpointer線程進行檔案拷貝,同時使用RPC stream進行edit log的流式傳輸。在上文中講到BNState這個控制Backup Node的狀态,目前,

doCheckpoint()

方法執行過程中,處于

BNState.JOURNAL_ONLY

,來自RPC stream的edits操作隻會寫入到edits檔案,不會load到記憶體。是以,通過調用

bnImage.convergeJournalSpool();

,來将這個未打開的edits檔案中的edits load到記憶體,這樣,Backup Node記憶體中的景象幾乎與Active NameNode保持一緻了;

完成了converge操作,BN的狀态就切換為BNState了,這時候,通過RPC stream收到的edit log在存入edit log檔案的同時,也應用到記憶體,讓記憶體保持與Active NameNode實時一緻;關于RCP stream針對不同狀态的處理方式,看下一節介紹。

2.4 Backup Node與Active NameNode進行基于JournalProtocol RPC協定進行實時edit操作同步

Backup Node與Active NameNode進行基于JournalProtocol RPC協定進行實時edit操作同步,讓Backup Node的記憶體狀态能夠實時與Active NameNode保持一緻。

基于JournalProtocol協定,Active NameNode屬于協定的用戶端,而BN屬于協定的伺服器端。

JournalProtocol協定有兩個接口

  • startLogSegment() 接口調用發生在Active NameNode調用了FSEditLog.startLogSegment()的時候。通過我的另外一篇部落格《HDFS使用QJM(Quorum Journal Manager)實作的高可用性以及備份機制》,可以知道,Active NameNode調用FSEditLog.startLogSegment()是為了結束一個舊的edit檔案并開始一個新的edit檔案。這時候會通過調用JournalProtocol 協定的startLogSegment() 接口通知BN,BN此時也會結束目前的edit log檔案并開始一個新的edit log檔案。
  • journal()接口調用發生在Active NameNode發生了edit log寫操作,是以Active NameNode會調用這個接口将這個寫操作實時發送給BN;
HDFS使用Backup Node、Checkpoint Node以及Standby Namenode進行checkpoint的機制詳解前言1. HA模式下的StandBy NameNode進行checkpoint以及image檔案的傳輸2. 使用Checkpoint Node和Backup Node進行checkpoint操作結語

上一節講Checkpointer線程的時候講到了RPC streaming與Checkpointer之間的同步,我們可以從

journal()

方法看到通過JournalProtocol協定的

journal()

接口收到了Active NameNode的一批edit操作以後,BN根據目前的不同狀态進行的不同處理,看

BackupImage.journal(...)

synchronized void journal(long firstTxId, int numTxns, byte[] data) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Got journal, " +
          "state = " + bnState +
          "; firstTxId = " + firstTxId +
          "; numTxns = " + numTxns);
    }

    switch(bnState) {
      case DROP_UNTIL_NEXT_ROLL:
        return;//什麼都不做,既不用apply到記憶體,也不寫入到本地
      case IN_SYNC://處于IN_SYNC狀态,則将這些消息應用到記憶體
        applyEdits(firstTxId, numTxns, data);//将收到的edits應用到記憶體的namespace
        break;//break以後
      case JOURNAL_ONLY://處于JOURNAL_ONLY,則這一批來自rpc的消息不可以load到記憶體的namespace,隻能夠追加到磁盤的edit檔案
        break;//需要把收到的edit log寫入到本地
      default:
        throw new AssertionError("Unhandled state: " + bnState);//異常狀态
    }
    logEditsLocally(firstTxId, numTxns, data);//将這些操作追加到目前的edit log磁盤檔案,下一次Checkpointer運作,通過調用convergeJournalSpool(),可以負責把這個處于in-progress狀态的檔案裡面的edits操作load到namespace
  }
           

上一節講到Checkpointer線程的時候,在Checkpointer的

doCheckpoint()

方法執行期間,整個BN處于JOURNAL_ONLY狀态,是以,通過RPC方式收到的edits隻會追加到edits檔案,不會調用

applyEdits(firstTxId, numTxns, data);

load到記憶體,這樣,Checkpointer線程拷貝過來的edits檔案或者fsimage檔案就可以load到記憶體了,是以不會造成幹擾。等到Checkpointer結束一輪運作,狀态切換為IN_SYNC,那麼記憶體的操作全就傳遞給RPC了。

關于在IN_SYNC狀态下journal()方法如何将實時收到的edit操作存入檔案并load到記憶體,不在本文讨論範圍之列,有興趣的讀者可以自行閱讀代碼。

結語

總之,HDFS使用Backup Node、Checkpoint Node以及Standby Namenode進行checkpoint操作,原理均相同。它們的存在,既保證了Active NameNode的資料備份,又将Active NameNode從不影響核心業務的checkpoint操作中解脫出來。當Active NameNode發生異常,Standby Namenode可以很快接管HDFS。

對于Backup Node ,個人認為設計得非常好的地方,在于通過良好的同步控制,讓http的方式的檔案拷貝和RPC stream的方式的實時edit傳輸交替配合,讓Backup Node的記憶體鏡像與Active NameNode時刻保持近乎一緻;動的時候,會先讀取image檔案,加載某個時間點之前的記憶體鏡像,然後再讀取這個時間點以後的edit log檔案,進而實作對整個記憶體鏡像的恢複。

注意,按照《Hadoop技術内幕:深入解析Hadoop Common和HDFS架設計與實作原理》一書中的分類:

1. 檔案系統的目錄樹關系,即目錄、檔案之間的樹形關系,以及檔案和塊之間的對應關系,我們稱為NameNode第一關系

2. 塊與節點之間的對應關系,即根據塊找到存放該塊的DataNode,我們稱為NameNode第二關系

這裡的image檔案隻包含NameNode第一關系,不會包含NameNode第二關系。NameNode第二關系是在NameNode啟動以後,由每一個DataNode定時向Active NameNode彙報的,即,DataNode會向Active NameNode報告自己所維護的塊資訊。這樣,當我們需要擷取某個塊的資料的時候,會先通過Active NameNode進行查詢,NameNode會将我們的請求重定向到對應的DataNode。将第二關系與NameNode脫鈎,個人感覺是非常好的設計方式,一方面解脫NameNode,無論DataNode如何橫向擴充,都不會影響NameNode的體量和效率;另外,這也避免同一進制資料在NameNode和DataNode兩處維護,如果兩方都維護,兩方的中繼資料不一緻的處理又是一個難題;

其實,在HA模式下,隻有Active NameNode會維護NameNode第二關系,Standby NameNode不會維護此關系。我在上一篇部落格《》中介紹過,Standby NameNode隻會通過重放機制,讓自己在記憶體中的NameNode第一關系與Active NameNode基本保持一緻。每個DataNode在向NameNode彙報自身的塊資訊的時候,請求同時發往Active NameNode和Standby NameNode,但是,通過權限檢查機制(

NameNodeHAContext.checkOperation()

),Standby NameNode會抛出異常

Operation category WRITE is not supported in state standby

,這是INFO級别的異常,可以忽略。

HDFS的checkpoint,有兩種實作方式:

1. 通過單獨的節點,即Checkpoint Node或者Backup Node完成;

2. HA模式下,通過Standby NameNode完成,完成以後通過http PUT方式,将生成的image上傳給Active NameNode;

下面,我們就對HDFS的兩種checkpoint方式進行代碼層的解析。

關于Checkpoint Node和Backup Node的簡單介紹,可以看hadoop的官方文檔;

使用預設的StandBy NameNode進行checkpoint以及image檔案的傳輸

checkpoint的目的是為了定時将某個時間點或者操作點之前的全局記憶體鏡像集中為一個獨立檔案,進而在NameNode啟動的時候可以直接讀取該檔案實作系統中繼資料的恢複,如果沒有checkpoint操作,那麼HDFS需要維護數量龐大的edit log檔案,非常不利于系統的關系。定時進行的checkpoint任務實際上與NameNode的核心工作無關,是以,HDFS将這個工作與Active NameNode隔離開,預設由StandBy NameNode負責,即,預設不配置的情況下,

dfs.ha.standby.checkpoints

配置項的值為true,表示使用StandBy NameNode來定時進行checkpoint操作。StandBy NameNode完成一次checkpoint操作生成image檔案以後,會通過http的PUT操作,将這個image檔案傳送給遠端的Active NameNode。

我們遵循預設設定,使用StandBy NameNode這種方式來進行checkpoint操作,那麼在NameNode啟動的時候,如果是Standby角色(實際上,HA模式下,兩台NameNode剛啟動的時候都會進入Standby狀态,然後由ZKFC決定哪台需要transition到Active),則會啟動一個叫做

StandbyCheckpointer.CheckpointerThread

的線程,我們來看NameNode以Standby身份啟動時候的代碼:

void startStandbyServices(final Configuration conf) throws IOException {
    //略
    editLogTailer = new EditLogTailer(this, conf);//建立一個editLogTailer線程,用來從遠端的QJM伺服器上拉取editlog,重演到自己到記憶體
    editLogTailer.start();
    if (standbyShouldCheckpoint) {
      standbyCheckpointer = new StandbyCheckpointer(conf, this);//獨立線程,用來不斷進行checkpoint操作
      standbyCheckpointer.start();
    }
  }
           

Standby模式啟動,會建立EditLogTailer負責從JournalNode上拉取edit log檔案并在記憶體中重演(具體細節參考我的部落格《》),然後,就是建立StandbyCheckpointer對象,負責進行checkpoint的定時操作。

StandbyCheckpointer的構造方法,實際上就是建立了一個CheckpointerThread線程:

public StandbyCheckpointer(Configuration conf, FSNamesystem ns)
      throws IOException {
    //略
    this.thread = new CheckpointerThread();//建立CheckpointerThread線程
    this.uploadThreadFactory = new ThreadFactoryBuilder().setDaemon(true)
        .setNameFormat("TransferFsImageUpload-%d").build();
    setNameNodeAddresses(conf);
  }
           

這個線程的任務,就是通過判斷距離上一次checkpoint操作的時間是否超過門檻值(

dfs.namenode.checkpoint.txns

,預設1000000),以及目前沒有進行checkpoint操作的資料量是否超過門檻值(

dfs.namenode.checkpoint.period

,預設3600s,即1個小時)來判斷是否應該進行checkpoint操作。

在完成了checkpoint操作,生成了對應的img檔案以後,會通過HTTP PUT操作,将這個檔案發送到Active NameNode。

我們一起來看checkpoint以及checkpoint完畢以後檔案的上傳過程。

StandbyCheckpointer.doCheckpoint()

方法在

StandbyCheckpointer.CheckpointerThread

線程中被調用,負責checkpoint和image檔案傳輸到Active NameNode的工作。

private void doCheckpoint() throws InterruptedException, IOException {
    ...略
    //對命名空間加cpLock鎖,這是StandBy Namenode專用鎖,防止checkpoint過程中發生了edit log重演
    namesystem.cpLockInterruptibly();
    try {
      assert namesystem.getEditLog().isOpenForRead() : //狀态檢查,預期情況下,StandBy NameNode所維護的editlog應該是處于open for read狀态,這是StandBy NameNode啟動以後的持續狀态
        "Standby Checkpointer should only attempt a checkpoint when " +
        "NN is in standby mode, but the edit logs are in an unexpected state";
      FSImage img = namesystem.getFSImage();
      long prevCheckpointTxId = img.getStorage().getMostRecentCheckpointTxId();//上一次checkpoint完成的位置
      long thisCheckpointTxId = img.getLastAppliedOrWrittenTxId();//目前最後一次操作的位置,即本次checkpoint需要到達的位置
      assert thisCheckpointTxId >= prevCheckpointTxId;
      if (thisCheckpointTxId == prevCheckpointTxId) {
        LOG.info("A checkpoint was triggered but the Standby Node has not " +
            "received any transactions since the last checkpoint at txid " +
            thisCheckpointTxId + ". Skipping...");
        return;
      }

      imageType = NameNodeFile.IMAGE;
      img.saveNamespace(namesystem, imageType, canceler);//進行checkpoint操作
      txid = img.getStorage().getMostRecentCheckpointTxId();
      assert txid == thisCheckpointTxId : "expected to save checkpoint at txid=" +
        thisCheckpointTxId + " but instead saved at txid=" + txid;

      // Save the legacy OIV image, if the output dir is defined.
      String outputDir = checkpointConf.getLegacyOivImageDir();
      if (outputDir != null && !outputDir.isEmpty()) {
        img.saveLegacyOIVImage(namesystem, outputDir, canceler);
      }
    } finally {
      namesystem.cpUnlock();
    }

    //采用異步方式,将目前的img檔案發送到遠端的Active NameNode
    ExecutorService executor =
        Executors.newSingleThreadExecutor(uploadThreadFactory);
    Future<Void> upload = executor.submit(new Callable<Void>() {
      @Override
      public Void call() throws IOException {
        TransferFsImage.uploadImageFromStorage(activeNNAddress, conf,
            namesystem.getFSImage().getStorage(), imageType, txid, canceler);
        return null;
      }
    });
    executor.shutdown();
    try {
      upload.get();
    } catch (InterruptedException e) {
      //異常
    }
  }
           

Standby NameNode在進行checkpoint操作之前,需要對整個namespace加鎖,這裡的鎖變量是cpLock(cp是checkpoint的簡稱)。FSNamesystem提供了兩把鎖,fsLock負責鎖定和保護整個namesystem,即,任何時候隻有擷取fsLock的線程能夠修改namesystem的結構,比如塊資訊、命名空間狀态等;而cpLock是專門給Standby NameNode使用的鎖,目的是防止checkpoint過程中發生edit log重演操作,cpLock不負責鎖定對block資訊的修改,這是因為Standby NameNode是無權維護和修改檔案到快資訊等的對應關系的,隻有Active NameNode會修改,是以無需對整個命名空間加鎖。是以,cpLock隻被用在edit log重演(代碼

EditLogTailer.doWork()

)和本節所講的checkpoint過程中。

回到

doCheckpoint()

方法,可以看到,checkpoint操作通過

img.saveNamespace(namesystem, imageType, canceler);

完成,而img檔案的傳輸,通過調用

TransferFsImage.uploadImageFromStorage(...)

完成。

checkpoint過程

跟蹤

img.saveNamespace(namesystem, imageType, canceler);

代碼,最終調用

FSImage.saveFSImageInAllDirs()

方法完成checkpoint,這個方法會生成image檔案,并存放到一個或者多個image存儲目錄,這是因為出于高可用考慮,我們是可以配置一個以上的img dir的,讓相同的img檔案存放在不同的磁盤上:

<property>
                <name>dfs.namenode.name.dir</name>
                <value>/data1/data/hadoop/namenode,/data2/data/hadoop/namenode</value>
        </property>
           
private synchronized void saveFSImageInAllDirs(FSNamesystem source,
      NameNodeFile nnf, long txid, Canceler canceler) throws IOException {
    ...略
    //擷取配置的img 目錄
    if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == ) {
      throw new IOException("No image directories available!");
    }
    ...略
    SaveNamespaceContext ctx = new SaveNamespaceContext(
        source, txid, canceler);//ctx中儲存了生成img檔案所需要的上下文資訊,最重要的,這個img檔案的end txId,即這個checkpoint的目标偏移位置

    try {
      List<Thread> saveThreads = new ArrayList<Thread>();
      // save images into current
      for (Iterator<StorageDirectory> it //對于每一個storage目錄,都建立一個獨立線程,同時進行img檔案的生成,提高生成效率
             = storage.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
        StorageDirectory sd = it.next();//
        FSImageSaver saver = new FSImageSaver(ctx, sd, nnf);
        Thread saveThread = new Thread(saver, saver.toString());
        saveThreads.add(saveThread);
        saveThread.start();
      }
      waitForThreads(saveThreads);//等待所有的saveThread完成存儲操作
      saveThreads.clear();
      storage.reportErrorsOnDirectories(ctx.getErrorSDs());
      ...略
      if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == ) {
        throw new IOException(
          "Failed to save in any storage directories while saving namespace.");
      }
     ...略
      //完成了img檔案的存取,實際上是存為一個中間檔案,以NameNodeFile.IMAGE_NEW開頭。
      //這時候就可以把這些中間檔案rename稱為最終正式檔案了
      最後一個參數false,代表不需要rename md5檔案,這是因為在FSImageSaver中生成臨時檔案的時候已經生成了最終的md5檔案
      renameCheckpoint(txid, NameNodeFile.IMAGE_NEW, nnf, false);
    } finally {  }
  }
           

通過

storage.dirIterator(NameNodeDirType.IMAGE)

擷取image檔案目錄的一個Iterator,然後逐個周遊這些目錄并将image檔案存放下來。對于多image目錄的情況,會為每一個目錄建立一個單獨線程,多目錄并行進行。但是,通過

waitForThreads(saveThreads);

,會同步等待所有目錄完成img存儲才會執行下一步,是以,盡管内部并行,但是

saveFSImageInAllDirs(...)

是一個同步方法,方法傳回時,肯定已經完成了将image檔案生成到所有目錄。

FSImageSaver類負責完成對把img檔案存放到某個img目錄,我們來看

FSImageSaver.saveFSImage(...)

方法:

/**
   * Save the contents of the FS image to the file.
   */
  void saveFSImage(SaveNamespaceContext context, StorageDirectory sd,
      NameNodeFile dstType) throws IOException {
    long txid = context.getTxId();//checkpoint的目标偏移位置
    File newFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid);
    File dstFile = NNStorage.getStorageFile(sd, dstType, txid);

    FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context);
    FSImageCompression compression = FSImageCompression.createCompression(conf);
    //按照壓縮配置,将img存入到以fsimage.ckpt開頭的中間檔案中
    saver.save(newFile, compression);
    MD5FileUtils.saveMD5File(dstFile, saver.getSavedDigest());//儲存md5檔案
    storage.setMostRecentCheckpointInfo(txid, Time.now());
  }
           

saveFSImage()

方法可以看到,生成image檔案的時候,并不是直接将記憶體鏡像dump到對應的image磁盤目錄,而是會産生一個以

fsimage.ckpt

開頭的中間檔案,如:

fsimage.ckpt_0000000000992806947

,然後生成對應的MD5校驗檔案,如:

fsimage.ckpt_0000000000992806947.md5

。當多目錄image檔案全部完成了中間檔案的生成,再調用

renameCheckpoint(...)

方法,将所有目錄的中間檔案rename為最終的格式為

fsimage_0000000000992806947

的檔案;

本文不去詳述如何将記憶體鏡像讀出、壓縮、序列化的細節,也不去詳述image檔案的結構,這涉及到NameNode的檔案系統管理,需要較長篇幅去講解。我會另起博文進行講解。

檔案傳輸過程

如上文所講,

doCheckpoint()

操作包括了checkpoint和image檔案的傳輸,我們來看checkpoint完成以後,image檔案的傳輸。

doCheckpoint()

方法

//采用異步方式,将目前的img檔案發送到遠端的Active NameNode
    ExecutorService executor =
        Executors.newSingleThreadExecutor(uploadThreadFactory);
    Future<Void> upload = executor.submit(new Callable<Void>() {
      @Override
      public Void call() throws IOException {
        TransferFsImage.uploadImageFromStorage(activeNNAddress, conf,
            namesystem.getFSImage().getStorage(), imageType, txid, canceler);
        return null;
      }
    });
    executor.shutdown();
    try {
      upload.get();
    } catch (InterruptedException e) {
      //異常處理 略
    }
           

可以看到,通過ExecutorService建立了一個獨立線程負責檔案傳輸過程。這是因為image檔案一般較大,傳輸較為耗時,是以如果此時正好發生NameNode從standby到active的transition過程,這個transition會發生長時間阻塞,是以會建立一個單獨線程執行。

dfs.namenode.http-address

字首配置項設定了我們的NameNode的http位址:比如,我們在hdfs-site.xml中這樣配置:

<property>
                <name>dfs.namenode.http-address.datahdfsmaster.namenode1</name>
                <value>10.130.277.29:50070</value>
        </property>
           

此時,我們名字為

datahdfsmaster

的nameservice下的名字為

namenode1

的namenode的http位址是

10.130.277.29:50070

,這就是

namenode1

的http位址。

通過跟蹤

TransferFsImage.uploadImageFromStorage()

的代碼,我們可以非常詳細地看到根據Active NameNode的URI建構HttpURLConnection對象的建構,以及設定requestMethod為PUT、根據

dfs.image.transfer.chunksize

設定PUT請求的chunk size、設定http請求的逾時時間的全過程和細節,這裡也不再詳述;

使用Checkpoint Node和Backup Node進行checkpoint操作

當然,如果我們認為這樣也不是很好,可能會影響StandBy NameNode的工作,那麼我們可以單獨起一個程序,專門負責checkpoint工作,即使用單獨的Checkpoint Node(以下簡稱CN)或者Backup Node(以下簡稱BN)負責checkpoint的定時工作。

注意,Checkpoint Node和Backup Node在某種程度上幾乎替代了Standby NameNode的功能,是以,在HA模式下,無法啟動Checkpoint Node和Backup Node。必須使用非HA模式才可以啟動并使用Checkpoint Node。

Checkpoint Node會通過單獨線程,定時從Active NameNode拉取edit log檔案拷貝到本地,并且将這些edit log在自己記憶體中進行重演。如果checkpoint條件具備,将進行checkpoint操作,檔案存入本地,并通過HTTP PUT的方式将fsimage檔案傳輸給Active NameNode。

Backup Node除了具有Checkpoint Node的上述所有功能外,還會通過RPC方式(屬于stream的方式,差別于單獨定時拷貝)實時接收Active NameNode的edit操作。是以,同Checkpoint Node相比,Backup Node的記憶體映象在時間差上幾乎與Active NameNode一緻。

下面,我們具體來介紹Checkpoint Node、Backup Node的edit log和fsimage的處理機制,以及Backup Node同Active NameNode之間基于RPC通信實作的實時edit log傳輸。

我們從NameNode的啟動過程代碼可以看到,通過在NameNode啟動的時候設定啟動參數,可以将這個節點設定為Checkpoint Node或者Backup Node:

public static NameNode createNameNode(String argv[], Configuration conf)
      throws IOException {
    LOG.info("createNameNode " + Arrays.asList(argv));
    ...略
    StartupOption startOpt = parseArguments(argv);
    setStartupOption(conf, startOpt);
    switch (startOpt) {
      case FORMAT: {//NameNode 格式化
        ...略
      }
      case BACKUP://啟動一個 backup node
      case CHECKPOINT: {//啟動一個checkpoint node
        NamenodeRole role = startOpt.toNodeRole();
        DefaultMetricsSystem.initialize(role.toString().replace(" ", ""));
        return new BackupNode(conf, role);//無論是BACKUP還是CHECKPOINT,都使用BackupNode對象進行管理,唯一不同是角色不同,一個是NamenodeRole.BACKUP,一個是NamenodeRole.CHECKPOINT
      }
      ....略
      default: {
        DefaultMetricsSystem.initialize("NameNode");
        return new NameNode(conf);//正常啟動一個NameNode
      } } }
           

Checkpoint Node或者Backup Node的實作類都是BackupNode,Backup Node 是NameNode的子類,這說明Backup Node基本上與NameNode的功能一緻,所維護的中繼資料資訊相同,隻是有了定時checkpoint等功能,同時,Backup Node沒有對叢集中繼資料進行修改的權限。

NamenodeRole枚舉類型标記了NameNode節點的角色:

/**
   * Defines the NameNode role.
   */
  static public enum NamenodeRole {
    NAMENODE  ("NameNode"),//普通的NameNode節點,如Active NameNode或者Standby NameNode
    BACKUP    ("Backup Node"),//BACKUP Node
    CHECKPOINT("Checkpoint Node");//Checkpoint Node
    ...略
  }
           

Backup Node的初始化方法

initialize()

重寫了NameNode的initialize()方法,這個方法主要完成三項工作:

1. 握手:通過handshake,與Active NameNode進行版本協商等工作;通過handshare,CN/BN擷取了遠端Active NameNode的storage資訊,比如,namespaceId,clusterId, blockpoolId,将這些資訊設定到自己的storage資訊中。在下面的RPC通信中,NameNode會對收到的CN/BN請求中攜帶的storage資訊進行校驗,隻有校驗一緻,才會成功傳回,否則,認為是異常請求;

2. 注冊:向Active NameNode注冊自己,這樣,Active NameNode就可以在

startLogSegment()

等操作進行的時候,通過RPC調用,告知自己;

3. 啟動:啟動

Checkpointer

線程,負責不斷從Active NameNode拉取edit log檔案;

protected void initialize(Configuration conf) throws IOException {
    // Trash is disabled in BackupNameNode,
    // but should be turned back on if it ever becomes active.
    conf.setLong(CommonConfigurationKeys.FS_TRASH_INTERVAL_KEY, 
                 CommonConfigurationKeys.FS_TRASH_INTERVAL_DEFAULT);
    //通過handshake,擷取了遠端的Active Namenode的namespace資訊
    NamespaceInfo nsInfo = handshake(conf);
    super.initialize(conf);
    namesystem.setBlockPoolId(nsInfo.getBlockPoolID());

    if (false == namesystem.isInSafeMode()) {
      namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
    }

    // Backup node should never do lease recovery,
    // therefore lease hard limit should never expire.
    namesystem.leaseManager.setLeasePeriod(
        HdfsConstants.LEASE_SOFTLIMIT_PERIOD, Long.MAX_VALUE);

    // register with the active name-node 
    registerWith(nsInfo);
    // Checkpoint daemon should start after the rpc server started
    runCheckpointDaemon(conf);
    InetSocketAddress addr = getHttpAddress();
    if (addr != null) {
      conf.set(BN_HTTP_ADDRESS_NAME_KEY, NetUtils.getHostPortString(getHttpAddress()));
    }
  }
           

在完成了handshare()即namespaceId、cluster id等資訊等協商以後,CN/BN開始向ActiveNameNode注冊自己:

注冊的代碼為

BackupNode.registerWith()

private void registerWith(NamespaceInfo nsInfo) throws IOException {
    BackupImage bnImage = (BackupImage)getFSImage();
    NNStorage storage = bnImage.getStorage();
    // verify namespaceID
    if (storage.getNamespaceID() == ) { // new backup storage
      storage.setStorageInfo(nsInfo);
      storage.setBlockPoolID(nsInfo.getBlockPoolID());
      storage.setClusterID(nsInfo.getClusterID());
    } else {
      nsInfo.validateStorage(storage);
    }
    bnImage.initEditLog(StartupOption.REGULAR);
    setRegistration();
    NamenodeRegistration nnReg = null;
    while(!isStopRequested()) {
      try {
        //通過getRegistration()擷取請求體,發送給Active NameNode。由于請求體中攜帶了角色資訊,
          //遠端的Active NameNode會根據角色判斷是否加入到自己的output stream中。通過registerBackupNode.registerBackupNode()
         //可以看到,隻有backup的角色會被添加到輸出流中去
        nnReg = namenode.registerSubordinateNamenode(getRegistration());
        break;
      } catch(SocketTimeoutException e) {  // name-node is busy
       ...略
      }
    }

    ...略
    nnRpcAddress = nnReg.getAddress();
  }
           

我們跟蹤registerWith()方法,可以看到注冊實際上使用的是NamenodeProtocol協定的

registerSubordinateNamenode()

接口。Active Namenode收到注冊請求以後,調用

FSNamesystem.registerBackupNode()

來進行服務端對請求的處理與響應:

void registerBackupNode(NamenodeRegistration bnReg,
      NamenodeRegistration nnReg) throws IOException {
    writeLock();//擷取全局鎖
    try {
    ...略
    //隻有當它的角色是BACKUP的時候,才會加入到FSEditLog.journalSet中,是以,Active NameNode隻會向Backup節點發送自己的edit操作
      //隻接受BACKUP節點的注冊資訊
      if (bnReg.getRole() == NamenodeRole.BACKUP) {
        getFSImage().getEditLog().registerBackupNode(
            bnReg, nnReg);
      }
    } finally {
      writeUnlock();
    }
  }
           

從上述代碼可以看到,Active NameNode對對請求者的身份類型進行了判斷。BN和CN在這裡産生差別:

  • 如果身份是

    NamenodeRole.BACKUP

    ,那麼Active NameNode會把這個用戶端加入到自己的output stream中,這樣,所有的Active NameNode上的edit操作,都會通過JournalPrototol協定的

    journal()

    接口(Active NameNode是JournalPrototol協定用戶端,而Backup Node是JournalPrototol協定服務端),發送給這個Backup Node。Backup Node收到這些操作内容以後,會将這些操作在記憶體中進行replay,即重演,這樣,Backup Node的記憶體與Active NameNode的記憶體基本一緻。根據官方文檔,由于Backup Node可以通過這種stream的方式實時擷取Active NameNode上的操作,是以沒有必要從Active NameNode拷貝fsimage檔案和edit log檔案;
The Backup node does not need to download fsimage and edits files from the active NameNode in order to create a checkpoint, as would be required with a Checkpoint node or Secondary NameNode, since it already has an up-to-date state of the namespace state in memory. The Backup node checkpoint process is more efficient as it only needs to save the namespace into the local fsimage file and reset edits.
           
  • 而如果用戶端身份是

    NamenodeRole.CHECKPOINT

    ,就沒有這個待遇了,Checkpoint Node雖然也會像Backup Node一樣在啟動的時候注冊(上文說過,它們是同一個實作類),但是,注冊的時候,Active NameNode不會将其加入到output stream中,即Active NameNode是不會通過JournalPrototol協定向Checkpoint Node發送實時edit操作的,是以,Checkpoint Node隻能通過

    Checkpointer.java

    線程同步遠端Active NameNode上的檔案,而無法通過JournalPrototol實時接收Active NameNode的寫操作。Checkpoint Node隻能通過不斷從Active NameNode下載下傳fsimage檔案和edit log檔案,然後将這些檔案讀取到記憶體并定時生車功能checkpoint操作,然後,将checkpoint檔案上傳到Active NameNode。

是以,由于Backup Node與Checkpoint Node的fsimage和edit log的同步代碼完全相同,除此以外,就是Backup Node比Checkpoint Node多出的實時資料傳輸過程,是以,下文将隻針對Backup Node,分成兩節,分别講解fsimage和edit log的檔案傳輸,以及Backup Node基于RPC實作的與NameNode之間的edit操作的傳輸。

使用Checkpointer與Active NameNode進行檔案同步

BackupNode.initialize()

方法的第三個工作,就是通過調用

runCheckpointDaemon(...)

啟動一個

Checkpointer

的線程(注意,這不是上一節講到的Standby Namenode節點使用的

StandbyCheckpointer.CheckpointerThread

),這個線程負責從遠端的Active NameNode上讀取edit log 和image檔案存入本地:

/**
   * Start a backup node daemon.
   */
  private void runCheckpointDaemon(Configuration conf) throws IOException {
    checkpointManager = new Checkpointer(conf, this);
    checkpointManager.start();//線程啟動
  }
           

這是Checkpointer線程的

run()

方法:

@Override
  public void run() {
    ...略
    while(shouldRun) {
      try {
        long now = now();
        boolean shouldCheckpoint = false;
        if(now >= lastCheckpointTime + periodMSec) {//判斷是否達到checkpoint時間間隔要求
          shouldCheckpoint = true;
        } else {
          long txns = countUncheckpointedTxns();
          if(txns >= checkpointConf.getTxnCount())//判斷是否到達checkpoint資料量要求
            shouldCheckpoint = true;
        }
        if(shouldCheckpoint) {
          doCheckpoint();
          lastCheckpointTime = now;
        }
      } catch(IOException e) {
        ...略
      }
      try {
        Thread.sleep(periodMSec);  } catch(InterruptedException ie) {  }
    }
  }
           

run()

方法同Standby NameNode的

CheckpointerThread.run()

方法相似,也是會判斷是否已經滿足了checkpoint條件,包括距離上一次checkpoint操作的時間是否超過門檻值(

dfs.namenode.checkpoint.txns

,預設1000000),以及目前沒有進行checkpoint操作的資料量是否超過門檻值(

dfs.namenode.checkpoint.period

,預設3600s,即1個小時),如果任意條件滿足,則調用

doCheckpoint()

操作進行checkpoint:

void doCheckpoint() throws IOException {
    BackupImage bnImage = getFSImage();
    NNStorage bnStorage = bnImage.getStorage();

    long startTime = monotonicNow();
    bnImage.freezeNamespaceAtNextRoll();
    //服務端調用,檢視NameNodeRpcServer.startCheckpoint(),服務端會結束掉正在寫的sgement檔案,開啟一個新的segment檔案
    //用戶端調用,檢視
    NamenodeCommand cmd = 
      getRemoteNamenodeProxy().startCheckpoint(backupNode.getRegistration());
    CheckpointCommand cpCmd = null;
    //關于這些傳回值的含義,可以參考FSImage.startCheckpoint()方法
    switch(cmd.getAction()) {
    //If backup storage contains image that is newer than or incompatible with 
    // what the active name-node has, then the backup node should shutdown. wuchang
      case NamenodeProtocol.ACT_SHUTDOWN://如果發現backup node的image比namenode的更新,或者storage的版本不一緻,肯定更有問題,這時候backup node需要關閉
        shutdown();
        throw new IOException("Name-node " + backupNode.nnRpcAddress
                                           + " requested shutdown.");
      case NamenodeProtocol.ACT_CHECKPOINT://校驗通過
        cpCmd = (CheckpointCommand)cmd;
        break;
      default:
        throw new IOException("Unsupported NamenodeCommand: "+cmd.getAction());
    }

    //BackupImage.namenodeStartedLogSegment()如果正在發生,那麼如果處于frozen,則必須等待
    bnImage.waitUntilNamespaceFrozen();

    //檢視服務端調用 NameNodeRPCServer.startCheckpoint()
    CheckpointSignature sig = cpCmd.getSignature();

    // Make sure we're talking to the same NN!
    sig.validateStorageInfo(bnImage);

    long lastApplied = bnImage.getLastAppliedTxId();
    LOG.debug("Doing checkpoint. Last applied: " + lastApplied);
    RemoteEditLogManifest manifest = //從NameNode處擷取edit log的檔案清單清單
      getRemoteNamenodeProxy().getEditLogManifest(bnImage.getLastAppliedTxId() + );

    boolean needReloadImage = false;
    if (!manifest.getLogs().isEmpty()) {
      RemoteEditLog firstRemoteLog = manifest.getLogs().get();//擷取第一個遠端的edit log檔案
      // we don't have enough logs to roll forward using only logs. Need
      // to download and load the image.
      //如果從遠端擷取的edit log檔案的transaction 與自己目前最後一次已經擷取的log檔案的transaction存在gap,需要進行reload
      if (firstRemoteLog.getStartTxId() > lastApplied + ) {
        //sig.mostRecentCheckpointTxId存放了Active NameNode在最後一個checkpoint的位點
        LOG.info("Unable to roll forward using only logs. Downloading " +
            "image with txid " + sig.mostRecentCheckpointTxId);
        MD5Hash downloadedHash = TransferFsImage.downloadImageToStorage(
            backupNode.nnHttpAddress, sig.mostRecentCheckpointTxId, bnStorage,
            true);//從遠端擷取這個位點的image檔案
        bnImage.saveDigestAndRenameCheckpointImage(NameNodeFile.IMAGE,
            sig.mostRecentCheckpointTxId, downloadedHash);
        lastApplied = sig.mostRecentCheckpointTxId;//更新lastApplied id
        needReloadImage = true;
      }

      if (firstRemoteLog.getStartTxId() > lastApplied + ) {//在下載下傳了最新的image檔案以後,依然存在gap,則抛出異常
        throw new IOException("No logs to roll forward from " + lastApplied);
      }

      // get edits files
      for (RemoteEditLog log : manifest.getLogs()) {//依次下載下傳這些edit log檔案
        TransferFsImage.downloadEditsToStorage(
            backupNode.nnHttpAddress, log, bnStorage);
      }

      //剛才已經下載下傳了新的image檔案,是以需要将這個image檔案reload到記憶體
      if(needReloadImage) {
        LOG.info("Loading image with txid " + sig.mostRecentCheckpointTxId);
        File file = bnStorage.findImageFile(NameNodeFile.IMAGE,
            sig.mostRecentCheckpointTxId);
        bnImage.reloadFromImageFile(file, backupNode.getNamesystem());
      }
      rollForwardByApplyingLogs(manifest, bnImage, backupNode.getNamesystem());//将edit log應用到記憶體
    }

    long txid = bnImage.getLastAppliedTxId();

    backupNode.namesystem.writeLock();
    try {
      backupNode.namesystem.setImageLoaded();
      if(backupNode.namesystem.getBlocksTotal() > ) {
        backupNode.namesystem.setBlockTotal();
      }
      bnImage.saveFSImageInAllDirs(backupNode.getNamesystem(), txid);//将目前記憶體鏡像dump到fsimage檔案
      bnStorage.writeAll();
    } finally {
      backupNode.namesystem.writeUnlock();
    }

    //将image檔案上傳給active namenode
    if(cpCmd.needToReturnImage()) {
      TransferFsImage.uploadImageFromStorage(backupNode.nnHttpAddress, conf,
          bnStorage, NameNodeFile.IMAGE, txid);
    }

    getRemoteNamenodeProxy().endCheckpoint(backupNode.getRegistration(), sig);//結束checkpoint過程

    //隻有backup 節點需要進行converge操作,追趕txid到最新的狀态
    //如果是checkpoint node,沒有這種實時性需求,隻需要依靠fsimage檔案和edit log檔案拷貝就可以完成
    if (backupNode.getRole() == NamenodeRole.BACKUP) {
      bnImage.convergeJournalSpool(); //調用完畢以後,狀态成為IN_SYNC
    }
    backupNode.setRegistration(); // keep registration up to date

    long imageSize = bnImage.getStorage().getFsImageName(txid).length();
    LOG.info("Checkpoint completed in "
        + (monotonicNow() - startTime)/ + " seconds."
        + " New Image Size: " + imageSize);
  }
           

doCheckpoint()

的主要執行流程如下:

1. 進行checkpoint之前,通過調用NameNodeProtocol協定的startCheckpoint()接口,告知NameNode,自己即将進行checkpoint操作。我們可以檢視服務端NameNodeRpcServer.startCheckpoint()得知Active NameNode接收到請求以後的調用流程,可以看到服務端在收到startCheckpoint()以後的處理流程:

  • 自我狀态校驗:判斷自己目前是不是Active NameNode(防止Standby NameNode成為了被checkpoint的節點);如果校驗失敗,則抛出異常;
  • 請求者身份校驗:判斷請求者的身份是否合法,即是否是

    NamenodeRole.CHECKPOINT

    或者

    NamenodeRole.BACKUP

    ,如果校驗失敗,則傳回一個shutdown指令,要求請求者停機;
  • 請求者的存儲資訊等版本校驗:對請求者BN/CN的storage資訊與自身的storage資訊(比如clusterId、namespaceId等)進行校驗,正常情況下,BN/CN在initialize()的handshake過程中會擷取到Active NameNode的storage資訊,是以,這個校驗會通過,是以校驗會通過。是以這一步是防止未經許可的BN/CN請求進行checkpoint;如果校驗失敗,則傳回一個shutdown指令,要求請求者停機;
  • 緩存的edit log進行flush操作:在我的上一篇部落格中講過NameNode通過RPC協定将自己的edit log實時發往JournalNode的過程,這個過程使用了雙緩存。那麼在收到了startCheckpoint()的請求以後,Active NameNode會進行緩存的重新整理,把緩存的未發送的資料發送到遠端Backup Node,然後關閉目前的edit log檔案,開始一個新的edit log檔案;這樣,最近關閉的檔案就可以被BN/CN的checkpoint請求到了。注意,BN/CN從Active NameNode請求讀取edit log檔案的時候,是不會請求處于in-progress狀态的檔案的。
    1. 當完成了startCheckpoint()請求以後,如果校驗通過,就可以進行下一步的fsimage和edit log檔案的同步了,這個同步操作發生在

      doCheckpoint()

      方法中。

doCheckpoint()

方法是整個線程的核心方法,涉及到流程複雜的與Active NameNode之間的通信,是以,我來詳細講解:

STEP 1. 記憶體namespace鎖定

鎖定記憶體的namespace:鎖定的目的,是讓RPC streaming傳輸過來的實時的edit操作隻能寫入edit檔案,不可以load到記憶體

在上文中我講到,BN除了通過這種方式實作fsimage和edit log檔案的同步以外,還會通過RPC stream的方式實時接收edit。這兩種方式互為補充,才使得BN的記憶體鏡像能夠與Active NameNode保持實時一緻。同時,任何時候,兩者隻有一個能夠将edit 操作在記憶體中重演,必須互斥。我們首先來看BackNode的狀态定義:

volatile BNState bnState;
  static enum BNState {
    DROP_UNTIL_NEXT_ROLL,// BN/CN初始啟動的狀态,隻要來一次寫操作,狀态就會切換到JOURNAL_ONLY
    JOURNAL_ONLY,//處于該狀态下,來自RPC Streaming的edit隻可以追加到edit log檔案,不可以應用到記憶體
    IN_SYNC;//處于該狀态下,來自RPC Streaming的edit會先在記憶體中重演,同時寫入到edit lot檔案
  }
           

通過

bnImage.freezeNamespaceAtNextRoll();

,将stopApplyingEditsOnNextRoll标記為置為true,這樣,通過RPC方式的JournalProtocol的startLogSegment()接口告知BN的時候,BN會将目前的狀态置為JOURNAL_ONLY,即不會将新的edit操作load到記憶體。

然後,通過

bnImage.waitUntilNamespaceFrozen();

,一緻等待狀态變為JOURNAL_ONLY,在這種狀态下,通過RPC stream傳過來的edits操作不會load到記憶體,進而達到互斥目的;

通過上面的鎖定和等待,目前狀态已經切換為JOURNAL_ONLY,是以,Checkpointer線程就可以從NameNode讀取fsimage檔案和edit log檔案,然後load到記憶體了;開始STEP 2;

STEP 2.請求檔案清單清單

确認記憶體已經被自己加鎖,則開始請求Active NameNode的fsimage和image檔案清單清單:

RemoteEditLogManifest manifest = //從NameNode處擷取edit log的檔案清單清單
      getRemoteNamenodeProxy().getEditLogManifest(bnImage.getLastAppliedTxId() + );
           

這是通過NameNodeProtocol的RPC協定的

getEditLogManifest()

接口完成的:

/**
   * Return a structure containing details about all edit logs
   * available to be fetched from the NameNode.
   * @param sinceTxId return only logs that contain transactions >= sinceTxId
   */
  @Idempotent
  public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
    throws IOException;
           

參數sinceTxId規定了這些edit log的起始位置,從代碼中可以看到,這個請求參數設定為

bnImage.getLastAppliedTxId() + 1

,即,Backup Node希望請求到的edit log能夠剛好從自己記憶體中的最大的txId的下一個值開始;

STEP 3. 根據檔案清單請求edit log檔案并load到記憶體

這時候會有一個異常判斷,如果Active NameNode傳回的edit log檔案的最小的txId大于自己請求的sinceTxId,中間存在gap,此時,這些傳回的edit是不能夠apply到記憶體中的,因為txId必須嚴格連續遞增,不可丢失任何一個。我們看Active NameNode端在對于BN的

getEditLogManifest()

請求的處理(NameNodeRpcServer.getEditLogManifest() -> FSEditLog.getEditLogManifest() -> JournalSet.getRemoteEditLogs() -> FileJournalManager->getRemoteEditLogs()):

public List<RemoteEditLog> getRemoteEditLogs(long firstTxId,
      boolean inProgressOk) throws IOException {
    File currentDir = sd.getCurrentDir();//擷取目前這個Journal對應的存放EditLog的目錄
    List<EditLogFile> allLogFiles = matchEditLogs(currentDir);//通過正則比對擷取候選的EditLogFile
    List<RemoteEditLog> ret = Lists.newArrayListWithCapacity(
        allLogFiles.size());
    for (EditLogFile elf : allLogFiles) {
      ...略
      if (elf.getFirstTxId() >= firstTxId) {//如果目前的segment的第一個txid大于所請求的txId,則加入到傳回結果中
        ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId,
            elf.isInProgress()));
      } else if (elf.getFirstTxId() < firstTxId && firstTxId <= elf.getLastTxId()) {
        //如果目前的segment檔案的startTxId和endTxId位于請求的txId前後,則segment也是滿足條件的
        ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId,
            elf.isInProgress()));
      }
    }

    Collections.sort(ret);//對edit log檔案排序
    return ret;
  }
           

從上述代碼可以看到用戶端如何根據

getEditLogManifest()

中的sinceTxId參數決定傳回哪些檔案。

是以存在這種情況,伺服器端傳回的edit log中最小的txId的值大于請求的sinceTxId。這種情況是可能發生的,比如Active NameNode目前fsimage檔案的txId到130,然後edit log檔案從131開始,BN請求的edit log的sinceTxId為120,此時就隻能傳回從131開始的edit log。遇到這種情況,BN的處理方式為直接将Active NameNode的fsimage檔案下載下傳下來,然後實作txid的完全連續一緻,這相當于将Active NameNode的fsimage和edit log進行了一次完全的複制。fsimage複制過來以後,通過

bnImage.reloadFromImageFile(file, backupNode.getNamesystem());

将fsimage load到記憶體,然後才能開始把請求過來的edit log load到記憶體。當然,如果将Active NameNode的fsimage檔案下載下傳下來發現txId依然無法實作連續,隻能抛出異常了。完成了以上操作,就通過

rollForwardByApplyingLogs(manifest, bnImage, backupNode.getNamesystem());

将edit log檔案 load到記憶體。

STEP 4.checkpoint檔案的生成并上傳給Active NameNode

經過以上步驟,BN完成了記憶體中namesystem與Active NameNode的一緻。此時,就開始進行checkpoint了,把記憶體的整個鏡像dump成fsimage檔案。然後,把這個checkpoint檔案上傳到Active NameNode。

STEP 5.Backup Node對處于打開狀态的edit log的彙集

我們知道,Backup Node除了有Checkpointer線程進行檔案拷貝,同時使用RPC stream進行edit log的流式傳輸。在上文中講到BNState這個控制Backup Node的狀态,目前,

doCheckpoint()

方法執行過程中,處于

BNState.JOURNAL_ONLY

,來自RPC stream的edits操作隻會寫入到edits檔案,不會load到記憶體。是以,通過調用

bnImage.convergeJournalSpool();

,來将這個未打開的edits檔案中的edits load到記憶體,這樣,Backup Node記憶體中的景象幾乎與Active NameNode保持一緻了;

完成了converge操作,BN的狀态就切換為BNState了,這時候,通過RPC stream收到的edit log在存入edit log檔案的同時,也應用到記憶體,讓記憶體保持與Active NameNode實時一緻;關于RCP stream針對不同狀态的處理方式,看下一節介紹。

Backup Node與Active NameNode進行基于JournalProtocol RPC協定進行實時edit操作同步

上一節講Checkpointer線程的時候講到了RPC streaming與Checkpointer之間的同步,我們可以從

journal()

方法看到通過JournalProtocol協定的

journal()

接口收到了Active NameNode的一批edit操作以後,BN根據目前的不同狀态進行的不同處理,看

BackupImage.journal(...)

synchronized void journal(long firstTxId, int numTxns, byte[] data) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Got journal, " +
          "state = " + bnState +
          "; firstTxId = " + firstTxId +
          "; numTxns = " + numTxns);
    }

    switch(bnState) {
      case DROP_UNTIL_NEXT_ROLL:
        return;//什麼都不做,既不用apply到記憶體,也不寫入到本地
      case IN_SYNC://處于IN_SYNC狀态,則将這些消息應用到記憶體
        applyEdits(firstTxId, numTxns, data);//将收到的edits應用到記憶體的namespace
        break;//break以後
      case JOURNAL_ONLY://處于JOURNAL_ONLY,則這一批來自rpc的消息不可以load到記憶體的namespace,隻能夠追加到磁盤的edit檔案
        break;//需要把收到的edit log寫入到本地
      default:
        throw new AssertionError("Unhandled state: " + bnState);//異常狀态
    }
    logEditsLocally(firstTxId, numTxns, data);//将這些操作追加到目前的edit log磁盤檔案,下一次Checkpointer運作,通過調用convergeJournalSpool(),可以負責把這個處于in-progress狀态的檔案裡面的edits操作load到namespace
  }
           

我們先看JOURNAL_ONLY狀态,上一節講到Checkpointer線程的時候,整個BN處于JOURNAL_ONLY狀态,是以,通過RPC方式收到的edits隻會追加到edits檔案,不會調用

applyEdits(firstTxId, numTxns, data);

load到記憶體,這樣,Checkpointer線程拷貝過來的edits檔案或者fsimage檔案就可以load到記憶體了。等Checkpointer結束一輪運作,狀态切換為IN_SYNC,那麼記憶體的操作全就傳遞給RPC了。

總之,HDFS使用Backup Node、Checkpoint Node以及Standby Namenode進行checkpoint操作,原理均相同。它們的存在,既保證了Active NameNode的資料備份,又将Active NameNode從不影響核心業務的checkpoint操作中解脫出來。當Active NameNode發生異常,Standby Namenode可以很快接管HDFS。

對于Backup Node ,個人認為設計得非常好的地方,在于通過良好的同步控制,讓http的方式的檔案拷貝和RPC stream的方式的實時edit傳輸交替配合,讓Backup Node的記憶體鏡像與Active NameNode時刻保持近乎一緻;

繼續閱讀