跳转至

Pytorch下的多卡间变量同步操作

基于torch的无论是多机多卡还是单机多卡都有多种实现方式,另外也有一些优秀的框架对该过程进行封装,这里推荐下Pytorch Lightning 尽管框架已经封装的很是完善,但还是免不了自己写一些组件来。在多卡的环境下,如何对某个变量进行reduce和broadcast操作是避免不了的问题。下面给出如下demo演示如何单机多卡实现以上操作。

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import os
def reduce_tensor(tensor):
    rt = tensor.detach().clone()
    dist.all_reduce(rt, op=dist.ReduceOp.SUM)
    got_world_size = torch.distributed.get_world_size()
    rt /= got_world_size
    return rt
def broadcast_tensor(tensor, rank):
    # rank 表示broadcast哪个rank的数据到其他rank
    rt = tensor.detach().clone()
    dist.broadcast(rt, src=rank)
    return rt
def main_worker(gpu, ngpus_per_node):
        print('gpu', gpu)
        # 这里的rank实际上应该是机器编号,但是我们是单机多卡,所以固定为 0
        rank = 0
        if True:
            # For multiprocessing distributed training, rank needs to be the
            # global rank among all the processes
            rank = rank * ngpus_per_node + gpu
        print('rank', rank)
        dist.init_process_group(backend="nccl", init_method="tcp://127.0.0.1:8124", world_size=2, rank=rank)
        got_rand = torch.distributed.get_rank()
        print('got_rand', got_rand)
        got_world_size = torch.distributed.get_world_size()
        print('got_world_size', got_world_size)
        # tensor_list = []
        local_rank = rank
        torch.cuda.set_device('cuda:%d' % local_rank)
        t = torch.rand((10,)).cuda()
        torch.distributed.barrier()
        print('t',t)
        reduce_t = reduce_tensor(t)
        print('reduce_t', reduce_t)
        boardcasted_t = broadcast_tensor(t, 0)
        print('boardcasted_t', boardcasted_t)
def main():
    # Use torch.multiprocessing.spawn to launch distributed processes: the
    # main_worker process function
    # 假设机器一种有四张卡,这里采用前两张
    ngpus_per_node = torch.cuda.device_count() // 2
    mp.spawn(main_worker, nprocs=ngpus_per_node, args=(ngpus_per_node, ))
if __name__ == "__main__":
    main()


最后更新: March 21, 2024
创建日期: March 21, 2024