Initial (partial) implementation of UDisks2 support

This commit is contained in:
Kovid Goyal 2012-06-22 14:07:36 +05:30
parent 8f78d19ce7
commit ae26c69cd8

View File

@ -5,7 +5,11 @@ __license__ = 'GPL v3'
__copyright__ = '2010, Kovid Goyal <kovid@kovidgoyal.net>' __copyright__ = '2010, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en' __docformat__ = 'restructuredtext en'
import os, dbus # First repeat after me: Linux desktop infrastructure is designed by a
# committee of rabid monkeys on crack. They would not know a decent desktop if
# it was driving the rabid monkey extermination truck that runs them over.
import os, dbus, re
def node_mountpoint(node): def node_mountpoint(node):
@ -56,21 +60,23 @@ class UDisks(object):
d = self.device(parent) d = self.device(parent)
d.DriveEject([]) d.DriveEject([])
class NoUdisks2(Exception): class NoUDisks2(Exception):
pass pass
class UDisks2(UDisks): class UDisks2(object):
BLOCK = 'org.freedesktop.UDisks2.Block'
FILESYSTEM = 'org.freedesktop.UDisks2.Filesystem'
def __init__(self): def __init__(self):
self.bus = dbus.SystemBus() self.bus = dbus.SystemBus()
try: try:
self.main = self.bus.get_object('org.freedesktop.UDisks2', self.bus.get_object('org.freedesktop.UDisks2',
'/org/freedesktop/UDisks2') '/org/freedesktop/UDisks2')
except dbus.exceptions.DBusException as e: except dbus.exceptions.DBusException as e:
if getattr(e, '_dbus_error_name', None) == 'org.freedesktop.DBus.Error.ServiceUnknown': if getattr(e, '_dbus_error_name', None) == 'org.freedesktop.DBus.Error.ServiceUnknown':
raise NoUdisks2() raise NoUDisks2()
raise raise
print self.main.Introspect(dbus_interface='org.freedesktop.DBus.Introspectable')
def device(self, device_node_path): def device(self, device_node_path):
device_node_path = os.path.realpath(device_node_path) device_node_path = os.path.realpath(device_node_path)
@ -79,15 +85,61 @@ class UDisks2(UDisks):
# First we try a direct object path # First we try a direct object path
bd = self.bus.get_object('org.freedesktop.UDisks2', bd = self.bus.get_object('org.freedesktop.UDisks2',
'/org/freedesktop/UDisks2/block_devices/%s'%devname) '/org/freedesktop/UDisks2/block_devices/%s'%devname)
props = bd.getProperties(dbus_interface='org.freedesktop.UDisks2.BlockDevice') try:
print props device = bd.Get(self.BLOCK, 'Device',
dbus_interface='org.freedesktop.DBus.Properties')
device = bytearray(device).replace(b'\x00', b'').decode('utf-8')
except:
device = None
if device == device_node_path:
return bd
# Enumerate all devices known to UDisks
devs = self.bus.get_object('org.freedesktop.UDisks2',
'/org/freedesktop/UDisks2/block_devices')
xml = devs.Introspect(dbus_interface='org.freedesktop.DBus.Introspectable')
for dev in re.finditer(r'name=[\'"](.+?)[\'"]', type(u'')(xml)):
bd = self.bus.get_object('org.freedesktop.UDisks2',
'/org/freedesktop/UDisks2/block_devices/%s2'%dev.group(1))
try:
device = bd.Get(self.BLOCK, 'Device',
dbus_interface='org.freedesktop.DBus.Properties')
device = bytearray(device).replace(b'\x00', b'').decode('utf-8')
except:
device = None
if device == device_node_path:
return bd
raise ValueError('%r not known to UDisks2'%device_node_path)
def mount(self, device_node_path):
d = self.device(device_node_path)
mount_options = ['rw', 'noexec', 'nosuid',
'sync', 'nodev', 'uid=%d'%os.geteuid(), 'gid=%d'%os.getegid()]
try:
return unicode(d.Mount(
{
'auth.no_user_interaction':True,
'options':','.join(mount_options)
},
dbus_interface=self.FILESYSTEM))
except:
# May be already mounted, check
mp = node_mountpoint(str(device_node_path))
if mp is None:
raise
return mp
def get_udisks(ver=None):
if ver is None:
try:
u = UDisks2()
except NoUDisks2:
u = UDisks()
return u
return UDisks2() if ver == 2 else UDisks()
def get_udisks():
try:
u = UDisks2()
except NoUdisks2:
u = UDisks()
return u
def mount(node_path): def mount(node_path):
u = UDisks() u = UDisks()
@ -105,12 +157,8 @@ def test_udisks(ver=None):
import sys import sys
dev = sys.argv[1] dev = sys.argv[1]
print 'Testing with node', dev print 'Testing with node', dev
if ver is None: u = get_udisks(ver=ver)
u = get_udisks() print 'Using Udisks:', u.__class__.__name__
else:
u = UDisks2() if ver == 2 else UDisks()
print 'Using Udisks:', u
print 'Mounted at:', u.mount(dev) print 'Mounted at:', u.mount(dev)
print 'Unmounting' print 'Unmounting'
u.unmount(dev) u.unmount(dev)