from .checkpoint_saver import CheckpointSaver
from .cuda import ApexScaler, NativeScaler
from .distributed import distribute_bn, reduce_tensor
from .jit import set_jit_legacy
from .log import setup_default_logging, FormatterNoInfo
from .metrics import AverageMeter, accuracy
from .misc import natural_key, add_bool_arg
from .model import unwrap_model, get_state_dict
from .model_ema import ModelEma
from .summary import update_summary, get_outdir

""" Checkpoint Saver
Track top-n training checkpoints and maintain recovery checkpoints on specified intervals.
Hacked together by / Copyright 2020 Ross Wightman
import glob
import operator
import os
import logging
import torch
from .model import unwrap_model, get_state_dict
_logger = logging.getLogger(__name__)
class CheckpointSaver:
def __init__(
# objects to save state_dicts of
self.model = model
self.optimizer = optimizer
self.args = args
self.model_ema = model_ema
self.amp_scaler = amp_scaler
# state
self.checkpoint_files = [] # (filename, metric) tuples in order of decreasing betterness
self.best_epoch = None
self.best_metric = None
self.curr_recovery_file = ''
self.last_recovery_file = ''
# config
self.checkpoint_dir = checkpoint_dir
self.recovery_dir = recovery_dir
self.save_prefix = checkpoint_prefix
self.recovery_prefix = recovery_prefix
self.extension = '.pth.tar'
self.decreasing = decreasing # a lower metric is better if True
self.cmp = if decreasing else # True if lhs better than rhs
self.max_history = max_history
self.unwrap_fn = unwrap_fn
assert self.max_history >= 1
def save_checkpoint(self, epoch, metric=None):
assert epoch >= 0
tmp_save_path = os.path.join(self.checkpoint_dir, 'tmp' + self.extension)
last_save_path = os.path.join(self.checkpoint_dir, 'last' + self.extension)
self._save(tmp_save_path, epoch, metric)
if os.path.exists(last_save_path):
os.unlink(last_save_path) # required for Windows support.
os.rename(tmp_save_path, last_save_path)
worst_file = self.checkpoint_files[-1] if self.checkpoint_files else None
if (len(self.checkpoint_files) < self.max_history
or metric is None or self.cmp(metric, worst_file[1])):
if len(self.checkpoint_files) >= self.max_history:
filename = '-'.join([self.save_prefix, str(epoch)]) + self.extension
save_path = os.path.join(self.checkpoint_dir, filename), save_path)
self.checkpoint_files.append((save_path, metric))
self.checkpoint_files = sorted(
self.checkpoint_files, key=lambda x: x[1],
reverse=not self.decreasing) # sort in descending order if a lower metric is not better
checkpoints_str = "Current checkpoints:\n"
for c in self.checkpoint_files:
checkpoints_str += ' {}\n'.format(c)
if metric is not None and (self.best_metric is None or self.cmp(metric, self.best_metric)):
self.best_epoch = epoch
self.best_metric = metric
best_save_path = os.path.join(self.checkpoint_dir, 'model_best' + self.extension)
if os.path.exists(best_save_path):
os.unlink(best_save_path), best_save_path)
return (None, None) if self.best_metric is None else (self.best_metric, self.best_epoch)
def _save(self, save_path, epoch, metric=None):
save_state = {
'epoch': epoch,
'arch': type(self.model).__name__.lower(),
'state_dict': get_state_dict(self.model, self.unwrap_fn),
'optimizer': self.optimizer.state_dict(),
'version': 2, # version < 2 increments epoch before save
if self.args is not None:
save_state['arch'] = self.args.model
save_state['args'] = self.args
if self.amp_scaler is not None:
save_state[self.amp_scaler.state_dict_key] = self.amp_scaler.state_dict()
if self.model_ema is not None:
save_state['state_dict_ema'] = get_state_dict(self.model_ema, self.unwrap_fn)
if metric is not None:
save_state['metric'] = metric, save_path)
def _cleanup_checkpoints(self, trim=0):
trim = min(len(self.checkpoint_files), trim)
delete_index = self.max_history - trim
if delete_index <= 0 or len(self.checkpoint_files) <= delete_index:
to_delete = self.checkpoint_files[delete_index:]
for d in to_delete:
_logger.debug("Cleaning checkpoint: {}".format(d))
except Exception as e:
_logger.error("Exception '{}' while deleting checkpoint".format(e))
self.checkpoint_files = self.checkpoint_files[:delete_index]
def save_recovery(self, epoch, batch_idx=0):
assert epoch >= 0
filename = '-'.join([self.recovery_prefix, str(epoch), str(batch_idx)]) + self.extension
save_path = os.path.join(self.recovery_dir, filename)
self._save(save_path, epoch)
if os.path.exists(self.last_recovery_file):
_logger.debug("Cleaning recovery: {}".format(self.last_recovery_file))
except Exception as e:
_logger.error("Exception '{}' while removing {}".format(e, self.last_recovery_file))
self.last_recovery_file = self.curr_recovery_file
self.curr_recovery_file = save_path
def find_recovery(self):
recovery_path = os.path.join(self.recovery_dir, self.recovery_prefix)
files = glob.glob(recovery_path + '*' + self.extension)
files = sorted(files)
if len(files):
return files[0]
return ''

""" CUDA / AMP utils
Hacked together by / Copyright 2020 Ross Wightman
import torch
from apex import amp
has_apex = True
except ImportError:
amp = None
has_apex = False
class ApexScaler:
state_dict_key = "amp"
def __call__(self, loss, optimizer):
with amp.scale_loss(loss, optimizer) as scaled_loss:
def state_dict(self):
if 'state_dict' in amp.__dict__:
return amp.state_dict()
def load_state_dict(self, state_dict):
if 'load_state_dict' in amp.__dict__:
class NativeScaler:
state_dict_key = "amp_scaler"
def __init__(self):
self._scaler = torch.cuda.amp.GradScaler()
def __call__(self, loss, optimizer):
def state_dict(self):
return self._scaler.state_dict()
def load_state_dict(self, state_dict):

""" Distributed training/validation utils
Hacked together by / Copyright 2020 Ross Wightman
import torch
from torch import distributed as dist
from .model import unwrap_model
def reduce_tensor(tensor, n):
rt = tensor.clone()
dist.all_reduce(rt, op=dist.ReduceOp.SUM)
rt /= n
return rt
def distribute_bn(model, world_size, reduce=False):
# ensure every node has the same running bn stats
for bn_name, bn_buf in unwrap_model(model).named_buffers(recurse=True):
if ('running_mean' in bn_name) or ('running_var' in bn_name):
if reduce:
# average bn stats across whole group
torch.distributed.all_reduce(bn_buf, op=dist.ReduceOp.SUM)
bn_buf /= float(world_size)
# broadcast bn stats from rank 0 to whole group
torch.distributed.broadcast(bn_buf, 0)

""" JIT scripting/tracing utils
Hacked together by / Copyright 2020 Ross Wightman
import torch
def set_jit_legacy():
""" Set JIT executor to legacy w/ support for op fusion
This is hopefully a temporary need in 1.5/1.5.1/1.6 to restore performance due to changes
in the JIT exectutor. These API are not supported so could change.
assert hasattr(torch._C, '_jit_set_profiling_executor'), "Old JIT behavior doesn't exist!"

""" Logging helpers
Hacked together by / Copyright 2020 Ross Wightman
import logging
import logging.handlers
class FormatterNoInfo(logging.Formatter):
def __init__(self, fmt='%(levelname)s: %(message)s'):
logging.Formatter.__init__(self, fmt)
def format(self, record):
if record.levelno == logging.INFO:
return str(record.getMessage())
return logging.Formatter.format(self, record)
def setup_default_logging(default_level=logging.INFO, log_path=''):
console_handler = logging.StreamHandler()
if log_path:
file_handler = logging.handlers.RotatingFileHandler(log_path, maxBytes=(1024 ** 2 * 2), backupCount=3)
file_formatter = logging.Formatter("%(asctime)s - %(name)20s: [%(levelname)8s] - %(message)s")

""" Eval metrics and related
Hacked together by / Copyright 2020 Ross Wightman
class AverageMeter:
"""Computes and stores the average and current value"""
def __init__(self):
def reset(self):
self.val = 0
self.avg = 0
self.sum = 0
self.count = 0
def update(self, val, n=1):
self.val = val
self.sum += val * n
self.count += n
self.avg = self.sum / self.count
def accuracy(output, target, topk=(1,)):
"""Computes the accuracy over the k top predictions for the specified values of k"""
maxk = max(topk)
batch_size = target.size(0)
_, pred = output.topk(maxk, 1, True, True)
pred = pred.t()
correct = pred.eq(target.view(1, -1).expand_as(pred))
return [correct[:k].view(-1).float().sum(0) * 100. / batch_size for k in topk]

""" Misc utils
Hacked together by / Copyright 2020 Ross Wightman
import re
def natural_key(string_):
return [int(s) if s.isdigit() else s for s in re.split(r'(\d+)', string_.lower())]
def add_bool_arg(parser, name, default=False, help=''):
dest_name = name.replace('-', '_')
group = parser.add_mutually_exclusive_group(required=False)
group.add_argument('--' + name, dest=dest_name, action='store_true', help=help)
group.add_argument('--no-' + name, dest=dest_name, action='store_false', help=help)
parser.set_defaults(**{dest_name: default})

""" Model / state_dict utils
Hacked together by / Copyright 2020 Ross Wightman
from .model_ema import ModelEma
def unwrap_model(model):
if isinstance(model, ModelEma):
return unwrap_model(model.ema)
return model.module if hasattr(model, 'module') else model
def get_state_dict(model, unwrap_fn=unwrap_model):
return unwrap_fn(model).state_dict()

""" Exponential Moving Average (EMA) of model updates
Hacked together by / Copyright 2020 Ross Wightman
import logging
from collections import OrderedDict
from copy import deepcopy
import torch
_logger = logging.getLogger(__name__)
class ModelEma:
""" Model Exponential Moving Average
Keep a moving average of everything in the model state_dict (parameters and buffers).
This is intended to allow functionality like
A smoothed version of the weights is necessary for some training schemes to perform well.
E.g. Google's hyper-params for training MNASNet, MobileNet-V3, EfficientNet, etc that use
RMSprop with a short 2.4-3 epoch decay period and slow LR decay rate of .96-.99 requires EMA
smoothing of weights to match results. Pay attention to the decay constant you are using
relative to your update count per epoch.
To keep EMA from using GPU resources, set device='cpu'. This will save a bit of memory but
disable validation of the EMA weights. Validation will have to be done manually in a separate
process, or after the training stops converging.
This class is sensitive where it is initialized in the sequence of model init,
GPU assignment and distributed training wrappers.
I've tested with the sequence in my own for torch.DataParallel, apex.DDP, and single-GPU.
def __init__(self, model, decay=0.9999, device='', resume=''):
# make a copy of the model for accumulating moving average of weights
self.ema = deepcopy(model)
self.decay = decay
self.device = device # perform ema on different device from model if set
if device:
self.ema_has_module = hasattr(self.ema, 'module')
if resume:
for p in self.ema.parameters():
def _load_checkpoint(self, checkpoint_path):
checkpoint = torch.load(checkpoint_path, map_location='cpu')
assert isinstance(checkpoint, dict)
if 'state_dict_ema' in checkpoint:
new_state_dict = OrderedDict()
for k, v in checkpoint['state_dict_ema'].items():
# ema model may have been wrapped by DataParallel, and need module prefix
if self.ema_has_module:
name = 'module.' + k if not k.startswith('module') else k
name = k
new_state_dict[name] = v
self.ema.load_state_dict(new_state_dict)"Loaded state_dict_ema")
_logger.warning("Failed to find state_dict_ema, starting from loaded model weights")
def update(self, model):
# correct a mismatch in state dict keys
needs_module = hasattr(model, 'module') and not self.ema_has_module
with torch.no_grad():
msd = model.state_dict()
for k, ema_v in self.ema.state_dict().items():
if needs_module:
k = 'module.' + k
model_v = msd[k].detach()
if self.device:
model_v =
ema_v.copy_(ema_v * self.decay + (1. - self.decay) * model_v)

""" Summary utilities
Hacked together by / Copyright 2020 Ross Wightman
import csv
import os
from collections import OrderedDict
def get_outdir(path, *paths, inc=False):
outdir = os.path.join(path, *paths)
if not os.path.exists(outdir):
elif inc:
count = 1
outdir_inc = outdir + '-' + str(count)
while os.path.exists(outdir_inc):
count = count + 1
outdir_inc = outdir + '-' + str(count)
assert count < 100
outdir = outdir_inc
return outdir
def update_summary(epoch, train_metrics, eval_metrics, filename, write_header=False):
rowd = OrderedDict(epoch=epoch)
rowd.update([('train_' + k, v) for k, v in train_metrics.items()])
rowd.update([('eval_' + k, v) for k, v in eval_metrics.items()])
with open(filename, mode='a') as cf:
dw = csv.DictWriter(cf, fieldnames=rowd.keys())
if write_header: # first iteration (epoch == 1 can't be used)

@ -17,9 +17,13 @@ Hacked together by / Copyright 2020 Ross Wightman (
import argparse import argparse
import time import time
import yaml import yaml
from datetime import datetime import os
import logging
from collections import OrderedDict
from contextlib import suppress from contextlib import suppress
from datetime import datetime
import torch
import torch.nn as nn import torch.nn as nn
import torchvision.utils import torchvision.utils
from torch.nn.parallel import DistributedDataParallel as NativeDDP from torch.nn.parallel import DistributedDataParallel as NativeDDP
