# -*- coding: utf-8 -*-
#
# Copyright (C) 2013-2017 Vinay Sajip.
# Licensed to the Python Software Foundation under a contributor agreement.
# See LICENSE.txt and CONTRIBUTORS.txt.
#
from __future__ import unicode_literals
import base64
import codecs
import datetime
import distutils.util
from email import message_from_file
import hashlib
import imp
import json
import logging
import os
import posixpath
import re
import shutil
import sys
import tempfile
import zipfile
from . import __version__, DistlibException
from .compat import sysconfig, ZipFile, fsdecode, text_type, filter
from .database import InstalledDistribution
from .metadata import Metadata, METADATA_FILENAME, WHEEL_METADATA_FILENAME
from .util import (FileOperator, convert_path, CSVReader, CSVWriter, Cache,
cached_property, get_cache_base, read_exports, tempdir)
from .version import NormalizedVersion, UnsupportedVersionError
logger = logging.getLogger(__name__)

cache = None    # created when needed

# Prefix used in implementation-specific tags: 'pp' when sys exposes
# pypy_version_info, 'jy' on 'java' platforms, 'ip' on 'cli', else 'cp'.
if hasattr(sys, 'pypy_version_info'):  # pragma: no cover
    IMP_PREFIX = 'pp'
elif sys.platform.startswith('java'):  # pragma: no cover
    IMP_PREFIX = 'jy'
elif sys.platform == 'cli':  # pragma: no cover
    IMP_PREFIX = 'ip'
else:
    IMP_PREFIX = 'cp'

# Version digits without a dot; fall back to the running interpreter's
# major/minor numbers when sysconfig does not provide a value.
VER_SUFFIX = (sysconfig.get_config_var('py_version_nodot') or
              '%s%s' % sys.version_info[:2])
PYVER = 'py' + VER_SUFFIX
IMPVER = IMP_PREFIX + VER_SUFFIX

# Platform string with '-' and '.' both mapped to '_'.
ARCH = distutils.util.get_platform().replace('.', '_').replace('-', '_')


def _derive_abi():
    # Build an ABI tag from individual build flags; only used when SOABI
    # is unavailable or does not have the 'cpython-' form.
    abi = 'cp' + VER_SUFFIX
    if sysconfig.get_config_var('Py_DEBUG'):
        abi += 'd'
    if sysconfig.get_config_var('WITH_PYMALLOC'):
        abi += 'm'
    if sysconfig.get_config_var('Py_UNICODE_SIZE') == 4:
        abi += 'u'
    return abi


ABI = sysconfig.get_config_var('SOABI')
if ABI and ABI.startswith('cpython-'):
    ABI = ABI.replace('cpython-', 'cp')
else:
    ABI = _derive_abi()
# The helper is only needed while the module is being initialised.
del _derive_abi
# Matches a complete wheel filename. Named groups: nm (name), vn (version),
# bn (optional build number), py (Python tags), bi (ABI tag) and
# ar (architecture tags); the name must end in '.whl'.
FILENAME_RE = re.compile(r'''
(?P<nm>[^-]+)
-(?P<vn>\d+[^-]*)
(-(?P<bn>\d+[^-]*))?
-(?P<py>\w+\d+(\.\w+\d+)*)
-(?P<bi>\w+)
-(?P<ar>\w+(\.\w+)*)
\.whl$
''', re.IGNORECASE | re.VERBOSE)
# Matches just "name-version" with an optional "-build" suffix, using the
# same nm/vn/bn group names as FILENAME_RE.
NAME_VERSION_RE = re.compile(r'''
(?P<nm>[^-]+)
-(?P<vn>\d+[^-]*)
(-(?P<bn>\d+[^-]*))?$
''', re.IGNORECASE | re.VERBOSE)
# Matches a shebang at the start of a bytes buffer: optional leading
# whitespace, '#!', then everything up to (not including) the line terminator.
SHEBANG_RE = re.compile(br'\s*#![^\r\n]*')
# Splits a shebang into the '#!' + interpreter part (the interpreter may be
# double-quoted) and, after whitespace, the remaining arguments (last group).
SHEBANG_DETAIL_RE = re.compile(br'^(\s*#!("[^"]+"|\S+))\s+(.*)$')
# Generic shebangs which Wheel.process_shebang substitutes into scripts.
SHEBANG_PYTHON = b'#!python'
SHEBANG_PYTHONW = b'#!pythonw'
# to_posix(path) returns path with the native separator replaced by '/'.
if os.sep != '/':
    to_posix = lambda o: o.replace(os.sep, '/')
else:
    # Already '/'-separated: hand the path back untouched.
    to_posix = lambda o: o
class Mounter(object):
    """
    Finder/loader object for extension modules registered against wheels.

    ``impure_wheels`` maps a wheel pathname to the extensions registered for
    it, and ``libs`` maps each extension's module name to the library path
    which is handed to ``imp.load_dynamic`` when the module is loaded.
    """

    def __init__(self):
        self.impure_wheels = {}
        self.libs = {}

    def add(self, pathname, extensions):
        """
        Register ``extensions`` (name/path pairs) for the wheel ``pathname``.
        """
        self.impure_wheels[pathname] = extensions
        self.libs.update(extensions)

    def remove(self, pathname):
        """
        Forget the wheel ``pathname`` and any of its extensions still known.

        Raises ``KeyError`` if the wheel was never added.
        """
        for name, _ in self.impure_wheels.pop(pathname):
            self.libs.pop(name, None)

    def find_module(self, fullname, path=None):
        """
        Return this object if ``fullname`` is a registered extension,
        otherwise ``None``.
        """
        return self if fullname in self.libs else None

    def load_module(self, fullname):
        """
        Return the module ``fullname``, loading the extension if needed.

        A module already present in ``sys.modules`` is returned as is.
        Raises ``ImportError`` if no extension is registered for the name.
        """
        if fullname in sys.modules:
            return sys.modules[fullname]
        if fullname not in self.libs:
            raise ImportError('unable to find extension for %s' % fullname)
        module = imp.load_dynamic(fullname, self.libs[fullname])
        module.__loader__ = self
        # Only dotted names have a parent package to record.
        package, dot, _ = fullname.rpartition('.')
        if dot:
            module.__package__ = package
        return module


# Module-level Mounter instance.
_hook = Mounter()
class Wheel(object):
"""
Class to build and install from Wheel files (PEP 427).
"""
wheel_version = (1, 1)
hash_kind = 'sha256'
def __init__(self, filename=None, sign=False, verify=False):
    """
    Initialise an instance using a (valid) filename.

    ``filename`` may be ``None`` (a placeholder 'dummy' 0.1 wheel is set
    up), a ``name-version[-build]`` string, or the filename of a wheel,
    optionally preceded by a directory. ``DistlibException`` is raised if
    the value matches neither form.
    """
    self.sign = sign
    self.should_verify = verify
    # Defaults, used unless a full wheel filename supplies the values.
    self.buildver = ''
    self.pyver = [PYVER]
    self.abi = ['none']
    self.arch = ['any']
    self.dirname = os.getcwd()

    if filename is None:
        self.name = 'dummy'
        self.version = '0.1'
        self._filename = self.filename
        return

    match = NAME_VERSION_RE.match(filename)
    if match:
        fields = match.groupdict('')
        self.name = fields['nm']
        # Reinstate the local version separator
        self.version = fields['vn'].replace('_', '-')
        self.buildver = fields['bn']
        self._filename = self.filename
        return

    # Not a bare name-version: treat the value as a path to a wheel file.
    dirname, filename = os.path.split(filename)
    match = FILENAME_RE.match(filename)
    if not match:
        raise DistlibException('Invalid name or '
                               'filename: %r' % filename)
    if dirname:
        self.dirname = os.path.abspath(dirname)
    self._filename = filename
    fields = match.groupdict('')
    self.name = fields['nm']
    self.version = fields['vn']
    self.buildver = fields['bn']
    # Compound tags are dot-separated in the filename.
    self.pyver = fields['py'].split('.')
    self.abi = fields['bi'].split('.')
    self.arch = fields['ar'].split('.')
@property
def filename(self):
"""
Build and return a filename from the various components.
"""
if self.buildver:
buildver = '-' + self.buildver
else:
buildver = ''
pyver = '.'.join(self.pyver)
abi = '.'.join(self.abi)
arch = '.'.join(self.arch)
# replace - with _ as a local version separator
version = self.version.replace('-', '_')
return '%s-%s%s-%s-%s-%s.whl' % (self.name, version, buildver,
pyver, abi, arch)
@property
def exists(self):
path = os.path.join(self.dirname, self.filename)
return os.path.isfile(path)
@property
def tags(self):
for pyver in self.pyver:
for abi in self.abi:
for arch in self.arch:
yield pyver, abi, arch
@cached_property
def metadata(self):
    """
    Read the distribution metadata from the wheel's ``.dist-info`` directory.

    The candidate metadata filenames are tried in order and the first one
    which is present in the archive and yields a truthy ``Metadata``
    object is returned. Wheels whose Wheel-Version is below 1.1 also have
    a file named ``METADATA`` tried last. ``ValueError`` is raised if no
    candidate yields metadata.
    """
    archive = os.path.join(self.dirname, self.filename)
    stem = '%s-%s' % (self.name, self.version)
    info_dir = '%s.dist-info' % stem
    decode = codecs.getreader('utf-8')
    with ZipFile(archive, 'r') as zf:
        wheel_metadata = self.get_wheel_metadata(zf)
        version_parts = wheel_metadata['Wheel-Version'].split('.', 1)
        file_version = tuple(int(part) for part in version_parts)
        candidates = [WHEEL_METADATA_FILENAME, METADATA_FILENAME]
        if file_version < (1, 1):
            candidates.append('METADATA')
        result = None
        for candidate in candidates:
            try:
                member = posixpath.join(info_dir, candidate)
                with zf.open(member) as stream:
                    result = Metadata(fileobj=decode(stream))
                    if result:
                        break
            except KeyError:
                # This candidate is not usable; try the next one.
                pass
        if not result:
            raise ValueError('Invalid wheel, because metadata is '
                             'missing: looked in %s' % ', '.join(candidates))
    return result
def get_wheel_metadata(self, zf):
name_ver = '%s-%s' % (self.name, self.version)
info_dir = '%s.dist-info' % name_ver
metadata_filename = posixpath.join(info_dir, 'WHEEL')
with zf.open(metadata_filename) as bf:
wf = codecs.getreader('utf-8')(bf)
message = message_from_file(wf)
return dict(message)
@cached_property
def info(self):
    """
    The ``WHEEL`` file entries of the wheel on disk, as a dictionary.

    The archive is located by joining ``self.dirname`` and
    ``self.filename``.
    """
    archive = os.path.join(self.dirname, self.filename)
    with ZipFile(archive, 'r') as zf:
        return self.get_wheel_metadata(zf)
def process_shebang(self, data):
    """
    Return ``data`` (bytes) with a generic Python shebang as its first line.

    If ``data`` starts with a shebang (optionally preceded by whitespace),
    the interpreter is replaced by ``#!python`` - or ``#!pythonw`` when
    'pythonw' occurs anywhere in the original shebang line, compared
    case-insensitively - and any arguments following the interpreter are
    kept. Otherwise a ``#!python`` line is prepended, terminated with the
    line ending which ``data`` itself appears to use.
    """
    m = SHEBANG_RE.match(data)
    if m:
        end = m.end()
        shebang, data_after_shebang = data[:end], data[end:]
        # Preserve any arguments after the interpreter
        if b'pythonw' in shebang.lower():
            shebang_python = SHEBANG_PYTHONW
        else:
            shebang_python = SHEBANG_PYTHON
        m = SHEBANG_DETAIL_RE.match(shebang)
        if m:
            args = b' ' + m.groups()[-1]
        else:
            args = b''
        shebang = shebang_python + args
        data = shebang + data_after_shebang
    else:
        cr = data.find(b'\r')
        lf = data.find(b'\n')
        # find() returns -1 when nothing is found, so "cr > lf" alone is
        # also true for data containing CRs but no LF at all. Only choose
        # LF when there is no CR, or when an LF really precedes the
        # first CR.
        if cr < 0 or 0 <= lf < cr:
            term = b'\n'
        elif data[cr:cr + 2] == b'\r\n':
            term = b'\r\n'
        else:
            term = b'\r'
        data = SHEBANG_PYTHON + term + data
    return data
def get_hash(self, data, hash_kind=None):
if hash_kind is None:
hash_kind = self.hash_kind
try:
hasher = getattr(hashlib, hash_kind)
except AttributeError:
raise DistlibException('Unsupported hash algorithm: %r' % hash_kind)
result = hasher(data).digest()
result = base64.urlsafe_b64encode(result).rstrip(b'=').decode('ascii')
return hash_kind, result
def write_record(self, records, record_path, base):
    """
    Write ``records`` as CSV rows to ``record_path``, in sorted order.

    A row for the record file itself - its path relative to ``base`` in
    '/'-separated form, with empty second and third fields - is included.
    The caller's ``records`` sequence is not modified.
    """
    rows = list(records)  # make a copy for sorting
    own_path = to_posix(os.path.relpath(record_path, base))
    rows.append((own_path, '', ''))
    rows = sorted(rows)
    with CSVWriter(record_path) as writer:
        for row in rows:
            writer.writerow(row)
def write_records(self, info, libdir, archive_paths):
    """
    Write the RECORD file for the given files and add it to ``archive_paths``.

    :param info: A ``(distinfo, info_dir)`` pair. RECORD is written into
                 ``distinfo``; ``info_dir`` is used to form the path
                 under which RECORD is to be archived.
    :param libdir: Passed to :meth:`write_record` as the base directory
                   against which RECORD's own entry is made relative.
    :param archive_paths: A list of ``(archive_path, filesystem_path)``
                          pairs. Each file is read, hashed with
                          :meth:`get_hash` and recorded with its size.
                          The pair for RECORD itself is appended to this
                          list in place.
    """
    records = []
    distinfo, info_dir = info
    # The hash algorithm is resolved (and validated) by get_hash, so no
    # separate hashlib lookup is needed here.
    for ap, p in archive_paths:
        with open(p, 'rb') as f:
            data = f.read()
        digest = '%s=%s' % self.get_hash(data)
        size = os.path.getsize(p)
        records.append((ap, digest, size))

    p = os.path.join(distinfo, 'RECORD')
    self.write_record(records, p, libdir)
    ap = to_posix(os.path.join(info_dir, 'RECORD'))
    archive_paths.append((ap, p))
def build_zip(self, pathname, archive_paths):
    """
    Create a deflate-compressed zip archive at ``pathname``.

    ``archive_paths`` is an iterable of ``(archive_path, filesystem_path)``
    pairs; each file is stored in the archive under its archive path, in
    the order given.
    """
    with ZipFile(pathname, 'w', zipfile.ZIP_DEFLATED) as archive:
        for arcname, source in archive_paths:
            logger.debug('Wrote %s to %s in wheel', source, arcname)
            archive.write(source, arcname)
def build(self, paths, tags=None, wheel_version=None):
"""
Build a wheel from files in specified paths, and use any specified tags
when determining the name of the wheel.
"""
if tags is None:
tags = {}
libkey = list(filter(lambda o: o in paths, ('purelib', 'platlib')))[0]
if libkey == 'platlib':
is_pure = 'false'
default_pyver = [IMPVER]
default_abi = [ABI]
default_arch = [ARCH]
Loading ...