# (c) Copyright 2009-2010, 2014-2015. CodeWeavers, Inc.
import re
import cxlog
import cxutils
def check_dbus():
global system_bus
global _dbus_string
global _dbus_interface
try:
import dbus
# Initialize the glib main loop
try:
import dbus.mainloop.glib
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
except ImportError:
# For compatibility with old python-dbus versions
# (e.g. Ubuntu <= 6.10)
import dbus.glib
system_bus = dbus.SystemBus()
_dbus_interface = dbus.Interface
_dbus_string = dbus.String
return True
except ImportError:
import cxfixes
cxfixes.add_error('dbus')
_dbus_string = None
return False
except:
import traceback
cxlog.warn("Can't connect to dbus:\n%s" % traceback.format_exc())
_dbus_string = None
return False
check_dbus()
def not_much(*_args):
pass
class error(Exception):
pass
class MissingDiskError(error):
pass
class Volume(object):
device = ''
# A string that uniquely identifies this device and does not change during
# its lifetime. If possible, this is the filename of a block device.
mounted = False
# True if the device is currently mounted. This can lag behind the actual
# device state, in which case calling mount, remount, or unmount will update
# it.
mountpoint = ''
# If the device is mounted, this is its current mount point. This can change
# if the device is mounted multiple times.
label = ''
# If possible, this is the label of the drive. An empty string otherwise.
is_disc = False
# True if the device is a CD or DVD.
has_disk = True
# True if a disk is present, or if it's unknown whether a disk is present.
def mount(self):
"""Mount the drive if it isn't mounted. Raises MissingDiskError if there
is no disk in the drive. Other exceptions are also possible."""
raise cxutils.not_yet_implemented()
def remount(self, _options='', _filesystem=''):
"""Remount the drive to change the options and/or filesystem. Raises
MissingDiskError if there is no disk in the drive. Other exceptions are
also possible."""
raise cxutils.not_yet_implemented()
def unmount(self):
"""Unmount the drive if it's mounted. This can raise an exception if it
fails."""
raise cxutils.not_yet_implemented()
_disc_filesystems = set(('cd9660', # FreeBSD
'iso9660', # Linux
'hsfs', # Solaris
'udf', # Linux
))
class UDisks2Volume(Volume):
objpath = ''
is_published = False # True if we reported this with add_notify
modified = False # True if the object was changed since add_notify was last called
def __init__(self):
self.interface_properties = {}
Volume.__init__(self)
def char_array_to_str(a):
return ''.join(chr(x) for x in a).split('\0', 1)[0]
class UDisks2MountPoints(object):
def __init__(self, add_notify=not_much, del_notify=not_much):
self.devices = {}
self.signals = []
if not _dbus_string:
raise Exception("dbus not available")
self.wanted_properties = {
_dbus_string('org.freedesktop.UDisks2.Block'): ('Device', 'IdLabel', 'IdType'),
_dbus_string('org.freedesktop.UDisks2.Filesystem'): ('MountPoints',),
}
self.udisks_obj = system_bus.get_object('org.freedesktop.UDisks2', '/org/freedesktop/UDisks2')
self.udisks = _dbus_interface(self.udisks_obj, 'org.freedesktop.DBus.ObjectManager')
self.add_notify = add_notify
self.del_notify = del_notify
self.signals.append(self.udisks.connect_to_signal("InterfacesAdded", self.interfaces_added))
self.signals.append(self.udisks.connect_to_signal("InterfacesRemoved", self.interfaces_removed))
self.signals.append(system_bus.add_signal_receiver(self.properties_changed, 'PropertiesChanged', 'org.freedesktop.DBus.Properties', 'org.freedesktop.UDisks2', path_keyword='objpath'))
for objpath, info in self.udisks.GetManagedObjects().iteritems():
self.interfaces_added(objpath, info)
def get_volume(self, objpath):
try:
volume = self.devices[objpath]
except KeyError:
volume = UDisks2Volume()
volume.objpath = objpath
self.devices[objpath] = volume
return volume
def property_changed(self, objpath, interface_name, name):
if interface_name == 'org.freedesktop.UDisks2.Block':
if name == 'Device':
volume = self.get_volume(objpath)
device = char_array_to_str(volume.interface_properties.get(interface_name, {}).get(name, ()))
if volume.is_published and volume.device != device:
self.del_notify(volume)
volume.is_published = False
volume.device = device
elif name == 'IdLabel':
volume = self.get_volume(objpath)
label = volume.interface_properties.get(interface_name, {}).get(name, ())
volume.modified = volume.modified or volume.label != label
volume.label = label
elif name == 'IdType':
volume = self.get_volume(objpath)
is_disc = volume.interface_properties.get(interface_name, {}).get(name, '') in _disc_filesystems
volume.modified = volume.modified or volume.is_disc != is_disc
volume.is_disc = is_disc
elif interface_name == 'org.freedesktop.UDisks2.Filesystem':
if name == 'MountPoints':
volume = self.get_volume(objpath)
mountpoints = volume.interface_properties.get(interface_name, {}).get(name, [])
if mountpoints:
mounted = True
mountpoint = char_array_to_str(mountpoints[0])
else:
mounted = False
mountpoint = ''
if volume.is_published and (volume.mounted != mounted or volume.mountpoint != mountpoint):
self.del_notify(volume)
volume.is_published = False
volume.mounted = mounted
volume.mountpoint = mountpoint
def _properties_changed(self, interface_name, props, invalidated_props, objpath):
volume = self.get_volume(objpath)
try:
known_props = volume.interface_properties[interface_name]
except KeyError:
known_props = {}
volume.interface_properties[interface_name] = known_props
for name, value in props.iteritems():
known_props[name] = value
self.property_changed(objpath, interface_name, name)
for name in invalidated_props:
try:
del known_props[name]
except KeyError:
pass
else:
if name in self.wanted_properties.get(interface_name, ()):
obj = system_bus.get_object('org.freedesktop.UDisks2', objpath)
obj_props = _dbus_interface(obj, 'org.freedesktop.DBus.Properties')
try:
known_props[name] = obj_props.Get(interface_name, name)
except:
# Probably just means the object disappeared before we could call Get
import traceback
cxlog.warn("Can't get property:\n%s" % traceback.format_exc())
self.property_changed(objpath, interface_name, name)
def properties_changed(self, interface_name, props, invalidated_props, objpath):
self._properties_changed(interface_name, props, invalidated_props, objpath)
self.report_changes()
def interfaces_added(self, objpath, info):
for interface_name, props in info.iteritems():
self._properties_changed(interface_name, props, (), objpath)
self.report_changes()
def interfaces_removed(self, objpath, interfaces):
volume = self.get_volume(objpath)
for interface in interfaces:
try:
del volume.interface_properties[interface]
except KeyError:
continue
if interface in self.wanted_properties:
for prop in self.wanted_properties[interface]:
self.property_changed(objpath, interface, prop)
self.report_changes()
def should_report_volume(self, volume):
if _dbus_string('org.freedesktop.UDisks2.Block') not in volume.interface_properties:
return False
if _dbus_string('org.freedesktop.UDisks2.Filesystem') not in volume.interface_properties:
return False
if not volume.device:
return False
return volume.mounted
def report_changes(self):
for volume in self.devices.itervalues():
if self.should_report_volume(volume) != volume.is_published:
if not volume.is_published:
self.add_notify(volume)
volume.is_published = True
volume.modified = False
else:
self.del_notify(volume)
volume.is_published = False
elif volume.is_published and volume.modified:
self.add_notify(volume)
volume.modified = False
def close(self):
self.add_notify = not_much
self.del_notify = not_much
for signal in self.signals:
signal.remove()
def __del__(self):
self.close()
class UDisksVolume(Volume):
objpath = ''
class UDisksMountPoints(object):
def __init__(self, add_notify=not_much, del_notify=not_much):
self.devices = {}
self.signals = []
if not _dbus_string:
raise Exception("dbus not available")
self.udisks_obj = system_bus.get_object('org.freedesktop.UDisks', '/org/freedesktop/UDisks')
self.udisks = _dbus_interface(self.udisks_obj, 'org.freedesktop.UDisks')
self.add_notify = add_notify
self.del_notify = del_notify
self.signals.append(self.udisks.connect_to_signal("DeviceAdded", self.new_device))
self.signals.append(self.udisks.connect_to_signal("DeviceChanged", self.new_device))
for objpath in self.udisks.EnumerateDevices():
self.new_device(objpath)
def new_device(self, objpath):
device_obj = system_bus.get_object('org.freedesktop.UDisks', objpath)
#device = _dbus_interface(device_obj, 'org.freedesktop.UDisks.Device')
device_props = _dbus_interface(device_obj, 'org.freedesktop.DBus.Properties')
mountpoints = device_props.Get('org.freedesktop.UDisks.Device', 'DeviceMountPaths')
if not device_props.Get('org.freedesktop.UDisks.Device', 'DeviceIsMounted') or not mountpoints:
if objpath in self.devices:
self.del_notify(self.devices[objpath])
del self.devices[objpath]
return
volume = UDisksVolume()
volume.objpath = objpath
volume.device = device_props.Get('org.freedesktop.UDisks.Device', 'DeviceFile')
volume.mounted = True
volume.label = device_props.Get('org.freedesktop.UDisks.Device', 'IdLabel')
volume.is_disc = device_props.Get('org.freedesktop.UDisks.Device', 'DeviceIsOpticalDisc') or device_props.Get('org.freedesktop.UDisks.Device', 'IdType') in _disc_filesystems
volume.mountpoint = mountpoints[0]
self.devices[objpath] = volume
self.add_notify(volume)
def close(self):
self.add_notify = not_much
self.del_notify = not_much
for signal in self.signals:
signal.remove()
def __del__(self):
self.close()
class HALVolume(Volume):
udi = ''
# The HAL unique device id
class HALMountPoints(object):
def new_device(self, udi, *_rest):
# Get the device object
device_obj = system_bus.get_object('org.freedesktop.Hal', udi)
# Get an interface to the device
device = _dbus_interface(device_obj, 'org.freedesktop.Hal.Device')
if not device.QueryCapability('volume'):
return
mp_path = device.GetProperty('volume.mount_point')
if mp_path != "":
mountpoint = HALVolume()
mountpoint.udi = udi
mountpoint.device = device.GetProperty('block.device')
mountpoint.mountpoint = mp_path
mountpoint.label = device.GetProperty('volume.label')
mountpoint.is_disc = bool(device.GetProperty('volume.is_disc'))
self.devices[udi] = mountpoint
self.add_notify(mountpoint)
if udi in self.device_signals:
return
def on_property_modified(*_args):
if device.GetProperty('volume.mount_point') != "":
self.new_device(udi)
else:
self.unmounted_device(udi)
self.device_signals[udi] = device.connect_to_signal("PropertyModified", on_property_modified)
def unmounted_device(self, udi):
if udi in self.devices:
self.del_notify(self.devices[udi])
del self.devices[udi]
def removed_device(self, udi, *_rest):
if udi in self.device_signals:
self.unmounted_device(udi)
self.device_signals[udi].remove()
del self.device_signals[udi]
def refresh(self):
# find all mountable devices
udis = self.hal.FindDeviceByCapability('volume')
for udi in udis:
self.new_device(udi)
def __init__(self, add_notify=not_much, del_notify=not_much):
self.device_signals = {}
self.devices = {}
self.signals = []
if not _dbus_string:
raise Exception("dbus not available")
# get a HAL object and an interface to HAL
self.hal_obj = system_bus.get_object('org.freedesktop.Hal', '/org/freedesktop/Hal/Manager')
self.hal = _dbus_interface(self.hal_obj, 'org.freedesktop.Hal.Manager')
if not self.hal:
raise Exception("Cannot connect to HAL")
self.add_notify = add_notify
self.del_notify = del_notify
self.signals.append(self.hal.connect_to_signal("DeviceAdded", self.new_device))
self.signals.append(self.hal.connect_to_signal("DeviceRemoved", self.removed_device))
self.signals.append(self.hal.connect_to_signal("NewCapability", self.new_device))
self.refresh()
def close(self):
self.add_notify = not_much
self.del_notify = not_much
for signal in self.signals:
signal.remove()
for signal in self.device_signals.values():
signal.remove()
def __del__(self):
self.close()
class UnixVolume(Volume):
# A mount point obtained from the mount command or /etc/fstab.
pass
class FailsafeMountPoints(object):
# The Linux mtab format assumes device paths don't contain spaces.
# Otherwise it is ambiguous.
LINUX_MTAB_RE = re.compile(r'(?P<device>\S+)\s+(?P<mountpoint>.+)\s+(?P<fs>\w+)\s+\S+\s+\d+\s+\d+$')
SOLARIS_MTAB_RE = re.compile(r'(?P<device>.+)\t(?P<mountpoint>.+)\t(?P<fs>\w+)\t')
LINUX_MOUNT_RE = re.compile(r'(?P<device>.+) on (?P<mountpoint>.+) type (?P<fs>\w+) \(.*\)')
FREEBSD_MOUNT_RE = re.compile(r'(?P<device>.+) on (?P<mountpoint>.+) \((?P<fs>\w+)(?:,.*)?\)')
def check_mount_cmd(self):
mountdata = None
for filename in ('/etc/mtab', '/etc/mnttab'):
try:
thisfile = open(filename, 'r')
mountdata = list(thisfile.readlines())
thisfile.close()
regexps = (self.LINUX_MTAB_RE, self.SOLARIS_MTAB_RE)
break
except IOError, ioerror:
cxlog.log('could not read %s: %s' % (filename, ioerror))
if mountdata is None:
retcode, out, err = cxutils.run(('mount', ), stdout=cxutils.GRAB,
stderr=cxutils.GRAB)
if retcode:
cxlog.log("mount failed (%s):\n%s%s" % (retcode, out, err))
mountdata = out.split('\n')
regexps = (self.LINUX_MOUNT_RE, self.FREEBSD_MOUNT_RE)
known_devices = self.devices.keys()
# Given the call parameters, communicate() always returns a string
for line in mountdata:
if line == '':
continue
for regexp in regexps:
match = regexp.match(line)
if match:
break
if match is None:
continue
device = match.group('device')
mp_path = match.group('mountpoint')
filesystem = match.group('fs')
if not device.startswith('/') or not mp_path.startswith('/') or \
filesystem == 'sysfs':
continue
if device in known_devices:
known_devices.remove(device)
else:
mountpoint = UnixVolume()
mountpoint.device = device
# Strange characters, spaces in particular, cause trouble in
# mtab files and thus are escaped.
mountpoint.mountpoint = cxutils.expand_octal_chars(mp_path)
mountpoint.is_disc = filesystem in _disc_filesystems
self.devices[device] = mountpoint
self.add_notify(mountpoint)
for device in known_devices:
self.del_notify(self.devices[device])
del self.devices[device]
return True # continue timer
def __init__(self, add_notify=not_much, del_notify=not_much):
# No Hal, so we resort to old-school methods.
import gobject
self.devices = {}
self.add_notify = add_notify
self.del_notify = del_notify
self.timer_src = []
self.check_mount_cmd()
self.timer_src = [gobject.timeout_add(2000, self.check_mount_cmd)]
def close(self):
import gobject
try:
# using lists for cheap atomic operations, since this can run on multiple threads
gobject.source_remove(self.timer_src.pop())
except IndexError:
pass
def __del__(self):
self.close()
# A list of notifier classes, with "better" notifiers at the start
notifier_classes = [
UDisks2MountPoints,
UDisksMountPoints,
HALMountPoints,
FailsafeMountPoints,
]
class MountPointsNotifier(object):
"A class that returns the combined results from the full set of notifiers"
def __init__(self, add_notify=not_much, del_notify=not_much):
self.add_notify = add_notify
self.del_notify = del_notify
self.notifiers = []
self.volumes = [{}]
for notifier_class in notifier_classes:
try:
def on_add(volume, index=len(self.notifiers)):
self.on_add(volume, index)
def on_del(volume, index=len(self.notifiers)):
self.on_del(volume, index)
notifier = notifier_class(on_add, on_del)
self.notifiers.append(notifier)
self.volumes.append({})
cxlog.log("using %s" % notifier_class)
except:
import traceback
cxlog.log("unable to create the %s object:\n%s" % (notifier_class.__name__, traceback.format_exc()))
if not self.notifiers:
cxlog.warn('could not create any mount point notifier')
self.volumes.pop()
def on_add(self, volume, index):
self.volumes[index][volume.device] = volume
for i in range(len(self.notifiers)):
if volume.device in self.volumes[i]:
if i < index:
# We already have a volume for this device, and it's better
return
elif i == index:
continue
else:
# This new device is better than the current best
self.del_notify(self.volumes[i][volume.device])
break
self.add_notify(volume)
def on_del(self, volume, index):
del self.volumes[index][volume.device]
self.del_notify(volume)
for volumes in self.volumes:
if volume.device in volumes:
self.add_notify(volumes[volume.device])
break
def close(self):
self.add_notify = not_much
self.del_notify = not_much
for notifier in self.notifiers:
notifier.close()
def _get_devices(self):
devices_dict = {}
for volumes in self.volumes[::-1]:
devices_dict.update(volumes)
return devices_dict
devices = property(_get_devices)
def GetVolumeList():
mountpointsnotify = MountPointsNotifier()
result = mountpointsnotify.devices.values()
mountpointsnotify.close()
return result