# Scraped package-page header (not part of the module source):
#   Repository URL to install this package: (URL not captured)
#   Version: 15.1.0-1
# (c) Copyright 2009, 2014-2015. CodeWeavers, Inc.
import re
import os
import cxdecorators
import cxlog
import cxutils
from cxutils import b as b
#####
#
# Abstract classes
#
#####
# Constants built with b(), used to parse globs and to build paths.
# NOTE: despite its name, _BSLASH holds the forward slash '/', which is the
# path separator used in globs (there is no backslash involved).
_BSLASH = b('/')
# Wildcard standing for zero or more characters of one path component.
_BSTAR = b('*')
# Wildcard standing for exactly one character of one path component.
_BQMARK = b('?')
class GlobNode(object):
    """One '/'-separated component of a glob stored in a GlobTree.

    A node is built from the text of a single component. A trailing '/'
    marks a 'folder' component, and the presence of '*' or '?' makes it a
    wildcard component.
    """

    # This is either a plain string (without the trailing '/') or a compiled
    # regular expression, depending on is_glob.
    name = None

    # If False, then name contains the exact string to match so checking for
    # a match can be done with a simple GlobTree.exists(parent + "/" + key).
    # If True, then name contains a regular expression so it is necessary to
    # enumerate the content of the corresponding tree with
    # GlobTree.listdir(parent), and check each entry for a match.
    is_glob = None

    # If True, this glob only matches directories.
    is_dir = False

    # These map the children's keys to their GlobNode objects: literal
    # children in string_children and wildcard children in glob_children.
    # When building the GlobTree it's important to be able to efficiently find
    # the GlobNode object for a given key, hence the use of a mapping instead
    # of a set.
    # Both stay None for nodes that are not directories.
    string_children = None
    glob_children = None

    # If this was a leaf node in a glob, then this contains the data object
    # specified in the GlobTree.add_glob() call. A given glob can be specified
    # multiple times so this is a list of all such data objects.
    client_data = None

    def __init__(self, name):
        """Builds the node for the glob component name.

        name is the component as a byte string, with a trailing '/' if it
        is a directory component. The '/' is stripped before being stored.
        """
        if name.endswith(_BSLASH):
            self.string_children = {}
            self.glob_children = {}
            self.is_dir = True
            name = name[:-1]

        if _BSTAR in name or _BQMARK in name:
            regex = [b('^')]
            # Walk the name using one-byte slices. Iterating over a byte
            # string directly yields integers on Python 3, which would never
            # compare equal to the wildcard constants.
            for index in range(len(name)):
                char = name[index:index + 1]
                if char == _BSTAR:
                    regex.append(b('.*'))
                elif char == _BQMARK:
                    regex.append(b('.'))
                else:
                    regex.append(re.escape(char))
            # Use \Z rather than $ so a trailing newline in the candidate
            # name is not silently ignored, and DOTALL so the wildcards can
            # match a newline like any other character.
            regex.append(b('\\Z'))
            self.name = re.compile(b('').join(regex),
                                   re.IGNORECASE | re.DOTALL)
            self.is_glob = True
        else:
            self.name = name
            self.is_glob = False
        self.client_data = []
class GlobTree(object):
    """This abstract class provides a way to efficiently find all the matches
    for a list of globs in a tree of objects.

    This is meant to be a general tool and should in particular be equally
    applicable to filesystem and registry searches.

    Note that on Unix filesystems the filenames are arbitrary strings of bytes
    that may not respect any specific character encoding. Because of this, this
    class too deals with byte strings.

    Subclasses must implement listdir() and exists(), and may override
    isdir().
    """

    # Specifically, this should be applicable to the CD globbing, the installed
    # application file and registry globbing.

    case_sensitive = False # set to True if paths may be case-sensitive

    def __init__(self):
        """Creates a tree containing no glob."""
        self._root = GlobNode(_BSLASH)

    @staticmethod
    def _get_child(parent, name):
        """Returns the child of parent for the lowercased glob component
        name, creating it first if it does not exist yet."""
        if _BSTAR in name or _BQMARK in name:
            children = parent.glob_children
        else:
            children = parent.string_children
        if name not in children:
            children[name] = GlobNode(name)
        return children[name]

    def add_glob(self, glob, data):
        """Adds a glob to the glob tree.

        A glob is a '/'-separated path which may optionally contain the
        following wildcard characters:
        - '?' which matches any character except '/'.
        - '*' which matches zero or more non-'/' characters.

        If the glob ends with a '/', then only 'folders' will be matched.
        Otherwise, both files and folders will be matched. All globs are
        case-insensitive and will be lowercased to avoid duplicates like
        'Windows' and 'windows'.

        A given glob can be added multiple times. The specified data object
        will be returned with each match.

        Unicode globs are encoded to UTF-8 first, and an empty glob is
        ignored.
        """
        if isinstance(glob, cxutils.unicode_type):
            glob = glob.encode('utf8')
        if glob == b(''):
            return

        node = self._root
        prev_index = 0
        index = glob.find(_BSLASH)
        while index != -1:
            # add each directory; its key keeps the trailing '/'
            node = self._get_child(node, glob[prev_index:index + 1].lower())
            prev_index = index + 1
            index = glob.find(_BSLASH, prev_index)

        if not glob.endswith(_BSLASH):
            # add the file at the end, that is whatever follows the last '/'
            node = self._get_child(node, glob[prev_index:].lower())

        node.client_data.append(data)

    def matches(self, root='', *user_args, **user_kwargs):
        """Returns an iterator over the matches.

        For each match the pair (path, data) is returned so this can be used
        as follows:
            for path, data in globtree.matches():
                ...

        root is prepended to every path, and a '/' is appended to it if it
        is not empty and does not end with one. The path of a match for a
        'folders only' glob ends with a '/'.

        If additional arguments are given, they will be passed on to other functions.
        """
        if isinstance(root, cxutils.unicode_type):
            root = root.encode('utf8')
        if root and not root.endswith(_BSLASH):
            root += _BSLASH

        # Each entry is a directory path together with the nodes whose
        # children must be checked against the content of that directory.
        paths_to_test = [(root, [self._root])]
        while paths_to_test:
            path, nodes = paths_to_test.pop()
            # With a single node having at most one literal child and no
            # wildcard child, and no case-sensitivity concern, the child can
            # be probed directly instead of listing the directory.
            need_listdir = (self.case_sensitive or
                            len(nodes) > 1 or
                            nodes[0].glob_children or
                            len(nodes[0].string_children) > 1)
            if need_listdir:
                cxlog.log_('globtree', 'listdir(%s)' % cxlog.to_str(path))
                dirs, files = self.listdir(path, *user_args, **user_kwargs)
                for dirname in dirs:
                    if isinstance(dirname, cxutils.unicode_type):
                        dirname = dirname.encode('utf8')
                    ldirname = dirname.lower()
                    ldirnameslash = ldirname + _BSLASH
                    fullfilename = path + dirname
                    fulldirname = fullfilename + _BSLASH
                    # None means we have not yet checked whether this entry
                    # really is a directory; the check is done at most once.
                    isdir = None
                    subnodes_to_test = []
                    for node in nodes:
                        # Literal 'folder' child
                        if isdir is not False and ldirnameslash in node.string_children:
                            subnode = node.string_children[ldirnameslash]
                            if subnode.client_data:
                                if isdir is None:
                                    isdir = self._isdir(1, fulldirname, *user_args, **user_kwargs)
                                if isdir:
                                    for data in subnode.client_data:
                                        yield (fulldirname, data)
                                        cxlog.log_('globtree', 'found match for %s: %s' % (cxlog.debug_str(data), cxlog.to_str(fulldirname)))
                            if subnode.string_children or subnode.glob_children:
                                subnodes_to_test.append(subnode)
                        # Literal child matching both files and folders
                        if ldirname in node.string_children:
                            subnode = node.string_children[ldirname]
                            for data in subnode.client_data:
                                yield (fullfilename, data)
                                cxlog.log_('globtree', 'found match for %s: %s' % (cxlog.debug_str(data), cxlog.to_str(fullfilename)))
                        # Wildcard children
                        for subnode in node.glob_children.values():
                            if isdir is False and subnode.is_dir:
                                continue
                            if subnode.name.match(ldirname):
                                if subnode.is_dir:
                                    if subnode.client_data:
                                        if isdir is None:
                                            isdir = self._isdir(2, fulldirname, *user_args, **user_kwargs)
                                        if isdir:
                                            for data in subnode.client_data:
                                                yield (fulldirname, data)
                                                cxlog.log_('globtree', 'found match for %s: %s' % (cxlog.debug_str(data), cxlog.to_str(fulldirname)))
                                    if subnode.string_children or subnode.glob_children:
                                        subnodes_to_test.append(subnode)
                                else:
                                    for data in subnode.client_data:
                                        yield (fullfilename, data)
                                        cxlog.log_('globtree', 'found match for %s: %s' % (cxlog.debug_str(data), cxlog.to_str(fullfilename)))
                    # Only descend if nothing proved that this is not a
                    # directory.
                    if subnodes_to_test and isdir is not False:
                        paths_to_test.append((fulldirname, subnodes_to_test))

                for filename in files:
                    if isinstance(filename, cxutils.unicode_type):
                        filename = filename.encode('utf8')
                    lfilename = filename.lower()
                    fullfilename = path + filename
                    for node in nodes:
                        if lfilename in node.string_children:
                            subnode = node.string_children[lfilename]
                            for data in subnode.client_data:
                                yield (fullfilename, data)
                                cxlog.log_('globtree', 'found match for %s: %s' % (cxlog.debug_str(data), cxlog.to_str(fullfilename)))
                        for subnode in node.glob_children.values():
                            if not subnode.is_dir and subnode.name.match(lfilename):
                                for data in subnode.client_data:
                                    yield (fullfilename, data)
                                    cxlog.log_('globtree', 'found match for %s: %s' % (cxlog.debug_str(data), cxlog.to_str(fullfilename)))
            else: # only a single node with a single string subnode
                node = nodes[0]
                for subnode in node.string_children.values():
                    if subnode.is_dir:
                        fullfilename = path + subnode.name + _BSLASH
                    else:
                        fullfilename = path + subnode.name
                    if subnode.client_data and self._exists(fullfilename, *user_args, **user_kwargs):
                        for data in subnode.client_data:
                            yield (fullfilename, data)
                            cxlog.log_('globtree', 'found match for %s: %s' % (cxlog.debug_str(data), cxlog.to_str(fullfilename)))
                    if subnode.string_children or subnode.glob_children:
                        paths_to_test.append((fullfilename, (subnode,)))

    def _exists(self, path, *user_args, **user_kwargs):
        """Logs the check and returns the result of exists()."""
        cxlog.log_('globtree', 'exists(%s)' % cxlog.to_str(path))
        return self.exists(path, *user_args, **user_kwargs)

    def _isdir(self, location, path, *user_args, **user_kwargs):
        """Logs the check and returns the result of isdir().

        location is only used to tell the call sites apart in the log.
        """
        cxlog.log_('globtree', '(%s)isdir(%s)' % (cxlog.to_str(location), cxlog.to_str(path)))
        # Go through isdir() rather than exists() so that subclasses which
        # override isdir() are honored. The default isdir() calls exists().
        return self.isdir(path, *user_args, **user_kwargs)


    #####
    #
    # Abstract methods
    #
    #####

    @cxdecorators.abstractmethod
    def listdir(self, _path, *_user_args, **_user_kwargs):
        """This is an abstract method which must be implemented by subclasses.

        It returns a tuple of (dirs, files) in the specified path.
        If the implementation cannot distinguish between dirs and files quickly,
        it may claim that files are dirs. GlobTree will verify that
        they are dirs if necessary using listdir() or isdir().
        If the path doesn't exist, return empty sequences.
        """
        # pylint: disable=R0201
        raise NotImplementedError()

    @cxdecorators.abstractmethod
    def exists(self, _path, *_user_args, **_user_kwargs):
        """This is an abstract method which must be implemented by subclasses.

        It returns True if the specified path exists, and False otherwise.
        Note that path will have a trailing '/' if it is meant to only match a
        'folder'.
        """
        # pylint: disable=R0201
        raise NotImplementedError()

    def isdir(self, path, *user_args, **user_kwargs):
        """This is equivalent to exists(), but GlobTree will only call it
        for dirs returned by listdir() and will always provide a /.

        If the dirs tuple contains only directories, you may implement this by
        returning True unconditionally. Otherwise, do not override the function,
        and it will call exists()."""
        return self.exists(path, *user_args, **user_kwargs)
#####
#
# Apply the above GlobTree class to finding files
#
#####
class FileGlobTree(GlobTree):
    """Implements the GlobTree class for file trees."""

    # Filesystems may be case-sensitive so the real names must be discovered
    # by listing the directories.
    case_sensitive = True

    def listdir(self, path):
        """Returns (entries, ()) for the specified directory.

        os.listdir() does not tell files and directories apart so every
        entry is reported as a 'dir', which the GlobTree.listdir() contract
        allows. An empty path stands for the current directory. If the
        directory cannot be listed, empty sequences are returned.
        """
        # pylint: disable=W0221
        try:
            # os.listdir('') raises instead of listing the current
            # directory, which would make an empty root never match.
            return os.listdir(path or b('.')), ()
        except OSError:
            return (), ()

    def exists(self, path):
        """Returns True if path exists on the filesystem."""
        # pylint: disable=W0221
        return os.path.exists(path)
def file_exists_insensitive(filename):
    """Returns the number of entries, in the directory part of filename,
    which match its last component through a FileGlobTree.

    The result is 0 when there is no match so it can be used as a boolean.
    Note that the last component is handed to add_glob() as is, so any '*'
    or '?' it contains acts as a wildcard.
    """
    directory, basename = os.path.split(filename)
    tree = FileGlobTree()
    tree.add_glob(basename, "")
    count = 0
    for _match in tree.matches(directory):
        count += 1
    return count
#####
#
# Further add support for matching the content of the files
#
#####
class FileContentGlobTree(FileGlobTree):
    """A FileGlobTree which can additionally filter the matching files on
    their content.

    Each glob is stored with a sequence of regular expressions. A path is
    only reported for that glob if the content of the file matches every one
    of them. A glob with no regular expression matches on the name alone.
    """

    def add_glob(self, glob, data):
        """Adds a glob with no constraint on the file content.

        See GlobTree.add_glob() for other details.
        """
        FileGlobTree.add_glob(self, glob, ((), data))

    def add_content_glob(self, glob, patterns, data):
        """Adds a glob which also constrains the file content.

        patterns is a list of strings specifying the regular expression
        patterns that the file content must match.
        See GlobTree.add_glob() for other details.
        """
        compiled = [re.compile(pattern) for pattern in patterns]
        FileGlobTree.add_glob(self, glob, (compiled, data))

    def matches(self, root='', *user_args, **user_kwargs):
        """Returns an iterator over the (path, data) pairs of the matches
        which also satisfy their content patterns.

        Files which cannot be read never match a glob that has content
        patterns.
        """
        # FileGlobTree is expected to return all the matches for a given
        # file successively. Remembering the last file read takes advantage
        # of this to avoid reading a given file more than once.
        cached_path = None
        cached_contents = None
        candidates = FileGlobTree.matches(self, root, *user_args, **user_kwargs)
        for path, (regexps, data) in candidates:
            if not regexps:
                # No content constraint: the name match is enough.
                yield (path, data)
                continue

            if path != cached_path:
                cached_path = path
                try:
                    # The re module doesn't seem to like CRLF so the file is
                    # opened in U mode, which converts CRLF to LF.
                    # NOTE(review): the 'U' mode flag is no longer accepted
                    # by recent Python 3 releases - confirm the target
                    # interpreter before porting.
                    with open(path, 'U') as infile:
                        cached_contents = infile.read()
                except IOError:
                    # Remember the failure so the other globs matching this
                    # file are skipped without trying to read it again.
                    cached_contents = None

            if cached_contents is None:
                continue

            # Every regular expression must be found in the file.
            if all(regexp.search(cached_contents) for regexp in regexps):
                yield (path, data)