diff --git a/src/calibre/devices/udisks.py b/src/calibre/devices/udisks.py index 182971e9e1..6beda24277 100644 --- a/src/calibre/devices/udisks.py +++ b/src/calibre/devices/udisks.py @@ -6,14 +6,14 @@ __license__ = 'GPL v3' __copyright__ = '2010, Kovid Goyal ' __docformat__ = 'restructuredtext en' -import os, re - -from polyglot.builtins import unicode_type, as_unicode +import os +import re +from contextlib import suppress def node_mountpoint(node): - if isinstance(node, unicode_type): + if isinstance(node, str): node = node.encode('utf-8') def de_mangle(raw): @@ -32,124 +32,143 @@ def basic_mount_options(): return ['rw', 'noexec', 'nosuid', 'nodev', 'uid=%d'%os.geteuid(), 'gid=%d'%os.getegid()] -class NoUDisks2(Exception): - pass +class UDisks: + BUS_NAME = 'org.freedesktop.UDisks2' + BLOCK = f'{BUS_NAME}.Block' + FILESYSTEM = f'{BUS_NAME}.Filesystem' + DRIVE = f'{BUS_NAME}.Drive' + PATH = '/org/freedesktop/UDisks2' -class UDisks2(object): + def __enter__(self): + from jeepney.io.blocking import open_dbus_connection + self.connection = open_dbus_connection(bus='SYSTEM') + return self - BLOCK = 'org.freedesktop.UDisks2.Block' - FILESYSTEM = 'org.freedesktop.UDisks2.Filesystem' - DRIVE = 'org.freedesktop.UDisks2.Drive' + def __exit__(self, *args): + self.connection.close() + del self.connection - def __init__(self): - import dbus - self.bus = dbus.SystemBus() - try: - self.bus.get_object('org.freedesktop.UDisks2', - '/org/freedesktop/UDisks2') - except dbus.exceptions.DBusException as e: - if getattr(e, '_dbus_error_name', None) == 'org.freedesktop.DBus.Error.ServiceUnknown': - raise NoUDisks2() - raise + def address(self, path='', interface=None): + from jeepney import DBusAddress + path = os.path.join(self.PATH, path) + return DBusAddress(path, bus_name=self.BUS_NAME, interface=interface) + + def send(self, msg): + from jeepney import DBusErrorResponse, MessageType + reply = self.connection.send_and_get_reply(msg) + if reply.header.message_type is MessageType.error: + raise DBusErrorResponse(reply) + return reply + + def introspect(self, object_path): + from jeepney import Introspectable + r = self.send(Introspectable(f'{self.PATH}/{object_path}', self.BUS_NAME).Introspect()) + return r.body[0] + + def get_device_node_path(self, devname): + from jeepney import Properties + p = Properties(self.address(f'block_devices/{devname}', self.BLOCK)) + r = self.send(p.get('Device')) + return bytearray(r.body[0][1]).replace(b'\x00', b'').decode('utf-8') + + def iter_block_devices(self): + xml = self.introspect('block_devices') + for m in re.finditer(r'name=[\'"](.+?)[\'"]', xml): + devname = m.group(1) + with suppress(Exception): + yield devname, self.get_device_node_path(devname) def device(self, device_node_path): device_node_path = os.path.realpath(device_node_path) devname = device_node_path.split('/')[-1] - - # First we try a direct object path - bd = self.bus.get_object('org.freedesktop.UDisks2', - '/org/freedesktop/UDisks2/block_devices/%s'%devname) - try: - device = bd.Get(self.BLOCK, 'Device', - dbus_interface='org.freedesktop.DBus.Properties') - device = bytearray(device).replace(b'\x00', b'').decode('utf-8') - except Exception: - device = None - - if device == device_node_path: - return bd - + # First try the device name directly + with suppress(Exception): + if self.get_device_node_path(devname) == device_node_path: + return devname # Enumerate all devices known to UDisks - devs = self.bus.get_object('org.freedesktop.UDisks2', - '/org/freedesktop/UDisks2/block_devices') - xml = unicode_type(devs.Introspect(dbus_interface='org.freedesktop.DBus.Introspectable')) - for dev in re.finditer(r'name=[\'"](.+?)[\'"]', xml): - bd = self.bus.get_object('org.freedesktop.UDisks2', - '/org/freedesktop/UDisks2/block_devices/%s'%dev.group(1)) - try: - device = bd.Get(self.BLOCK, 'Device', - dbus_interface='org.freedesktop.DBus.Properties') - device = bytearray(device).replace(b'\x00', b'').decode('utf-8') - except Exception: - device = None - if device == device_node_path: - return bd + for q, devpath in self.iter_block_devices(): + if devpath == device_node_path: + return q + raise KeyError(f'{device_node_path} not known to UDisks2') - raise ValueError('%r not known to UDisks2'%device_node_path) + def filesystem_operation_message(self, device_node_path, function_name, **kw): + from jeepney import new_method_call + devname = self.device(device_node_path) + a = self.address(f'block_devices/{devname}', self.FILESYSTEM) + kw['auth.no_user_interaction'] = ('b', True) + return new_method_call(a, function_name, 'a{sv}', (kw,)) def mount(self, device_node_path): - d = self.device(device_node_path) + msg = self.filesystem_operation_message(device_node_path, 'Mount', options=('s', ','.join(basic_mount_options()))) try: - return as_unicode(d.Mount( - { - 'auth.no_user_interaction':True, - 'options':','.join(basic_mount_options()) - }, - dbus_interface=self.FILESYSTEM)) + self.send(msg) except Exception: # May be already mounted, check - mp = node_mountpoint(unicode_type(device_node_path)) + mp = node_mountpoint(str(device_node_path)) if mp is None: raise return mp def unmount(self, device_node_path): - d = self.device(device_node_path) - d.Unmount({'force':True, 'auth.no_user_interaction':True}, - dbus_interface=self.FILESYSTEM) + msg = self.filesystem_operation_message(device_node_path, 'Unmount', force=('b', True)) + self.send(msg) - def drive_for_device(self, device): - drive = device.Get(self.BLOCK, 'Drive', - dbus_interface='org.freedesktop.DBus.Properties') - return self.bus.get_object('org.freedesktop.UDisks2', drive) + def drive_for_device(self, device_node_path): + from jeepney import Properties + devname = self.device(device_node_path) + a = self.address(f'block_devices/{devname}', self.BLOCK) + msg = Properties(a).get('Drive') + r = self.send(msg) + return r.body[0][1] def eject(self, device_node_path): - drive = self.drive_for_device(self.device(device_node_path)) - drive.Eject({'auth.no_user_interaction':True}, - dbus_interface=self.DRIVE) + from jeepney import new_method_call + drive = self.drive_for_device(device_node_path) + a = self.address(drive, self.DRIVE) + msg = new_method_call(a, 'Eject', 'a{sv}', ({ + 'auth.no_user_interaction': ('b', True), + },)) + self.send(msg) def get_udisks(): - return UDisks2() + return UDisks() def mount(node_path): - u = get_udisks() - u.mount(node_path) + with get_udisks() as u: + u.mount(node_path) def eject(node_path): - u = get_udisks() - u.eject(node_path) + with get_udisks() as u: + u.eject(node_path) def umount(node_path): - u = get_udisks() - u.unmount(node_path) + with get_udisks() as u: + u.unmount(node_path) -def test_udisks(ver=None): +def test_udisks(): import sys dev = sys.argv[1] print('Testing with node', dev) - u = get_udisks(ver=ver) - print('Using Udisks:', u.__class__.__name__) - print('Mounted at:', u.mount(dev)) - print('Unmounting') - u.unmount(dev) - print('Ejecting:') - u.eject(dev) + with get_udisks() as u: + print('Using Udisks:', u.__class__.__name__) + print('Mounted at:', u.mount(dev)) + print('Unmounting') + u.unmount(dev) + print('Ejecting:') + u.eject(dev) + + +def develop(): + dev = '/dev/nvme0n1p3' + with get_udisks() as u: + print(u.device(dev)) + print(u.drive_for_device(dev)) if __name__ == '__main__':