# -*- coding: utf-8 -*-
"""
requests.utils
~~~~~~~~~~~~~~
This module provides utility functions that are used within Requests
that are also useful for external consumption.
"""
import codecs
import contextlib
import io
import os
import re
import socket
import struct
import sys
import tempfile
import warnings
import zipfile
from .__version__ import __version__
from . import certs
# to_native_string is unused here, but imported here for backwards compatibility
from ._internal_utils import to_native_string
from .compat import parse_http_list as _parse_list_header
from .compat import (
quote, urlparse, bytes, str, OrderedDict, unquote, getproxies,
proxy_bypass, urlunparse, basestring, integer_types, is_py3,
proxy_bypass_environment, getproxies_environment, Mapping)
from .cookies import cookiejar_from_dict
from .structures import CaseInsensitiveDict
from .exceptions import (
InvalidURL, InvalidHeader, FileModeWarning, UnrewindableBodyError)
NETRC_FILES = ('.netrc', '_netrc')
DEFAULT_CA_BUNDLE_PATH = certs.where()
DEFAULT_PORTS = {'http': 80, 'https': 443}
if sys.platform == 'win32':
# provide a proxy_bypass version on Windows without DNS lookups
def proxy_bypass_registry(host):
try:
if is_py3:
import winreg
else:
import _winreg as winreg
except ImportError:
return False
try:
internetSettings = winreg.OpenKey(winreg.HKEY_CURRENT_USER,
r'Software\Microsoft\Windows\CurrentVersion\Internet Settings')
# ProxyEnable could be REG_SZ or REG_DWORD, normalizing it
proxyEnable = int(winreg.QueryValueEx(internetSettings,
'ProxyEnable')[0])
# ProxyOverride is almost always a string
proxyOverride = winreg.QueryValueEx(internetSettings,
'ProxyOverride')[0]
except OSError:
return False
if not proxyEnable or not proxyOverride:
return False
# make a check value list from the registry entry: replace the
# '<local>' string by the localhost entry and the corresponding
# canonical entry.
proxyOverride = proxyOverride.split(';')
# now check if we match one of the registry values.
for test in proxyOverride:
if test == '<local>':
if '.' not in host:
return True
test = test.replace(".", r"\.") # mask dots
test = test.replace("*", r".*") # change glob sequence
test = test.replace("?", r".") # change glob char
if re.match(test, host, re.I):
return True
return False
def proxy_bypass(host): # noqa
"""Return True, if the host should be bypassed.
Checks proxy settings gathered from the environment, if specified,
or the registry.
"""
if getproxies_environment():
return proxy_bypass_environment(host)
else:
return proxy_bypass_registry(host)
def dict_to_sequence(d):
"""Returns an internal sequence dictionary update."""
if hasattr(d, 'items'):
d = d.items()
return d
def super_len(o):
total_length = None
current_position = 0
if hasattr(o, '__len__'):
total_length = len(o)
elif hasattr(o, 'len'):
total_length = o.len
elif hasattr(o, 'fileno'):
try:
fileno = o.fileno()
except io.UnsupportedOperation:
pass
else:
total_length = os.fstat(fileno).st_size
# Having used fstat to determine the file length, we need to
# confirm that this file was opened up in binary mode.
if 'b' not in o.mode:
warnings.warn((
"Requests has determined the content-length for this "
"request using the binary size of the file: however, the "
"file has been opened in text mode (i.e. without the 'b' "
"flag in the mode). This may lead to an incorrect "
"content-length. In Requests 3.0, support will be removed "
"for files in text mode."),
FileModeWarning
)
if hasattr(o, 'tell'):
try:
current_position = o.tell()
except (OSError, IOError):
# This can happen in some weird situations, such as when the file
# is actually a special file descriptor like stdin. In this
# instance, we don't know what the length is, so set it to zero and
# let requests chunk it instead.
if total_length is not None:
current_position = total_length
else:
if hasattr(o, 'seek') and total_length is None:
# StringIO and BytesIO have seek but no useable fileno
try:
# seek to end of file
o.seek(0, 2)
total_length = o.tell()
# seek back to current position to support
# partially read file-like objects
o.seek(current_position or 0)
except (OSError, IOError):
total_length = 0
if total_length is None:
total_length = 0
return max(0, total_length - current_position)
def get_netrc_auth(url, raise_errors=False):
"""Returns the Requests tuple auth for a given url from netrc."""
try:
from netrc import netrc, NetrcParseError
netrc_path = None
for f in NETRC_FILES:
try:
loc = os.path.expanduser('~/{}'.format(f))
except KeyError:
# os.path.expanduser can fail when $HOME is undefined and
# getpwuid fails. See https://bugs.python.org/issue20164 &
# https://github.com/requests/requests/issues/1846
return
if os.path.exists(loc):
netrc_path = loc
break
# Abort early if there isn't one.
if netrc_path is None:
return
ri = urlparse(url)
# Strip port numbers from netloc. This weird `if...encode`` dance is
# used for Python 3.2, which doesn't support unicode literals.
splitstr = b':'
if isinstance(url, str):
splitstr = splitstr.decode('ascii')
host = ri.netloc.split(splitstr)[0]
try:
_netrc = netrc(netrc_path).authenticators(host)
if _netrc:
# Return with login / password
login_i = (0 if _netrc[0] else 1)
return (_netrc[login_i], _netrc[2])
except (NetrcParseError, IOError):
# If there was a parsing error or a permissions issue reading the file,
# we'll just skip netrc auth unless explicitly asked to raise errors.
if raise_errors:
raise
# AppEngine hackiness.
except (ImportError, AttributeError):
pass
def guess_filename(obj):
"""Tries to guess the filename of the given object."""
name = getattr(obj, 'name', None)
if (name and isinstance(name, basestring) and name[0] != '<' and
name[-1] != '>'):
return os.path.basename(name)
def extract_zipped_paths(path):
"""Replace nonexistent paths that look like they refer to a member of a zip
archive with the location of an extracted copy of the target, or else
just return the provided path unchanged.
"""
if os.path.exists(path):
# this is already a valid path, no need to do anything further
return path
# find the first valid part of the provided path and treat that as a zip archive
# assume the rest of the path is the name of a member in the archive
archive, member = os.path.split(path)
while archive and not os.path.exists(archive):
archive, prefix = os.path.split(archive)
member = '/'.join([prefix, member])
if not zipfile.is_zipfile(archive):
return path
zip_file = zipfile.ZipFile(archive)
if member not in zip_file.namelist():
return path
# we have a valid zip archive and a valid member of that archive
tmp = tempfile.gettempdir()
extracted_path = os.path.join(tmp, *member.split('/'))
if not os.path.exists(extracted_path):
extracted_path = zip_file.extract(member, path=tmp)
return extracted_path
def from_key_val_list(value):
"""Take an object and test to see if it can be represented as a
dictionary. Unless it can not be represented as such, return an
OrderedDict, e.g.,
::
>>> from_key_val_list([('key', 'val')])
OrderedDict([('key', 'val')])
>>> from_key_val_list('string')
ValueError: cannot encode objects that are not 2-tuples
>>> from_key_val_list({'key': 'val'})
OrderedDict([('key', 'val')])
:rtype: OrderedDict
"""
if value is None:
return None
if isinstance(value, (str, bytes, bool, int)):
raise ValueError('cannot encode objects that are not 2-tuples')
return OrderedDict(value)
def to_key_val_list(value):
"""Take an object and test to see if it can be represented as a
dictionary. If it can be, return a list of tuples, e.g.,
::
>>> to_key_val_list([('key', 'val')])
[('key', 'val')]
>>> to_key_val_list({'key': 'val'})
[('key', 'val')]
>>> to_key_val_list('string')
ValueError: cannot encode objects that are not 2-tuples.
:rtype: list
"""
if value is None:
return None
if isinstance(value, (str, bytes, bool, int)):
raise ValueError('cannot encode objects that are not 2-tuples')
if isinstance(value, Mapping):
value = value.items()
return list(value)
# From mitsuhiko/werkzeug (used with permission).
def parse_list_header(value):
"""Parse lists as described by RFC 2068 Section 2.
In particular, parse comma-separated lists where the elements of
the list may include quoted-strings. A quoted-string could
contain a comma. A non-quoted string could have quotes in the
middle. Quotes are removed automatically after parsing.
It basically works like :func:`parse_set_header` just that items
may appear multiple times and case sensitivity is preserved.
The return value is a standard :class:`list`:
>>> parse_list_header('token, "quoted value"')
['token', 'quoted value']
To create a header from the :class:`list` again, use the
:func:`dump_header` function.
:param value: a string with a list header.
:return: :class:`list`
:rtype: list
"""
result = []
for item in _parse_list_header(value):
if item[:1] == item[-1:] == '"':
item = unquote_header_value(item[1:-1])
result.append(item)
return result
# From mitsuhiko/werkzeug (used with permission).
def parse_dict_header(value):
"""Parse lists of key, value pairs as described by RFC 2068 Section 2 and
Loading ...