信息发布→ 登录 注册 退出

PyTorch 并行训练 DistributedDataParallel 完整代码示例

发布时间:2023-04-10

点击量:

使用大型数据集训练大型深度神经网络 (DNN) 的问题是深度学习领域的主要挑战。 随着 DNN 和数据集规模的增加,训练这些模型的计算和内存需求也会增加。 这使得在计算资源有限的单台机器上训练这些模型变得困难甚至不可能。 使用大型数据集训练大型 DNN 的一些主要挑战包括:

  • 训练时间长:训练过程可能需要数周甚至数月才能完成,具体取决于模型的复杂性和数据集的大小。
  • 内存限制:大型 DNN 可能需要大量内存来存储训练期间的所有模型参数、梯度和中间激活。 这可能会导致内存不足错误并限制可在单台机器上训练的模型的大小。

为了应对这些挑战,已经开发了各种技术来扩大具有大型数据集的大型 DNN 的训练,包括模型并行性、数据并行性和混合并行性,以及硬件、软件和算法的优化。

在本文中我们将演示使用 PyTorch 的数据并行性和模型并行性。

我们所说的并行性一般是指在多个gpu,或多台机器上训练深度神经网络(dnn),以实现更少的训练时间。数据并行背后的基本思想是将训练数据分成更小的块,让每个GPU或机器处理一个单独的数据块。然后将每个节点的结果组合起来,用于更新模型参数。在数据并行中,模型体系结构在每个节点上是相同的,但模型参数在节点之间进行了分区。每个节点使用分配的数据块训练自己的本地模型,在每次训练迭代结束时,模型参数在所有节点之间同步。这个过程不断重复,直到模型收敛到一个令人满意的结果。

下面我们用用ResNet50和CIFAR10数据集来进行完整的代码示例:

在数据并行中,模型架构在每个节点上保持相同,但模型参数在节点之间进行了分区,每个节点使用分配的数据块训练自己的本地模型。

PyTorch的DistributedDataParallel 库可以进行跨节点的梯度和模型参数的高效通信和同步,实现分布式训练。本文提供了如何使用ResNet50和CIFAR10数据集使用PyTorch实现数据并行的示例,其中代码在多个gpu或机器上运行,每台机器处理训练数据的一个子集。训练过程使用PyTorch的DistributedDataParallel 库进行并行化。

导入必须要的库

import os
 from datetime import datetime
 from time import time
 import argparse
 import torchvision
 import torchvision.transforms as transforms
 import torch
 import torch.nn as nn
 import torch.distributed as dist
 from torch.nn.parallel import DistributedDataParallel

接下来,我们将检查GPU。

import subprocess
 result = subprocess.run(['nvidia-smi'], stdout=subprocess.PIPE)
 print(result.stdout.decode())

因为我们需要在多个服务器上运行,所以手动一个一个执行并不现实,所以需要有一个调度程序。这里我们使用SLURM文件来运行代码(slurm面向Linux和Unix类似内核的免费和开源工作调度程序),

def main():
 
 # get distributed configuration from Slurm environment
 
 parser = argparse.ArgumentParser()
 parser.add_argument('-b', '--batch-size', default=128, type =int,
 help='batch size. it will be divided in mini-batch for each worker')
 parser.add_argument('-e','--epochs', default=2, type=int, metavar='N',
 help='number of total epochs to run')
 parser.add_argument('-c','--checkpoint', default=None, type=str,
 help='path to checkpoint to load')
 args = parser.parse_args()
 
 rank = int(os.environ['SLURM_PROCID'])
 local_rank = int(os.environ['SLURM_LOCALID'])
 size = int(os.environ['SLURM_NTASKS'])
 master_addr = os.environ["SLURM_SRUN_COMM_HOST"]
 port = "29500"
 node_id = os.environ['SLURM_NODEID']
 ddp_arg = [rank, local_rank, size, master_addr, port, node_id]
 train(args, ddp_arg)

然后,我们使用DistributedDataParallel 库来执行分布式训练。

def train(args, ddp_arg):
 
 rank, local_rank, size, MASTER_ADDR, port, NODE_ID = ddp_arg
 
 # display info
 if rank == 0:
 #print(">>> Training on ", len(hostnames), " nodes and ", size, " processes, master node is ", MASTER_ADDR)
 print(">>> Training on ", size, " GPUs, master node is ", MASTER_ADDR)
 #print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID))
 
 print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID))
 
 
 # configure distribution method: define address and port of the master node and initialise communication backend (NCCL)
 #dist.init_process_group(backend='nccl', init_method='env://', world_size=size, rank=rank)
 dist.init_process_group(
 backend='nccl',
 init_method='tcp://{}:{}'.format(MASTER_ADDR, port),
 world_size=size,
 rank=rank
)
 
 # distribute model
 torch.cuda.set_device(local_rank)
 gpu = torch.device("cuda")
 #model = ResNet18(classes=10).to(gpu)
 model = torchvision.models.resnet50(pretrained=False).to(gpu)
 ddp_model = DistributedDataParallel(model, device_ids=[local_rank])
 if args.checkpoint is not None:
 map_location = {'cuda:%d' % 0: 'cuda:%d' % local_rank}
 ddp_model.load_state_dict(torch.load(args.checkpoint, map_location=map_location))
 
 # distribute batch size (mini-batch)
 batch_size = args.batch_size
 batch_size_per_gpu = batch_size // size
 
 # define loss function (criterion) and optimizer
 criterion = nn.CrossEntropyLoss()
 optimizer = torch.optim.SGD(ddp_model.parameters(), 1e-4)
 
 
 transform_train = transforms.Compose([
 transforms.RandomCrop(32, padding=4),
 transforms.RandomHorizontalFlip(),
 transforms.ToTensor(),
 transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2025, 0.1994, 0.2010)),
])
 
 # load data with distributed sampler
 #train_dataset = torchvision.datasets.CIFAR10(root='./data',
 # train=True,
 # transform=transform_train,
 # download=False)
 
 # load data with distributed sampler
 train_dataset = torchvision.datasets.CIFAR10(root='./data',
train=True,
transform=transform_train,
download=False)
 
 train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset,
 num_replicas=size,
 rank=rank)
 
 train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
batch_size=batch_size_per_gpu,
shuffle=False,
num_workers=0,
pin_memory=True,
sampler=train_sampler)
 
 # training (timers and display handled by process 0)
 if rank == 0: start = datetime.now()
 total_step = len(train_loader)
 
 for epoch in range(args.epochs):
 if rank == 0: start_dataload = time()
 
 for i, (images, labels) in enumerate(train_loader):
 
 # distribution of images and labels to all GPUs
 images = images.to(gpu, non_blocking=True)
 labels = labels.to(gpu, non_blocking=True)
 
 if rank == 0: stop_dataload = time()
 
 if rank == 0: start_training = time()
 
 # forward pass
 outputs = ddp_model(images)
 loss = criterion(outputs, labels)
 
 # backward and optimize
 optimizer.zero_grad()
 loss.backward()
 optimizer.step()
 
 if rank == 0: stop_training = time()
 if (i + 1) % 10 == 0 and rank == 0:
 print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Time data load: {:.3f}ms, Time training: {:.3f}ms'.format(epoch + 1, args.epochs,
 i + 1, total_step, loss.item(), (stop_dataload - start_dataload)*1000,
(stop_training - start_training)*1000))
 if rank == 0: start_dataload = time()
 
 #Save checkpoint at every end of epoch
 if rank == 0:
 torch.save(ddp_model.state_dict(), './checkpoint/{}GPU_{}epoch.checkpoint'.format(size, epoch+1))
 
 if rank == 0:
 print(">>> Training complete in: " + str(datetime.now() - start))
 
 
 if __name__ == '__main__':
 
 main()

代码将数据和模型分割到多个gpu上,并以分布式的方式更新模型。下面是代码的一些解释:

train(args, ddp_arg)有两个参数,args和ddp_arg,其中args是传递给脚本的命令行参数,ddp_arg包含分布式训练相关参数。

rank, local_rank, size, MASTER_ADDR, port, NODE_ID = ddp_arg:解包ddp_arg中分布式训练相关参数。

如果rank为0,则打印当前使用的gpu数量和主节点IP地址信息。

dist.init_process_group(backend='nccl', init_method='tcp://{}:{}'.format(MASTER_ADDR, port), world_size=size, rank=rank) :使用NCCL后端初始化分布式进程组。

torch.cuda.set_device(local_rank):为这个进程选择指定的GPU。

model = torchvision.models. ResNet50 (pretrained=False).to(gpu):从torchvision模型中加载ResNet50模型,并将其移动到指定的gpu。

ddp_model = DistributedDataParallel(model, device_ids=[local_rank]):将模型包装在DistributedDataParallel模块中,也就是说这样我们就可以进行分布式训练了

加载CIFAR-10数据集并应用数据增强转换。

train_sampler=torch.utils.data.distributed.DistributedSampler(train_dataset,num_replicas=size,rank=rank):创建一个DistributedSampler对象,将数据集分割到多个gpu上。

train_loader =torch.utils.data.DataLoader(dataset=train_dataset,batch_size=batch_size_per_gpu,shuffle=False,num_workers=0,pin_memory=True,sampler=train_sampler):创建一个DataLoader对象,数据将批量加载到模型中,这与我们平常训练的步骤是一致的只不过是增加了一个分布式的数据采样DistributedSampler。

为指定的epoch数训练模型,以分布式的方式使用optimizer.step()更新权重。

rank0在每个轮次结束时保存一个检查点。

rank0每10个批次显示损失和训练时间。

结束训练时打印训练模型所花费的总时间也是在rank0上。

代码测试

在使用1个节点1/2/3/4个gpu, 2个节点6/8个gpu,每个节点3/4个gpu上进行了训练Cifar10上的Resnet50的测试如下图所示,每次测试的批处理大小保持不变。完成每项测试所花费的时间以秒为单位记录。随着使用的gpu数量的增加,完成测试所需的时间会减少。当使用8个gpu时,需要320秒才能完成,这是记录中最快的时间。这是肯定的,但是我们可以看到训练的速度并没有像GPU数量增长呈现线性的增长,这可能是因为Resnet50算是一个比较小的模型了,并不需要进行并行化训练。

☞☞☞AI 智能聊天, 问答助手, AI 智能搜索, 免费无限量使用 DeepSeek R1 模型☜☜☜

在多个gpu上使用数据并行可以显著减少在给定数据集上训练深度神经网络(DNN)所需的时间。随着gpu数量的增加,完成训练过程所需的时间减少,这表明DNN可以更有效地并行训练。

这种方法在处理大型数据集或复杂的DNN架构时特别有用。通过利用多个gpu,可以加快训练过程,实现更快的模型迭代和实验。但是需要注意的是,通过Data Parallelism实现的性能提升可能会受到通信开销和GPU内存限制等因素的限制,需要仔细调优才能获得最佳结果。


相关文章: 常用AI工具,高效智能生活  怎么用AI写出高质量科普文章?揭秘新时代创作利器!  SEM和SEO哪个好?深度解析两者的优势与适用场景  AI赋能漫画创作,历史模型攻略揭秘新境界,130306088ai  SEO与SEM:开启数字营销的新篇章  让“润色”更智能,人工智能助力内容创作新革命  揭秘OpenAI模型参数,人工智能核心要素深度解析,ai调画质  文心一言配音秘籍,揭秘打造动听声线的核心技巧,蜜蜂ai图片  文心一言,跨界融合开启创作新,ai水纹素材  未来科技革新,智能大型AI模型软件引领新,美国ai设置  豆包AI,全能助手,聊天与表格制作两不误,ai大模型私有化部署  AI原创文章生成系统:助力内容创作的新革命  AI助力背景添加,模型美化实用技巧一网打尽,南宁AI展览  让你的文案更具吸引力如何进行高效的文案写作修改  文心一言版插件攻略,轻松拓展功能,打造个性化体验,ai如何平滑  荣耀AI大模型,引领智能生活革新,多面应用惊亮相,水神ai绘图  文心一言降重技术解析,原创保护与知识产权守护之道,华为ai工程师认证  AI语言模型深度评测,解析各大优劣与选型指南,cdr在线转ai  花式文案生成器:让你的创意不再枯竭,轻松搞定营销文案  实用AI工具:提升效率、优化生活的科技利器  轻松上手AI模型导入,步骤详解与技巧分享,ai与背叛催眠  免费写作软件推荐:提升创作效率,轻松写作不再是难题!  文章创作AI:引领智能写作的新时代  网络照片爬虫给你带来的无限商机与便捷:如何借助自动化工具提升工作效率  揭秘文心一言AIPPT,AI写作助手实操技巧大解析,ai换脸免会员  能写方案的AI:为企业赋能的智能助手  怎样利用AI写文章,轻松提升写作效率  文心一言智能助手语音唤醒功能详解与操作手册,邪神祭ai  揭秘文心一言,人工智能语言模型训练之路,无人ai自助  美国域名后缀打造国际化品牌的关键一步  智能时代新伙伴,AI模型软件助你便捷生活,ai63358  文心一言实时联网创新,壁垒,开启智能对话新时代,战锤高精ai  文心一言新功能亮相,提词器助创作者灵感飞扬,curbase ai  小艺AI大模型升级,开启智能做题新时代,各厂ai比拼  好用的AI写作软件,让创作更高效  轻松通过文心一言审核,内容上线的秘诀指南,ai文案写作工具免费  文心一言,现状解析与未来趋势洞察,乌鸦ai绘画  微光彩色AI模型构建全解析,走进人工智能色彩新领域,ai车轮印  人工智能助手助力文心一言,轻松高效撰写各类材料,最新真三ai地图  探索AI绘画艺术融合,入门教程与模型深度解析,liplip ai  跨语言沟通的未来:领域翻译API助力全球化进程  豆包陈泽,AI对话软件新锐,开启智能交互新时代,抖音的AI数据管道  文心一言,高效文章修改与写作质量提升指南,火花ai和豌豆ai斑马ai哪个好  全I大模型实力榜揭晓,权威评定揭秘巨头排名,ai扫描仪  智能语音新篇章,语音训练AI模型入门指南,小米的ai视频播放器  AI行业大模型爆发,捕捉投资新风口股票代码解析,imomoa.ai  AI模型框架资源速查指南,轻松找到理想框架的秘诀,sgs质优生ai面  豆包AI写作助手,智能助力还是未来替代?,ai泰坦音响  AI模型评测,性能与局限性深度剖析,可畏ai美图  AI模型部署与实战,理论与实践深度剖析,ai优点分析 

标签:# unix  # 结束时  # 创建一个  # 这可  # 加载  # 机器上  # 进行了  # 这是  # 自己的  # 所需  # 多个  # 深度学习  # linux  # pytorch  # dnn  # 算法  # 对象  # 命令行参数  # format  # 分布式  # 架构  
在线客服
服务热线

服务热线

400 8905 500

微信咨询
二维码
返回顶部
×二维码

截屏,微信识别二维码

打开微信

微信号已复制,请打开微信添加咨询详情!