import contextlib
import errno
import hashlib
import json
import os
import re
import shutil
import sys
import tempfile
import torch
import warnings
import zipfile
from pathlib import Path
from typing import Dict, Optional, Any
from urllib.error import HTTPError, URLError
from urllib.request import urlopen, Request
from urllib.parse import urlparse # noqa: F401
from torch.serialization import MAP_LOCATION
class _Faketqdm: # type: ignore[no-redef]
def __init__(self, total=None, disable=False,
unit=None, *args, **kwargs):
self.total = total
self.disable = disable
self.n = 0
# Ignore all extra *args and **kwargs lest you want to reinvent tqdm
def update(self, n):
if self.disable:
return
self.n += n
if self.total is None:
sys.stderr.write("\r{0:.1f} bytes".format(self.n))
else:
sys.stderr.write("\r{0:.1f}%".format(100 * self.n / float(self.total)))
sys.stderr.flush()
def close(self):
self.disable = True
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.disable:
return
sys.stderr.write('\n')
# Bind the name ``tqdm`` for the rest of this module: the real progress bar
# when the tqdm package can be imported, the minimal _Faketqdm class otherwise.
try:
    from tqdm import tqdm  # If tqdm is installed use it, otherwise use the fake wrapper
except ImportError:
    tqdm = _Faketqdm
# Names exported as this module's public API.
__all__ = [
    'download_url_to_file',
    'get_dir',
    'help',
    'list',
    'load',
    'load_state_dict_from_url',
    'set_dir',
]

# matches bfd8deac from resnet18-bfd8deac.pth
HASH_REGEX = re.compile(r'-([a-f0-9]*)\.')

# Repository owners that _check_repo_is_trusted treats as trusted without
# consulting the on-disk trusted list.
_TRUSTED_REPO_OWNERS = ("facebookresearch", "facebookincubator", "pytorch", "fairinternal")
# Environment variable holding a GitHub token; when set it is sent as an
# Authorization header on the GitHub API requests made in this file.
ENV_GITHUB_TOKEN = 'GITHUB_TOKEN'
# Environment variable that, when set, gives the torch home directory
# (see _get_torch_home).
ENV_TORCH_HOME = 'TORCH_HOME'
# Environment variable giving the cache root under which a 'torch' folder is
# used when TORCH_HOME is not set.
ENV_XDG_CACHE_HOME = 'XDG_CACHE_HOME'
# Cache root used when XDG_CACHE_HOME is not set either; the '~' is expanded
# by _get_torch_home.
DEFAULT_CACHE_DIR = '~/.cache'
# Name of the hubconf attribute listing required packages
# (read by _check_dependencies).
VAR_DEPENDENCY = 'dependencies'
# NOTE(review): presumably the file name of the entry-point module looked up
# inside a downloaded repo; its use is not visible in this part of the file.
MODULE_HUBCONF = 'hubconf.py'
# NOTE(review): presumably the chunk size, in bytes, for streamed downloads;
# its use is not visible in this part of the file.
READ_DATA_CHUNK = 8192
# NOTE(review): presumably the hub directory override managed by
# set_dir()/get_dir(); None looks like "not set" -- confirm against those functions.
_hub_dir = None
@contextlib.contextmanager
def _add_to_sys_path(path):
sys.path.insert(0, path)
try:
yield
finally:
sys.path.remove(path)
# Copied from tools/shared/module_loader to be included in torch package
def _import_module(name, path):
import importlib.util
from importlib.abc import Loader
spec = importlib.util.spec_from_file_location(name, path)
assert spec is not None
module = importlib.util.module_from_spec(spec)
assert isinstance(spec.loader, Loader)
spec.loader.exec_module(module)
return module
def _remove_if_exists(path):
if os.path.exists(path):
if os.path.isfile(path):
os.remove(path)
else:
shutil.rmtree(path)
def _git_archive_link(repo_owner, repo_name, ref):
# See https://docs.github.com/en/rest/reference/repos#download-a-repository-archive-zip
return f"https://github.com/{repo_owner}/{repo_name}/zipball/{ref}"
def _load_attr_from_module(module, func_name):
# Check if callable is defined in the module
if func_name not in dir(module):
return None
return getattr(module, func_name)
def _get_torch_home():
    """Work out the torch home directory from the environment.

    The value of the ``ENV_TORCH_HOME`` variable is used when set; otherwise a
    ``torch`` folder under the ``ENV_XDG_CACHE_HOME`` directory, which itself
    falls back to ``DEFAULT_CACHE_DIR``. A leading ``~`` is expanded.
    """
    xdg_cache_root = os.getenv(ENV_XDG_CACHE_HOME, DEFAULT_CACHE_DIR)
    fallback_home = os.path.join(xdg_cache_root, 'torch')
    return os.path.expanduser(os.getenv(ENV_TORCH_HOME, fallback_home))
def _parse_repo_info(github):
if ':' in github:
repo_info, ref = github.split(':')
else:
repo_info, ref = github, None
repo_owner, repo_name = repo_info.split('/')
if ref is None:
# The ref wasn't specified by the user, so we need to figure out the
# default branch: main or master. Our assumption is that if main exists
# then it's the default branch, otherwise it's master.
try:
with urlopen(f"https://github.com/{repo_owner}/{repo_name}/tree/main/"):
ref = 'main'
except HTTPError as e:
if e.code == 404:
ref = 'master'
else:
raise
except URLError as e:
# No internet connection, need to check for cache as last resort
for possible_ref in ("main", "master"):
if os.path.exists(f"{get_dir()}/{repo_owner}_{repo_name}_{possible_ref}"):
ref = possible_ref
break
if ref is None:
raise RuntimeError(
"It looks like there is no internet connection and the "
f"repo could not be found in the cache ({get_dir()})"
) from e
return repo_owner, repo_name, ref
def _read_url(url):
with urlopen(url) as r:
return r.read().decode(r.headers.get_content_charset('utf-8'))
def _validate_not_a_forked_repo(repo_owner, repo_name, ref):
    """Check that ``ref`` is a branch, tag or commit prefix known to the repo.

    The GitHub API branch and tag listings are read page by page (100 entries
    per page). The function returns as soon as an entry's name equals ``ref``
    or its commit sha starts with ``ref``.

    Raises:
        ValueError: if no branch or tag matches ``ref``.
    """
    # Use urlopen to avoid depending on local git.
    headers = {'Accept': 'application/vnd.github.v3+json'}
    token = os.environ.get(ENV_GITHUB_TOKEN)
    if token is not None:
        headers['Authorization'] = f'token {token}'
    for url_prefix in (
            f'https://api.github.com/repos/{repo_owner}/{repo_name}/branches',
            f'https://api.github.com/repos/{repo_owner}/{repo_name}/tags'):
        page = 1
        while True:
            url = f'{url_prefix}?per_page=100&page={page}'
            entries = json.loads(_read_url(Request(url, headers=headers)))
            # Empty response means no more data to process
            if not entries:
                break
            if any(item['name'] == ref or item['commit']['sha'].startswith(ref)
                   for item in entries):
                return
            page += 1

    raise ValueError(f'Cannot find {ref} in https://github.com/{repo_owner}/{repo_name}. '
                     'If it\'s a commit from a forked repo, please call hub.load() with forked repo directly.')
def _get_cache_or_reload(github, force_reload, trust_repo, calling_fn, verbose=True, skip_validation=False):
    """Return the local directory holding the requested repo, downloading it if needed.

    Args:
        github: repo string handed to ``_parse_repo_info`` (``owner/name[:ref]``).
        force_reload: when true, an existing local copy is ignored and the
            repo is downloaded again.
        trust_repo: forwarded to ``_check_repo_is_trusted``.
        calling_fn: forwarded to ``_check_repo_is_trusted``.
        verbose: when true, a message is written to stderr if the cached copy
            is used. The "Downloading" message is written regardless.
        skip_validation: when true, ``_validate_not_a_forked_repo`` is not called
            before downloading.

    Returns:
        Path of the repo directory inside the hub directory
        (``<hub_dir>/<owner>_<name>_<ref with '/' replaced by '_'>``).

    Raises:
        HTTPError: re-raised from the download unless its code is 300, in
            which case the download is retried with ``refs/heads/<ref>``.
    """
    # Setup hub_dir to save downloaded files
    hub_dir = get_dir()
    if not os.path.exists(hub_dir):
        os.makedirs(hub_dir)
    # Parse github repo information
    repo_owner, repo_name, ref = _parse_repo_info(github)
    # Github allows branch name with slash '/',
    # this causes confusion with path on both Linux and Windows.
    # Backslash is not allowed in Github branch name so no need
    # to worry about it.
    normalized_br = ref.replace('/', '_')
    # Github renames folder repo-v1.x.x to repo-1.x.x
    # We don't know the repo name before downloading the zip file
    # and inspect name from it.
    # To check if cached repo exists, we need to normalize folder names.
    owner_name_branch = '_'.join([repo_owner, repo_name, normalized_br])
    repo_dir = os.path.join(hub_dir, owner_name_branch)
    # Check that the repo is in the trusted list
    _check_repo_is_trusted(repo_owner, repo_name, owner_name_branch, trust_repo=trust_repo, calling_fn=calling_fn)

    # The cached copy is used only when a reload was not requested and the
    # directory already exists.
    use_cache = (not force_reload) and os.path.exists(repo_dir)

    if use_cache:
        if verbose:
            sys.stderr.write('Using cache found in {}\n'.format(repo_dir))
    else:
        # Validate the tag/branch is from the original repo instead of a forked repo
        if not skip_validation:
            _validate_not_a_forked_repo(repo_owner, repo_name, ref)

        # The archive is downloaded to <hub_dir>/<normalized ref>.zip; any
        # leftover file of that name is removed first.
        cached_file = os.path.join(hub_dir, normalized_br + '.zip')
        _remove_if_exists(cached_file)

        try:
            url = _git_archive_link(repo_owner, repo_name, ref)
            sys.stderr.write('Downloading: \"{}\" to {}\n'.format(url, cached_file))
            download_url_to_file(url, cached_file, progress=False)
        except HTTPError as err:
            if err.code == 300:
                # Getting a 300 Multiple Choices error likely means that the ref is both a tag and a branch
                # in the repo. This can be disambiguated by explicitly using refs/heads/ or refs/tags
                # See https://git-scm.com/book/en/v2/Git-Internals-Git-References
                # Here, we do the same as git: we throw a warning, and assume the user wanted the branch
                warnings.warn(
                    f"The ref {ref} is ambiguous. Perhaps it is both a tag and a branch in the repo? "
                    "Torchhub will now assume that it's a branch. "
                    "You can disambiguate tags and branches by explicitly passing refs/heads/branch_name or "
                    "refs/tags/tag_name as the ref. That might require using skip_validation=True."
                )
                disambiguated_branch_ref = f"refs/heads/{ref}"
                url = _git_archive_link(repo_owner, repo_name, ref=disambiguated_branch_ref)
                download_url_to_file(url, cached_file, progress=False)
            else:
                raise

        with zipfile.ZipFile(cached_file) as cached_zipfile:
            # The first archive entry is taken to be the top-level folder of the repo.
            extraced_repo_name = cached_zipfile.infolist()[0].filename
            extracted_repo = os.path.join(hub_dir, extraced_repo_name)
            # Clear any stale extraction of the same folder before unpacking.
            _remove_if_exists(extracted_repo)
            # Unzip the code and rename the base folder
            cached_zipfile.extractall(hub_dir)

        # Drop the archive and any previous copy of the repo, then move the
        # extracted folder to its normalized name.
        _remove_if_exists(cached_file)
        _remove_if_exists(repo_dir)
        shutil.move(extracted_repo, repo_dir)  # rename the repo

    return repo_dir
def _check_repo_is_trusted(repo_owner, repo_name, owner_name_branch, trust_repo, calling_fn="load"):
    """Decide whether code from ``repo_owner/repo_name`` may be used, prompting if required.

    A repo counts as already trusted when any of these holds:
      * ``<owner>_<name>`` is listed in the ``trusted_list`` file in the hub directory,
      * a folder named ``owner_name_branch`` already exists in the hub directory
        (it was downloaded before the trust mechanism existed),
      * ``repo_owner`` is one of ``_TRUSTED_REPO_OWNERS``.

    Args:
        repo_owner: GitHub owner of the repo.
        repo_name: GitHub name of the repo.
        owner_name_branch: folder name of the cached repo (``owner_name_ref``).
        trust_repo: one of
            ``None``    - only warn when the repo is not trusted, then return;
            ``False``   - always ask the user for confirmation;
            ``"check"`` - ask only when the repo is not already trusted;
            any other value (e.g. ``True``) - trust without asking.
        calling_fn: name of the public function shown in the warning message.

    Side effects:
        Creates the ``trusted_list`` file if missing and appends
        ``<owner>_<name>`` to it once a previously untrusted repo is accepted.

    Raises:
        Exception: if the user answers the prompt with "n", "no" or nothing.
        ValueError: if the answer to the prompt is not recognized.
    """
    hub_dir = get_dir()
    filepath = os.path.join(hub_dir, "trusted_list")

    if not os.path.exists(filepath):
        Path(filepath).touch()
    with open(filepath, 'r') as file:
        trusted_repos = tuple(line.strip() for line in file)

    # To minimize friction of introducing the new trust_repo mechanism, we consider that
    # if a repo was already downloaded by torchhub, then it is already trusted (even if it's not in the allowlist)
    trusted_repos_legacy = next(os.walk(hub_dir))[1]

    owner_name = '_'.join([repo_owner, repo_name])
    is_trusted = (
        owner_name in trusted_repos
        or owner_name_branch in trusted_repos_legacy
        or repo_owner in _TRUSTED_REPO_OWNERS
    )

    # TODO: Remove `None` option in 2.0 and change the default to "check"
    if trust_repo is None:
        if not is_trusted:
            # Every fragment that mentions calling_fn must be an f-string;
            # otherwise the placeholder is shown to the user verbatim.
            warnings.warn(
                "You are about to download and run code from an untrusted repository. In a future release, this won't "
                f"be allowed. To add the repository to your trusted list, change the command to {calling_fn}(..., "
                "trust_repo=False) and a command prompt will appear asking for an explicit confirmation of trust, "
                f"or {calling_fn}(..., trust_repo=True), which will assume that the prompt is to be answered with "
                f"'yes'. You can also use {calling_fn}(..., trust_repo='check') which will only prompt for "
                f"confirmation if the repo is not already trusted. This will eventually be the default behaviour")
        return

    if (trust_repo is False) or (trust_repo == "check" and not is_trusted):
        response = input(
            f"The repository {owner_name} does not belong to the list of trusted repositories and as such cannot be downloaded. "
            "Do you trust this repository and wish to add it to the trusted list of repositories (y/N)?")
        if response.lower() in ("y", "yes"):
            if is_trusted:
                print("The repository is already trusted.")
        elif response.lower() in ("n", "no", ""):
            raise Exception("Untrusted repository.")
        else:
            raise ValueError(f"Unrecognized response {response}.")

    # At this point we're sure that the user trusts the repo (or wants to trust it)
    if not is_trusted:
        with open(filepath, "a") as file:
            file.write(owner_name + "\n")
def _check_module_exists(name):
import importlib.util
return importlib.util.find_spec(name) is not None
def _check_dependencies(m):
    """Verify that every package named by the module's dependency list can be found.

    The list is read from the attribute named by ``VAR_DEPENDENCY``; a module
    without that attribute (or with it set to ``None``) passes the check.

    Raises:
        RuntimeError: naming every listed package that could not be found.
    """
    declared = _load_attr_from_module(m, VAR_DEPENDENCY)
    if declared is None:
        return
    unavailable = [name for name in declared if not _check_module_exists(name)]
    if unavailable:
        raise RuntimeError('Missing dependencies: {}'.format(', '.join(unavailable)))
def _load_entry_from_hubconf(m, model):
    """Look up the callable entry point ``model`` in the loaded hubconf module ``m``.

    Raises:
        ValueError: if ``model`` is not a string.
        RuntimeError: if a declared dependency is missing, or if ``m`` has no
            callable attribute named ``model``.
    """
    if not isinstance(model, str):
        raise ValueError('Invalid input: model should be a string of function name')

    # Note that if a missing dependency is imported at top level of hubconf, it will
    # throw before this function. It's a chicken and egg situation where we have to
    # load hubconf to know what're the dependencies, but to import hubconf it requires
    # a missing package. This is fine, Python will throw proper error message for users.
    _check_dependencies(m)

    entry = _load_attr_from_module(m, model)
    # A missing attribute comes back as None, which is not callable either.
    if callable(entry):
        return entry
    raise RuntimeError('Cannot find callable {} in hubconf'.format(model))
def get_dir():
Loading ...