diff --git a/centrixd/__init__.py b/centrixd/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/centrixd/cxproto.py b/centrixd/cxproto.py index 6c80ffc..9006636 100644 --- a/centrixd/cxproto.py +++ b/centrixd/cxproto.py @@ -8,8 +8,9 @@ from threading import Thread from cxutils import log from xmlserialize import serialize from data import CXMessage +from cxconfig import config -CXPORT = 9222 +CXPORT = config['Network'].get('Port', 9222) class CXProto(Protocol): status = None diff --git a/centrixd/data.py b/centrixd/data.py new file mode 100644 index 0000000..db3cbec --- /dev/null +++ b/centrixd/data.py @@ -0,0 +1,36 @@ +import xmlserialize as xml + +##################################### +# Some classes + +class Service(object): + def __init__(self): + self.ip = '' + self.port = 0 + self.proto = '' + self.stype = '' + self.name = '' + + def __str__(self): + if self.port: + return "%s:%s" % (self.ip, self.port) + else: + return self.ip + + def __repr__(self): + return "<Service %s on %s:%s>" % (self.name,self.ip,self.port) + +class CXMessage(object): + def __init__(self,type=None,**kwargs): + self.type = type + self.__dict__.update(kwargs) + + def __repr__(self): + return "<Message of type %s>" % self.type + + def to_xml(self): + return xml.serialize(self) + + @classmethod + def parse(cls, text): + return xml.deserialize(text,cls=cls) diff --git a/zeroconf.py b/zeroconf.py new file mode 100644 index 0000000..6ebb590 --- /dev/null +++ b/zeroconf.py @@ -0,0 +1,149 @@ +import sys +import signal +import threading +import gobject +import dbus +from dbus.mainloop.glib import DBusGMainLoop +import avahi +import socket + +from centrixd import data +from centrixd.cxconfig import config +import machines + +hosts = [] +services = {} + +def err(str): + print "Error:", str + +# Shorter names of services +servicenames = {'_ssh._tcp': 'ssh', + '_http._tcp': 'web', + '_ftp._tcp': 'ftp', + '_workstation._tcp': 'machine', + '_pulse-server._tcp': 'pulseaudio', + '_nameserver._udp': 'dns'} + +class ZeroconfThread(threading.Thread): + "Thread which does all zeroconf work" + + def __init__(self,group=None,target=None,name=None,args=(), kwargs=None,verbose=None): + threading.Thread.__init__(self) + self.name = config['Centrix']['Node Name'] + self.port = config['Network'].get('Port', 9222) + self.hostname = socket.gethostname() + self.types = args + self.done = {} + for type in args: + self.done[type] = False + + def _all_done(self): + for v in self.done.values(): + if not v: + return False + return True + + def _interface_for_type(self,type): + return dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, + self.server.ServiceBrowserNew(avahi.IF_UNSPEC, + avahi.PROTO_UNSPEC, + type, 'local', dbus.UInt32(0))), + avahi.DBUS_INTERFACE_SERVICE_BROWSER) + + def _avahi_connect(self): + self.bus = dbus.SystemBus() + self.server = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, avahi.DBUS_PATH_SERVER),avahi.DBUS_INTERFACE_SERVER) + + + def announce(self): + self.group = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, self.server.EntryGroupNew()), + avahi.DBUS_INTERFACE_ENTRY_GROUP) +# self.group.connect_to_signal('StateChanged', self._entry_group_state_changed) + self.group.AddService(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, 0, + self.name, self._service_type(), 'local', self.hostname, + dbus.UInt16(self.port), + avahi.string_array_to_txt_array(self.txt)) + self.group.Commit() + + + def browse(self,types): + print "Browsing for:", types + for type in types: + b = self._interface_for_type(type) + b.connect_to_signal('ItemNew', self.add_service) + b.connect_to_signal('ItemRemove', self.remove_service) + + def add_service(self, interface,proto,name,stype,domain,flags): + print "Found service:", name + self.server.ResolveService(int(interface), int(proto), + name,stype,domain, avahi.PROTO_UNSPEC, dbus.UInt32(0), + reply_handler = self.listener.new_service, + error_handler = err) + self.done[stype] = True + if self._all_done(): + self.lock.release() + + def set_listener(self, ob): + self.listener = ob + + def remove_service(self,iface,proto,name,stype,domain,flags): + print "Removing service", name + + def quit(self): + print "Stop discovery." + self.ml.quit() + try: + self.lock.release() + except: + pass + return False + + def run(self): + DBusGMainLoop(set_as_default=True) + if self.timeout: + gobject.timeout_add(self.timeout,self.quit) + self._avahi_connect() + self.browse(self.types) + self.ml = gobject.MainLoop() + gobject.threads_init() + self.ml.run() + +class ZeroconfListener(object): + def new_service(self, iface,proto,name,stype,domain, host, aproto,addr, port, txt,flags): + global services + global hosts + + if stype == '_workstation._tcp': + host = machines.Host() + host.hostname = name.split()[0] + host.ip = addr + hosts.append(host) + return + + sname = servicenames[stype] + svc = data.Service() + svc.name = name + svc.ip = addr + svc.port = port + services[sname] = svc +# print "services[%s] = %s" % (sname, svc) + +def discover(services=('_ssh._tcp','_workstation._tcp','_nameserver._udp'),timeout=None): + """Discover services of given types via zeroconf. + Returns (blocked) lock, which is released when all services are discovered.""" + global browser + lock = threading.Lock() + lock.acquire() + listener = ZeroconfListener() + browser = ZeroconfThread(args=services) + browser.timeout = timeout + browser.lock = lock + browser.set_listener(listener) + browser.start() + return lock + +def stop(): + "Cancel discovery" + global browser + browser.quit()