From 8c8e6fda0b953f86c75b193e0ce1df34e5d6ad00 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Sun, 16 Apr 2023 11:32:57 +0530 Subject: [PATCH] Code to rename a group of files while keeping them locked --- src/calibre/utils/copy_files.py | 17 +++++++++++++++++ src/calibre/utils/copy_files_test.py | 11 ++++++++++- 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/src/calibre/utils/copy_files.py b/src/calibre/utils/copy_files.py index 540b96bf31..6715c7d32a 100644 --- a/src/calibre/utils/copy_files.py +++ b/src/calibre/utils/copy_files.py @@ -33,6 +33,10 @@ class UnixFileCopier: def __exit__(self, *a) -> None: pass + def rename_all(self) -> None: + for src_path, dest_path in self.copy_map.items(): + os.replace(src_path, dest_path) + def copy_all(self) -> None: for src_path, dest_path in self.copy_map.items(): with suppress(OSError): @@ -114,6 +118,10 @@ class WindowsFileCopier: f.write(raw) shutil.copystat(src_path, dest_path, follow_symlinks=False) + def rename_all(self) -> None: + for src_path, dest_path in self.copy_map.items(): + winutil.move_file(src_path, dest_path) + def delete_all_source_files(self) -> None: for src_path in self.copy_map: winutil.delete_file(make_long_path_useable(src_path)) @@ -123,6 +131,15 @@ def get_copier() -> Union[UnixFileCopier | WindowsFileCopier]: return WindowsFileCopier() if iswindows else UnixFileCopier() +def rename_files(src_to_dest_map: Dict[str, str]) -> None: + ' Rename a bunch of files. On Windows all files are locked before renaming so no other process can interfere. ' + copier = get_copier() + for s, d in src_to_dest_map.items(): + copier.register(s, d) + with copier: + copier.rename_all() + + def copy_files(src_to_dest_map: Dict[str, str], delete_source: bool = False) -> None: copier = get_copier() for s, d in src_to_dest_map.items(): diff --git a/src/calibre/utils/copy_files_test.py b/src/calibre/utils/copy_files_test.py index 1d7394bb39..4fd17cf33f 100644 --- a/src/calibre/utils/copy_files_test.py +++ b/src/calibre/utils/copy_files_test.py @@ -10,7 +10,7 @@ import unittest from calibre import walk from calibre.constants import iswindows -from .copy_files import copy_tree +from .copy_files import copy_tree, rename_files from .filenames import nlinks_file @@ -52,6 +52,15 @@ class TestCopyFiles(unittest.TestCase): self.tearDown() self.setUp() + def test_renaming_of_files(self): + for name in 'one two'.split(): + with open(os.path.join(self.tdir, name), 'w') as f: + f.write(name) + renames = {os.path.join(self.tdir, k): os.path.join(self.tdir, v) for k, v in {'one': 'One', 'two': 'three'}.items()} + rename_files(renames) + contents = set(os.listdir(self.tdir)) - {'base', 'src'} + self.ae(contents, {'One', 'three'}) + def test_copying_of_trees(self): src, dest = self.s(), self.d() copy_tree(src, dest)