## @package memonger
# Module caffe2.python.memonger
import networkx as nx
import collections
import time
import copy
from caffe2.python import workspace, core
from caffe2.proto import caffe2_pb2
import enum
import logging
from future.utils import viewitems, viewvalues
import caffe2.python._import_c_extension as C
# Module-level logger named "memonger"; its level is set to INFO at import
# time, so INFO messages from this module are not filtered by the logger.
log = logging.getLogger("memonger")
log.setLevel(logging.INFO)
# Lightweight immutable record with the fields (defined, used, size).
# NOTE(review): presumably describes the live range of a blob (where it is
# defined / last used, and its size) -- confirm against the code using it.
LiveRange = collections.namedtuple('LiveRange', ["defined", "used", "size"])
def share_grad_blobs(
    net,
    losses,
    param_grads,
    namescope,
    dont_share_blobs=None,
    share_activations=False,
    blob_shapes=None,
):
    '''
    Implements similar optimization as Torch's shareGradInput():
    for the gradients that are passed between layers, share blobs between
    operators when possible. This yields significant memory savings with
    deep networks.

    Args:
        net: net object whose Proto() returns the NetDef to optimize. The
            net itself is not modified; a deep copy of the proto is
            serialized and handed to the C++ optimizer.
        losses: iterable of loss blobs; each is converted with str() and
            passed to the optimizer as a starting point.
        param_grads: container of parameter gradient blob names. These are
            never treated as shareable gradient blobs.
        namescope: name prefix of the blobs to consider. A trailing "/" is
            appended when the scope is non-empty and lacks one.
        dont_share_blobs: optional set of blobs that must not be shared;
            passed through to the optimizer (empty set when None).
        share_activations: when True, blobs detected as activations by the
            "<blob>_w is an input of the producing op" heuristic are shared
            as well.
        blob_shapes: optional mapping of blob shapes passed through to the
            optimizer (empty dict when None).

    Returns an optimized protobuf (assign to net._net)

    Raises:
        AssertionError: if the optimized net is not graph-equal to the
            original or its in-place assignments differ.
    '''
    def is_grad_blob(b):
        name = str(b)
        # Note: need to look at _{namescope} pattern as it matches
        # to handle the auto-split gradients
        return (
            name.endswith("_grad")
            and name.startswith((namescope, "_" + namescope))
            and name not in param_grads
        )

    def is_grad_op(op):
        # TODO: something smarter
        return any(
            is_grad_blob(b) for b in list(op.input) + list(op.output)
        )

    # Logger.warn is a deprecated alias; Logger.warning is the supported API.
    log.warning("NOTE: Executing memonger to optimize gradient memory")

    # Normalize the scope before any of the helpers above are called; they
    # read `namescope` lazily through the closure.
    if namescope != "" and not namescope.endswith("/"):
        namescope += "/"

    netproto = copy.deepcopy(net.Proto())
    external_output = set(net.Proto().external_output)

    # Hacky way to get activations, think of a better way
    activations = []
    for op in net.Proto().op:
        for b in op.output:
            if b + "_w" in op.input and b not in external_output:
                activations.append(b)

    # Remove last activations, as they are usually accessed externally
    activations = set(activations[:-2])

    # Collect ops that have something to do with gradients
    grad_op_indices = [
        idx for idx, op in enumerate(netproto.op) if is_grad_op(op)
    ]

    shared_blobs = set()
    for op in net.Proto().op:
        for b in list(op.input) + list(op.output):
            if is_grad_blob(b) or (share_activations and b in activations):
                shared_blobs.add(b)

    start_time = time.time()
    optim_str = C.memonger_compute_blob_recycling_for_dag(
        netproto.SerializeToString(),
        [str(s).encode('utf-8') for s in losses],
        grad_op_indices,
        set(str(s).encode('utf-8') for s in shared_blobs),
        namescope.encode('utf-8'),
        set() if dont_share_blobs is None else dont_share_blobs,
        {} if blob_shapes is None else blob_shapes
    )

    log.info("Memonger memory optimization took {} secs".format(
        time.time() - start_time),
    )

    optim = caffe2_pb2.NetDef()
    optim.ParseFromString(optim_str)
    # Sanity checks: the optimizer may only rename blobs, never change the
    # structure of the graph or its in-place assignments.
    assert verify_graph_equality(net.Proto(), optim), \
        "Memonger graph is not equal to original."
    assert verify_inplace_blobs(net.Proto(), optim), \
        "Inplace assignments differ in memonger net."
    return optim
def optimize_inference_for_dag(net, input_blobs, namescope=""):
    '''
    Run the DAG blob-recycling optimizer over an inference-only net.

    Every blob that is neither an external input nor an external output of
    the net is offered to the optimizer as shareable, and all ops of the
    net are included. `input_blobs` are converted with str() and passed to
    the optimizer as starting points.

    Returns the optimized NetDef; `net` itself is left untouched.

    Raises AssertionError when an op consumes an internal blob that no
    earlier op produced, when an op is flagged as a gradient op, or when
    the optimized net fails the equality / in-place verification.
    '''
    proto = net.Proto()
    netproto = copy.deepcopy(proto)
    # Blobs visible from outside the net must never be recycled.
    externally_visible = (
        set(proto.external_input) | set(proto.external_output)
    )

    ops = list(proto.op)
    op_indices = list(range(len(ops)))

    activation_blobs = set()
    produced_so_far = set()

    # Sanity check: check that all external inputs are properly accounted
    # and that no gradient ops are included in 'net'
    for op in ops:
        for blob in op.input:
            if blob in externally_visible:
                continue
            activation_blobs.add(blob)
            if blob not in produced_so_far:
                raise AssertionError(
                    "{} not in external input".format(blob))
        activation_blobs.update(
            blob for blob in op.output if blob not in externally_visible
        )
        produced_so_far.update(op.output)
        assert not op.is_gradient_op, \
            "You can only pass inference-only nets to optimize_inference_for_dag"

    started_at = time.time()
    encoded_inputs = [str(s).encode('utf-8') for s in input_blobs]
    encoded_activations = set(
        str(s).encode('utf-8') for s in activation_blobs
    )
    optim_str = C.memonger_compute_blob_recycling_for_dag(
        netproto.SerializeToString(),
        encoded_inputs,
        op_indices,
        encoded_activations,
        namescope.encode('utf-8'),
        set(),
        {}
    )
    elapsed = time.time() - started_at
    log.info("Memonger memory optimization took {} secs".format(elapsed))

    optim = caffe2_pb2.NetDef()
    optim.ParseFromString(optim_str)

    assert verify_graph_equality(net.Proto(), optim), \
        "Memonger graph is not equal to original."
    assert verify_inplace_blobs(net.Proto(), optim), \
        "Inplace assignments differ in memonger net."
    return optim
def estimate_memory_usage(protos, shapes, types, devicescope):
    '''
    Estimate memory usage of a model. This is an estimate because
    we assume a single threaded execution and miss some internal
    memory usage of operators. Only estimates the memory for a given
    device scope.

    Also, currently it does not handle correctly if blob sizes vary
    during execution, as it uses only the final blob size.

    Args:
        protos: iterable of net protobufs, evaluated in the given order.
            They are only read; no ops are removed from them.
        shapes: mapping from blob name to its shape.
        types: mapping from blob name to its TensorProto data type. A type
            missing from the internal size table raises KeyError.
        devicescope: device option; only ops whose device_option equals it
            (plus all "Free" and "Alias" ops) are taken into account.

    Blobs missing from `shapes` or `types` are logged and counted as
    0 bytes.

    Returns (total, highwater, by op type) memory allocation in bytes.
    '''
    import numpy as np

    sizeofs = {
        caffe2_pb2.TensorProto.DOUBLE: 8,
        caffe2_pb2.TensorProto.FLOAT: 4,
        caffe2_pb2.TensorProto.FLOAT16: 2,
        caffe2_pb2.TensorProto.INT32: 4,
        caffe2_pb2.TensorProto.INT8: 1,
        caffe2_pb2.TensorProto.UINT8: 1,
        caffe2_pb2.TensorProto.UINT16: 2,
        caffe2_pb2.TensorProto.INT16: 2,
        caffe2_pb2.TensorProto.BOOL: 1,
        caffe2_pb2.TensorProto.INT64: 8,
    }

    def ops_in_scope(proto):
        # Select the relevant ops without touching the caller's protobuf.
        return [
            op for op in proto.op
            if op.device_option == devicescope
            or op.type in {"Free", "Alias"}
        ]

    def num_bytes(blob):
        if blob not in shapes or blob not in types:
            log.warning("Unknown blob encountered: {}".format(blob))
            return 0
        sizeof = sizeofs[types[blob]]
        return sizeof * np.prod(shapes[blob])

    allocs_by_ops = collections.defaultdict(lambda: 0)

    # Evaluate
    current_allocated = 0
    max_allocated = 0
    total_allocated = 0
    allocated = set()
    for proto in protos:
        for op in ops_in_scope(proto):
            if op.type == "Free" or op.type == "Alias":
                # Outputs of these ops stop counting towards live memory.
                for o in op.output:
                    if o in allocated:
                        current_allocated -= num_bytes(o)
                        allocated.remove(o)
            else:
                for output in op.output:
                    # A blob is only charged the first time it shows up
                    # (or again after it has been released).
                    if output not in allocated:
                        nbytes = num_bytes(output)
                        total_allocated += nbytes
                        current_allocated += nbytes
                        max_allocated = max(max_allocated, current_allocated)
                        allocated.add(output)
                        allocs_by_ops[op.type] += nbytes

    return (total_allocated, max_allocated, allocs_by_ops)
def release_blobs_when_used(netproto, dont_free_blobs, selector_fun=None):
    '''
    Return a copy of `netproto` in which a Free op follows the last op
    that reads each releasable blob, so the blob's memory can be reclaimed.
    Use this only with efficient caching memory managers (such as CUB,
    --caffe2_cuda_memory_pool=cub).

    A blob is releasable when it is produced by a non-Alias op before any
    non-Alias op has read it, is read by some non-Alias op, is not an
    external output, is not the first input of an Alias op, is not in
    dont_free_blobs and is accepted by selector_fun.

    @dont_free_blobs: is a set of blobs that should not be freed
    @selector_fun:    optional callable that returns True if the given blob
                      name can be released. Use for easy special filtering,
                      like excluding blobs with "loss" in the name.

    Returns a new protobuffer; the argument is not modified. To use with a
    model, use:
        model.net._net = memonger.release_blobs_when_used(..)
    '''
    netproto = copy.deepcopy(netproto)

    consumed = set()
    candidates = set()
    aliased = set()
    for op in netproto.op:
        if op.type == 'Alias':
            aliased.add(op.input[0])
            continue
        consumed.update(op.input)
        for blob in op.output:
            if blob in consumed:
                continue
            if selector_fun is None or selector_fun(blob):
                candidates.add(blob)

    # Remove such blobs that are not input at all and external outputs
    candidates = (candidates - set(netproto.external_output)) & consumed
    candidates = candidates - dont_free_blobs - aliased

    original_ops = list(netproto.op)
    result_ops = list(original_ops)

    # .. then find last use of each can-release blob, and insert a Free op.
    # Walking backwards means the first hit for a blob is its last use, and
    # insertions never shift the positions still to be visited.
    for idx in range(len(original_ops) - 1, -1, -1):
        for blob in original_ops[idx].input:
            if blob not in candidates:
                continue
            candidates.remove(blob)
            free_op = core.CreateOperator("Free", [blob], [blob])
            result_ops.insert(idx + 1, free_op)

    del netproto.op[:]
    netproto.op.extend(result_ops)
    return netproto
def _find_source_nodes(g):
''' Return nodes without predecessors '''
ret = []
for cn in g:
cur_pred = list(g.predecessors(cn))
if not cur_pred:
ret.append(cn)
return ret
def _find_target_nodes(g):
''' Return nodes without successors '''
ret = []
for cn in g:
cur_succ = list(g.successors(cn))
if not cur_succ:
ret.append(cn)
return ret
def _add_single_target_ifneeded(g):
    '''
    Make sure the graph has exactly one node without successors.

    If g already has a single such node it is returned unchanged (same
    object). Otherwise a deep copy is returned with one extra node that
    every former target node points to. g must have at least one target.
    '''
    sinks = _find_target_nodes(g)
    assert len(sinks) >= 1
    if len(sinks) == 1:
        return g

    augmented = copy.deepcopy(g)

    # Fresh node id: one past the largest existing id, and never below 0.
    new_sink = max([-1] + list(g)) + 1

    augmented.add_node(new_sink)
    for node in sinks:
        augmented.add_edge(node, new_sink)
    return augmented
def _get_path(pred_list, dist_list):
''' Get the path from nx.bellman_ford()'s output '''
# distances are negative
assert all(dist_list[x] <= 0 for x in dist_list)
# node with longest distance to source is the target
target = min(dist_list, key=lambda x: dist_list[x])
ret = []
cur = target
while cur is not None:
ret.append(cur)
# Hack to get networkx 2.0 happy: it uses list in pred.
# TODO(tulloch): are there cases with multiple predecessors?
try:
cur = pred_list[cur][0] if pred_list[cur] else None
except TypeError:
cur = pred_list[cur]
return list(reversed(ret))
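# NOTE: the source is cut off here (a "Loading ..." placeholder was left by
# extraction); the remainder of the module is not shown.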