Use the same code to check if folder is in use as to move the folder

This commit is contained in:
Kovid Goyal 2023-06-15 09:58:08 +05:30
parent 9d9ee6dd5f
commit c42af1f3dc
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
2 changed files with 22 additions and 14 deletions

View File

@ -40,12 +40,14 @@ from calibre.library.field_metadata import FieldMetadata
from calibre.ptempfile import PersistentTemporaryFile, TemporaryFile from calibre.ptempfile import PersistentTemporaryFile, TemporaryFile
from calibre.utils import pickle_binary_string, unpickle_binary_string from calibre.utils import pickle_binary_string, unpickle_binary_string
from calibre.utils.config import from_json, prefs, to_json, tweaks from calibre.utils.config import from_json, prefs, to_json, tweaks
from calibre.utils.copy_files import copy_files, copy_tree, rename_files from calibre.utils.copy_files import (
copy_files, copy_tree, rename_files, windows_check_if_files_in_use,
)
from calibre.utils.date import EPOCH, parse_date, utcfromtimestamp, utcnow from calibre.utils.date import EPOCH, parse_date, utcfromtimestamp, utcnow
from calibre.utils.filenames import ( from calibre.utils.filenames import (
WindowsAtomicFolderMove, ascii_filename, atomic_rename, copyfile_using_links, ascii_filename, atomic_rename, copyfile_using_links, copytree_using_links,
copytree_using_links, hardlink_file, is_case_sensitive, is_fat_filesystem, hardlink_file, is_case_sensitive, is_fat_filesystem, make_long_path_useable,
make_long_path_useable, remove_dir_if_empty, samefile, remove_dir_if_empty, samefile,
) )
from calibre.utils.formatter_functions import ( from calibre.utils.formatter_functions import (
compile_user_template_functions, formatter_functions, load_user_template_functions, compile_user_template_functions, formatter_functions, load_user_template_functions,
@ -1717,19 +1719,14 @@ class DB:
def windows_check_if_files_in_use(self, paths): def windows_check_if_files_in_use(self, paths):
''' '''
Raises an EACCES IOError if any of the files in the folder of book_id Raises an EACCES IOError if any of the files in the specified folders
are opened in another program on windows. are opened in another program on windows.
''' '''
if iswindows: if iswindows:
for path in paths: for path in paths:
spath = os.path.join(self.library_path, *path.split('/')) spath = os.path.join(self.library_path, *path.split('/'))
wam = None
if os.path.exists(spath): if os.path.exists(spath):
try: windows_check_if_files_in_use(spath)
wam = WindowsAtomicFolderMove(spath)
finally:
if wam is not None:
wam.close_handles()
def add_format(self, book_id, fmt, stream, title, author, path, current_name, mtime=None): def add_format(self, book_id, fmt, stream, title, author, path, current_name, mtime=None):
fmt = ('.' + fmt.lower()) if fmt else '' fmt = ('.' + fmt.lower()) if fmt else ''

View File

@ -8,7 +8,7 @@ import stat
import time import time
from collections import defaultdict from collections import defaultdict
from contextlib import suppress from contextlib import suppress
from typing import Callable, Dict, Set, Tuple, Union, List from typing import Callable, Dict, List, Set, Tuple, Union
from calibre.constants import filesystem_encoding, iswindows from calibre.constants import filesystem_encoding, iswindows
from calibre.utils.filenames import make_long_path_useable, samefile, windows_hardlink from calibre.utils.filenames import make_long_path_useable, samefile, windows_hardlink
@ -200,9 +200,13 @@ def copy_files(src_to_dest_map: Dict[str, str], delete_source: bool = False) ->
copier.copy_all() copier.copy_all()
def identity_transform(src_path: str, dest_path: str) -> str:
return dest_path
def register_folder_recursively( def register_folder_recursively(
src: str, copier: Union[UnixFileCopier, WindowsFileCopier], dest_dir: str, src: str, copier: Union[UnixFileCopier, WindowsFileCopier], dest_dir: str,
transform_destination_filename: Callable[[str, str], str] = lambda src_path, dest_path : dest_path, transform_destination_filename: Callable[[str, str], str] = identity_transform,
) -> None: ) -> None:
def dest_from_entry(dirpath: str, x: str) -> str: def dest_from_entry(dirpath: str, x: str) -> str:
@ -234,9 +238,16 @@ def register_folder_recursively(
copier.register(path, dest) copier.register(path, dest)
def windows_check_if_files_in_use(src_folder: str) -> None:
copier = get_copier()
register_folder_recursively(src_folder, copier, os.getcwd())
with copier:
pass
def copy_tree( def copy_tree(
src: str, dest: str, src: str, dest: str,
transform_destination_filename: Callable[[str, str], str] = lambda src_path, dest_path : dest_path, transform_destination_filename: Callable[[str, str], str] = identity_transform,
delete_source: bool = False delete_source: bool = False
) -> None: ) -> None:
''' '''