天天看点

Pytorch DDP Training (分布式并行训练)

知乎—就是不吃草的羊

01

有三种分布式训练

模型被拆分到不同GPU, 模型太大了,基本用不到

模型放在一个,数据拆分不同GPU,torch.dataparallel

  • 基本不会报bug
  • sync bc要自己准备

模型和数据在不同gpu上各有一份, torch.distributeddataparallel

  • bug多,各进程间数据不共享,访问文件先后不确定,在日志系统,数据集预处理,模型loss放在指定cuda等地方要仔细设计。
  • sync 是pytorch现有的库
  • 原理和效果理论上和 2 一致,都是用更大的batchsize,速度确实比 2 快,好像显著减少了数据to cuda的时
  • 支持多机
  • 卡太多,网络跑的时间短的情况,实际还不如 2

02

原理

增大bs,就会带来增大bs的相关弊端

  • 过拟合,使用warm-up缓解,需要探索一下增大到多少不会影响泛化性
  • 对应倍增学习率,数据一个epoch减少了n倍,和学习率的影响抵消

DP汇总梯度,但是bn是根据单个gpu数据计算的,会有不正确的情况,要用sync bn

map-reduce,每个gpu得到上一个的,传给下一个

  • 一共两轮,第一轮让每个卡上有全部的数据,第二轮让数据同步给所有卡
  • 每次只需要1/N的数据,需要2N-2次,所以理论上与GPU个数无关

模型buffer, 不是参数,其优化不是反向传播而是其他途径,如bn的variance 和 moving mean

03

  • 可以调用dist来查看当前的rank,之后log等不需要重复的任务都在rank=0进行
  • 默认不用时候rank=0
  • 先用一张卡debug
  • 使用wandb的话,需要显式调用wandb.finish()
import torch.distributed as distfrom torch.nn.parallel import DistributedDataParallel as DDPimport torch.multiprocessing as mpdef demo_fn(rank, world_size):
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    # lots of code.
    if dist.get_rank() == 0:
        train_sampler = torch.utils.data.distributed.DistributedSampler(my_trainset)
    trainloader = torch.utils.data.DataLoader(my_trainset, 
                batch_size=16, num_workers=2, sampler=train_sampler)


    model = ToyModel()#.to(local_rank)
    # DDP: Load模型要在构造DDP模型之前,且只需要在master上加载就行了。
    # ckpt_path = None
    # if dist.get_rank() == 0 and ckpt_path is not None:
    #    model.load_state_dict(torch.load(ckpt_path))
    model = DDP(model, device_ids=[local_rank], output_device=local_rank)
        # DDP:需要注意的是,这里的batch_size指的是每个进程下的batch_size。
        #      也就是说,总batch_size是这里的batch_size再乘以并行数(world_size)。


    # torch.cuda.set_device(local_rank)
    # dist.init_process_group(backend='nccl')
    loss_func = nn.CrossEntropyLoss().to(local_rank)
    trainloader.sampler.set_epoch(epoch)
    data, label = data.to(local_rank), label.to(local_rank)if dist.get_rank() == 0:
        torch.save(model.module.state_dict(), "%d.ckpt" % epoch)
        def run_demo(demo_fn, world_size):
    mp.spawn(demo_fn,
             args=(world_size,),
             nprocs=world_size,
             join=True)