Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

agriconnect / libpython3.6-minimal   deb

Repository URL to install this package:

Version: 3.6.5-1~deb9u1 

/ usr / lib / python3.6 / tempfile.py

"""Temporary files.

This module provides generic, low- and high-level interfaces for
creating temporary files and directories.  All of the interfaces
provided by this module can be used without fear of race conditions
except for 'mktemp'.  'mktemp' is subject to race conditions and
should not be used; it is provided for backward compatibility only.

The default path names are returned as str.  If you supply bytes as
input, all return values will be in bytes.  Ex:

    >>> tempfile.mkstemp()
    (4, '/tmp/tmptpu9nin8')
    >>> tempfile.mkdtemp(suffix=b'')
    b'/tmp/tmppbi8f0hy'

This module also provides some data items to the user:

  TMP_MAX  - maximum number of names that will be tried before
             giving up.
  tempdir  - If this is set to a string before the first use of
             any routine from this module, it will be considered as
             another candidate location to store temporary files.
"""

__all__ = [
    "NamedTemporaryFile", "TemporaryFile", # high level safe interfaces
    "SpooledTemporaryFile", "TemporaryDirectory",
    "mkstemp", "mkdtemp",                  # low level safe interfaces
    "mktemp",                              # deprecated unsafe interface
    "TMP_MAX", "gettempprefix",            # constants
    "tempdir", "gettempdir",
    "gettempprefixb", "gettempdirb",
   ]


# Imports.

import functools as _functools
import warnings as _warnings
import io as _io
import os as _os
try:
  import shutil as _shutil
  _rmtree = _shutil.rmtree
except ImportError:
  import sys as _sys
  import stat as _stat
  # version vulnerable to race conditions
  def _rmtree_unsafe(path, onerror):
    try:
        if _os.path.islink(path):
            # symlinks to directories are forbidden, see bug #1669
            raise OSError("Cannot call rmtree on a symbolic link")
    except OSError:
        onerror(_os.path.islink, path, _sys.exc_info())
        # can't continue even if onerror hook returns
        return
    names = []
    try:
        names = _os.listdir(path)
    except OSError:
        onerror(_os.listdir, path, _sys.exc_info())
    for name in names:
        fullname = _os.path.join(path, name)
        try:
            mode = _os.lstat(fullname).st_mode
        except OSError:
            mode = 0
        if _stat.S_ISDIR(mode):
            _rmtree_unsafe(fullname, onerror)
        else:
            try:
                _os.unlink(fullname)
            except OSError:
                onerror(_os.unlink, fullname, _sys.exc_info())
    try:
        _os.rmdir(path)
    except OSError:
        onerror(_os.rmdir, path, _sys.exc_info())

  # Version using fd-based APIs to protect against races
  def _rmtree_safe_fd(topfd, path, onerror):
    names = []
    try:
        names = _os.listdir(topfd)
    except OSError as err:
        err.filename = path
        onerror(_os.listdir, path, _sys.exc_info())
    for name in names:
        fullname = _os.path.join(path, name)
        try:
            orig_st = _os.stat(name, dir_fd=topfd, follow_symlinks=False)
            mode = orig_st.st_mode
        except OSError:
            mode = 0
        if _stat.S_ISDIR(mode):
            try:
                dirfd = _os.open(name, _os.O_RDONLY, dir_fd=topfd)
            except OSError:
                onerror(_os.open, fullname, _sys.exc_info())
            else:
                try:
                    if _os.path.samestat(orig_st, _os.fstat(dirfd)):
                        _rmtree_safe_fd(dirfd, fullname, onerror)
                        try:
                            _os.rmdir(name, dir_fd=topfd)
                        except OSError:
                            onerror(_os.rmdir, fullname, _sys.exc_info())
                    else:
                        try:
                            # This can only happen if someone replaces
                            # a directory with a symlink after the call to
                            # stat.S_ISDIR above.
                            raise OSError("Cannot call rmtree on a symbolic "
                                          "link")
                        except OSError:
                            onerror(_os.path.islink, fullname, _sys.exc_info())
                finally:
                    _os.close(dirfd)
        else:
            try:
                _os.unlink(name, dir_fd=topfd)
            except OSError:
                onerror(_os.unlink, fullname, _sys.exc_info())

  _use_fd_functions = ({_os.open, _os.stat, _os.unlink, _os.rmdir} <=
                     _os.supports_dir_fd and
                     _os.listdir in _os.supports_fd and
                     _os.stat in _os.supports_follow_symlinks)

  def _rmtree(path, ignore_errors=False, onerror=None):
    """Recursively delete a directory tree.

    If ignore_errors is set, errors are ignored; otherwise, if onerror
    is set, it is called to handle the error with arguments (func,
    path, exc_info) where func is platform and implementation dependent;
    path is the argument to that function that caused it to fail; and
    exc_info is a tuple returned by sys.exc_info().  If ignore_errors
    is false and onerror is None, an exception is raised.

    """
    if ignore_errors:
        def onerror(*args):
            pass
    elif onerror is None:
        def onerror(*args):
            raise
    if _use_fd_functions:
        # While the unsafe rmtree works fine on bytes, the fd based does not.
        if isinstance(path, bytes):
            path = _os.fsdecode(path)
        # Note: To guard against symlink races, we use the standard
        # lstat()/open()/fstat() trick.
        try:
            orig_st = _os.lstat(path)
        except Exception:
            onerror(_os.lstat, path, _sys.exc_info())
            return
        try:
            fd = _os.open(path, _os.O_RDONLY)
        except Exception:
            onerror(_os.lstat, path, _sys.exc_info())
            return
        try:
            if _os.path.samestat(orig_st, _os.fstat(fd)):
                _rmtree_safe_fd(fd, path, onerror)
                try:
                    _os.rmdir(path)
                except OSError:
                    onerror(_os.rmdir, path, _sys.exc_info())
            else:
                try:
                    # symlinks to directories are forbidden, see bug #1669
                    raise OSError("Cannot call rmtree on a symbolic link")
                except OSError:
                    onerror(_os.path.islink, path, _sys.exc_info())
        finally:
            _os.close(fd)
    else:
        return _rmtree_unsafe(path, onerror)

import errno as _errno
from random import Random as _Random
import weakref as _weakref

try:
    import _thread
except ImportError:
    import _dummy_thread as _thread
_allocate_lock = _thread.allocate_lock

_text_openflags = _os.O_RDWR | _os.O_CREAT | _os.O_EXCL
if hasattr(_os, 'O_NOFOLLOW'):
    _text_openflags |= _os.O_NOFOLLOW

_bin_openflags = _text_openflags
if hasattr(_os, 'O_BINARY'):
    _bin_openflags |= _os.O_BINARY

if hasattr(_os, 'TMP_MAX'):
    TMP_MAX = _os.TMP_MAX
else:
    TMP_MAX = 10000

# This variable _was_ unused for legacy reasons, see issue 10354.
# But as of 3.5 we actually use it at runtime so changing it would
# have a possibly desirable side effect...  But we do not want to support
# that as an API.  It is undocumented on purpose.  Do not depend on this.
template = "tmp"

# Internal routines.

_once_lock = _allocate_lock()

if hasattr(_os, "lstat"):
    _stat = _os.lstat
elif hasattr(_os, "stat"):
    _stat = _os.stat
else:
    # Fallback.  All we need is something that raises OSError if the
    # file doesn't exist.
    def _stat(fn):
        fd = _os.open(fn, _os.O_RDONLY)
        _os.close(fd)

def _exists(fn):
    try:
        _stat(fn)
    except OSError:
        return False
    else:
        return True


def _infer_return_type(*args):
    """Look at the type of all args and divine their implied return type."""
    return_type = None
    for arg in args:
        if arg is None:
            continue
        if isinstance(arg, bytes):
            if return_type is str:
                raise TypeError("Can't mix bytes and non-bytes in "
                                "path components.")
            return_type = bytes
        else:
            if return_type is bytes:
                raise TypeError("Can't mix bytes and non-bytes in "
                                "path components.")
            return_type = str
    if return_type is None:
        return str  # tempfile APIs return a str by default.
    return return_type


def _sanitize_params(prefix, suffix, dir):
    """Common parameter processing for most APIs in this module."""
    output_type = _infer_return_type(prefix, suffix, dir)
    if suffix is None:
        suffix = output_type()
    if prefix is None:
        if output_type is str:
            prefix = template
        else:
            prefix = _os.fsencode(template)
    if dir is None:
        if output_type is str:
            dir = gettempdir()
        else:
            dir = gettempdirb()
    return prefix, suffix, dir, output_type


class _RandomNameSequence:
    """An instance of _RandomNameSequence generates an endless
    sequence of unpredictable strings which can safely be incorporated
    into file names.  Each string is six characters long.  Multiple
    threads can safely use the same instance at the same time.

    _RandomNameSequence is an iterator."""

    characters = "abcdefghijklmnopqrstuvwxyz0123456789_"

    @property
    def rng(self):
        cur_pid = _os.getpid()
        if cur_pid != getattr(self, '_rng_pid', None):
            self._rng = _Random()
            self._rng_pid = cur_pid
        return self._rng

    def __iter__(self):
        return self

    def __next__(self):
        c = self.characters
        choose = self.rng.choice
        letters = [choose(c) for dummy in range(8)]
        return ''.join(letters)

def _candidate_tempdir_list():
    """Generate a list of candidate temporary directories which
    _get_default_tempdir will try."""

    dirlist = []

    # First, try the environment.
    for envname in 'TMPDIR', 'TEMP', 'TMP':
        dirname = _os.getenv(envname)
        if dirname: dirlist.append(dirname)

    # Failing that, try OS-specific locations.
    if _os.name == 'nt':
        dirlist.extend([ r'c:\temp', r'c:\tmp', r'\temp', r'\tmp' ])
    else:
        dirlist.extend([ '/tmp', '/var/tmp', '/usr/tmp' ])

    # As a last resort, the current directory.
    try:
        dirlist.append(_os.getcwd())
    except (AttributeError, OSError):
        dirlist.append(_os.curdir)

    return dirlist

def _get_default_tempdir():
    """Calculate the default directory to use for temporary files.
    This routine should be called exactly once.

    We determine whether or not a candidate temp dir is usable by
    trying to create and write to a file in that directory.  If this
    is successful, the test file is deleted.  To prevent denial of
    service, the name of the test file must be randomized."""

    namer = _RandomNameSequence()
    dirlist = _candidate_tempdir_list()

    for dir in dirlist:
        if dir != _os.curdir:
            dir = _os.path.abspath(dir)
        # Try only a few names per directory.
        for seq in range(100):
            name = next(namer)
            filename = _os.path.join(dir, name)
Loading ...