Repository URL to install this package:
"""Temporary files.
This module provides generic, low- and high-level interfaces for
creating temporary files and directories. All of the interfaces
provided by this module can be used without fear of race conditions
except for 'mktemp'. 'mktemp' is subject to race conditions and
should not be used; it is provided for backward compatibility only.
The default path names are returned as str. If you supply bytes as
input, all return values will be in bytes. Ex:
>>> tempfile.mkstemp()
(4, '/tmp/tmptpu9nin8')
>>> tempfile.mkdtemp(suffix=b'')
b'/tmp/tmppbi8f0hy'
This module also provides some data items to the user:
TMP_MAX - maximum number of names that will be tried before
giving up.
tempdir - If this is set to a string before the first use of
any routine from this module, it will be considered as
another candidate location to store temporary files.
"""
# Names exported by ``from tempfile import *``, grouped by kind of interface.
__all__ = [
    # high level safe interfaces
    "NamedTemporaryFile",
    "TemporaryFile",
    "SpooledTemporaryFile",
    "TemporaryDirectory",
    # low level safe interfaces
    "mkstemp",
    "mkdtemp",
    # deprecated unsafe interface
    "mktemp",
    # constants
    "TMP_MAX",
    "gettempprefix",
    "tempdir",
    "gettempdir",
    "gettempprefixb",
    "gettempdirb",
]
# Imports.
import functools as _functools
import warnings as _warnings
import io as _io
import os as _os
try:
import shutil as _shutil
_rmtree = _shutil.rmtree
except ImportError:
import sys as _sys
import stat as _stat
# version vulnerable to race conditions
def _rmtree_unsafe(path, onerror):
    """Recursively delete the directory tree rooted at *path*.

    This is the path-based variant; it is vulnerable to symlink races
    between the lstat() of an entry and the operation performed on it.

    path    -- directory to remove; must not itself be a symbolic link.
    onerror -- callable invoked as onerror(func, path, exc_info) for every
               operation that fails with OSError.  If it returns, the
               traversal carries on with the next entry.
    """
    # Imported locally on purpose: the module-level name ``_stat`` is
    # rebound to os.lstat/os.stat further down this file, so it cannot be
    # trusted to still be the ``stat`` module by the time this runs.
    from stat import S_ISDIR as _S_ISDIR

    try:
        if _os.path.islink(path):
            # symlinks to directories are forbidden, see bug #1669
            raise OSError("Cannot call rmtree on a symbolic link")
    except OSError:
        onerror(_os.path.islink, path, _sys.exc_info())
        # can't continue even if onerror hook returns
        return

    names = []
    try:
        names = _os.listdir(path)
    except OSError:
        onerror(_os.listdir, path, _sys.exc_info())

    for name in names:
        fullname = _os.path.join(path, name)
        try:
            mode = _os.lstat(fullname).st_mode
        except OSError:
            # Treat an entry that cannot be stat'ed as a plain file; the
            # unlink below will then report the real problem.
            mode = 0
        if _S_ISDIR(mode):
            _rmtree_unsafe(fullname, onerror)
        else:
            try:
                _os.unlink(fullname)
            except OSError:
                onerror(_os.unlink, fullname, _sys.exc_info())

    try:
        _os.rmdir(path)
    except OSError:
        onerror(_os.rmdir, path, _sys.exc_info())
# Version using fd-based APIs to protect against races
def _rmtree_safe_fd(topfd, path, onerror):
    """Delete everything inside the directory open as *topfd*.

    Entries are addressed relative to *topfd* (``dir_fd=``) rather than by
    path.  The directory referred to by *topfd* itself is not removed and
    *topfd* is not closed; both are left to the caller.

    topfd   -- file descriptor of the directory being emptied.
    path    -- path of that directory, used only to build the names passed
               to *onerror*.
    onerror -- callable invoked as onerror(func, fullname, exc_info) for
               every operation that fails with OSError.
    """
    # Imported locally on purpose: the module-level name ``_stat`` is
    # rebound to os.lstat/os.stat further down this file, so it cannot be
    # trusted to still be the ``stat`` module by the time this runs.
    from stat import S_ISDIR as _S_ISDIR

    names = []
    try:
        names = _os.listdir(topfd)
    except OSError as err:
        # listdir() was given a descriptor; report the readable path.
        err.filename = path
        onerror(_os.listdir, path, _sys.exc_info())

    for name in names:
        fullname = _os.path.join(path, name)
        try:
            orig_st = _os.stat(name, dir_fd=topfd, follow_symlinks=False)
            mode = orig_st.st_mode
        except OSError:
            # Fall through to unlink(), which reports the real error.
            mode = 0

        if _S_ISDIR(mode):
            try:
                dirfd = _os.open(name, _os.O_RDONLY, dir_fd=topfd)
            except OSError:
                onerror(_os.open, fullname, _sys.exc_info())
            else:
                try:
                    # Only descend if what we opened is still the very
                    # directory we stat'ed a moment ago.
                    if _os.path.samestat(orig_st, _os.fstat(dirfd)):
                        _rmtree_safe_fd(dirfd, fullname, onerror)
                        try:
                            _os.rmdir(name, dir_fd=topfd)
                        except OSError:
                            onerror(_os.rmdir, fullname, _sys.exc_info())
                    else:
                        try:
                            # This can only happen if someone replaces
                            # a directory with a symlink after the call to
                            # stat.S_ISDIR above.
                            raise OSError("Cannot call rmtree on a symbolic "
                                          "link")
                        except OSError:
                            onerror(_os.path.islink, fullname,
                                    _sys.exc_info())
                finally:
                    _os.close(dirfd)
        else:
            try:
                _os.unlink(name, dir_fd=topfd)
            except OSError:
                onerror(_os.unlink, fullname, _sys.exc_info())
# True only when every primitive the fd-based removal relies on is
# available: dir_fd support for open/stat/unlink/rmdir, listdir() on a file
# descriptor, and stat() with follow_symlinks.
_use_fd_functions = all((
    {_os.open, _os.stat, _os.unlink, _os.rmdir}.issubset(_os.supports_dir_fd),
    _os.listdir in _os.supports_fd,
    _os.stat in _os.supports_follow_symlinks,
))
def _rmtree(path, ignore_errors=False, onerror=None):
    """Recursively delete a directory tree.

    If ignore_errors is set, errors are ignored; otherwise, if onerror
    is set, it is called to handle the error with arguments (func,
    path, exc_info) where func is platform and implementation dependent;
    path is the argument to that function that caused it to fail; and
    exc_info is a tuple returned by sys.exc_info().  If ignore_errors
    is false and onerror is None, an exception is raised.
    """
    if ignore_errors:
        def onerror(*args):
            pass
    elif onerror is None:
        def onerror(*args):
            # Always called from inside an ``except`` block, so a bare
            # raise re-raises the error currently being handled.
            raise

    if not _use_fd_functions:
        return _rmtree_unsafe(path, onerror)

    # While the unsafe rmtree works fine on bytes, the fd based does not.
    if isinstance(path, bytes):
        path = _os.fsdecode(path)
    # Note: To guard against symlink races, we use the standard
    # lstat()/open()/fstat() trick.
    try:
        orig_st = _os.lstat(path)
    except Exception:
        onerror(_os.lstat, path, _sys.exc_info())
        return
    try:
        fd = _os.open(path, _os.O_RDONLY)
    except Exception:
        # Report the call that actually failed (this used to claim lstat).
        onerror(_os.open, path, _sys.exc_info())
        return
    try:
        if _os.path.samestat(orig_st, _os.fstat(fd)):
            _rmtree_safe_fd(fd, path, onerror)
            try:
                _os.rmdir(path)
            except OSError:
                onerror(_os.rmdir, path, _sys.exc_info())
        else:
            try:
                # symlinks to directories are forbidden, see bug #1669
                raise OSError("Cannot call rmtree on a symbolic link")
            except OSError:
                onerror(_os.path.islink, path, _sys.exc_info())
    finally:
        _os.close(fd)
import errno as _errno
from random import Random as _Random
import weakref as _weakref
try:
import _thread
except ImportError:
import _dummy_thread as _thread
_allocate_lock = _thread.allocate_lock
# Flags for os.open(): read/write, create, and fail if the name already
# exists.  O_NOFOLLOW and O_BINARY are OR-ed in only where the platform
# defines them (a missing flag contributes 0).
_text_openflags = (_os.O_RDWR | _os.O_CREAT | _os.O_EXCL
                   | getattr(_os, 'O_NOFOLLOW', 0))
_bin_openflags = _text_openflags | getattr(_os, 'O_BINARY', 0)
# Upper bound on the number of candidate names tried before giving up;
# taken from the os module when it provides one, otherwise 10000.
TMP_MAX = getattr(_os, 'TMP_MAX', 10000)
# Default name prefix: _sanitize_params() substitutes it (passed through
# os.fsencode() for bytes callers) whenever it is given prefix=None.
#
# This variable _was_ unused for legacy reasons, see issue 10354.
# But as of 3.5 we actually use it at runtime so changing it would
# have a possibly desirable side effect... But we do not want to support
# that as an API. It is undocumented on purpose. Do not depend on this.
template = "tmp"
# Internal routines.
# Lock object allocated once, at import time.
# NOTE(review): presumably guards one-time lazy initialisation of module
# globals; its users are outside this part of the file -- confirm.
_once_lock = _allocate_lock()
# Pick the cheapest available "does this name exist?" probe.  lstat() is
# preferred over stat(), so a symlink counts as existing whatever it
# points to.
# NOTE: this binds the module-level name ``_stat`` to a function, which
# replaces the ``stat`` module imported under the same name by the
# shutil-less fallback near the top of the file.
_stat = getattr(_os, "lstat", None) or getattr(_os, "stat", None)
if _stat is None:
    # Fallback.  All we need is something that raises OSError if the
    # file doesn't exist.
    def _stat(fn):
        handle = _os.open(fn, _os.O_RDONLY)
        _os.close(handle)


def _exists(fn):
    """Return True if probing *fn* with _stat() succeeds, False on OSError."""
    try:
        _stat(fn)
    except OSError:
        return False
    return True
def _infer_return_type(*args):
    """Return the type (str or bytes) implied by the given path components.

    None arguments carry no information and are skipped.  A bytes argument
    implies bytes; anything else implies str.  Mixing the two raises
    TypeError.  With nothing to go on, str is returned.
    """
    inferred = None
    for component in args:
        if component is None:
            continue
        implied = bytes if isinstance(component, bytes) else str
        if inferred is not None and inferred is not implied:
            raise TypeError("Can't mix bytes and non-bytes in "
                            "path components.")
        inferred = implied
    # tempfile APIs return a str by default.
    return str if inferred is None else inferred
def _sanitize_params(prefix, suffix, dir):
    """Common parameter processing for most APIs in this module.

    Fills in a default for every argument that is None, choosing the str
    or bytes flavour according to the types of the arguments that were
    supplied, and returns (prefix, suffix, dir, output_type).
    """
    output_type = _infer_return_type(prefix, suffix, dir)
    wants_str = output_type is str

    if suffix is None:
        # Empty str or empty bytes, matching the other components.
        suffix = output_type()
    if prefix is None:
        prefix = template if wants_str else _os.fsencode(template)
    if dir is None:
        dir = gettempdir() if wants_str else gettempdirb()

    return prefix, suffix, dir, output_type
class _RandomNameSequence:
    """An instance of _RandomNameSequence generates an endless
    sequence of unpredictable strings which can safely be incorporated
    into file names.  Each string is eight characters long.  Multiple
    threads can safely use the same instance at the same time.

    _RandomNameSequence is an iterator."""

    # Alphabet the generated names are drawn from.
    characters = "abcdefghijklmnopqrstuvwxyz0123456789_"

    @property
    def rng(self):
        """Return the random generator belonging to the current process.

        A new generator is created the first time this is read and again
        whenever os.getpid() differs from the pid recorded at creation
        time, so a process with a different pid (e.g. a forked child) does
        not keep using the generator it inherited.
        """
        cur_pid = _os.getpid()
        if cur_pid != getattr(self, '_rng_pid', None):
            self._rng = _Random()
            self._rng_pid = cur_pid
        return self._rng

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next random name: eight characters from `characters`."""
        alphabet = self.characters
        choose = self.rng.choice
        return ''.join([choose(alphabet) for _ in range(8)])
def _candidate_tempdir_list():
    """Generate a list of candidate temporary directories which
    _get_default_tempdir will try."""
    # First, whatever the environment names (unset/empty values skipped).
    candidates = [value
                  for value in map(_os.getenv, ('TMPDIR', 'TEMP', 'TMP'))
                  if value]

    # Failing that, try OS-specific locations.
    if _os.name == 'nt':
        candidates += [r'c:\temp', r'c:\tmp', r'\temp', r'\tmp']
    else:
        candidates += ['/tmp', '/var/tmp', '/usr/tmp']

    # As a last resort, the current directory.
    try:
        last_resort = _os.getcwd()
    except (AttributeError, OSError):
        last_resort = _os.curdir
    candidates.append(last_resort)

    return candidates
def _get_default_tempdir():
"""Calculate the default directory to use for temporary files.
This routine should be called exactly once.
We determine whether or not a candidate temp dir is usable by
trying to create and write to a file in that directory. If this
is successful, the test file is deleted. To prevent denial of
service, the name of the test file must be randomized."""
namer = _RandomNameSequence()
dirlist = _candidate_tempdir_list()
for dir in dirlist:
if dir != _os.curdir:
dir = _os.path.abspath(dir)
# Try only a few names per directory.
for seq in range(100):
name = next(namer)
filename = _os.path.join(dir, name)
Loading ...