Repository URL to install this package:
|
Version:
1.26.0.dev0+gite506aa5f ▾
|
# Copyright 2014 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).
import itertools
import logging
import weakref
from abc import ABC, abstractmethod
from collections import OrderedDict, defaultdict, deque
from typing import Iterable
from twitter.common.collections import OrderedSet
from pants.base.deprecated import warn_or_error
from pants.base.specs import AddressSpec
from pants.build_graph.address import Address
from pants.build_graph.address_lookup_error import AddressLookupError
from pants.build_graph.injectables_mixin import InjectablesMixin
from pants.build_graph.target import Target
logger = logging.getLogger(__name__)
class BuildGraph(ABC):
"""A directed acyclic graph of Targets and dependencies. Not necessarily connected.
:API: public
"""
class DuplicateAddressError(AddressLookupError):
"""The same address appears multiple times in a dependency list.
:API: public
"""
class TransitiveLookupError(AddressLookupError):
"""Used to append the current node to the error message from an AddressLookupError.
:API: public
"""
class ManualSyntheticTargetError(AddressLookupError):
"""Used to indicate that an synthetic target was defined manually.
:API: public
"""
def __init__(self, addr):
super(BuildGraph.ManualSyntheticTargetError, self).__init__(
"Found a manually-defined target at synthetic address {}".format(addr.spec)
)
class NoDepPredicateWalk:
"""This is a utility class to aid in graph traversals that don't have predicates on
dependency edges."""
def __init__(self):
self._worked = set()
self._expanded = set()
def expanded_or_worked(self, vertex):
"""Returns True if the vertex has been expanded or worked."""
return vertex in self._expanded or vertex in self._worked
def do_work_once(self, vertex):
"""Returns True exactly once for the given vertex."""
if vertex in self._worked:
return False
self._worked.add(vertex)
return True
def expand_once(self, vertex, _):
"""Returns True exactly once for the given vertex."""
if vertex in self._expanded:
return False
self._expanded.add(vertex)
return True
def dep_predicate(self, target, dep, level):
return True
class DepPredicateWalk(NoDepPredicateWalk):
"""This is a utility class to aid in graph traversals that don't care about the depth."""
def __init__(self, dep_predicate):
super(BuildGraph.DepPredicateWalk, self).__init__()
self._dep_predicate = dep_predicate
def dep_predicate(self, target, dep, level):
return self._dep_predicate(target, dep)
@staticmethod
def closure(*vargs, **kwargs):
"""See `Target.closure_for_targets` for arguments.
:API: public
"""
return Target.closure_for_targets(*vargs, **kwargs)
def __init__(self):
self._invalidation_callbacks = []
self.reset()
def __len__(self):
return len(self._target_by_address)
def target_file_count(self):
"""Returns a count of source files owned by all Targets in the BuildGraph."""
# TODO: Move this file counting into the `ProductGraph`.
return sum(t.sources_count() for t in self.targets())
@abstractmethod
def clone_new(self):
"""Returns a new BuildGraph instance of the same type and with the same __init__ params."""
def apply_injectables(self, targets):
"""Given an iterable of `Target` instances, apply their transitive injectables."""
target_types = {type(t) for t in targets}
target_subsystem_deps = {
s for s in itertools.chain(*(t.subsystems() for t in target_types))
}
for subsystem in target_subsystem_deps:
# TODO: The is_initialized() check is primarily for tests and would be nice to do away with.
if issubclass(subsystem, InjectablesMixin) and subsystem.is_initialized():
subsystem.global_instance().injectables(self)
def reset(self):
"""Clear out the state of the BuildGraph, in particular Target mappings and dependencies.
:API: public
"""
self._invalidated()
self._target_by_address = OrderedDict()
self._target_dependencies_by_address = defaultdict(OrderedSet)
self._target_dependees_by_address = defaultdict(OrderedSet)
self._derived_from_by_derivative = {} # Address -> Address.
self._derivatives_by_derived_from = defaultdict(list) # Address -> list of Address.
self.synthetic_addresses = set()
def add_invalidation_callback(self, callback):
"""Adds a no-arg function that will be called whenever the graph is changed.
This interface exists to allow for invalidation of caches as the graph changes (due to injected
targets: generally synthetic). We should continue to move away from mutability in order to make
this kind of interface unnecessary, but synthetic targets are a fundamental part of the v1 model
that can likely only be removed via v2.
"""
self._invalidation_callbacks.append(weakref.ref(callback))
def _invalidated(self):
for callback_ref in self._invalidation_callbacks:
callback = callback_ref()
if callback is not None:
callback()
def contains_address(self, address):
"""
:API: public
"""
return address in self._target_by_address
def get_target_from_spec(self, spec, relative_to=""):
"""Converts `spec` into an address and returns the result of `get_target`
:API: public
"""
return self.get_target(Address.parse(spec, relative_to=relative_to))
def get_target(self, address):
"""Returns the Target at `address` if it has been injected into the BuildGraph, otherwise
None.
:API: public
"""
return self._target_by_address.get(address, None)
def dependencies_of(self, address):
"""Returns the dependencies of the Target at `address`.
This method asserts that the address given is actually in the BuildGraph.
:API: public
"""
assert (
address in self._target_by_address
), "Cannot retrieve dependencies of {address} because it is not in the BuildGraph.".format(
address=address
)
return self._target_dependencies_by_address[address]
def dependents_of(self, address):
"""Returns the addresses of the targets that depend on the target at `address`.
This method asserts that the address given is actually in the BuildGraph.
:API: public
"""
assert (
address in self._target_by_address
), "Cannot retrieve dependents of {address} because it is not in the BuildGraph.".format(
address=address
)
return self._target_dependees_by_address[address]
def get_derived_from(self, address):
"""Get the target the specified target was derived from.
If a Target was injected programmatically, e.g. from codegen, this allows us to trace its
ancestry. If a Target is not derived, default to returning itself.
:API: public
"""
parent_address = self._derived_from_by_derivative.get(address, address)
return self.get_target(parent_address)
def get_concrete_derived_from(self, address):
"""Get the concrete target the specified target was (directly or indirectly) derived from.
The returned target is guaranteed to not have been derived from any other target.
:API: public
"""
current_address = address
next_address = self._derived_from_by_derivative.get(current_address, current_address)
while next_address != current_address:
current_address = next_address
next_address = self._derived_from_by_derivative.get(current_address, current_address)
return self.get_target(current_address)
def get_direct_derivatives(self, address):
"""Get all targets derived directly from the specified target.
Note that the specified target itself is not returned.
:API: public
"""
derivative_addrs = self._derivatives_by_derived_from.get(address, [])
return [self.get_target(addr) for addr in derivative_addrs]
def get_all_derivatives(self, address):
"""Get all targets derived directly or indirectly from the specified target.
Note that the specified target itself is not returned.
:API: public
"""
ret = []
direct = self.get_direct_derivatives(address)
ret.extend(direct)
for t in direct:
ret.extend(self.get_all_derivatives(t.address))
return ret
def inject_target(self, target, dependencies=None, derived_from=None, synthetic=False):
"""Injects a fully realized Target into the BuildGraph.
:API: public
:param Target target: The Target to inject.
:param list<Address> dependencies: The Target addresses that `target` depends on.
:param Target derived_from: The Target that `target` was derived from, usually as a result
of codegen.
:param bool synthetic: Whether to flag this target as synthetic, even if it isn't derived
from another target.
"""
self._invalidated()
if self.contains_address(target.address):
raise ValueError(
"Attempted to inject synthetic {target} derived from {derived_from}"
" into the BuildGraph with address {address}, but there is already a Target"
" {existing_target} with that address".format(
target=target,
derived_from=derived_from,
address=target.address,
existing_target=self.get_target(target.address),
)
)
dependencies = dependencies or frozenset()
address = target.address
if address in self._target_by_address:
raise ValueError(
"A Target {existing_target} already exists in the BuildGraph at address"
" {address}. Failed to insert {target}.".format(
existing_target=self._target_by_address[address], address=address, target=target
)
)
if derived_from:
if not self.contains_address(derived_from.address):
raise ValueError(
"Attempted to inject synthetic {target} derived from {derived_from}"
" into the BuildGraph, but {derived_from} was not in the BuildGraph."
" Synthetic Targets must be derived from no Target (None) or from a"
" Target already in the BuildGraph.".format(
target=target, derived_from=derived_from
)
)
self._derived_from_by_derivative[target.address] = derived_from.address
self._derivatives_by_derived_from[derived_from.address].append(target.address)
if derived_from or synthetic:
self.synthetic_addresses.add(address)
self._target_by_address[address] = target
for dependency_address in dependencies:
self.inject_dependency(dependent=address, dependency=dependency_address)
def inject_dependency(self, dependent, dependency):
"""Injects a dependency from `dependent` onto `dependency`.
It is an error to inject a dependency if the dependent doesn't already exist, but the reverse
is not an error.
:API: public
:param Address dependent: The (already injected) address of a Target to which `dependency`
is being added.
:param Address dependency: The dependency to be injected.
"""
self._invalidated()
if dependent not in self._target_by_address:
raise ValueError(
"Cannot inject dependency from {dependent} on {dependency} because the"
" dependent is not in the BuildGraph.".format(
dependent=dependent, dependency=dependency
)
)
# TODO(pl): Unfortunately this is an unhelpful time to error due to a cycle. Instead, we warn
# and allow the cycle to appear. It is the caller's responsibility to call sort_targets on the
# entire graph to generate a friendlier CycleException that actually prints the cycle.
# Alternatively, we could call sort_targets after every inject_dependency/inject_target, but
# that could have nasty performance implications. Alternative 2 would be to have an internal
# data structure of the topologically sorted graph which would have acceptable amortized
# performance for inserting new nodes, and also cycle detection on each insert.
if dependency not in self._target_by_address:
logger.warning(
"Injecting dependency from {dependent} on {dependency}, but the dependency"
" is not in the BuildGraph. This probably indicates a dependency cycle, but"
" it is not an error until sort_targets is called on a subgraph containing"
" the cycle.".format(dependent=dependent, dependency=dependency)
)
if dependency in self.dependencies_of(dependent):
logger.debug(
"{dependent} already depends on {dependency}".format(
dependent=dependent, dependency=dependency
)
)
else:
self._target_dependencies_by_address[dependent].add(dependency)
self._target_dependees_by_address[dependency].add(dependent)
def targets(self, predicate=None):
"""Returns all the targets in the graph in no particular order.
:API: public
:param predicate: A target predicate that will be used to filter the targets returned.
"""
return list(filter(predicate, self._target_by_address.values()))
def sorted_targets(self):
"""
:API: public
:return: targets ordered from most dependent to least.
"""
return sort_targets(self.targets())
def _walk_factory(self, dep_predicate):
"""Construct the right context object for managing state during a transitive walk."""
walk = None
if dep_predicate:
walk = self.DepPredicateWalk(dep_predicate)
else:
walk = self.NoDepPredicateWalk()
return walk
def walk_transitive_dependency_graph(
self,
addresses,
work,
predicate=None,
postorder=False,
dep_predicate=None,
prelude=None,
epilogue=None,
):
"""Given a work function, walks the transitive dependency closure of `addresses` using DFS.
:API: public
:param list<Address> addresses: The closure of `addresses` will be walked.
:param function work: The function that will be called on every target in the closure using
the specified traversal order.
:param bool postorder: When ``True``, the traversal order is postorder (children before
parents), else it is preorder (parents before children).
:param function predicate: If this parameter is not given, no Targets will be filtered
out of the closure. If it is given, any Target which fails the predicate will not be
walked, nor will its dependencies. Thus predicate effectively trims out any subgraph
that would only be reachable through Targets that fail the predicate.
:param function dep_predicate: Takes two parameters, the current target and the dependency of
the current target. If this parameter is not given, no dependencies will be filtered
when traversing the closure. If it is given, when the predicate fails, the edge to the dependency
will not be expanded.
:param function prelude: Function to run before any dependency expansion.
It takes the currently explored target as an argument.
If ``postorder`` is ``False``, it will run after the current node is visited.
It is not affected by ``dep_predicate``.
It is not run if ``predicate`` does not succeed.
:param function epilogue: Function to run after children nodes are visited.
It takes the currently explored target as an argument.
If ``postorder`` is ``True``, it runs before visiting the current node.
It is not affected by ``dep_predicate``.
It is not run if ``predicate`` is not passed.
"""
walk = self._walk_factory(dep_predicate)
def _walk_rec(addr, level=0):
# If we've followed an edge to this address, stop recursing.
if not walk.expand_once(addr, level):
return
target = self._target_by_address[addr]
if predicate and not predicate(target):
return
if not postorder and walk.do_work_once(addr):
work(target)
if prelude:
prelude(target)
for dep_address in self._target_dependencies_by_address[addr]:
if walk.expanded_or_worked(dep_address):
continue
if walk.dep_predicate(target, self._target_by_address[dep_address], level):
_walk_rec(dep_address, level + 1)
if epilogue:
epilogue(target)
if postorder and walk.do_work_once(addr):
work(target)
for address in addresses:
_walk_rec(address)
def walk_transitive_dependee_graph(
self, addresses, work, predicate=None, postorder=False, prelude=None, epilogue=None
):
"""Identical to `walk_transitive_dependency_graph`, but walks dependees preorder (or
postorder if the postorder parameter is True).
This is identical to reversing the direction of every arrow in the DAG, then calling
`walk_transitive_dependency_graph`.
:API: public
"""
walked = set()
def _walk_rec(addr):
if addr not in walked:
walked.add(addr)
target = self._target_by_address[addr]
if not predicate or predicate(target):
if not postorder:
work(target)
if prelude:
prelude(target)
for dep_address in self._target_dependees_by_address[addr]:
_walk_rec(dep_address)
if epilogue:
epilogue(target)
if postorder:
work(target)
for address in addresses:
_walk_rec(address)
def transitive_dependees_of_addresses(self, addresses, predicate=None, postorder=False):
"""Returns all transitive dependees of `addresses`.
Note that this uses `walk_transitive_dependee_graph` and the predicate is passed through,
hence it trims graphs rather than just filtering out Targets that do not match the predicate.
See `walk_transitive_dependee_graph for more detail on `predicate`.
:API: public
:param list<Address> addresses: The root addresses to transitively close over.
:param function predicate: The predicate passed through to `walk_transitive_dependee_graph`.
"""
ret = OrderedSet()
self.walk_transitive_dependee_graph(
addresses, ret.add, predicate=predicate, postorder=postorder
)
return ret
def transitive_subgraph_of_addresses(self, addresses, *vargs, **kwargs):
"""Returns all transitive dependencies of `addresses`.
Note that this uses `walk_transitive_dependencies_graph` and the predicate is passed through,
hence it trims graphs rather than just filtering out Targets that do not match the predicate.
See `walk_transitive_dependency_graph for more detail on `predicate`.
:API: public
:param list<Address> addresses: The root addresses to transitively close over.
:param function predicate: The predicate passed through to
`walk_transitive_dependencies_graph`.
:param bool postorder: When ``True``, the traversal order is postorder (children before
parents), else it is preorder (parents before children).
:param function predicate: If this parameter is not given, no Targets will be filtered
out of the closure. If it is given, any Target which fails the predicate will not be
walked, nor will its dependencies. Thus predicate effectively trims out any subgraph
that would only be reachable through Targets that fail the predicate.
:param function dep_predicate: Takes two parameters, the current target and the dependency of
the current target. If this parameter is not given, no dependencies will be filtered
when traversing the closure. If it is given, when the predicate fails, the edge to the dependency
will not be expanded.
"""
ret = OrderedSet()
self.walk_transitive_dependency_graph(addresses, ret.add, *vargs, **kwargs)
return ret
def transitive_subgraph_of_addresses_bfs(self, addresses, predicate=None, dep_predicate=None):
"""Returns the transitive dependency closure of `addresses` using BFS.
:API: public
:param list<Address> addresses: The closure of `addresses` will be walked.
:param function predicate: If this parameter is not given, no Targets will be filtered
out of the closure. If it is given, any Target which fails the predicate will not be
walked, nor will its dependencies. Thus predicate effectively trims out any subgraph
that would only be reachable through Targets that fail the predicate.
:param function dep_predicate: Takes two parameters, the current target and the dependency of
the current target. If this parameter is not given, no dependencies will be filtered
when traversing the closure. If it is given, when the predicate fails, the edge to the dependency
will not be expanded.
"""
walk = self._walk_factory(dep_predicate)
ordered_closure = OrderedSet()
to_walk = deque((0, addr) for addr in addresses)
while len(to_walk) > 0:
level, address = to_walk.popleft()
if not walk.expand_once(address, level):
continue
target = self._target_by_address[address]
if predicate and not predicate(target):
continue
if walk.do_work_once(address):
ordered_closure.add(target)
for dep_address in self._target_dependencies_by_address[address]:
if walk.expanded_or_worked(dep_address):
continue
if walk.dep_predicate(target, self._target_by_address[dep_address], level):
to_walk.append((level + 1, dep_address))
return ordered_closure
def inject_synthetic_target(
self, address, target_type, dependencies=None, derived_from=None, **kwargs
):
"""Constructs and injects Target at `address` with optional `dependencies` and
`derived_from`.
This method is useful especially for codegen, where a "derived" Target is injected
programmatically rather than read in from a BUILD file.
:API: public
:param Address address: The address of the new Target. Must not already be in the BuildGraph.
:param type target_type: The class of the Target to be constructed.
:param list<Address> dependencies: The dependencies of this Target, usually inferred or copied
from the `derived_from`.
:param Target derived_from: The Target this Target will derive from.
"""
target = target_type(name=address.target_name, address=address, build_graph=self, **kwargs)
self.inject_target(
target, dependencies=dependencies, derived_from=derived_from, synthetic=True
)
def maybe_inject_address_closure(self, address):
"""If the given address is not already injected to the graph, calls inject_address_closure.
:API: public
:param Address address: The address to inject. Must be resolvable by `self._address_mapper` or
else be the address of an already injected entity.
"""
if not self.contains_address(address):
self.inject_address_closure(address)
@abstractmethod
def inject_address_closure(self, address):
"""Resolves, constructs and injects a Target and its transitive closure of dependencies.
This method is idempotent and will short circuit for already injected addresses. For all other
addresses though, it delegates to an internal AddressMapper to resolve item the address points
to.
:API: public
:param Address address: The address to inject. Must be resolvable by `self._address_mapper` or
else be the address of an already injected entity.
"""
@abstractmethod
def inject_address_specs_closure(self, address_specs: Iterable[AddressSpec], fail_fast=None):
"""Resolves, constructs and injects Targets and their transitive closures of dependencies.
:API: public
:param address_specs: An iterable of AddressSpec objects to resolve and inject.
:param fail_fast: Whether to fail quickly for the first error, or to complete all
possible injections before failing.
:returns: Yields a sequence of resolved Address objects.
"""
def inject_specs_closure(self, specs: Iterable[AddressSpec], fail_fast=None):
"""Resolves, constructs and injects Targets and their transitive closures of dependencies.
:API: public
:param specs: An iterable of AddressSpec objects to resolve and inject.
:param fail_fast: Whether to fail quickly for the first error, or to complete all
possible injections before failing.
:returns: Yields a sequence of resolved Address objects.
"""
warn_or_error(
removal_version="1.27.0.dev0",
deprecated_entity_description="inject_specs_closure()",
hint="Use `inject_address_specs_closure()` instead. The API is the same as "
"before. This change is to prepare for Pants eventually supporting file system specs, "
"e.g. `./pants cloc foo.py`. In preparation, we renamed `Spec` to `AddressSpec`.",
)
def resolve(self, spec):
"""Returns an iterator over the target(s) the given address points to."""
address = Address.parse(spec)
# NB: This is an idempotent, short-circuiting call.
self.inject_address_closure(address)
return self.transitive_subgraph_of_addresses([address])
@abstractmethod
def resolve_address(self, address):
"""Maps an address in the virtual address space to an object.
:param Address address: the address to lookup in a BUILD file
:raises AddressLookupError: if the path to the address is not found.
:returns: The Addressable which address points to.
"""
class CycleException(Exception):
"""Thrown when a circular dependency is detected.
:API: public
"""
def __init__(self, cycle):
super().__init__(
"Cycle detected:\n\t{}".format(" ->\n\t".join(target.address.spec for target in cycle))
)
def invert_dependencies(targets):
"""
:API: public
:return: the full graph of dependencies for `targets` and the list of roots.
"""
roots = set()
inverted_deps = defaultdict(OrderedSet) # target -> dependent targets
visited = set()
path = OrderedSet()
def invert(tgt):
if tgt in path:
path_list = list(path)
cycle_head = path_list.index(tgt)
cycle = path_list[cycle_head:] + [tgt]
raise CycleException(cycle)
path.add(tgt)
if tgt not in visited:
visited.add(tgt)
if tgt.dependencies:
for dependency in tgt.dependencies:
inverted_deps[dependency].add(tgt)
invert(dependency)
else:
roots.add(tgt)
path.remove(tgt)
for target in targets:
invert(target)
return roots, inverted_deps
def sort_targets(targets):
"""
:API: public
:return: the targets that `targets` depend on sorted from most dependent to least.
"""
roots, inverted_deps = invert_dependencies(targets)
ordered = []
visited = set()
def topological_sort(target):
if target not in visited:
visited.add(target)
if target in inverted_deps:
for dep in inverted_deps[target]:
topological_sort(dep)
ordered.append(target)
for root in roots:
topological_sort(root)
return ordered