import dbus
+def dbus_to_str(value):
+ if isinstance(value, dbus.Byte):
+ return str(int(value))
+ elif isinstance(value, dbus.ByteArray):
+ return ','.join(str(ord(v)) for v in value)
+ elif isinstance(value, dbus.Array):
+ return ','.join(str(v) for v in value)
+ elif isinstance(value, dbus.Dictionary):
+ return ','.join('%s:%s' % (k, v) for k, v in value.iteritems())
+ else:
+ return str(value)
+
def get_dbus_message_type(message):
- return message.__class__.__name__.lower()[0:-7]
+ result = message.__class__.__name__[0:-7]
+ for c in 'ABCDEFGHIJKLMNOPQRSTUVWXYZ':
+ result = result.replace(c, '_'+c.lower())
+ return result.strip('_')
class DbusBus(object):
__bus = None
cls.__bus = super(DbusBus, cls).__new__(cls)
return cls.__bus
+ def __init__(self):
+ from dbus.mainloop.glib import DBusGMainLoop
+ DBusGMainLoop(set_as_default=True)
+
@property
def system(self):
if not self.__system_bus:
self.__system_bus.add_message_filter(handler)
if self.__session_bus:
self.__session_bus.add_message_filter(handler)
-
def listen(self):
- from dbus.mainloop.glib import DBusGMainLoop
from gobject import MainLoop
- DBusGMainLoop(set_as_default=True)
loop = MainLoop()
loop.run()
-class DbusRuleMatcher(object):
+class DbusRule(object):
def __init__(self, bus_=None, type_=None, sender_=None, interface_=None, path_=None, member_=None, destination_=None, args_=[]):
self._bus = bus_
self._type = type_
if self._destination not in (None, message.get_destination()):
return False
- args_ = message.get_args_list()
- for i, arg in enumerate(args_):
- if i >= len(self._args):
- break
- if self._args[i] not in (None, arg):
- return False
+ if self._args is not None:
+ args_ = message.get_args_list()
+ for i, arg in enumerate(args_):
+ if i >= len(self._args):
+ break
+ if self._args[i] not in (None, str(arg)):
+ return False
return True