Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
crossover / opt / cxoffice / lib / python / globtree.py
Size: Mime:
# (c) Copyright 2009, 2014-2015. CodeWeavers, Inc.

import re
import os

import cxdecorators
import cxlog

import cxutils
from cxutils import b as b


#####
#
# Abstract classes
#
#####

_BSLASH = b('/')
_BSTAR = b('*')
_BQMARK = b('?')

class GlobNode(object):
    name = None
    # This is either a plain string or a regular expression.

    is_glob = None
    # If False, then this node's key contains the exact string to match so
    # checking for a match can be done with a simple
    # GlobTree.exists(parent + "/" + key).
    # If True, then the node's key contains a regular expression so it is
    # necessary to enumerate the content of the corresponding tree with
    # GlobTree.listdir(parent), and check each entry for a match.

    is_dir = False
    # If True, this glob only matches directories.

    string_children = None
    glob_children = None
    # Maps the node's key to the GlobNode object.
    # When building the GlobTree it's important to be able to efficiently find
    # the GlobNode object for a given key, hence the use of a mapping instead
    # of a set.

    client_data = None
    # If this was a leaf node in a glob, then this contains the data object
    # specified in the GlobTree.add_glob() call. A given glob can be specified
    # multiple times so this is a list of all such data objects.

    def __init__(self, name):
        if name.endswith(_BSLASH):
            self.string_children = {}
            self.glob_children = {}
            self.is_dir = True
            name = name[0:-1]
        if _BSTAR in name or _BQMARK in name:
            regex = [b('^')]
            for char in name:
                if char == _BSTAR:
                    regex.append(b('.*'))
                elif char == _BQMARK:
                    regex.append(b('.'))
                else:
                    regex.append(re.escape(b(char)))
            regex.append(b('$'))
            self.name = re.compile(b('').join(regex), re.IGNORECASE)
            self.is_glob = True
        else:
            self.name = name
            self.is_glob = False
        self.client_data = []

class GlobTree(object):
    """This abstract class provides a way to efficiently find all the matches
    for a list of globs in a tree of objects.

    This is meant to be a general tool and should in particular be equally
    applicable to filesystem and registry searches.

    Note that on Unix filesystems the filenames are arbitrary strings of bytes
    that may not respect any specific character encoding. Because of this, this
    class too deals with byte strings.
    """
    # Specifically, this should be applicable to the CD globbing, the installed
    # application file and registry globbing.

    case_sensitive = False # set to True if paths may be case-sensitive

    def __init__(self):
        self._root = GlobNode(_BSLASH)

    def add_glob(self, glob, data):
        """Adds a glob to the glob tree.

        A glob is a '/'-separated path which may optionally contain the
        following wildcard characters:
        - '?' which matches any character except '/'.
        - '*' which matches zero or more non-'/' characters.

        If the glob ends with a '/', then only 'folders' will be matched.
        Otherwise, both files and folders will be matched. All globs are
        case-insensitive and will be lowercased to avoid duplicates like
        'Windows' and 'windows'.

        A given glob can be added multiple times. The specified data object
        will be returned with each match.
        """
        if isinstance(glob, cxutils.unicode_type):
            glob = glob.encode('utf8')
        if glob == b(''):
            return
        prev_index = 0
        index = glob.find(_BSLASH)
        parent = self._root
        while index != -1:
            # add each directory
            name = glob[prev_index:index+1].lower()
            if _BSTAR in name or _BQMARK in name:
                globs = parent.glob_children
            else:
                globs = parent.string_children
            if name not in globs:
                globs[name] = GlobNode(name)
            node = globs[name]
            parent = node
            prev_index = index+1
            index = glob.find(_BSLASH, index+1)
        if not glob.endswith(_BSLASH):
            # add the file at the end
            if _BSLASH in glob:
                _unused, name = glob.rsplit(_BSLASH, 1)
            else:
                name = glob
            name = name.lower()
            if _BSTAR in name or _BQMARK in name:
                globs = parent.glob_children
            else:
                globs = parent.string_children
            if name not in globs:
                globs[name] = GlobNode(name)
            node = globs[name]
        node.client_data.append(data)

    def matches(self, root='', *user_args, **user_kwargs):
        """Returns an iterator over the matches.

        For each match the pair (path, data) is returned so this can be used
        as follows:
            for path, data in globtree.matches():
                ...

        If additional arguments are given, they will be passed on to other functions.
        """
        if isinstance(root, cxutils.unicode_type):
            root = root.encode('utf8')
        if root and not root.endswith(_BSLASH):
            root += _BSLASH
        paths_to_test = [(root, [self._root])]
        while paths_to_test:
            path, nodes = paths_to_test.pop()
            need_listdir = self.case_sensitive or \
                           len(nodes) > 1 or \
                           nodes[0].glob_children or \
                           len(nodes[0].string_children) > 1
            if need_listdir:
                cxlog.log_('globtree', 'listdir(%s)' % cxlog.to_str(path))
                dirs, files = self.listdir(path, *user_args, **user_kwargs)
                for dirname in dirs:
                    if isinstance(dirname, cxutils.unicode_type):
                        dirname = dirname.encode('utf8')
                    ldirname = dirname.lower()
                    ldirnameslash = ldirname + _BSLASH
                    fullfilename = path + dirname
                    fulldirname = fullfilename + _BSLASH
                    isdir = None
                    subnodes_to_test = []
                    for node in nodes:
                        if isdir is not False and ldirnameslash in node.string_children:
                            subnode = node.string_children[ldirnameslash]
                            if subnode.client_data:
                                if isdir is None:
                                    isdir = self._isdir(1, fulldirname, *user_args, **user_kwargs)
                                if isdir:
                                    for data in subnode.client_data:
                                        yield (fulldirname, data)
                                        cxlog.log_('globtree', 'found match for %s: %s' % (cxlog.debug_str(data), cxlog.to_str(fulldirname)))
                            if subnode.string_children or subnode.glob_children:
                                subnodes_to_test.append(subnode)
                        if ldirname in node.string_children:
                            subnode = node.string_children[ldirname]
                            for data in subnode.client_data:
                                yield (fullfilename, data)
                                cxlog.log_('globtree', 'found match for %s: %s' % (cxlog.debug_str(data), cxlog.to_str(fullfilename)))
                        for subnode in node.glob_children.values():
                            if isdir is False and subnode.is_dir:
                                continue
                            if subnode.name.match(ldirname):
                                if subnode.is_dir:
                                    if subnode.client_data:
                                        if isdir is None:
                                            isdir = self._isdir(2, fulldirname, *user_args, **user_kwargs)
                                        if isdir:
                                            for data in subnode.client_data:
                                                yield (fulldirname, data)
                                                cxlog.log_('globtree', 'found match for %s: %s' % (cxlog.debug_str(data), cxlog.to_str(fulldirname)))
                                    if subnode.string_children or subnode.glob_children:
                                        subnodes_to_test.append(subnode)
                                else:
                                    for data in subnode.client_data:
                                        yield (fullfilename, data)
                                        cxlog.log_('globtree', 'found match for %s: %s' % (cxlog.debug_str(data), cxlog.to_str(fullfilename)))
                    if subnodes_to_test and isdir is not False:
                        paths_to_test.append((fulldirname, subnodes_to_test))
                for filename in files:
                    if isinstance(filename, cxutils.unicode_type):
                        filename = filename.encode('utf8')
                    lfilename = filename.lower()
                    fullfilename = path + filename
                    for node in nodes:
                        if lfilename in node.string_children:
                            subnode = node.string_children[lfilename]
                            for data in subnode.client_data:
                                yield (fullfilename, data)
                                cxlog.log_('globtree', 'found match for %s: %s' % (cxlog.debug_str(data), cxlog.to_str(fullfilename)))
                        for subnode in node.glob_children.values():
                            if not subnode.is_dir and subnode.name.match(lfilename):
                                for data in subnode.client_data:
                                    yield (fullfilename, data)
                                    cxlog.log_('globtree', 'found match for %s: %s' % (cxlog.debug_str(data), cxlog.to_str(fullfilename)))
            else: # only a single node with a single string subnode
                node = nodes[0]
                for subnode in node.string_children.values():
                    if subnode.is_dir:
                        fullfilename = path + subnode.name + _BSLASH
                    else:
                        fullfilename = path + subnode.name
                    if subnode.client_data and self._exists(fullfilename, *user_args, **user_kwargs):
                        for data in subnode.client_data:
                            yield (fullfilename, data)
                            cxlog.log_('globtree', 'found match for %s: %s' % (cxlog.debug_str(data), cxlog.to_str(fullfilename)))
                    if subnode.string_children or subnode.glob_children:
                        paths_to_test.append((fullfilename, (subnode,)))


    def _exists(self, path, *user_args, **user_kwargs):
        cxlog.log_('globtree', 'exists(%s)' % cxlog.to_str(path))
        return self.exists(path, *user_args, **user_kwargs)

    def _isdir(self, location, path, *user_args, **user_kwargs):
        cxlog.log_('globtree', '(%s)isdir(%s)' % (cxlog.to_str(location), cxlog.to_str(path)))
        return self.exists(path, *user_args, **user_kwargs)



    #####
    #
    # Abstract methods
    #
    #####

    @cxdecorators.abstractmethod
    def listdir(self, _path, *_user_args, **_user_kwargs):
        """This is an abstract method which must be implemented by subclasses.
        It returns a tuple of (dirs, files) in the specified path.

        If the implementation cannot distinguish between dirs and files quickly,
        it may claim that files are dirs. The implementation will verify that
        they are dirs if necessary using listdir() or exists().

        If the path doesn't exist, return empty sequences.
        """
        # pylint: disable=R0201
        raise NotImplementedError()

    @cxdecorators.abstractmethod
    def exists(self, _path, *_user_args, **_user_kwargs):
        """This is an abstract method which must be implemented by subclasses.
        It returns True if the specified path exists, and False otherwise.

        Note that path will have a trailing '/' if it is meant to only match a
        'folder'.
        """
        # pylint: disable=R0201
        raise NotImplementedError()

    def isdir(self, path, *user_args, **user_kwargs):
        """This is equivalent to exists(), but GlobTree will only call it
        for dirs returned by listdir() and will always provide a /.

        If the dirs tuple contains only directories, you may implement this by
        returning True unconditionally. Otherwise, do not override the function,
        and it will call exists()."""
        return self.exists(path, *user_args, **user_kwargs)


#####
#
# Apply the above GlobTree class to finding files
#
#####

class FileGlobTree(GlobTree):
    """Implements the GlobTree class for file trees."""

    case_sensitive = True

    def listdir(self, path):
        # pylint: disable=W0221
        try:
            return os.listdir(path), ()
        except OSError:
            return (), ()

    def exists(self, path):
        # pylint: disable=W0221
        return os.path.exists(path)

def file_exists_insensitive(filename):
    glob_tree = FileGlobTree()
    glob_tree.add_glob(os.path.basename(filename), "")
    return len(list(glob_tree.matches(os.path.dirname(filename))))



#####
#
# Further add support for matching the content of the files
#
#####

class FileContentGlobTree(FileGlobTree):
    """Performs the same FileGlobTree but further filters out files based on
    their content. Specifically, the content of matching files must match
    every regular expression pattern specified for that specific filename glob.
    If that list is empty, then the file automatically matches.
    """

    def add_glob(self, glob, data):
        FileGlobTree.add_glob(self, glob, (tuple(), data))

    def add_content_glob(self, glob, patterns, data):
        """patterns is a list of strings specifying the regular expression
        patterns that the file content must match.
        See GlobTree.add_glob() for other details.
        """
        regexps = []
        for pattern in patterns:
            regexps.append(re.compile(pattern))
        FileGlobTree.add_glob(self, glob, (regexps, data))

    def matches(self, root='', *user_args, **user_kwargs):
        file_contents = last_path = None
        for path, (regexps, data) in FileGlobTree.matches(self, root, *user_args, **user_kwargs):
            if regexps:
                # Read the file
                if path != last_path:
                    # FileGlobTree will always return all matches for a given
                    # file successively. We take advantage of this here to
                    # avoid reading a given file more than once.
                    last_path = path
                    try:
                        # The re module doesn't seem to like CRLF so we open
                        # the file in U mode, which converts CRLF to LF.
                        infile = open(path, 'U')
                        try:
                            file_contents = infile.read()
                        finally:
                            infile.close()
                    except IOError:
                        file_contents = None
                        continue
                elif file_contents is None:
                    # There was an error the last time we tried
                    # to access this file.
                    continue

            # Check the regular expressions.
            for regexp in regexps:
                if not regexp.search(file_contents):
                    search_failed = True
                    break
            else:
                search_failed = False
            if search_failed:
                continue

            yield (path, data)