"""Single-shot Multi-box Detector."""
from __future__ import absolute_import
import os
import warnings
import mxnet as mx
from mxnet import autograd
from mxnet.gluon import nn
from mxnet.gluon import HybridBlock
from ...nn.feature import FeatureExpander
from .anchor import SSDAnchorGenerator
from ...nn.predictor import ConvPredictor
from ...nn.coder import MultiPerClassDecoder, NormalizedBoxCenterDecoder
from .vgg_atrous import vgg16_atrous_300, vgg16_atrous_512
__all__ = ['SSD', 'get_ssd', 'custom_ssd']
class SSD(HybridBlock):
"""Single-shot Object Detection Network: https://arxiv.org/abs/1512.02325.
Parameters
----------
    network : string or None
        Name of the base network. If `None`, the base network is instantiated
        from `features` directly instead of being composed.
base_size : int
        Base input size; it is specified so SSD can support dynamic input shapes.
features : list of str or mxnet.gluon.HybridBlock
Intermediate features to be extracted or a network with multi-output.
If `network` is `None`, `features` is expected to be a multi-output network.
num_filters : list of int
        Number of channels for the appended layers, ignored if `network` is `None`.
    sizes : iterable of float
        Sizes of anchor boxes, this should be a list of floats, in increasing order.
        The length of `sizes` must be len(layers) + 1. For example, a two-stage SSD
        model can have ``sizes = [30, 60, 90]``, which converts to `[30, 60]` and
        `[60, 90]` for the two stages, respectively. For more details, please refer
        to the original paper.
ratios : iterable of list
        Aspect ratios of anchors in each output layer. Its length must be equal
        to the number of SSD output layers.
steps : list of int
Step size of anchor boxes in each output layer.
classes : iterable of str
Names of all categories.
use_1x1_transition : bool
        Whether to use 1x1 convolution as a transition layer between attached layers;
        it is effective in reducing model capacity.
use_bn : bool
Whether to use BatchNorm layer after each attached convolutional layer.
reduce_ratio : float
        Channel reduction ratio in (0, 1) of the transition layer.
min_depth : int
Minimum channels for the transition layers.
global_pool : bool
Whether to attach a global average pooling layer as the last output layer.
pretrained : bool or str
Boolean value controls whether to load the default pretrained weights for model.
String value represents the hashtag for a certain version of pretrained weights.
stds : tuple of float, default is (0.1, 0.1, 0.2, 0.2)
        Std values by which encoded box values are divided (encoding) and
        multiplied (decoding).
nms_thresh : float, default is 0.45.
Non-maximum suppression threshold. You can specify < 0 or > 1 to disable NMS.
nms_topk : int, default is 400
        Apply NMS to the top k detection results; use -1 to disable so that every
        detection result is used in NMS.
post_nms : int, default is 100
        Only return top `post_nms` detection results, the rest are discarded. The
        number is based on the COCO dataset, which has at most 100 objects per image.
        You can adjust this number if expecting more objects, or use -1 to return all
        detections.
    anchor_alloc_size : int, default is 128
        For advanced users. Define `anchor_alloc_size` to generate a large enough
        anchor map of shape (anchor_alloc_size, anchor_alloc_size), which will later
        be saved in parameters. During inference, we support arbitrary input images
        by cropping the corresponding area of the anchor map. This allows us to
        export the network to symbols so it can run in C++, Scala, etc.
ctx : mx.Context
Network context.
norm_layer : object
Normalization layer used (default: :class:`mxnet.gluon.nn.BatchNorm`)
Can be :class:`mxnet.gluon.nn.BatchNorm` or :class:`mxnet.gluon.contrib.nn.SyncBatchNorm`.
        This only applies to base networks that accept a `norm_layer` argument; it is
        ignored if the base network (e.g. VGG) does not accept this argument.
norm_kwargs : dict
Additional `norm_layer` arguments, for example `num_devices=4`
for :class:`mxnet.gluon.contrib.nn.SyncBatchNorm`.
root : str
The root path for model storage, default is '~/.mxnet/models'
minimal_opset : bool
        We sometimes add special operators to accelerate training/inference; however,
        for exporting to third-party compilers we want to utilize only the most
        widely used operators. If `minimal_opset` is `True`, the network will use a
        minimal set of operators suited for, e.g., `TVM`.
    predictors_kernel : tuple of int, default is (3, 3)
        Dimension of the predictor kernel.
    predictors_pad : tuple of int, default is (1, 1)
        Padding of the predictor kernel conv.
    anchor_generator : default is SSDAnchorGenerator
        Anchor generator to be used. The default is `SSDAnchorGenerator`, which
        corresponds to the published SSD paper. This argument can be used for other
        custom anchor generators, such as `LiteAnchorGenerator`.
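    Example
    -------
    >>> # a minimal inference sketch with a pretrained zoo model; the zero input is illustrative only
    >>> net = gluoncv.model_zoo.get_model('ssd_512_resnet50_v1_voc', pretrained=True)
    >>> ids, scores, bboxes = net(mx.nd.zeros((1, 3, 512, 512)))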
"""
def __init__(self, network, base_size, features, num_filters, sizes, ratios,
steps, classes, use_1x1_transition=True, use_bn=True,
reduce_ratio=1.0, min_depth=128, global_pool=False, pretrained=False,
stds=(0.1, 0.1, 0.2, 0.2), nms_thresh=0.45, nms_topk=400, post_nms=100,
anchor_alloc_size=128, ctx=mx.cpu(),
norm_layer=nn.BatchNorm, norm_kwargs=None,
root=os.path.join('~', '.mxnet', 'models'), minimal_opset=False,
predictors_kernel=(3, 3), predictors_pad=(1, 1),
anchor_generator=SSDAnchorGenerator, **kwargs):
super(SSD, self).__init__(**kwargs)
if norm_kwargs is None:
norm_kwargs = {}
if network is None:
num_layers = len(ratios)
else:
num_layers = len(features) + len(num_filters) + int(global_pool)
assert len(sizes) == num_layers + 1
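        # convert the flat size list into per-layer (min, max) pairs,
        # e.g. sizes = [30, 60, 90] -> [(30, 60), (60, 90)]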
sizes = list(zip(sizes[:-1], sizes[1:]))
assert isinstance(ratios, (tuple, list)), "Must provide ratios as tuple or list"
if not isinstance(ratios[0], (tuple, list)):
ratios = ratios * num_layers # propagate to all layers if use same ratio
assert num_layers == len(sizes) == len(ratios), \
"Mismatched (number of layers) vs (sizes) vs (ratios): {}, {}, {}".format(
num_layers, len(sizes), len(ratios))
        assert num_layers > 0, "SSD requires at least one layer, suggest multiple."
self._num_layers = num_layers
self.classes = classes
self.nms_thresh = nms_thresh
self.nms_topk = nms_topk
self.post_nms = post_nms
with self.name_scope():
if network is None:
# use fine-grained manually designed block as features
try:
self.features = features(pretrained=pretrained, ctx=ctx, root=root,
norm_layer=norm_layer, norm_kwargs=norm_kwargs)
except TypeError:
self.features = features(pretrained=pretrained, ctx=ctx, root=root)
else:
try:
self.features = FeatureExpander(
network=network, outputs=features, num_filters=num_filters,
use_1x1_transition=use_1x1_transition,
use_bn=use_bn, reduce_ratio=reduce_ratio, min_depth=min_depth,
global_pool=global_pool, pretrained=pretrained, ctx=ctx,
norm_layer=norm_layer, norm_kwargs=norm_kwargs, root=root)
except TypeError:
self.features = FeatureExpander(
network=network, outputs=features, num_filters=num_filters,
use_1x1_transition=use_1x1_transition,
use_bn=use_bn, reduce_ratio=reduce_ratio, min_depth=min_depth,
global_pool=global_pool, pretrained=pretrained, ctx=ctx, root=root)
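            # attach one class predictor, one box predictor and one anchor
            # generator per output feature map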
self.class_predictors = nn.HybridSequential()
self.box_predictors = nn.HybridSequential()
self.anchor_generators = nn.HybridSequential()
asz = anchor_alloc_size
im_size = (base_size, base_size)
for i, s, r, st in zip(range(num_layers), sizes, ratios, steps):
branch_anchor_generator = anchor_generator(i, im_size, s, r, st, (asz, asz))
self.anchor_generators.add(branch_anchor_generator)
                asz = max(asz // 2, 16)  # pre-compute anchor maps no smaller than 16x16
num_anchors = branch_anchor_generator.num_depth
self.class_predictors.add(ConvPredictor(num_anchors * (len(self.classes) + 1),
kernel=predictors_kernel,
pad=predictors_pad))
self.box_predictors.add(ConvPredictor(num_anchors * 4,
kernel=predictors_kernel,
pad=predictors_pad))
self.bbox_decoder = NormalizedBoxCenterDecoder(stds, minimal_opset=minimal_opset)
self.cls_decoder = MultiPerClassDecoder(len(self.classes) + 1, thresh=0.01)
@property
def num_classes(self):
"""Return number of foreground classes.
Returns
-------
int
Number of foreground classes
"""
return len(self.classes)
    def set_nms(self, nms_thresh=0.45, nms_topk=400, post_nms=100):
"""Set non-maximum suppression parameters.
Parameters
----------
nms_thresh : float, default is 0.45.
Non-maximum suppression threshold. You can specify < 0 or > 1 to disable NMS.
nms_topk : int, default is 400
            Apply NMS to the top k detection results; use -1 to disable so that every
            detection result is used in NMS.
post_nms : int, default is 100
            Only return top `post_nms` detection results, the rest are discarded. The
            number is based on the COCO dataset, which has at most 100 objects per
            image. You can adjust this number if expecting more objects, or use -1 to
            return all detections.
Returns
-------
None
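        Example
        -------
        >>> # a usage sketch; the values below are illustrative, not recommendations
        >>> net.set_nms(nms_thresh=0.5, nms_topk=200, post_nms=50)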
"""
self._clear_cached_op()
self.nms_thresh = nms_thresh
self.nms_topk = nms_topk
self.post_nms = post_nms
# pylint: disable=arguments-differ
    def hybrid_forward(self, F, x):
"""Hybrid forward"""
features = self.features(x)
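        # apply each per-layer predictor, move channels last and flatten so
        # per-layer outputs can be concatenated along the anchor dimension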
cls_preds = [F.flatten(F.transpose(cp(feat), (0, 2, 3, 1)))
for feat, cp in zip(features, self.class_predictors)]
box_preds = [F.flatten(F.transpose(bp(feat), (0, 2, 3, 1)))
for feat, bp in zip(features, self.box_predictors)]
anchors = [F.reshape(ag(feat), shape=(1, -1))
for feat, ag in zip(features, self.anchor_generators)]
cls_preds = F.concat(*cls_preds, dim=1).reshape((0, -1, self.num_classes + 1))
box_preds = F.concat(*box_preds, dim=1).reshape((0, -1, 4))
anchors = F.concat(*anchors, dim=1).reshape((1, -1, 4))
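        # in training mode, return raw class/box predictions and anchors so
        # that target generation and loss computation can consume them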
if autograd.is_training():
return [cls_preds, box_preds, anchors]
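        # inference path: decode box offsets with respect to anchors and turn
        # class logits into per-class scores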
bboxes = self.bbox_decoder(box_preds, anchors)
cls_ids, scores = self.cls_decoder(F.softmax(cls_preds, axis=-1))
results = []
for i in range(self.num_classes):
cls_id = cls_ids.slice_axis(axis=-1, begin=i, end=i+1)
score = scores.slice_axis(axis=-1, begin=i, end=i+1)
# per class results
per_result = F.concat(*[cls_id, score, bboxes], dim=-1)
results.append(per_result)
result = F.concat(*results, dim=1)
        if 0 < self.nms_thresh < 1:
result = F.contrib.box_nms(
result, overlap_thresh=self.nms_thresh, topk=self.nms_topk, valid_thresh=0.01,
id_index=0, score_index=1, coord_start=2, force_suppress=False)
if self.post_nms > 0:
result = result.slice_axis(axis=1, begin=0, end=self.post_nms)
ids = F.slice_axis(result, axis=2, begin=0, end=1)
scores = F.slice_axis(result, axis=2, begin=1, end=2)
bboxes = F.slice_axis(result, axis=2, begin=2, end=6)
return ids, scores, bboxes
    def reset_class(self, classes, reuse_weights=None):
"""Reset class categories and class predictors.
Parameters
----------
classes : iterable of str
The new categories. ['apple', 'orange'] for example.
        reuse_weights : dict
            A {new_integer : old_integer} mapping dict or a {new_name : old_name}
            mapping dict, or a list of [name0, name1, ...] if class names don't
            change. This allows the new predictor to reuse the previously trained
            weights specified.
Example
-------
>>> net = gluoncv.model_zoo.get_model('ssd_512_resnet50_v1_voc', pretrained=True)
>>> # use direct name to name mapping to reuse weights
>>> net.reset_class(classes=['person'], reuse_weights={'person':'person'})
        >>> # or use integer mapping, person is the 14th category in VOC
>>> net.reset_class(classes=['person'], reuse_weights={0:14})
>>> # you can even mix them
>>> net.reset_class(classes=['person'], reuse_weights={'person':14})
        >>> # or use a list of strings if class names don't change
>>> net.reset_class(classes=['person'], reuse_weights=['person'])
"""
self._clear_cached_op()
old_classes = self.classes
self.classes = classes
# trying to reuse weights by mapping old and new classes
if isinstance(reuse_weights, (dict, list)):
if isinstance(reuse_weights, dict):
# trying to replace str with indices
new_keys = []
new_vals = []
for k, v in reuse_weights.items():
if isinstance(v, str):
try:
new_vals.append(old_classes.index(v)) # raise ValueError if not found
except ValueError:
raise ValueError(
"{} not found in old class names {}".format(v, old_classes))
else:
if v < 0 or v >= len(old_classes):
raise ValueError(
"Index {} out of bounds for old class names".format(v))
new_vals.append(v)
if isinstance(k, str):
try:
new_keys.append(self.classes.index(k)) # raise ValueError if not found
except ValueError:
raise ValueError(
"{} not found in new class names {}".format(k, self.classes))
else:
if k < 0 or k >= len(self.classes):
raise ValueError(
"Index {} out of bounds for new class names".format(k))
new_keys.append(k)
reuse_weights = dict(zip(new_keys, new_vals))
else:
new_map = {}
for x in reuse_weights:
try:
new_idx = self.classes.index(x)
old_idx = old_classes.index(x)
new_map[new_idx] = old_idx
except ValueError:
warnings.warn("{} not found in old: {} or new class names: {}".format(
x, old_classes, self.classes))
reuse_weights = new_map
# replace class predictors
with self.name_scope():
class_predictors = nn.HybridSequential(prefix=self.class_predictors.prefix)
for i, ag in zip(range(len(self.class_predictors)), self.anchor_generators):
# Re-use the same prefix and ctx_list as used by the current ConvPredictor
prefix = self.class_predictors[i].prefix
old_pred = self.class_predictors[i].predictor
ctx = list(old_pred.params.values())[0].list_ctx()
# to avoid deferred init, number of in_channels must be defined
in_channels = list(old_pred.params.values())[0].shape[1]
new_cp = ConvPredictor(ag.num_depth * (self.num_classes + 1),
in_channels=in_channels, prefix=prefix)
new_cp.collect_params().initialize(ctx=ctx)
if reuse_weights:
assert isinstance(reuse_weights, dict)
for old_params, new_params in zip(old_pred.params.values(),
new_cp.predictor.params.values()):
old_data = old_params.data()
new_data = new_params.data()
for k, v in reuse_weights.items():
if k >= len(self.classes) or v >= len(old_classes):
warnings.warn("reuse mapping {}/{} -> {}/{} out of range".format(
k, self.classes, v, old_classes))
continue
# always increment k and v (background is always the 0th)
new_data[k+1::len(self.classes)+1] = old_data[v+1::len(old_classes)+1]
# reuse background weights as well
new_data[0::len(self.classes)+1] = old_data[0::len(old_classes)+1]
# set data to new conv layers
new_params.set_data(new_data)
class_predictors.add(new_cp)
self.class_predictors = class_predictors
self.cls_decoder = MultiPerClassDecoder(len(self.classes) + 1, thresh=0.01)
def get_ssd(name, base_size, features, filters, sizes, ratios, steps, classes,
dataset, pretrained=False, pretrained_base=True, ctx=mx.cpu(),
root=os.path.join('~', '.mxnet', 'models'),
anchor_generator=SSDAnchorGenerator, **kwargs):
"""Get SSD models.
Parameters
----------
name : str or None
Model name, if `None` is used, you must specify `features` to be a `HybridBlock`.
base_size : int
        Base image size for training; this is fixed once training is set up.
        A fixed base size still allows you to have variable input sizes during test.
features : iterable of str or `HybridBlock`
List of network internal output names, in order to specify which layers are
used for predicting bbox values.
        If `name` is `None`, `features` must be a `HybridBlock` which generates
        multiple outputs for prediction.
    filters : iterable of int or None
        List of convolution layer channels which are going to be appended to the
        base network feature extractor. If `name` is `None`, this is ignored.
    sizes : iterable of float
        Sizes of anchor boxes, this should be a list of floats, in increasing order.
        The length of `sizes` must be len(layers) + 1. For example, a two-stage SSD
        model can have ``sizes = [30, 60, 90]``, which converts to `[30, 60]` and
        `[60, 90]` for the two stages, respectively. For more details, please refer
        to the original paper.
ratios : iterable of list
        Aspect ratios of anchors in each output layer. Its length must be equal
        to the number of SSD output layers.
steps : list of int
Step size of anchor boxes in each output layer.
classes : iterable of str
Names of categories.
dataset : str
Name of dataset. This is used to identify model name because models trained on
different datasets are going to be very different.
pretrained : bool or str
Boolean value controls whether to load the default pretrained weights for model.
String value represents the hashtag for a certain version of pretrained weights.
pretrained_base : bool or str, optional, default is True
Load pretrained base network, the extra layers are randomized. Note that
if pretrained is `True`, this has no effect.
ctx : mxnet.Context
Context such as mx.cpu(), mx.gpu(0).
root : str
Model weights storing path.
norm_layer : object
Normalization layer used (default: :class:`mxnet.gluon.nn.BatchNorm`)
Can be :class:`mxnet.gluon.nn.BatchNorm` or :class:`mxnet.gluon.contrib.nn.SyncBatchNorm`.
norm_kwargs : dict
Additional `norm_layer` arguments, for example `num_devices=4`
for :class:`mxnet.gluon.contrib.nn.SyncBatchNorm`.
Returns
-------
HybridBlock
        An SSD detection network.
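    Example
    -------
    >>> # an illustrative sketch; the filters/sizes/ratios/steps values below are
    >>> # placeholders satisfying the layer-count constraints, not the shipped
    >>> # 'ssd_512_resnet50_v1_voc' configuration
    >>> net = get_ssd('resnet50_v1', 512,
    ...               features=['stage3_activation5', 'stage4_activation2'],
    ...               filters=[512, 512, 256, 256],
    ...               sizes=[51.2, 102.4, 189.4, 276.4, 363.52, 450.6, 492.0],
    ...               ratios=[[1, 2, 0.5]] * 2 + [[1, 2, 0.5, 3, 1.0 / 3]] * 4,
    ...               steps=[16, 32, 64, 128, 256, 512],
    ...               classes=('person',), dataset='voc')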
"""
pretrained_base = False if pretrained else pretrained_base
base_name = None if callable(features) else name
net = SSD(base_name, base_size, features, filters, sizes, ratios, steps,
pretrained=pretrained_base, classes=classes, ctx=ctx, root=root,
minimal_opset=pretrained, anchor_generator=anchor_generator, **kwargs)
if pretrained:
from ..model_store import get_model_file
full_name = '_'.join(('ssd', str(base_size), name, dataset))
net.load_parameters(get_model_file(full_name, tag=pretrained, root=root), ctx=ctx)
return net
def custom_ssd(base_network_name, base_size, filters, sizes, ratios, steps,
classes, dataset, pretrained_base, **kwargs):
"""Custom SSD models.
"""
if base_network_name == 'vgg16_atrous' and base_size == 300:
features = vgg16_atrous_300
elif base_network_name == 'vgg16_atrous' and base_size == 512:
features = vgg16_atrous_512
elif base_network_name == 'resnet18_v1' and base_size == 512:
features = ['stage3_activation1', 'stage4_activation1']
elif base_network_name == 'resnet50_v1' and base_size == 512:
features = ['stage3_activation5', 'stage4_activation2']
elif base_network_name == 'resnet101_v2' and base_size == 512:
features = ['stage3_activation22', 'stage4_activation2']
elif base_network_name == 'resnet152_v2' and base_size == 512:
features = ['stage2_activation7', 'stage3_activation35', 'stage4_activation2']
elif base_network_name == 'mobilenet1.0' and base_size == 512:
features = ['relu22_fwd', 'relu26_fwd']
elif base_network_name == 'mobilenet1.0' and base_size == 300:
features = ['relu22_fwd', 'relu26_fwd']
elif base_network_name == 'mobilenet0.25' and base_size == 300:
features = ['relu22_fwd', 'relu26_fwd']
else:
        raise NotImplementedError('Unsupported network: {}'.format(base_network_name))
net = get_ssd(name=base_network_name,
base_size=base_size,
features=features,
filters=filters,
sizes=sizes,
ratios=ratios,
steps=steps,
classes=classes,
dataset=dataset,
pretrained_base=pretrained_base,
**kwargs)
return net