Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
google-python-cloud-debugger / capture_collector.py
Size: Mime:
# Copyright 2015 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Captures application state on a breakpoint hit."""

# TODO: rename this file to collector.py.

import copy
import datetime
import inspect
import itertools
import logging
import os
import re
import sys
import time
import types

import six

from . import cdbg_native as native
from . import labels

# Externally defined functions to actually log a message. If these variables
# are not initialized, the log action for breakpoints is invalid.
# SetLogger() binds them to the info/warning/error methods of a logger.
log_info_message = None
log_warning_message = None
log_error_message = None

# Externally defined function to collect the request log id.
request_log_id_collector = None

# Externally defined function to collect the end user id. It is called with no
# arguments and must return a (kind, id) tuple; the default reports no user.
user_id_collector = lambda: (None, None)

# Externally defined function to collect the breakpoint labels. It is called
# with no arguments and must return a dictionary of label name to value; the
# default contributes no labels.
breakpoint_labels_collector = lambda: {}

# Types that are captured as a single (possibly trimmed) repr() string.
_PRIMITIVE_TYPES = (type(None), float, complex, bool, slice, bytearray,
                    six.text_type,
                    six.binary_type) + six.integer_types + six.string_types
# Date/time types that are captured as a single str() string.
_DATE_TYPES = (datetime.date, datetime.time, datetime.timedelta)
# Container types whose items are captured as indexed members.
_VECTOR_TYPES = (tuple, list, set)

# User-visible messages.
# TODO: move to messages.py module.
EMPTY_DICTIONARY = 'Empty dictionary'
EMPTY_COLLECTION = 'Empty collection'
OBJECT_HAS_NO_FIELDS = 'Object has no fields'
LOG_ACTION_NOT_SUPPORTED = 'Log action on a breakpoint not supported'
# Substituted for a "$N" placeholder that has no matching parameter.
INVALID_EXPRESSION_INDEX = '<N/A>'
DYNAMIC_LOG_OUT_OF_QUOTA = (
    'LOGPOINT: Logpoint is paused due to high log rate until log '
    'quota is restored')


def _ListTypeFormatString(value):
  """Picks the str.format() template that wraps a formatted item list.

  Args:
    value: the collection being formatted; only its type is inspected.

  Returns:
    '({0})' for tuples, '{{{0}}}' (literal braces) for sets and '[{0}]' for
    everything else.
  """
  for container_type, template in ((tuple, '({0})'), (set, '{{{0}}}')):
    if isinstance(value, container_type):
      return template
  return '[{0}]'


def NormalizePath(path):
  """Strips a leading sys.path directory from the given path.

  Python reports almost every path as absolute, which is not what should be
  shown to the user. The path is normalized and then compared against each
  non-empty sys.path entry in order; the first entry that is a directory
  prefix of the path is removed.

  Args:
    path: absolute path to normalize (relative paths will not be altered)

  Returns:
    The path relative to the first matching sys.path directory, or the
    normalized input if no entry matches.
  """
  normalized = os.path.normpath(path)

  # Joining with '' guarantees a trailing separator, so only whole directory
  # names can match. Empty sys.path entries are ignored.
  prefixes = (os.path.join(entry, '') for entry in sys.path if entry)
  for prefix in prefixes:
    if normalized.startswith(prefix):
      return normalized[len(prefix):]

  return normalized


def DetermineType(value):
  """Builds the "module.TypeName" string for the type of a value.

  For example:
    DetermineType(5) -> __builtin__.int
    DetermineType(Foo()) -> com.google.bar.Foo

  Args:
    value: Any value; only the metadata of its type is inspected.

  Returns:
    Type path string, or None if the type has no __name__. The module prefix
    is omitted when the type reports an empty module.
  """
  value_type = type(value)
  if not hasattr(value_type, '__name__'):
    return None

  module_name = getattr(value_type, '__module__', '')
  if module_name:
    return module_name + '.' + value_type.__name__
  return module_name + value_type.__name__


class LineNoFilter(logging.Filter):
  """Logging filter that rewrites the source location of logpoint records.

  The "extra" parameter in logging cannot override existing fields in a log
  record, so pathname and lineno cannot be set that way. Instead this filter
  is attached to the logger and, for records emitted from this file, replaces
  the location fields with the logpoint location found by
  GetLoggingLocation().
  """

  def filter(self, record):
    """Patches the record location if it originated from this module.

    Args:
      record: logging.LogRecord being processed.

    Returns:
      Always True; the filter never drops a record.
    """
    # User-generated logging also passes through here. Only records whose
    # call site is this file come from the debugger's own logging code.
    own_file = inspect.currentframe().f_code.co_filename
    if record.pathname == own_file:
      pathname, lineno, func_name = GetLoggingLocation()
      if pathname:
        record.pathname = pathname
        record.filename = os.path.basename(pathname)
        record.lineno = lineno
        record.funcName = func_name
    return True


def GetLoggingLocation():
  """Finds the logpoint location published by a caller in this file.

  Walks up the call stack looking for the nearest frame that belongs to this
  file and has a local variable named "cdbg_logging_location".

  Returns:
    (pathname, lineno, func_name) The full path, line number, and function name
    for the logpoint location, or (None, None, None) if no such frame exists
    or the variable found does not have exactly three items.
  """
  own_frame = inspect.currentframe()
  own_file = own_frame.f_code.co_filename

  caller = own_frame.f_back
  while caller:
    if (caller.f_code.co_filename == own_file and
        'cdbg_logging_location' in caller.f_locals):
      location = caller.f_locals['cdbg_logging_location']
      return location if len(location) == 3 else (None, None, None)
    caller = caller.f_back

  return (None, None, None)


def SetLogger(logger):
  """Routes all 'LOG' breakpoint actions through the given logger.

  Binds the module-level log functions to the logger's info, warning and error
  methods and installs a LineNoFilter on the logger.

  Args:
    logger: logger object exposing info, warning, error and addFilter.
  """
  global log_info_message, log_warning_message, log_error_message

  log_info_message = logger.info
  log_warning_message = logger.warning
  log_error_message = logger.error

  location_filter = LineNoFilter()
  logger.addFilter(location_filter)


class _CaptureLimits(object):
  """Per-object limits applied while capturing variables.

  Attributes:
    max_value_len: Maximum number of characters to allow for a single string
      value.  Longer strings are truncated.
    max_list_items: Maximum number of items in a list to capture.
    max_depth: Maximum depth of dictionaries to capture.
  """

  def __init__(self, max_value_len=256, max_list_items=25, max_depth=5):
    """Stores the limits; each argument maps to the same-named attribute."""
    # Cap on the length of a single captured value.
    self.max_value_len = max_value_len
    # Cap on the number of members captured per collection.
    self.max_list_items = max_list_items
    # Cap on the nesting depth.
    self.max_depth = max_depth


class CaptureCollector(object):
  """Captures application state snapshot.

  Captures call stack, local variables and referenced objects. Then formats the
  result to be sent back to the user.

  The performance of this class is important. Once the breakpoint hits, the
  completion of the user request will be delayed until the collection is over.
  It might make sense to implement this logic in C++.

  Attributes:
    breakpoint: breakpoint definition augmented with captured call stack,
        local variables, arguments and referenced objects.
  """

  # Additional type-specific printers. Each pretty printer is a callable
  # that returns None if it doesn't recognize the object or returns a tuple
  # with iterable enumerating object fields (name-value tuple) and object type
  # string.
  pretty_printers = []

  def __init__(self, definition, data_visibility_policy):
    """Class constructor.

    Args:
      definition: breakpoint definition that this class will augment with
          captured data. It is deep-copied, so the argument is not modified.
      data_visibility_policy: An object used to determine the visibility
          of a captured variable.  May be None if no policy is available.
    """
    self.data_visibility_policy = data_visibility_policy

    # Entry 0 of the variables table is reserved for the "buffer full" status
    # that truncated references are redirected to.
    buffer_full_entry = {
        'status': {
            'isError': True,
            'refersTo': 'VARIABLE_VALUE',
            'description': {
                'format': 'Buffer full. Use an expression to see more data'
            }
        }
    }

    snapshot = copy.deepcopy(definition)
    snapshot['stackFrames'] = []
    snapshot['evaluatedExpressions'] = []
    snapshot['variableTable'] = [buffer_full_entry]
    self.breakpoint = snapshot

    # Shortcut to variables table in the breakpoint message.
    self._var_table = snapshot['variableTable']

    # Maps object ID to its index in variables table.
    self._var_table_index = {}

    # Running total of the data collected so far; bounded by max_size.
    self._total_size = 0

    # Cap on the number of stack frames captured, to keep the overall
    # collection time down.
    self.max_frames = 20

    # Locals and arguments are only collected for this many top frames; the
    # remaining frames only get their source location.
    self.max_expand_frames = 5

    # Cap on the amount of data captured. Applications usually hold a lot of
    # objects, so collection has to stop somewhere to keep the delay
    # reasonable. Only the collected payload is counted; overhead due to key
    # names is not.
    self.max_size = 32768  # 32 KB

    self.default_capture_limits = _CaptureLimits()

    # An expression tells us which data the user is specifically interested
    # in, so expressions get higher per-object limits. The limits are not
    # raised globally because, without such a hint, a single large object on
    # the stack could consume the whole max_size quota and hide everything
    # else.
    self.expression_capture_limits = _CaptureLimits(max_value_len=32768,
                                                    max_list_items=32768)

  def Collect(self, top_frame):
    """Collects call stack, local variables and objects.

    Starts collection from the specified frame. We don't start from the top
    frame to exclude the frames due to debugger. Updates the content of
    self.breakpoint.

    The work is done in this order: watched expressions are evaluated against
    top_frame, then up to max_frames stack frames are recorded (with locals and
    arguments only for the first max_expand_frames), then the queued variables
    table entries are expanded until the table is exhausted or max_size is
    reached, and finally labels, request log id and user id are attached. An
    exception raised while evaluating expressions or walking frames is stored
    in self.breakpoint['status'] rather than propagated.

    Args:
      top_frame: top frame to start data collection.
    """
    # Evaluate call stack.
    frame = top_frame
    top_line = self.breakpoint['location']['line']
    breakpoint_frames = self.breakpoint['stackFrames']
    try:
      # Evaluate watched expressions.
      if 'expressions' in self.breakpoint:
        self.breakpoint['evaluatedExpressions'] = [
            self._CaptureExpression(top_frame, expression) for expression
            in self.breakpoint['expressions']]

      while frame and (len(breakpoint_frames) < self.max_frames):
        # The top frame reports the breakpoint's own line; every other frame
        # reports the line it is currently executing.
        line = top_line if frame == top_frame else frame.f_lineno
        code = frame.f_code
        if len(breakpoint_frames) < self.max_expand_frames:
          frame_arguments, frame_locals = self.CaptureFrameLocals(frame)
        else:
          # Deeper frames only carry their source location.
          frame_arguments = []
          frame_locals = []

        breakpoint_frames.append({
            'function': _GetFrameCodeObjectName(frame),
            'location': {
                'path': NormalizePath(code.co_filename),
                'line': line
            },
            'arguments': frame_arguments,
            'locals': frame_locals
        })
        frame = frame.f_back

    except BaseException as e:  # pylint: disable=broad-except
      # The variable table will get serialized even though there was a failure.
      # The results can be useful for diagnosing the internal error.
      # $0 is the number of frames completed before the failure.
      self.breakpoint['status'] = {
          'isError': True,
          'description': {
              'format': ('INTERNAL ERROR: Failed while capturing locals '
                         'of frame $0: $1'),
              'parameters': [str(len(breakpoint_frames)), str(e)]}}

    # Number of entries in _var_table. Starts at 1 (index 0 is the 'buffer full'
    # status value).
    num_vars = 1

    # Explore variables table in BFS fashion. The variables table will grow
    # inside CaptureVariable as we encounter new references.
    while (num_vars < len(self._var_table)) and (
        self._total_size < self.max_size):
      # Replace the queued raw object with its captured form. can_enqueue is
      # False so the object is expanded here instead of being queued again.
      self._var_table[num_vars] = self.CaptureVariable(
          self._var_table[num_vars], 0, self.default_capture_limits,
          can_enqueue=False)

      # Move on to the next entry in the variable table.
      num_vars += 1

    # Trim the variables table and make all references to variables that
    # didn't make it point to var_index of 0 ("buffer full").
    self.TrimVariableTable(num_vars)

    self._CaptureEnvironmentLabels()
    self._CaptureRequestLogId()
    self._CaptureUserId()

  def CaptureFrameLocals(self, frame):
    """Captures local variables and arguments of the specified frame.

    Args:
      frame: frame to capture locals and arguments.

    Returns:
      (arguments, locals) tuple.
    """
    # Capture all local variables (including method arguments).
    variables = {n: self.CaptureNamedVariable(n, v, 1,
                                              self.default_capture_limits)
                 for n, v in six.viewitems(frame.f_locals)}

    # Split between locals and arguments (keeping arguments in the right order).
    nargs = frame.f_code.co_argcount
    if frame.f_code.co_flags & inspect.CO_VARARGS: nargs += 1
    if frame.f_code.co_flags & inspect.CO_VARKEYWORDS: nargs += 1

    frame_arguments = []
    for argname in frame.f_code.co_varnames[:nargs]:
      if argname in variables: frame_arguments.append(variables.pop(argname))

    return (frame_arguments, list(six.viewvalues(variables)))

  def CaptureNamedVariable(self, name, value, depth, limits):
    """Appends name to the product of CaptureVariable.

    Args:
      name: name of the variable.
      value: data to capture
      depth: nested depth of dictionaries and vectors so far.
      limits: Per-object limits for capturing variable data.

    Returns:
      Formatted captured data as per Variable proto with name.
    """
    if not hasattr(name, '__dict__'):
      name = str(name)
    else:  # TODO: call str(name) with immutability verifier here.
      name = str(id(name))
    self._total_size += len(name)

    v = (self.CheckDataVisibility(value) or
         self.CaptureVariable(value, depth, limits))
    v['name'] = name
    return v

  def CheckDataVisibility(self, value):
    """Returns a status object if the given name is not visible.

    Args:
      value: The value to check.  The actual value here is not important but the
      value's metadata (e.g. package and type) will be checked.

    Returns:
      None if the value is visible.  A variable structure with an error status
      if the value should not be visible.
    """
    if not self.data_visibility_policy:
      return None

    visible, reason = self.data_visibility_policy.IsDataVisible(
        DetermineType(value))

    if visible:
      return None

    return {
        'status': {
            'isError': True,
            'refersTo': 'VARIABLE_NAME',
            'description': {
                'format': reason
            }
        }
    }

  def CaptureVariablesList(self, items, depth, empty_message, limits):
    """Captures list of named items.

    Args:
      items: iterable of (name, value) tuples.
      depth: nested depth of dictionaries and vectors for items.
      empty_message: info status message to set if items is empty.
      limits: Per-object limits for capturing variable data.

    Returns:
      List of formatted variable objects.
    """
    v = []
    for name, value in items:
      if (self._total_size >= self.max_size) or (
          len(v) >= limits.max_list_items):
        v.append({
            'status': {
                'refersTo': 'VARIABLE_VALUE',
                'description': {
                    'format':
                        ('Only first $0 items were captured. Use in an '
                         'expression to see all items.'),
                    'parameters': [str(len(v))]}}})
        break
      v.append(self.CaptureNamedVariable(name, value, depth, limits))

    if not v:
      return [{'status': {
          'refersTo': 'VARIABLE_NAME',
          'description': {'format': empty_message}}}]

    return v

  def CaptureVariable(self, value, depth, limits, can_enqueue=True):
    """Try-Except wrapped version of CaptureVariableInternal."""
    try:
      return self.CaptureVariableInternal(value, depth, limits, can_enqueue)
    except BaseException as e:  # pylint: disable=broad-except
      return {
          'status': {
              'isError': True,
              'refersTo': 'VARIABLE_VALUE',
              'description': {
                  'format': ('Failed to capture variable: $0'),
                  'parameters': [str(e)]
              }
          }
      }

  def CaptureVariableInternal(self, value, depth, limits, can_enqueue=True):
    """Captures a single nameless object into Variable message.

    TODO: safely evaluate iterable types.
    TODO: safely call str(value)

    Args:
      value: data to capture
      depth: nested depth of dictionaries and vectors so far.
      limits: Per-object limits for capturing variable data.
      can_enqueue: allows referencing the object in variables table.

    Returns:
      Formatted captured data as per Variable proto.
    """
    if depth == limits.max_depth:
      return {'varTableIndex': 0}  # Buffer full.

    if value is None:
      self._total_size += 4
      return {'value': 'None'}

    if isinstance(value, _PRIMITIVE_TYPES):
      r = _TrimString(repr(value),  # Primitive type, always immutable.
                      min(limits.max_value_len,
                          self.max_size - self._total_size))
      self._total_size += len(r)
      return {'value': r, 'type': type(value).__name__}

    if isinstance(value, _DATE_TYPES):
      r = str(value)  # Safe to call str().
      self._total_size += len(r)
      return {'value': r, 'type': 'datetime.'+ type(value).__name__}

    if isinstance(value, dict):
      # Do not use iteritems() here. If GC happens during iteration (which it
      # often can for dictionaries containing large variables), you will get a
      # RunTimeError exception.
      items = [(repr(k), v) for (k, v) in value.items()]
      return {'members':
              self.CaptureVariablesList(items, depth + 1,
                                        EMPTY_DICTIONARY, limits),
              'type': 'dict'}

    if isinstance(value, _VECTOR_TYPES):
      fields = self.CaptureVariablesList(
          (('[%d]' % i, x) for i, x in enumerate(value)),
          depth + 1, EMPTY_COLLECTION, limits)
      return {'members': fields, 'type': type(value).__name__}

    if isinstance(value, types.FunctionType):
      self._total_size += len(value.__name__)
      # TODO: set value to func_name and type to 'function'
      return {'value': 'function ' + value.__name__}

    if isinstance(value, Exception):
      fields = self.CaptureVariablesList(
          (('[%d]' % i, x) for i, x in enumerate(value.args)),
          depth + 1, EMPTY_COLLECTION, limits)
      return {'members': fields, 'type': type(value).__name__}

    if can_enqueue:
      index = self._var_table_index.get(id(value))
      if index is None:
        index = len(self._var_table)
        self._var_table_index[id(value)] = index
        self._var_table.append(value)
      self._total_size += 4  # number of characters to accommodate a number.
      return {'varTableIndex': index}

    for pretty_printer in CaptureCollector.pretty_printers:
      pretty_value = pretty_printer(value)
      if not pretty_value:
        continue

      fields, object_type = pretty_value
      return {'members':
              self.CaptureVariablesList(fields, depth + 1, OBJECT_HAS_NO_FIELDS,
                                        limits),
              'type': object_type}

    if not hasattr(value, '__dict__'):
      # TODO: keep "value" empty and populate the "type" field instead.
      r = str(type(value))
      self._total_size += len(r)
      return {'value': r}

    # Add an additional depth for the object itself
    items = value.__dict__.items()
    if six.PY3:
      # Make a list of the iterator in Python 3, to avoid 'dict changed size
      # during iteration' errors from GC happening in the middle.
      # Only limits.max_list_items + 1 items are copied, anything past that will
      # get ignored by CaptureVariablesList().
      items = list(itertools.islice(items, limits.max_list_items + 1))
    members = self.CaptureVariablesList(items, depth + 2,
                                        OBJECT_HAS_NO_FIELDS, limits)
    v = {'members': members}

    type_string = DetermineType(value)
    if type_string:
      v['type'] = type_string

    return v

  def _CaptureExpression(self, frame, expression):
    """Evaluates the expression and captures it into a Variable object.

    Successful results are captured at depth 0 using the (larger) expression
    capture limits, with the expression text as the variable name.

    Args:
      frame: evaluation context.
      expression: watched expression to compile and evaluate.

    Returns:
      Variable object (which will have error status if the expression fails
      to evaluate).
    """
    succeeded, result = _EvaluateExpression(frame, expression)
    if succeeded:
      return self.CaptureNamedVariable(expression, result, 0,
                                       self.expression_capture_limits)

    # On failure "result" is the error status describing what went wrong.
    return {'name': expression, 'status': result}

  def TrimVariableTable(self, new_size):
    """Trims the variable table in the formatted breakpoint message.

    Removes trailing entries in variables table. Then scans the entire
    breakpoint message and replaces references to the trimmed variables to
    point to var_index of 0 ("buffer full").

    Args:
      new_size: desired size of variables table.
    """

    def ProcessBufferFull(variables):
      for variable in variables:
        var_index = variable.get('varTableIndex')
        if var_index is not None and (var_index >= new_size):
          variable['varTableIndex'] = 0  # Buffer full.
        members = variable.get('members')
        if members is not None:
          ProcessBufferFull(members)

    del self._var_table[new_size:]
    ProcessBufferFull(self.breakpoint['evaluatedExpressions'])
    for stack_frame in self.breakpoint['stackFrames']:
      ProcessBufferFull(stack_frame['arguments'])
      ProcessBufferFull(stack_frame['locals'])
    ProcessBufferFull(self._var_table)

  def _CaptureEnvironmentLabels(self):
    """Adds labels supplied by breakpoint_labels_collector, if it is callable.

    Ensures the breakpoint has a 'labels' dictionary, then stores every
    collected label that is not already present.
    """
    self.breakpoint.setdefault('labels', {})

    if not callable(breakpoint_labels_collector):
      return

    for key, value in six.iteritems(breakpoint_labels_collector()):
      self._StoreLabel(key, value)

  def _CaptureRequestLogId(self):
    """Stores the request log id as a breakpoint label, when one is available.

    Nothing happens if request_log_id_collector is not callable or returns a
    false value.
    """
    # pylint: disable=not-callable
    if not callable(request_log_id_collector):
      return

    request_log_id = request_log_id_collector()
    if not request_log_id:
      return

    # Save the collected id into the breakpoint labels.
    self._StoreLabel(labels.Breakpoint.REQUEST_LOG_ID, request_log_id)

  def _CaptureUserId(self):
    """Captures the user id of the end user, if possible."""
    user_kind, user_id = user_id_collector()
    if user_kind and user_id:
      self.breakpoint['evaluatedUserId'] = {'kind': user_kind, 'id': user_id}

  def _StoreLabel(self, name, value):
    """Stores the specified label in the breakpoint's labels.

    In the event of a duplicate label, favour the pre-existing labels. This
    generally should not be an issue as the pre-existing client label names are
    chosen with care and there should be no conflicts.

    Args:
      name: The name of the label to be stored.
      value: The value of the label to be stored.
    """
    if name not in self.breakpoint['labels']:
      self.breakpoint['labels'][name] = value


class LogCollector(object):
  """Captures minimal application snapshot and logs it to application log.

  This is similar to CaptureCollector, but we don't need to capture local
  variables, arguments and the objects tree. All we need to do is to format a
  log message. We still need to evaluate watched expressions.

  The actual log functions are defined globally outside of this module.
  """

  def __init__(self, definition):
    """Class constructor.

    Args:
      definition: breakpoint definition indicating log level, message, etc.
    """
    self._definition = definition

    # Maximum number of character to allow for a single value. Longer strings
    # are truncated.
    self.max_value_len = 256

    # Maximum recursion depth.
    self.max_depth = 2

    # Maximum number of items in a list to capture at the top level.
    self.max_list_items = 10

    # When capturing recursively, limit on the size of sublists.
    self.max_sublist_items = 5

    # Time to pause after dynamic log quota has run out.
    self.quota_recovery_ms = 500

    # The time when we first entered the quota period
    self._quota_recovery_start_time = None

    # Select log function.
    level = self._definition.get('logLevel')
    if not level or level == 'INFO':
      self._log_message = log_info_message
    elif level == 'WARNING':
      self._log_message = log_warning_message
    elif level == 'ERROR':
      self._log_message = log_error_message
    else:
      self._log_message = None

  def Log(self, frame):
    """Captures the minimal application states, formats it and logs the message.

    While the logpoint is in its quota recovery period nothing is evaluated or
    logged. When the quota check fails, a single out-of-quota notice is logged
    instead of the message and the recovery period starts.

    Args:
      frame: Python stack frame of breakpoint hit.

    Returns:
      None on success or status message on error.
    """
    # Return error if log methods were not configured globally.
    if not self._log_message:
      return {'isError': True,
              'description': {'format': LOG_ACTION_NOT_SUPPORTED}}

    if self._quota_recovery_start_time:
      ms_elapsed = (time.time() - self._quota_recovery_start_time) * 1000
      if ms_elapsed > self.quota_recovery_ms:
        # We are out of the recovery period, clear the time and continue
        self._quota_recovery_start_time = None
      else:
        # We are in the recovery period, exit
        return

    # Evaluate watched expressions.
    message = 'LOGPOINT: ' + _FormatMessage(
        self._definition.get('logMessageFormat', ''),
        self._EvaluateExpressions(frame))

    line = self._definition['location']['line']
    # This local looks unused but must keep this exact name and stay alive
    # across the log calls below: GetLoggingLocation() finds it by inspecting
    # the locals of calling frames, and LineNoFilter uses it to rewrite the
    # location of the emitted record.
    cdbg_logging_location = (NormalizePath(frame.f_code.co_filename), line,
                             _GetFrameCodeObjectName(frame))

    # NOTE(review): presumably returns a true value while quota remains for a
    # message of this length -- confirm against the native module.
    if native.ApplyDynamicLogsQuota(len(message)):
      self._log_message(message)
    else:
      self._quota_recovery_start_time = time.time()
      self._log_message(DYNAMIC_LOG_OUT_OF_QUOTA)
    del cdbg_logging_location
    return None

  def _EvaluateExpressions(self, frame):
    """Evaluates watched expressions into a string form.

    If expression evaluation fails, the error message is used as evaluated
    expression string.

    Args:
      frame: Python stack frame of breakpoint hit.

    Returns:
      Array of strings where each string corresponds to the breakpoint
      expression with the same index.
    """
    return [self._FormatExpression(frame, expression) for expression in
            self._definition.get('expressions') or []]

  def _FormatExpression(self, frame, expression):
    """Evaluates one watched expression and renders it for the log message.

    Args:
      frame: Python stack frame in which the expression is evaluated.
      expression: string expression to evaluate.

    Returns:
      Formatted expression value that can be used in the log message. If the
      evaluation fails, the formatted error message wrapped in '<' and '>'.
    """
    succeeded, result = _EvaluateExpression(frame, expression)
    if succeeded:
      return self._FormatValue(result)

    # On failure "result" is an error status; render its description.
    description = result['description']
    error_text = _FormatMessage(description['format'],
                                description.get('parameters'))
    return '<' + error_text + '>'

  def _FormatValue(self, value, level=0):
    """Pretty-prints an object for a logger.

    This function is very similar to the standard pprint. The main difference
    is that it enforces limits to make sure we never produce an extremely long
    string or take too much time.

    Primitives are rendered with a trimmed repr() and date types with str().
    Dictionaries and vectors are rendered item by item, capped at
    max_list_items at the top level and max_sublist_items when nested. Objects
    with a non-empty __dict__ are rendered as that dictionary. Anything else,
    or anything deeper than max_depth, is rendered as its type.

    Args:
      value: Python object to print.
      level: current recursion level.

    Returns:
      Formatted string.
    """

    def FormatDictItem(key_value):
      """Formats single dictionary item."""
      key, value = key_value
      return (self._FormatValue(key, level + 1) +
              ': ' +
              self._FormatValue(value, level + 1))

    def LimitedEnumerate(items, formatter, level=0):
      """Returns items in the specified enumerable enforcing threshold."""
      count = 0
      limit = self.max_sublist_items if level > 0 else self.max_list_items
      for item in items:
        if count == limit:
          yield '...'
          break

        yield formatter(item)
        count += 1

    def FormatList(items, formatter, level=0):
      """Formats a list using a custom item formatter enforcing threshold."""
      return ', '.join(LimitedEnumerate(items, formatter, level=level))

    if isinstance(value, _PRIMITIVE_TYPES):
      return _TrimString(repr(value),  # Primitive type, always immutable.
                         self.max_value_len)

    if isinstance(value, _DATE_TYPES):
      return str(value)

    if level > self.max_depth:
      return str(type(value))

    if isinstance(value, dict):
      # Pass the level through, as the vector branch does, so that nested
      # dictionaries are capped by max_sublist_items rather than by the
      # top-level max_list_items.
      return '{' + FormatList(
          six.iteritems(value), FormatDictItem, level=level) + '}'

    if isinstance(value, _VECTOR_TYPES):
      return _ListTypeFormatString(value).format(FormatList(
          value, lambda item: self._FormatValue(item, level + 1), level=level))

    if isinstance(value, types.FunctionType):
      return 'function ' + value.__name__

    if hasattr(value, '__dict__') and value.__dict__:
      # Render the object's fields at the same level as the object itself.
      return self._FormatValue(value.__dict__, level)

    return str(type(value))


def _EvaluateExpression(frame, expression):
  """Compiles and evaluates watched expression.

  The expression is compiled in 'eval' mode and then run against the frame
  through native.CallImmutable. Any exception raised by either step is turned
  into an error status instead of being propagated.

  Args:
    frame: evaluation context.
    expression: watched expression to compile and evaluate.

  Returns:
    (False, status) on error or (True, value) on success.
  """

  def Failure(refers_to, message_format, parameter):
    """Builds the (False, status) result for a failed evaluation."""
    return (False, {
        'isError': True,
        'refersTo': refers_to,
        'description': {
            'format': message_format,
            'parameters': [parameter]}})

  try:
    code = compile(expression, '<watched_expression>', 'eval')
  except (TypeError, ValueError) as e:
    # expression string contains null bytes.
    return Failure('VARIABLE_NAME', 'Invalid expression', str(e))
  except SyntaxError as e:
    return Failure('VARIABLE_NAME', 'Expression could not be compiled: $0',
                   e.msg)

  try:
    return (True, native.CallImmutable(frame, code))
  except BaseException as e:  # pylint: disable=broad-except
    return Failure('VARIABLE_VALUE', 'Exception occurred: $0', str(e))


def _GetFrameCodeObjectName(frame):
  """Gets the code object name for the frame.

  Args:
    frame: the frame to get the name from

  Returns:
    The function name if the code is a static function or the class name with
    the method name if it is a member function. The bare function name is
    also returned when the first parameter is named 'self' but is currently
    unbound in the frame.
  """
  # This works under the assumption that member functions name their first
  # parameter 'self'; it has some edge-cases (e.g. a free function that
  # happens to use that name).
  code = frame.f_code
  if code.co_argcount >= 1 and code.co_varnames[0] == 'self':
    try:
      instance = frame.f_locals['self']
    except KeyError:
      # 'self' is a parameter but no longer bound (e.g. after "del self").
      # Fall back to the plain name instead of failing the whole capture.
      return code.co_name
    return instance.__class__.__name__ + '.' + code.co_name

  return code.co_name


def _FormatMessage(template, parameters):
  """Formats the message. Unescapes '$$' with '$'.

  Each '$N' placeholder is replaced with parameters[N]. A placeholder whose
  index has no matching parameter is replaced with INVALID_EXPRESSION_INDEX.

  Args:
    template: message template (e.g. 'a = $0, b = $1').
    parameters: substitution parameters for the format. May be None, which is
      treated as an empty list.

  Returns:
    Formatted message with parameters embedded in template placeholders.
  """
  # Status descriptions without a 'parameters' key reach this function as
  # None; indexing None would raise TypeError on the first placeholder.
  values = parameters if parameters is not None else ()

  def GetParameter(m):
    try:
      return values[int(m.group(0)[1:])]
    except IndexError:
      return INVALID_EXPRESSION_INDEX

  # Split on the escape first so that the '$' produced by '$$' can never be
  # mistaken for the start of a placeholder.
  parts = template.split('$$')
  return '$'.join(re.sub(r'\$\d+', GetParameter, part) for part in parts)


def _TrimString(s, max_len):
  """Trims the string if it exceeds max_len.

  Args:
    s: string to trim.
    max_len: length budget. A negative budget (which callers can produce once
      the total capture size exceeds the maximum) is treated as zero.

  Returns:
    s unchanged if it fits in the budget. Otherwise the first max_len + 1
    characters of s followed by '...'; the extra character is existing
    behavior kept for compatibility.
  """
  # A negative value would turn the slice below into a cut relative to the
  # end of the string, keeping almost all of an over-budget value.
  max_len = max(max_len, 0)
  if len(s) <= max_len:
    return s
  return s[:max_len+1] + '...'