
Batch size and training time: PyTorch single-machine multi-GPU training

1. One machine, multi-GPU

1.1 DataParallel

DataParallel (DP) uses the Parameter Server pattern, with one card acting as the reducer. It is also trivially easy to enable: a single line of code. But it has a hard-to-accept flaw: because DP follows the parameter-server algorithm, all losses are computed on the master card, so load imbalance is severe. With a larger model (e.g. bert-large), the master card can be completely full while the other cards sit at less than half utilization, which is an enormous waste of resources.

Example code:
# coding=utf-8

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

class RandomDataset(Dataset):
    def __init__(self, size, length):
        self.len = length
        self.data = torch.randn(length, size)

    def __getitem__(self, index):
        return self.data[index]

    def __len__(self):
        return self.len

class Model(nn.Module):
    def __init__(self, input_size, output_size):
        super(Model, self).__init__()
        self.fc = nn.Linear(input_size, output_size)

    def forward(self, input):
        output = self.fc(input)
        return output

input_size = 5
output_size = 2
batch_size = 30
data_size = 30

dataset = RandomDataset(input_size, data_size)
rand_loader = DataLoader(dataset=dataset,
                         batch_size=batch_size, shuffle=True)
model = Model(input_size, output_size)

if torch.cuda.is_available():
    model.cuda()

if torch.cuda.device_count() > 1:
    model = nn.DataParallel(model)  # the key line

for data in rand_loader:
    if torch.cuda.is_available():
        input_var = data.cuda()
    else:
        input_var = data
    output = model(input_var)
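
To make DP's scatter/gather concrete (an added illustration, not in the original post), print the tensor shapes inside the loop. On, say, 2 GPUs, each replica's forward pass internally handles half the batch, while the output you observe here has been gathered back onto the master card; this is exactly why card 0 becomes the bottleneck:

for data in rand_loader:
    input_var = data.cuda()
    output = model(input_var)
    # on 2 GPUs: each replica internally processes a [15, 5] slice of the batch;
    # the output gathered back onto GPU 0 is the full [30, 2]
    print("outside: input", input_var.size(), "output", output.size())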
           

1.2 DistributedDataParallel

Yes, you read that right: this module was designed for distributed training. But even on a single machine with multiple GPUs, the official recommendation is to use the newer DistributedDataParallel, which is based on the all-reduce algorithm.

(1) Initialize the backend

torch.cuda.set_device(args.local_rank)
torch.distributed.init_process_group(backend='nccl', init_method='env://')
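
For context (an addition, not from the original post): with init_method='env://' the rendezvous information is read from environment variables, which torch.distributed.launch sets for every process it spawns. For a manual single-process debug run you could set them yourself:

import os
os.environ.setdefault('MASTER_ADDR', '127.0.0.1')  # address of the rank-0 process
os.environ.setdefault('MASTER_PORT', '29500')      # any free port
os.environ.setdefault('RANK', '0')                 # global rank of this process
os.environ.setdefault('WORLD_SIZE', '1')           # total number of processes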
           

(2) Parallelize the model. This part is just as simple: wrap the model with DistributedDataParallel:

model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank)
           

(3) Data parallelism. Note that when a sampler is specified, shuffle must be False; also, the DataLoader's num_workers is set independently for each card's process.

dataset = RandomDataset(input_size, data_size)
sampler = torch.utils.data.distributed.DistributedSampler(dataset)
rand_loader = DataLoader(dataset=dataset,
                         batch_size=batch_size, shuffle=False, sampler=sampler)
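
One caveat the snippet above does not show: for the shuffle to differ between epochs, DistributedSampler must be told the current epoch number, otherwise every epoch reuses the same permutation. A minimal sketch (num_epochs is a placeholder):

for epoch in range(num_epochs):
    sampler.set_epoch(epoch)  # reseeds the sampler's shuffle for this epoch
    for data, label in rand_loader:
        ...  # training step as usual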
           

(4) Launch script

python -m torch.distributed.launch --nproc_per_node=8 train_face_troch.py
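
The launcher spawns nproc_per_node processes, one per GPU, and passes each one its --local_rank argument. On newer PyTorch (1.10+), torchrun is the recommended replacement; it communicates the rank through the LOCAL_RANK environment variable instead:

torchrun --nproc_per_node=8 train_face_troch.py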
           

Full code example:

# coding=utf-8

import torch
import torch.distributed
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import argparse

class RandomDataset(Dataset):
    def __init__(self, size, length):
        self.len = length
        self.data = torch.randn(length, size)
        self.label = (torch.mean(self.data, dim=-1) > 0).long()  # integer class in {0, 1}, as CrossEntropyLoss expects

    def __getitem__(self, index):
        return self.data[index], self.label[index]

    def __len__(self):
        return self.len

class Model(nn.Module):
    def __init__(self, input_size, output_size):
        super(Model, self).__init__()
        self.fc = nn.Linear(input_size, output_size)

    def forward(self, input):
        output = self.fc(input)
        return output

def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--local_rank', default=0, type=int)
    args = parser.parse_args()
    return args

input_size = 5
output_size = 2
batch_size = 30
data_size = 30

args = parse_args()
local_rank = args.local_rank

torch.cuda.set_device(local_rank)  # set the default CUDA device; different for every rank
torch.distributed.init_process_group(backend='nccl', init_method='env://')

dataset = RandomDataset(input_size, data_size)
sampler = torch.utils.data.distributed.DistributedSampler(dataset)
rand_loader = DataLoader(dataset=dataset,
                         batch_size=batch_size, shuffle=False, sampler=sampler)

model = Model(input_size, output_size)

if torch.cuda.is_available():
    model.cuda()

model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank)

optimizer = torch.optim.Adam(model.parameters())
criterion = torch.nn.CrossEntropyLoss()


for data, label in rand_loader:
    data = data.cuda()
    label = label.cuda()

    output = model(data)
    loss = criterion(output, label)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
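
A related point the example skips (a sketch, assuming the setup above; 'checkpoint.pth' is a placeholder path): every process runs this whole script, so checkpoints should be written by rank 0 only, and the DDP wrapper should be unwrapped via model.module so the saved keys carry no 'module.' prefix:

if torch.distributed.get_rank() == 0:
    # model.module is the raw model inside the DistributedDataParallel wrapper
    torch.save(model.module.state_dict(), 'checkpoint.pth')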
           

1.3 DistributedDataParallel + apex

For large-scale training, mixed precision is a must; the speedup speaks for itself. Reference: https://github.com/NVIDIA/apex/tree/master/examples/simple/distributed

Two changes are needed here: the amp.initialize call, and the backward pass.

# coding=utf-8

import torch
import torch.distributed
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from apex import amp
import argparse

class RandomDataset(Dataset):
    def __init__(self, size, length):
        self.len = length
        self.data = torch.randn(length, size)
        self.label = (torch.mean(self.data, dim=-1) > 0).long()  # integer class in {0, 1}, as CrossEntropyLoss expects

    def __getitem__(self, index):
        return self.data[index], self.label[index]

    def __len__(self):
        return self.len

class Model(nn.Module):
    def __init__(self, input_size, output_size):
        super(Model, self).__init__()
        self.fc = nn.Linear(input_size, output_size)

    def forward(self, input):
        output = self.fc(input)
        return output

def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--local_rank', default=0, type=int)
    args = parser.parse_args()
    return args

input_size = 5
output_size = 2
batch_size = 30
data_size = 30

args = parse_args()
local_rank = args.local_rank
# initialization: bind this process to its GPU and join the process group
torch.cuda.set_device(local_rank)
torch.distributed.init_process_group(backend='nccl', init_method='env://')

dataset = RandomDataset(input_size, data_size)
sampler = torch.utils.data.distributed.DistributedSampler(dataset)
rand_loader = DataLoader(dataset=dataset,
                         batch_size=batch_size, shuffle=False, sampler=sampler)

model = Model(input_size, output_size)

if torch.cuda.is_available():
    model.cuda()  # the model must be on the GPU before amp.initialize

optimizer = torch.optim.Adam(model.parameters())

# amp.initialize comes after .cuda() and before the DDP wrap
model, optimizer = amp.initialize(model, optimizer, opt_level='O1')  # note: the letter O, not zero

model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank)

criterion = torch.nn.CrossEntropyLoss()


for data, label in rand_loader:
    data = data.cuda()
    label = label.cuda()

    output = model(data)
    loss = criterion(output, label)

    optimizer.zero_grad()
    # loss.backward() is replaced by amp's scaled backward pass
    with amp.scale_loss(loss, optimizer) as scaled_loss:
        scaled_loss.backward()
    optimizer.step()
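
Since PyTorch 1.6, mixed precision is built into the framework via torch.cuda.amp, so apex is no longer required. A minimal sketch of the equivalent training step with the native API (assuming the model is wrapped in DDP as above, without amp.initialize):

scaler = torch.cuda.amp.GradScaler()

for data, label in rand_loader:
    data, label = data.cuda(), label.cuda()

    optimizer.zero_grad()
    with torch.cuda.amp.autocast():  # run the forward pass in mixed precision
        output = model(data)
        loss = criterion(output, label)

    scaler.scale(loss).backward()  # scale the loss to avoid fp16 gradient underflow
    scaler.step(optimizer)         # unscale gradients, then step
    scaler.update()                # adjust the scale factor for the next iteration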
           

1.4 Other issues

  1. torch.load restores parameters onto the GPU recorded at save time, but under DistributedDataParallel each process must load them onto its own GPU. So set map_location in torch.load like this:
torch.load(params_path, map_location=lambda storage, loc: storage.cuda(self.local_rank))
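
Equivalently, map_location also accepts a device string, which reads a little more simply (same effect, assuming the same self.local_rank):

torch.load(params_path, map_location=f'cuda:{self.local_rank}')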
           

2. Multi-machine, multi-GPU

Coming soon.