天天看點

Pytorch DDP Training (分布式并行訓練)

知乎—就是不吃草的羊

01

有三種分布式訓練

模型被拆分到不同GPU, 模型太大了,基本用不到

模型放在一個,資料拆分不同GPU,torch.dataparallel

  • 基本不會報bug
  • sync bc要自己準備

模型和資料在不同gpu上各有一份, torch.distributeddataparallel

  • bug多,各程序間資料不共享,通路檔案先後不确定,在日志系統,資料集預處理,模型loss放在指定cuda等地方要仔細設計。
  • sync 是pytorch現有的庫
  • 原理和效果理論上和 2 一緻,都是用更大的batchsize,速度确實比 2 快,好像顯著減少了資料to cuda的時
  • 支援多機
  • 卡太多,網絡跑的時間短的情況,實際還不如 2

02

原理

增大bs,就會帶來增大bs的相關弊端

  • 過拟合,使用warm-up緩解,需要探索一下增大到多少不會影響泛化性
  • 對應倍增學習率,資料一個epoch減少了n倍,和學習率的影響抵消

DP彙總梯度,但是bn是根據單個gpu資料計算的,會有不正确的情況,要用sync bn

map-reduce,每個gpu得到上一個的,傳給下一個

  • 一共兩輪,第一輪讓每個卡上有全部的資料,第二輪讓資料同步給所有卡
  • 每次隻需要1/N的資料,需要2N-2次,是以理論上與GPU個數無關

模型buffer, 不是參數,其優化不是反向傳播而是其他途徑,如bn的variance 和 moving mean

03

  • 可以調用dist來檢視目前的rank,之後log等不需要重複的任務都在rank=0進行
  • 預設不用時候rank=0
  • 先用一張卡debug
  • 使用wandb的話,需要顯式調用wandb.finish()
import torch.distributed as distfrom torch.nn.parallel import DistributedDataParallel as DDPimport torch.multiprocessing as mpdef demo_fn(rank, world_size):
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    # lots of code.
    if dist.get_rank() == 0:
        train_sampler = torch.utils.data.distributed.DistributedSampler(my_trainset)
    trainloader = torch.utils.data.DataLoader(my_trainset, 
                batch_size=16, num_workers=2, sampler=train_sampler)


    model = ToyModel()#.to(local_rank)
    # DDP: Load模型要在構造DDP模型之前,且隻需要在master上加載就行了。
    # ckpt_path = None
    # if dist.get_rank() == 0 and ckpt_path is not None:
    #    model.load_state_dict(torch.load(ckpt_path))
    model = DDP(model, device_ids=[local_rank], output_device=local_rank)
        # DDP:需要注意的是,這裡的batch_size指的是每個程序下的batch_size。
        #      也就是說,總batch_size是這裡的batch_size再乘以并行數(world_size)。


    # torch.cuda.set_device(local_rank)
    # dist.init_process_group(backend='nccl')
    loss_func = nn.CrossEntropyLoss().to(local_rank)
    trainloader.sampler.set_epoch(epoch)
    data, label = data.to(local_rank), label.to(local_rank)if dist.get_rank() == 0:
        torch.save(model.module.state_dict(), "%d.ckpt" % epoch)
        def run_demo(demo_fn, world_size):
    mp.spawn(demo_fn,
             args=(world_size,),
             nprocs=world_size,
             join=True)