Repository URL to install this package:
|
Version:
0.0.0 ▾
|
pytorch-yolov3
/
models.py
|
|---|
from __future__ import division
from itertools import chain
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import numpy as np
from pytorch_yolov3.utils.parse_config import *
from pytorch_yolov3.utils.utils import to_cpu, non_max_suppression, weights_init_normal
import matplotlib.pyplot as plt
import matplotlib.patches as patches
def create_modules(module_defs):
    """
    Constructs module list of layer blocks from module configuration in module_defs

    The first entry of ``module_defs`` is treated as the network hyperparameter
    block: it is popped off the list (``module_defs`` is mutated in place) and
    its values are converted to numeric types. Every remaining entry becomes
    one ``nn.Sequential`` in the returned ``nn.ModuleList``.

    :param module_defs: list of dicts, one per config block; each dict has a
        ``"type"`` key plus the string-valued options for that layer type
    :return: tuple ``(hyperparams, module_list)``
    :raises AssertionError: if the configured height and width differ
    """
    hyperparams = module_defs.pop(0)
    hyperparams.update({
        'batch': int(hyperparams['batch']),
        'subdivisions': int(hyperparams['subdivisions']),
        'width': int(hyperparams['width']),
        'height': int(hyperparams['height']),
        'channels': int(hyperparams['channels']),
        'optimizer': hyperparams.get('optimizer'),
        'momentum': float(hyperparams['momentum']),
        'decay': float(hyperparams['decay']),
        'learning_rate': float(hyperparams['learning_rate']),
        'burn_in': int(hyperparams['burn_in']),
        'max_batches': int(hyperparams['max_batches']),
        'policy': hyperparams['policy'],
        # Pair each step index with the scale applied at that step.
        'lr_steps': list(zip(map(int, hyperparams["steps"].split(",")),
                             map(float, hyperparams["scales"].split(",")))),
    })
    assert hyperparams["height"] == hyperparams["width"], \
        "Height and width should be equal! Non square images are padded with zeros."

    # output_filters[0] is the input channel count; entry k + 1 is the channel
    # count produced by layer k (hence the [1:] offset used by route/shortcut).
    output_filters = [hyperparams["channels"]]
    module_list = nn.ModuleList()

    for module_i, module_def in enumerate(module_defs):
        modules = nn.Sequential()
        layer_type = module_def["type"]

        # Layers that do not alter the channel count (maxpool, upsample, yolo)
        # pass the incoming width through. Seeding it here makes that explicit
        # and keeps a config whose first layer is such a layer from failing on
        # an unbound name.
        filters = output_filters[-1]

        if layer_type == "convolutional":
            bn = int(module_def["batch_normalize"])
            filters = int(module_def["filters"])
            kernel_size = int(module_def["size"])
            pad = (kernel_size - 1) // 2
            modules.add_module(
                f"conv_{module_i}",
                nn.Conv2d(
                    in_channels=output_filters[-1],
                    out_channels=filters,
                    kernel_size=kernel_size,
                    stride=int(module_def["stride"]),
                    padding=pad,
                    # The conv bias is dropped when batch norm follows.
                    bias=not bn,
                ),
            )
            if bn:
                # NOTE(review): PyTorch's `momentum` is the weight given to the
                # *new* batch statistic (library default 0.1). 0.9 is kept
                # unchanged here to preserve behavior - confirm it is intended.
                modules.add_module(
                    f"batch_norm_{module_i}",
                    nn.BatchNorm2d(filters, momentum=0.9, eps=1e-5),
                )
            if module_def["activation"] == "leaky":
                modules.add_module(f"leaky_{module_i}", nn.LeakyReLU(0.1))

        elif layer_type == "maxpool":
            kernel_size = int(module_def["size"])
            stride = int(module_def["stride"])
            if kernel_size == 2 and stride == 1:
                # Pad one pixel on the right and bottom so that a 2x2/stride-1
                # pool returns a map of the same spatial size as its input.
                modules.add_module(f"_debug_padding_{module_i}", nn.ZeroPad2d((0, 1, 0, 1)))
            maxpool = nn.MaxPool2d(
                kernel_size=kernel_size,
                stride=stride,
                padding=int((kernel_size - 1) // 2),
            )
            modules.add_module(f"maxpool_{module_i}", maxpool)

        elif layer_type == "upsample":
            upsample = Upsample(scale_factor=int(module_def["stride"]), mode="nearest")
            modules.add_module(f"upsample_{module_i}", upsample)

        elif layer_type == "route":
            # Placeholder module; only the summed channel count of the
            # referenced layers is computed here.
            layers = [int(x) for x in module_def["layers"].split(",")]
            filters = sum([output_filters[1:][i] for i in layers])
            modules.add_module(f"route_{module_i}", nn.Sequential())

        elif layer_type == "shortcut":
            # Placeholder module; the channel count is taken from the layer
            # named by "from".
            filters = output_filters[1:][int(module_def["from"])]
            modules.add_module(f"shortcut_{module_i}", nn.Sequential())

        elif layer_type == "yolo":
            anchor_idxs = [int(x) for x in module_def["mask"].split(",")]
            # Extract anchors: flat "w,h,w,h,..." list -> (w, h) pairs -> masked subset
            anchors = [int(x) for x in module_def["anchors"].split(",")]
            anchors = [(anchors[i], anchors[i + 1]) for i in range(0, len(anchors), 2)]
            anchors = [anchors[i] for i in anchor_idxs]
            num_classes = int(module_def["classes"])
            # Define detection layer
            yolo_layer = YOLOLayer(anchors, num_classes)
            modules.add_module(f"yolo_{module_i}", yolo_layer)

        # Register module list and number of output filters
        module_list.append(modules)
        output_filters.append(filters)

    return hyperparams, module_list
class Upsample(nn.Module):
    """Resize a feature map by a fixed factor through ``F.interpolate``.

    Used in place of ``nn.Upsample`` (original note: "nn.Upsample is deprecated").
    """

    def __init__(self, scale_factor, mode="nearest"):
        super().__init__()
        self.mode = mode
        self.scale_factor = scale_factor

    def forward(self, x):
        """Return ``x`` interpolated by ``scale_factor`` using ``mode``."""
        return F.interpolate(x, scale_factor=self.scale_factor, mode=self.mode)
class YOLOLayer(nn.Module):
    """Detection layer

    In training mode the layer only reshapes the incoming feature map to
    ``(batch, anchors, ny, nx, outputs)``. In eval mode it additionally
    decodes box centres and sizes and flattens the result to
    ``(batch, anchors * ny * nx, outputs)``.
    """

    def __init__(self, anchors, num_classes):
        super().__init__()
        self.num_anchors = len(anchors)
        self.num_classes = num_classes
        self.mse_loss = nn.MSELoss()
        self.bce_loss = nn.BCELoss()
        # Values predicted per anchor: 4 box terms + 1 objectness + class scores.
        self.no = num_classes + 5
        # Placeholder; replaced by a real cell grid on the first eval forward pass.
        self.grid = torch.zeros(1)  # TODO

        # Flatten the (w, h) pairs, then fold them back into an (N, 2) float tensor.
        flat_anchor_values = [value for pair in anchors for value in pair]
        anchor_tensor = torch.tensor(flat_anchor_values).float().view(-1, 2)
        self.register_buffer('anchors', anchor_tensor)
        # Broadcast-ready copy with shape (1, N, 1, 1, 2).
        self.register_buffer('anchor_grid', anchor_tensor.clone().view(1, -1, 1, 1, 2))
        self.stride = None

    def forward(self, x, img_size):
        """Reshape ``x`` and, outside training, decode it into detections.

        :param x: feature map with ``num_anchors * no`` channels
        :param img_size: input image side length, used to derive the stride
        :return: the reshaped tensor in training mode, decoded boxes otherwise
        """
        stride = img_size // x.size(2)
        self.stride = stride

        batch_size, _, ny, nx = x.shape  # x(bs,255,20,20) to x(bs,3,20,20,85)
        x = x.view(batch_size, self.num_anchors, self.no, ny, nx)
        x = x.permute(0, 1, 3, 4, 2).contiguous()

        if self.training:
            return x

        # inference
        if self.grid.shape[2:4] != x.shape[2:4]:
            # Cached grid does not match this feature map size: rebuild it.
            self.grid = self._make_grid(nx, ny).to(x.device)

        decoded = x.sigmoid()
        cell_offsets = self.grid.to(x.device)
        decoded[..., 0:2] = (decoded[..., 0:2] * 2. - 0.5 + cell_offsets) * stride  # xy
        decoded[..., 2:4] = (decoded[..., 2:4] * 2) ** 2 * self.anchor_grid  # wh
        return decoded.view(batch_size, -1, self.no)

    @staticmethod
    def _make_grid(nx=20, ny=20):
        """Return a ``(1, 1, ny, nx, 2)`` float tensor of (x, y) cell indices."""
        rows, cols = torch.meshgrid([torch.arange(ny), torch.arange(nx)])
        cells = torch.stack((cols, rows), 2)
        return cells.view((1, 1, ny, nx, 2)).float()
class Darknet(nn.Module):
    """YOLOv3 object detection model"""

    def __init__(self, config_path):
        """Build the network described by the model config at ``config_path``."""
        super(Darknet, self).__init__()
        self.module_defs = parse_model_config(config_path)
        self.hyperparams, self.module_list = create_modules(self.module_defs)
        self.yolo_layers = [layer[0] for layer in self.module_list if isinstance(layer[0], YOLOLayer)]
        self.seen = 0
        # Five int32 header values written in front of the weights; slot 3
        # carries ``seen``.
        self.header_info = np.array([0, 0, 0, self.seen, 0], dtype=np.int32)

    def forward(self, x):
        """Run the network on a batch of images.

        :param x: input tensor; its size along dim 2 is used as the image size
        :return: list of per-YOLO-layer outputs in training mode, otherwise
            the YOLO outputs concatenated along dim 1
        """
        img_size = x.size(2)
        layer_outputs, yolo_outputs = [], []
        for i, (module_def, module) in enumerate(zip(self.module_defs, self.module_list)):
            if module_def["type"] in ["convolutional", "upsample", "maxpool"]:
                x = module(x)
            elif module_def["type"] == "route":
                # Concatenate the referenced earlier outputs along the channel dim.
                x = torch.cat([layer_outputs[int(layer_i)] for layer_i in module_def["layers"].split(",")], 1)
            elif module_def["type"] == "shortcut":
                # Residual add of the previous output and the referenced one.
                layer_i = int(module_def["from"])
                x = layer_outputs[-1] + layer_outputs[layer_i]
            elif module_def["type"] == "yolo":
                x = module[0](x, img_size)
                yolo_outputs.append(x)
            layer_outputs.append(x)
        return yolo_outputs if self.training else torch.cat(yolo_outputs, 1)

    @staticmethod
    def _has_batch_norm(module_def):
        """Return True when a convolutional block's config enables batch norm.

        The flag is parsed with ``int`` (as ``create_modules`` does) because a
        string value such as ``"0"`` would otherwise be truthy.
        """
        return bool(int(module_def["batch_normalize"]))

    @staticmethod
    def _load_tensor(target, weights, ptr):
        """Copy ``target.numel()`` values from ``weights[ptr:]`` into ``target``.

        :return: the offset just past the values that were consumed
        """
        count = target.numel()
        values = torch.from_numpy(weights[ptr: ptr + count]).view_as(target)
        target.data.copy_(values)
        return ptr + count

    def load_darknet_weights(self, weights_path):
        """Parses and loads the weights stored in 'weights_path'"""
        # Open the weights file
        with open(weights_path, "rb") as f:
            header = np.fromfile(f, dtype=np.int32, count=5)  # First five are header values
            self.header_info = header  # Needed to write header when saving weights
            self.seen = header[3]  # number of images seen during training
            weights = np.fromfile(f, dtype=np.float32)  # The rest are weights

        # Establish cutoff for loading backbone weights
        cutoff = None
        if "darknet53.conv.74" in weights_path:
            cutoff = 75

        ptr = 0
        for i, (module_def, module) in enumerate(zip(self.module_defs, self.module_list)):
            if i == cutoff:
                break
            if module_def["type"] != "convolutional":
                continue
            conv_layer = module[0]
            if self._has_batch_norm(module_def):
                # Stored order: BN bias, BN weight, running mean, running variance
                bn_layer = module[1]
                ptr = self._load_tensor(bn_layer.bias, weights, ptr)
                ptr = self._load_tensor(bn_layer.weight, weights, ptr)
                ptr = self._load_tensor(bn_layer.running_mean, weights, ptr)
                ptr = self._load_tensor(bn_layer.running_var, weights, ptr)
            else:
                # Load conv. bias
                ptr = self._load_tensor(conv_layer.bias, weights, ptr)
            # Load conv. weights
            ptr = self._load_tensor(conv_layer.weight, weights, ptr)

    def save_darknet_weights(self, path, cutoff=-1):
        """
        @:param path - path of the new weights file
        @:param cutoff - save layers between 0 and cutoff (cutoff = -1 -> all are saved)
        """
        # A plain [:-1] slice would silently drop the last layer, so map the
        # documented "save everything" sentinel to an open-ended slice.
        end = None if cutoff == -1 else cutoff
        self.header_info[3] = self.seen
        # The context manager closes the file even if a write fails part-way.
        with open(path, "wb") as fp:
            self.header_info.tofile(fp)
            # Iterate through layers
            for module_def, module in zip(self.module_defs[:end], self.module_list[:end]):
                if module_def["type"] != "convolutional":
                    continue
                conv_layer = module[0]
                # If batch norm, write bn first (same order load_darknet_weights reads)
                if self._has_batch_norm(module_def):
                    bn_layer = module[1]
                    bn_layer.bias.data.cpu().numpy().tofile(fp)
                    bn_layer.weight.data.cpu().numpy().tofile(fp)
                    bn_layer.running_mean.data.cpu().numpy().tofile(fp)
                    bn_layer.running_var.data.cpu().numpy().tofile(fp)
                # Otherwise write the conv bias
                else:
                    conv_layer.bias.data.cpu().numpy().tofile(fp)
                # Write conv weights
                conv_layer.weight.data.cpu().numpy().tofile(fp)
def load_model(model_path, weights_path=None):
    """Loads the yolo model from file.

    The model is placed on CUDA when available, otherwise on the CPU, and its
    weights are initialised with ``weights_init_normal`` before any weights
    file is applied.

    :param model_path: Path to model definition file (.cfg)
    :type model_path: str
    :param weights_path: Path to weights or checkpoint file (.weights or .pth)
    :type weights_path: str
    :return: Returns model
    :rtype: Darknet
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # Select device for inference
    model = Darknet(model_path).to(device)
    model.apply(weights_init_normal)

    # If pretrained weights are specified, start from checkpoint or weight file
    if weights_path:
        if weights_path.endswith(".pth"):
            # Load checkpoint weights. map_location remaps the stored tensors
            # onto the selected device, so a checkpoint saved on a GPU can
            # still be loaded on a CPU-only machine.
            model.load_state_dict(torch.load(weights_path, map_location=device))
        else:
            # Load darknet weights
            model.load_darknet_weights(weights_path)
    return model