
Facing issue with connecting to socket with DDP and PyTorch (single node, multi-GPU communication)


I am completely new to distributed programming, and I have been trying to port code that originally ran on a multi-node cluster to a single node with multiple GPUs. My goal is to simulate a federated learning setup where each "client" is a process running on a different GPU. I have modified the code to use DDP; its current form is below:

import os
import numpy as np
import time
import argparse
import sys

from math import ceil
from random import Random
from datetime import timedelta
import socket

import torch
import torch.distributed as dist
import torch.utils.data.distributed
import torch.nn as nn
import torch.nn.functional as F

import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
# from torch.multiprocessing import Process

import torchvision
from torchvision import datasets, transforms
import torch.backends.cudnn as cudnn
import torchvision.models as models

from distoptim import LocalSGD, OverlapLocalSGD
import util_v4 as util
from comm_helpers import SyncAllreduce


def run(rank, size):
    # initiate experiments folder
    save_path = '<somefolder>'
    folder_name = save_path+args.name
    if rank == 0 and not os.path.isdir(folder_name) and args.save:
        os.mkdir(folder_name)
    dist.barrier() #This collective blocks processes until the whole group enters this function - the default process group will be used
    # initiate log files
    tag = '{}/lr{:.3f}_bs{:d}_cp{:d}_a{:.2f}_b{:.2f}_e{}_r{}_n{}.csv'
    saveFileName = tag.format(folder_name, args.lr, args.bs, args.cp, 
                              args.alpha, args.gmf, args.seed, rank, size)
    args.out_fname = saveFileName
    with open(args.out_fname, 'w+') as f:
        print(
            'BEGIN-TRAINING\n'
            'World-Size,{ws}\n'
            'Batch-Size,{bs}\n'
            'Epoch,itr,BT(s),avg:BT(s),std:BT(s),'
            'CT(s),avg:CT(s),std:CT(s),'
            'Loss,avg:Loss,Prec@1,avg:Prec@1,val'.format(
                ws=args.size,
                bs=args.bs),
            file=f)


    # seed for reproducibility
    torch.manual_seed(args.seed)
    torch.cuda.manual_seed(args.seed)
    torch.backends.cudnn.deterministic = True

    # load datasets
    train_loader, test_loader = util.partition_dataset(rank, size, args)

    #set device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # define neural nets model, criterion, and optimizer
    model = util.select_model(10, args).to(device)
    model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)  # per the DDP docs: if the model contains any BatchNorm layers, they need to be converted to SyncBatchNorm so the running stats are synced across replicas
    model = DDP(model, device_ids = [rank]) #use this to create distributed model
    criterion = nn.CrossEntropyLoss().to(device)
    optimizer = LocalSGD(model.parameters(),
                      lr=args.lr,
                      gmf=args.gmf,
                      tau=args.cp,
                      size=size).to(device)

    # optimizer = OverlapLocalSGD(model.parameters(),
    #                       lr=args.lr,
    #                       alpha=args.alpha,
    #                       gmf=args.gmf,
    #                       tau = args.cp,
    #                       size=size,
    #                       momentum=0.9,
    #                       nesterov = True,
    #                       weight_decay=1e-4)

    batch_meter = util.Meter(ptag='Time')
    comm_meter = util.Meter(ptag='Time')

    best_test_accuracy = 0
    req = None
    for epoch in range(args.epoch):
        train(model, criterion, optimizer, batch_meter, comm_meter,
              train_loader, epoch, rank, device)
        test_acc = evaluate(model, test_loader, device)
        if test_acc > best_test_accuracy:
            best_test_accuracy = test_acc
        
        if rank==0: #only print details of first client
            print('{ep},{itr},{bt:.4f},{filler},{filler},'
                    '{ct:.4f},{filler},{filler},'
                    '{filler},{filler},'
                    '{filler},{filler},'
                    '{val:.4f}'
                    .format(ep=epoch, itr=-1,
                            bt=batch_meter.sum,
                            ct=comm_meter.sum,
                            filler=-1, val=test_acc))

        with open(args.out_fname, '+a') as f:
            if rank==0: #only print details of first client
                print('{ep},{itr},{bt:.4f},{filler},{filler},'
                    '{ct:.4f},{filler},{filler},'
                    '{filler},{filler},'
                    '{filler},{filler},'
                    '{val:.4f}'
                    .format(ep=epoch, itr=-1,
                            bt=batch_meter.sum,
                            ct=comm_meter.sum,
                            filler=-1, val=test_acc), 
                    file=f)


def evaluate(model, test_loader, device):
    model.eval()
    top1 = util.AverageMeter()

    with torch.no_grad():
        for batch_idx, (data, target) in enumerate(test_loader):
            data = data.to(device, non_blocking = True)
            target = target.to(device, non_blocking = True)
            outputs = model(data)
            acc1 = util.comp_accuracy(outputs, target)  # accuracy helper from util_v4 (exact name assumed)
            top1.update(acc1[0].item(), data.size(0))

    return top1.avg

def train(model, criterion, optimizer, batch_meter, comm_meter,
          loader, epoch, rank, device="cpu"):

    model.train()

    losses = util.Meter(ptag='Loss')
    top1 = util.Meter(ptag='Prec@1')
    weights = [1/args.size for i in range(args.size)]

    iter_time = time.time()
    for batch_idx, (data, target) in enumerate(loader):
        # data loading
        data = data.to(device, non_blocking = True)
        target = target.to(device, non_blocking = True)

        # forward pass
        output = model(data)
        loss = criterion(output, target)

        # backward pass
        loss.backward()
        update_learning_rate(optimizer, epoch, itr=batch_idx,
                                 itr_per_epoch=len(loader))
        # gradient step
        optimizer.step()
        optimizer.zero_grad()

        torch.cuda.synchronize()
        comm_start = time.time()
        
        # Communication step: average local models
        optimizer.average()

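        # note: the very first iteration (epoch 0, batch 0) is left out of the
        # batch-time and communication-time statistics below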
        if not (epoch == 0 and batch_idx == 0):
            torch.cuda.synchronize()
            comm_meter.update(time.time() - comm_start)
            batch_meter.update(time.time() - iter_time)

        # write log files
        train_acc = util.comp_accuracy(output, target)  # accuracy helper from util_v4 (exact name assumed)
        losses.update(loss.item(), data.size(0))
        top1.update(train_acc[0].item(), data.size(0))

        if batch_idx % args.print_freq == 0 and args.save:
            print('epoch {} itr {}, '
                  'rank {}, loss value {:.4f}, train accuracy {:.3f}'.format(
                    epoch, batch_idx, rank, losses.avg, top1.avg))

            with open(args.out_fname, '+a') as f:
                print('{ep},{itr},{bt},{ct},'
                      '{loss.val:.4f},{loss.avg:.4f},'
                      '{top1.val:.3f},{top1.avg:.3f},-1'
                      .format(ep=epoch, itr=batch_idx,
                              bt=batch_meter, ct=comm_meter,
                              loss=losses, top1=top1), file=f)

        torch.cuda.synchronize()
        iter_time = time.time()

    with open(args.out_fname, '+a') as f:
        print('{ep},{itr},{bt},{ct},'
              '{loss.val:.4f},{loss.avg:.4f},'
              '{top1.val:.3f},{top1.avg:.3f},-1'
              .format(ep=epoch, itr=batch_idx,
                      bt=batch_meter, ct=comm_meter,
                      loss=losses, top1=top1), file=f)


def update_learning_rate(optimizer, epoch, itr=None, itr_per_epoch=None,
                         scale=1):
    """
    1) Linearly warmup to reference learning rate (5 epochs)
    2) Decay learning rate exponentially (epochs 30, 60, 80)
    ** note: args.lr is the reference learning rate from which to scale up
    ** note: minimum global batch-size is 256
    """
    target_lr = args.lr * args.bs * scale * args.size / 128

    lr = None
    if args.warmup and epoch < 5:  # warmup to scaled lr
        if target_lr <= args.lr:
            lr = target_lr
        else:
            assert itr is not None and itr_per_epoch is not None
            count = epoch * itr_per_epoch + itr + 1
            incr = (target_lr - args.lr) * (count / (5 * itr_per_epoch))
            lr = args.lr + incr
    else:
        lr = target_lr
        for e in args.lr_schedule:
            if epoch >= e:
                lr *= args.lr_schedule[e]

    if lr is not None:
        # print('Updating learning rate to {}'.format(lr))
        for param_group in optimizer.param_groups:
            param_group['lr'] = lr


def init_processes_same_dev(rank, size, fn, master_port):
   """ Initialize the distributed environment for multiple GPUs, single computer. """
   print(f"init_processes_same_dev: Currently rank: {rank} and master port: {master_port}")
   init_process_group(backend="nccl",
                            rank=rank,
                            world_size=size,
                            timeout = timedelta(seconds=120)
                            )
   print("init_process_group completed.")
   fn(rank, size)


def init_processes_multiple_dev(rank, size, fn):
    """ Initialize the distributed environment for multiple computer. """
    init_process_group(backend=args.backend, 
                            init_method='tcp://h0:22000', # for our case, check whether the default (env://) is OK, i.e. values are taken from the environment itself
                            rank=rank, 
                            world_size=size,
                            timeout = timedelta(seconds=120)
                            )
    fn(rank, size)


def main(rank: int, size: int, master_port:int, args):
    print(f"main: Currently rank: {rank} and master port: {master_port}")
    # if rank == 0:
    #     broadcasted_port = [master_port]
    # else:
    #     broadcasted_port = [None]
    # dist.broadcast_object_list(broadcasted_port, 0) #broadcast the master port
    # master_port = broadcasted_port[0]
    # print(f"Currently rank: {rank} and master port: {master_port}")

    try:
        if args.dist == "True":
            init_processes_multiple_dev(rank, size, run)
        else:
            init_processes_same_dev(rank, size, run, master_port)
    except Exception as e:
        print(f"The exception is: {e}")
    destroy_process_group()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='CIFAR-10 baseline')
    parser.add_argument('--name','-n', 
                        default="default", 
                        type=str, 
                        help='experiment name, used for saving results')
    parser.add_argument('--backend',
                        default="nccl",
                        type=str,
                        help='distributed communication backend')
    parser.add_argument('--dataset', 
                        default="cifar10", 
                        type=str, 
                        help='dataset name')
    parser.add_argument('--model', 
                        default="res", 
                        type=str, 
                        help='neural network model')
    parser.add_argument('--alpha', 
                        default=0.2, 
                        type=float, 
                        help='alpha')
    parser.add_argument('--gmf', 
                        default=0, 
                        type=float, 
                        help='global momentum factor')
    parser.add_argument('--lr', 
                        default=0.1, 
                        type=float, 
                        help='learning rate')
    parser.add_argument('--bs', 
                        default=512, 
                        type=int, 
                        help='batch size on each worker')
    parser.add_argument('--epoch', 
                        default=200, 
                        type=int, 
                        help='total epoch')
    parser.add_argument('--cp', 
                        default=98, 
                        type=int, 
                        help='communication period / work per clock')
    parser.add_argument('--print_freq', 
                        default=100, 
                        type=int, 
                        help='print info frequency')
    parser.add_argument('--rank', 
                        default=0, 
                        type=int, 
                        help='the rank of worker')
    parser.add_argument('--size', 
                        default=8, 
                        type=int, 
                        help='number of workers')
    parser.add_argument('--seed', 
                        default=1, 
                        type=int, 
                        help='random seed')
    parser.add_argument('--save', '-s', 
                        action='store_true', 
                        help='whether save the training results')
    parser.add_argument('--all_reduce',
                        action='store_true', 
                        help='whether use AR-SGD')
    parser.add_argument('--schedule', nargs='+', default=None,
                        type=float, help='learning rate schedule')
    parser.add_argument('--warmup', default='False', type=str,
                        help='whether to warmup learning rate for first 5 epochs')
    parser.add_argument('--p', '-p', 
                        action='store_true', 
                        help='whether the dataset is partitioned or not')

    parser.add_argument('--NIID',
                        action='store_true',
                        help='whether the dataset partition is non-IID')

    parser.add_argument('--dist', default='False', type=str,
                        help='whether we run it on distributed server env (True) or multi-GPU (False, default)')

    args = parser.parse_args()
    args.lr_schedule = {}
    if args.schedule is None:
        args.schedule = [30, 0.1, 60, 0.1, 80, 0.1] #epoch1, lr1, epoch2, lr2, ....
    i, epoch = 0, None
    for v in args.schedule: #this is just to store the schedule in a more manageable format
        if i == 0:
            epoch = v
        elif i == 1:
            args.lr_schedule[epoch] = v
        i = (i + 1) % 2
    del args.schedule
    print(f"Arguments = {args}")
    
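    # bind to port 0 so the OS picks a free ephemeral port, then reuse that
    # port number as the rendezvous port (MASTER_PORT) for the process group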
    sock = socket.socket()
    sock.bind(('', 0))
    socket_to_use = sock.getsockname()[1]
    
    #    os.environ["MASTER_ADDR"] = "172.26.0.104"
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = str(socket_to_use)
    print(f'MASTER_ADDR: {os.environ["MASTER_ADDR"]}')
    print(f'MASTER_PORT: {os.environ["MASTER_PORT"]}')

    size = args.size
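    # mp.spawn invokes main(i, size, socket_to_use, args) for i in 0..nprocs-1,
    # so the process index supplied by spawn is what arrives as rank in main()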
    mp.spawn(main, args=(size,socket_to_use, args), nprocs=size)
    

I am facing the following issue:

MASTER_ADDR: localhost
MASTER_PORT: 59625
main: Currently rank: 3 and master port: 59625
init_processes_same_dev: Currently rank: 3 and master port: 59625
main: Currently rank: 0 and master port: 59625
init_processes_same_dev: Currently rank: 0 and master port: 59625
main: Currently rank: 1 and master port: 59625
init_processes_same_dev: Currently rank: 1 and master port: 59625
main: Currently rank: 2 and master port: 59625
init_processes_same_dev: Currently rank: 2 and master port: 59625
[E209 06:14:43.439017058 socket.cpp:957] [c10d] The client socket has timed out after 120s while trying to connect to (localhost, 59625).
The exception is: The client socket has timed out after 120s while trying to connect to (localhost, 59625).
[E209 06:14:43.449383576 socket.cpp:957] [c10d] The client socket has timed out after 120s while trying to connect to (localhost, 59625).
The exception is: The client socket has timed out after 120s while trying to connect to (localhost, 59625).
[E209 06:14:43.461202062 socket.cpp:957] [c10d] The client socket has timed out after 120s while trying to connect to (localhost, 59625).
[E209 06:14:43.461202051 socket.cpp:957] [c10d] The client socket has timed out after 120s while trying to connect to (localhost, 59625).
The exception is: The client socket has timed out after 120s while trying to connect to (localhost, 59625).
The exception is: The client socket has timed out after 120s while trying to connect to (localhost, 59625).
W0209 06:14:43.480258 139934071513344 torch/multiprocessing/spawn.py:146] Terminating process 2675471 via signal SIGTERM
W0209 06:14:43.481550 139934071513344 torch/multiprocessing/spawn.py:146] Terminating process 2675472 via signal SIGTERM
W0209 06:14:43.481775 139934071513344 torch/multiprocessing/spawn.py:146] Terminating process 2675473 via signal SIGTERM

I am running this script using the following command:

python -m torch.distributed.run  train_LocalSGD.py \
    --NIID --lr 0.1 --bs 128 --cp 2 --alpha 0.6 \
    -p --name OLocalSGD_nccl_e300_ICASSP_niid \
    --size 4 --backend nccl \
    --epoch 10 &

I am not able to understand how to fix this issue; my init_process_group call itself never completes. Could someone explain what I am doing wrong, or what I might be misunderstanding about one of these functions? In general, I am confused about how to simulate the whole setup, so if there are other errors in the code or in the bash command, please do let me know.

Note: I have already increased the timeout to larger values such as 3600 seconds, but it still times out.
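For reference, this is the bare-bones single-node, multi-GPU initialization pattern I am trying to build up to (a minimal sketch only, stripped of all the training code; demo_worker and the fixed port number are placeholders, not taken from my actual script):

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp


def demo_worker(rank, world_size):
    # one GPU per process
    torch.cuda.set_device(rank)
    # each spawned process joins the default process group; with no
    # init_method given, MASTER_ADDR / MASTER_PORT are read from the env
    dist.init_process_group(backend="nccl", rank=rank, world_size=world_size)
    print(f"rank {rank}/{world_size} initialized")
    dist.barrier()
    dist.destroy_process_group()


if __name__ == "__main__":
    world_size = torch.cuda.device_count()
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29500"  # any free port
    mp.spawn(demo_worker, args=(world_size,), nprocs=world_size)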
