Repository URL to install this package:
|
Version:
1.26.0.dev0+gite506aa5f ▾
|
# Copyright 2014 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).
import logging
import multiprocessing
import queue
import threading
import requests
from requests import RequestException
from pants.cache.artifact_cache import ArtifactCache, NonfatalArtifactCacheError, UnreadableArtifact
logger = logging.getLogger(__name__)
# Reduce the somewhat verbose logging of requests.
# TODO do this in a central place
logging.getLogger("requests").setLevel(logging.WARNING)
class RequestsSession:
_session = None
@classmethod
def instance(cls):
if cls._session is None:
cls._session = requests.Session()
return cls._session
class RESTfulArtifactCache(ArtifactCache):
"""An artifact cache that stores the artifacts on a RESTful service."""
READ_SIZE_BYTES = 4 * 1024 * 1024
def __init__(
self, artifact_root, best_url_selector, local, read_timeout=4.0, write_timeout=4.0
):
"""
:param string artifact_root: The path under which cacheable products will be read/written.
:param BestUrlSelector best_url_selector: Url selector that supports fail-over. Each returned
url represents prefix for some RESTful service. We must be able to PUT and GET to any path
under this base.
:param BaseLocalArtifactCache local: local cache instance for storing and creating artifacts
"""
super().__init__(artifact_root)
self.best_url_selector = best_url_selector
self._read_timeout_secs = read_timeout
self._write_timeout_secs = write_timeout
self._localcache = local
def try_insert(self, cache_key, paths):
# Delegate creation of artifact to local cache.
with self._localcache.insert_paths(cache_key, paths) as tarfile:
# Upload local artifact to remote cache.
with open(tarfile, "rb") as infile:
if not self._request("PUT", cache_key, body=infile):
raise NonfatalArtifactCacheError("Failed to PUT {0}.".format(cache_key))
def has(self, cache_key):
if self._localcache.has(cache_key):
return True
return self._request("HEAD", cache_key) is not None
def use_cached_files(self, cache_key, results_dir=None):
if self._localcache.has(cache_key):
return self._localcache.use_cached_files(cache_key, results_dir)
queue = multiprocessing.Queue()
try:
response = self._request("GET", cache_key)
if response is not None:
threading.Thread(
target=_log_if_no_response,
args=(
60,
"\nStill downloading artifacts (either they're very large or the connection to the cache is slow)",
queue.get,
),
).start()
# Delegate storage and extraction to local cache
byte_iter = response.iter_content(self.READ_SIZE_BYTES)
res = self._localcache.store_and_use_artifact(cache_key, byte_iter, results_dir)
queue.put(None)
return res
except Exception as e:
logger.warning("\nError while reading from remote artifact cache: {0}\n".format(e))
queue.put(None)
# TODO(peiyu): clean up partially downloaded local file if any
return UnreadableArtifact(cache_key, e)
return False
def delete(self, cache_key):
self._localcache.delete(cache_key)
self._request("DELETE", cache_key)
# Returns a response if we get a 200, None if we get a 404 and raises an exception otherwise.
def _request(self, method, cache_key, body=None):
session = RequestsSession.instance()
with self.best_url_selector.select_best_url() as best_url:
url = self._url_for_key(best_url, cache_key)
logger.debug("Sending {0} request to {1}".format(method, url))
try:
if "PUT" == method:
response = session.put(
url, data=body, timeout=self._write_timeout_secs, allow_redirects=True
)
elif "GET" == method:
response = session.get(
url, timeout=self._read_timeout_secs, stream=True, allow_redirects=True
)
elif "HEAD" == method:
response = session.head(
url, timeout=self._read_timeout_secs, allow_redirects=True
)
elif "DELETE" == method:
response = session.delete(
url, timeout=self._write_timeout_secs, allow_redirects=True
)
else:
raise ValueError("Unknown request method {0}".format(method))
except RequestException as e:
raise NonfatalArtifactCacheError(
"Failed to {0} {1}. Error: {2}".format(method, url, e)
)
# Allow all 2XX responses. E.g., nginx returns 201 on PUT. HEAD may return 204.
if int(response.status_code / 100) == 2:
return response
elif response.status_code == 404:
logger.debug("404 returned for {0} request to {1}".format(method, url))
return None
else:
raise NonfatalArtifactCacheError(
"Failed to {0} {1}. Error: {2} {3}".format(
method, url, response.status_code, response.reason
)
)
def _url_suffix_for_key(self, cache_key):
return "{0}/{1}.tgz".format(cache_key.id, cache_key.hash)
def _url_for_key(self, url, cache_key):
path_prefix = url.path.rstrip("/")
path = "{0}/{1}".format(path_prefix, self._url_suffix_for_key(cache_key))
return "{0}://{1}{2}".format(url.scheme, url.netloc, path)
def _log_if_no_response(timeout_seconds, message, getter):
while True:
try:
getter(True, timeout_seconds)
return
except queue.Empty:
logger.info(message)