I am completely new to distributed programming and have been trying to port code that originally ran on a multi-node cluster to a single node with multiple GPUs. My goal is to simulate a federated learning setup in which each "client" is a process running on its own GPU. I have modified the code to use DDP, and its current form is below:
import os
import numpy as np
import time
import argparse
import sys
from math import ceil
from random import Random
from datetime import timedelta
import socket
import torch
import torch.distributed as dist
import torch.utils.data.distributed
import torch.nn as nn
import torch.nn.functional as F
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
# from torch.multiprocessing import Process
import torchvision
from torchvision import datasets, transforms
import torch.backends.cudnn as cudnn
import torchvision.models as models
from distoptim import LocalSGD, OverlapLocalSGD
import util_v4 as util
from comm_helpers import SyncAllreduce
def run(rank, size):
# initiate experiments folder
save_path = '<somefolder>'
folder_name = save_path+args.name
if rank == 0 and os.path.isdir(folder_name)==False and args.save:
os.mkdir(folder_name)
dist.barrier() #This collective blocks processes until the whole group enters this function - the default process group will be used
# initiate log files
tag = '{}/lr{:.3f}_bs{:d}_cp{:d}_a{:.2f}_b{:.2f}_e{}_r{}_n{}.csv'
saveFileName = tag.format(folder_name, args.lr, args.bs, args.cp,
args.alpha, args.gmf, args.seed, rank, size)
args.out_fname = saveFileName
with open(args.out_fname, 'w+') as f:
print(
'BEGIN-TRAINING\n'
'World-Size,{ws}\n'
'Batch-Size,{bs}\n'
'Epoch,itr,BT(s),avg:BT(s),std:BT(s),'
'CT(s),avg:CT(s),std:CT(s),'
'Loss,avg:Loss,Prec@1,avg:Prec@1,val'.format(
ws=args.size,
bs=args.bs),
file=f)
# seed for reproducibility
torch.manual_seed(args.seed)
torch.cuda.manual_seed(args.seed)
torch.backends.cudnn.deterministic = True
# load datasets
train_loader, test_loader = util.partition_dataset(rank, size, args)
#set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# define neural nets model, criterion, and optimizer
model = util.select_model(10, args).to(device)
model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model) # per the DDP docs, if the model contains any BatchNorm layers they need to be converted to SyncBatchNorm to sync the running stats across replicas
model = DDP(model, device_ids = [rank]) #use this to create distributed model
criterion = nn.CrossEntropyLoss().to(device)
optimizer = LocalSGD(model.parameters(),
lr=args.lr,
gmf=args.gmf,
tau=args.cp,
size=size).to(device)
# optimizer = OverlapLocalSGD(model.parameters(),
# lr=args.lr,
# alpha=args.alpha,
# gmf=args.gmf,
# tau = args.cp,
# size=size,
# momentum=0.9,
# nesterov = True,
# weight_decay=1e-4)
batch_meter = util.Meter(ptag='Time')
comm_meter = util.Meter(ptag='Time')
best_test_accuracy = 0
req = None
for epoch in range(args.epoch):
train(model, criterion, optimizer, batch_meter, comm_meter,
train_loader, epoch, rank, device)
test_acc = evaluate(model, test_loader, device)
if test_acc > best_test_accuracy:
best_test_accuracy = test_acc
if rank==0: #only print details of first client
print('{ep},{itr},{bt:.4f},{filler},{filler},'
'{ct:.4f},{filler},{filler},'
'{filler},{filler},'
'{filler},{filler},'
'{val:.4f}'
.format(ep=epoch, itr=-1,
bt=batch_meter.sum,
ct=comm_meter.sum,
filler=-1, val=test_acc))
with open(args.out_fname, '+a') as f:
if rank==0: #only print details of first client
print('{ep},{itr},{bt:.4f},{filler},{filler},'
'{ct:.4f},{filler},{filler},'
'{filler},{filler},'
'{filler},{filler},'
'{val:.4f}'
.format(ep=epoch, itr=-1,
bt=batch_meter.sum,
ct=comm_meter.sum,
filler=-1, val=test_acc),
file=f)
def evaluate(model, test_loader, device):
model.eval()
top1 = util.AverageMeter()
with torch.no_grad():
for batch_idx, (data, target) in enumerate(test_loader):
data = data.to(device, non_blocking = True)
target = target.to(device, non_blocking = True)
outputs = model(data)
acc1 = util.comp_accuracy(outputs, target)
top1.update(acc1[0].item(), data.size(0))
return top1.avg
def train(model, criterion, optimizer, batch_meter, comm_meter,
loader, epoch, rank, device="cpu"):
model.train()
losses = util.Meter(ptag='Loss')
top1 = util.Meter(ptag='Prec@1')
weights = [1/args.size for i in range(args.size)]
iter_time = time.time()
for batch_idx, (data, target) in enumerate(loader):
# data loading
data = data.to(device, non_blocking = True)
target = target.to(device, non_blocking = True)
# forward pass
output = model(data)
loss = criterion(output, target)
# backward pass
loss.backward()
update_learning_rate(optimizer, epoch, itr=batch_idx,
itr_per_epoch=len(loader))
# gradient step
optimizer.step()
optimizer.zero_grad()
torch.cuda.synchronize()
comm_start = time.time()
# Communication step: average local models
optimizer.average()
if not (epoch == 0 and batch_idx == 0):
torch.cuda.synchronize()
comm_meter.update(time.time() - comm_start)
batch_meter.update(time.time() - iter_time)
# write log files
train_acc = util.comp_accuracy(output, target)
losses.update(loss.item(), data.size(0))
top1.update(train_acc[0].item(), data.size(0))
if batch_idx % args.print_freq == 0 and args.save:
print('epoch {} itr {}, '
'rank {}, loss value {:.4f}, train accuracy {:.3f}'.format(
epoch, batch_idx, rank, losses.avg, top1.avg))
with open(args.out_fname, '+a') as f:
print('{ep},{itr},{bt},{ct},'
'{loss.val:.4f},{loss.avg:.4f},'
'{top1.val:.3f},{top1.avg:.3f},-1'
.format(ep=epoch, itr=batch_idx,
bt=batch_meter, ct=comm_meter,
loss=losses, top1=top1), file=f)
torch.cuda.synchronize()
iter_time = time.time()
with open(args.out_fname, '+a') as f:
print('{ep},{itr},{bt},{ct},'
'{loss.val:.4f},{loss.avg:.4f},'
'{top1.val:.3f},{top1.avg:.3f},-1'
.format(ep=epoch, itr=batch_idx,
bt=batch_meter, ct=comm_meter,
loss=losses, top1=top1), file=f)
def update_learning_rate(optimizer, epoch, itr=None, itr_per_epoch=None,
scale=1):
"""
1) Linearly warmup to reference learning rate (5 epochs)
2) Decay learning rate exponentially (epochs 30, 60, 80)
** note: args.lr is the reference learning rate from which to scale up
** note: minimum global batch-size is 256
"""
target_lr = args.lr * args.bs * scale * args.size / 128
lr = None
if args.warmup and epoch < 5: # warmup to scaled lr
if target_lr <= args.lr:
lr = target_lr
else:
assert itr is not None and itr_per_epoch is not None
count = epoch * itr_per_epoch + itr + 1
incr = (target_lr - args.lr) * (count / (5 * itr_per_epoch))
lr = args.lr + incr
else:
lr = target_lr
for e in args.lr_schedule:
if epoch >= e:
lr *= args.lr_schedule[e]
if lr is not None:
# print('Updating learning rate to {}'.format(lr))
for param_group in optimizer.param_groups:
param_group['lr'] = lr
def init_processes_same_dev(rank, size, fn, master_port):
""" Initialize the distributed environment for multiple GPUs, single computer. """
print(f"init_processes_same_dev: Currently rank: {rank} and master port: {master_port}")
init_process_group(backend="nccl",
rank=rank,
world_size=size,
timeout = timedelta(seconds=120)
)
print("init_process_group completed.")
fn(rank, size)
def init_processes_multiple_dev(rank, size, fn):
""" Initialize the distributed environment for multiple computer. """
init_process_group(backend=args.backend,
init_method='tcp://h0:22000', # for our case, check whether the default is OK, i.e. values are taken from the environment itself
rank=rank,
world_size=size,
timeout = timedelta(seconds=120)
)
fn(rank, size)
def main(rank: int, size: int, master_port:int, args):
print(f"main: Currently rank: {rank} and master port: {master_port}")
# if rank == 0:
# broadcasted_port = [master_port]
# else:
# broadcasted_port = [None]
# dist.broadcast_object_list(broadcasted_port, 0) #broadcast the master port
# master_port = broadcasted_port[0]
# print(f"Currently rank: {rank} and master port: {master_port}")
try:
if args.dist == "True":
init_processes_multiple_dev(rank, size, run)
else:
init_processes_same_dev(rank, size, run, master_port)
except Exception as e:
print(f"The exception is: {e}")
destroy_process_group()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='CIFAR-10 baseline')
parser.add_argument('--name','-n',
default="default",
type=str,
help='experiment name, used for saving results')
parser.add_argument('--backend',
default="nccl",
type=str,
help='distributed backend to use (e.g. nccl or gloo)')
parser.add_argument('--dataset',
default="cifar10",
type=str,
help='dataset name')
parser.add_argument('--model',
default="res",
type=str,
help='neural network model')
parser.add_argument('--alpha',
default=0.2,
type=float,
help='alpha')
parser.add_argument('--gmf',
default=0,
type=float,
help='global momentum factor')
parser.add_argument('--lr',
default=0.1,
type=float,
help='learning rate')
parser.add_argument('--bs',
default=512,
type=int,
help='batch size on each worker')
parser.add_argument('--epoch',
default=200,
type=int,
help='total epoch')
parser.add_argument('--cp',
default=98,
type=int,
help='communication period / work per clock')
parser.add_argument('--print_freq',
default=100,
type=int,
help='print info frequency')
parser.add_argument('--rank',
default=0,
type=int,
help='the rank of worker')
parser.add_argument('--size',
default=8,
type=int,
help='number of workers')
parser.add_argument('--seed',
default=1,
type=int,
help='random seed')
parser.add_argument('--save', '-s',
action='store_true',
help='whether save the training results')
parser.add_argument('--all_reduce',
action='store_true',
help='whether use AR-SGD')
parser.add_argument('--schedule', nargs='+', default=None,
type=float, help='learning rate schedule')
parser.add_argument('--warmup', default='False', type=str,
help='whether to warmup learning rate for first 5 epochs')
parser.add_argument('--p', '-p',
action='store_true',
help='whether the dataset is partitioned or not')
parser.add_argument('--NIID',
action='store_true',
help='whether to use a non-IID data partition')
parser.add_argument('--dist', default='False', type=str,
help='whether we run it on distributed server env (True) or multi-GPU (False, default)')
args = parser.parse_args()
args.lr_schedule = {}
if args.schedule is None:
args.schedule = [30, 0.1, 60, 0.1, 80, 0.1] #epoch1, lr1, epoch2, lr2, ....
i, epoch = 0, None
for v in args.schedule: #this is just to store the schedule in a more manageable format
if i == 0:
epoch = v
elif i == 1:
args.lr_schedule[epoch] = v
i = (i + 1) % 2
del args.schedule
print(f"Arguments = {args}")
sock = socket.socket()
sock.bind(('', 0))
socket_to_use = sock.getsockname()[1]
# os.environ["MASTER_ADDR"] = "172.26.0.104"
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = str(socket_to_use)
print(f'MASTER_ADDR: {os.environ["MASTER_ADDR"]}')
print(f'MASTER_PORT: {os.environ["MASTER_PORT"]}')
size = args.size
mp.spawn(main, args=(size,socket_to_use, args), nprocs=size)
I am facing the following issue:
MASTER_ADDR: localhost
MASTER_PORT: 59625
main: Currently rank: 3 and master port: 59625
init_processes_same_dev: Currently rank: 3 and master port: 59625
main: Currently rank: 0 and master port: 59625
init_processes_same_dev: Currently rank: 0 and master port: 59625
main: Currently rank: 1 and master port: 59625
init_processes_same_dev: Currently rank: 1 and master port: 59625
main: Currently rank: 2 and master port: 59625
init_processes_same_dev: Currently rank: 2 and master port: 59625
[E209 06:14:43.439017058 socket.cpp:957] [c10d] The client socket has timed out after 120s while trying to connect to (localhost, 59625).
The exception is: The client socket has timed out after 120s while trying to connect to (localhost, 59625).
[E209 06:14:43.449383576 socket.cpp:957] [c10d] The client socket has timed out after 120s while trying to connect to (localhost, 59625).
The exception is: The client socket has timed out after 120s while trying to connect to (localhost, 59625).
[E209 06:14:43.461202062 socket.cpp:957] [c10d] The client socket has timed out after 120s while trying to connect to (localhost, 59625).
[E209 06:14:43.461202051 socket.cpp:957] [c10d] The client socket has timed out after 120s while trying to connect to (localhost, 59625).
The exception is: The client socket has timed out after 120s while trying to connect to (localhost, 59625).
The exception is: The client socket has timed out after 120s while trying to connect to (localhost, 59625).
W0209 06:14:43.480258 139934071513344 torch/multiprocessing/spawn.py:146] Terminating process 2675471 via signal SIGTERM
W0209 06:14:43.481550 139934071513344 torch/multiprocessing/spawn.py:146] Terminating process 2675472 via signal SIGTERM
W0209 06:14:43.481775 139934071513344 torch/multiprocessing/spawn.py:146] Terminating process 2675473 via signal SIGTERM
I am running this script using the following command:
python -m torch.distributed.run train_LocalSGD.py \
--NIID --lr 0.1 --bs 128 --cp 2 --alpha 0.6 \
-p --name OLocalSGD_nccl_e300_ICASSP_niid \
--size 4 --backend nccl \
--epoch 10 &
I am not able to figure out how to fix this issue; even init_process_group itself never completes. Could someone explain what I am doing wrong, or which function I might be misunderstanding? In general, I am confused about how to simulate the whole setup, so if there are other errors in the code or in the launch command, please point those out as well.
Note: I have already tried increasing the timeout to larger values such as 3600 seconds, but the same timeout still occurs.
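For reference, this is the minimal single-node, multi-GPU pattern I understood from the PyTorch DDP examples and was trying to adapt (the toy model and names here are purely illustrative, not from my actual project):
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP

def worker(rank, world_size):
    # each spawned process joins the default process group itself
    dist.init_process_group(backend="nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)               # one GPU per process
    model = torch.nn.Linear(10, 10).to(rank)  # toy model, illustrative only
    ddp_model = DDP(model, device_ids=[rank])
    dist.barrier()
    dist.destroy_process_group()

if __name__ == "__main__":
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29500"       # any free port
    world_size = torch.cuda.device_count()
    mp.spawn(worker, args=(world_size,), nprocs=world_size)
I based my script on this pattern, but I am not sure whether launching with torch.distributed.run changes what I need to set up myself (environment variables, ports, process spawning), so pointers on that would also help.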