From 9d9ee6dd5f2cc03adc7e9954d732582e020830b5 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Thu, 15 Jun 2023 09:38:54 +0530 Subject: [PATCH] Refactor for more clarity --- src/calibre/utils/copy_files.py | 64 ++++++++++++++++++--------------- 1 file changed, 35 insertions(+), 29 deletions(-) diff --git a/src/calibre/utils/copy_files.py b/src/calibre/utils/copy_files.py index 7adcd12c3b..c0973a996f 100644 --- a/src/calibre/utils/copy_files.py +++ b/src/calibre/utils/copy_files.py @@ -200,6 +200,40 @@ def copy_files(src_to_dest_map: Dict[str, str], delete_source: bool = False) -> copier.copy_all() +def register_folder_recursively( + src: str, copier: Union[UnixFileCopier, WindowsFileCopier], dest_dir: str, + transform_destination_filename: Callable[[str, str], str] = lambda src_path, dest_path : dest_path, +) -> None: + + def dest_from_entry(dirpath: str, x: str) -> str: + path = os.path.join(dirpath, x) + rel = os.path.relpath(path, src) + return os.path.join(dest_dir, rel) + + def raise_error(e: OSError) -> None: + raise e + + copier.register_folder(src) + for (dirpath, dirnames, filenames) in os.walk(src, onerror=raise_error): + for d in dirnames: + path = os.path.join(dirpath, d) + dest = dest_from_entry(dirpath, d) + os.makedirs(make_long_path_useable(dest), exist_ok=True) + shutil.copystat(make_long_path_useable(path), make_long_path_useable(dest), follow_symlinks=False) + copier.register_folder(path) + for f in filenames: + path = os.path.join(dirpath, f) + dest = dest_from_entry(dirpath, f) + dest = transform_destination_filename(path, dest) + if not iswindows: + s = os.stat(path, follow_symlinks=False) + if stat.S_ISLNK(s.st_mode): + link_dest = os.readlink(path) + os.symlink(link_dest, dest) + continue + copier.register(path, dest) + + def copy_tree( src: str, dest: str, transform_destination_filename: Callable[[str, str], str] = lambda src_path, dest_path : dest_path, @@ -222,36 +256,8 @@ def copy_tree( raise ValueError(f'Cannot copy tree if the source and destination are the same: {src!r} == {dest!r}') dest_dir = dest - def raise_error(e: OSError) -> None: - raise e - - def dest_from_entry(dirpath: str, x: str) -> str: - path = os.path.join(dirpath, x) - rel = os.path.relpath(path, src) - return os.path.join(dest_dir, rel) - - copier = get_copier(delete_source) - copier.register_folder(src) - for (dirpath, dirnames, filenames) in os.walk(src, onerror=raise_error): - for d in dirnames: - path = os.path.join(dirpath, d) - dest = dest_from_entry(dirpath, d) - os.makedirs(make_long_path_useable(dest), exist_ok=True) - shutil.copystat(make_long_path_useable(path), make_long_path_useable(dest), follow_symlinks=False) - copier.register_folder(path) - for f in filenames: - path = os.path.join(dirpath, f) - dest = dest_from_entry(dirpath, f) - dest = transform_destination_filename(path, dest) - if not iswindows: - s = os.stat(path, follow_symlinks=False) - if stat.S_ISLNK(s.st_mode): - link_dest = os.readlink(path) - os.symlink(link_dest, dest) - continue - copier.register(path, dest) - + register_folder_recursively(src, copier, dest_dir, transform_destination_filename) with copier: copier.copy_all()