Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
tensorflow / purelib / tensorflow / python / client / timeline.py
Size: Mime:
# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Timeline visualization for TensorFlow using Chrome Trace Format."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import collections
import copy
import json
import re

# The timeline target is usually imported as part of BUILD target
# "platform_test", which includes also includes the "platform"
# dependency.  This is why the logging import here is okay.
from tensorflow.python.platform import tf_logging as logging


class AllocationMaximum(collections.namedtuple(
    'AllocationMaximum', ('timestamp', 'num_bytes', 'tensors'))):
  """Stores the maximum allocation for a given allocator within the timelne.

  Parameters:
    timestamp: `tensorflow::Env::NowMicros()` when this maximum was reached.
    num_bytes: the total memory used at this time.
    tensors: the set of tensors allocated at this time.
  """
  pass


class StepStatsAnalysis(collections.namedtuple(
    'StepStatsAnalysis', ('chrome_trace', 'allocator_maximums'))):
  """Stores the step stats analysis output.

  Parameters:
    chrome_trace: A dict containing the chrome trace analysis.
    allocator_maximums: A dict mapping allocator names to AllocationMaximum.
  """
  pass


class _ChromeTraceFormatter(object):
  """A helper class for generating traces in Chrome Trace Format."""

  def __init__(self, show_memory=False):
    """Constructs a new Chrome Trace formatter."""
    self._show_memory = show_memory
    self._events = []
    self._metadata = []

  def _create_event(self, ph, category, name, pid, tid, timestamp):
    """Creates a new Chrome Trace event.

    For details of the file format, see:
    https://github.com/catapult-project/catapult/blob/master/tracing/README.md

    Args:
      ph:  The type of event - usually a single character.
      category: The event category as a string.
      name:  The event name as a string.
      pid:  Identifier of the process generating this event as an integer.
      tid:  Identifier of the thread generating this event as an integer.
      timestamp:  The timestamp of this event as a long integer.

    Returns:
      A JSON compatible event object.
    """
    event = {}
    event['ph'] = ph
    event['cat'] = category
    event['name'] = name
    event['pid'] = pid
    event['tid'] = tid
    event['ts'] = timestamp
    return event

  def emit_pid(self, name, pid):
    """Adds a process metadata event to the trace.

    Args:
      name:  The process name as a string.
      pid:  Identifier of the process as an integer.
    """
    event = {}
    event['name'] = 'process_name'
    event['ph'] = 'M'
    event['pid'] = pid
    event['args'] = {'name': name}
    self._metadata.append(event)

  def emit_tid(self, name, pid, tid):
    """Adds a thread metadata event to the trace.

    Args:
      name:  The thread name as a string.
      pid:  Identifier of the process as an integer.
      tid:  Identifier of the thread as an integer.
    """
    event = {}
    event['name'] = 'thread_name'
    event['ph'] = 'M'
    event['pid'] = pid
    event['tid'] = tid
    event['args'] = {'name': name}
    self._metadata.append(event)

  def emit_region(self, timestamp, duration, pid, tid, category, name, args):
    """Adds a region event to the trace.

    Args:
      timestamp:  The start timestamp of this region as a long integer.
      duration:  The duration of this region as a long integer.
      pid:  Identifier of the process generating this event as an integer.
      tid:  Identifier of the thread generating this event as an integer.
      category: The event category as a string.
      name:  The event name as a string.
      args:  A JSON-compatible dictionary of event arguments.
    """
    event = self._create_event('X', category, name, pid, tid, timestamp)
    event['dur'] = duration
    event['args'] = args
    self._events.append(event)

  def emit_obj_create(self, category, name, timestamp, pid, tid, object_id):
    """Adds an object creation event to the trace.

    Args:
      category: The event category as a string.
      name:  The event name as a string.
      timestamp:  The timestamp of this event as a long integer.
      pid:  Identifier of the process generating this event as an integer.
      tid:  Identifier of the thread generating this event as an integer.
      object_id: Identifier of the object as an integer.
    """
    event = self._create_event('N', category, name, pid, tid, timestamp)
    event['id'] = object_id
    self._events.append(event)

  def emit_obj_delete(self, category, name, timestamp, pid, tid, object_id):
    """Adds an object deletion event to the trace.

    Args:
      category: The event category as a string.
      name:  The event name as a string.
      timestamp:  The timestamp of this event as a long integer.
      pid:  Identifier of the process generating this event as an integer.
      tid:  Identifier of the thread generating this event as an integer.
      object_id: Identifier of the object as an integer.
    """
    event = self._create_event('D', category, name, pid, tid, timestamp)
    event['id'] = object_id
    self._events.append(event)

  def emit_obj_snapshot(self, category, name, timestamp, pid, tid, object_id,
                        snapshot):
    """Adds an object snapshot event to the trace.

    Args:
      category: The event category as a string.
      name:  The event name as a string.
      timestamp:  The timestamp of this event as a long integer.
      pid:  Identifier of the process generating this event as an integer.
      tid:  Identifier of the thread generating this event as an integer.
      object_id: Identifier of the object as an integer.
      snapshot:  A JSON-compatible representation of the object.
    """
    event = self._create_event('O', category, name, pid, tid, timestamp)
    event['id'] = object_id
    event['args'] = {'snapshot': snapshot}
    self._events.append(event)

  def emit_flow_start(self, name, timestamp, pid, tid, flow_id):
    """Adds a flow start event to the trace.

    When matched with a flow end event (with the same 'flow_id') this will
    cause the trace viewer to draw an arrow between the start and end events.

    Args:
      name:  The event name as a string.
      timestamp:  The timestamp of this event as a long integer.
      pid:  Identifier of the process generating this event as an integer.
      tid:  Identifier of the thread generating this event as an integer.
      flow_id: Identifier of the flow as an integer.
    """
    event = self._create_event('s', 'DataFlow', name, pid, tid, timestamp)
    event['id'] = flow_id
    self._events.append(event)

  def emit_flow_end(self, name, timestamp, pid, tid, flow_id):
    """Adds a flow end event to the trace.

    When matched with a flow start event (with the same 'flow_id') this will
    cause the trace viewer to draw an arrow between the start and end events.

    Args:
      name:  The event name as a string.
      timestamp:  The timestamp of this event as a long integer.
      pid:  Identifier of the process generating this event as an integer.
      tid:  Identifier of the thread generating this event as an integer.
      flow_id: Identifier of the flow as an integer.
    """
    event = self._create_event('t', 'DataFlow', name, pid, tid, timestamp)
    event['id'] = flow_id
    self._events.append(event)

  def emit_counter(self, category, name, pid, timestamp, counter, value):
    """Emits a record for a single counter.

    Args:
      category: The event category as a string.
      name:  The event name as a string.
      pid:  Identifier of the process generating this event as an integer.
      timestamp:  The timestamp of this event as a long integer.
      counter: Name of the counter as a string.
      value:  Value of the counter as an integer.
    """
    event = self._create_event('C', category, name, pid, 0, timestamp)
    event['args'] = {counter: value}
    self._events.append(event)

  def emit_counters(self, category, name, pid, timestamp, counters):
    """Emits a counter record for the dictionary 'counters'.

    Args:
      category: The event category as a string.
      name:  The event name as a string.
      pid:  Identifier of the process generating this event as an integer.
      timestamp:  The timestamp of this event as a long integer.
      counters: Dictionary of counter values.
    """
    event = self._create_event('C', category, name, pid, 0, timestamp)
    event['args'] = counters.copy()
    self._events.append(event)

  def format_to_string(self, pretty=False):
    """Formats the chrome trace to a string.

    Args:
      pretty: (Optional.)  If True, produce human-readable JSON output.

    Returns:
      A JSON-formatted string in Chrome Trace format.
    """
    trace = {}
    trace['traceEvents'] = self._metadata + self._events
    if pretty:
      return json.dumps(trace, indent=4, separators=(',', ': '))
    else:
      return json.dumps(trace, separators=(',', ':'))


class _TensorTracker(object):
  """An internal class to track the lifetime of a Tensor."""

  def __init__(self, name, object_id, timestamp, pid, allocator, num_bytes):
    """Creates an object to track tensor references.

    This class is not thread safe and is intended only for internal use by
    the 'Timeline' class in this file.

    Args:
      name:  The name of the Tensor as a string.
      object_id:  Chrome Trace object identifier assigned for this Tensor.
      timestamp:  The creation timestamp of this event as a long integer.
      pid:  Process identifier of the associated device, as an integer.
      allocator:  Name of the allocator used to create the Tensor.
      num_bytes:  Number of bytes allocated (long integer).

    Returns:
      A 'TensorTracker' object.
    """
    self._name = name
    self._pid = pid
    self._object_id = object_id
    self._create_time = timestamp
    self._allocator = allocator
    self._num_bytes = num_bytes
    self._ref_times = []
    self._unref_times = []

  @property
  def name(self):
    """Name of this tensor."""
    return self._name

  @property
  def pid(self):
    """ID of the process which created this tensor (an integer)."""
    return self._pid

  @property
  def create_time(self):
    """Timestamp when this tensor was created (long integer)."""
    return self._create_time

  @property
  def object_id(self):
    """Returns the object identifier of this tensor (integer)."""
    return self._object_id

  @property
  def num_bytes(self):
    """Size of this tensor in bytes (long integer)."""
    return self._num_bytes

  @property
  def allocator(self):
    """Name of the allocator used to create this tensor (string)."""
    return self._allocator

  @property
  def last_unref(self):
    """Last unreference timestamp of this tensor (long integer)."""
    return max(self._unref_times)

  def add_ref(self, timestamp):
    """Adds a reference to this tensor with the specified timestamp.

    Args:
      timestamp:  Timestamp of object reference as an integer.
    """
    self._ref_times.append(timestamp)

  def add_unref(self, timestamp):
    """Adds an unref to this tensor with the specified timestamp.

    Args:
      timestamp:  Timestamp of object unreference as an integer.
    """
    self._unref_times.append(timestamp)


class Timeline(object):
  """A class for visualizing execution timelines of TensorFlow steps."""

  def __init__(self, step_stats, graph=None):
    """Constructs a new Timeline.

    A 'Timeline' is used for visualizing the execution of a TensorFlow
    computation.  It shows the timings and concurrency of execution at
    the granularity of TensorFlow Ops.
    This class is not thread safe.

    Args:
      step_stats: The 'StepStats' proto recording execution times.
      graph: (Optional) The 'Graph' that was executed.
    """

    self._step_stats = step_stats
    self._graph = graph
    self._chrome_trace = _ChromeTraceFormatter()
    self._next_pid = 0
    self._device_pids = {}  # device name -> pid for compute activity.
    self._tensor_pids = {}  # device name -> pid for tensors.
    self._tensors = {}  # tensor_name -> TensorTracker
    self._next_flow_id = 0
    self._flow_starts = {}  # tensor_name -> (timestamp, pid, tid)
    self._alloc_times = {}  # tensor_name -> ( time, allocator, size )
    self._allocator_maximums = {}  # allocator name => maximum bytes long

  def _alloc_pid(self):
    """Allocate a process Id."""
    pid = self._next_pid
    self._next_pid += 1
    return pid

  def _alloc_flow_id(self):
    """Allocate a flow Id."""
    flow_id = self._next_flow_id
    self._next_flow_id += 1
    return flow_id

  def _parse_op_label(self, label):
    """Parses the fields in a node timeline label."""
    # Expects labels of the form: name = op(arg, arg, ...).
    match = re.match(r'(.*) = (.*)\((.*)\)', label)
    if match is None:
      return 'unknown', 'unknown', []
    nn, op, inputs = match.groups()
    if not inputs:
      inputs = []
    else:
      inputs = inputs.split(', ')
    return nn, op, inputs

  def _assign_lanes(self):
    """Assigns non-overlapping lanes for the activities on each device."""
    for device_stats in self._step_stats.dev_stats:
      # TODO(pbar): Genuine thread IDs in NodeExecStats might be helpful.
      lanes = [0]
      for ns in device_stats.node_stats:
        l = -1
        for (i, lts) in enumerate(lanes):
          if ns.all_start_micros > lts:
            l = i
            lanes[l] = ns.all_start_micros + ns.all_end_rel_micros
            break
        if l < 0:
          l = len(lanes)
          lanes.append(ns.all_start_micros + ns.all_end_rel_micros)
        ns.thread_id = l

  def _emit_op(self, nodestats, pid, is_gputrace):
    """Generates a Chrome Trace event to show Op execution.

    Args:
      nodestats: The 'NodeExecStats' proto recording op execution.
      pid: The pid assigned for the device where this op ran.
      is_gputrace: If True then this op came from the GPUTracer.
    """
    node_name = nodestats.node_name
    start = nodestats.all_start_micros
    duration = nodestats.all_end_rel_micros
    tid = nodestats.thread_id
    inputs = []
    if is_gputrace:
      # Node names should always have the form 'name:op'.
      fields = node_name.split(':') + ['unknown']
      node_name, op = fields[:2]
    elif node_name == 'RecvTensor':
      # RPC tracing does not use the standard timeline_label format.
      op = 'RecvTensor'
    else:
      _, op, inputs = self._parse_op_label(nodestats.timeline_label)
    args = {'name': node_name, 'op': op}
    for i, iname in enumerate(inputs):
      args['input%d' % i] = iname
    self._chrome_trace.emit_region(start, duration, pid, tid, 'Op', op, args)

  def _emit_tensor_snapshot(self, tensor, timestamp, pid, tid, value):
    """Generate Chrome Trace snapshot event for a computed Tensor.

    Args:
      tensor: A 'TensorTracker' object.
      timestamp:  The timestamp of this snapshot as a long integer.
      pid: The pid assigned for showing the device where this op ran.
      tid: The tid of the thread computing the tensor snapshot.
      value: A JSON-compliant snapshot of the object.
    """
    desc = str(value.tensor_description).replace('"', '')
    snapshot = {'tensor_description': desc}
    self._chrome_trace.emit_obj_snapshot('Tensor', tensor.name, timestamp, pid,
                                         tid, tensor.object_id, snapshot)

  def _produce_tensor(self, name, timestamp, tensors_pid, allocator, num_bytes):
    object_id = len(self._tensors)
    tensor = _TensorTracker(name, object_id, timestamp, tensors_pid, allocator,
                            num_bytes)
    self._tensors[name] = tensor
    return tensor

  def _is_gputrace_device(self, device_name):
    """Returns true if this device is part of the GPUTracer logging."""
    return '/stream:' in device_name or '/memcpy' in device_name

  def _allocate_pids(self):
    """Allocate fake process ids for each device in the StepStats."""
    self._allocators_pid = self._alloc_pid()
    self._chrome_trace.emit_pid('Allocators', self._allocators_pid)

    # Add processes in the Chrome trace to show compute and data activity.
    for dev_stats in self._step_stats.dev_stats:
      device_pid = self._alloc_pid()
      self._device_pids[dev_stats.device] = device_pid
      tensors_pid = self._alloc_pid()
      self._tensor_pids[dev_stats.device] = tensors_pid
      self._chrome_trace.emit_pid(dev_stats.device + ' Compute', device_pid)
      self._chrome_trace.emit_pid(dev_stats.device + ' Tensors', tensors_pid)

  def _analyze_tensors(self, show_memory):
    """Analyze tensor references to track dataflow."""
    for dev_stats in self._step_stats.dev_stats:
      device_pid = self._device_pids[dev_stats.device]
      tensors_pid = self._tensor_pids[dev_stats.device]
      for node_stats in dev_stats.node_stats:
        tid = node_stats.thread_id
        node_name = node_stats.node_name
        start_time = node_stats.all_start_micros
        end_time = node_stats.all_start_micros + node_stats.all_end_rel_micros
        for index, output in enumerate(node_stats.output):
          if index:
            output_name = '%s:%d' % (node_name, index)
          else:
            output_name = node_name

          allocation = output.tensor_description.allocation_description
          num_bytes = allocation.requested_bytes
          allocator_name = allocation.allocator_name
          tensor = self._produce_tensor(output_name, start_time, tensors_pid,
                                        allocator_name, num_bytes)
          tensor.add_ref(start_time)
          tensor.add_unref(end_time)
          self._flow_starts[output_name] = (end_time, device_pid, tid)

          if show_memory:
            self._chrome_trace.emit_obj_create('Tensor', output_name,
                                               start_time, tensors_pid, tid,
                                               tensor.object_id)
            self._emit_tensor_snapshot(tensor, end_time - 1, tensors_pid, tid,
                                       output)

  def _show_compute(self, show_dataflow):
    """Visualize the computation activity."""
    for dev_stats in self._step_stats.dev_stats:
      device_name = dev_stats.device
      device_pid = self._device_pids[device_name]
      is_gputrace = self._is_gputrace_device(device_name)

      for node_stats in dev_stats.node_stats:
        tid = node_stats.thread_id
        start_time = node_stats.all_start_micros
        end_time = node_stats.all_start_micros + node_stats.all_end_rel_micros
        self._emit_op(node_stats, device_pid, is_gputrace)

        if is_gputrace or node_stats.node_name == 'RecvTensor':
          continue

        _, _, inputs = self._parse_op_label(node_stats.timeline_label)
        for input_name in inputs:
          if input_name not in self._tensors:
            # This can happen when partitioning has inserted a Send/Recv.
            # We remove the numeric suffix so that the dataflow appears to
            # come from the original node.  Ideally, the StepStats would
            # contain logging for the Send and Recv nodes.
            index = input_name.rfind('/_')
            if index > 0:
              input_name = input_name[:index]

          if input_name in self._tensors:
            tensor = self._tensors[input_name]
            tensor.add_ref(start_time)
            tensor.add_unref(end_time - 1)

            if show_dataflow:
              # We use a different flow ID for every graph edge.
              create_time, create_pid, create_tid = self._flow_starts[
                  input_name]
              # Don't add flows when producer and consumer ops are on the same
              # pid/tid since the horizontal arrows clutter the visualization.
              if create_pid != device_pid or create_tid != tid:
                flow_id = self._alloc_flow_id()
                self._chrome_trace.emit_flow_start(input_name, create_time,
                                                   create_pid, create_tid,
                                                   flow_id)
                self._chrome_trace.emit_flow_end(input_name, start_time,
                                                 device_pid, tid, flow_id)
          else:
            logging.vlog(1, 'Can\'t find tensor %s - removed by CSE?',
                         input_name)

  def _show_memory_counters(self):
    """Produce a counter series for each memory allocator."""
    # Iterate over all tensor trackers to build a list of allocations and
    # frees for each allocator. Then sort the lists and emit a cumulative
    # counter series for each allocator.
    allocations = {}
    for name in self._tensors:
      tensor = self._tensors[name]
      self._chrome_trace.emit_obj_delete('Tensor', name, tensor.last_unref,
                                         tensor.pid, 0, tensor.object_id)
      allocator = tensor.allocator
      if allocator not in allocations:
        allocations[allocator] = []
      num_bytes = tensor.num_bytes
      allocations[allocator].append((tensor.create_time, num_bytes, name))
      allocations[allocator].append((tensor.last_unref, -num_bytes, name))

    alloc_maxes = {}

    # Generate a counter series showing total allocations for each allocator.
    for allocator in allocations:
      alloc_list = allocations[allocator]
      alloc_list.sort()
      total_bytes = 0
      alloc_tensor_set = set()
      alloc_maxes[allocator] = AllocationMaximum(
          timestamp=0, num_bytes=0, tensors=set())
      for time, num_bytes, name in sorted(
          alloc_list, key=lambda allocation: allocation[0]):
        total_bytes += num_bytes
        if num_bytes < 0:
          alloc_tensor_set.discard(name)
        else:
          alloc_tensor_set.add(name)

        if total_bytes > alloc_maxes[allocator].num_bytes:
          alloc_maxes[allocator] = AllocationMaximum(
              timestamp=time,
              num_bytes=total_bytes,
              tensors=copy.deepcopy(alloc_tensor_set))

        self._chrome_trace.emit_counter('Memory', allocator,
                                        self._allocators_pid, time, allocator,
                                        total_bytes)
    self._allocator_maximums = alloc_maxes

  def analyze_step_stats(self, show_dataflow=True, show_memory=True):
    self._allocate_pids()
    self._assign_lanes()
    self._analyze_tensors(show_memory)
    self._show_compute(show_dataflow)
    if show_memory:
      self._show_memory_counters()
    return StepStatsAnalysis(
        chrome_trace=self._chrome_trace,
        allocator_maximums=self._allocator_maximums)

  def generate_chrome_trace_format(self, show_dataflow=True, show_memory=False):
    """Produces a trace in Chrome Trace Format.

    Args:
      show_dataflow: (Optional.) If True, add flow events to the trace
        connecting producers and consumers of tensors.
      show_memory: (Optional.) If True, add object snapshot events to the trace
        showing the sizes and lifetimes of tensors.

    Returns:
      A JSON formatted string in Chrome Trace format.
    """
    step_stats_analysis = self.analyze_step_stats(
        show_dataflow=show_dataflow, show_memory=show_memory)

    return step_stats_analysis.chrome_trace.format_to_string(pretty=True)