DistributedDataParallel Notes


torch 1.6 apparently no longer supports this:

    https://blog.csdn.net/jacke121/article/details/108806050

     

Other examples:

    https://github.com/bl0/moco/blob/6edee10d2ceb2e70da813f34a19a129c0d6e0204/train.py

    https://github.com/erikwijmans/skynet-ddp-slurm-example/blob/master/ddp_example/train_cifar10.py

    https://github.com/Lance0218/Pytorch-DistributedDataParallel-Training-Tricks

Errors:

Traceback (most recent call last):
  File "F:/project/cls/shanghai_cls/train_1010.py", line 29, in <module>
    dist.init_process_group(backend='nccl', init_method='env://')
AttributeError: module 'torch.distributed' has no attribute 'init_process_group'

     

      File "D:/zyb/SLS_celian_sample_0709/train_pelee.py", line 187, in <module>     model = DDP(model, device_ids=[local_rank], output_device=local_rank)   File "C:\ProgramData\Anaconda3\lib\site-packages\torch\nn\parallel\distributed.py", line 305, in __init__     self.process_group = _get_default_group() NameError: name '_get_default_group' is not defined

     

1. Error:

    ERROR: Unexpected bus error encountered in worker. This might be caused by insufficient shared memory (shm).

This is caused by setting num_workers too high in the DataLoader, so that the system's shared memory is insufficient.

Please note that PyTorch uses shared memory to share data between processes, so if torch multiprocessing is used (e.g. for multithreaded data loaders) the default shared memory segment size that the container runs with is not enough. So, reduce num_workers.
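A sketch of the fix (train_dataset and train_sampler are placeholders from the surrounding notes): lower num_workers until the error disappears; if the job runs inside Docker, the alternative is to start the container with a larger shared-memory size, e.g. --shm-size=8g.

from torch.utils.data import DataLoader

# Keep the number of worker processes small when /dev/shm is limited.
train_loader = DataLoader(train_dataset, batch_size=128,
                          num_workers=2,   # reduce further if the shm error persists
                          pin_memory=True, sampler=train_sampler)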

2.

    torch.distributed.init_process_group(backend='nccl', init_method="env://")

     

backend: the backend used for communication between GPUs; in practice, it is the protocol the machines use to exchange data with each other.

init_method: exchanging data between machines (GPUs) requires a designated master node, and this argument specifies how to reach that master node.
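With init_method='env://', the address of the master node and this process's rank are read from environment variables, which a launcher (torch.distributed.launch or torchrun) normally sets. A minimal sketch of what those variables mean (the values are only suitable for a single-process debug run):

import os
import torch.distributed as dist

# Normally exported by the launcher; shown here only to make the env:// contract explicit.
os.environ.setdefault('MASTER_ADDR', '127.0.0.1')  # address of the rank-0 (master) node
os.environ.setdefault('MASTER_PORT', '29500')      # any free port on the master node
os.environ.setdefault('WORLD_SIZE', '1')           # total number of processes
os.environ.setdefault('RANK', '0')                 # global rank of this process

dist.init_process_group(backend='nccl', init_method='env://')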

3.

In torch.utils.data.DataLoader(), if the sampler argument is given, then shuffle must be set to False; the two arguments are mutually exclusive.

     

train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
opt.batchSize = int(opt.batchSize / opt.ngpus_per_node)
train_dataloader = DataLoader(train_dataset, batch_size=opt.batchSize, collate_fn=collate_fn,
                              shuffle=False, drop_last=True, num_workers=4, pin_memory=True, sampler=train_sampler)

At the start of each epoch, call:

    train_sampler.set_epoch(epoch)

Reason:

    In distributed mode, calling the set_epoch method is needed to make shuffling work; each process will use the same random seed otherwise.
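The behaviour follows from how DistributedSampler derives its shuffle order: the epoch number is folded into the random seed. The following is a simplified illustration of that idea, not the exact library source:

import torch

def epoch_indices(dataset_len, base_seed, epoch):
    # set_epoch(epoch) is what changes 'epoch' here; without it the
    # permutation below is identical in every epoch.
    g = torch.Generator()
    g.manual_seed(base_seed + epoch)
    return torch.randperm(dataset_len, generator=g).tolist()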

4.

For the overall distributed-training workflow, see this GitHub repo: https://github.com/tczhangzhi/pytorch-distributed.

For a model wrapped with torch.nn.parallel.DistributedDataParallel(), forward() and backward() must be called alternately; running several forward() passes followed by a single backward() raises an error. However, in GAN-based models it is common to sum the outputs of several discriminators and only then call backward(). In that case the workarounds are:

(1) Have the discriminator take real data and fake data together and return the results for both, so the discriminator does not have to be called twice, once for the real and once for the fake data.

(2) Wrap the different discriminators into different process groups via the process_group argument of torch.nn.parallel.DistributedDataParallel() (see the sketch after this list). The process_group argument is created with:

    torch.distributed.new_group(range(torch.distributed.get_world_size()))

(3) For a model wrapped with torch.nn.parallel.DistributedDataParallel(), be careful when saving checkpoints: different processes must not write to the same file, otherwise loading it later with torch.load() fails with:

    RuntimeError: storage has wrong size

The solution:

Let only one process write the checkpoint; since the models on the different GPUs are identical, a single condition is enough:

    if args.local_rank == 0: torch.save()

However, this means the processes on different GPUs are no longer at exactly the same point in the program. Whether that actually causes problems remains to be seen once the code has been run.
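A minimal sketch of workaround (2), assuming two discriminators d_net1 and d_net2 (these variable names, and args.local_rank, are placeholders):

import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

# One dedicated process group per DDP-wrapped discriminator, each containing every rank.
pg1 = dist.new_group(range(dist.get_world_size()))
pg2 = dist.new_group(range(dist.get_world_size()))

d_net1 = DDP(d_net1.cuda(), device_ids=[args.local_rank], process_group=pg1)
d_net2 = DDP(d_net2.cuda(), device_ids=[args.local_rank], process_group=pg2)

As for the concern in (3) about processes drifting apart when only rank 0 saves, a common way to keep the ranks in step is to let the other processes wait at a barrier while rank 0 writes the file (the checkpoint path is a placeholder):

if args.local_rank == 0:
    # model.module unwraps DDP so the saved keys have no 'module.' prefix
    torch.save(model.module.state_dict(), 'checkpoint.pth')
dist.barrier()   # all ranks wait here until the checkpoint has been written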

     

5. Error:

When calling model.train(), the following error appears:

    TypeError: 'NoneType' object is not callable

The cause was the assignment model = model.train(). Calling model.train() or model.eval() directly fixes it.

     

    model = DistributedDataParallel(model, device_ids=[args.local_rank], broadcast_buffers=False)

Example:

    https://github.com/lesliejackson/PyTorch-Distributed-Training

Code:

import argparse
import time
import torch
import torchvision
from torch import distributed as dist
from torchvision.models import resnet18
from torch.utils.data import DataLoader
from torchvision.datasets import MNIST
from torchvision.transforms import ToTensor
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler


def reduce_loss(tensor, rank, world_size):
    # Sum the loss onto rank 0 and average it there.
    with torch.no_grad():
        dist.reduce(tensor, dst=0)
        if rank == 0:
            tensor /= world_size


parser = argparse.ArgumentParser()
parser.add_argument('--local_rank', type=int, help="local gpu id")
args = parser.parse_args()

batch_size = 128
epochs = 5
lr = 0.001

dist.init_process_group(backend='nccl', init_method='env://')
torch.cuda.set_device(args.local_rank)
global_rank = dist.get_rank()
world_size = dist.get_world_size()  # needed by reduce_loss below

net = resnet18()
# resnet18 expects 3-channel input, but MNIST images have a single channel.
net.conv1 = torch.nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
net.cuda()
net = torch.nn.SyncBatchNorm.convert_sync_batchnorm(net)
net = DDP(net, device_ids=[args.local_rank], output_device=args.local_rank)

data_root = 'dataset'
trainset = MNIST(root=data_root, download=True, train=True, transform=ToTensor())
valset = MNIST(root=data_root, download=True, train=False, transform=ToTensor())

sampler = DistributedSampler(trainset)
train_loader = DataLoader(trainset, batch_size=batch_size,
                          shuffle=False, pin_memory=True, sampler=sampler)
val_loader = DataLoader(valset, batch_size=batch_size,
                        shuffle=False, pin_memory=True)

criterion = torch.nn.CrossEntropyLoss()
opt = torch.optim.Adam(net.parameters(), lr=lr)

net.train()
for e in range(epochs):
    # DistributedSampler shuffles deterministically using the epoch number as seed,
    # so without calling set_epoch at the start of each epoch
    # the shuffled order would be identical every epoch.
    sampler.set_epoch(e)
    for idx, (imgs, labels) in enumerate(train_loader):
        imgs = imgs.cuda()
        labels = labels.cuda()
        output = net(imgs)
        loss = criterion(output, labels)
        opt.zero_grad()
        loss.backward()
        opt.step()
        reduce_loss(loss, global_rank, world_size)
        if idx % 10 == 0 and global_rank == 0:
            print('Epoch: {} step: {} loss: {}'.format(e, idx, loss.item()))

net.eval()
with torch.no_grad():
    cnt = 0
    total = len(val_loader.dataset)
    for imgs, labels in val_loader:
        imgs, labels = imgs.cuda(), labels.cuda()
        output = net(imgs)
        predict = torch.argmax(output, dim=1)
        cnt += (predict == labels).sum().item()

if global_rank == 0:
    print('eval accuracy: {}'.format(cnt / total))
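For reference, a script written like this (reading --local_rank and using init_method='env://') is normally started with the launcher, one process per GPU; the GPU count and script name below are placeholders:

python -m torch.distributed.launch --nproc_per_node=4 train.py

Newer PyTorch releases replace this launcher with torchrun, which passes the local rank through the LOCAL_RANK environment variable instead of the --local_rank argument.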

     
