Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go modules, Debian packages, RPM packages, and NuGet packages.

Repository URL to install this package:

Details    
pantsbuild.pants / cache / cache_setup.py
Size: Mime:
# Copyright 2014 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

import os
import threading
from collections import namedtuple
from urllib.parse import urlparse

from pants.base.build_environment import get_buildroot
from pants.cache.artifact import TarballArtifact
from pants.cache.artifact_cache import ArtifactCacheError
from pants.cache.local_artifact_cache import LocalArtifactCache, TempLocalArtifactCache
from pants.cache.pinger import BestUrlSelector, Pinger
from pants.cache.resolver import NoopResolver, Resolver, RESTfulResolver
from pants.cache.restful_artifact_cache import RESTfulArtifactCache
from pants.subsystem.subsystem import Subsystem
from pants.util.memo import memoized_property


class EmptyCacheSpecError(ArtifactCacheError):
    """Raised when an empty artifact cache spec list is provided."""


class LocalCacheSpecRequiredError(ArtifactCacheError):
    """Raised when a two-entry cache spec lacks a local cache path."""


class CacheSpecFormatError(ArtifactCacheError):
    """Raised when a cache spec entry is neither a local path nor a remote URL."""


class InvalidCacheSpecError(ArtifactCacheError):
    """Raised when a cache spec is not a list or tuple."""


class RemoteCacheSpecRequiredError(ArtifactCacheError):
    """Raised when a two-entry cache spec lacks a remote spec."""


class TooManyCacheSpecsError(ArtifactCacheError):
    """Raised when more than two cache specs are provided."""


# A parsed cache specification: `local` is an optional filesystem path,
# `remote` is an optional URL-based spec (possibly a "|"-separated list).
CacheSpec = namedtuple("CacheSpec", "local remote")


class CacheSetup(Subsystem):
    """Subsystem declaring all artifact-cache options under the 'cache' scope."""

    options_scope = "cache"

    @classmethod
    def register_options(cls, register):
        """Register all cache-related options.

        :param register: The option registration callback supplied by the options system.
        """
        super().register_options(register)
        # By default both reads and writes go to a local cache inside the build root.
        default_cache = [os.path.join(get_buildroot(), ".cache")]
        register(
            "--ignore",
            type=bool,
            help="Ignore all other cache configuration and skip using the cache.",
        )
        register(
            "--read", type=bool, default=True, help="Read build artifacts from cache, if available."
        )
        register(
            "--write", type=bool, default=True, help="Write build artifacts to cache, if available."
        )
        register(
            "--overwrite",
            advanced=True,
            type=bool,
            help="If writing build artifacts to cache, overwrite existing artifacts "
            "instead of skipping them.",
        )
        register(
            "--resolver",
            advanced=True,
            choices=["none", "rest"],
            default="none",
            help="Select which resolver strategy to use for discovering URIs that access "
            "artifact caches. none: use URIs from static config options, i.e. "
            "--read-from, --write-to. rest: look up URIs by querying a RESTful "
            "URL, which is a remote address from --read-from, --write-to.",
        )
        register(
            "--read-from",
            advanced=True,
            type=list,
            default=default_cache,
            help="The URIs of artifact caches to read directly from. Each entry is a URL of "
            "a RESTful cache, a path of a filesystem cache, or a pipe-separated list of "
            "alternate caches to choose from. This list is also used as input to "
            "the resolver. When resolver is 'none' list is used as is.",
        )
        register(
            "--write-to",
            advanced=True,
            type=list,
            default=default_cache,
            # Bug fix: the first help fragment previously ended in "a URL of" with no
            # trailing space, rendering as "a URL ofa RESTful cache" in --help output.
            help="The URIs of artifact caches to write directly to. Each entry is a URL of "
            "a RESTful cache, a path of a filesystem cache, or a pipe-separated list of "
            "alternate caches to choose from. This list is also used as input to "
            "the resolver. When resolver is 'none' list is used as is.",
        )
        register(
            "--read-timeout",
            advanced=True,
            type=float,
            default=4.0,
            help="The read timeout for any remote caches in use, in seconds.",
        )
        register(
            "--write-timeout",
            advanced=True,
            type=float,
            default=4.0,
            help="The write timeout for any remote caches in use, in seconds.",
        )
        register(
            "--compression-level",
            advanced=True,
            type=int,
            default=5,
            # Bug fix: help previously claimed "(0-9)", but level 0 is rejected at
            # cache-creation time ("compression_level must be an integer 1-9").
            help="The gzip compression level (1-9) for created artifacts.",
        )
        register(
            "--dereference-symlinks",
            type=bool,
            default=True,
            fingerprint=True,
            help="Dereference symlinks when creating cache tarball.",
        )
        register(
            "--max-entries-per-target",
            advanced=True,
            type=int,
            default=8,
            help="Maximum number of old cache files to keep per task target pair",
        )
        register(
            "--pinger-timeout",
            advanced=True,
            type=float,
            default=0.5,
            help="number of seconds before pinger times out",
        )
        register(
            "--pinger-tries",
            advanced=True,
            type=int,
            default=2,
            help="number of times pinger tries a cache",
        )
        register(
            "--write-permissions",
            advanced=True,
            type=str,
            default=None,
            help="Permissions to use when writing artifacts to a local cache, in octal.",
        )

    @classmethod
    def create_cache_factory_for_task(cls, task, **kwargs):
        """Build a CacheFactory bound to the given task's scoped cache options.

        :param task: The task to cache results for.
        :param kwargs: Extra keyword args forwarded to CacheFactory (pinger, resolver).
        :return: A new CacheFactory.
        """
        scoped_options = cls.scoped_instance(task).get_options()
        return CacheFactory(scoped_options, task.context.log, task, **kwargs)


class CacheFactory:
    """Lazily builds and holds the read and write artifact caches for one task.

    Cache construction is deferred to first use of get_read_cache()/get_write_cache()
    and guarded by a lock so concurrent callers share a single instance.
    """

    def __init__(self, options, log, task, pinger=None, resolver=None):
        """Create a cache factory from settings.

        :param options: Task's scoped options.
        :param log: Task's context log.
        :param task: Task to cache results for.
        :param pinger: Pinger to choose the best remote artifact cache URL.
        :param resolver: Resolver to look up remote artifact cache URLs.
        :return: cache factory.
        """
        self._options = options
        self._log = log
        self._task = task

        # Created on-demand; see get_read_cache()/get_write_cache().
        self._read_cache = None
        self._write_cache = None

        # Protects local filesystem setup, and assignment to the references above.
        self._cache_setup_lock = threading.Lock()

        # Caches are supposed to be close, and we don't want to waste time pinging on no-op builds.
        # So we ping twice with a short timeout.
        # TODO: Make lazy.
        self._pinger = pinger or Pinger(
            timeout=self._options.pinger_timeout, tries=self._options.pinger_tries
        )

        # resolver is also close but failing to resolve might have broader impact than
        # single ping failure, therefore use a higher timeout with more retries.
        if resolver:
            self._resolver = resolver
        elif self._options.resolver == "rest":
            self._resolver = RESTfulResolver(timeout=1.0, tries=3)
        else:
            self._resolver = NoopResolver()

        # NOTE(review): reaches through private scheduler attributes to share the
        # native engine binary with tarball handling — confirm no public accessor
        # exists before relying on this pattern elsewhere.
        TarballArtifact.NATIVE_BINARY = task.context._scheduler._scheduler._native

    @staticmethod
    def make_task_cache_dirname(task):
        """Use the task fingerprint as the name of the cache subdirectory to store results from the
        task."""
        return task.fingerprint

    @memoized_property
    def _cache_dirname(self):
        # Computed once; the task fingerprint is stable for this factory's lifetime.
        return self.make_task_cache_dirname(self._task)

    @property
    def ignore(self):
        # True when --cache-ignore disables all cache use.
        return self._options.ignore

    def read_cache_available(self):
        """Return a truthy value iff cache reads are enabled and a read cache is configured."""
        return not self.ignore and self._options.read and self.get_read_cache()

    def write_cache_available(self):
        """Return a truthy value iff cache writes are enabled and a write cache is configured."""
        return not self.ignore and self._options.write and self.get_write_cache()

    def overwrite(self):
        """Whether existing cached artifacts should be overwritten when writing."""
        return self._options.overwrite

    def get_read_cache(self):
        """Returns the read cache for this setup, creating it if necessary.

        Returns None if no read cache is configured.
        """
        if self._options.read_from and not self._read_cache:
            cache_spec = self._resolve(self._sanitize_cache_spec(self._options.read_from))
            if cache_spec:
                with self._cache_setup_lock:
                    # Bug fix: re-check under the lock. Previously two threads could
                    # both pass the unlocked check above and each construct a cache,
                    # with the second silently replacing the first.
                    if not self._read_cache:
                        self._read_cache = self._do_create_artifact_cache(
                            cache_spec, "will read from"
                        )
        return self._read_cache

    def get_write_cache(self):
        """Returns the write cache for this setup, creating it if necessary.

        Returns None if no write cache is configured.
        """
        if self._options.write_to and not self._write_cache:
            cache_spec = self._resolve(self._sanitize_cache_spec(self._options.write_to))
            if cache_spec:
                with self._cache_setup_lock:
                    # Bug fix: re-check under the lock to avoid constructing the
                    # cache twice under concurrent first access (see get_read_cache).
                    if not self._write_cache:
                        self._write_cache = self._do_create_artifact_cache(
                            cache_spec, "will write to"
                        )
        return self._write_cache

    # VisibleForTesting
    def _sanitize_cache_spec(self, spec):
        """Validate a raw spec list and split it into a CacheSpec(local, remote).

        :param spec: A list or tuple of one or two string specs.
        :return: A CacheSpec with the local and/or remote entry populated.
        :raises InvalidCacheSpecError: if spec is not a list or tuple.
        :raises EmptyCacheSpecError: if spec is empty.
        :raises TooManyCacheSpecsError: if spec has more than two entries.
        :raises CacheSpecFormatError: if no entry is recognizably local or remote.
        :raises LocalCacheSpecRequiredError: if a two-entry spec has no local entry.
        :raises RemoteCacheSpecRequiredError: if a two-entry spec has no remote entry.
        """
        if not isinstance(spec, (list, tuple)):
            raise InvalidCacheSpecError(
                "Invalid artifact cache spec type: {0} ({1})".format(type(spec), spec)
            )

        if not spec:
            raise EmptyCacheSpecError()

        if len(spec) > 2:
            raise TooManyCacheSpecsError("Too many artifact cache specs: ({0})".format(spec))

        local_specs = [s for s in spec if self.is_local(s)]
        remote_specs = [s for s in spec if self.is_remote(s)]

        if not local_specs and not remote_specs:
            raise CacheSpecFormatError(
                "Invalid cache spec: {0}, must be either local or remote".format(spec)
            )

        # A two-entry spec must contain exactly one local and one remote entry.
        if len(spec) == 2:
            if not local_specs:
                raise LocalCacheSpecRequiredError(
                    "One of two cache specs must be a local cache path."
                )
            if not remote_specs:
                raise RemoteCacheSpecRequiredError("One of two cache specs must be a remote spec.")

        local_spec = local_specs[0] if len(local_specs) > 0 else None
        remote_spec = remote_specs[0] if len(remote_specs) > 0 else None

        return CacheSpec(local=local_spec, remote=remote_spec)

    # VisibleForTesting
    def _resolve(self, spec):
        """Attempt resolving cache URIs when a remote spec is provided.

        :param spec: A CacheSpec.
        :return: A CacheSpec with remote URLs resolved, the original spec when there
            is nothing to resolve, or None when resolution fails and no local
            fallback exists.
        """
        if not spec.remote:
            return spec

        try:
            resolved_urls = self._resolver.resolve(spec.remote)
            if resolved_urls:
                # keep the bar separated list of URLs convention
                return CacheSpec(local=spec.local, remote="|".join(resolved_urls))
            # no-op
            return spec
        except Resolver.ResolverError as e:
            self._log.warn("Error while resolving from {0}: {1}".format(spec.remote, str(e)))
            # If for some reason resolver fails we continue to use local cache
            if spec.local:
                return CacheSpec(local=spec.local, remote=None)
            # resolver fails but there is no local cache
            return None

    @staticmethod
    def is_local(string_spec):
        """A spec is local when it is an absolute or home-relative filesystem path."""
        return string_spec.startswith("/") or string_spec.startswith("~")

    @staticmethod
    def is_remote(string_spec):
        """A spec is remote when it is an http(s) URL."""
        # both artifact cache and resolver use REST, add new protocols here once they are supported
        return string_spec.startswith("http://") or string_spec.startswith("https://")

    def _baseurl(self, url):
        """Strip a URL down to scheme://netloc, the unit that gets pinged."""
        parsed_url = urlparse(url)
        return "{scheme}://{netloc}".format(scheme=parsed_url.scheme, netloc=parsed_url.netloc)

    def get_available_urls(self, urls):
        """Return reachable urls sorted by their ping times."""
        baseurl_to_urls = {self._baseurl(url): url for url in urls}
        pingtimes = self._pinger.pings(
            list(baseurl_to_urls.keys())
        )  # List of pairs (host, time in ms).
        self._log.debug(
            "Artifact cache server ping times: {}".format(
                ", ".join(["{}: {:.6f} secs".format(*p) for p in pingtimes])
            )
        )

        sorted_pingtimes = sorted(pingtimes, key=lambda x: x[1])
        available_urls = [
            baseurl_to_urls[baseurl]
            for baseurl, pingtime in sorted_pingtimes
            if pingtime < Pinger.UNREACHABLE
        ]
        self._log.debug("Available cache servers: {0}".format(available_urls))

        return available_urls

    def _do_create_artifact_cache(self, spec, action):
        """Returns an artifact cache for the specified spec.

        spec can be:
          - a path to a file-based cache root.
          - a URL of a RESTful cache root.
          - a bar-separated list of URLs, where we'll pick the one with the best ping times.
          - A list or tuple of two specs, local, then remote, each as described above

        :param spec: A CacheSpec with at least one of local/remote set.
        :param action: Human-readable phrase for log messages, e.g. "will read from".
        :raises ValueError: if the configured compression level is outside 1-9.
        """
        compression = self._options.compression_level
        # Note: only gzip levels 1-9 are accepted; level 0 (no compression) is rejected.
        if compression not in range(1, 10):
            raise ValueError("compression_level must be an integer 1-9: {}".format(compression))

        artifact_root = self._options.pants_workdir
        # If the artifact root is a symlink it is more efficient to readlink the symlink
        # only once in a FUSE context like VCFS. The artifact extraction root lets us extract
        # artifacts directly side-stepping a VCFS lookup if in use.
        artifact_extraction_root = (
            os.readlink(artifact_root) if os.path.islink(artifact_root) else artifact_root
        )

        def create_local_cache(parent_path):
            # One subdirectory per task fingerprint under the configured parent path.
            path = os.path.join(parent_path, self._cache_dirname)
            self._log.debug(
                "{0} {1} local artifact cache at {2}".format(self._task.stable_name(), action, path)
            )
            return LocalArtifactCache(
                artifact_root,
                artifact_extraction_root,
                path,
                compression,
                self._options.max_entries_per_target,
                permissions=self._options.write_permissions,
                dereference=self._options.dereference_symlinks,
            )

        def create_remote_cache(remote_spec, local_cache):
            # Ping all candidate servers and keep only the reachable ones, best first.
            urls = self.get_available_urls(remote_spec.split("|"))

            if len(urls) > 0:
                best_url_selector = BestUrlSelector(
                    ["{}/{}".format(url.rstrip("/"), self._cache_dirname) for url in urls]
                )
                # The REST cache needs a local staging cache; fall back to a temp one.
                local_cache = local_cache or TempLocalArtifactCache(
                    artifact_root, artifact_extraction_root, compression
                )
                return RESTfulArtifactCache(
                    artifact_root,
                    best_url_selector,
                    local_cache,
                    read_timeout=self._options.read_timeout,
                    write_timeout=self._options.write_timeout,
                )
            # Implicitly returns None when no remote server is reachable.

        local_cache = create_local_cache(spec.local) if spec.local else None
        remote_cache = create_remote_cache(spec.remote, local_cache) if spec.remote else None
        # Prefer the remote cache (it wraps the local one); otherwise use local alone.
        if remote_cache:
            return remote_cache
        return local_cache