通過上文分析,我們已經知道了 DDP 的基本架構和如何初始化,本文就看看其核心 Reducer 的靜态架構。
目錄
[源碼解析] PyTorch 分布式(10)------DistributedDataParallel之Reducer靜态架構
0x00 摘要
0x01 引論1.1 調用
0x02 Reducer 定義
0x03 Bucket
3.1 設計
3.2 定義
3.2.1 BucketReplica有幾個
3.2.2 關鍵
3.2.3 具體定義
3.3 設定
0x03 BucketReplica
3.1 Views
3.3 初始化
0x04 查詢類
4.1 VariableIndex
4.1.1 成員變量
4.1.2 定義
4.2 VariableLocator
4.2.1 定義
4.2.2 成員變量
4.2.2.1 初始化
4.2.2.2 使用
0x05 累積相關類
5.1 grad_accumulators_
5.1.1 初始化
5.1.2 使用
5.2 gradAccToVariableMap_
5.2.1 初始化
5.2.2 使用
5.3 numGradHooksTriggeredMap_
5.3.1 初始化
5.3.2 使用
5.4 numGradHooksTriggeredMapPerIteration_5.4.1 使用
5.5 perIterationReadyParams_
5.5.1 設定
5.5.2 重置
5.5.3 使用
5.6 使用過的參數
5.6.1 論文
5.6.2 初始化
5.6.3 重置
5.6.4 設定
5.6.5 使用
5.7 計算梯度支撐類
5.7.1 RpcContext
5.7.2 hooks_
5.7.3 comm_hook_
5.7.3.1 概念
5.7.3.2 使用
5.7.4 runGradCallbackForVariable
5.7.4.1 Reducer
5.7.4.2 DistAutogradContext
0xFF 參考
通過上文分析,我們已經知道了 DDP 的基本架構和如何初始化,本文就看看其核心 Reducer 的靜态架構。Reducer提供了反向傳播中梯度同步的核心實作。
本系列其他文章如下:
深度學習利器之自動微分(1)
深度學習利器之自動微分(2)
[源碼解析]深度學習利器之自動微分(3) --- 示例解讀
[源碼解析]PyTorch如何實作前向傳播(1) --- 基礎類(上)
[源碼解析]PyTorch如何實作前向傳播(2) --- 基礎類(下)
[源碼解析] PyTorch如何實作前向傳播(3) --- 具體實作
[源碼解析] Pytorch 如何實作後向傳播 (1)---- 調用引擎
[源碼解析] Pytorch 如何實作後向傳播 (2)---- 引擎靜态結構
[源碼解析] Pytorch 如何實作後向傳播 (3)---- 引擎動态邏輯
[源碼解析] PyTorch 如何實作後向傳播 (4)---- 具體算法
[源碼解析] PyTorch 分布式(1)------曆史和概述
[源碼解析] PyTorch 分布式(2) ----- DataParallel(上)
[源碼解析] PyTorch 分布式(3) ----- DataParallel(下)
[源碼解析] PyTorch 分布式(4)------分布式應用基礎概念
[源碼解析] PyTorch分布式(5) ------ DistributedDataParallel 總述&如何使用
[源碼解析] PyTorch分布式(6) ---DistributedDataParallel -- 初始化&store
[源碼解析] PyTorch 分布式(7) ----- DistributedDataParallel 之程序組
[源碼解析] PyTorch 分布式(8) -------- DistributedDataParallel之論文篇
[源碼解析] PyTorch 分布式(9) ----- DistributedDataParallel 之初始化
Reducer 的建立代碼如下,是在_ddp_init_helper 之中。
調用的 parameters 舉例如下, parameters[0] 就是 rank 0 上模型的 parameters,可以看到其隻有 [0] 元素有意義,這個 [0] 原始本身包括 20 個元素:
bucket_indices 舉例如下:
關于 tensor indices,就是給所有的tensor一個index,從0開始遞增,一直到 tensors.size()。假如模型的 parameters 一共有20個張量,則 tensor index 從 0 到 19,分成 6 個buckets,則在這6個buckets之中,每個 tensor index 都是唯一不重複的。
python代碼無意義,我們隻能看看C++。
于是我們來到了 torch/lib/c10d/reducer.h 和 torch/lib/c10d/reducer.cpp。
Reducer提供了反向傳播中梯度同步的核心實作,其定義相當複雜,我們甚至需要去掉一些不重要的成員變量以便展示:
Reducer 的關鍵成員變量如下。
我們接下來一一分析這些成員變量。
在規約梯度之前将梯度批處理在一起可以降低開銷和/或加快完成時間。但是隻能對同一裝置上相同類型的梯度進行批處理。
桶是梯度的集合,統一裝置上相同類型的梯度被放到同一個桶之中。在代碼之中,Bucket 就是桶的概念。
在每次向後傳播中,将所有參數梯度中的張量複制到桶中,并在AllReduce之後将平均梯度複制回桶中。為了加速複制操作,存儲桶始終與參數在同一裝置上建立。如果模型跨越多個裝置,DDP會考慮裝置關聯性,以確定同一存儲桶中的所有參數都位于同一裝置上。AllReduce的順序也會對結果産生影響,因為它決定了多少通信可以與計算重疊。DDP按model.parameters()的相反順序啟動AllReduce。
為了更好的說明,我們首先要分析一下 BucketReplica 是什麼。我們從注釋出發看看。
首先,一個桶 Bucket 有多個BucketReplica,每一個模型對應一個BucketReplica。
但是隻用了一個 [0] 元素,因為目前不支援單程序多裝置模式,是以假定桶裡隻有一個replica。
再結合前文代碼,未來不會支援 SPMD。parameters 就是 [ToyModel] 這個模型清單的參數集合,parameters[0] 就是 ToyModel 的參數。
綜合以上我們知道:
DDP 原來是希望像 DP 那樣支援 SPMD,是以本程序就需要維護多個 GPU 之上的多個模型副本的參數,即,parameters 就是一個數組,數組中每個元素是一個模型副本的參數。
parameters 被指派為 <code>Reducer.replicas_</code>,而 <code>Reducer.replicas_</code> 用來指派給 bucket.replicas。
因為未來不支援Reducer.replicas_,是以隻有 parameters[0] 有意義。
是以我們得出結論:
BucketReplica 就是一個模型的待求梯度參數組。replica 對應一個 device (GPU)上的模型副本的參數資訊(部分),即,一個 replica 代表了 [1..N] 個需要被規約的梯度,這些梯度擁有同樣的 dtype,位于同樣的裝置上。
事實上,隻有 bucket.replicas[0] 有意義,就對應了上面代碼中的 [self.module] 之中的部分需求導張量,就是 parameters[0] 。
我們再總結一下 Bucket 的關鍵:
replicas 成員變量就是 bucket 對應的各個BucketReplica。一個 BucketReplica 代表了 [1..N] 個需要被規約的梯度,這些梯度擁有同樣的 dtype,位于同樣的裝置上。
隻有 bucket.replicas[0] 有意義,就對應了本模型的待求梯度參數組之中本bucket對應的張量。
如何指派?就是使用 Reducer.replicas_ 來指派,而 replicas_ 就是參數 parameters。我們下面就會介紹。
variable_indices 成員變量用來記錄本桶之中有哪些variable 的index。
如何指派?使用前面介紹的 bucket_indices 進行指派。
如何使用?intra_bucket_index 是bucket.variable_indices的序号,利用序号得到真正的variable index。後文會依據代碼再進行闡釋。
最後,Bucket 具體定義如下:
Reducer 的成員變量buckets_ 是關鍵,這是Reducer 之中所有的桶。
在初始化函數中有如何初始化 buckets_,核心是:
找到本bucket在 bucket_indices 之中的 index。
在 parameters 之中找到 index 對應的張量。
在 BucketReplica 之中配置這些張量,就是本bucket應該規約的張量。
用圖例表示如下,這裡假設 bucket index 是 1,即第 2 個桶,是以 variable_indices 對應了 bucket_indices 中的相應部分。比如 BucketReplica[0] 裡面是 Tensor 4,5,6,而variable_indices就是 Tensor 4,5,6 分别的 index。
下圖中的 bucket_indices 是 Reducer 構造函數的參數之一。
如前面讨論的,一個 BucketReplica 代表了 [1..N] 個需要被規約的梯度,這些梯度擁有同樣的 dtype,位于同樣的裝置上。是一個模型待求梯度參數的一部分,具體是哪些,由 bucket 的 variable_indices 決定。
其關鍵成員變量為:
<code>std::vector<at::Tensor> variables</code> 是構成此bucket副本的variable。我們在這裡使用refcounted value,這樣我們就可以在完成規約之後,輕松地将bucket内容 unflatten 到參與變量中。
<code>at::Tensor contents</code> :把桶的内容展平的結果,即Flattened (1 dimensional) 之後的結果。
<code>std::vector<at::Tensor> bucket_views_in</code> :提供了從輸入角度在 contents 之中檢視具體梯度的方法。
<code>std::vector<at::Tensor> bucket_views_out</code> :提供了從輸出角度在 contents 之中檢視具體梯度的方法。
具體可以參見如下注釋:
關于 <code>std::vector<at::Tensor> bucket_views_in</code> 和 <code>std::vector<at::Tensor> bucket_views_out</code> 的進一步說明:
在 PyTorch 之中,視圖是指建立一個友善檢視的東西,視圖與原資料共享記憶體,它隻是将原有的資料進行整理,直接顯示其中部分内容或者進行重排序後再顯示出來。
每個 view 都将按照如下布局(sizes + strides)建立,這個布局與grad的預期布局相比對。
bucket_views_in 和 bucket_views_out 這兩個變量提供在 contents 之中操作具體梯度的方法,或者說,它們提供了視圖(views),該視圖可以操作contents 之中每個張量的梯度。使用者把這兩個變量作為入口點來把每個梯度的資料從 content 之中移入和移出。
我們為<code>bucket_</code>視圖保留兩種狀态的原因是:如果注冊了DDP通信鈎子(communication hook), <code>bucket_views_out</code> 可以用鈎子的 <code>future_work</code>值重新初始化。是以我們需要為<code>bucket_views_in[i].copy_(grad)</code> 保留一個對 replica 原始 contents 的單獨視圖引用。
<code>bucket_views_in[i].copy_(grad)</code>和 <code>grad.copy_(bucket_views_out[i])</code> 提供了将梯度資料移入/移出contents的友善方法。
另外,以下三個成員變量存儲桶的每個flat張量資訊,比如offsets存儲了各個張量在flat bucket contents中的offset。
BucketReplica 具體定義為:
目前為止,邏輯如下,如前所述,每個bucket隻有 replicas[0] 有意義。
部分初始化的代碼在 Reducer::initialize_buckets 之中。
initialize_bucket_views 具體代碼如下,這裡需要對幾個 PyTorch 函數進行說明。
as_strided :依據現有tensor以及給定的步長來建立一個視圖(類型仍然為tensor),與原資料共享記憶體,不存儲詩句,是以兩個view都不是真實的存儲,隻是視圖。
narrow :傳回一個新的張量,其是原來張量的縮小版。
initialize_bucket_views 主要邏輯是:
周遊replica的張量,針對每一個張量,依據其是dense還是sparse進行不同處理,最後插入到replica.bucket_views_in之中。
把 replica.bucket_views_out 設定為 replica.bucket_views_in,正常應該是相等的。
如果<code>gradient_as_bucket_view_</code>設定為true,則需要處理兩種情況:
當調用 rebuild_buckets 重建 bucket時,initialize_bucket_view 可以在initialize_bucket内調用,如果grad在上一次疊代中已經定義/計算過,則需要将舊的grad複制到新的bucket_view中,并讓grad指向新的bucket_view。
initialize_bucket_view 也可以在建構時候在 initialize_bucket 内調用。在建構時間内不會定義 Grad,
在這種情況下,不要讓梯度指向bucket_view,因為對于全局未使用的參數,梯度應保持為未定義。
具體代碼如下:
具體如下圖:
另外,mark_variable_ready_sparse, mark_variable_ready_dense, finalize_backward 都有對 contents 指派。
以下兩個類用來讓 autograd hook 函數确定張量對應桶。
VariableIndex 就是确定某個 tensor 在某個桶中的位置。這個對于 autograd hook 有用。對于autograd hook 回調,回調函數所在程序隻是知道自己的梯度張量,但是回調函數需要知道這個張量位于哪個replica,以及位于replica之中哪個位置,這樣才能進一步規約。
Reducer 等類的執行個體之中,隻有一個 VariableIndex 的成員變量,這個獨立成員變量是:
VariableIndex 更多是作為其他成員變量的一部分或者參數存在,比如在 Reducer 之中,gradAccToVariableMap_ 就是使用了 VaribaleIndex。
VariableIndex 定義如下:
在 Reducer 的構造函數中,有如下代碼用于autogrid_hook的設定,這是給每個 replica 上的每個張量設定了一個 hook。如果autograd hook 不知道此梯度對應哪個 bucket,就無法告訴 DDP,這個 bucket 整體ready了。
如何找到桶?需要使用下面的 VariableLocator。
VariableLocator 用來在 bucket 之中确定一個varaible。為了找到一個張量位置,我們需要知道在哪個桶,在桶的張量之中的哪個位置。
哪個桶 : <code>bucket_index</code> 是<code>Reducer.buckets_</code>清單的位置,表示 <code>buckets_</code> 之上的一個bucket。
桶副本的哪個位置 : <code>intra_bucket_index</code> 是在 bucket.replica 之中 vector 域的 variable index。
Reducer 的成員變量為:
如何初始化?
問題:variable_locators_[variable_index] 在不同的桶之間,不會重複嗎?不會,因為 VariableLocator(bucket_index, intra_bucket_index++) 從定義上看,bucket_index 和 intra_bucket_index 的組合是唯一的。
我們給出一個例子。關于 tensor indices,就是給所有的tensor一個index,從0開始遞增,一直到 tensors.size()。假如模型的 parameters 一共有12個張量,則 tensor index 從 0 到 11。假如分成 6 個buckets,則在這6個buckets之中,每個 tensor index 都是唯一不重複的。
這樣,對應的 variable_locators_ 是:
如何使用?我們用下面做為例子。
當 autograd hook 調用時候,使用 VariableIndex index 來回調,
autograd_hook 最終調用到 mark_variable_ready_dense,這裡進而通過 variable_locators_ 來确定桶,然後進行後續操作。
以下是梯度累積相關類。
grad_accumulators_ 可以認為是一個矩陣,矩陣的每個item就是一個 AccumulateGrad(Node類型),就是用來計算梯度的。目前看來,這裡隻是一個bookkeeping作用。
具體如下圖,variable1 是一個實際的 張量,grad_accumulators_ 中的一個item 就指向 variable1 的 AccumulateGrad。
如何初始化?在 Reducer 建構函數之中有:
grad_accumulator 傳回的是 Node,也就是 AccumulateGrad,是一個Node類型,我們取出了檢查校驗代碼。
gradAccToVariableMap_ 的定義如下:
作用是給每個 Node 一個對應的VariableIndex,具體如圖,下面就給 variable 1 一個 index 1:
如何初始化?在 Reducer 構造函數中有如下,就是給每個需要求導的 Varaible 一個VariableIndex。
gradAccToVariableMap_ 的使用如下,search_unused_parameters 就是周遊查找 <code>gradAccToVariableMap_</code>,如果某一個accumulator 函數沒有在 <code>gradAccToVariableMap_</code> 裡面,就說明不用計算梯度。
記錄在本張量的梯度就緒之前,該張量的 autograd_hook 應該被調用幾次。第一次疊代之後,不再增加,是以這個數值應該就是1或者0。用來設定 unused_parameters_ 和 配置 numGradHooksTriggeredMapPerIteration_。
如何初始化?在建構函數之中有:
第一次疊代之後,後續調用 autogrid_hook 就遞增加一。
如何使用?這裡會reset。
這裡也會進行處理。如果為0,則插入unused_parameters_。
在本張量的梯度就緒之前,該張量的 autograd_hook 還需要被調用幾次。如果為0,就說明這個桶應該整體就緒了。
本成員變量是使用 numGradHooksTriggeredMap_ 來重置。
如何使用?在靜态圖情況下,如果不是第一次疊代(此時剛剛産生梯度),就會把 <code>numGradHooksTriggeredMapPerIteration_[index]</code> 遞減,如果為0,就說明該變量就緒,可以進行集合操作梯度規約了。
當新一次疊代時候,會重置這個值,prepare_for_backward 會調用到 reset_bucket_counting。
而且是使用 numGradHooksTriggeredMap_ 來重置。
具體邏輯我們展示一下:
對于 張量 2,就沒有使用過,是以 delay_all_reduce 方法 之中直接放入到未使用參數。
對于 張量 1:
numGradHooksTriggeredMap_ 初始化是 0。
第一次疊代之後變成 1。
後向傳播時候,調用 prepare_for_backward 和 reset_bucket_counting,把 <code>numGradHooksTriggeredMap_</code>指派給 <code>numGradHooksTriggeredMapPerIteration_</code>。
autograd_hook 之中會遞減,然後如果是 0,就設定此變量為 ready,可以規約了。
每個疊代之中,perIterationReadyParams_ 表示就緒的參數。
就是如果某個variable是就緒狀态,就插入到 perIterationReadyParams_。
在反向傳播之前,會重置這個變量。
就是周遊perIterationReadyParams_,如果沒找到,就傳回。
在 rebuild_buckets 方法中會調用 ensure_prior_reduction_finished,裡面會調用這兩個方法來校驗。
以下兩個變量用來記錄本地使用過的參數,其标示在未啟用同步的情況下(no_sync is on),在目前疊代或者 no_sync session 之中,這些參數是否在本地被使用過。
每個模型副本對應map中的一個張量,每個張量是參數數量的一維int32(one-dim int32)張量。
這些張量在autograd_hook中标記,以訓示已使用了相應的參數。這些張量會在目前疊代或無同步會話(no_sync session)的後向傳播結束時進行allreduce,以計算出全局未使用的參數。
此處可以結合論文看看。
全局未使用參數(Globally Unused Parameters)的梯度在向前和向後過程中應保持不變。檢測未使用的參數需要全局資訊,因為在一個DDP過程中,一個參數可能在一次操作中不存在,但可能在另一個過程的同一次疊代中參與訓練。是以DDP在位圖中維護本地未使用的參數資訊,并啟動額外的AllReduce以收集全局位圖。由于位圖比張量尺寸小得多,是以模型中的所有參數共享同一位圖,而不是建立每桶位圖(per-bucket bitmaps)。位圖位于CPU上,以避免為每次更新啟動專用CUDA核心。但是,某些ProcessGroup後端可能無法在CPU 張量上運作AllReduce。例如,ProcessGroupNCCL僅支援CUDA張量。此外,由于DDP應該與任何定制的ProcessGroup後端一起工作,它不能假設所有後端都支援CPU張量。為了解決這個問題,DDP在同一裝置上維護另一個位圖作為第一個模型參數,并調用非阻塞拷貝操作(non-blocking copy)将CPU位圖移動到裝置位圖以進行集合通信。
初始化函數如下:
finalize_bucket_dense 和 finalize_backward 都會重置。
autograd_hook 之中如果使用了,就設定為1
在 mark_variable_ready 時候會調用到 all_reduce_local_used_map,如果需要同步,這裡進行同步。我們還是翻譯一下注釋:
DDP 用異步H2D來避免阻塞開銷。異步複制和allreduce 會着眼于目前流,是以将正确排序。
關于主機操作的正确順序也很重要。H2D <code>copy_</code> 是按流排序的,而主機對 <code>local_used_maps_</code> 的更改是按主機排序的。
如果大量積壓的cuda流工作将 copy_ 操作推遲到将來,并且如果從現在到finalize_backward 之間沒有發生阻塞調用,那麼finalize_backward 會在流執行複制之前将主機上使用的本地映射重新歸零,在這種情況下,copy_會讀取到這些零,而不是我們在這裡告訴它讀取的值。
将 local_used_maps_[i] 複制到pinned臨時記憶體(固定的緩存配置設定器應該異步提供)可以避免這種惡劣的、罕見的争用情況。
在希望使用所有參數的情況下,從現在到重新調零,DDP本身不會做任何阻塞工作,是以這種危險情況是真實存在的。
是以,Reducer 采用防禦性操作,以確定 local_used_maps_tmp 與local_used_maps_[i] 不同。
我們接下來分析一些計算梯度所涉及到的基本函數和支撐類。
該類用來封裝 distributed::autograd::ContextPtr。
其作用就是保持了 autograd hook,也是起到了bookkeeping 作用。
初始化如下:
我們通過 [DDP Communication Hook] 來看看概念。
DDP通信鈎子是一種增強功能,它提供了一個鈎子,其可用于覆寫DDP來進行跨rank梯度通信,這可用于梯度壓縮/GossipGrad等算法。可以使用Python API <code>register_comm_hook</code>來注冊鈎子函數。
如果未注冊DDP通信鈎子(DDP communication hook),則reducer隻需調用allreduce即可對桶進行規約。如果注冊了,則會調用鈎子并使用future work handle來處理。如果注冊,reducer也會跳過"将梯度除以世界大小(world size)" 這個步驟。這樣做的目的是:通信鈎子可以完全覆寫我們執行通信的方式,使用者可以完全控制如何處理梯度。
<code>PythonCommHook</code>是<code>CommHookInterface</code>的子類,其可以注冊一個 Python 鈎子。此外,還有一些内置的C++鈎子實作,可以通過調用Python API <code>register_builtin_comm_hook</code>來指定。
我們通過 torch/distributed/algorithms/ddp_comm_hooks/default_hooks.py 來看看。
下面的 hook 就是在 all-reduce 前後進行自己的特殊處理。如果使用這個 hook,就使用 ddp_model.register_comm_hook(process_group, fp16_compress_hook)。
mark_variable_ready_dense 函數會調用到 runGradCallbackForVariable。
Reducer的runGradCallbackForVariable如下,其調用 distributed::autograd::ContextPtr.runGradCallbackForVariable 來處理。
我們順着來到 DistAutogradContext。
它會在累積的梯度之中,在 accumulatedGrads_ 之中找到張量 對應的梯度 grad,然後用傳入的回調函數來處理梯度grad,最後把處理後的梯度拷貝回accumulatedGrads_。這樣就從 hook擷取梯度 開始,到傳回規約之後的梯度結束,完成了一個閉環。
DistAutogradContext 的 accumulatedGrads_會記錄張量對應的目前梯度。
至此,我們初步介紹了一些基本類,下一章繼續介紹(是在是太多了......)。
pytorch分布式系列3——分布式訓練時,torch.utils.data.distributed.DistributedSampler做了什麼?
pytorch分布式系列1——搞清torch.distributed.launch相關的環境變量
pytorch分布式系列2——DistributedDataParallel是如何做同步的?
pytorch(分布式)資料并行個人實踐總結——DataParallel/DistributedDataParallel
Pytorch的nn.DataParallel
https://discuss.pytorch.org/t/dataparallel-imbalanced-memory-usage/22551/20
https://pytorch.org/docs/stable/distributed.html
PyTorch 源碼解讀之分布式訓練了解一下?
實操教程|PyTorch AutoGrad C++層實作
PYTORCH 自動微分(一)
PyTorch如何加速資料并行訓練?分布式秘籍大揭秘
pytorch分布式訓練(二init_process_group)
https://pytorch.org/tutorials/intermediate/ddp_tutorial.html
https://pytorch.org/docs/master/notes/ddp.html
https://pytorch.org/tutorials/intermediate/dist_tuto.html
PyTorch 源碼解讀之 DP & DDP:模型并行和分布式訓練解析
Pytorch模型中的parameter與buffer