# Repository URL to install this package:
# tempfile.py unit tests.
import tempfile
import errno
import io
import os
import pathlib
import signal
import sys
import re
import warnings
import contextlib
import stat
import weakref
from unittest import mock
import unittest
from test import support
from test.support import script_helper
# Platform capability flags.
# has_textmode: True when tempfile uses different OS-level open flags for
# text and binary files.
has_textmode = tempfile._text_openflags != tempfile._bin_openflags
# has_spawnl: True when this platform provides os.spawnl().
has_spawnl = hasattr(os, 'spawnl')

# TEST_FILES may need to be tweaked for systems depending on the maximum
# number of files that can be opened at one time (see ulimit -n).
# OpenBSD gets a smaller value than every other platform.
TEST_FILES = 48 if sys.platform.startswith('openbsd') else 100
# The tests are organized as one test class for each chunk of code in
# tempfile.py, in the order those chunks appear in that file.  Tests that
# require threads are not included here.
class TestLowLevelInternals(unittest.TestCase):
    """Tests for the private helper tempfile._infer_return_type()."""

    def test_infer_return_type_singles(self):
        # One argument decides the result type; a lone None yields str.
        infer = tempfile._infer_return_type
        self.assertIs(str, infer(''))
        self.assertIs(bytes, infer(b''))
        self.assertIs(str, infer(None))

    def test_infer_return_type_multiples(self):
        # Arguments of the same type agree on that type; mixing str and
        # bytes raises TypeError in either order.
        infer = tempfile._infer_return_type
        self.assertIs(str, infer('', ''))
        self.assertIs(bytes, infer(b'', b''))
        for mixed in (('', b''), (b'', '')):
            with self.assertRaises(TypeError):
                infer(*mixed)

    def test_infer_return_type_multiples_and_none(self):
        # None arguments do not influence the inferred type, and they do
        # not hide a str/bytes conflict among the remaining arguments.
        infer = tempfile._infer_return_type
        agreeing = (
            (str, (None, '')),
            (str, ('', None)),
            (str, (None, None)),
            (bytes, (b'', None)),
            (bytes, (None, b'')),
        )
        for expected, args in agreeing:
            self.assertIs(expected, infer(*args))
        for mixed in (('', None, b''), (b'', None, '')):
            with self.assertRaises(TypeError):
                infer(*mixed)

    def test_infer_return_type_pathlib(self):
        # A pathlib.Path argument is treated as str.
        root = pathlib.Path('/')
        self.assertIs(str, tempfile._infer_return_type(root))
# Common functionality.
class BaseTestCase(unittest.TestCase):
    """Shared warnings fixture and file-name checker for the tempfile tests."""

    # Patterns for the random middle part of a generated name: exactly
    # eight characters from lowercase letters, digits, '_' and '-'.
    str_check = re.compile(r"^[a-z0-9_-]{8}$")
    b_check = re.compile(br"^[a-z0-9_-]{8}$")

    def setUp(self):
        # The context manager returned by support.check_warnings() is
        # entered by hand here and exited again in tearDown().
        manager = support.check_warnings()
        self._warnings_manager = manager
        manager.__enter__()
        # Silence RuntimeWarnings about "mktemp" attributed to this module.
        warnings.filterwarnings("ignore", category=RuntimeWarning,
                                message="mktemp", module=__name__)

    def tearDown(self):
        # Leave the warnings context that setUp() entered.
        self._warnings_manager.__exit__(None, None, None)

    def nameCheck(self, name, dir, pre, suf):
        """Assert that *name* has the form dir/pre<random>suf.

        name -- the generated path (str or bytes).
        dir  -- the directory the file is expected to live in.
        pre  -- the expected prefix of the base name.
        suf  -- the expected suffix of the base name.

        The type of *name* must match the type implied by each of *dir*,
        *pre* and *suf* that is not None (a str or os.PathLike *dir*
        implies str, anything else bytes).  The part of the base name
        between prefix and suffix must match str_check / b_check.
        Failures are reported through the usual assert* methods.
        """
        parent, base = os.path.split(name)
        head = base[:len(pre)]
        tail = base[len(base) - len(suf):]
        kind = type(name)

        if dir is not None:
            dir_is_text = type(dir) is str or isinstance(dir, os.PathLike)
            self.assertIs(kind, str if dir_is_text else bytes,
                          "unexpected return type")
        for part in (pre, suf):
            if part is not None:
                self.assertIs(kind, str if type(part) is str else bytes,
                              "unexpected return type")
        if (dir, pre, suf) == (None, None, None):
            self.assertIs(kind, str, "default return type must be str")

        # check for equality of the absolute paths!
        self.assertEqual(os.path.abspath(parent), os.path.abspath(dir),
                         "file %r not in directory %r" % (name, dir))
        self.assertEqual(head, pre,
                         "file %r does not begin with %r" % (base, pre))
        self.assertEqual(tail, suf,
                         "file %r does not end with %r" % (base, suf))

        # What remains after stripping prefix and suffix is the random part.
        middle = base[len(pre):len(base) - len(suf)]
        if isinstance(middle, str):
            pattern = self.str_check
        else:
            pattern = self.b_check
        self.assertTrue(pattern.match(middle),
                        "random characters %r do not match %r"
                        % (middle, pattern.pattern))
class TestExports(BaseTestCase):
    """Check which public names the tempfile module exposes."""

    def test_exports(self):
        # There are no surprising symbols in the tempfile module
        expected = {
            "NamedTemporaryFile",
            "TemporaryFile",
            "mkstemp",
            "mkdtemp",
            "mktemp",
            "TMP_MAX",
            "gettempprefix",
            "gettempprefixb",
            "gettempdir",
            "gettempdirb",
            "tempdir",
            "template",
            "SpooledTemporaryFile",
            "TemporaryDirectory",
        }
        # Every module attribute that does not start with an underscore
        # must be one of the expected names.
        unexp = [key for key in tempfile.__dict__
                 if key[0] != '_' and key not in expected]
        self.assertTrue(not unexp,
                        "unexpected keys: %s" % unexp)
class TestRandomNameSequence(BaseTestCase):
    """Test the internal iterator object _RandomNameSequence."""

    def setUp(self):
        # Each test gets its own fresh name sequence.
        self.r = tempfile._RandomNameSequence()
        super().setUp()

    def test_get_six_char_str(self):
        # _RandomNameSequence returns a string of random characters.
        # Despite the method name, nameCheck() requires the random part
        # to be exactly eight characters long.
        s = next(self.r)
        self.nameCheck(s, '', '', '')

    def test_many(self):
        # _RandomNameSequence returns no duplicate strings (stochastic)
        seen = set()
        r = self.r
        for _ in range(TEST_FILES):
            s = next(r)
            self.nameCheck(s, '', '', '')
            self.assertNotIn(s, seen)
            seen.add(s)

    def test_supports_iter(self):
        # _RandomNameSequence supports the iterator protocol.
        # The method needs the "test" prefix to be collected by unittest,
        # and it asserts that the for-loop really produced 20 names.
        count = 0
        for s in self.r:
            self.assertIsInstance(s, str)
            count += 1
            if count == 20:
                break
        self.assertEqual(count, 20)

    @unittest.skipUnless(hasattr(os, 'fork'),
                         "os.fork is required for this test")
    def test_process_awareness(self):
        # ensure that the random source differs between
        # child and parent.
        read_fd, write_fd = os.pipe()
        pid = None
        try:
            pid = os.fork()
            if not pid:
                # child process: send one generated name to the parent
                os.close(read_fd)
                os.write(write_fd, next(self.r).encode("ascii"))
                os.close(write_fd)
                # bypass the normal exit handlers- leave those to
                # the parent.
                os._exit(0)

            # parent process: generate a name and read the child's
            parent_value = next(self.r)
            child_value = os.read(read_fd, len(parent_value)).decode("ascii")
        finally:
            if pid:
                # best effort to ensure the process can't bleed out
                # via any bugs above
                try:
                    os.kill(pid, signal.SIGKILL)
                except OSError:
                    pass

                # Read the process exit status to avoid zombie process
                os.waitpid(pid, 0)

            os.close(read_fd)
            os.close(write_fd)
        self.assertNotEqual(child_value, parent_value)
class TestCandidateTempdirList(BaseTestCase):
    """Test the internal function _candidate_tempdir_list."""

    def test_nonempty_list(self):
        # _candidate_tempdir_list returns a nonempty list of strings
        candidates = tempfile._candidate_tempdir_list()

        self.assertFalse(len(candidates) == 0)
        for entry in candidates:
            self.assertIsInstance(entry, str)

    def test_wanted_dirs(self):
        # _candidate_tempdir_list contains the expected directories
        env_names = ('TMPDIR', 'TEMP', 'TMP')

        with support.EnvironmentVarGuard() as env:
            # Make sure the interesting environment variables are all set;
            # unset or empty ones get a path derived from their own name.
            for envname in env_names:
                if not os.getenv(envname):
                    env[envname] = os.path.abspath(envname)

            candidates = tempfile._candidate_tempdir_list()

            # Every one of those variables must show up in the list.
            for envname in env_names:
                dirname = os.getenv(envname)
                if not dirname:
                    raise ValueError
                self.assertIn(dirname, candidates)

            # The current directory (or os.curdir when it cannot be
            # determined) must be a candidate as well.
            try:
                here = os.getcwd()
            except (AttributeError, OSError):
                here = os.curdir
            self.assertIn(here, candidates)

            # Not practical to try to verify the presence of OS-specific
            # paths in this list.
# We test _get_default_tempdir some more by testing gettempdir.
class TestGetDefaultTempdir(BaseTestCase):
    """Test _get_default_tempdir()."""

    def test_no_files_left_behind(self):
        # Stand-in for io.open() (and for a file's write()) that always
        # fails.
        def raise_OSError(*args, **kwargs):
            raise OSError()

        # Stand-in for io.open() that opens the file for real but hands
        # back an object whose write() fails.  orig_open is bound by the
        # last swap_attr() block below, before this function is called.
        def bad_writer(*args, **kwargs):
            fp = orig_open(*args, **kwargs)
            fp.write = raise_OSError
            return fp

        # use a private empty directory
        with tempfile.TemporaryDirectory() as our_temp_directory:
            # force _get_default_tempdir() to consider our empty directory
            def our_candidate_list():
                return [our_temp_directory]

            def assert_directory_empty():
                self.assertEqual(os.listdir(our_temp_directory), [])

            with support.swap_attr(tempfile, "_candidate_tempdir_list",
                                   our_candidate_list):
                # verify our directory is empty after _get_default_tempdir()
                tempfile._get_default_tempdir()
                assert_directory_empty()

                # test again with failing io.open()
                with support.swap_attr(io, "open", raise_OSError):
                    with self.assertRaises(FileNotFoundError):
                        tempfile._get_default_tempdir()
                    assert_directory_empty()

                # test again with failing write()
                with support.swap_attr(io, "open", bad_writer) as orig_open:
                    with self.assertRaises(FileNotFoundError):
                        tempfile._get_default_tempdir()
                    assert_directory_empty()
class TestGetCandidateNames(BaseTestCase):
    """Test the internal function _get_candidate_names."""

    def test_retval(self):
        # _get_candidate_names returns a _RandomNameSequence object
        names = tempfile._get_candidate_names()
        self.assertIsInstance(names, tempfile._RandomNameSequence)

    def test_same_thing(self):
        # _get_candidate_names always returns the same object
        first = tempfile._get_candidate_names()
        second = tempfile._get_candidate_names()
        self.assertTrue(first is second)
@contextlib.contextmanager
def _inside_empty_temp_dir():
    """Context manager: point tempfile.tempdir at a fresh directory.

    A new directory is created with tempfile.mkdtemp() and installed as
    tempfile.tempdir for the duration of the with-block.  On exit the
    attribute swap is undone and the directory is removed with
    support.rmtree(), whether or not the block raised.
    """
    scratch_dir = tempfile.mkdtemp()
    try:
        with support.swap_attr(tempfile, 'tempdir', scratch_dir):
            yield
    finally:
        support.rmtree(scratch_dir)
def _mock_candidate_names(*names):
    """Return a context manager that replaces tempfile._get_candidate_names.

    While the returned support.swap_attr() context is active, every call
    to tempfile._get_candidate_names() returns a new iterator over the
    given *names*, in order.
    """
    def fixed_candidate_names():
        return iter(names)

    return support.swap_attr(tempfile, '_get_candidate_names',
                             fixed_candidate_names)
class TestBadTempdir:
def test_read_only_directory(self):
    # self.make_temp() must raise PermissionError in a temp directory
    # that has no write permission, and must leave the directory empty.
    write_bits = stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
    with _inside_empty_temp_dir():
        saved_mode = os.stat(tempfile.tempdir).st_mode
        # Clear the write bits for user, group and others.
        os.chmod(tempfile.tempdir, saved_mode & ~write_bits)
        try:
            # If the directory is still writable, the check below
            # would be meaningless, so skip.
            if os.access(tempfile.tempdir, os.W_OK):
                self.skipTest("can't set the directory read-only")
            with self.assertRaises(PermissionError):
                self.make_temp()
            self.assertEqual(os.listdir(tempfile.tempdir), [])
        finally:
            # Restore the original mode on every exit path.
            os.chmod(tempfile.tempdir, saved_mode)
# Loading ...