天天看點

[源碼解析] PyTorch 分布式 Autograd (5) ---- 引擎(上)

上文已經分析了如何啟動/接受反向傳播,如何進入分布式autograd 引擎,本文和下文就看看如何分布式引擎如何運作。通過本文的學習,讀者可以對 dist.autograd 引擎基本靜态架構和總體執行邏輯有所了解。

目錄

[源碼解析] PyTorch 分布式 Autograd (5) ---- 引擎(上)

0x00 摘要

0x01 支撐系統

1.1 引擎入口

1.2 SendRpcBackward

1.2.1 剖析

1.2.2 定義

1.2.3 建構

1.2.4 grads_

0x02 定義

2.1 定義

2.2 單例

2.3 重要注釋

2.3.1 成員變量

2.3.2 建構

2.3.3 GPU to CPU continuations

2.3.4 析構

2.3.5 插入隊列

2.3.6 工作線程

0x03 總體執行

0x04 驗證節點和邊

4.1 gradient_edge

4.2 validate_outputs

4.3 VS 普通 engine

0x05 計算依賴

5.1 總體過程

5.2 第一部分 準備工作

5.2.1 實作

5.2.2 相關

5.2.2.1 sendFunctions

5.2.2.2 outstanding_tasks_

GraphTask

vania engine

dist engine

5.3 第二部分 計算依賴

5.3.1 實作

5.3.2 葉子節點的種類

5.4 第三部分 得到Functions

5.4.1 算法

5.4.2 實作

5.5 小結

0xFF 參考

PyTorch分布式其他文章如下:

深度學習利器之自動微分(1)

深度學習利器之自動微分(2)

[源碼解析]深度學習利器之自動微分(3) --- 示例解讀

[源碼解析]PyTorch如何實作前向傳播(1) --- 基礎類(上)

[源碼解析]PyTorch如何實作前向傳播(2) --- 基礎類(下)

[源碼解析] PyTorch如何實作前向傳播(3) --- 具體實作

[源碼解析] Pytorch 如何實作後向傳播 (1)---- 調用引擎

[源碼解析] Pytorch 如何實作後向傳播 (2)---- 引擎靜态結構

[源碼解析] Pytorch 如何實作後向傳播 (3)---- 引擎動态邏輯

[源碼解析] PyTorch 如何實作後向傳播 (4)---- 具體算法

[源碼解析] PyTorch 分布式(1)------曆史和概述

[源碼解析] PyTorch 分布式(2) ----- DataParallel(上)

[源碼解析] PyTorch 分布式(3) ----- DataParallel(下)

[源碼解析] PyTorch 分布式(4)------分布式應用基礎概念

[源碼解析] PyTorch分布式(5) ------ DistributedDataParallel 總述&如何使用

[源碼解析] PyTorch分布式(6) ---DistributedDataParallel -- 初始化&store

[源碼解析] PyTorch 分布式(7) ----- DistributedDataParallel 之程序組

[源碼解析] PyTorch 分布式(8) -------- DistributedDataParallel之論文篇

[源碼解析] PyTorch 分布式(9) ----- DistributedDataParallel 之初始化

[源碼解析] PyTorch 分布式(10)------DistributedDataParallel 之 Reducer靜态架構

[源碼解析] PyTorch 分布式(11) ----- DistributedDataParallel 之 建構Reducer和Join操作

[源碼解析] PyTorch 分布式(12) ----- DistributedDataParallel 之 前向傳播

[源碼解析] PyTorch 分布式(13) ----- DistributedDataParallel 之 反向傳播

[源碼解析] PyTorch 分布式 Autograd (1) ---- 設計

[源碼解析] PyTorch 分布式 Autograd (2) ---- RPC基礎

[源碼解析] PyTorch 分布式 Autograd (3) ---- 上下文相關

[源碼解析] PyTorch 分布式 Autograd (4) ---- 如何切入引擎

為了更好的說明,本文代碼會依據具體情況來進行相應精簡。

我們首先看看一些引擎内部支撐系統。

引擎入口在 backward 函數中有調用,從 DistEngine::getInstance().execute 進入到引擎,由前文可知,這裡是主動調用引擎。

被動調用引擎是從 SendRpcBackward 開始的。SendRpcBackward 是前向傳播之中發送行為對應的反向傳播算子。DistAutogradContext 存儲在一個worker之上的每一個分布式autograd的相關資訊,其在分布式 autograd 之中封裝前向和後向傳播,累積梯度,這避免了多個worker在彼此的梯度上互相影響。在上下文 DistAutogradContext 之中有個成員變量,記錄了本 worker 所有發送行為對應的反向傳播算子。

sendAutogradFunctions_ 中的内容都是SendRpcBackward。

SendRpcBackward 作為分布式autograd實作的一部分,每當我們将RPC從一個節點發送到另一個節點時,我們都會向autograd圖添加一個"SendRpcBackward"autograd function。這是一個占位符函數,用于在向後傳播時啟動目前worker的autograd引擎。此autograd function的邊是RPC方法的輸入。

在向後傳播過程中,此函數将在autograd引擎中排隊等待執行,該引擎最終将運作autograd圖的其餘部分。

SendRpcBackward實際上是本地節點上autograd圖的根。我們給出之前的示意圖如下:

SendRpcBackward不會接收任何 "輸入",而是RPC架構将梯度傳遞給該函數以啟動局部autograd計算。

SendRpcBackward的input邊是RPC方法的輸入,就是梯度。

[源碼解析] PyTorch 分布式 Autograd (5) ---- 引擎(上)

SendRpcBackward 是 Node 的派生類,因為是 Node,是以有 next_edges,可以看到其新增成員變量是 grads_。

在前向傳播過程之中,addSendRpcBackward 會建構一個SendRpcBackward,會把其前向傳播輸入邊作為反向傳播的輸出邊設定在 SendRpcBackward 之中。

之前看到,SendRpcBackward新增成員變量是 <code>grads_</code>,我們看看 <code>grads_</code> 如何設定和使用?

SendRpcBackward 提供了 set, get 操作。

何時會使用?在 torch/csrc/distributed/rpc/request_callback_no_python.cpp 之中有 processBackwardAutogradReq。processBackwardAutogradReq 會:

使用 sendFunction-&gt;setGrads(gradientsCall.getGrads()) 來設定遠端傳遞來的梯度。

調用 DistEngine::getInstance().executeSendFunctionAsync 來執行引擎開始本地後向計算。

對應了設計中如下文字,也就是被動進入引擎的起點:

SendRpcBackward實際上是本地節點上autograd圖的根。是以,它不會接收任何"輸入",而是RPC架構将梯度傳遞給該函數以啟動局部autograd計算。

具體代碼如下:

executeSendFunctionAsync 就會用 sendFunction-&gt;getGrads() 提取梯度,進行操作。

具體如下圖:

[源碼解析] PyTorch 分布式 Autograd (5) ---- 引擎(上)

DistEngine 的定義如下,為了更好講解,下面删除了部分代碼:

引擎使用了單例模式,這樣每個 worker 之中就隻有一個單例在運作。

PyTorch 源碼之中有大量詳盡的注釋,我們挑選一些來看看。

代碼中定義了兩個 CPU 全局相關成員變量,具體如下,均注明需要看 [GPU to CPU continuations] 這個注釋。

這兩個成員變量具體初始化位置是在建構函數之中。

以下是 GPU to CPU continuations 的翻譯和了解。

Continuations 最初應該是在schema語言裡面接觸過的,後來也看過不少語言用到,這個概念沒有找到一個很好的延續概念,暫時使用"延續"這個翻譯。

為了執行GPU任務的延續(continuations),是以需要初始化一個單獨的CPU線程來處理。分布式引擎的多線程結構僅适用于CPU任務。如果我們有CPU-&gt;GPU-&gt;CPU這樣的任務順序,分布式 autograd 就沒有線程來執行最後一個CPU任務。為了解決這個問題,我們引入了一個全局CPU線程來處理這種情況,它将負責執行這些CPU任務。

CPU線程有自己的就緒隊列(ready_queue),它用作DistEngine的所有GraphTask的CPU就緒隊列(cpu_ready_queue)。這確定所有GPU到CPU的延續(continuations)都在此線程上排隊。全局CPU線程隻需将任務從全局隊列中取出,并在JIT線程上調用"execute_graph_task_until_ready_queue_empty",以執行相應的任務。

析構函數之中有如下,就是為了引擎結束而做對這兩個成員變量做了相關操作。

在哪裡往 global_cpu_ready_queue_ 插入?在 DistEngine::computeDependencies 裡面會有插入。首先,每個 GraphTask 都把 global_cpu_ready_queue_ 設定為 cpu_ready_queue。GraphTask構造函數這裡參數在調用時候傳入的是 global_cpu_ready_queue_。

是以,如果 GraphTask 最後傳回需要 CPU 運作時候,就統一用這個。

globalCpuThread 是工作線程,其就是從 ready queue 裡面彈出 NodeTask,然後執行。

總體執行是在 DistEngine::execute 之中完成,具體分為如下步驟:

使用 contextId 得到前向的上下文。

使用 validateRootsAndRetrieveEdges 進行驗證。

構造一個GraphRoot,用它來驅動後向傳播,可以認為是一個虛拟根。

使用 computeDependencies 計算依賴。

使用 runEngineAndAccumulateGradients 進行反向傳播計算。

使用 clearAndWaitForOutstandingRpcsAsync 等待 RPC 完成。

可以看到,與普通引擎相比較,分布式多了一個計算root邊和生成邊上梯度資訊的過程。因為在普通前向傳播過程之中,這些是已經配置好的,但是在分布式計算之中,前向傳播是沒有計算這些,是以需要在反向傳播之前計算出來。

我們接下來看看如何做驗證工作。

validateRootsAndRetrieveEdges 被用來驗證節點和邊的有效性,具體邏輯是:

驗證根節點的有效性,擷取根節點的邊。

看看根節點是否為空。

根節點是否需要計算梯度。

根節點是否有梯度函數。

計算梯度的邊,生成相應的梯度。

調用 validate_outputs 來驗證輸出。

gradient_edge 在本文下面會用到,就是利用一個Variable的梯度和前向傳播的輸出來建構一個Edge。

其定義在 torch/csrc/autograd/engine.cpp,原生引擎和分布式引擎都會調用。validate_outputs 之中包含了大量的驗證代碼。

如果梯度數量與邊數目不同,則退出。

周遊梯度,對于每個梯度:

擷取對應的邊,如果邊無效,則去下一個梯度。

使用input_metadata 擷取輸入資訊。

如果梯度沒有定義,也去下一個梯度。

如果梯度尺寸與輸入形狀不同,則退出。

對梯度的裝置,中繼資料的裝置進行一系列判斷。

我們和普通引擎進行對比一下校驗部分。

普通Engine 之中隻調用了 validate_outputs。

是以,對于校驗部分,DistEngine 可以總結為:

做校驗。

根據 roots 來計算root對應的邊和生成對應梯度。

再用validate_outputs驗證輸出。

我們回憶一下設計文檔中的 FAST模式算法。該算法的關鍵假設是:當我們運作反向傳播時,每個<code>send</code>函數的依賴為 1。換句話說,我們假設我們會從另一個節點通過 RPC 接收梯度。算法如下:

我們從具有反向傳播根的worker開始(所有根都必須是本地的)。

查找目前Distributed Autograd Context 的所有<code>send</code>函數 。

從提供的根和我們檢索到的所有<code>send</code>函數開始,我們在本地計算依賴項 。

計算依賴項後,使用提供的根來啟動本地 autograd 引擎。

當 autograd 引擎執行該<code>recv</code>函數時,該<code>recv</code> 函數通過 RPC 将輸入梯度發送到适當的worker。每個<code>recv</code>函數都知道目标 worker id,因為它被記錄為前向傳播的一部分。通過<code>autograd_context_id</code>和 <code>autograd_message_id</code> 該<code>recv</code>函數被發送到遠端主機。

當遠端主機收到這個請求時,我們使用 <code>autograd_context_id</code>和<code>autograd_message_id</code>來查找适當的<code>send</code>函數。

如果這是worker第一次收到對給定 <code>autograd_context_id</code>的請求,它将按照上面的第 1-3 點所述在本地計算依賴項。

然後将在第6點接受到的<code>send</code>方法插入隊列,以便在該worker的本地 autograd 引擎上執行。

最後,我們不是在 Tensor的<code>.grad</code>之上累積梯度,而是在每個Distributed Autograd Context之上分别累積梯度 。梯度存儲在<code>Dict[Tensor, Tensor]</code>之中 ,<code>Dict[Tensor, Tensor]</code>基本上是從 Tensor 到其關聯梯度的映射,并且可以使用 get_gradients() API檢索該映射 。

本章就是對應了算法的前三項,這部分是和普通引擎最大差別之一。

計算依賴分為兩大部分,第一部分是做準備工作,第二部分是計算依賴關系,第三部分是根據依賴關系來得到需要計算哪些函數。

我們先給出總體代碼和注釋,後續會仔細分析。

因為這裡是計算本地的依賴關系,是以周遊需要從 root 和 本地的 SendRpcBackward 開始計算。我們先要先做一些準備工作:

首先生成一個GraphTask,但是不需要給 GraphTask 傳一個cpu_ready_queue,因為我們後面使用execute_graph_task_until_ready_queue_empty,在那裡會給每一個調用 建立一個獨立的ReadyQueue。

其次用 seen 來記錄已經通路過的節點。

建構一個 Node 類型的 queue,把根節點插入到queue。

然後從上下文之中拿到出邊Functions,放入到 sendFunctions 之中。

sendFunctions就是出邊,之前在 addSendFunction之中被添加。

普通狀态下,root節點内在反向傳播時候,已經有了next edges,但是分布式模式下,出邊是在sendFunctions之中。

周遊出邊 sendFunctions,建構出邊清單,對于 sendFunctions 中的每一項:

GraphTask 出邊數目增加 graphTask-&gt;outstanding_tasks_++。

在 queue 之中插入 sendFunctions 中的 SendRpcBackward。

最後,queue 裡面是 root 和 若幹 SendRpcBackward。

實作之中,使用了部分函數或者成員變量,我們選取重點進行介紹。

sendFunctions 是擷取了上下文的sendAutogradFunctions_,這是一個 std::unordered_map&lt;int64_t, std::shared_ptr&gt;。

sendFunctions就是出邊,之前在 addSendFunction之中被添加,addSendRpcBackward 會調用 addSendFunction。

利用 graphTask-&gt;outstanding_tasks_++ 把GraphTask 出邊數目增加。

outstanding_tasks_ 是 GraphTask 的成員變量。

outstanding_tasks_ :用來記錄目前任務數目,如果數目為0,則說明任務結束了。 如果這個數量不為0,則此GraphTask依然需要運作。

在 vania engine 之中就有 outstanding_tasks_。

是待處理 NodeTask的數量,用來判斷該GrapTask是否還需要執行,如果數目為0,則說明任務結束了。

當 GraphTask 被建立出來時候,此數值為0。

如果有一個NodeTask被送入到 ReadyQueue,則outstanding_tasks_ 增加 1。

如果在工作線程作執行一次 evaluate_function(task)後,outstanding_tasks的值減1。

如果這個數量不為0,則此GraphTask依然需要運作。

NodeTask任務增加時候 outstanding_tasks_ 就加一。

在計算依賴時候,周遊 sendFunctions,上下文有幾個SendRpcBackward,就把 outstanding_tasks_ 就加幾,每多一條出邊,就意味着多了一個計算過程。

而執行時候,void DistEngine::execute_graph_task_until_ready_queue_empty 和 Engine::thread_main 都會減少 outstanding_tasks_。

第二部分是周遊圖,計算依賴關系。

此時 queue 裡面是 root 和 若幹 SendRpcBackward,是以接下來就是從 queue 之中不停彈出Node 進行計算。具體邏輯是:

周遊所有發送邊(從 queue 之中不停彈出Node ),對于每個Node,周遊Node(根節點或者SendRpcBackward)的next_edges:

如果可以得到一個邊,則:

對應的節點依賴度加一。

如果之前沒有通路過,就插入到queue。

如果這個邊本身沒有輸出邊,說明是葉子節點,葉子節點有兩種:AccumulateGrad 或者 RecvRpcBackward。

對于 recvBackwardEdges.emplace_back(edge) 做特殊處理。

插入到最終輸出邊 outputEdges,注意,RecvRpcBackward 也插入到這裡。

這之後,局部變量 recvBackwardEdges 裡面是RecvRpcBackward,outputEdges 裡面是 AccumulateGrad 和 RecvRpcBackward。

有兩種葉子節點,是以需要分開處理。

AccumulateGrad : 普通葉子節點,就是本地葉子節點。

RecvRpcBackward : 在正向圖中,是RPC接收節點。

從設計文檔之中,有如下對應:"

我們發現了一個葉節點,它應該是AccumulateGrad或RecvRpcBackward。我們記錄函數以確定我們不執行它,而是在autograd上下文中累積梯度。這些函數将作為"輸出"參數傳入到vanilla autograd引擎。 我們沒有在RecvRpcBackward上下文積累任何梯度。RecvRpcBackward被添加為輸出邊,以訓示它是葉節點,這有助于正确計算本地autograd graph的依賴關系。将RecvRpcBackward放在"outputEdges"中意味着需要執行此函數(與我們對快速模式的假設一緻,即所有send/recv函數在向後傳播中都有效),是以也需要執行其所有祖先函數。

比如,對于 work 1, recv 就是葉子節點,是一個RecvRpcBackward,它需要把梯度傳遞給 worker 0。對于 worker 0,上面的子圖,t1, t2 也是葉子節點,都是AccumulateGrad。

[源碼解析] PyTorch 分布式 Autograd (5) ---- 引擎(上)

這部分根據依賴關系找到需要計算那些functions。

現在讓我們計算需要執行哪些函數。算法如下:

建立一個虛拟GraphRoot,它指向此上下文和原始GraphRoot的所有"發送"函數。使用outputEdges和虛拟GraphRoot來運作"init_to_execute"。這確定我們根據需要标記适當的函數:如果它們隻能從本地特定的"發送"函數通路,而不需要從提供的根通路。

對于"outputEdges"中指向"RecvRpcBackward"的所有邊,根據執行需要标記這些函數。原因是"init_to_execute"會将這些标記為不需要。但"RecvRpcBackward"的獨特之處在于,我們将其用作圖中的葉節點來準确計算所需的執行操作,但與AccumageGrad不同,我們确實需要執行此函數。

具體就是:

RecvRpcBackward 需要執行。

AccumulateGrad 需要累積梯度。

此時,recvBackwardEdges 裡面是RecvRpcBackward,outputEdges 裡面是 AccumulateGrad 和 RecvRpcBackward。我們需要根據這些資訊來辨別後續如何執行。具體實作是:

先計算 AccumulateGrad,如果 outputEdges 不為空,則把 outputEdges 的資訊插入到 GraphTask.exec_info_ 之中:

建構一個 edge_list edges,就是出邊清單。

周遊 sendFunctions,得到輸出清單,加入到 edges。

root也加入出邊清單。

建立一個虛拟Root。

如果出邊不為空,則會調用 init_to_execute 對GraphTask進行初始化。

周遊 GraphTask 的 exec_info,exec_info_ 的資料結構是std::unordered_map&lt;Node*, ExecInfo&gt; 。

看看此張量是否在所求梯度的張量路徑上。

如果不在路徑之上,就跳到下一個張量。

拿到 exec_info_ 的 Node。

如果 Node 是葉子節點。

周遊張量路徑上的節點。

給張量插入Hook。這裡是關鍵,就是 AccumulateGrad 對應的張量加上了 Hook,用來後續累積梯度。

周遊 recvBackwardEdges,對于每個 recvBackward,在 GraphTask.exec_info_ 之中對應項之上設止為 "需要執行"。

至此,依賴項處理完畢,所有需要計算的函數資訊都位于 GraphTask.exec_info_ 之上,我們在下一篇來看看如何執行。

我們總結一下計算依賴的邏輯:

computeDependencies 開始計算依賴。

從 DistAutogradContext 之中擷取 sendAutogradFunctions_,把 SendRpcBackward 都放入到 sendFunctions。普通狀态下,root節點内在反向傳播時候,已經有了next edges,但是分布式模式下,出邊是在sendFunctions之中,是以要提取出來,放入下面的 queue。

周遊 sendFunctions,把 Node 加入到 queue,此時 queue 之中是 root 和 一些 SendRpcBackward。

周遊 Queue 進行處理,處理結果是兩個局部變量 edge_list。 recvBackwardEdges 裡面是RecvRpcBackward,outputEdges 裡面是 AccumulateGrad 和 RecvRpcBackward,我們需要根據這些資訊來辨別後續如何執行。

周遊 recvBackwardEdges 和 outputEdges,把相關資訊加入到<code>GraphTask.exec_info_</code>,至此,依賴項處理完畢,所有需要計算的函數資訊都位于 GraphTask.exec_info_ 之上。

AccumulateGrad 被加入了 Hook,用來後續累積梯度。

RecvRpcBackward 被設定了需要執行。

Distributed Autograd Design

Remote Reference Protocol

PyTorch 源碼解讀之分布式訓練了解一下?

https://pytorch.org/docs/stable/distributed.html

https://pytorch.apachecn.org/docs/1.7/59.html

https://pytorch.org/docs/stable/distributed.html#module-torch.distributed

https://pytorch.org/docs/master/notes/autograd.html

https://pytorch.org/docs/master/rpc/distributed_autograd.html

https://pytorch.org/docs/master/rpc/rpc.html

https://www.w3cschool.cn/pytorch/pytorch-cdva3buf.html

PyTorch 分布式 Autograd 設計

Getting started with Distributed RPC Framework

Implementing a Parameter Server using Distributed RPC Framework

Combining Distributed DataParallel with Distributed RPC Framework

Profiling RPC-based Workloads

Implementing batch RPC processing

Distributed Pipeline Parallel