# -*- coding: utf-8 -*-
#
# Copyright (C) 2012 The Python Software Foundation.
# See LICENSE.txt and CONTRIBUTORS.txt.
#
"""Utility functions for copying and archiving files and directory trees.
XXX The functions here don't copy the resource fork or other metadata on Mac.
"""
import os
import sys
import stat
from os.path import abspath
import fnmatch
import collections
import errno
from . import tarfile
try:
import bz2
_BZ2_SUPPORTED = True
except ImportError:
_BZ2_SUPPORTED = False
try:
from pwd import getpwnam
except ImportError:
getpwnam = None
try:
from grp import getgrnam
except ImportError:
getgrnam = None
__all__ = ["copyfileobj", "copyfile", "copymode", "copystat", "copy", "copy2",
"copytree", "move", "rmtree", "Error", "SpecialFileError",
"ExecError", "make_archive", "get_archive_formats",
"register_archive_format", "unregister_archive_format",
"get_unpack_formats", "register_unpack_format",
"unregister_unpack_format", "unpack_archive", "ignore_patterns"]
class Error(EnvironmentError):
pass
class SpecialFileError(EnvironmentError):
"""Raised when trying to do a kind of operation (e.g. copying) which is
not supported on a special file (e.g. a named pipe)"""
class ExecError(EnvironmentError):
"""Raised when a command could not be executed"""
class ReadError(EnvironmentError):
"""Raised when an archive cannot be read"""
class RegistryError(Exception):
"""Raised when a registry operation with the archiving
and unpacking registries fails"""
try:
WindowsError
except NameError:
WindowsError = None
def copyfileobj(fsrc, fdst, length=16*1024):
"""copy data from file-like object fsrc to file-like object fdst"""
while 1:
buf = fsrc.read(length)
if not buf:
break
fdst.write(buf)
def _samefile(src, dst):
# Macintosh, Unix.
if hasattr(os.path, 'samefile'):
try:
return os.path.samefile(src, dst)
except OSError:
return False
# All other platforms: check for same pathname.
return (os.path.normcase(os.path.abspath(src)) ==
os.path.normcase(os.path.abspath(dst)))
def copyfile(src, dst):
"""Copy data from src to dst"""
if _samefile(src, dst):
raise Error("`%s` and `%s` are the same file" % (src, dst))
for fn in [src, dst]:
try:
st = os.stat(fn)
except OSError:
# File most likely does not exist
pass
else:
# XXX What about other special files? (sockets, devices...)
if stat.S_ISFIFO(st.st_mode):
raise SpecialFileError("`%s` is a named pipe" % fn)
with open(src, 'rb') as fsrc:
with open(dst, 'wb') as fdst:
copyfileobj(fsrc, fdst)
def copymode(src, dst):
"""Copy mode bits from src to dst"""
if hasattr(os, 'chmod'):
st = os.stat(src)
mode = stat.S_IMODE(st.st_mode)
os.chmod(dst, mode)
def copystat(src, dst):
"""Copy all stat info (mode bits, atime, mtime, flags) from src to dst"""
st = os.stat(src)
mode = stat.S_IMODE(st.st_mode)
if hasattr(os, 'utime'):
os.utime(dst, (st.st_atime, st.st_mtime))
if hasattr(os, 'chmod'):
os.chmod(dst, mode)
if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
try:
os.chflags(dst, st.st_flags)
except OSError as why:
if (not hasattr(errno, 'EOPNOTSUPP') or
why.errno != errno.EOPNOTSUPP):
raise
def copy(src, dst):
"""Copy data and mode bits ("cp src dst").
The destination may be a directory.
"""
if os.path.isdir(dst):
dst = os.path.join(dst, os.path.basename(src))
copyfile(src, dst)
copymode(src, dst)
def copy2(src, dst):
"""Copy data and all stat info ("cp -p src dst").
The destination may be a directory.
"""
if os.path.isdir(dst):
dst = os.path.join(dst, os.path.basename(src))
copyfile(src, dst)
copystat(src, dst)
def ignore_patterns(*patterns):
"""Function that can be used as copytree() ignore parameter.
Patterns is a sequence of glob-style patterns
that are used to exclude files"""
def _ignore_patterns(path, names):
ignored_names = []
for pattern in patterns:
ignored_names.extend(fnmatch.filter(names, pattern))
return set(ignored_names)
return _ignore_patterns
def copytree(src, dst, symlinks=False, ignore=None, copy_function=copy2,
ignore_dangling_symlinks=False):
"""Recursively copy a directory tree.
The destination directory must not already exist.
If exception(s) occur, an Error is raised with a list of reasons.
If the optional symlinks flag is true, symbolic links in the
source tree result in symbolic links in the destination tree; if
it is false, the contents of the files pointed to by symbolic
links are copied. If the file pointed by the symlink doesn't
exist, an exception will be added in the list of errors raised in
an Error exception at the end of the copy process.
You can set the optional ignore_dangling_symlinks flag to true if you
want to silence this exception. Notice that this has no effect on
platforms that don't support os.symlink.
The optional ignore argument is a callable. If given, it
is called with the `src` parameter, which is the directory
being visited by copytree(), and `names` which is the list of
`src` contents, as returned by os.listdir():
callable(src, names) -> ignored_names
Since copytree() is called recursively, the callable will be
called once for each directory that is copied. It returns a
list of names relative to the `src` directory that should
not be copied.
The optional copy_function argument is a callable that will be used
to copy each file. It will be called with the source path and the
destination path as arguments. By default, copy2() is used, but any
function that supports the same signature (like copy()) can be used.
"""
names = os.listdir(src)
if ignore is not None:
ignored_names = ignore(src, names)
else:
ignored_names = set()
os.makedirs(dst)
errors = []
for name in names:
if name in ignored_names:
continue
srcname = os.path.join(src, name)
dstname = os.path.join(dst, name)
try:
if os.path.islink(srcname):
linkto = os.readlink(srcname)
if symlinks:
os.symlink(linkto, dstname)
else:
# ignore dangling symlink if the flag is on
if not os.path.exists(linkto) and ignore_dangling_symlinks:
continue
# otherwise let the copy occurs. copy2 will raise an error
copy_function(srcname, dstname)
elif os.path.isdir(srcname):
copytree(srcname, dstname, symlinks, ignore, copy_function)
else:
# Will raise a SpecialFileError for unsupported file types
copy_function(srcname, dstname)
# catch the Error from the recursive copytree so that we can
# continue with other files
except Error as err:
errors.extend(err.args[0])
except EnvironmentError as why:
errors.append((srcname, dstname, str(why)))
try:
copystat(src, dst)
except OSError as why:
if WindowsError is not None and isinstance(why, WindowsError):
# Copying file access times may fail on Windows
pass
else:
errors.extend((src, dst, str(why)))
if errors:
raise Error(errors)
def rmtree(path, ignore_errors=False, onerror=None):
"""Recursively delete a directory tree.
If ignore_errors is set, errors are ignored; otherwise, if onerror
is set, it is called to handle the error with arguments (func,
path, exc_info) where func is os.listdir, os.remove, or os.rmdir;
path is the argument to that function that caused it to fail; and
exc_info is a tuple returned by sys.exc_info(). If ignore_errors
is false and onerror is None, an exception is raised.
"""
if ignore_errors:
def onerror(*args):
pass
elif onerror is None:
def onerror(*args):
raise
try:
if os.path.islink(path):
# symlinks to directories are forbidden, see bug #1669
raise OSError("Cannot call rmtree on a symbolic link")
except OSError:
onerror(os.path.islink, path, sys.exc_info())
# can't continue even if onerror hook returns
return
names = []
try:
names = os.listdir(path)
except os.error:
onerror(os.listdir, path, sys.exc_info())
for name in names:
fullname = os.path.join(path, name)
try:
mode = os.lstat(fullname).st_mode
except os.error:
mode = 0
if stat.S_ISDIR(mode):
rmtree(fullname, ignore_errors, onerror)
else:
try:
os.remove(fullname)
except os.error:
onerror(os.remove, fullname, sys.exc_info())
try:
os.rmdir(path)
except os.error:
onerror(os.rmdir, path, sys.exc_info())
def _basename(path):
# A basename() variant which first strips the trailing slash, if present.
# Thus we always get the last component of the path, even for directories.
return os.path.basename(path.rstrip(os.path.sep))
def move(src, dst):
"""Recursively move a file or directory to another location. This is
similar to the Unix "mv" command.
If the destination is a directory or a symlink to a directory, the source
is moved inside the directory. The destination path must not already
exist.
If the destination already exists but is not a directory, it may be
overwritten depending on os.rename() semantics.
If the destination is on our current filesystem, then rename() is used.
Otherwise, src is copied to the destination and then removed.
A lot more could be done here... A look at a mv.c shows a lot of
the issues this implementation glosses over.
"""
real_dst = dst
if os.path.isdir(dst):
if _samefile(src, dst):
# We might be on a case insensitive filesystem,
# perform the rename anyway.
os.rename(src, dst)
return
real_dst = os.path.join(dst, _basename(src))
if os.path.exists(real_dst):
raise Error("Destination path '%s' already exists" % real_dst)
try:
os.rename(src, real_dst)
except OSError:
if os.path.isdir(src):
if _destinsrc(src, dst):
raise Error("Cannot move a directory '%s' into itself '%s'." % (src, dst))
copytree(src, real_dst, symlinks=True)
rmtree(src)
else:
copy2(src, real_dst)
os.unlink(src)
def _destinsrc(src, dst):
src = abspath(src)
dst = abspath(dst)
if not src.endswith(os.path.sep):
src += os.path.sep
if not dst.endswith(os.path.sep):
Loading ...