# Repository URL to install this package:
# Copyright (C) 2003 Python Software Foundation
import unittest
import unittest.mock
import shutil
import tempfile
import sys
import stat
import os
import os.path
import errno
import functools
import pathlib
import subprocess
import random
import string
import contextlib
import io
from shutil import (make_archive,
register_archive_format, unregister_archive_format,
get_archive_formats, Error, unpack_archive,
register_unpack_format, RegistryError,
unregister_unpack_format, get_unpack_formats,
SameFileError, _GiveupOnFastCopy)
import tarfile
import zipfile
try:
import posix
except ImportError:
posix = None
from test import support
from test.support import TESTFN, FakePath
# Name of a second scratch path, derived from the shared TESTFN name.
TESTFN2 = TESTFN + "2"
# Platform flags computed once from sys.platform.
MACOS = sys.platform.startswith("darwin")
AIX = sys.platform[:3] == 'aix'
try:
import grp
import pwd
UID_GID_SUPPORT = True
except ImportError:
UID_GID_SUPPORT = False
try:
import _winapi
except ImportError:
_winapi = None
def _fake_rename(*args, **kwargs):
    """Stand-in for os.rename that always fails like a cross-device move.

    Every argument is accepted and ignored.  The call raises OSError with
    errno EXDEV (or 18 when the errno module does not define EXDEV), as if
    the destination path were on a different filesystem.
    """
    cross_device = getattr(errno, 'EXDEV', 18)
    raise OSError(cross_device, "Invalid cross-device link")
def mock_rename(func):
    """Decorator that runs *func* with os.rename replaced by _fake_rename.

    While the wrapped function executes, every os.rename call raises the
    cross-device OSError produced by _fake_rename.  The real os.rename is
    put back when *func* returns or raises.
    """
    @functools.wraps(func)
    def patched(*args, **kwargs):
        saved_rename = os.rename
        os.rename = _fake_rename
        try:
            return func(*args, **kwargs)
        finally:
            # Restore unconditionally so a failing test cannot leak the patch.
            os.rename = saved_rename
    return patched
def write_file(path, content, binary=False):
    """Create or overwrite the file at *path* with *content*.

    *path* is either a string or a tuple of path components; a tuple is
    combined with os.path.join before use.  The file is opened in binary
    mode when *binary* is true and in text mode otherwise.
    """
    target = os.path.join(*path) if isinstance(path, tuple) else path
    mode = 'wb' if binary else 'w'
    with open(target, mode) as stream:
        stream.write(content)
def write_test_file(path, size):
    """Create a test file of exactly *size* bytes of random ASCII letters.

    A single random buffer of at most 8192 bytes is generated once and
    written repeatedly.  The last write is cut down to the remaining byte
    count, so the file is exactly *size* bytes long even when *size* is not
    a multiple of the buffer size.

    Raises AssertionError if the resulting file size differs from *size*.
    """
    def chunks(total, step):
        # Yield the size of each write: *step* repeatedly, then the remainder.
        assert total >= step
        while total > step:
            yield step
            total -= step
        if total:
            yield total

    bufsize = min(size, 8192)
    chunk = "".join(random.choice(string.ascii_letters)
                    for _ in range(bufsize)).encode()
    with open(path, 'wb') as f:
        for csize in chunks(size, bufsize):
            # Honour the requested chunk size: writing the whole buffer for
            # a short final chunk would make the file larger than *size*.
            f.write(chunk[:csize])
    assert os.path.getsize(path) == size
def read_file(path, binary=False):
    """Read and return the whole file at *path*.

    *path* is either a string or a tuple of path components; a tuple is
    combined with os.path.join before use.  The result is bytes when
    *binary* is true and str otherwise.
    """
    source = os.path.join(*path) if isinstance(path, tuple) else path
    mode = 'rb' if binary else 'r'
    with open(source, mode) as stream:
        data = stream.read()
    return data
def rlistdir(path):
    """Return a sorted, recursive listing of *path* as relative names.

    Real directories appear with a trailing '/' and are followed by their
    own entries, each prefixed with 'dirname/'.  Symlinks, including links
    to directories, are listed by name only and are not descended into.
    """
    listing = []
    for entry in sorted(os.listdir(path)):
        full = os.path.join(path, entry)
        if not os.path.isdir(full) or os.path.islink(full):
            listing.append(entry)
            continue
        prefix = entry + '/'
        listing.append(prefix)
        listing.extend(prefix + sub for sub in rlistdir(full))
    return listing
def supports_file2file_sendfile():
    """Probe whether os.sendfile() can copy between two regular files.

    Returns False when os.sendfile does not exist or when a 2-byte trial
    copy between two temporary files raises OSError, and True otherwise.
    Any temporary file that was created is handed to support.unlink before
    the function returns.
    """
    # ...apparently Linux and Solaris are the only ones
    if not hasattr(os, "sendfile"):
        return False
    src_path = dst_path = None
    try:
        with tempfile.NamedTemporaryFile("wb", delete=False) as seed:
            src_path = seed.name
            seed.write(b"0123456789")
        with open(src_path, "rb") as src, \
                tempfile.NamedTemporaryFile("wb", delete=False) as dst:
            dst_path = dst.name
            in_fd = src.fileno()
            out_fd = dst.fileno()
            try:
                os.sendfile(out_fd, in_fd, 0, 2)
            except OSError:
                return False
            return True
    finally:
        # Runs on every exit path, including the early returns above.
        for leftover in (src_path, dst_path):
            if leftover is not None:
                support.unlink(leftover)
# Run the sendfile probe once, when this module is loaded, and keep the result.
SUPPORTS_SENDFILE = supports_file2file_sendfile()
# In 32-bit mode AIX, by default, lacks enough memory for the xz/lzma compiler test.
# The AIX command 'dump -o program' prints XCOFF header information;
# the second word of the last line is the maxdata value.
# In 32-bit mode, maxdata must be at least 0x20000000 for the xz test to succeed.
def _maxdataOK():
    """Return whether the interpreter's maxdata value is large enough.

    Always True unless AIX is set and sys.maxsize == 2147483647.  In that
    case the output of ``/usr/bin/dump -o`` for sys.executable is parsed:
    the second word of its last line is read as a hexadecimal number and
    the result is whether that number is at least 0x20000000.
    """
    if not (AIX and sys.maxsize == 2147483647):
        return True
    dump_output = subprocess.getoutput("/usr/bin/dump -o %s" % sys.executable)
    last_line = dump_output.split("\n")[-1]
    maxdata_hex = last_line.split()[1]
    return int(maxdata_hex, 16) >= 0x20000000
class TestShutil(unittest.TestCase):
def setUp(self):
    """Start each test with an empty list of temporary directories."""
    super().setUp()
    # Paths are appended by mkdtemp() and removed again in tearDown().
    self.tempdirs = []
def tearDown(self):
    """Remove every directory recorded in self.tempdirs, newest first."""
    super().tearDown()
    # Deletion errors are ignored only when os.name is 'nt' or 'cygwin'.
    # NOTE(review): the os docs list 'posix', 'nt' and 'java' as os.name
    # values, so 'cygwin' may never match here -- confirm the intent.
    ignore_errors = os.name in ('nt', 'cygwin')
    while self.tempdirs:
        shutil.rmtree(self.tempdirs.pop(), ignore_errors=ignore_errors)
def mkdtemp(self):
    """Create a temporary directory and record it in self.tempdirs.

    On win32 the directory is created under the real path of the current
    working directory; elsewhere tempfile's default location is used.

    Returns the path of the new directory.
    """
    parent = os.path.realpath(os.getcwd()) if sys.platform == "win32" else None
    new_dir = tempfile.mkdtemp(dir=parent)
    self.tempdirs.append(new_dir)
    return new_dir
def test_rmtree_works_on_bytes(self):
    """rmtree() accepts a bytes path to a non-empty directory."""
    base = self.mkdtemp()
    target = os.path.join(base, 'killme')
    os.mkdir(target)
    write_file((target, 'somefile'), 'foo')
    target_bytes = os.fsencode(target)
    self.assertIsInstance(target_bytes, bytes)
    shutil.rmtree(target_bytes)
@support.skip_unless_symlink
def test_rmtree_fails_on_symlink(self):
    """rmtree() on a symlink to a directory raises, or reports via onerror."""
    base = self.mkdtemp()
    real_dir = os.path.join(base, 'dir')
    os.mkdir(real_dir)
    symlink = os.path.join(base, 'link')
    os.symlink(real_dir, symlink)

    with self.assertRaises(OSError):
        shutil.rmtree(symlink)
    # Both the target directory and the link itself must still be there.
    self.assertTrue(os.path.exists(real_dir))
    self.assertTrue(os.path.lexists(symlink))

    # With an onerror callback the failure is reported instead of raised.
    reports = []
    shutil.rmtree(symlink, onerror=lambda *args: reports.append(args))
    self.assertEqual(len(reports), 1)
    report = reports[0]
    self.assertIs(report[0], os.path.islink)
    self.assertEqual(report[1], symlink)
    self.assertIsInstance(report[2][1], OSError)
@support.skip_unless_symlink
def test_rmtree_works_on_symlinks(self):
    """rmtree() deletes symlinks inside a tree without touching their targets."""
    base = self.mkdtemp()
    tree = os.path.join(base, 'dir1')
    inner_dir = os.path.join(tree, 'dir2')
    outside_dir = os.path.join(base, 'dir3')
    for directory in (tree, inner_dir, outside_dir):
        os.mkdir(directory)
    outside_file = os.path.join(base, 'file1')
    write_file(outside_file, 'foo')
    # One link to a directory inside the tree, one to a directory outside
    # it, and one to a file outside it.
    link_targets = (
        ('link1', inner_dir),
        ('link2', outside_dir),
        ('link3', outside_file),
    )
    for link_name, target in link_targets:
        os.symlink(target, os.path.join(tree, link_name))
    # make sure symlinks are removed but not followed
    shutil.rmtree(tree)
    self.assertFalse(os.path.exists(tree))
    self.assertTrue(os.path.exists(outside_dir))
    self.assertTrue(os.path.exists(outside_file))
@unittest.skipUnless(_winapi, 'only relevant on Windows')
def test_rmtree_fails_on_junctions(self):
    """rmtree() on a junction to a directory raises, or reports via onerror."""
    base = self.mkdtemp()
    real_dir = os.path.join(base, 'dir')
    os.mkdir(real_dir)
    junction = os.path.join(base, 'link')
    _winapi.CreateJunction(real_dir, junction)

    with self.assertRaises(OSError):
        shutil.rmtree(junction)
    # Both the target directory and the junction itself must still be there.
    self.assertTrue(os.path.exists(real_dir))
    self.assertTrue(os.path.lexists(junction))

    # With an onerror callback the failure is reported instead of raised.
    reports = []
    shutil.rmtree(junction, onerror=lambda *args: reports.append(args))
    self.assertEqual(len(reports), 1)
    report = reports[0]
    self.assertIs(report[0], os.path.islink)
    self.assertEqual(report[1], junction)
    self.assertIsInstance(report[2][1], OSError)
@unittest.skipUnless(_winapi, 'only relevant on Windows')
def test_rmtree_works_on_junctions(self):
    """rmtree() deletes junctions inside a tree without touching their targets."""
    base = self.mkdtemp()
    tree = os.path.join(base, 'dir1')
    inner_dir = os.path.join(tree, 'dir2')
    outside_dir = os.path.join(base, 'dir3')
    for directory in (tree, inner_dir, outside_dir):
        os.mkdir(directory)
    outside_file = os.path.join(base, 'file1')
    write_file(outside_file, 'foo')
    # One junction to a directory inside the tree, one to a directory
    # outside it, and one whose target is a file outside it.
    junction_targets = (
        ('link1', inner_dir),
        ('link2', outside_dir),
        ('link3', outside_file),
    )
    for link_name, target in junction_targets:
        _winapi.CreateJunction(target, os.path.join(tree, link_name))
    # make sure junctions are removed but not followed
    shutil.rmtree(tree)
    self.assertFalse(os.path.exists(tree))
    self.assertTrue(os.path.exists(outside_dir))
    self.assertTrue(os.path.exists(outside_file))
def test_rmtree_errors(self):
    """Check rmtree() error reporting for a missing path and a regular file."""
    # filename is guaranteed not to exist
    missing = tempfile.mktemp()
    with self.assertRaises(FileNotFoundError):
        shutil.rmtree(missing)
    # test that ignore_errors option is honored
    shutil.rmtree(missing, ignore_errors=True)

    # existing file
    tmpdir = self.mkdtemp()
    filename = os.path.join(tmpdir, "tstfile")
    write_file(filename, "")
    with self.assertRaises(NotADirectoryError) as cm:
        shutil.rmtree(filename)
    # The reason for this rather odd construct is that Windows sprinkles
    # a \*.* at the end of file names. But only sometimes on some buildbots
    possible_args = [filename, os.path.join(filename, '*.*')]
    self.assertIn(cm.exception.filename, possible_args)
    self.assertTrue(os.path.exists(filename))
    # test that ignore_errors option is honored
    shutil.rmtree(filename, ignore_errors=True)
    self.assertTrue(os.path.exists(filename))

    # With onerror, two failures are reported in order: first from
    # os.scandir, then from os.rmdir.
    errors = []
    shutil.rmtree(filename, onerror=lambda *args: errors.append(args))
    self.assertEqual(len(errors), 2)
    for report, failing_func in zip(errors, (os.scandir, os.rmdir)):
        self.assertIs(report[0], failing_func)
        self.assertEqual(report[1], filename)
        self.assertIsInstance(report[2][1], NotADirectoryError)
        self.assertIn(report[2][1].filename, possible_args)
@unittest.skipIf(sys.platform.startswith('cygwin'),
                 "This test can't be run on Cygwin (issue #1071513).")
@unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
                 "This test can't be run reliably as root (issue #1076467).")
def test_on_error(self):
    """rmtree() on an unwritable tree must invoke the onerror callback.

    Builds TESTFN with one file and one subdirectory, makes all three
    unwritable, then runs rmtree() with self.check_args_to_onerror as the
    callback and checks that self.errorState ends up at 3.
    """
    self.errorState = 0
    os.mkdir(TESTFN)
    self.addCleanup(shutil.rmtree, TESTFN)

    self.child_file_path = os.path.join(TESTFN, 'a')
    self.child_dir_path = os.path.join(TESTFN, 'b')
    support.create_empty_file(self.child_file_path)
    os.mkdir(self.child_dir_path)

    # Remember the original modes so the cleanups can restore them.
    tracked = (TESTFN, self.child_file_path, self.child_dir_path)
    original_modes = [os.stat(entry).st_mode for entry in tracked]

    # Make unwritable.
    read_only = stat.S_IREAD | stat.S_IEXEC
    for entry in (self.child_file_path, self.child_dir_path, TESTFN):
        os.chmod(entry, read_only)
    for entry, mode in zip(tracked, original_modes):
        self.addCleanup(os.chmod, entry, mode)

    shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
    # Test whether onerror has actually been called.
    self.assertEqual(self.errorState, 3,
                     "Expected call to onerror function did not happen.")
def check_args_to_onerror(self, func, arg, exc):
Loading ...