Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

edgify / rook   python

Repository URL to install this package:

Version: 0.1.176 

/ services / import_service.py

import sys
import time
import threading

import six
import os
import inspect
import site
import imp
import hashlib

from six.moves import builtins

from rook.logger import logger

from rook.config import ImportServiceConfig

from rook.serverless import on_lambda

from rook.exceptions import RookDependencyError, RookSourceFilePathSuggestion

from rook.file_utils import FileUtils

from rook.processor.error import Error

# File extensions (source and bytecode) that a module's path must end with for the module
# to be considered by the import service (checked in ImportService._is_valid_module).
_SUPPORTED_PY_FORMATS = ['.py', '.pyc']

# Paths to exclude; ImportService.__init__ adds '/var/task/flask' and '/var/task/chalice'
# when on_lambda() is true. NOTE(review): the code that reads this list is not in view here.
_BLACKLISTED_PATHS = []

# The interpreter's __import__ as it was when this module was first imported (before any hook
# is installed). The import hook delegates every import to it, ImportService.close() restores
# it, and set_old_import_func() swaps it out in tests.
_OLD_IMPORT_FUNC = builtins.__import__


# only used in tests
def set_old_import_func(import_func):
    """Replace the saved "original" ``__import__`` with *import_func*.

    Returns the function that was saved before the swap, so a test can put it back.
    """
    global _OLD_IMPORT_FUNC
    previous_func, _OLD_IMPORT_FUNC = _OLD_IMPORT_FUNC, import_func
    return previous_func


class CountingImportLock(object):
    """
    Context manager around the interpreter's global import lock that also records how deeply
    this instance has been entered.

    While it is held, no other thread can run an import, so every module found in sys.modules
    has finished executing (nothing is partially initialized).

    The import lock is used instead of a private lock because:
    (1) it is reentrant, so a user thread that already holds it is not blocked;
    (2) holding it guarantees that no import is half-way done;
    (3) taking more than one lock invites lock-ordering deadlocks (one thread takes A then B
        while another takes B then A). Python itself takes the import lock around imports,
        so an import hook that relied on _any other lock_ could deadlock.
    """

    def __init__(self):
        # Current nesting depth; only changed while the import lock is held.
        self._count = 0

    def __enter__(self):
        # Acquire first, then count, so the counter is never touched without the lock.
        imp.acquire_lock()
        self._count = self._count + 1

    def __exit__(self, *exc_info, **_ignored):
        # Un-count first, then release, mirroring __enter__.
        self._count = self._count - 1
        imp.release_lock()

    def get_recursion_count(self):
        """Return the current nesting depth of this instance."""
        depth = self._count
        return depth


class ImportService(object):

    NAME = "Import"

    def __init__(self, bdb_location_service):
        """Snapshot sys.modules and start watching for newly imported modules.

        :param bdb_location_service: object whose ``ignore_current_thread()`` is called by the
            polling thread (used only when no import hook is installed).
        :raises RookDependencyError: on CPython, if ``native_extensions`` cannot be imported.
        :raises AssertionError: if the hook is requested on an unsupported implementation.

        On non-Windows platforms with ``ImportServiceConfig.USE_IMPORT_HOOK`` set, this replaces
        ``builtins.__import__`` with a hook. Otherwise it starts a daemon thread that polls
        sys.modules.
        """
        self._bdb_location_service = bdb_location_service

        self._modules = sys.modules.copy()
        self._path_cache = {}
        self._post_import_notifications = {}

        self._thread = None
        self._quit = False

        self.file_utils = FileUtils()

        self.external_paths = self._collect_external_paths()

        # If we are (heuristically) worried we missed anything, add "catch-alls"
        if len(self.external_paths) < 3:
            logger.debug("Missing getsitepackages, adding heuristics")
            self.external_paths += ['site-packages', 'site-python']

        logger.debug("External paths are " + str(self.external_paths))

        if on_lambda():
            # Only add what is missing, so constructing the service again does not keep
            # growing the module-level list with duplicates.
            for blacklisted_path in ('/var/task/flask', '/var/task/chalice'):
                if blacklisted_path not in _BLACKLISTED_PATHS:
                    _BLACKLISTED_PATHS.append(blacklisted_path)

        import platform
        if platform.system() != 'Windows' and ImportServiceConfig.USE_IMPORT_HOOK:
            self._install_import_hook(platform.python_implementation().lower())
        else:
            self._thread = threading.Thread(target=self._query_thread,
                                            name=ImportServiceConfig.THREAD_NAME)
            self._thread.daemon = True
            self._thread.start()

    @staticmethod
    def _collect_external_paths():
        """Return normalized interpreter/site-packages directories; modules under them count as external."""
        external_paths = [sys.prefix]

        if hasattr(sys, 'base_prefix') and sys.base_prefix not in external_paths:
            external_paths.append(sys.base_prefix)

        if hasattr(site, 'getsitepackages') and hasattr(site, 'getusersitepackages'):
            # getusersitepackages() already returns the full user site-packages path, so it is
            # added as-is; prefixing it with getuserbase() would produce a path that never exists.
            external_paths += site.getsitepackages() + [site.getusersitepackages()]

        external_paths = [os.path.normcase(os.path.realpath(external_path))
                          for external_path in external_paths]

        # pyenv messes up sys.path, so we ignore everything under its folder
        for path in external_paths:
            pyenv_offset = path.find('.pyenv')
            if pyenv_offset != -1:
                # Appending while iterating is safe only because we stop right after.
                external_paths.append(path[:pyenv_offset + len('.pyenv') + 1])
                break

        return external_paths

    def _install_import_hook(self, implementation):
        """Replace builtins.__import__ with a hook that re-evaluates sys.modules after imports.

        :param implementation: lowercase ``platform.python_implementation()`` value.
        :raises RookDependencyError: on CPython, if ``native_extensions`` cannot be imported.
        :raises AssertionError: for any implementation other than cpython or pypy.
        """
        def import_hook(*args, **kwargs):
            """
            Declared here to make it obvious that you can't replace the function by monkeypatching -
            the C extension will always reference the function set with SetImportHook.
            """
            # Presumably read from frame locals to hide this frame from tracebacks - keep it.
            __rookout__tracebackhide__ = True
            # Locking the global import lock here extends the section for which it is locked -
            # we don't want the import hook to be called from two threads simultaneously,
            # but the scope in which the import lock is held might only encompass the call to the original
            # __import__. The import lock is re-entrant, so it's OK for us to lock it and then for Python
            # to lock it again.
            with import_hook.import_lock:
                result = _OLD_IMPORT_FUNC(*args, **kwargs)

                # TODO - review if we can process the module being loaded regardless instead of evaluating the full list
                try:
                    # if the recursion count is > 1,
                    # then this call to import_hook is a nested import, which means that the parent
                    # import has not finished executing its code yet, but it's already in sys.modules.
                    # if we evaluate now, we will try to place BPs in the module before it finished executing,
                    # potentially resulting in CodeNotFound.
                    if import_hook.import_lock.get_recursion_count() == 1:
                        self.evaluate_module_list()
                except:  # lgtm[py/catch-base-exception]
                    # Best effort: a failure here must never break the user's import.
                    pass

                return result

        # this must be a reentrant lock - imports can cause other imports
        import_hook.import_lock = CountingImportLock()

        if implementation == "cpython":
            try:
                import native_extensions
            except Exception as e:
                raise RookDependencyError(e)

            logger.debug('Enabling native import hook')
            native_extensions.SetImportHook(import_hook)
            # atomic swap
            builtins.__import__ = native_extensions.CallImportHookRemovingFrames
        elif implementation == "pypy":
            import __pypy__
            logger.debug('Enabling pypy import hook')
            # atomic swap
            builtins.__import__ = __pypy__.hidden_applevel(import_hook)
        else:
            # assertion should never be reached, singleton.py checks platform support
            raise AssertionError("Unsupported platform")

    def close(self):
        if _OLD_IMPORT_FUNC:
            builtins.__import__ = _OLD_IMPORT_FUNC

        if self._thread:
            self._quit = True

            # If threading was monkey patched by gevent waiting on thread will likely throw an exception
            try:
                from gevent.monkey import is_module_patched
                if is_module_patched("threading"):
                    time.sleep(ImportServiceConfig.SYS_MODULES_QUERY_INTERVAL)
            except Exception:  # Nothing we can do here but join as usual
                pass

            self._thread.join()

    def register_post_import_notification(self, location):
        """Register *location* for future imports and try to satisfy it from already-known modules.

        The location is stored under ``location.aug_id`` so that modules imported later are
        matched against it. Every module in the current snapshot (self._modules) that matches
        gets ``location.module_found(module)``; exceptions from that callback propagate to the
        caller. If nothing matched, but a known module with the same basename has identical
        contents (SHA-256), a RookSourceFilePathSuggestion warning is sent with that module's path.
        """
        filepath = location.filename
        filename = os.path.basename(filepath) if filepath else None

        # Register notification for future loads
        self._post_import_notifications[location.aug_id] = location

        # Identifiers of modules already matched, so one module reachable through several
        # sys.modules keys does not get multiple callbacks for the same aug.
        # NOTE(review): kept as a list because the identifier type (_get_module_identifier) is not in view.
        modules_blacklist = []
        match_found = False
        # sha256 of same-basename module contents -> path of that module's source
        file_hashes = {}
        with CountingImportLock():
            # Attempt to satisfy location using known modules
            for module_key, module_object in self._modules.items():
                module_filename = self._get_module_path(module_object)

                # Also rejects a missing module_filename, so it is non-empty below.
                if not self._is_valid_module(module_object, module_filename):
                    continue

                if filename == os.path.basename(module_filename):
                    file_content = self.file_utils.get_safe_file_contents(module_object)
                    if file_content:
                        file_hash = hashlib.sha256(file_content).hexdigest()
                        # getsourcefile() may return None; fall back to the path we already resolved
                        file_hashes[file_hash] = inspect.getsourcefile(module_object) or module_filename

                module_identifier = self._get_module_identifier(module_object, module_filename)

                if module_identifier in modules_blacklist:
                    logger.warn("Found duplicated occurrences of the same module that are matching the aug, "
                                "can cause the aug to not be triggered. aug_id: %s, module_key: %s, module_path: %s"
                                % (location.aug_id, module_key, module_filename))
                    continue

                if self._does_module_match_notification(module_filename, location, self.external_paths):
                    match_found = True
                    modules_blacklist.append(module_identifier)
                    location.module_found(module_object)

        if not match_found and location.file_hash in file_hashes:
            location.send_warning(Error(exc=RookSourceFilePathSuggestion(filepath, file_hashes[location.file_hash])))

    def remove_aug(self, aug_id):
        location = self._post_import_notifications.pop(aug_id, None)
        if location is None:
            return

        location.set_removed()

    def clear_augs(self):
        """Unregister every notification and mark each of their locations as removed.

        No lock is taken. The notifications dict is first swapped for an empty one, so from
        then on new registrations go to the new dict. Only after the swap is the detached
        old dict walked and each of its locations marked removed.
        """
        detached_notifications = self._post_import_notifications
        self._post_import_notifications = dict()
        for stale_location in six.itervalues(detached_notifications):
            stale_location.set_removed()

    def pre_fork(self):
        """Take the global import lock before forking, so no import is in progress at fork time.

        The lock is intentionally left held; post_fork() releases it.
        """
        fork_lock = CountingImportLock()
        fork_lock.__enter__()

    def post_fork(self):
        """Release the import lock taken in pre_fork().

        A RuntimeError from the release is swallowed (see the comment below).
        """
        fork_lock = CountingImportLock()
        try:
            fork_lock.__exit__()
        except RuntimeError:
            # This is a bit of a hack, as we don't seem to hold the import lock in the child.
            # See the TODO about migrating away from the import lock.
            pass

    def _is_valid_module(self, module_object, module_filename):
        return module_filename and os.path.splitext(module_filename)[1] in _SUPPORTED_PY_FORMATS and \
            module_object and inspect.ismodule(module_object) and hasattr(module_object, '__file__')

    def _query_thread(self):
        """Polling loop used when no import hook is installed.

        Repeatedly re-evaluates sys.modules under the import lock, sleeping
        ImportServiceConfig.SYS_MODULES_QUERY_INTERVAL between rounds, until close() sets _quit.
        """
        logger.debug('Starting ImportService thread')
        self._bdb_location_service.ignore_current_thread()

        while True:
            if self._quit:
                break

            try:
                with CountingImportLock():
                    self.evaluate_module_list()
            except:  # lgtm[py/catch-base-exception]
                # logger may be None while the interpreter is shutting down
                if logger:
                    logger.exception("Error while evaluating module list")

            # time can be None if interpreter is in shutdown
            if not time:
                return
            time.sleep(ImportServiceConfig.SYS_MODULES_QUERY_INTERVAL)

    def evaluate_module_list(self):
        try:
            # No new modules
            if len(self._modules) == len(sys.modules):
                return

            # Get a fresh list
            modules = sys.modules.copy()

            if self._post_import_notifications:
                # self._modules is only replaced (it's immutable - a frozenset), so there's no need to copy it here,
                # just keep a reference to the current self._modules so we don't start working on a different set
                # mid-loop
                old_modules = self._modules

                # For everybody not in the old list, check notifications
                for module_name, module in six.iteritems(modules):
                    module_filename = self._get_module_path(module)
                    if module_name not in old_modules.keys() and self._is_valid_module(module, module_filename):
                        self._notify_of_new_module(module, module_filename)

            # Update the "old" list
            self._modules = modules

        except Exception:
            logger.exception("Exception in ImportService")

    def _notify_of_new_module(self, module_object, module_filename):
        self._trigger_all_notifications_for_module(module_filename, module_object)

    def _trigger_all_notifications_for_module(self, module_filename, module):
        if module_filename:
            for aug_id, location in six.iteritems(self._post_import_notifications.copy()):
                if self._does_module_match_notification(module_filename, location,
                                                        self.external_paths):
                    try:
                        location.module_found(module)
                    except:  # lgtm[py/catch-base-exception]
                        logger.exception("Error on module load callback")
                elif os.path.basename(location.filename) == os.path.basename(module_filename):
                    file_content = self.file_utils.get_safe_file_contents(module)
                    if file_content:
                        file_hash = hashlib.sha256(file_content).hexdigest()
                        if location.file_hash == file_hash:
                            location.send_warning(Error(exc=RookSourceFilePathSuggestion(location.filename, module_filename)))

    def _get_module_path(self, module):
        if module is None or not hasattr(module, '__name__') or not hasattr(module, '__file__'):
            return None
        result = self._path_cache.get(module.__name__)

        if result is not None:
            return result

        if module:
            try:
                path = inspect.getsourcefile(module)
                if not path:
                    if module.__file__.endswith('.pyc'):
                        path = module.__file__.replace('.pyc', '.py')
            except Exception:
                return None
Loading ...