# Copyright 2019 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).
import itertools
import logging
import os
import unittest
import warnings
from abc import ABC, ABCMeta, abstractmethod
from collections import defaultdict
from contextlib import contextmanager
from tempfile import mkdtemp
from textwrap import dedent
from typing import Any, List, Optional, Type, TypeVar, Union, cast
from pants.base.build_root import BuildRoot
from pants.base.cmd_line_spec_parser import CmdLineSpecParser
from pants.base.exceptions import TaskError
from pants.base.specs import AddressSpec, AddressSpecs, FilesystemSpecs, Specs
from pants.build_graph.address import Address, BuildFileAddress
from pants.build_graph.build_configuration import BuildConfiguration
from pants.build_graph.build_file_aliases import BuildFileAliases
from pants.build_graph.target import Target
from pants.engine.fs import PathGlobs, PathGlobsAndRoot
from pants.engine.legacy.graph import HydratedField
from pants.engine.legacy.structs import Files, SourcesField
from pants.engine.rules import RootRule
from pants.engine.scheduler import SchedulerSession
from pants.engine.selectors import Params
from pants.init.engine_initializer import EngineInitializer
from pants.init.util import clean_global_runtime_state
from pants.option.global_options import BuildFileImportsBehavior
from pants.option.options_bootstrapper import OptionsBootstrapper
from pants.source.source_root import SourceRootConfig
from pants.source.wrapped_globs import EagerFilesetWithSpec
from pants.subsystem.subsystem import Subsystem
from pants.task.goal_options_mixin import GoalOptionsMixin
from pants.testutil.base.context_utils import create_context_from_options
from pants.testutil.engine.util import init_native
from pants.testutil.option.fakes import create_options_for_optionables
from pants.testutil.subsystem import util as subsystem_util
from pants.util.collections import assert_single_element
from pants.util.contextutil import temporary_dir
from pants.util.dirutil import (
recursive_dirname,
relative_symlink,
safe_file_dump,
safe_mkdir,
safe_mkdtemp,
safe_open,
safe_rmtree,
)
from pants.util.memo import memoized_method
from pants.util.meta import classproperty
class AbstractTestGenerator(ABC):
"""A mixin that facilitates test generation at runtime."""
@classmethod
@abstractmethod
def generate_tests(cls):
"""Generate tests for a given class.
This should be called against the composing class in its defining module, e.g.
class ThingTest(TestGenerator):
...
ThingTest.generate_tests()
"""
@classmethod
def add_test(cls, method_name, method):
"""A classmethod that adds dynamic test methods to a given class.
:param string method_name: The name of the test method (e.g. `test_thing_x`).
:param callable method: A callable representing the method. This should take a 'self' argument
as its first parameter for instance method binding.
"""
assert not hasattr(
cls, method_name
), f"a test with name `{method_name}` already exists on `{cls.__name__}`!"
assert method_name.startswith("test_"), f"{method_name} is not a valid test name!"
setattr(cls, method_name, method)
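    # A minimal usage sketch (hypothetical; `ThingTest` and its generated tests are
    # illustrative names, not part of this module):
    #
    #   class ThingTest(unittest.TestCase, AbstractTestGenerator):
    #       @classmethod
    #       def generate_tests(cls):
    #           for n in (1, 2, 3):
    #               def test(self, n=n):
    #                   self.assertGreater(n, 0)
    #               cls.add_test(f"test_thing_{n}", test)
    #
    #   ThingTest.generate_tests()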
class TestBase(unittest.TestCase, metaclass=ABCMeta):
"""A baseclass useful for tests requiring a temporary buildroot.
:API: public
"""
_scheduler: Optional[SchedulerSession] = None
_build_graph = None
_address_mapper = None
def build_path(self, relpath):
"""Returns the canonical BUILD file path for the given relative build path.
:API: public
"""
if os.path.basename(relpath).startswith("BUILD"):
return relpath
else:
return os.path.join(relpath, "BUILD")
def create_dir(self, relpath):
"""Creates a directory under the buildroot.
:API: public
relpath: The relative path to the directory from the build root.
"""
path = os.path.join(self.build_root, relpath)
safe_mkdir(path)
self.invalidate_for(relpath)
return path
def create_workdir_dir(self, relpath):
"""Creates a directory under the work directory.
:API: public
relpath: The relative path to the directory from the work directory.
"""
path = os.path.join(self.pants_workdir, relpath)
safe_mkdir(path)
self.invalidate_for(relpath)
return path
def invalidate_for(self, *relpaths):
"""Invalidates all files from the relpath, recursively up to the root.
        Many Python operations implicitly create parent directories, so we assume that touching a
file located below directories that do not currently exist will result in their creation.
"""
if self._scheduler is None:
return
files = {f for relpath in relpaths for f in recursive_dirname(relpath)}
return self._scheduler.invalidate_files(files)
def create_link(self, relsrc, reldst):
"""Creates a symlink within the buildroot.
:API: public
relsrc: A relative path for the source of the link.
reldst: A relative path for the destination of the link.
"""
src = os.path.join(self.build_root, relsrc)
dst = os.path.join(self.build_root, reldst)
relative_symlink(src, dst)
self.invalidate_for(reldst)
def create_file(self, relpath, contents="", mode="w"):
"""Writes to a file under the buildroot.
:API: public
relpath: The relative path to the file from the build root.
        contents: A string containing the contents of the file; '' by default.
        mode: The mode in which to open the file; 'w' (overwrite) by default.
"""
path = os.path.join(self.build_root, relpath)
with safe_open(path, mode=mode) as fp:
fp.write(contents)
self.invalidate_for(relpath)
return path
def create_files(self, path, files):
"""Writes to a file under the buildroot with contents same as file name.
:API: public
path: The relative path to the file from the build root.
files: List of file names.
"""
for f in files:
self.create_file(os.path.join(path, f), contents=f)
def create_workdir_file(self, relpath, contents="", mode="w"):
"""Writes to a file under the work directory.
:API: public
relpath: The relative path to the file from the work directory.
        contents: A string containing the contents of the file; '' by default.
        mode: The mode in which to open the file; 'w' (overwrite) by default.
"""
path = os.path.join(self.pants_workdir, relpath)
with safe_open(path, mode=mode) as fp:
fp.write(contents)
return path
def add_to_build_file(self, relpath, target):
"""Adds the given target specification to the BUILD file at relpath.
:API: public
relpath: The relative path to the BUILD file from the build root.
target: A string containing the target definition as it would appear in a BUILD file.
"""
self.create_file(self.build_path(relpath), target, mode="a")
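    # A minimal sketch of the file-creation helpers above, as used from a TestBase
    # subclass (the paths and target name are illustrative):
    #
    #   self.create_file("src/example/hello.txt", contents="hello")
    #   self.add_to_build_file("src/example", "target(name='example')\n")
    #   example = self.target("src/example:example")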
def make_target(
self,
spec="",
target_type=Target,
dependencies=None,
derived_from=None,
synthetic=False,
make_missing_sources=True,
**kwargs,
):
"""Creates a target and injects it into the test's build graph.
:API: public
:param string spec: The target address spec that locates this target.
:param type target_type: The concrete target subclass to create this new target from.
:param list dependencies: A list of target instances this new target depends on.
:param derived_from: The target this new target was derived from.
:type derived_from: :class:`pants.build_graph.target.Target`
"""
self._init_target_subsystem()
address = Address.parse(spec)
if make_missing_sources and "sources" in kwargs:
for source in kwargs["sources"]:
if "*" not in source:
self.create_file(os.path.join(address.spec_path, source), mode="a", contents="")
kwargs["sources"] = self.sources_for(kwargs["sources"], address.spec_path)
target = target_type(
name=address.target_name, address=address, build_graph=self.build_graph, **kwargs
)
dependencies = dependencies or []
self.build_graph.apply_injectables([target])
self.build_graph.inject_target(
target,
dependencies=[dep.address for dep in dependencies],
derived_from=derived_from,
synthetic=synthetic,
)
# TODO(John Sirois): This re-creates a little bit too much work done by the BuildGraph.
# Fixup the BuildGraph to deal with non BuildFileAddresses better and just leverage it.
traversables = [target.compute_dependency_address_specs(payload=target.payload)]
for dependency_spec in itertools.chain(*traversables):
dependency_address = Address.parse(dependency_spec, relative_to=address.spec_path)
dependency_target = self.build_graph.get_target(dependency_address)
if not dependency_target:
raise ValueError(
"Tests must make targets for dependency specs ahead of them "
"being traversed, {} tried to traverse {} which does not exist.".format(
target, dependency_address
)
)
if dependency_target not in target.dependencies:
self.build_graph.inject_dependency(
dependent=target.address, dependency=dependency_address
)
target.mark_transitive_invalidation_hash_dirty()
return target
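    # Example sketch (the specs are illustrative). Dependencies must be made before
    # the targets that depend on them:
    #
    #   a = self.make_target(spec="src/example:a")
    #   b = self.make_target(spec="src/example:b", dependencies=[a])
    #   # b is now injected into the build graph with a dependency edge to a.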
def sources_for(
self, package_relative_path_globs: List[str], package_dir: str = "",
) -> EagerFilesetWithSpec:
sources_field = SourcesField(
address=BuildFileAddress(
rel_path=os.path.join(package_dir, "BUILD"), target_name="_bogus_target_for_test",
),
arg="sources",
filespecs={"globs": package_relative_path_globs},
base_globs=Files(spec_path=package_dir),
path_globs=PathGlobs(
tuple(os.path.join(package_dir, path) for path in package_relative_path_globs),
),
validate_fn=lambda _: True,
)
field = self.scheduler.product_request(HydratedField, [sources_field])[0]
return cast(EagerFilesetWithSpec, field.value)
@classmethod
def alias_groups(cls):
"""
:API: public
"""
return BuildFileAliases(targets={"target": Target})
@classmethod
def rules(cls):
# Required for sources_for:
return [RootRule(SourcesField)]
@classmethod
def build_config(cls):
build_config = BuildConfiguration()
build_config.register_aliases(cls.alias_groups())
build_config.register_rules(cls.rules())
return build_config
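    # Subclasses typically override these hooks rather than calling them directly.
    # A sketch, assuming hypothetical MyTarget and my_rule definitions:
    #
    #   class MyTest(TestBase):
    #       @classmethod
    #       def alias_groups(cls):
    #           return BuildFileAliases(targets={"my_target": MyTarget})
    #
    #       @classmethod
    #       def rules(cls):
    #           return [*super().rules(), my_rule]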
def setUp(self):
"""
:API: public
"""
super().setUp()
        # Avoid resetting the RunTracker here, as that is specific to fork'd process cleanup.
clean_global_runtime_state(reset_subsystem=True)
self.addCleanup(self._reset_engine)
safe_mkdir(self.build_root, clean=True)
safe_mkdir(self.pants_workdir)
self.addCleanup(safe_rmtree, self.build_root)
BuildRoot().path = self.build_root
self.addCleanup(BuildRoot().reset)
self.subprocess_dir = os.path.join(self.build_root, ".pids")
self.options = defaultdict(dict) # scope -> key-value mapping.
self.options[""] = {
"pants_workdir": self.pants_workdir,
"pants_supportdir": os.path.join(self.build_root, "build-support"),
"pants_distdir": os.path.join(self.build_root, "dist"),
"pants_configdir": os.path.join(self.build_root, "config"),
"pants_subprocessdir": self.subprocess_dir,
"cache_key_gen_version": "0-test",
}
self.options["cache"] = {
"read_from": [],
"write_to": [],
}
self._build_configuration = self.build_config()
self._inited_target = False
subsystem_util.init_subsystem(Target.TagAssignments)
def buildroot_files(self, relpath=None):
"""Returns the set of all files under the test build root.
:API: public
:param string relpath: If supplied, only collect files from this subtree.
:returns: All file paths found.
:rtype: set
"""
def scan():
for root, dirs, files in os.walk(os.path.join(self.build_root, relpath or "")):
for f in files:
yield os.path.relpath(os.path.join(root, f), self.build_root)
return set(scan())
def _reset_engine(self):
if self._scheduler is not None:
self._build_graph.reset()
self._scheduler.invalidate_all_files()
@contextmanager
def isolated_local_store(self):
"""Temporarily use an anonymous, empty Store for the Scheduler.
In most cases we re-use a Store across all tests, since `file` and `directory` entries are
content addressed, and `process` entries are intended to have strong cache keys. But when
dealing with non-referentially transparent `process` executions, it can sometimes be
necessary to avoid this cache.
"""
self._scheduler = None
local_store_dir = os.path.realpath(safe_mkdtemp())
self._init_engine(local_store_dir=local_store_dir)
try:
yield
finally:
self._scheduler = None
safe_rmtree(local_store_dir)
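    # Usage sketch, for tests whose `process` executions must not share a cache
    # (SomeProduct and some_subject are hypothetical):
    #
    #   with self.isolated_local_store():
    #       result = self.request_single_product(SomeProduct, some_subject)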
@property
def build_root(self):
return self._build_root()
@property
def pants_workdir(self):
return self._pants_workdir()
@memoized_method
def _build_root(self):
return os.path.realpath(mkdtemp(suffix="_BUILD_ROOT"))
@memoized_method
def _pants_workdir(self):
return os.path.join(self._build_root(), ".pants.d")
def _init_engine(self, local_store_dir: Optional[str] = None) -> None:
if self._scheduler is not None:
return
options_bootstrapper = OptionsBootstrapper.create(args=["--pants-config-files=[]"])
local_store_dir = (
local_store_dir
or options_bootstrapper.bootstrap_options.for_global_scope().local_store_dir
)
# NB: This uses the long form of initialization because it needs to directly specify
# `cls.alias_groups` rather than having them be provided by bootstrap options.
graph_session = EngineInitializer.setup_legacy_graph_extended(
pants_ignore_patterns=None,
local_store_dir=local_store_dir,
build_file_imports_behavior=BuildFileImportsBehavior.error,
native=init_native(),
options_bootstrapper=options_bootstrapper,
build_root=self.build_root,
build_configuration=self.build_config(),
build_ignore_patterns=None,
).new_session(zipkin_trace_v2=False, build_id="buildid_for_test")
self._scheduler = graph_session.scheduler_session
self._build_graph, self._address_mapper = graph_session.create_build_graph(
Specs(address_specs=AddressSpecs([]), filesystem_specs=FilesystemSpecs([])),
self._build_root(),
)
@property
def scheduler(self) -> SchedulerSession:
if self._scheduler is None:
self._init_engine()
self.post_scheduler_init()
return cast(SchedulerSession, self._scheduler)
def post_scheduler_init(self):
"""Run after initializing the Scheduler, it will have the same lifetime."""
pass
@property
def address_mapper(self):
if self._address_mapper is None:
self._init_engine()
return self._address_mapper
@property
def build_graph(self):
if self._build_graph is None:
self._init_engine()
return self._build_graph
def reset_build_graph(self, reset_build_files=False, delete_build_files=False):
"""Start over with a fresh build graph with no targets in it."""
if delete_build_files or reset_build_files:
files = [f for f in self.buildroot_files() if os.path.basename(f) == "BUILD"]
if delete_build_files:
for f in files:
os.remove(os.path.join(self.build_root, f))
self.invalidate_for(*files)
if self._build_graph is not None:
self._build_graph.reset()
_P = TypeVar("_P")
def request_single_product(
self, product_type: Type["TestBase._P"], subject: Union[Params, Any]
) -> "TestBase._P":
result = assert_single_element(self.scheduler.product_request(product_type, [subject]))
return cast(TestBase._P, result)
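    # Example sketch (the product type and glob are illustrative, and assume a matching
    # root rule is registered; `subject` may also be a `Params` bundling several inputs):
    #
    #   snapshot = self.request_single_product(Snapshot, PathGlobs(("src/**",)))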
def set_options_for_scope(self, scope, **kwargs):
self.options[scope].update(kwargs)
def context(
self,
for_task_types=None,
for_subsystems=None,
options=None,
target_roots=None,
console_outstream=None,
workspace=None,
scheduler=None,
address_mapper=None,
**kwargs,
):
"""
:API: public
:param dict **kwargs: keyword arguments passed in to `create_options_for_optionables`.
"""
# Many tests use source root functionality via the SourceRootConfig.global_instance().
# (typically accessed via Target.target_base), so we always set it up, for convenience.
for_subsystems = set(for_subsystems or ())
for subsystem in for_subsystems:
if subsystem.options_scope is None:
raise TaskError(
"You must set a scope on your subsystem type before using it in tests."
)
optionables = {SourceRootConfig} | self._build_configuration.optionables() | for_subsystems
for_task_types = for_task_types or ()
for task_type in for_task_types:
scope = task_type.options_scope
if scope is None:
raise TaskError("You must set a scope on your task type before using it in tests.")
optionables.add(task_type)
# If task is expected to inherit goal-level options, register those directly on the task,
        # by subclassing the goal options registrar and setting its scope to the task scope.
if issubclass(task_type, GoalOptionsMixin):
subclass_name = "test_{}_{}_{}".format(
task_type.__name__,
task_type.goal_options_registrar_cls.options_scope,
task_type.options_scope,
)
optionables.add(
type(
subclass_name,
(task_type.goal_options_registrar_cls,),
{"options_scope": task_type.options_scope},
)
)
# Now expand to all deps.
all_optionables = set()
for optionable in optionables:
all_optionables.update(si.optionable_cls for si in optionable.known_scope_infos())
# Now default the option values and override with any caller-specified values.
# TODO(benjy): Get rid of the options arg, and require tests to call set_options.
options = options.copy() if options else {}
for s, opts in self.options.items():
scoped_opts = options.setdefault(s, {})
scoped_opts.update(opts)
fake_options = create_options_for_optionables(all_optionables, options=options, **kwargs)
Subsystem.reset(reset_options=True)
Subsystem.set_options(fake_options)
scheduler = scheduler or self.scheduler
address_mapper = address_mapper or self.address_mapper
context = create_context_from_options(
fake_options,
target_roots=target_roots,
build_graph=self.build_graph,
build_configuration=self._build_configuration,
address_mapper=address_mapper,
console_outstream=console_outstream,
workspace=workspace,
scheduler=scheduler,
)
return context
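    # A typical invocation from a test method (MyTask and its `skip` option are
    # hypothetical):
    #
    #   self.set_options_for_scope(MyTask.options_scope, skip=False)
    #   context = self.context(for_task_types=[MyTask], target_roots=[self.target("src:a")])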
def tearDown(self):
"""
:API: public
"""
super().tearDown()
Subsystem.reset()
@classproperty
def subsystems(cls):
"""Initialize these subsystems when running your test.
If your test instantiates a target type that depends on any subsystems, those subsystems need to
be initialized in your test. You can override this property to return the necessary subsystem
classes.
:rtype: list of type objects, all subclasses of Subsystem
"""
return Target.subsystems()
def _init_target_subsystem(self):
if not self._inited_target:
subsystem_util.init_subsystems(self.subsystems)
self._inited_target = True
def target(self, spec):
"""Resolves the given target address to a Target object.
:API: public
        spec: The BUILD target address spec to resolve.
Returns the corresponding Target or else None if the address does not point to a defined Target.
"""
self._init_target_subsystem()
address = Address.parse(spec)
self.build_graph.inject_address_closure(address)
return self.build_graph.get_target(address)
def targets(self, address_spec):
"""Resolves a target spec to one or more Target objects.
:API: public
        address_spec: Either a single BUILD target address or a target glob using the siblings
          (':') or descendants ('::') suffixes.
        Returns a list of all Targets found.
"""
address_spec = CmdLineSpecParser(self.build_root).parse_spec(address_spec)
assert isinstance(address_spec, AddressSpec)
targets = []
for address in self.build_graph.inject_address_specs_closure([address_spec]):
targets.append(self.build_graph.get_target(address))
return targets
def create_library(self, path, target_type, name, sources=None, **kwargs):
"""Creates a library target of given type at the BUILD file at path with sources.
:API: public
path: The relative path to the BUILD file from the build root.
target_type: valid pants target type.
name: Name of the library target.
        sources: List of source file names, relative to path.
**kwargs: Optional attributes that can be set for any library target.
Currently it includes support for resources, java_sources, provides
and dependencies.
"""
if sources:
self.create_files(path, sources)
self.add_to_build_file(
path,
dedent(
"""
%(target_type)s(name='%(name)s',
%(sources)s
%(java_sources)s
%(provides)s
%(dependencies)s
)
"""
% dict(
target_type=target_type,
name=name,
sources=("sources=%s," % repr(sources) if sources else ""),
java_sources=(
"java_sources=[%s],"
% ",".join('"%s"' % str_target for str_target in kwargs.get("java_sources"))
if "java_sources" in kwargs
else ""
),
provides=(
"provides=%s," % kwargs.get("provides") if "provides" in kwargs else ""
),
dependencies=(
"dependencies=%s," % kwargs.get("dependencies")
if "dependencies" in kwargs
else ""
),
)
),
)
return self.target(f"{path}:{name}")
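    # Example sketch (assumes a `java_library` alias has been registered via
    # `alias_groups`):
    #
    #   lib = self.create_library(
    #       path="src/java/example",
    #       target_type="java_library",
    #       name="example",
    #       sources=["Example.java"],
    #   )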
def create_resources(self, path, name, *sources):
"""
:API: public
"""
return self.create_library(path, "resources", name, sources)
def assertUnorderedPrefixEqual(self, expected, actual_iter):
"""Consumes len(expected) items from the given iter, and asserts that they match, unordered.
:API: public
"""
actual = list(itertools.islice(actual_iter, len(expected)))
self.assertEqual(sorted(expected), sorted(actual))
def assertPrefixEqual(self, expected, actual_iter):
"""Consumes len(expected) items from the given iter, and asserts that they match, in order.
:API: public
"""
self.assertEqual(expected, list(itertools.islice(actual_iter, len(expected))))
def assertInFile(self, string, file_path):
"""Verifies that a string appears in a file.
:API: public
"""
with open(file_path, "r") as f:
content = f.read()
self.assertIn(string, content, f'"{string}" is not in the file {f.name}:\n{content}')
@contextmanager
def assertRaisesWithMessage(self, exception_type, error_text):
"""Verifies than an exception message is equal to `error_text`.
:param type exception_type: The exception type which is expected to be raised within the body.
:param str error_text: Text that the exception message should match exactly with
`self.assertEqual()`.
:API: public
"""
with self.assertRaises(exception_type) as cm:
yield cm
self.assertEqual(error_text, str(cm.exception))
@contextmanager
def assertRaisesWithMessageContaining(self, exception_type, error_text):
"""Verifies that the string `error_text` appears in an exception message.
:param type exception_type: The exception type which is expected to be raised within the body.
:param str error_text: Text that the exception message should contain with `self.assertIn()`.
:API: public
"""
with self.assertRaises(exception_type) as cm:
yield cm
self.assertIn(error_text, str(cm.exception))
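    # Usage sketch for the two assertion context managers above:
    #
    #   with self.assertRaisesWithMessage(ValueError, "exact message"):
    #       raise ValueError("exact message")
    #   with self.assertRaisesWithMessageContaining(ValueError, "substring"):
    #       raise ValueError("contains substring somewhere")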
@contextmanager
def assertDoesNotRaise(self, exc_class: Type[BaseException] = Exception):
"""Verifies that the block does not raise an exception of the specified type.
:API: public
"""
try:
yield
except exc_class as e:
            raise AssertionError(f"block should not have raised, but did: {e}") from e
def get_bootstrap_options(self, cli_options=()):
"""Retrieves bootstrap options.
:param cli_options: An iterable of CLI flags to pass as arguments to `OptionsBootstrapper`.
"""
args = tuple(["--pants-config-files=[]"]) + tuple(cli_options)
return OptionsBootstrapper.create(args=args).bootstrap_options.for_global_scope()
def make_snapshot(self, files):
"""Makes a snapshot from a collection of files.
        :param files: A dictionary mapping file name to file content, both of type str.
:return: a Snapshot.
"""
with temporary_dir() as temp_dir:
for file_name, content in files.items():
safe_file_dump(os.path.join(temp_dir, file_name), content)
return self.scheduler.capture_snapshots(
(PathGlobsAndRoot(PathGlobs(("**",)), temp_dir),)
)[0]
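    # Example sketch (the file names and contents are illustrative):
    #
    #   snapshot = self.make_snapshot({"a/b.txt": "b", "a/c.txt": "c"})
    #   # snapshot.files should list the relative paths "a/b.txt" and "a/c.txt".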
class LoggingRecorder:
"""Simple logging handler to record warnings."""
def __init__(self):
self._records = []
self.level = logging.DEBUG
def handle(self, record):
self._records.append(record)
def _messages_for_level(self, levelname):
return [
f"{record.name}: {record.getMessage()}"
for record in self._records
if record.levelname == levelname
]
def infos(self):
return self._messages_for_level("INFO")
def warnings(self):
return self._messages_for_level("WARNING")
def errors(self):
return self._messages_for_level("ERROR")
@contextmanager
def captured_logging(self, level=None):
root_logger = logging.getLogger()
old_level = root_logger.level
root_logger.setLevel(level or logging.NOTSET)
handler = self.LoggingRecorder()
root_logger.addHandler(handler)
try:
yield handler
finally:
root_logger.setLevel(old_level)
root_logger.removeHandler(handler)
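    # Usage sketch:
    #
    #   with self.captured_logging(logging.INFO) as captured:
    #       logging.getLogger(__name__).info("hello")
    #   self.assertIn("hello", captured.infos()[0])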
@contextmanager
def warnings_catcher(self):
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
yield w
def assertWarning(self, w, category, warning_text):
single_warning = assert_single_element(w)
self.assertEqual(single_warning.category, category)
warning_message = single_warning.message
self.assertEqual(warning_text, str(warning_message))
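    # Usage sketch for warnings_catcher plus assertWarning:
    #
    #   with self.warnings_catcher() as w:
    #       warnings.warn("this is deprecated", DeprecationWarning)
    #   self.assertWarning(w, DeprecationWarning, "this is deprecated")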
def retrieve_single_product_at_target_base(self, product_mapping, target):
mapping_for_target = product_mapping.get(target)
single_base_dir = assert_single_element(list(mapping_for_target.keys()))
single_product = assert_single_element(mapping_for_target[single_base_dir])
return single_product
def populate_target_dict(self, target_map):
"""Return a dict containing targets with files generated according to `target_map`.
        The keys of `target_map` are target address strings, and its values are dicts containing
        keyword arguments fed into `self.make_target()`, along with a few special keys. The
        special keys are:
        - 'key': used to access the target in the returned dict. Defaults to the target name.
        - 'filemap': creates files at the given paths, relative to the target's spec path.
        If the targets depend on one another, `target_map` must be an `OrderedDict` of 2-tuples
        with the targets in topological order. Note that dependency cycles are not currently
        supported by this method.
:param target_map: Dict mapping each target address to generate -> kwargs for
`self.make_target()`, along with a 'key' and optionally a 'filemap' argument.
:return: Dict mapping the required 'key' argument -> target instance for each element of
`target_map`.
:rtype: dict
"""
target_dict = {}
# Create a target from each specification and insert it into `target_dict`.
for address_spec, target_kwargs in target_map.items():
unprocessed_kwargs = target_kwargs.copy()
target_base = Address.parse(address_spec).spec_path
# Populate the target's owned files from the specification.
filemap = unprocessed_kwargs.pop("filemap", {})
for rel_path, content in filemap.items():
buildroot_path = os.path.join(target_base, rel_path)
self.create_file(buildroot_path, content)
# Ensure any dependencies exist in the target dict (`target_map` must then be an
# OrderedDict).
            # The 'key' is used to access the target in `target_dict`, and defaults to the target name.
target_address = Address.parse(address_spec)
key = unprocessed_kwargs.pop("key", target_address.target_name)
dep_targets = []
for dep_spec in unprocessed_kwargs.pop("dependencies", []):
existing_tgt_key = target_map[dep_spec]["key"]
dep_targets.append(target_dict[existing_tgt_key])
# Register the generated target.
generated_target = self.make_target(
spec=address_spec, dependencies=dep_targets, **unprocessed_kwargs
)
target_dict[key] = generated_target
return target_dict
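    # Example sketch (addresses and keys are illustrative; `OrderedDict` is from
    # `collections`, and the entries must be topologically ordered):
    #
    #   targets = self.populate_target_dict(OrderedDict([
    #       ("src:a", {"key": "a"}),
    #       ("src:b", {"key": "b", "dependencies": ["src:a"]}),
    #   ]))
    #   # targets["b"] depends on targets["a"].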