Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

edgify / rook   python

Repository URL to install this package:

Version: 0.1.176 

/ services / py_bdb.py

"""Debugger basics"""

import fnmatch
import sys
import os
import threading
from six import print_
from collections import defaultdict

from rook.logger import logger

__all__ = ["BdbQuit", "Bdb", "Breakpoint"]


class BdbQuit(Exception):
    """Exception to give up completely"""


class Bdb:

    """Generic Python debugger base class.

    This class takes care of details of the trace facility;
    a derived class should implement user interaction.
    The standard debugger class (pdb.Pdb) is an example.
    """

    _ROOKOUT_TRACE = "__ROOKOUT_TRACE"

    def __init__(self, skip=None):
        self.skip = set(skip) if skip else None
        self.breaks = {}
        self.fncache = {}
        self.canonic_names = {}
        self.aug_ids = {}
        self.positions = defaultdict(list)
        self.last_file_id = 0

        self._disabled = False

        self.thread_local = threading.local()
        self.trace_through = False

        self.user_line = None
        self.user_call = None
        self.user_return = None
        self.user_exception = None

        self.sys_settrace = sys.settrace
        self.sys_gettrace = sys.gettrace
        self.threading_settrace = threading.settrace
        self.threading_tracefunc = None

    def update_activity(self):
        self._disabled = not(bool(self.breaks) or bool(self.trace_through))

    def ignore_current_thread(self):
        # If gevent has patched away threads, don't ignore current thread (as they are all the same thread)
        try:
            import gevent.monkey

            if gevent.monkey.is_module_patched('threading'):
                return

        except ImportError:
            pass # No need to worry about gevent, continue as usual

        if not self.trace_through and self.sys_gettrace() == self.trace_dispatch:
            self.sys_settrace(None)

    def _get_thread_trace_func(self):
        try:
            return self.thread_local._rookout_tracefunc
        except AttributeError:
            return None

    def _get_frame_trace_func(self, frame):
        try:
            return frame.f_locals[self._ROOKOUT_TRACE]
        except KeyError:
            return None

    def _set_thread_trace_func(self, tracefunc):
        self.thread_local._rookout_tracefunc = tracefunc

    def _set_frame_trace_func(self, tracefunc, frame):
        frame.f_locals[self._ROOKOUT_TRACE] = tracefunc

    def sys_settrace_hook(self, tracefunc):
        if threading.current_thread().name.startswith("rookout-"):
            # ignore our threads
            return

        # Check we are resetting our own trace_dispatch
        # This probably means we are being called by threading for a new thread
        if tracefunc == self.trace_dispatch:
            self.sys_settrace(self.trace_dispatch)

            # Install the additional threading tracefunc
            if self.threading_tracefunc:
                self.sys_settrace_hook(self.threading_tracefunc)
            return

        # Enable trace through
        if tracefunc:
            self.trace_through = True
        self.update_activity()

        # Store function
        self._set_thread_trace_func(tracefunc)

    def sys_gettrace_hook(self):
        return self._get_thread_trace_func()

    def threading_settrace_hook(self, tracefunc):
        self.threading_tracefunc = tracefunc

        # Enable trace through
        if tracefunc:
            self.trace_through = True
        self.update_activity()

    def canonic(self, filename):
        # We let everybody think it is still the file name
        if isinstance(filename, int):
            return filename

        # Try to convert filename to file_id
        file_id = self.fncache.get(filename)
        if not file_id:
            # If there's no file id, get file canonic name
            if filename[0] == "<" and filename[-1] == ">":
                canonic_name = filename
            else:
                canonic_name = os.path.abspath(filename)
                canonic_name = os.path.normpath(canonic_name)

            # Check if canonic name has an id
            file_id = self.canonic_names.get(canonic_name)
            if not file_id:
                # If no id so far, create a new one
                file_id = self.last_file_id = self.last_file_id + 1
                self.canonic_names[canonic_name] = file_id
                # If sources are not available, keep only the file name.
                if not os.path.isfile(canonic_name):
                    logger.warning("Source file could not be resolved: %s", canonic_name)
                    self.canonic_names[os.path.basename(canonic_name)] = file_id

            # Cache filename to id conversion
            self.fncache[filename] = file_id

        return file_id

    def reset(self):
        import linecache
        linecache.checkcache()
        self.botframe = None

    def trace_dispatch(self, frame, event, arg):
        if self._disabled:
            return None

        if not os or not os.path:
            return None

        if event == 'line':
            # This if is an inlined version of dispatch_line to optimize performance
            # Original version:
            # if self.break_here(frame):  lgtm[py/commented-out-code]
            #    if self.user_line:
            #        self.user_line(frame)
            # return self.trace_dispatch
            filename = frame.f_code.co_filename

            # Try to convert filename to file_id
            file_id = self.fncache.get(filename)
            if not file_id:
                # If there's no file id, get file canonic name
                if filename[0] == "<" and filename[-1] == ">":
                    canonic_name = filename
                else:
                    canonic_name = os.path.abspath(filename)
                    canonic_name = os.path.normcase(canonic_name)

                # Check if canonic name has an id
                file_id = self.canonic_names.get(canonic_name)
                if not file_id:
                    # If no id so far, create a new one
                    file_id = self.last_file_id = self.last_file_id + 1
                    self.canonic_names[canonic_name] = file_id

                # Cache filename to id conversion
                self.fncache[filename] = file_id

            file_breaks = self.breaks.get(file_id)
            if file_breaks and frame.f_lineno in file_breaks:
                if self.user_line:
                    # this loop used to exist in user_line's implementation which would
                    # find all breakpoints on this filename and line, but user_line
                    # now finds BPs by aug_id
                    for aug_id in self.positions[(file_id, frame.f_lineno)]:
                        self.user_line(frame, filename, aug_id)

            if self.trace_through:
                tracefunc = self._get_frame_trace_func(frame)
                if tracefunc:
                    tracefunc_ret = tracefunc(frame, event, arg)
                    if tracefunc_ret:
                        self._set_frame_trace_func(tracefunc_ret, frame)

            return self.trace_dispatch

        if event == 'call':
            # This if is an inlined version of dispatch_call to optimize performance
            # Original version:
            # if self.break_anywhere(frame):  lgtm[py/commented-out-code]
            #     return self.trace_dispatch
            # else:
            #    return self.trace_dispatch_default
            retval = None

            filename = frame.f_code.co_filename

            # Try to convert filename to file_id
            file_id = self.fncache.get(filename)
            if not file_id:
                # If there's no file id, get file canonic name
                if filename[0] == "<" and filename[-1] == ">":
                    canonic_name = filename
                else:
                    canonic_name = os.path.abspath(filename)
                    canonic_name = os.path.normcase(canonic_name)

                # Check if canonic name has an id
                file_id = self.canonic_names.get(canonic_name)
                if not file_id:
                    file_id = self.canonic_names.get(os.path.basename(canonic_name))
                    if not file_id:
                        # If no id so far, create a new one
                        file_id = self.last_file_id = self.last_file_id + 1
                        self.canonic_names[canonic_name] = file_id

                # Cache filename to id conversion
                self.fncache[filename] = file_id

            # If file in breaks
            if file_id in self.breaks:
                file_breaks = self.breaks.get(file_id)

                # Check if code contains one of the lines we are looking for
                if frame.f_code.co_firstlineno and frame.f_code.co_lnotab:
                    current_line = frame.f_code.co_firstlineno
                    if current_line in file_breaks:
                        retval = self.trace_dispatch
                    else:
                        for incr in frame.f_code.co_lnotab[1::2]:
                            if isinstance(incr, int):
                                current_line += incr
                            else:
                                current_line += ord(incr)
                            if current_line in file_breaks:
                                retval = self.trace_dispatch
                                break

            if self.trace_through:
                tracefunc = self._get_thread_trace_func()
                if tracefunc:
                    tracefunc_ret = tracefunc(frame, event, arg)
                    if tracefunc_ret:
                        retval = self.trace_dispatch
                    self._set_frame_trace_func(tracefunc_ret, frame)

            return retval

        if event == 'return':
            if self.trace_through:
                tracefunc = self._get_frame_trace_func(frame)
                if tracefunc:
                    tracefunc(frame, event, arg)
            return None

        if event == 'exception':
            if self.trace_through:
                tracefunc = self._get_frame_trace_func(frame)
                if tracefunc:
                    tracefunc(frame, event, arg)
            return None

        if event == 'c_call':
            if self.trace_through:
                tracefunc = self._get_thread_trace_func()
                if tracefunc:
                    tracefunc_ret = tracefunc(frame, event, arg)
                    self._set_frame_trace_func(tracefunc_ret, frame)
            return None

        if event == 'c_exception':
            if self.trace_through:
                tracefunc = self._get_frame_trace_func(frame)
                if tracefunc:
                    tracefunc(frame, event, arg)
            return None

        if event == 'c_return':
            if self.trace_through:
                tracefunc = self._get_frame_trace_func(frame)
                if tracefunc:
                    tracefunc(frame, event, arg)
            return None

        print_('bdb.Bdb.dispatch: unknown debugging event:', repr(event))
        if self.trace_through:
            tracefunc = self._get_thread_trace_func()
            if tracefunc:
                tracefunc(frame, event, arg)
        return self.trace_dispatch

    def dispatch_line(self, frame):
        if self.break_here(frame):
            if self.user_line:
                self.user_line(frame)
        return self.trace_dispatch

    def dispatch_call(self, frame):
        if self.break_anywhere(frame):
             return self.trace_dispatch
        else:
            return self.trace_dispatch_default

    def dispatch_return(self, frame, arg):
        if self.user_return:
            self.user_return(frame, arg)
        return self.trace_dispatch

    def dispatch_exception(self, frame, arg):
        if self.user_exception:
            self.user_exception(frame, arg)
        return self.trace_dispatch

    # Normally derived classes don't override the following
    # methods, but they may if they want to redefine the
    # definition of stopping and breakpoints.

    def is_skipped_module(self, module_name):
        for pattern in self.skip:
Loading ...