天天看點

[源碼解析] PyTorch 分布式(9) ----- DistributedDataParallel 之初始化

前文我們對DDP的一些支撐子產品已經做了介紹,這為本文做了必要的鋪墊,本文就開始介紹Python世界代碼和C++世界的初始化部分。下文介紹C++世界的核心代碼。

目錄

[源碼解析] PyTorch 分布式(9) ----- DistributedDataParallel 之初始化

0x00 摘要

0x01 綜述

1.1 資料并行

1.2 DDP架構

1.2.1 分布式資料并行

1.2.2 程序

1.3 DDP 總體實作

0x02 初始化

2.1 init

2.2 建構參數

2.2.1 _build_params_for_reducer

2.2.2 modules_buffers

2.3 驗證模型

2.3.1 背景知識

2.3.2 具體代碼

2.4 廣播狀态

2.4.1 state_dict

2.4.2 _sync_params_and_buffers

2.4.3 dist._broadcast_coalesced

2.5 初始化功能函數

2.5.1 _ddp_init_helper

2.5.2 計算分桶

2.5.2.1 論文内容

2.5.2.2 分組依據

2.5.2.3 compute_bucket_assignment_by_size

2.5.3 Reducer

0xFF 參考

本系列其他文章如下:

深度學習利器之自動微分(1)

深度學習利器之自動微分(2)

[源碼解析]深度學習利器之自動微分(3) --- 示例解讀

[源碼解析]PyTorch如何實作前向傳播(1) --- 基礎類(上)

[源碼解析]PyTorch如何實作前向傳播(2) --- 基礎類(下)

[源碼解析] PyTorch如何實作前向傳播(3) --- 具體實作

[源碼解析] Pytorch 如何實作後向傳播 (1)---- 調用引擎

[源碼解析] Pytorch 如何實作後向傳播 (2)---- 引擎靜态結構

[源碼解析] Pytorch 如何實作後向傳播 (3)---- 引擎動态邏輯

[源碼解析] PyTorch 如何實作後向傳播 (4)---- 具體算法

[源碼解析] PyTorch 分布式(1)------曆史和概述

[源碼解析] PyTorch 分布式(2) ----- DataParallel(上)

[源碼解析] PyTorch 分布式(3) ----- DataParallel(下)

[源碼解析] PyTorch 分布式(4)------分布式應用基礎概念

[源碼解析] PyTorch分布式(5) ------ DistributedDataParallel 總述&如何使用

[源碼解析] PyTorch分布式(6) ---DistributedDataParallel -- 初始化&store

[源碼解析] PyTorch 分布式(7) ----- DistributedDataParallel 之程序組

[源碼解析] PyTorch 分布式(8) -------- DistributedDataParallel之論文篇

DDP是資料并行訓練的實作,為了喚醒大家的記憶,我們還是要看看資料并行的一個整體流程,來自fairscale github源碼。

[源碼解析] PyTorch 分布式(9) ----- DistributedDataParallel 之初始化

以下文字翻譯自 https://pytorch.org/docs/master/notes/ddp.html,這是DDP架構的一個總論。

下面是 DDP 實作元件。堆棧圖顯示了代碼的結構。

[源碼解析] PyTorch 分布式(9) ----- DistributedDataParallel 之初始化

我們順着此架構圖從上往下看。

最上面是分布式資料并行元件。

Distributed.py:

這是 DDP 的 Python 入口點。它實作了初始化步驟,對應了<code>nn.parallel.DistributedDataParallel</code>子產品的<code>forward</code>函數,該子產品會調用C++庫。

它的<code>_sync_param</code>功能是:當一個DDP程序在多個裝置上工作時,會執行程序内參數同步,并且它還從rank 0 程序向所有其他程序廣播模型緩沖區。

程序間參數同步在 <code>Reducer.cpp</code>之中實作。

comm.h:實作合并廣播助手函數(coalesced broadcast helper ),該函數在初始化期間被調用以廣播模型狀态,并在前向傳播之前同步模型緩沖區。

reducer.h:提供反向傳播中梯度同步的核心實作。它具有三個入口點函數:

<code>Reducer</code>: 其構造函數在<code>distributed.py</code>被調用,<code>Reducer</code>将注冊 <code>Reducer::autograd_hook()</code>到梯度累加器。

<code>autograd_hook()</code> 當梯度就緒時,autograd 引擎将調用該函數。

<code>prepare_for_backward()</code>在 <code>distributed.py</code>之中,當 DDP 前向傳遞結束時,會調用<code>prepare_for_backward()</code>。如果在DDP構造函數中,把<code>find_unused_parameters</code>設定為<code>True</code>,DDP 會周遊 autograd 計算圖以查找未使用的參數。

以下是兩個程序相關元件。

ProcessGroup.hpp :包含所有程序組實作的抽象 API。<code>c10d</code> 庫提供了 3 個開箱即用的實作,即 ProcessGroupGloo,ProcessGroupNCCL和ProcessGroupMPI。 <code>DistributedDataParallel</code>用<code>ProcessGroup::broadcast()</code>在初始化期間将模型狀态從rank 0 的程序發送到其他程序,并對<code>ProcessGroup::allreduce()</code>梯度求和。

Store.hpp :協助程序組執行個體的集合服務找到彼此。

我們把論文和 https://pytorch.org/docs/master/notes/ddp.html 結合起來,看看 DDP 總體實作。

[源碼解析] PyTorch 分布式(9) ----- DistributedDataParallel 之初始化

我們總結一次DistributedDataParallel疊代中的步驟如下(與上圖不完全一緻,有部分細化):

Prerequisite:

DDP 依賴 c10d<code>ProcessGroup</code>進行通信。是以,應用程式必須<code>ProcessGroup</code>在建構 DDP 之前建立執行個體。

Constuctor:

rank 0 程序會引用本地子產品,把模型<code>state_dict()</code>參數廣播到所有程序之中,這樣可以保證所有程序使用同樣初始化數值和模型副本進行訓練。

每個 DDP 程序建立一個 local <code>Reducer</code>,稍後将在向後傳遞期間處理梯度同步。

為了提高通信效率,<code>Reducer</code>将參數梯度組織成桶,一次規約一個桶。

初始化桶,按照逆序把 parameters 配置設定到桶之中,這樣可以提高通信效率。

可以通過設定DDP 構造函數中的參數bucket_cap_mb來配置桶的大小。

從參數梯度到桶的映射是在建構時根據桶大小限制和參數大小确定的。模型參數以(大緻)<code>Model.parameters()</code>與給定模型相反的順序配置設定到桶中 。使用相反順序的原因是因為 DDP 期望梯度在反向傳遞期間以大約該順序準備就緒。

下圖顯示了一個示例。請注意,<code>grad0</code>和<code>grad1</code>在 <code>bucket1</code>中,另外兩個梯度在 <code>bucket0</code>中。當然,這種假設可能并不總是正确的,當這種情況發生時,它可能會損害 DDP 後向速度,因為它無法 <code>Reducer</code>盡早開始通信。

[源碼解析] PyTorch 分布式(9) ----- DistributedDataParallel 之初始化

除了分桶,<code>Reducer</code>還在構造期間注冊 autograd 鈎子,每個參數一個鈎子。當梯度準備好時,将在向後傳遞期間觸發這些鈎子。具體就是周遊參數,為每個參數加上 grad_accumulator 和 autograd_hook。

Forward Pass:

每個程序讀去自己的訓練資料,DistributedSampler確定每個程序讀到的資料不同。

DDP 擷取輸入并将其傳遞給本地模型。

模型進行前向計算,結果設定為 out。現在計算都是在每個程序(CUDA裝置)上完成。

如果<code>find_unused_parameters</code>設定為<code>True</code>,DDP 會分析本地模型的輸出,從 out 開始周遊計算圖,把未使用參數标示為 ready,因為每次計算圖都會改變,是以每次都要周遊。

此模式(Mode)允許在模型的子圖上向後運作,并且 DDP 通過從模型輸出out周遊 autograd 圖并将所有未使用的參數标記為就緒,以減少反向傳遞中涉及的參數。

在向後傳遞期間,<code>Reducer</code>隻會等待未準備好的參數,但它仍然會規約所有桶。将參數梯度标記為就緒并不能幫助 DDP 跳過桶,但它會阻止 DDP 在向後傳遞期間永遠等待不存在的梯度。

請注意,周遊 autograd 圖會引入額外的開銷,是以應用程式僅應必要時才設定 <code>find_unused_parameters</code>為<code>True</code> 。

傳回out。模型網絡輸出不需要gather到 rank 0程序了,這與 DP不同。

Backward Pass:

<code>backward()</code>在 loss 上直接調用該函數 <code>Tensor</code>,這是 DDP 無法控制的,DDP 使用構造時注冊的 autograd hooks 來觸發梯度同步。當一個梯度準備好時,它在該梯度累加器上的相應 DDP 鈎子将觸發。

在 autograd_hook 之中進行all-reduce。假設參數index是param_index,則利用param_index擷取到參數,标示為ready,如果某個桶裡面梯度都ready,則該桶是ready。

當一個桶中的梯度都準備好時,會 在該桶上<code>Reducer</code>啟動異步<code>allreduce</code>以計算所有程序的梯度平均值。

如果所有桶都ready,則等待所有 all-reduce 完成。當所有桶都準備好時,<code>Reducer</code>将阻塞等待所有<code>allreduce</code>操作完成。完成此操作後,将平均梯度寫入<code>param.grad</code>所有參數的字段。

所有程序的梯度都會reduce,更新之後,大家的模型權重都相同。是以在向後傳播完成之後,跨不同DDP程序的對應的相同參數上的 grad 字段應該是相等的。

不需要像 DP 那樣每次疊代之後還要廣播參數。但是 Buffers 還是需要在每次疊代由 rank 0 程序廣播到其他程序之上。

Optimizer Step:

從優化器的角度來看,它正在優化本地模型。

所有 DDP 程序上的模型副本都可以保持同步,因為它們都從相同的狀态開始,并且在每次疊代中都具有相同的平均梯度。

因為 Python 世界是可以在很多時刻給類設定成員變量,是以我們還是從 <code>__init__</code> 看起。

其核心邏輯是:

設定裝置類型。

設定裝置IDs。

設定 self.process_group,預設就是 GroupMember.WORLD。

配置各種類成員變量。

檢查 parameters。

設定bucket大小。

建構參數。

将 rank 0 的state_dict() 廣播到其他worker,以保證所有worker的模型初始狀态相同。

建立reducer。

具體代碼如下:

我們接下來選擇一些重要步驟進行分析。

對于 DDP,第一個關鍵步就是建構參數,這裡要注意,如果目前情況是單機多GPU,也就是單程序多裝置(和DP一樣了)情況,那麼需要在程序之内進行模型複制。

但是未來不會支援了,會去掉。是以 parameters 就是 [ToyModel] 的參數集合,parameters[0] 就是 ToyModel 的參數。後面介紹 BucketReplica 會提到。

我們看看模型中有哪些重要參數:

parameter :在反向傳播之中需要被optimizer更新的參數。我們可以通過 <code>model.parameters()</code> 得到這些參數。

buffer : 在反向傳播過程之中不需要被optimizer更新的參數。我們可以通過 <code>model.buffers()</code> 得到這些參數。

具體 _build_params_for_reducer 就為reducer建立參數,邏輯大緻如下:

周遊_module_copies,得到(module, parameter)清單 modules_and_parameters,這些參數是需要求導的,不能在忽略清單之中。

用集合去除可能在多個modules中共享的參數。

建構一個參數清單。

檢查是否一個module期盼一個sparse梯度,把結果放到 expect_sparse_gradient 之中。

得到module的參數,與下面的buffer一起,都是用來同步到其他worker的。

得到module的buffer,module_buffers 在後續同步時候會用到。

傳回參數清單和expect_sparse_gradient。

此時 parameters 示例如下,可以看到其隻有 [0] 元素有意義,這個 [0] 原始本身包括4個元素:

這裡多說一句,何處用到 self.modules_buffers?後來在廣播參數時候就會用到,比如:

這裡使用了 _find_common_rank 來得到目前 DDP 使用的所有有效 ranks。

接下來是驗證模型階段。

因為後續用到了如下代碼,是以我們首先看看背景知識 broadcast。不熟悉這部分的朋友會有疑問是:為什麼 broadcast 可以從 rank 0 廣播到其他rank,明明所有rank都調用到了同樣的 broadcast 代碼。

我們來到 torch/lib/c10d/ProcessGroupMPI.cpp。可以看到,其使用了 MPI 的 MPI_Bcast API 來進行廣播操作,其中 opts.rootRank是關鍵所在。

opts 是 BroadcastOptions 的執行個體。

在 C++ 世界對應了如下:

在定義時候看到,BroadcastOptions 被C++自動初始化為0,是以所有 rank 的程序都是使用 rootRank = 0 進行調用 MPI_Bcast,結果就是從 rank = 0 來向其他 rank 進行廣播。

我們接下來看看如何驗證模型。

_verify_model_across_ranks 的作用是驗證模型(replica 0)的相關參數在廣播之後,跨程序時候擁有同樣的size/strides。

通過下面代碼我們可知,_verify_model_across_ranks 實際調用到verify_replica0_across_processes。

verify_replica0_across_processes 之中,參數model_replicas 就是前面的 parameters,其邏輯如下:

首先,從 model_replicas 得到 metadata。

然後把metadata克隆到metadata_dev。

然後,把 process 0 的 metadata_dev 廣播到對應的裝置。

每個程序都會運作同樣的代碼,但是 process_group-&gt;broadcast 之中,隻有 rank 0 會設定為 root_rank,這樣就隻廣播 rank 0 的資料。

廣播之後,所有程序的 metadata_dev 都一樣,就是 process 0 内的資料。

然後把 metadata_dev 拷貝回 control,把 control 和 model_replicas[0]比較,看看是否和原來相等。

檢查 control 是否和 model_replicas 的尺寸一樣。

這裡使用了 accessor,LibTorch 使用 accessor 快速通路 Tensor,如果 tensor 在CPU上,使用 accessor,如果在 GPU上,使用 packed_accessor 通路,這部分在 "核心開發者全面解讀PyTorch 内部機制" 有相關提及。

下一步是廣播狀态,把模型初始參數和變量從 rank 0 廣播到其他 ranks。

我們先來看看需要廣播什麼。

pytorch 的 state_dict 是一個字典對象,其将模型的每一層與它的對應參數建立映射關系,比如 model 每一層的weights及偏置等等。隻有那些參數可以訓練的層(比如卷積層,線性層等)才會被儲存到模型的state_dict中,池化層、BN層這些本身沒有參數的層就不會儲存在 state_dict 之中,比如針對下面模型。

state_dict 如下:

_sync_params_and_buffers 是依據 module的state_dict 來收集可以訓練的參數,然後把這些參數廣播出去。

具體代碼是:

我們看看,<code>_distributed_broadcast_coalesced</code>調用了 <code>dist._broadcast_coalesced</code>

我們沿着代碼來尋找,首先來到 torch\distributed_init_.py,這裡會導入 _broadcast_coalesced。

我們繼續找到 torch\csrc\distributed\c10d\init.cpp

最後來到了 torch/lib/c10d/comm.cpp,這裡利用 ProcessGroup 對張量進行廣播。

對于BroadcastWork,我們補充說明一下,就是利用 ProcessGroup 來把張量廣播出去,ProcessGroup 具體可以參見前面文章。

接下來會調用 _ddp_init_helper 進行初始化業務函數。

_ddp_init_helper 是用來初始化業務的函數,其主要邏輯如下:

對參數進行分桶,盡可能按照前向傳播的逆序(前向傳播中先計算出來的梯度,會先反向傳播)把參數配置設定平均配置設定入桶,這樣可以提高通信速度和歸并速度;

重置分桶狀态;

生成一個Reducer,其内部會注冊 autograd_hook,其用來在反向傳播時候進行梯度同步;

進行logging配置;

給SyncBatchNorm Layer傳遞 DDP handle;

首先,_compute_bucket_assignment_by_size 完成了分桶功能。這裡parameters[0] 就是對應的張量清單。

我們接下來就要結合論文内容來分析。

梯度bucketing的思想是基于這樣一個觀察,即集合通信在大張量上更有效。 實驗表明,如果DDP在短時間内等待并将多個梯度存儲到一個AllReduce操作中,它可以實作更高的吞吐量和更低的延遲,而不是在每個梯度存儲可用時立即啟動專用的AllReduce。這對于具有許多小參數的模型尤其有用。但是,DDP不應在一個AllReduce中傳輸所有資料,否則,在計算結束之前無法啟動任何通信。 參數到桶映射(Parameter-to-Bucket Mapping)對DDP速度有相當大的影響。在每次向後傳播中,将所有參數梯度中的張量複制到桶中,并在AllReduce之後将平均梯度複制回桶中。為了加速複制操作,存儲桶始終與參數在同一裝置上建立。如果模型跨越多個裝置,DDP會考慮裝置關聯性,以確定同一存儲桶中的所有參數都位于同一裝置上。AllReduce的順序也會對結果産生影響,因為它決定了多少通信可以與計算重疊。DDP按model.parameters()的相反順序啟動AllReduce。

是以,為了提高通信效率,DDP 将<code>Reducer</code>參數梯度組織成為桶,一次規約一個桶。從參數梯度到桶的映射是在建構時根據桶大小限制和參數大小确定的,。使用者可以通過設定bucket_cap_mb來配置桶的大小。

模型參數以(大緻)<code>Model.parameters()</code>與給定模型相反的順序配置設定到桶中 。使用相反順序的原因是:

反向傳播的次序是前向傳播計算的反序。

DDP 期望梯度在反向傳遞期間以前向傳播的大緻順序來就緒。

DDP 按照類型和裝置作為key來分組,因為不同裝置上的tensor不應該分在一組上,同類型張量應該分在一桶。用類型和裝置作為key 就可以保證同裝置上同類型張量配置設定在同一個桶裡。

其關鍵結構如下,BucketAccumulator 可以認為是實際的桶。

我們來看看 compute_bucket_assignment_by_size的具體邏輯:

定義了桶大小限制清單。bucket_size_limit_iterators。

定義了所有桶的清單 buckets,每一個實際桶可以認為是 BucketAccumulator。

周遊傳入的所有張量:

給所有的tensor一個index,從0開始遞增,一直到 tensors.size(),如果已經傳入了 indices,就拿到張量的index。

如果配置了期待sparse gradient,則把這個張量自己放入一個桶,因為沒法和其他張量放在一起。

使用張量資訊建構桶的key,找到對應的桶。

拿到BucketAccumulator,往該桶的張量清單裡面插入新張量的index,indices 是 tensor index list。

增加對應桶大小。

如果需要,就設定成大小限制的初始值。

拿到目前最小值限制。

如果桶的尺寸大于最小值限制,就是說目前桶的尺寸已經達到了桶的最大限制,按說需要轉移到新桶了。

實際上确實轉移到了邏輯上的新桶,但是實際還是在現有桶内執行,因為 type, device 還是同樣的,還是應該在原有桶内繼續累積,不過原有桶的indice已經轉移到了result之中,就相當于清空了。

把桶内容插入到傳回result,就是說,當桶尺寸過大的時候,就先插入到result之中。

重新生成桶,bucket是個引用,是以直接指派,就相當于清空原有的桶,就是原來桶繼續用,但是桶内原有的indices已經轉移到了result之中。

前進到下一個尺寸限制。

把剩餘的桶内indices插入到傳回值,因為之前已經有些直接插入到了result之中。

對result 進行排序:

如果 tensor_indices 非空,說明張量的順序已經是梯度準備好的順序,不需要再排序了。

如果 tensor_indices 是空的,依據最小張量index來排序,這裡假定張量的順序是他們使用的順序(或者說是他們梯度産生次序的反序)。這種排序可保證桶是按照連續不斷的順序準備好。

注意,這裡就是正序排列,等到建立Reducer的時候,才反序傳入:list(reversed(bucket_indices))。

最後傳回 result,result 最終如下,裡面每個vector 都對應了一個bucket,裡面是都是 tensor 的 index,這裡都是從小到大順序排序。

result 最終如下,裡面每個vector 都對應了一個bucket,裡面是都是 tensor 的 index,這裡都是從小到大順序排序。

這裡注意的是:因為 傳入參數 tensors就是 parameters[0],而 parameters[0] 是按照 parametes() 的傳回結果來的,即,模型參數以(大緻)<code>Model.parameters()</code>與給定模型相反的順序配置設定到桶中 。使用相反順序的原因是因為 DDP 期望梯度在反向傳遞期間以大約該順序準備就緒。最終 DDP 是按model.parameters()的相反順序啟動AllReduce。

接下來的代碼就是生成了一個Reducer。

我們在後續文章中會詳細介紹 Reducer。

pytorch分布式系列3——分布式訓練時,torch.utils.data.distributed.DistributedSampler做了什麼?

pytorch分布式系列1——搞清torch.distributed.launch相關的環境變量

pytorch分布式系列2——DistributedDataParallel是如何做同步的?

pytorch(分布式)資料并行個人實踐總結——DataParallel/DistributedDataParallel

Pytorch的nn.DataParallel

https://discuss.pytorch.org/t/dataparallel-imbalanced-memory-usage/22551/20

https://pytorch.org/docs/stable/distributed.html

PyTorch 源碼解讀之分布式訓練了解一下?

實操教程|PyTorch AutoGrad C++層實作

PYTORCH 自動微分(一)

PyTorch如何加速資料并行訓練?分布式秘籍大揭秘

pytorch分布式訓練(二init_process_group)

https://pytorch.org/tutorials/intermediate/ddp_tutorial.html

https://pytorch.org/docs/master/notes/ddp.html

https://pytorch.org/tutorials/intermediate/dist_tuto.html

PyTorch 源碼解讀之 DP &amp; DDP:模型并行和分布式訓練解析

Pytorch模型中的parameter與buffer