Repository URL to install this package:
|
Version:
1.4.post2 ▾
|
"""
Parsing and finding routines.
This could be considered the core of snakefood, and where all the complexity lives.
"""
# This file is part of the Snakefood open source package.
# See http://furius.ca/snakefood/ for licensing details.
import sys, os, logging
import compiler
from compiler.visitor import ASTVisitor
from compiler.ast import Discard, Const, AssName, List, Tuple
from compiler.consts import OP_ASSIGN
from os.path import *
from snakefood.roots import find_package_root
from snakefood.local import filter_unused_imports
__all__ = ('find_dependencies', 'find_imports',
'parse_python_source',
'ImportVisitor', 'get_local_names', 'check_duplicate_imports',
'ERROR_IMPORT', 'ERROR_SYMBOL', 'ERROR_UNUSED')
ERROR_IMPORT = " Line %d: Could not import module '%s'"
ERROR_SYMBOL = " Line %d: Symbol is not a module: '%s'"
ERROR_UNUSED = " Line %d: Ignored unused import: '%s'"
ERROR_SOURCE = " %s"
WARNING_OPTIONAL = " Line %d: Pragma suppressing import '%s'"
def find_dependencies(fn, verbose, process_pragmas,
ignore_unused=False,
warning_lambda=logging.warning,
debug_lambda=logging.debug):
"Returns a list of the files 'fn' depends on."
file_errors = []
ast, _ = parse_python_source(fn)
if ast is None:
return [], file_errors
found_imports, future_imports = get_ast_imports(ast)
if found_imports is None:
return [], file_errors
# Filter out the unused imports if requested.
if ignore_unused:
found_imports, unused_imports = filter_unused_imports(ast, found_imports)
for modname, rname, lname, lineno, level, pragma in unused_imports:
file_errors.append((ERROR_UNUSED, lname))
output_code = (verbose >= 2)
source_lines = None
if output_code:
source_lines = open(fn, 'rU').read().splitlines()
absolute_import = 'absolute_import' in future_imports
files = []
assert not isdir(fn)
dn = dirname(fn)
seenset = set()
for x in found_imports:
mod, rname, lname, lineno, level, pragma = x
if process_pragmas and pragma == 'OPTIONAL':
if rname is None:
msg = WARNING_OPTIONAL % (lineno, mod)
else:
msg = '%s.%s' % (mod, rname)
logging.warning(msg)
continue
sig = (mod, rname)
if sig in seenset:
continue
seenset.add(sig)
modfile, errors = find_dotted_module(mod, rname, dn, level, absolute_import)
if errors:
file_errors.extend(errors)
for err, name in errors:
if err is ERROR_IMPORT:
efun = warning_lambda
else:
efun = debug_lambda
efun(err % (lineno, name))
if output_code:
efun(ERROR_SOURCE % source_lines[lineno-1].rstrip())
if modfile is None:
continue
files.append(realpath(modfile))
return files, file_errors
def find_imports(fn, verbose, ignores):
"Yields a list of the module names the file 'fn' depends on."
ast, _ = parse_python_source(fn)
if ast is None:
raise StopIteration
found_imports, future_imports = get_ast_imports(ast)
if found_imports is None:
raise StopIteration
dn = dirname(fn)
packroot = None
for modname, rname, lname, lineno, _, _ in found_imports:
islocal = False
names = modname.split('.')
if find_dotted(names, dn):
# This is a local import, we need to find the root in order to
# compute the absolute module name.
if packroot is None:
packroot = find_package_root(fn, ignores)
if not packroot:
logging.warning(
"%d: Could not find package root for local import '%s' from '%s'." %
(lineno, modname, fn))
continue
reldir = dirname(fn)[len(packroot)+1:]
modname = '%s.%s' % (reldir.replace(os.sep, '.'), modname)
islocal = True
if rname is not None:
modname = '%s.%s' % (modname, rname)
yield (modname, lineno, islocal)
class ImportVisitor(object):
"""AST visitor for grabbing the import statements.
This visitor produces a list of
(module-name, remote-name, local-name, line-no, pragma)
* remote-name is the name off the symbol in the imported module.
* local-name is the name of the object given in the importing module.
"""
def __init__(self):
self.modules = []
self.recent = []
self.future = set()
def visitImport(self, node):
self.accept_imports()
self.recent.extend((x[0], None, x[1] or x[0], node.lineno, 0)
for x in node.names)
def visitFrom(self, node):
self.accept_imports()
modname = node.modname
if modname == '__future__':
for name, as_ in node.names:
self.future.add(name)
return
for name, as_ in node.names:
if name == '*':
# We really don't know...
mod = (modname, None, None, node.lineno, node.level)
else:
mod = (modname, name, as_ or name, node.lineno, node.level)
self.recent.append(mod)
# For package initialization files, try to fetch the __all__ list, which
# implies an implicit import if the package is being imported via
# from-import; from the documentation:
#
# The import statement uses the following convention: if a package's
# __init__.py code defines a list named __all__, it is taken to be the list
# of module names that should be imported when from package import * is
# encountered. It is up to the package author to keep this list up-to-date
# when a new version of the package is released. Package authors may also
# decide not to support it, if they don't see a use for importing * from
# their package.
def visitAssign(self, node):
lhs = node.nodes
if (len(lhs) == 1 and
isinstance(lhs[0], AssName) and
lhs[0].name == '__all__' and
lhs[0].flags == OP_ASSIGN):
rhs = node.expr
if isinstance(rhs, (List, Tuple)):
for namenode in rhs:
# Note: maybe we should handle the case of non-consts.
if isinstance(namenode, Const):
modname = namenode.value
mod = (modname, None, modname, node.lineno, 0)#node.level
self.recent.append(mod)
def default(self, node):
pragma = None
if self.recent:
if isinstance(node, Discard):
children = node.getChildren()
if len(children) == 1 and isinstance(children[0], Const):
const_node = children[0]
pragma = const_node.value
self.accept_imports(pragma)
def accept_imports(self, pragma=None):
self.modules.extend((m, r, l, n, lvl, pragma)
for (m, r, l, n, lvl) in self.recent)
self.recent = []
def finalize(self):
self.accept_imports()
return self.modules, self.future
def check_duplicate_imports(found_imports):
"""
Heuristically check for duplicate imports, and return two lists:
a list of the unique imports and a list of the duplicates.
"""
uniq, dups = [], []
simp = set()
for x in found_imports:
modname, rname, lname, lineno, _, pragma = x
if rname is not None:
key = modname + '.' + rname
else:
key = modname
if key in simp:
dups.append(x)
else:
uniq.append(x)
simp.add(key)
return uniq, dups
def get_local_names(found_imports):
"""
Convert the results of running the ImportVisitor into a simple list of local
names.
"""
return [(lname, no)
for modname, rname, lname, no, _, pragma in found_imports
if lname is not None]
class ImportWalker(ASTVisitor):
"AST walker that we use to dispatch to a default method on the visitor."
def __init__(self, visitor):
ASTVisitor.__init__(self)
self._visitor = visitor
def default(self, node, *args):
self._visitor.default(node)
ASTVisitor.default(self, node, *args)
def parse_python_source(fn):
"""Parse the file 'fn' and return two things:
1. The AST tree.
2. A list of lines of the source line (typically used for verbose error
messages).
If the file has a syntax error in it, the first argument will be None.
"""
# Read the file's contents to return it.
# Note: we make sure to use universal newlines.
try:
contents = open(fn, 'rU').read()
lines = contents.splitlines()
except (IOError, OSError), e:
logging.error("Could not read file '%s'." % fn)
return None, None
# Convert the file to an AST.
try:
ast = compiler.parse(contents)
except SyntaxError, e:
err = '%s:%s: %s' % (fn, e.lineno or '--', e.msg)
logging.error("Error processing file '%s':\n%s" %
(fn, err))
return None, lines
except TypeError, e:
# Note: this branch untested, applied from a user-submitted patch.
err = '%s: %s' % (fn, str(e))
logging.error("Error processing file '%s':\n%s" %
(fn, err))
return None, lines
return ast, lines
def get_ast_imports(ast):
"""
Given an AST, return a list of module tuples for the imports found, in the
form:
(modname, remote-name, local-name, lineno, pragma)
"""
assert ast is not None
vis = ImportVisitor()
compiler.walk(ast, vis, ImportWalker(vis))
found_imports, future_imports = vis.finalize()
return found_imports, future_imports
# **WARNING** This is where all the evil lies. Risk and peril. Watch out.
if sys.platform == "win32":
# Location of python lib on win32
libpath = join(sys.prefix, 'lib')
else:
libpath = join(sys.prefix, 'lib', 'python%d.%d' % sys.version_info[:2])
exceptions = ('os.path',)
builtin_module_names = sys.builtin_module_names + exceptions
module_cache = {}
def find_dotted_module(modname, rname, parentdir, level, absolute_import):
"""
A version of find_module that supports dotted module names (packages). This
function returns the filename of the module if found, otherwise returns
None.
If 'rname' is not None, it first attempts to import 'modname.rname', and if it
fails, it must therefore not be a module, so we look up 'modname' and return
that instead.
'parentdir' is the directory of the file that attempts to do the import. We
attempt to do a local import there first.
'level' is the level of a relative import (i.e. the number of leading dots).
If 0, the import is absolute.
'absolute_import' use semantics defined in https://www.python.org/dev/peps/pep-0328/
"""
# Check for builtins.
if modname in builtin_module_names:
return join(libpath, modname), None
if modname == '':
# Check for "from ... import xyz"
# However, xyz could be a symbol in ./__init__.py
assert rname is not None
assert level > 0
fn = _import_relative(rname, parentdir, level)
if not fn:
return parentdir, [
(ERROR_SYMBOL, '.'*level + rname)
]
return fn, []
if absolute_import:
if level == 0:
fn = _import_module(modname)
else:
fn = _import_relative(modname, parentdir, level)
else:
fn = _import_relative(modname, parentdir, level)
if not fn and level == 0:
fn = _import_module(modname)
if not fn:
return None, [
(ERROR_IMPORT, modname)
]
# If this is a from-form, try the target symbol as a module.
if rname and _supports_submodules(fn):
fn2 = _import_relative(rname, dirname(fn), 1)
if not fn2:
return fn, [
(ERROR_SYMBOL, '.'.join((modname, rname)))
]
fn = fn2
return fn, []
def _supports_submodules(fn):
# Only folders and __init__.py's can have submodules
return isdir(fn) or fn.endswith('/__init__.py')
def _import_module(modname):
try:
return module_cache[modname]
except KeyError:
names = modname.split('.')
fn = find_dotted(names)
module_cache[modname] = fn
return fn
def _import_relative(modname, parentdir, level):
for i in range(level - 1):
parentdir = dirname(parentdir)
names = modname.split('.')
return find_dotted(names, parentdir)
try:
from imp import ImpImporter
except ImportError:
try:
from pkgutil import ImpImporter
except ImportError:
from snakefood.fallback.pkgutil import ImpImporter
def find_dotted(names, parentdir=None):
"""
Dotted import. 'names' is a list of path components, 'parentdir' is the
parent directory.
"""
filename = None
for name in names:
mod = ImpImporter(parentdir).find_module(name)
if not mod:
break
filename = mod.get_filename()
if not filename:
break
parentdir = dirname(filename)
else:
return filename