PyTorch 可以通過 torch.nn.DataParallel 直接切分資料并行在單機多卡上,實踐證明這個接口并行力度并不盡如人意,主要問題在于資料在 master 上處理然後下發到其他 slaver 上訓練,而且由于 GIL 的存在隻有計算是并行的。torch.distributed 提供了更好的接口和并行方式,搭配多程序接口 torch.multiprocessing 可以提供更加高效的并行訓練。
GIL含義解釋
多程序
- 我們都知道由于 GIL 的存在, python 要想真正的并行必須使用多程序,IO頻繁可以勉強使用多線程。torch.nn.DataParallel 全局隻有一個程序,受到了 GIL 的限制,是以肯定會拖累并行的力度。
- python 自帶的 multiprocessing 是多程序常用的實作,但是有一個巨大的問題,不支援 CUDA,是以我們使用GPU訓練的時候不能用這個包,需要使用 PyTorch 提供的 torch.multiprocessing。它提供了和 multiprocessing 幾乎一樣的接口,是以用起來也比較友善。
torch.distributed 可以通過 torch.distributed.launch 啟動多卡訓練,也可以使用 torch.multiprocessing 手動送出多程序并行。
我們分别介紹torch.distributed.launch 和 torch.multiprocessing
(1)torch.distributed.launch
(2)torch.multiprocessing
分布式訓練
torch.distributed 提供了和通用分布式系統常見的類似概念。
In the single-machine synchronous case, torch.distributed or the torch.nn.parallel.DistributedDataParallel() wrapper may still have advantages over other approaches to data-parallelism, including torch.nn.DataParallel():也就是說orch.distributed 和 the torch.nn.parallel.DistributedDataParallel() wrapper更有效率,具體原因見:
DISTRIBUTED COMMUNICATION PACKAGE - TORCH.DISTRIBUTED
1.初始化
初始化操作一般在程式剛開始的時候進行
在調用任何其他方法之前,需要使用torch.distributed.init_process_group()函數對包進行初始化。這會阻塞所有程序,直到所有程序都已連接配接。
兩種初始化方式
1.Specify store, rank, and world_size explicitly.
2.Specify init_method (a URL string) which indicates where/how to discover peers. Optionally specify rank and world_size, or encode all required parameters in the URL and omit them.
If neither is specified, init_method is assumed to be “env://”.
backend (str or Backend) – 。根據建構時配置,有效值包括mpi、gloo和nccl。該字段應該以小寫字元串的形式給出(例如,“gloo”),它也可以通過後端屬性通路(例如,back . gloo)。如果在每台機器上使用nccl後端多個程序,那麼每個程序必須獨占通路它使用的每個GPU,因為在程序之間共享GPU可能會導緻死鎖。
根據官網的介紹, 如果是使用cpu的分布式計算, 建議使用gloo, 因為表中可以看到 gloo對cpu的支援是最好的, 然後如果使用gpu進行分布式計算, 建議使用nccl, 實際測試中我也感覺到, 當使用gpu的時候, nccl的效率是高于gloo的. 根據部落格和官網的态度, 好像都不怎麼推薦在多gpu的時候使用mpi
對于後端選擇好了之後, 我們需要設定一下網絡接口, 因為多個主機之間肯定是使用網絡進行交換, 那肯定就涉及到ip之類的, 對于nccl和gloo一般會自己尋找網絡接口, 但是某些時候, 比如我測試用的伺服器, 不知道是系統有點古老, 還是網卡比較多, 需要自己手動設定. 設定的方法也比較簡單, 在Python的代碼中, 使用下面的代碼進行設定就行:
import os
# 以下二選一, 第一個是使用gloo後端需要設定的, 第二個是使用nccl需要設定的
os.environ['GLOO_SOCKET_IFNAME'] = 'eth0'
os.environ['NCCL_SOCKET_IFNAME'] = 'eth0'
我們怎麼知道自己的網絡接口呢, 打開指令行, 然後輸入ifconfig, 然後找到那個帶自己ip位址的就是了, 我見過的一般就是em0, eth0, esp2s0之類的, 當然具體的根據你自己的填寫. 如果沒裝ifconfig, 輸入指令會報錯, 但是根據報錯提示安裝一個就行了.
init_method (str, optional) –指定如何初始化程序組的URL。如果沒有指定init_method或store,Default是“env://”。與store互相排斥。
初始化init_method的方法有兩種, 一種是使用TCP進行初始化, 另外一種是使用共享檔案系統進行初始化:
使用TCP初始化:
import torch.distributed as dist
dist.init_process_group(backend, init_method='tcp://10.1.1.20:23456',
rank=rank, world_size=world_size)
注意這裡使用格式為tcp://ip:端口号, 首先ip位址是你的主節點的ip位址, 也就是rank參數為0的那個主機的ip位址, 然後再選擇一個空閑的端口号, 這樣就可以初始化init_method了.
使用共享檔案系統初始化
好像看到有些人并不推薦這種方法, 因為這個方法好像比TCP初始化要沒法, 搞不好和你硬碟的格式還有關系, 特别是window的硬碟格式和Ubuntu的還不一樣, 我沒有測試這個方法, 看代碼:
import torch.distributed as dist
dist.init_process_group(backend, init_method='file:///mnt/nfs/sharedfile',
rank=rank, world_size=world_size)
根據官網介紹, 要注意提供的共享檔案一開始應該是不存在的, 但是這個方法又不會在自己執行結束删除檔案, 是以下次再進行初始化的時候, 需要手動删除上次的檔案, 是以比較麻煩, 而且官網給了一堆警告, 再次說明了這個方法不如TCP初始化的簡單.
world_size (int, optional) -參與作業的程序數。如果指定了store,則必須指定它。
rank (int, optional) – Rank of the current process (it should be a number between 0 and world_size-1). Required if store is specified.
表示程序式号,用于程序間通信,可以用于表示程序的優先級。一般設定 rank=0 的主機為 master 節點。
store (Store, optional) – (int, optional) –Key/value store accessible to all workers, used to exchange connection/address information. Mutually exclusive with init_method.
timeout (timedelta, optional) –預設值為30分鐘。這适用于gloo後端。對于nccl,隻有當環境變量NCCL_BLOCKING_WAIT或NCCL_ASYNC_ERROR_HANDLING設定為1時,這才适用。當設定了NCCL_BLOCKING_WAIT時,這時程序将阻塞并等待集合在抛出異常之前完成的持續時間。當設定了NCCL_ASYNC_ERROR_HANDLING時,這時集合将異步中止并且程序将崩潰的持續時間。NCCL_BLOCKING_WAIT将向使用者提供可以捕獲和處理的錯誤,但由于其阻塞性質,它有性能開銷。另一方面,NCCL_ASYNC_ERROR_HANDLING的性能開銷很少,但在出現錯誤時會使程序崩潰。這是因為CUDA執行是異步的,并且繼續執行使用者代碼不再安全,因為失敗的異步NCCL操作可能導緻後續的CUDA操作在損壞的資料上運作。應該隻設定這兩個環境變量中的一個。
group_name (str, optional, deprecated) – 組名
local_rank:程序内 GPU 編号,非顯式參數,由 torch.distributed.launch 内部指定。比方說, rank=3,local_rank=0 表示第 3 個程序内的第 1 塊 GPU。
注意初始化rank和world_size
你需要確定, 不同機器的rank值不同, 但是主機的rank必須為0, 而且使用init_method的ip一定是rank為0的主機, 其次world_size是你的程序數量, 你不能随便設定這個數值,它的值一般設定為每個節點的gpu卡個數乘以節點個數。.
初始化中一些需要注意的地方
首先是代碼的統一性, 所有的節點上面的代碼, 建議完全一樣, 不然有可能會出現一些問題, 其次, 這些初始化的參數強烈建議通過argparse子產品(指令行參數的形式)輸入, 不建議寫死在代碼中, 也不建議使用pycharm之類的IDE進行代碼的運作, 強烈建議使用指令行直接運作.
多機的啟動方式可以是直接傳遞參數并在代碼内部解析環境變量,或者通過torch.distributed.launch來啟動,兩者在格式上面有一定的差別,總之要保證代碼與啟動方式對應。
例如使用下面的指令運作代碼distributed.py:
在代碼上添加如下:
待驗證的指令行
上面的代碼是在主節點上運作, 是以設定rank為0, 同時設定了使用兩個主機, 在從節點運作的時候, 輸入的代碼是下面這樣:
這裡的rank其實是node_rank,指的是節點編号或者是機器編号,具體概念見rank local_rank node node_rank等的概念
待驗證的指令行(torch.distributed.launch啟動)
Node 1: (IP: 192.168.1.1, and has a free port: 1234)
>>> python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
--nnodes=2 --node_rank=0 --master_addr="192.168.1.1"
--master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
and all other arguments of your training script)
Node 2:
>>> python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
--nnodes=2 --node_rank=1 --master_addr="192.168.1.1"
--master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
and all other arguments of your training script)
一定要注意的是, 隻能修改rank的值, 其他的值一律不得修改, 否則程式就卡死了初始化到這裡也就結束了.
torch.distributed.launch
在torch.distributed當中提供了一個用于啟動的程式torch.distributed.launch,此幫助程式可用于為每個節點啟動多個程序以進行分布式訓練,它在每個訓練節點上産生多個分布式訓練程序。這個工具可以用作CPU或者GPU,如果被用于GPU,每個GPU産生一個程序進行訓練。
該工具既可以用來做單節點多GPU訓練,也可用于多節點多GPU訓練。如果是單節點多GPU,将會在單個GPU上運作一個分布式程序,據稱可以非常好地改進單節點訓練性能。如果用于多節點分布式訓練,則通過在每個節點上産生多個程序來獲得更好的多節點分布式訓練性能。如果有Infiniband接口則加速比會更高。
在單節點分布式訓練或多節點分布式訓練的兩種情況下,該工具将為每個節點啟動給定數量的程序(–nproc_per_node)。如果用于GPU教育訓練,則此數字需要小于或等于目前系統上的GPU數量(nproc_per_node),并且每個程序将在從GPU 0到GPU(nproc_per_node - 1)的單個GPU上運作。
Launch utility
1.對于單機多卡-這裡針對torch.distributed.launch 通過–nproc_per_node=5啟動多程序
因為是用torch.distributed.launch 啟動,故隻需要設定rank的初始值(一個節點一個初始值)這種解釋對不對??。
注意:讨論下torch.distributed.init_process_group 中的word_size,讓其等于torch.cuda.device_count(),那就是一個GPU一個程序,屬于單機多程序,一個程序占用一個GPU。設定為1時,一個程序占用了多個GPU,我覺得這種解釋是有問題的
結論:用torch.distributed.launch啟動時是通過nproc_per_node=5來指定程序數量的,那麼在torch.distributed.init_process_group中指定其值是不是不管用了。指定為1或者torch.cuda.device_count()都是可以的,不影響程序的總數??
單程序多卡
os.environ['CUDA_VISIBLE_DEVICES'] = "0,1,2,3,4"
torch.distributed.init_process_group(backend='nccl', init_method='tcp://localhost:23456', rank=0, world_size=1)
data_set = torchvision.datasets.MNIST('~/DATA/', train=True, transform=trans, target_transform=None, download=True)
train_sampler = torch.utils.data.distributed.DistributedSampler(data_set)
data_loader_train = torch.utils.data.DataLoader(dataset=data_set, batch_size=256, sampler=train_sampler, num_workers=16, pin_memory=True)
net = torchvision.models.resnet101(num_classes=10)
net.conv1 = torch.nn.Conv1d(1, 64, (7, 7), (2, 2), (3, 3), bias=False)
net = net.cuda()
# net中不需要指定裝置!
net = torch.nn.parallel.DistributedDataParallel(net)
運作:
python python demo.py --arg1 --arg2 --arg3
其他都一樣
單機多程序多卡(一個程序占用一個GPU)
parser.add_argument("--local_rank", type=int) # 增加local_rank
torch.cuda.set_device(args.local_rank)
os.environ['CUDA_VISIBLE_DEVICES'] = "0,1,2,3,4"
torch.distributed.init_process_group(backend='nccl', init_method='tcp://localhost:23456', rank=0, world_size=torch.cuda.device_count())或者
dist.init_process_group("nccl", init_method='env://') # init_method方式修改
data_set = torchvision.datasets.MNIST('~/DATA/',train=True, transform=trans, target_transform=None, download=True)
data_loader_train = torch.utils.data.DataLoader(dataset=data_set, batch_size=256,sampler=train_sampler, num_workers=16,pin_memory=True)
net = torchvision.models.resnet101(num_classes=10)
net = net.cuda()
# DDP 輸出方式修改:
net = torch.nn.parallel.DistributedDataParallel(net, device_ids=[args.local_rank],output_device=args.local_rank)
criterion = torch.nn.CrossEntropyLoss()
opt = torch.optim.Adam(net.parameters(), lr=0.001)
for epoch in range(1):
for i, data in enumerate(data_loader_train):
images, labels = data
# 要将資料送入指定的對應的gpu中
images.to(args.local_rank, non_blocking=True)
labels.to(args.local_rank, non_blocking=True)
opt.zero_grad()
outputs = net(images)
loss = criterion(outputs, labels)
loss.backward()
opt.step()
if i % 10 == 0:
print("loss: {}".format(loss.item()))
啟動:
python -m torch.distributed.launch --nproc_per_node=8
--nnodes=1 --node_rank=0 --master_addr="192.168.1.1"
--master_port=12355 MNIST.py
注: 這裡如果使用了argparse, 一定要在參數裡面加上–local_rank, 否則運作還是會出錯的。它是非顯式參數,在使用torch.distributed.launch 啟動時,會産生–local_rank參數。是以在參數裡面包含這個代碼就行,不需要指派,也不需要在指令行中對其指派
parser.add_argument("--local_rank",type=int)
這一參數的作用是為各個程序配置設定rank号,是以可以直接使用這個local_rank參數作為torch.distributed.init_process_group()當中的參數rank,同時也可以作為model = DistributedDataParallel(model, device_ids=[args.local_rank], output_device=args.local_rank)
**注:**還需要注意的是, 如果使用這句代碼, 直接在pycharm或者别的編輯器中,是沒法正常運作的, 因為這個需要在shell的指令行中運作, 如果想要正确執行這段代碼, 假設這段代碼的名字是main.py,用一下指令行:
python -m torch.distributed.launch --nproc_per_node=5 main.py
以下試驗會報錯:
torch.distributed.init_process_group(backend="nccl")
model = DistributedDataParallel(model) # device_ids will include all GPU devices by default
2.多機多卡-多程序訓練(每個程序一個gpu)
步驟:
import torch
torch.multiprocessing.set_start_method('spawn')
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel
import os
os.environ['SLURM_NTASKS'] #可用作world size
os.environ['SLURM_NODEID'] #node id
os.environ['SLURM_PROCID'] #可用作全局rank
os.environ['SLURM_LOCALID'] #local_rank
os.environ['SLURM_STEP_NODELIST'] #從中取得一個ip作為
通訊ip
def dist_init(host_addr, rank, local_rank, world_size, port=23456):
host_addr_full = 'tcp://' + host_addr + ':' + str(port)
torch.distributed.init_process_group("nccl", init_method=host_addr_full,rank=rank, world_size=world_size)
num_gpus = torch.cuda.device_count()
torch.cuda.set_device(local_rank)
assert torch.distributed.is_initialized()
rank = int(os.environ['SLURM_PROCID'])
local_rank = int(os.environ['SLURM_LOCALID'])
world_size = int(os.environ['SLURM_NTASKS'])
# get_ip函數自己寫一下 不同伺服器這個字元串形式不一樣
# 保證所有task拿到的是同一個ip就成
ip = get_ip(os.environ['SLURM_STEP_NODELIST'])
dist_init(ip, rank, local_rank, world_size)
# 接下來是寫dataset和dataloader,這個網上有很多教程
# 我這給的也隻是個形式,按自己需求寫好就ok
dataset = your_dataset() #主要是把這寫好
datasampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
dataloader = DataLoader(dataset, batch_size=batch_size_per_gpu, sampler=source_sampler)
model = your_model() #也是按自己的模型寫
if torch.cuda.device_count() > 1:
print("Let's use", torch.cuda.device_count(), "GPUs!")
model = DistributedDataPrallel(model.cuda(), device_ids=[local_rank])
#srun指定-n 程序總數以及 --ntasks-per-node 每個節點程序數,這樣就可以通過os.environ獲得每個程序的節點ip資訊,全局rank以及local rank
# 這裡是3台機器,每台機器8張卡的樣子, slrum指令寫這樣:
srun -n24 --gres=gpu:8 --ntasks-per-node=8 python train.py
PyTorch分布式訓練基礎–DDP使用
DDP中有關于多機多卡
上面這兩個連結可探讨關于使用DDP實作多機多卡訓練的步驟,對照使用。
第一個裡面有好幾種情況。單程序多卡,單機多程序多卡,不同的啟動方式。
PyTorch 多程序分布式訓練實戰
這篇中使用的啟動方式mp.spawn,而且隻用了torch.distributed實作分布式訓練,沒有用DDP,其中spawn啟動方式結合和第一篇中的spawn啟動方式,代碼寫作相結合實作無DDP的多分布式訓練。
Pytoch分布式多機多卡的啟動方式詳解
Distribution is all you need
在[Distribution is all you need]中實作了以下幾種方式的多卡訓練:
1.nn.DataParallel 簡單友善的 nn.DataParallel
2.torch.distributed 使用 torch.distributed 加速并行訓練
3.torch.multiprocessing 使用 torch.multiprocessing 取代啟動器
4.apex 使用 apex 再加速
5.horovod horovod 的優雅實作
6.slurm GPU 叢集上的分布式
7.補充:分布式 evaluation
我在機器中測試了2确實可行,其他并沒有。
最複雜的實作方法應該是不需要nn.parallel.DistributedDataParallel()
采用:這上面的方法,但不全
mp.spawn啟動方式代替launch啟動方式的最全代碼