Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

edgify / rook   python

Repository URL to install this package:

Version: 0.1.176 

/ augs / locations / location_file_line.py

import inspect
import hashlib
import zlib

import six
import os

from rook.services.bdb_location_service import BdbLocationService
from rook.services.import_service import ImportService

from rook.exceptions import RookHashMismatchException, \
    RookCrcMismatchException, RookLineMoved

from rook.logger import logger
from rook.processor.error import Error
from rook.file_utils import FileUtils

from .location import Location


class LocationFileLine(Location):

    NAME = 'file_line'

    def __init__(self, output, aug, arguments, processor_factory=None):
        super(LocationFileLine, self).__init__(output, aug)

        self.filename = arguments.get('filename')
        # Some BDBs identify files by ID - ignore their inputs
        if isinstance(self.filename, six.string_types):
            # Make sure pyc is removed
            self.filename = self.filename.replace('.pyc', '.py')
            # Normalize file path
            self.filename = os.path.normcase(os.path.normpath(self.filename))

        self.lineno = arguments['lineno']

        self.file_hash = arguments.get('sha256')
        self.line_crc32 = arguments.get('line_crc32_2')
        self.line_unique = arguments.get('line_unique', False)

        self.file_utils = FileUtils()

        self.include_externals = arguments.get('includeExternals')
        if self.include_externals is None:
            # The only case it will be equal to '' is when its only filename (without folder, and without '/')
            if self.filename and isinstance(self.filename, six.string_types) and os.path.dirname(self.filename) == '':
                self.include_externals = False
            else:
                self.include_externals = True

        self.trigger_services = None

    def _add_aug_internal(self, trigger_services):
        logger.info("Adding aug")
        self.trigger_services = trigger_services

        self.set_pending()
        trigger_services.get_service(ImportService.NAME).register_post_import_notification(self)

    def module_found(self, module):
        try:
            updatedLineNo = self._get_updated_line_number(module)

            filepath = inspect.getsourcefile(module)
            if not filepath:
                if module.__file__.endswith('.pyc'):
                    filepath = module.__file__.replace('.pyc', '.py')

            self.trigger_services.get_service(BdbLocationService.NAME).add_breakpoint_aug(module, updatedLineNo,
                                                                                     filepath, self)
        except Exception as exc:
            message = "Exception when adding aug"
            logger.exception(message)
            self.set_error(Error(exc=exc, message=message))

    def _get_updated_line_number(self, module):
        filepath = inspect.getsourcefile(module)
        file_contents = self.file_utils.get_file_contents(self, module, filepath)
        if file_contents is None:
            logger.debug("Using blind line {lineno} for aug: {aug_id}, due to file content not found".format(lineno=self.lineno, aug_id=self.aug_id))
            return self.lineno

        if self.line_crc32 is not None:
            return self._find_line_using_crc(filepath, file_contents)

        logger.debug("Using blind line: {lineno}, for aug: {aug_id}, due to no line crc in aug's configuration".format(lineno=self.lineno, aug_id=self.aug_id))

        if self.file_hash:
            self._verify_file_hash(filepath, file_contents)
            return self.lineno

        return self.lineno

    def _find_line_using_crc(self, filepath, file_contents):
        lines = file_contents.splitlines()

        line_crc32 = None
        if len(lines) >= self.lineno:
            line_crc32 = format(zlib.crc32(lines[self.lineno - 1]) & 0xffffffff, 'x')
        if line_crc32 == self.line_crc32:
            return self.lineno

        if self.line_unique:
            crc32s = [format(zlib.crc32(line) & 0xffffffff, 'x') for line in lines]
            firstIndex = None
            secondFound = False

            for i in range(len(crc32s)):
                if crc32s[i] == self.line_crc32:
                    if firstIndex is None:
                        firstIndex = i
                    else:
                        secondFound = True
                        break

            if firstIndex and not secondFound:
                updatedLine = firstIndex + 1
                self.send_warning(Error(exc=RookLineMoved(filepath, self.lineno, updatedLine)))
                return updatedLine

        blob_hash = self._get_git_blob_hash(filepath)
        raise RookCrcMismatchException(filepath, self.line_crc32, line_crc32, blob_hash)

    def _verify_file_hash(self, filepath, file_contents):
        hash = hashlib.sha256(file_contents).hexdigest()
        if hash != self.file_hash:
            blob_hash = self._get_git_blob_hash(filepath)
            raise RookHashMismatchException(filepath, self.file_hash, hash, blob_hash)

    def _get_git_blob_hash(self, file_path):
        return None