Finish porting UDisks to use jeepney

This commit is contained in:
Kovid Goyal 2021-06-24 22:07:15 +05:30
parent 29d2fcf40c
commit 3e2c8ef699
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C

View File

@ -6,14 +6,14 @@ __license__ = 'GPL v3'
__copyright__ = '2010, Kovid Goyal <kovid@kovidgoyal.net>' __copyright__ = '2010, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en' __docformat__ = 'restructuredtext en'
import os, re import os
import re
from polyglot.builtins import unicode_type, as_unicode from contextlib import suppress
def node_mountpoint(node): def node_mountpoint(node):
if isinstance(node, unicode_type): if isinstance(node, str):
node = node.encode('utf-8') node = node.encode('utf-8')
def de_mangle(raw): def de_mangle(raw):
@ -32,118 +32,130 @@ def basic_mount_options():
return ['rw', 'noexec', 'nosuid', 'nodev', 'uid=%d'%os.geteuid(), 'gid=%d'%os.getegid()] return ['rw', 'noexec', 'nosuid', 'nodev', 'uid=%d'%os.geteuid(), 'gid=%d'%os.getegid()]
class NoUDisks2(Exception): class UDisks:
pass
BUS_NAME = 'org.freedesktop.UDisks2'
BLOCK = f'{BUS_NAME}.Block'
FILESYSTEM = f'{BUS_NAME}.Filesystem'
DRIVE = f'{BUS_NAME}.Drive'
PATH = '/org/freedesktop/UDisks2'
class UDisks2(object): def __enter__(self):
from jeepney.io.blocking import open_dbus_connection
self.connection = open_dbus_connection(bus='SYSTEM')
return self
BLOCK = 'org.freedesktop.UDisks2.Block' def __exit__(self, *args):
FILESYSTEM = 'org.freedesktop.UDisks2.Filesystem' self.connection.close()
DRIVE = 'org.freedesktop.UDisks2.Drive' del self.connection
def __init__(self): def address(self, path='', interface=None):
import dbus from jeepney import DBusAddress
self.bus = dbus.SystemBus() path = os.path.join(self.PATH, path)
try: return DBusAddress(path, bus_name=self.BUS_NAME, interface=interface)
self.bus.get_object('org.freedesktop.UDisks2',
'/org/freedesktop/UDisks2') def send(self, msg):
except dbus.exceptions.DBusException as e: from jeepney import DBusErrorResponse, MessageType
if getattr(e, '_dbus_error_name', None) == 'org.freedesktop.DBus.Error.ServiceUnknown': reply = self.connection.send_and_get_reply(msg)
raise NoUDisks2() if reply.header.message_type is MessageType.error:
raise raise DBusErrorResponse(reply)
return reply
def introspect(self, object_path):
from jeepney import Introspectable
r = self.send(Introspectable(f'{self.PATH}/{object_path}', self.BUS_NAME).Introspect())
return r.body[0]
def get_device_node_path(self, devname):
from jeepney import Properties
p = Properties(self.address(f'block_devices/{devname}', self.BLOCK))
r = self.send(p.get('Device'))
return bytearray(r.body[0][1]).replace(b'\x00', b'').decode('utf-8')
def iter_block_devices(self):
xml = self.introspect('block_devices')
for m in re.finditer(r'name=[\'"](.+?)[\'"]', xml):
devname = m.group(1)
with suppress(Exception):
yield devname, self.get_device_node_path(devname)
def device(self, device_node_path): def device(self, device_node_path):
device_node_path = os.path.realpath(device_node_path) device_node_path = os.path.realpath(device_node_path)
devname = device_node_path.split('/')[-1] devname = device_node_path.split('/')[-1]
# First try the device name directly
# First we try a direct object path with suppress(Exception):
bd = self.bus.get_object('org.freedesktop.UDisks2', if self.get_device_node_path(devname) == device_node_path:
'/org/freedesktop/UDisks2/block_devices/%s'%devname) return devname
try:
device = bd.Get(self.BLOCK, 'Device',
dbus_interface='org.freedesktop.DBus.Properties')
device = bytearray(device).replace(b'\x00', b'').decode('utf-8')
except Exception:
device = None
if device == device_node_path:
return bd
# Enumerate all devices known to UDisks # Enumerate all devices known to UDisks
devs = self.bus.get_object('org.freedesktop.UDisks2', for q, devpath in self.iter_block_devices():
'/org/freedesktop/UDisks2/block_devices') if devpath == device_node_path:
xml = unicode_type(devs.Introspect(dbus_interface='org.freedesktop.DBus.Introspectable')) return q
for dev in re.finditer(r'name=[\'"](.+?)[\'"]', xml): raise KeyError(f'{device_node_path} not known to UDisks2')
bd = self.bus.get_object('org.freedesktop.UDisks2',
'/org/freedesktop/UDisks2/block_devices/%s'%dev.group(1))
try:
device = bd.Get(self.BLOCK, 'Device',
dbus_interface='org.freedesktop.DBus.Properties')
device = bytearray(device).replace(b'\x00', b'').decode('utf-8')
except Exception:
device = None
if device == device_node_path:
return bd
raise ValueError('%r not known to UDisks2'%device_node_path) def filesystem_operation_message(self, device_node_path, function_name, **kw):
from jeepney import new_method_call
devname = self.device(device_node_path)
a = self.address(f'block_devices/{devname}', self.FILESYSTEM)
kw['auth.no_user_interaction'] = ('b', True)
return new_method_call(a, function_name, 'a{sv}', (kw,))
def mount(self, device_node_path): def mount(self, device_node_path):
d = self.device(device_node_path) msg = self.filesystem_operation_message(device_node_path, 'Mount', options=('s', ','.join(basic_mount_options())))
try: try:
return as_unicode(d.Mount( self.send(msg)
{
'auth.no_user_interaction':True,
'options':','.join(basic_mount_options())
},
dbus_interface=self.FILESYSTEM))
except Exception: except Exception:
# May be already mounted, check # May be already mounted, check
mp = node_mountpoint(unicode_type(device_node_path)) mp = node_mountpoint(str(device_node_path))
if mp is None: if mp is None:
raise raise
return mp return mp
def unmount(self, device_node_path): def unmount(self, device_node_path):
d = self.device(device_node_path) msg = self.filesystem_operation_message(device_node_path, 'Unmount', force=('b', True))
d.Unmount({'force':True, 'auth.no_user_interaction':True}, self.send(msg)
dbus_interface=self.FILESYSTEM)
def drive_for_device(self, device): def drive_for_device(self, device_node_path):
drive = device.Get(self.BLOCK, 'Drive', from jeepney import Properties
dbus_interface='org.freedesktop.DBus.Properties') devname = self.device(device_node_path)
return self.bus.get_object('org.freedesktop.UDisks2', drive) a = self.address(f'block_devices/{devname}', self.BLOCK)
msg = Properties(a).get('Drive')
r = self.send(msg)
return r.body[0][1]
def eject(self, device_node_path): def eject(self, device_node_path):
drive = self.drive_for_device(self.device(device_node_path)) from jeepney import new_method_call
drive.Eject({'auth.no_user_interaction':True}, drive = self.drive_for_device(device_node_path)
dbus_interface=self.DRIVE) a = self.address(drive, self.DRIVE)
msg = new_method_call(a, 'Eject', 'a{sv}', ({
'auth.no_user_interaction': ('b', True),
},))
self.send(msg)
def get_udisks(): def get_udisks():
return UDisks2() return UDisks()
def mount(node_path): def mount(node_path):
u = get_udisks() with get_udisks() as u:
u.mount(node_path) u.mount(node_path)
def eject(node_path): def eject(node_path):
u = get_udisks() with get_udisks() as u:
u.eject(node_path) u.eject(node_path)
def umount(node_path): def umount(node_path):
u = get_udisks() with get_udisks() as u:
u.unmount(node_path) u.unmount(node_path)
def test_udisks(ver=None): def test_udisks():
import sys import sys
dev = sys.argv[1] dev = sys.argv[1]
print('Testing with node', dev) print('Testing with node', dev)
u = get_udisks(ver=ver) with get_udisks() as u:
print('Using Udisks:', u.__class__.__name__) print('Using Udisks:', u.__class__.__name__)
print('Mounted at:', u.mount(dev)) print('Mounted at:', u.mount(dev))
print('Unmounting') print('Unmounting')
@ -152,5 +164,12 @@ def test_udisks(ver=None):
u.eject(dev) u.eject(dev)
def develop():
dev = '/dev/nvme0n1p3'
with get_udisks() as u:
print(u.device(dev))
print(u.drive_for_device(dev))
if __name__ == '__main__': if __name__ == '__main__':
test_udisks() test_udisks()