Pytorch 并行:DistributedDataParallel

Pytorch 并行:DistributedDataParallel

一个节点上往往有多个 GPU(单机多卡),一旦有多个 GPU 空闲(当然得赶紧都占着),就需要合理利用多 GPU 资源,这与并行化训练是分不开的。但关于 DistributedDataParallel 的博文少之又少,所以本着简单明了的原则,本博文讲全面地阐述这个先进的 python 并行库的原理及使用示例。

O、数据并行化

按 《深入浅出Pytorch》的话来说,pytorch模型的并行化,主要分为两类:

  • 模型并行:一个 GPU 容纳不了一个模型,需要多个 GPU 分别承载模型的一部分
  • 数据并行:将训练数据分配到各个 GPU 上,在不同 GPU 上分别独立地训练相同模型,最终将并行的训练结果归约到一个 GPU 上

Pytorch并行也主要支持后者,即数据并行。一般而言,训练的时候都需要较大 batch size,才能保持训练过程的稳定性,而 batch size 直接与训练所需显存大小挂钩,设置大了,很容易爆显存。而并行化却能很好的解决这个问题,能够显式的提高逻辑 batch size 大小。

Pytorch 较早的并行化,实现在 torch.nn.DataParalell 中,但该实现是基于线程的(只使用了一个线程),由于 CPython 解释器有着全局线程锁(Global Interpreter Lock,GIL)的特性,导致任何时候CPU上的一个进程(对应一个 python 解释器)中,仅能有一个线程能真正运行 python 代码。这就大大限制了并行的实现,很难实现真正的并行,除此之外,DataParalell 还有其他诸多弊端,这里就不再赘述,本文主要阐述单机多卡条件下 DistributedDataParallel 的使用,等“富有”了再研究多机多卡。

一、DistributedDataParallel

1、基本原理

请允许我用几句话说明一下 torch.nn.DistributedDataParallel 的基本原理

  • 开启多个进程,每个进程控制一个 GPU
  • 每个 GPU 上都存储模型,执行相同的任务(虽然训练数据不同)
  • GPU 间只传递梯度(不像 DataParallel 还要传递整个模型参数)

有两种使用办法,一种是在代码内部,手动启动各个 GPU 对应的进程;另一种是用 Pytorch 官方命令 torchrun 启动多线程,这种的话代码内部实现倒是更方便。这里主要对后者的使用方法加以说明。在此之前先阐述一下基本训练代码的结构,一般而言,Pytorch 训练代码的结构如下:

定义模型和训练类

class Model(nn.module):
    '''定义所需要训练的 Model,以其前传 forward 逻辑'''
    def __init__(self, **kwargs) ...
    def forward(self, **kwargs) ...
    
class Trainer():
    '''trainer 类,集成 model、optimizer、dataloader 等,并定义训练逻辑'''
    def __init__(self, 
                 model: nn.Module,
                 optim: torch.optim.Optimizer,
                 train_dataloader,
                 test_dataloader,
                 **kwargs
                ) ...
				
    def train(self, Epochs, **kwargs) ...

当然这里的 Trainer 类在不复杂的情况下,还是可以简化的,一个 train() 函数就能搞定。

获取参数与实例化 Model、Trainer 类

from torch.optim import Optimizer
from torch.utils.data import Dataset, DataLoader
def parser():
    '''定义 python 文件运行(训练)的相关参数'''
    ......
    return args
	
def main(args):
    '''接收参数并实例化训练涉及的 class'''
    device = torch.cuda(f'cuda:{args.rank}')
	model = getModel().to(device)
    optim = Adam(model.parameters(), lr=1e-5)
    trainer_dataloader, test_dataloader = DataLoader(...)
    trainer = Trainer(model, optim, trainer_dataloader, test_dataloader)
    trainer.train(args.epochs)

if __name__=='__main__':
    args = parser()
    main(args)

2、torchrun 命令启动方式

torchrun 其实是早期 torch.distributed.launch 启动方式的优化,后者已经被 deprecated,其启动方式很难说的上优雅,而且还需要手动配置诸多参数:

python -m torch.distributed.launch --use-env train_script.py

如果想要调用 torchrun 运行,需要在上述代码中添加如下部分:

import torch.distributed as dist
import torch

def main(args)
	############################################################################
    # 1.获取并行过程 GPU 需要知道的参数,并初始化控制该 GPU 的进程
	local_rank = int(os.environ["LOCAL_RANK"])
    world_size = int(os.environ["WORLD_SIZE"])
    dist.init_process_group(
        backend='nccl',
        init_method='env://',	# 采用环境变量的值来初始化
        world_size=world_size,	# 总 GPU 数量
        rank=local_rank			# 本进程对应 GPU 编号
    )
    ##############################################################################
	device = torch.cuda(f'cuda:{args.rank}')
	model = getModel().to(device)
    ##############################################################################
    # 2.包装 model,pytorch 会自行将模型复制到各个 GPU 上
    model = torch.nn.parallel.DistributedDataParallel(model,
                                                  device_ids=[local_rank],
                                                  find_unused_parameters=True,
                                                  output_device=local_rank)
    ##############################################################################
    optim = Adam(model.parameters(), lr=1e-5)
    # dataset = Dataset()
    # trainer_dataloader, test_dataloader = DataLoader(...)
    ##############################################################################
    # 3.设置并行的数据分发器,使得各 GPU 共享一个 dataloader
    train_dataset = MyDataset(args.data_file, args.max_length)
    train_sampler = torch.utils.data.distributed.DistributedSampler(
        train_dataset,
        num_replicas=world_size,
        rank=local_rank
    )
    mini_batch_size=8
    # 一般是分布 train,不需要分布 test,所以这里只给出 train_dataloader
    train_dataloader = torch.utils.data.DataLoader(
                            dataset=train_dataset,
                            batch_size=mini_batch_size,
                            shuffle=False,	# 这里必须设置为 False
                            num_workers=0,
                            pin_memory=True,
                            sampler=train_sampler)  
    ##############################################################################
    trainer = Trainer(model, optim, trainer_dataloader, test_dataloader)
    trainer.train(args.epochs)

设置了这三部分,就可以通过 torchrun 命令 来运行程序了。单机多卡(一般的情况,即一个计算机上多个GPU)的命令如下:

torchrun \
	--nnodes 1\ # 单机,一个 node 节点(计算机数量)
	--nproc-per-node 3\ # 多卡, GPU 数量
	--train.py --rank 3 --epochs 50 ... # 训练脚本 train.py 的参数附在后面即可

torchrun 命令优点在于进程初始化所需要的变量 local_rankworld_size 都会自动设置,而如果用 multiprocess 则需要根据 nodes 数、gpu 数手动计算,显然 torchrun 更为官方一点。但另一种方式有助于我们理解并行代码中,诸多变量的含义,还可以设置一些更灵活的变量,具体参见文末的 参考文章(1),或者其中文版本 参考文章(3)

三、Tricks

这里记录一下未来分布式训练中的一些 trick,待更新……

1、指定并行 GPU 列表

有些时候并不是所有 GPU 都空着的,但上述运行方式会自动从 rank=0 开始依次调用当前节点上的 gpu。一旦有 GPU 并非空闲,很可能会报 out of memory 的错。这个时候就要指定参与并行训练的 GPUs 了。

一个较为方便的方法,是设置环境变量 CUDA_VISIBLE_DEVICES,Pytorch 根据该变量来判断哪些 GPU 可以用。具体而言,可以在自己程序中设置一个参数:

def parser():
    ......
    parser.add_argument('-v', '--visible', type=str, help='the rank of visible GPUs')
    if args.visible is not None:
        os.environ['CUDA_VISIBLE_DEVICES'] = args.visible	# -v "0,3"
    ......

运行过程中,虽然 local_rank 依然是自然数顺序,但是 pytorch 会自动将 local_rank 视作可见 GPU 的索引,而不会使用 “非空闲” 的 GPUs。

参考文章

千百度
© 版权声明
THE END
喜欢就支持一下吧
点赞11 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片

    暂无评论内容