Connect to folder: Allow connecting to the specified folder as a specific device. Works if the folder contains the contents of the device filesystem, such as when the device is mounted at that folder.

This commit is contained in:
Kovid Goyal 2025-03-06 14:14:58 +05:30
parent ae94ec0bc9
commit 5cf5cf04de
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
6 changed files with 94 additions and 19 deletions

View File

@ -733,7 +733,7 @@ def all_edit_book_tool_plugins():
_initialized_plugins = [] _initialized_plugins = []
def initialize_plugin(plugin, path_to_zip_file, installation_type): def initialize_plugin(plugin, path_to_zip_file=None, installation_type=PluginInstallationType.BUILTIN):
try: try:
p = plugin(path_to_zip_file) p = plugin(path_to_zip_file)
p.installation_type = installation_type p.installation_type = installation_type

View File

@ -4,6 +4,7 @@ Created on 15 May 2010
@author: charles @author: charles
''' '''
import os import os
from contextlib import suppress
from calibre.devices.usbms.driver import USBMS, BookList from calibre.devices.usbms.driver import USBMS, BookList
from calibre.ebooks import BOOK_EXTENSIONS from calibre.ebooks import BOOK_EXTENSIONS
@ -70,6 +71,12 @@ class FOLDER_DEVICE(USBMS):
self.booklist_class = BookList self.booklist_class = BookList
self.is_connected = True self.is_connected = True
def is_folder_still_available(self):
with suppress(OSError):
if self._main_prefix:
return os.path.isdir(self._main_prefix)
return False
def reset(self, key='-1', log_packets=False, report_progress=None, def reset(self, key='-1', log_packets=False, report_progress=None,
detected_device=None): detected_device=None):
pass pass

View File

@ -8,6 +8,8 @@ from calibre import prints
from calibre.constants import iswindows from calibre.constants import iswindows
from calibre.customize import Plugin from calibre.customize import Plugin
FAKE_DEVICE_SERIAL = '__fake_device_for_use_with_connect_to_folder__:'
class ModelMetadata(NamedTuple): class ModelMetadata(NamedTuple):
manufacturer_name: str manufacturer_name: str
@ -21,6 +23,10 @@ class ModelMetadata(NamedTuple):
def settings_key(self) -> str: def settings_key(self) -> str:
return f'{self.manufacturer_name} - {self.model_name}' return f'{self.manufacturer_name} - {self.model_name}'
def detected_device(self, folder_path):
from calibre.devices.scanner import USBDevice
return USBDevice(self.vendor_id, self.product_id, self.bcd, self.manufacturer_name, self.model_name, FAKE_DEVICE_SERIAL + folder_path)
class OpenPopupMessage: class OpenPopupMessage:

View File

@ -16,12 +16,13 @@ import subprocess
import sys import sys
import time import time
from collections import namedtuple from collections import namedtuple
from contextlib import suppress
from itertools import repeat from itertools import repeat
from calibre import prints from calibre import prints
from calibre.constants import is_debugging, isfreebsd, islinux, ismacos, iswindows from calibre.constants import is_debugging, isfreebsd, islinux, ismacos, iswindows
from calibre.devices.errors import DeviceError from calibre.devices.errors import DeviceError
from calibre.devices.interface import DevicePlugin, ModelMetadata from calibre.devices.interface import FAKE_DEVICE_SERIAL, DevicePlugin, ModelMetadata
from calibre.devices.usbms.deviceconfig import DeviceConfig from calibre.devices.usbms.deviceconfig import DeviceConfig
from calibre.utils.filenames import ascii_filename as sanitize from calibre.utils.filenames import ascii_filename as sanitize
from polyglot.builtins import iteritems, string_or_bytes from polyglot.builtins import iteritems, string_or_bytes
@ -125,6 +126,9 @@ class Device(DeviceConfig, DevicePlugin):
#: Put news in its own folder #: Put news in its own folder
NEWS_IN_FOLDER = True NEWS_IN_FOLDER = True
connected_folder_path = '' # used internally for fake folder device
eject_connected_folder = False
@classmethod @classmethod
def model_metadata(cls) -> tuple[ModelMetadata, ...]: def model_metadata(cls) -> tuple[ModelMetadata, ...]:
def get_representative_ids() -> tuple[int, int, int]: def get_representative_ids() -> tuple[int, int, int]:
@ -741,9 +745,35 @@ class Device(DeviceConfig, DevicePlugin):
self._card_b_prefix = self._card_b_vol = None self._card_b_prefix = self._card_b_vol = None
# ------------------------------------------------------ # ------------------------------------------------------
def is_folder_still_available(self):
if self.eject_connected_folder:
self.eject_connected_folder = False
self.connected_folder_path = ''
with suppress(OSError):
if self.connected_folder_path:
return os.path.isdir(self.connected_folder_path)
return False
def open(self, connected_device, library_uuid): def open(self, connected_device, library_uuid):
time.sleep(5)
self._main_prefix = self._card_a_prefix = self._card_b_prefix = None self._main_prefix = self._card_a_prefix = self._card_b_prefix = None
self.connected_folder_path = ''
if connected_device.serial and connected_device.serial.startswith(FAKE_DEVICE_SERIAL):
folder_path = connected_device.serial[len(FAKE_DEVICE_SERIAL):]
if not os.path.isdir(folder_path):
raise DeviceError(f'The path {folder_path} is not a folder cannot connect to it')
if not os.access(folder_path, os.R_OK | os.W_OK):
raise DeviceError(f'You do not have permission to read and write to {folder_path} cannot connect to it')
self._main_prefix = folder_path
self.current_library_uuid = library_uuid
self.device_being_opened = connected_device
try:
self.post_open_callback()
finally:
self.device_being_opened = None
self.connected_folder_path = folder_path
return
time.sleep(5)
self.device_being_opened = connected_device self.device_being_opened = connected_device
try: try:
if islinux: if islinux:
@ -816,6 +846,10 @@ class Device(DeviceConfig, DevicePlugin):
print('Udisks eject call for:', d, 'failed:') print('Udisks eject call for:', d, 'failed:')
print('\t', e) print('\t', e)
def unmount_device(self):
if self.connected_folder_path:
self.eject_connected_folder = True
def eject(self): def eject(self):
if islinux: if islinux:
try: try:

View File

@ -16,7 +16,7 @@ from qt.core import QAction, QActionGroup, QCoreApplication, QDialog, QDialogBut
from calibre import as_unicode, force_unicode, preferred_encoding, prints, sanitize_file_name from calibre import as_unicode, force_unicode, preferred_encoding, prints, sanitize_file_name
from calibre.constants import DEBUG from calibre.constants import DEBUG
from calibre.customize.ui import available_input_formats, available_output_formats, device_plugins, disabled_device_plugins from calibre.customize.ui import available_input_formats, available_output_formats, device_plugins, disabled_device_plugins, initialize_plugin
from calibre.devices.errors import ( from calibre.devices.errors import (
BlacklistedDevice, BlacklistedDevice,
FreeSpaceError, FreeSpaceError,
@ -35,7 +35,6 @@ from calibre.ebooks.metadata import authors_to_string
from calibre.gui2 import ( from calibre.gui2 import (
Dispatcher, Dispatcher,
FunctionDispatcher, FunctionDispatcher,
choose_dir,
config, config,
dynamic, dynamic,
error_dialog, error_dialog,
@ -319,6 +318,10 @@ class DeviceManager(Thread): # {{{
self.connected_slot(False, None) self.connected_slot(False, None)
def detect_device(self): def detect_device(self):
if self.is_device_connected and self.connected_device_kind in {'folder', 'folder-as-device'}:
if not self.connected_device.is_folder_still_available():
self.connected_device_removed()
return
self.scanner.scan() self.scanner.scan()
if self.is_device_connected: if self.is_device_connected:
@ -391,8 +394,8 @@ class DeviceManager(Thread): # {{{
# Mount devices that don't use USB, such as the folder device # Mount devices that don't use USB, such as the folder device
# This will be called on the GUI thread. Because of this, we must store # This will be called on the GUI thread. Because of this, we must store
# information that the scanner thread will use to do the real work. # information that the scanner thread will use to do the real work.
def mount_device(self, kls, kind, path): def mount_device(self, kls, kind, path, model_metadata=None):
self.mount_connection_requests.put((kls, kind, path)) self.mount_connection_requests.put((kls, kind, path, model_metadata))
# disconnect a device # disconnect a device
def umount_device(self, *args): def umount_device(self, *args):
@ -439,13 +442,29 @@ class DeviceManager(Thread): # {{{
self.devices_initialized.set() self.devices_initialized.set()
while self.keep_going: while self.keep_going:
kls = None kls = model_metadata = None
while True: while True:
try: try:
kls, device_kind, folder_path = self.mount_connection_requests.get_nowait() kls, device_kind, folder_path, model_metadata = self.mount_connection_requests.get_nowait()
except queue.Empty: except queue.Empty:
break break
if kls is not None: if model_metadata is not None:
try:
for candidate in self.devices:
if type(candidate) is model_metadata.driver_class:
dev = candidate
break
else:
# new device instance so run startup and set flag to
# run shutdown on disconnect
dev = initialize_plugin(model_metadata.driver_class)
self.run_startup(dev)
self.call_shutdown_on_disconnect = True
self.do_connect([[dev, model_metadata.detected_device(folder_path)],], device_kind=device_kind)
except Exception:
prints(f'Unable to open {device_kind} as device ({folder_path})')
traceback.print_exc()
elif kls is not None:
try: try:
dev = kls(folder_path) dev = kls(folder_path)
# We just created a new device instance. Call its startup # We just created a new device instance. Call its startup
@ -454,7 +473,7 @@ class DeviceManager(Thread): # {{{
self.run_startup(dev) self.run_startup(dev)
self.call_shutdown_on_disconnect = True self.call_shutdown_on_disconnect = True
self.do_connect([[dev, None],], device_kind=device_kind) self.do_connect([[dev, None],], device_kind=device_kind)
except: except Exception:
prints(f'Unable to open {device_kind} as device ({folder_path})') prints(f'Unable to open {device_kind} as device ({folder_path})')
traceback.print_exc() traceback.print_exc()
else: else:
@ -994,16 +1013,20 @@ class DeviceMixin: # {{{
self.default_thumbnail_prefs = prefs = override_prefs(cprefs) self.default_thumbnail_prefs = prefs = override_prefs(cprefs)
scale_cover(prefs, ratio) scale_cover(prefs, ratio)
def connect_to_folder_named(self, folder): def connect_to_folder_named(self, folder, model_metadata=None):
if os.path.exists(folder) and os.path.isdir(folder): if os.path.exists(folder) and os.path.isdir(folder):
self.device_manager.mount_device(kls=FOLDER_DEVICE, kind='folder', if model_metadata is not None:
path=folder) self.device_manager.mount_device(kls=None, kind='folder-as-device', path=folder, model_metadata=model_metadata)
else:
self.device_manager.mount_device(kls=FOLDER_DEVICE, kind='folder', path=folder)
def connect_to_folder(self): def connect_to_folder(self):
dir = choose_dir(self, 'Select Device Folder', from calibre.gui2.dialogs.connect_to_folder import ConnectToFolder
_('Select folder to open as device')) d = ConnectToFolder(self)
if dir is not None: if d.exec() == QDialog.DialogCode.Accepted:
self.connect_to_folder_named(dir) folder_path, model_metadata = d.ans
if folder_path:
self.connect_to_folder_named(folder_path, model_metadata)
# disconnect from folder devices # disconnect from folder devices
def disconnect_mounted_device(self): def disconnect_mounted_device(self):

View File

@ -43,7 +43,7 @@ class ChooseFolder(QWidget):
l.addWidget(bb) l.addWidget(bb)
def browse(self): def browse(self):
ans = choose_dir(self, 'connect-to-folder-browse-history', _('Choose folder to connect to')) ans = choose_dir(self, 'Select Device Folder', _('Select folder to open as device'))
if ans: if ans:
self.folder_edit.setText(ans) self.folder_edit.setText(ans)
@ -104,6 +104,11 @@ class ConnectToFolder(Dialog):
def __init__(self, parent=None): def __init__(self, parent=None):
super().__init__(_('Connect to folder'), 'connect-to-folder', parent=parent) super().__init__(_('Connect to folder'), 'connect-to-folder', parent=parent)
def sizeHint(self):
sz = super().sizeHint()
sz.setWidth(max(sz.width(), 600))
return sz
def setup_ui(self): def setup_ui(self):
self.l = l = QVBoxLayout(self) self.l = l = QVBoxLayout(self)
self.folder_chooser = fc = ChooseFolder(self) self.folder_chooser = fc = ChooseFolder(self)