"""Utility functions for copying and archiving files and directory trees.
XXX The functions here don't copy the resource fork or other metadata on Mac.
# True when the interpreter is running on Windows (os.name is 'nt').
_WINDOWS = os.name == 'nt'
# Default chunk size, in bytes, for buffered copy loops: 1 MiB on Windows,
# 64 KiB everywhere else.
COPY_BUFSIZE = 1024 * 1024 if _WINDOWS else 64 * 1024
# The sendfile(2) fast path is considered only on Linux builds that expose
# os.sendfile().
_USE_CP_SENDFILE = hasattr(os, "sendfile") and sys.platform.startswith("linux")
# Truthy only when the posix module is available and provides _fcopyfile().
_HAS_FCOPYFILE = posix and hasattr(posix, "_fcopyfile") # macOS
# CMD defaults in Windows 10
_WIN_DEFAULT_PATHEXT = ".COM;.EXE;.BAT;.CMD;.VBS;.JS;.WS;.MSC"
# Names exported by ``from <module> import *``.  SameFileError is listed
# because it is a public exception type that callers need to catch.
__all__ = ["copyfileobj", "copyfile", "copymode", "copystat", "copy", "copy2",
           "copytree", "move", "rmtree", "Error", "SpecialFileError",
           "ExecError", "make_archive", "get_archive_formats",
           "register_archive_format", "unregister_archive_format",
           "get_unpack_formats", "register_unpack_format",
           "unregister_unpack_format", "unpack_archive",
           "ignore_patterns", "chown", "which", "get_terminal_size",
           "SameFileError"]
# disk_usage is added later, if available on the platform
class SameFileError(Error):
    """Signals that the source and the destination name one and the same file."""
class SpecialFileError(OSError):
    """Signals an operation (copying, for instance) that a special file,
    such as a named pipe, does not support."""
class ExecError(OSError):
    """Signals that running a command failed."""
class ReadError(OSError):
    """Signals that an archive could not be read."""
class RegistryError(Exception):
    """Signals a failed operation on the archiving or unpacking
    format registries."""
class _GiveupOnFastCopy(Exception):
    """Raised as a signal to fallback on using raw read()/write()
    file copy when fast-copy functions fail to do so.

    The exception wraps the original error as its single argument.
    """
def _fastcopy_fcopyfile(fsrc, fdst, flags):
    """Copy a regular file content or metadata by using high-performance
    fcopyfile(3) syscall (macOS).

    *fsrc* and *fdst* are open file objects; *flags* is passed through to
    posix._fcopyfile() unchanged.

    Raises _GiveupOnFastCopy when either object has no usable file
    descriptor or when the syscall reports EINVAL/ENOTSUP, so that the
    caller can fall back to a plain read()/write() copy.  Any other OSError
    is re-raised with both file names attached.
    """
    try:
        infd = fsrc.fileno()
        outfd = fdst.fileno()
    except Exception as err:
        raise _GiveupOnFastCopy(err)  # not a regular file

    try:
        posix._fcopyfile(infd, outfd, flags)
    except OSError as err:
        # Attach both names so the error message identifies the copy.
        err.filename = fsrc.name
        err.filename2 = fdst.name
        if err.errno in {errno.EINVAL, errno.ENOTSUP}:
            raise _GiveupOnFastCopy(err)
        raise err from None
def _fastcopy_sendfile(fsrc, fdst):
"""Copy data from one regular mmap-like fd to another by using
high-performance sendfile(2) syscall.
This should work on Linux >= 2.6.33 only.
# Note: copyfileobj() is left alone in order to not introduce any
# unexpected breakage. Possible risks by using zero-copy calls
# in copyfileobj() are:
# - fdst cannot be open in "a"(ppend) mode
# - fsrc and fdst may be open in "t"(ext) mode
# - fsrc may be a BufferedReader (which hides unread data in a buffer),
#   GzipFile (which decompresses data), HTTPResponse (which decodes
#   chunks).
# - possibly others (e.g. encrypted fs/partition?)
raise _GiveupOnFastCopy(err) # not a regular file
# Hopefully the whole file will be copied in a single call.
# sendfile() is called in a loop 'till EOF is reached (0 return)
# so a bufsize smaller or bigger than the actual file size
# should not make any difference, also in case the file content
# changes while being copied.
blocksize = max(os.fstat(infd).st_size, 2 ** 23) # min 8MiB
blocksize = 2 ** 27 # 128MiB
# On 32-bit architectures truncate to 1GiB to avoid OverflowError.
if sys.maxsize < 2 ** 32:
blocksize = min(blocksize, 2 ** 30)
sent = os.sendfile(outfd, infd, offset, blocksize)
# ...in order to have a more informative exception.
err.filename2 = fdst.name
if err.errno == errno.ENOTSOCK:
# sendfile() on this platform (probably Linux < 2.6.33)
# does not support copies between regular files (only sockets).
raise _GiveupOnFastCopy(err)
if err.errno == errno.ENOSPC: # filesystem is full
# Give up on first call and if no data was copied.
if offset == 0 and os.lseek(outfd, 0, os.SEEK_CUR) == 0:
raise _GiveupOnFastCopy(err)
def _copyfileobj_readinto(fsrc, fdst, length=COPY_BUFSIZE):
    """readinto()/memoryview() based variant of copyfileobj().

    *fsrc* must support readinto() method and both files must be
    open in binary mode.

    A single buffer of *length* bytes is allocated once and reused for
    every chunk, so no per-chunk bytes object is created.
    """
    # Localize variable access to minimize overhead.
    fsrc_readinto = fsrc.readinto
    fdst_write = fdst.write
    with memoryview(bytearray(length)) as mv:
        while True:
            n = fsrc_readinto(mv)
            if not n:
                break  # EOF
            if n < length:
                # Short read: write only the part of the buffer that was
                # actually filled.
                with mv[:n] as smv:
                    fdst_write(smv)
            else:
                fdst_write(mv)
def copyfileobj(fsrc, fdst, length=0):
    """copy data from file-like object fsrc to file-like object fdst

    Data is read from *fsrc* starting at its current position and written
    to *fdst* in chunks of at most *length* items.  A falsy *length* (the
    default) selects COPY_BUFSIZE.  Neither file object is closed.
    """
    if not length:
        length = COPY_BUFSIZE
    # Localize variable access to minimize overhead.
    fsrc_read = fsrc.read
    fdst_write = fdst.write
    while True:
        buf = fsrc_read(length)
        if not buf:
            break  # EOF
        fdst_write(buf)
def _samefile(src, dst):
    """Return True when *src* and *dst* refer to the same file.

    *src* may be a path or an os.DirEntry; *dst* is a path.  An OSError
    raised while stat-ing either side (for example because the destination
    does not exist yet) is treated as "not the same file".
    """
    # NOTE(review): name and signature are inferred from the body's use of
    # src/dst and from the SameFileError check -- confirm against callers.
    if isinstance(src, os.DirEntry) and hasattr(os.path, 'samestat'):
        try:
            return os.path.samestat(src.stat(), os.stat(dst))
        except OSError:
            return False

    if hasattr(os.path, 'samefile'):
        try:
            return os.path.samefile(src, dst)
        except OSError:
            return False

    # All other platforms: check for same pathname.
    return (os.path.normcase(os.path.abspath(src)) ==
            os.path.normcase(os.path.abspath(dst)))
def _stat(fn):
    """Return the stat result for *fn*, a path or an os.DirEntry.

    A DirEntry is asked for its own stat(); anything else goes to os.stat().
    """
    return fn.stat() if isinstance(fn, os.DirEntry) else os.stat(fn)
def _islink(fn):
    """Return True when *fn*, a path or an os.DirEntry, is a symbolic link.

    A DirEntry is asked via is_symlink(); anything else goes to
    os.path.islink().
    """
    return fn.is_symlink() if isinstance(fn, os.DirEntry) else os.path.islink(fn)
def copyfile(src, dst, *, follow_symlinks=True):
"""Copy data from src to dst in the most efficient way possible.
If follow_symlinks is not set and src is a symbolic link, a new
symlink will be created instead of copying the file it points to.
sys.audit("shutil.copyfile", src, dst)
raise SameFileError("{!r} and {!r} are the same file".format(src, dst))
for i, fn in enumerate([src, dst]):
# File most likely does not exist
# XXX What about other special files? (sockets, devices...)
if stat.S_ISFIFO(st.st_mode):
fn = fn.path if isinstance(fn, os.DirEntry) else fn
raise SpecialFileError("`%s` is a named pipe" % fn)
if not follow_symlinks and _islink(src):
os.symlink(os.readlink(src), dst)
with open(src, 'rb') as fsrc, open(dst, 'wb') as fdst:
_fastcopy_fcopyfile(fsrc, fdst, posix._COPYFILE_DATA)
except _GiveupOnFastCopy:
_fastcopy_sendfile(fsrc, fdst)
except _GiveupOnFastCopy:
# https://github.com/python/cpython/pull/7160#discussion_r195405230
elif _WINDOWS and file_size > 0:
_copyfileobj_readinto(fsrc, fdst, min(file_size, COPY_BUFSIZE))
# Issue 43219, raise a less confusing exception
except IsADirectoryError as e:
raise FileNotFoundError(f'Directory does not exist: {dst}') from e
def copymode(src, dst, *, follow_symlinks=True):
    """Copy mode bits from src to dst.

    If follow_symlinks is not set, symlinks aren't followed if and only
    if both `src` and `dst` are symlinks. If `lchmod` isn't available
    (e.g. Linux) this method does nothing.

    Emits the "shutil.copymode" audit event before touching either file.
    """
    sys.audit("shutil.copymode", src, dst)

    if not follow_symlinks and _islink(src) and os.path.islink(dst):
        if hasattr(os, 'lchmod'):
            stat_func, chmod_func = os.lstat, os.lchmod
        else:
            # No way to change the mode of a symlink itself here.
            return
    else:
        stat_func, chmod_func = _stat, os.chmod

    st = stat_func(src)
    # S_IMODE keeps only the permission bits, dropping the file-type bits.
    chmod_func(dst, stat.S_IMODE(st.st_mode))
if hasattr(os, 'listxattr'):
def _copyxattr(src, dst, *, follow_symlinks=True):
"""Copy extended filesystem attributes from `src` to `dst`.
Overwrite existing attributes.
If `follow_symlinks` is false, symlinks won't be followed.
names = os.listxattr(src, follow_symlinks=follow_symlinks)
if e.errno not in (errno.ENOTSUP, errno.ENODATA, errno.EINVAL):
value = os.getxattr(src, name, follow_symlinks=follow_symlinks)
os.setxattr(dst, name, value, follow_symlinks=follow_symlinks)
if e.errno not in (errno.EPERM, errno.ENOTSUP, errno.ENODATA,
def _copyxattr(*args, **kwargs):
def copystat(src, dst, *, follow_symlinks=True):
Copy the permission bits, last access time, last modification time, and
flags from `src` to `dst`. On Linux, copystat() also copies the "extended
attributes" where possible. The file contents, owner, and group are
unaffected. `src` and `dst` are path-like objects or path names given as
If the optional flag `follow_symlinks` is not set, symlinks aren't
followed if and only if both `src` and `dst` are symlinks.
sys.audit("shutil.copystat", src, dst)
def _nop(*args, ns=None, follow_symlinks=None):
# follow symlinks (aka don't not follow symlinks)
follow = follow_symlinks or not (_islink(src) and os.path.islink(dst))
# use the real function if it exists
return getattr(os, name, _nop)
# use the real function only if it exists
# *and* it supports follow_symlinks
fn = getattr(os, name, _nop)
if fn in os.supports_follow_symlinks:
if isinstance(src, os.DirEntry):
st = src.stat(follow_symlinks=follow)
st = lookup("stat")(src, follow_symlinks=follow)
mode = stat.S_IMODE(st.st_mode)
lookup("utime")(dst, ns=(st.st_atime_ns, st.st_mtime_ns),
# We must copy extended attributes before the file is (potentially)
# chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
_copyxattr(src, dst, follow_symlinks=follow)
lookup("chmod")(dst, mode, follow_symlinks=follow)
except NotImplementedError:
# if we got a NotImplementedError, it's because
# * follow_symlinks=False,
# * lchmod() is unavailable, and
# * fchmodat() is unavailable or
# * fchmodat() doesn't implement AT_SYMLINK_NOFOLLOW.
# therefore we're out of options--we simply cannot chmod the
# symlink. give up, suppress the error.
# (which is what shutil always did in this circumstance.)
if hasattr(st, 'st_flags'):
lookup("chflags")(dst, st.st_flags, follow_symlinks=follow)
for err in 'EOPNOTSUPP', 'ENOTSUP':
if hasattr(errno, err) and why.errno == getattr(errno, err):
def copy(src, dst, *, follow_symlinks=True):
    """Copy data and mode bits ("cp src dst"). Return the file's destination.

    The destination may be a directory.

    If follow_symlinks is false, symlinks won't be followed. This
    resembles GNU's "cp -P src dst".

    If source and destination are the same file, a SameFileError will be
    raised.
    """
    if os.path.isdir(dst):
        # Copy into the directory, keeping the source's file name.
        dst = os.path.join(dst, os.path.basename(src))
    copyfile(src, dst, follow_symlinks=follow_symlinks)
    copymode(src, dst, follow_symlinks=follow_symlinks)
    return dst
def copy2(src, dst, *, follow_symlinks=True):
    """Copy data and metadata. Return the file's destination.

    Metadata is copied with copystat(). Please see the copystat function
    for more information.

    The destination may be a directory.

    If follow_symlinks is false, symlinks won't be followed. This
    resembles GNU's "cp -P src dst".
    """
    if os.path.isdir(dst):
        # Copy into the directory, keeping the source's file name.
        dst = os.path.join(dst, os.path.basename(src))
    copyfile(src, dst, follow_symlinks=follow_symlinks)
    copystat(src, dst, follow_symlinks=follow_symlinks)
    return dst
def ignore_patterns(*patterns):
    """Function that can be used as copytree() ignore parameter.

    Patterns is a sequence of glob-style patterns
    that are used to exclude files

    Returns a callable taking ``(path, names)`` that yields the set of
    entries in *names* matching at least one pattern.  *path* is accepted
    for interface compatibility and is not inspected.
    """
    def _ignore_patterns(path, names):
        ignored_names = []
        for pattern in patterns:
            ignored_names.extend(fnmatch.filter(names, pattern))
        return set(ignored_names)
    return _ignore_patterns
def _copytree(entries, src, dst, symlinks, ignore, copy_function,
ignore_dangling_symlinks, dirs_exist_ok=False):
ignored_names = ignore(os.fspath(src), [x.name for x in entries])
os.makedirs(dst, exist_ok=dirs_exist_ok)
use_srcentry = copy_function is copy2 or copy_function is copy
if srcentry.name in ignored_names:
srcname = os.path.join(src, srcentry.name)
dstname = os.path.join(dst, srcentry.name)
srcobj = srcentry if use_srcentry else srcname
is_symlink = srcentry.is_symlink()
if is_symlink and os.name == 'nt':
# Special check for directory junctions, which appear as
# symlinks but we want to recurse.
lstat = srcentry.stat(follow_symlinks=False)
if lstat.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT:
linkto = os.readlink(srcname)
# We can't just leave it to `copy_function` because legacy
# code with a custom `copy_function` may rely on copytree
os.symlink(linkto, dstname)
copystat(srcobj, dstname, follow_symlinks=not symlinks)
# ignore dangling symlink if the flag is on
if not os.path.exists(linkto) and ignore_dangling_symlinks:
# otherwise let the copy occur. copy2 will raise an error
copytree(srcobj, dstname, symlinks, ignore,
copy_function, dirs_exist_ok=dirs_exist_ok)