
背景
什麼是Pytorch的分布式?試着回答以下問題:
- 如果訓練資料量太大,我們想要把一大份資料拆分成多份,送給不同的訓練程序,通過這種并行處理資料的方式來加快計算速度,那麼反向傳播中,如何進行權重參數的更新呢?
- 如果模型網絡太大,我們想把一個大的網絡拆分成多個片段,每個片段運作在不同的訓練程序,那麼如何進行前向、反向等邏輯的銜接?如何進行權重參數的更新呢?
- 如果訓練資料量和模型網絡都太大,我們既想把一大份資料拆分成多份送往不同的訓練程序,又想把一個大的模型網絡拆分成多個片段運作在不同的訓練程序,那麼如何進行前向、反向等邏輯的銜接?如何進行權重參數的更新呢?
- 如果以上不管網絡還是資料的拆分都不止是在同一個機器上(參數在同一個機器的不同CUDA裝置中),而且要拆分在不同的機器上(參數在不同的機器的不同CUDA裝置中),那麼如何進行前向、反向等邏輯的銜接?如何進行權重參數的更新呢?
- 在一個GTX1080ti卡上基于imagenet訓練一個ResNet50需要大約一周的時間,論文https://arxiv.org/pdf/1706.02677.pdf在256個GPU上使用8192的batch size訓練隻需要一個小時,這依靠的是什麼?
這些問題的答案就是PyTorch的分布式。在你使用PyTorch的過程中,或多或少都遇到了一些關乎分布式的package,比如:
- THD
- C10D
- torch.multiprocessing
- torch.distributed
- DataParallel(DP)
- DistributedDataParallel(DDP)
- torch.distributed.rpc
這些package的作用和差別都是什麼呢?Gemfield本文基于PyTorch 1.5。
環境準備
1,distributed子產品介紹PyTorch的分布式依賴于torch.distributed子產品,但是這個子產品并非天然就包含在PyTorch庫中。要啟用PyTorch distributed, 需要在源碼編譯的時候設定USE_DISTRIBUTED=1。目前在Linux系統上編譯的時候,預設就是USE_DISTRIBUTED=1,是以預設就會編譯distributed子產品;而在MacOS上預設是0(你需要手動開啟,在PyTorch 1.3的時候才添加了對macOS的支援,使用Gloo backend)。那麼Windows系統呢?Windows不支援distributed(不過沒什麼關系,現在誰還用Windows呢)。
對于PyTorch的預編譯包來說,隻有Linux上的包提供了distribute支援,并且CPU版本的backend是Gloo,CUDA版本的backend是NCCL。如果要使用MPI的話,則如上所示我們需要從PyTorch源碼進行編譯 ,也就是在安裝有MPI的環境上編譯PyTorch。
2,測試代碼準備先定義一個資料集,這裡直接使用了毫無意義的random資料:
import
這幾個API都是必須的,__getitem__ 在for語義裡面傳回下一次loop的值,__len__是資料集的長度,如果沒有定義這個方法,pytorch就會報錯:TypeError: object of type 'RandomDataset' has no len()。
然後定義一個簡單的網絡:
class Model(nn.Module):
def __init__(self, input_size, output_size):
super(Model, self).__init__()
self.fc = nn.Linear(input_size, output_size)
def forward(self, input):
output = self.fc(input)
print("tIn Model: input size", input.size(),"output size", output.size())
return output
順便說一句,把資料集從磁盤讀到記憶體中是由DataLoader負責的,DataLoader可以設定worker的數量,Gemfield就不在本文贅述了。此外,如果想要調試GPU的使用率,使用nvprof指令:
[email protected]:/home/gemfield# nvprof --print-gpu-trace -fo gemfield.nvvp python gemfield.py
在本地機器安裝nvvp來可視化結果:
[email protected]:~$ sudo apt install nvidia-visual-profiler
這些需要nvidia驅動開啟調試支援。
下面開始迎接PyTorch分布式的到來。
torch.multiprocessing
為了并行的執行計算任務,一個直截了當的思想就是啟動多個程序,然後使用IPC的方式進行資訊交換——最友善的就是共享記憶體。PyTorch的multiprocessing子產品就來源于此,它封裝了python原生的multiprocessing子產品,并在API上做到了百分之百的相容。在此基礎上,它注冊了定制的reducers, 可以充分使用共享記憶體的IPC機制來讓不同的程序對同一份資料進行讀寫:
import torch.multiprocessing as mp
from model import MyModel
def train(model):
for data, labels in data_loader:
optimizer.zero_grad()
loss_fn(model(data), labels).backward()
optimizer.step() #會更新共享記憶體中的權重
if __name__ == '__main__':
num_processes = 4
model = MyModel()
#在下面fork新程序之前必須做share_memory的調用
model.share_memory()
processes = []
for rank in range(num_processes):
p = mp.Process(target=train, args=(model,))
p.start()
processes.append(p)
for p in processes:
p.join()
但是這種多程序的工作方式在遇到CUDA時有很多局限性,這導緻了很多比較突兀的使用限制和代碼編寫方式:它規定了發送tensor的程序必須怎麼怎麼樣、規定了接收tensor的程序必須怎麼怎麼樣、規定了生産tensor的程序的生命周期必須怎麼怎麼樣、限制不能轉發收到的tensor......以至于這些條件隻要有一個沒有遵守,在CUDA上的multiprocessing就會出現預期之外的行為。為了突破這些限制和掣肘,DataParallel到來了。
DataParallel
僅僅從子產品的名字來看,DataParallel也是為了解決data的并行問題的。DataParallel是為了解決這樣的問題的,那就是當輸入的batch很大的時候,DataParallel會将模型/網絡複制運作到多個CUDA裝置上,然後在輸入的batch次元上進行切分,這裡的切分就是torch tensor的split() API。在DataParallel出生的年代,PyTorch官方就開始推薦使用nn.DataParallel來代替multiprocessing。
我們先舉個例子——定義資料集的長度為100,batch size為32,fc層的輸入是5,輸出是2:
#資料集的長度為100,batch size為32,fc層的輸入是5,輸出是2
input_size = 5
output_size = 2
batch_size = 32
data_size = 100
model = Model(input_size, output_size)
if torch.cuda.device_count() > 1:
print("Gemfield have ", torch.cuda.device_count(), "GPUs!")
model = nn.DataParallel(model)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model.to(device)
rand_loader = DataLoader(dataset=RandomDataset(input_size, data_size),batch_size=batch_size, shuffle=True)
for data in rand_loader:
input = data.to(device)
output = model(input)
print("Outside: input size", input.size(),"output_size", output.size())
本來batch_size是32,但是由于使用了DataParallel,而Gemfield有2個GPU,是以一個batch被劃分成了2份,也就是tensor.split(16),分别送往兩個GPU上。值得注意的是:在第一次調用
model.to(device)
的時候,模型被加載到了第一個GPU裝置上,而在第一次調用
output = model(input)
的時候(也就是在進行forward的時候),模型被複制到了其餘的GPU上,這裡是第2個GPU。程式輸出如下(可見大小為32的batch被拆分成了大小為16的batch):
You have 2 GPUs!
In Model: input size torch.Size([16, 5]) output size torch.Size([16, 2])
In Model: input size torch.Size([16, 5]) output size torch.Size([16, 2])
input size torch.Size([32, 5]) output_size torch.Size([32, 2])
In Model: input size torch.Size([16, 5]) output size torch.Size([16, 2])
In Model: input size torch.Size([16, 5]) output size torch.Size([16, 2])
input size torch.Size([32, 5]) output_size torch.Size([32, 2])
In Model: input size torch.Size([16, 5]) output size torch.Size([16, 2])
In Model: input size torch.Size([16, 5]) output size torch.Size([16, 2])
input size torch.Size([32, 5]) output_size torch.Size([32, 2])
In Model: input size torch.Size([2, 5]) output size torch.Size([2, 2])
In Model: input size torch.Size([2, 5]) output size torch.Size([2, 2])
input size torch.Size([4, 5]) output_size torch.Size([4, 2])
我們來總結下DataParallel一次疊代的過程:
- DataLoader把資料通過多個worker讀到主程序的記憶體中;
- 通過tensor的split語義,将一個batch的資料切分成多個更小的batch,然後分别送往不同的CUDA裝置;
- 在不同的cuda裝置上完成前向計算,網絡的輸出被gather到主CUDA裝置上(初始化時使用的裝置),loss而後在這裡被計算出來;
- loss然後被scatter到每個CUDA裝置上,每個CUDA裝置通過BP計算得到梯度;
- 然後每個CUDA裝置上的梯度被reduce到主CUDA裝置上,然後模型權重在主CUDA裝置上獲得更新;
- 在下一次疊代之前,主CUDA裝置将模型參數broadcast到其它CUDA裝置上,完成權重參數值的同步。
上述步驟提到的gather、reduce、scatter、broadcast都是來自MPI為代表的并行計算世界的概念,其中broadcast是主程序将相同的資料分發給組裡的每一個其它程序;scatter是主程序将資料的每一小部分給組裡的其它程序;gather是将其它程序的資料收集過來;reduce是将其它程序的資料收集過來并應用某種操作(比如SUM),在gather和reduce概念前面還可以加上all,如all_gather,all_reduce,那就是多對多的關系了,如下圖所示(注意reduce的操作不一定是SUM,PyTorch目前實作了SUM、PRODUCT、MAX、MIN這四種):
DataParallel通過複制一個網絡到多個cuda裝置,然後再split一個batch的data到多個cuda裝置,通過這種并行計算的方式解決了batch很大的問題,但也有自身的不足:
- 它無法跨越機器,DataParallel是單程序多線程的,無法在多個機器上工作;
- 它基于多線程的方式,确實友善了資訊的交換,但受困于GIL;
- 資料集先拷貝到主程序,然後再split到每個CUDA裝置上;
- 權重參數隻在主CUDA上更新,需要每次疊代前向所有的CUDA裝置做一次同步;
- 每次疊代的網絡輸出需要gather到主的CUDA裝置上;
- 如果模型太大需要使用 model parallel 的時候,DataParallel目前還不支援;
這個時候,DistributedDataParallel來了,并且自此之後,不管是單機還是多機,我們都推薦使用DDP來代替DP(DataParallel)。
DistributedDataParallel(DDP)
DDP基于torch.distributed子產品,下面我們通過一個例子來感受下DDP的使用。要使用DDP,需要先熟悉兩個概念:Backend和initialization methods。
1,initialization methods因為DDP是真正的分布式,可以使用多台機器來組成一次并行運算的任務,是以就需要一種方法來給大家傳遞一個資訊——如何聯系到其它機器上的程序?目前DDP子產品支援3種initialization methods:
- TCP initialization :init_method = 'tcp://10.1.1.20:23456'
- Shared file-system initialization:init_method = 'file:///mnt/nfs/sharedfile'
- Environment variable initialization
既然能夠在不同的程序間進行通信,那必然是依賴于一些IPC的通信機制,這些通信機制一般是由PyTorch之外的三方實作的。在distributed子產品中,一共有4種不同的IPC通信backend:
- TCP
- MPI
- Gloo
- NCCL
需要說明的是:
- TCP backend已經被廢棄了;
- 如果是CPU的訓練,則使用MPI或者Gloo,推薦使用Gloo;
- 如果是MPI backend,則不需要指定world_size和rank,因為這兩個值由MPI runtime來配置設定和維護;
- 如果是GPU訓練的話,使用NCCL,原廠的庫就是快;而且隻有NCCL支援InfiniBand和GPUDirect;
- 機器最好使用InfiniBand進行連接配接,條件不夠的話也是支援以太網的(需要設定一下);
- DDP隻支援Gloo和NCCL;
Gemfield使用下面的腳本作為示範:
https://github.com/CivilNet/Gemfield/blob/master/src/python/ddp/gemfield_ddp.pygithub.com
這個腳本是從pytorch/examples項目中的imagenet訓練腳本裁剪而來,裡面使用了PyTorch的DistributedDataParallel子產品。
1,設定環境變量
Gemfield的MLab2.0環境上使用的是CUDA裝置,是以使用NCCL backend;又因為沒有IB連接配接,是以需要設定如下環境變量來禁用IB轉而使用以太網(也可以不用設定,NCCL backend會自動找尋,當然,如果自動找尋失敗了,最好就手工來設定):
export NCCL_SOCKET_IFNAME=eth0
export NCCL_IB_DISABLE=1
将NCCL_IB_DISABLE設定為1來禁止使用InfiniBand,轉而使用 IP;如果網絡接口不能被自動發現,則手工設定NCCL_SOCKET_IFNAME;如果還有問題,就設定如下的NCCL變量來列印更多的log:
export NCCL_SOCKET_IFNAME=eth0
export NCCL_IB_DISABLE=1
export NCCL_DEBUG=INFO
export NCCL_DEBUG_SUBSYS=ALL
比如有時候使用NCCL作為backend的話會遇到如下錯誤:
RuntimeError: NCCL error in: /opt/conda/conda-bld/pytorch_1587428398394/work/torch/lib/c10d/ProcessGroupNCCL.cpp:514, unhandled system error, NCCL version 2.4.8
你就可以設定這些NCCL的環境變量來列印相關日志。
2,啟動master程序
也就是啟動rank為0的程序(這裡我們使用了tcp的初始化方法),我們在Node 0上啟動吧:
python gemfield_ddp.py -a resnet50 --dist-url 'tcp://172.16.90.44:27030' --gpu 0 --world-size 2 --rank 0 [your_dataset_folder]
DDP的MPI風格的四元組來了:初始化方法、backend、world size、rank。
這裡使用了tcp的初始化方法,使用了node 0上的第一塊GPU,world-size為2說明我們group中一共要啟動2個程序,rank為0說明這是第1個程序(也就是master程序)。隻要group中的程序數還不夠,程序就會阻塞在init_process_group調用上。是以,這條指令執行後,master程序就處于等待狀态。
3,啟動其它非master程序
然後在Node 1上啟動rank為1的程序:
python gemfield_ddp.py -a resnet50 --dist-url 'tcp://172.16.90.44:27030' --gpu 0 --world-size 2 --rank 1 [your_dataset_folder]
參數不解釋了,同上。你也可以看看gemfield_ddp.py的腳本源碼,可以看到裡面預設使用了nccl的backend,使用了32的batch-size(那麼在這種情況下,整體就相當于batchsize=32 *2 = 64)。因為此時process group中的程序數量達到了world size,是以訓練就開始往前疊代了。
gemfield_ddp.py中使用了DistributedDataParallel子產品,本質上來說,我們一共在4處地方引用到了DDP的API:
- torch.distributed.init_process_group(backend='nccl', init_method=args.dist_url, world_size=args.world_size, rank=args.rank);這是初始化程序組,參數正好是上面提到的MPI風格的四元組;
- model=torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu]),也就是我們的網絡需要被DistributedDataParallel wrap起來,DDP封裝了分布式計算通信原語,這樣ddp後的model看起來如同之前的model一樣整潔;被DDP封裝的model的參數的grad才會進行all reduce。
- train_sampler=torch.utils.data.distributed.DistributedSampler(train_dataset),我們需要DistributedSampler作為執行個體傳遞給DataLoader來配合DDP使用,這樣資料集的樣本會為每個程序劃分,每個程序讀取各自的樣本。
- train_sampler.set_epoch(epoch),set_epoch是在DDP模式下shuffle資料集的方式;
當上面橫跨2個Node上的訓練程序開始工作起來後,一次DistributedDataParallel疊代中的步驟如下所示:
- process group中的訓練程序都起來後,rank為0的程序會将網絡初始化參數broadcast到其它每個程序中,確定每個程序中的網絡都是一樣的初始化的值(預設行為,你也可以通過參數禁止);
- 每個程序各自讀取各自的訓練資料,DistributedSampler確定了程序兩兩之間讀到的是不一樣的資料;
- 前向和loss的計算如今都是在每個程序上(也就是每個CUDA裝置上)獨立計算完成的;網絡的輸出不再需要gather到master程序上了,這和DP顯著不一樣;
- 反向階段,梯度資訊通過allReduce的MPI原語,将每個程序中計算到的梯度reduce到每個程序;也就是backward調用結束後,每個程序中的param.grad都是一樣的值;注意,為了提高allReduce的效率,梯度資訊被劃分成了多個buckets;
- 更新模型參數階段,因為剛開始模型的參數是一樣的,而梯度又是all reduced的,這樣更新完模型參數後,每個程序/裝置上的權重參數也是一樣的。是以,就無需DP那樣每次疊代後需要同步一次網絡參數,這個階段的broadcast操作就不存在了!注意,Network中的Buffers (比如BatchNorm資料) 需要在每次疊代中從rank為0的程序broadcast到程序組的其它程序上。
上面的步驟看着熟悉嗎?這是不是就是大資料Hadoop生态中經典的map-reduce概念?
福利:有的時候,有人需要将DDP 包裝後的model的parameters拆分parameter group傳給優化器,以适用不同的超參數。那麼這個過程和非DDP封裝的model有差別嗎?有。你可以參考Gemfield的該例子:
https://github.com/CivilNet/Gemfield/blob/master/src/python/ddp/seperate_parameter_group_for_optimizer.pygithub.com
ModelParallel
本文的前述部分主要說的是如何更加并發的處理更多的輸入資料,但是如果一個網絡本身太大,以至于一個cuda裝置都裝不下怎麼辦?那就是
model parallel了。
ModelParallel的核心思想就是把一個網絡拆分成不同的部分,然後分别運作在不同的CUDA裝置上。要達到這一點,就需要改造網絡的構造和forward部分。引用官方文檔裡的一個例子,就是:
class
可以看到ModelParallel的關鍵之處有2點:
- CivilNet内部的不同layer分别放在了不同的CUDA裝置上;
- forward中一個layer的輸出結果需要通過tensor.to的語義copy到另一個layer所在的CUDA裝置上。
你看,之是以沒有提及反向和優化器,就是因為PyTorch的backward和optim優化器子產品可以應付這種情況。唯一需要注意的就是,在調用loss函數的時候,網絡的輸出和label需要放在一個CUDA裝置上。
但是上面的ModelParallel實作還是有個問題,就是雖然有多塊CUDA裝置,但是同一個時刻隻有一個裝置在參與運算,而其它的CUDA裝置此刻都在打醬油,這顯然沒有充分利用多個CUDA裝置,造成平均下來單個CUDA裝置的運作效率急劇下降,何況還要牽扯到中間結果的裝置間拷貝操作。基于此,我們已經知道接下來我們要解決什麼問題了。那就是:如何讓一個CUDA裝置在工作的時候,而其它CUDA裝置不要閑置下來。另外一個問題就是,中間結果(也就是tensor)在CUDA裝置之間拷貝的時候到底會犧牲多少性能?
PyTorch中解決第一個問題完全仰仗的是CUDA的異步執行邏輯,也就是說,在CUDA裝置上的計算是異步的,隻有GPU和CPU以及GPU和GPU之間需要拷貝tensor的時候(比如,一個CUDA裝置的結果需要輸出給另一個CUDA)才會進行同步操作。如此一來,當我們把model parallel到多個CUDA裝置上時,我們可以将一次前向的batch size相應的增加,同時,将前向的輸入split成多份,以pipeline的語義在loop中不斷的同時喂給多個CUDA裝置,然後基于前述的CUDA異步邏輯實作多個CUDA裝置上的并行計算,然後等結果都出來後再進行同步操作(是以每個cuda的性能要和其上承擔的計算量一緻)。
PyTorch解決第二個問題仰仗于NCCL庫(基于CUDA計算),多個CUDA裝置之間的通信方式有很多種,這取決于你自己的硬體支援:
- 一個機器上,PCIe;
- 一個機器上,NVLink(目前有2個版本了);
- 一個機器上,NVLink-SLI;
- 一個機器上,NVSwitch(NVIDIA專屬機器上);
- 一個機器上,Infiniband with GPUDirect-RDMA;
- 多個機器間,Infiniband with GPUDirect-RDMA;
- 多個機器間,Infiniband協定;
- 多個機器間,以太網協定;
以上的種種方式無外乎追求的是兩點:帶寬和延遲。這兩個名額的下限是以太網協定,上限大抵就是片上記憶體了。
但是上面的ModelParallel是把一個網絡拆分到同一個機器上的多個CUDA裝置上,如果一個伺服器上的所有CUDA裝置都裝不下該網絡怎麼辦?如果需要将一個網絡拆分到不同機器的不同CUDA裝置上應該怎麼辦?RPC!RPC
ModelParallel隻能把模型的不同layer拆分在同一個機器的不同的CUDA裝置上,如果想進一步拆分到多個機器的CUDA裝置上,那怎麼辦呢?RPC架構來了——這是預料中的事情!我們稍微來回想下這個場景,模型的不同的layer要運作在不同的機器上,而又要對代碼維護一個“本地”的感覺,這就不由自主的讓我們想到了corba、soap、grpc這些曆史上曾經出現過的RPC架構,簡單來說,我們需要RPC能夠做到以下幾點:
- 代碼/服務運作在本地,最少也是看起來運作在本地;
- 中間的某些函數的計算實際發生地是在别的機器上;
- 輸入和輸出要能夠在不同的機器之間傳遞;
好了,圍繞着這些簡單的本質的邏輯,PyTorch的RPC架構使用了四大金剛:
- RPC call(遠端調用)
- RRef(遠端引用:跨機器的變量引用)
- Distributed Autograd
- Distributed Optimizer
主要就是rpc_sync同步遠端調用、rpc_async異步遠端調用、remote異步遠端調用。我們用一個例子來解釋下,首先寫個master.py檔案:
import os
import torch
import torch.distributed.rpc as rpc
os.environ['MASTER_ADDR'] = '172.16.138.65'
os.environ['MASTER_PORT'] = '7030'
def syszuxAdd(t1, t2):
print("syszuxAdd call in master")
return torch.add(t1, t2)
rpc.init_rpc("master", rank=0, world_size=2)
rpc.shutdown()
簡單幾行程式就有很多高密度的資訊:
- 首先是import rpc的包,目前在1.5.0版本上依然不成熟;
- 其次是設定MASTER的IP和PORT,這是當然了,RPC的底層一定是和TCP/IP協定相關的;
- 要使用init_rpc API來初始化RPC架構,這就是重點了。3個參數,第1個是worker的名字,名字必須是唯一的,不然會報錯:RuntimeError: RpcAgent name xxxxxx is not unique;第2、3個參數是rank和world_size,這和前述的DDP一樣的概念。如果啟動的rank數量還沒達到world size,程式就會block在這裡;之是以會block在這裡,是因為後續的四大金剛都依賴這個啊!不然會抛出如下錯誤:
RuntimeError: currentRpcAgent_ INTERNAL ASSERT FAILED at /opt/conda/conda-bld/pytorch_1587428398394/work/torch/csrc/distributed/rpc/rpc_agent.cpp:197, please report a bug to PyTorch. Current RPC agent is not set!
master介紹完了,再來寫個worker.py檔案:
import
和master一樣的内容就不再贅述了,在此之外,worker.py裡新增了一些rpc_async函數調用,參數裡面攜帶了真正要執行的函數符号:
gemfield1 = rpc.rpc_async("master", torch.add, args=(torch.ones(2), 3))
gemfield2 = rpc.rpc_async("master", min, args=(1, 2))
gemfield3 = rpc.rpc_async("master", syszuxAdd, args=(torch.ones(2), torch.Tensor([7029])))
這三個方法是故意這麼羅列的,它們分别代表了torch子產品的函數、python内置的函數、自定義的函數,它們看似運作在了worker中,但實際上通過遠端調用rpc_async,實際執行在了master上:
#master上
[email protected]:/home/gemfield# python master.py
syszuxAdd call in master
#worker上
[email protected]:/home/gemfield# python worker.py
gemfield: tensor([5., 5.])
gemifeld3: tensor([7030., 7030.])
通過日志可以很明确的看到rpc_async調用将函數執行在了master所在的機器上,值得說明的是,雖然函數運作在master上,但是函數的定義必須在master和worker中都有,這就是類似存根的概念,如果沒有這個定義,在文法層面都resolve不了這個符号。最後我們進行下技術總結:
RPC call實作了執行遠端機器上的程序中的函數、可以把輸入傳遞過去、可以把輸出的值拿回來。除了rpc_async(及對應的同步調用rpc_sync)之外,還有一個remote()遠端調用,它和rpc_async類似,隻是傳回值不一樣,remote傳回的是對結果的引用,哪怕這個引用跨越了機器——這說明調用者并不想拿到具體的傳回值。這個可以對本地值進行引用、也可以對遠端機器上的值進行引用的東西,正是PyTorch RPC架構中的RRef。
2,RRef首先要再次強調的是,在使用RRef之前,我們必須要先初始化RPC架構。再然後,我們隻要謹記RRef的特質,在PyTorch的常見場景下,我們甚至可以簡化為:1個RRef就是對1個Tensor的引用,可以是本地tensor的引用,也可以是對遠端機器上的tensor的引用。比如:
#建立一個遠端引用,引用遠端機器上的worker1程序中的1個tensor
rref = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3))
#使用remote本意就是沒想把具體的值拿回來,但如果你就是要拿回來,可以copy回來
x = rref.to_here()
#建立一個本地引用
rref = RRef(torch.zeros(2, 2))
RRef如果作為rpc call的輸入,又會産生什麼奇妙的事情呢?我們來看個例子:
rref0.py:
import
rref1.py:
import os
import torch
import time
import torch.distributed.rpc as rpc
from torch.distributed.rpc import RRef
os.environ['MASTER_ADDR'] = '172.16.138.65'
os.environ['MASTER_PORT'] = '5678'
def f(rref):
print("f() called on work1 with rref from ",rref.owner().name," and value: ",rref.to_here())
return rref.to_here() + 10
rpc.init_rpc("worker1", rank=1, world_size=2)
time.sleep(2000)
rpc.shutdown()
日志輸出為:
[email protected]:/home/gemfield# python rref0.py
gemfield0: tensor([10.])
gemfield0: tensor([20.])
gemfield0: tensor([30.])
gemfield0: tensor([40.])
gemfield0: tensor([50.])
gemfield0: tensor([60.])
gemfield0: tensor([70.])
gemfield0: tensor([80.])
gemfield0: tensor([90.])
[email protected]:/home/gemfield# python rref1.py
f() called on work1 with rref from worker0 and value: tensor([0.])
f() called on work1 with rref from worker0 and value: tensor([10.])
f() called on work1 with rref from worker0 and value: tensor([20.])
f() called on work1 with rref from worker0 and value: tensor([30.])
f() called on work1 with rref from worker0 and value: tensor([40.])
f() called on work1 with rref from worker0 and value: tensor([50.])
f() called on work1 with rref from worker0 and value: tensor([60.])
f() called on work1 with rref from worker0 and value: tensor([70.])
f() called on work1 with rref from worker0 and value: tensor([80.])
這個例子使用RRef結合rpc call,生動的表達了這樣的語義:我們把rref0程序上的一個RRef共享給了rref1程序。如果再基于如下的helper函數:
def _call_method(method, rref, *args, **kwargs):
return method(rref.local_value(), *args, **kwargs)
def _remote_method(method, rref, *args, **kwargs):
return rpc.rpc_sync(rref.owner(),_call_method,args=[method, rref] + list(args),kwargs=kwargs)
_remote_method就可以實作這樣的語義:rref是在哪個機器上建立的,就去那個機器上執行method函數,并将結果拿回到目前的機器上。這是不是在為拆分模型的參數到不同的機器上鋪路?
3,Distributed Autograd和distributed.optim為了适用RPC的調用,PyTorch的Autograd也添加了torch.distributed.autograd子產品,在模型的訓練過程中,我們需要建立distributed autograd context。在此上下文中,前向階段參與RPC調用的Tensor資訊會被記錄下來,而反向傳播的時候我們利用這些資訊再通過RPC進行梯度的傳遞。
distributed.optim子產品中的DistributedOptimizer負責這種情況下的參數優化,它面對的不再是傳統的parameter tensor,而是封裝之後的RRef,借助_remote_method語義,運算會發生在RRef所在的機器上。
4,構造一個基于RPC的網絡在網絡的構造階段,我們可使用remote() rpc call 來将網絡的不同部分構造在不同的機器上,并且傳回RRef到目前的機器上;至此,在前向和反向的體系中,梯度計算和優化器分别使用的是distributed.autograd和distributed.optim,它倆将不再面對傳統的Parameter tensor,而是封裝了Parameter Tensor的RRef。
在前向階段,記住我們已經面對的是RRef了。我們可以使用上述提到的_remote_method語義,将運算放在構造時候所對應的機器上;也就是說,RRef在哪個機器上,前向運算就在哪個機器上;在反向階段,我們在distributed autograd context中使用distributed.optim來處理RRef即可。
PyTorch分布式的曆史
1,PyTorch伊始使用torch.multiprocessing 代替python原生的multiprocessing子產品。
2,PyTorch 0.1.8在PyTorch 0.1.8的時候,THD (distributed pytorch)的首個版本釋出,pytorch首次有了用于分布式計算的底層庫實作。
3,PyTorch 0.2PyTorch 0.2的時候終于釋出了torch.distributed子產品,它可以允許在不同的機器上交換Tensor的值。使用這個package,你就可以将訓練擴充到多台機器上并且可以使用更大的batch。 這個package使用了MPI風格的程式設計模型,這就意味着它對外提供了一些基本的分布式計算原語(如send、recv、all_reduce等),這些函數可以用來在不同的機器之間交換Tensor的值。
在多台機器之間進行分布式計算首先就意味着一個問題:如何讓多個機器感受到彼此的存在?這就是distributed子產品中的initialization methods,用于使用某種方法來獲得彼此之間的資訊,在這個版本中主要使用3種方法:
- 共享檔案系統 (所有機器的所有程序都要能夠通路該檔案系統);
- IP multicast (需要在同一個區域網路中);
- 環境變量 (手工通知每個程序要聯系的IP port,并為每個程序手工配置設定一個rank);
rank、world size這些概念都是來自MPI,其中world size是總共的程序數量,然後每個程序配置設定一個獨一無二的rank号,值為0和world_size - 1之間。這個rank号就是分布式系統中通信的關鍵。
分布式計算中用于通信的函數有send、recv(同步,異步的isend、irecv)等低層次的操作原語,但是有些分布式計算的模式出現的太頻繁了,于是一些更高層語義的函數被開發出來,通常用于整個程序組并且通信速度更快,比如all_reduce。又由于對于大多數人的訓練任務來說,這些偏向原子的操作還是太低層,并且大家的訓練模式基本都是通用的——比如并行處理多份資料。
于是PyTorch又開發了更高層的helper子產品:DistributedDataParallel。
4,PyTorch 0.4這個版本中,PyTorch增加了DistributedDataParallelCPU子產品,和DistributedDataParallel類似,但主要是支援模型運作在CPU上 (DistributedDataParallel是GPU),并且支援mpi、gloo和tcp這些後端(tcp後端後來被廢除了)。
distribute子產品還增加了一個工具腳本,用來在一個機器或者多個機器上使用DistributedDataParallel:
python -m torch.distributed.launch gemfield_script.py --arg1 --arg2 --arg3
此外,PyTorch正式添加了NCCL2.0作為自己的分布式計算後端,基于NVIDIA的這一CUDA通信庫,GPU之間的通信速度獲得了巨大提高:
torch.distributed.init_process_group("nccl")
5,PyTorch 1.0 全新的C10D庫釋出! 如今C10D(用來代替THD)成為了torch.distributed package和torch.nn.parallel.DistributedDataParallel 包的後端支撐。C10D帶來了如下改變:
- 對于所有的backends(Gloo, NCCL, 和 MPI)都獲得了性能提升(如今都是基于異步操作);
- 對使用以太網通信的機器來說,性能更是獲得了巨大的提升;
- 對torch.distributed 包中的所有distributed collective操作添加了異步支援;
- 對Gloo後端添加了send、recv、reduce、all_gather、gather、scatter支援;
- 對NCCL後端添加了barrier op支援、new_group支援;
此外,對于torch.distributed子產品來說,TCP後端被移除,如今PyTorch推薦使用Gloo和MPI後端用于CPU collectives ,推薦使用NCCL後端用于GPU collectives。還有,舊的基于THD的torch.distributed 包被廢棄!舊的基于THD的torch.nn.parallel.DistributedDataParallel包被廢棄!
6,PyTorch 1.1PyTorch 1.1的時候,nn.parallel.DistributedDataParallel(DDP)可以支援multi-GPU的model了,
data的parallel和model的parallel如今可以協作了!
7,PyTorch 1.3這個版本添加了torch.distributed對macOS的支援,不過隻能使用Gloo後端。
8,PyTorch 1.5新增了torch.distributed.rpc架構。RPC,顧名思義,就是可以在遠端機器上運作函數了。該架構用于克服DDP的不足之處, 對于跨機器的distributed model parallel或者實作 parameter server提供了極大的幫助。更重要的是,除了RPC(遠端調用),該架構還提供了Remote Reference(RRef)、Distributed Autograd、 Distributed Optimizer這四大金剛。總的說來,RPC架構帶來了:
- parameter server training
- distributed model parallelism
- distributed pipeline parallelism
參考
https://pytorch.org/tutorials/intermediate/model_parallel_tutorial.html