From d90153cc4a1121109d0fefadf060ffb64bc96899 Mon Sep 17 00:00:00 2001 From: portnov Date: Tue, 3 Mar 2009 00:54:17 +0500 Subject: [PATCH] Move zeroconf module --- centrixd/zeroconf.py | 171 ++++++++++++++++++++++++++++++++++++++++++++++++++ test.py | 2 +- zeroconf.py | 171 -------------------------------------------------- 3 files changed, 172 insertions(+), 172 deletions(-) create mode 100644 centrixd/zeroconf.py delete mode 100644 zeroconf.py diff --git a/centrixd/zeroconf.py b/centrixd/zeroconf.py new file mode 100644 index 0000000..94eafaa --- /dev/null +++ b/centrixd/zeroconf.py @@ -0,0 +1,171 @@ +import sys +import signal +import threading +import gobject +import dbus +from dbus.mainloop.glib import DBusGMainLoop +import avahi +import socket + +from centrixd import data +from centrixd.cxconfig import config +import machines + +hosts = [] +services = {} + +def err(str): + print "Error:", str + +# Shorter names of services +servicenames = {'_ssh._tcp': 'ssh', + '_http._tcp': 'web', + '_ftp._tcp': 'ftp', + '_workstation._tcp': 'machine', + '_pulse-server._tcp': 'pulseaudio', + '_nameserver._udp': 'dns'} + +class ZeroconfThread(threading.Thread): + "Thread which does all zeroconf work" + + def __init__(self,group=None,target=None,name=None,args=(), kwargs=None,verbose=None): + threading.Thread.__init__(self) + + self.name = config['Centrix']['Node Name'] + self.port = config['Network'].get('Port', 9222) + mode = config['Centrix']['Mode'] + self.master = (mode == 'Master') +# TODO: if this server is not Master, detect Master (via zeroconf or from config) and announce it in TXT record + self.txt = '' + self.hostname = socket.gethostname() + '.local' + self.types = args + self.done = {} + for type in args: + self.done[type] = False + + def _all_done(self): + "Are all services discovered?" + for v in self.done.values(): + if not v: + return False + return True + + def _interface_for_type(self,type): + "Return Dbus interface to Avahi for given service type" + return dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, + self.server.ServiceBrowserNew(avahi.IF_UNSPEC, + avahi.PROTO_UNSPEC, + type, 'local', dbus.UInt32(0))), + avahi.DBUS_INTERFACE_SERVICE_BROWSER) + + def _avahi_connect(self): + "Connect to Avahi via Dbus" + self.bus = dbus.SystemBus() + self.server = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, avahi.DBUS_PATH_SERVER),avahi.DBUS_INTERFACE_SERVER) + + def _service_type(self): + "Return zeroconf service type, dependent on master/relay mode" + + if self.master: + t = 'master' + else: + t = 'relay' + return "_centrix_%s._tcp" % t + + def announce(self): + """Announce centrix server via Zeroconf/Avahi, + for client machines to can find server""" + + self.group = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, self.server.EntryGroupNew()), + avahi.DBUS_INTERFACE_ENTRY_GROUP) +# self.group.connect_to_signal('StateChanged', self._entry_group_state_changed) + print "Hostname:", self.hostname + self.group.AddService(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, 0, + self.name, self._service_type(), 'local', self.hostname, + dbus.UInt16(self.port), + []) + self.group.Commit() + + + def browse(self,types): + print "Browsing for:", types + for type in types: + b = self._interface_for_type(type) + b.connect_to_signal('ItemNew', self.add_service) + b.connect_to_signal('ItemRemove', self.remove_service) + + def add_service(self, interface,proto,name,stype,domain,flags): + print "Found service:", name + self.server.ResolveService(int(interface), int(proto), + name,stype,domain, avahi.PROTO_UNSPEC, dbus.UInt32(0), + reply_handler = self.listener.new_service, + error_handler = err) + self.done[stype] = True + if self._all_done(): + self.lock.release() + + def set_listener(self, ob): + self.listener = ob + + def remove_service(self,iface,proto,name,stype,domain,flags): + print "Removing service", name + + def quit(self): + print "Stop discovery." + self.ml.quit() + try: + self.lock.release() + except: + pass + return False + + def run(self): + DBusGMainLoop(set_as_default=True) + self._avahi_connect() + if self.timeout: + gobject.timeout_add(self.timeout,self.quit) + self.announce() + self.browse(self.types) + self.ml = gobject.MainLoop() + gobject.threads_init() + self.ml.run() + +class ZeroconfListener(object): + def new_service(self, iface,proto,name,stype,domain, host, aproto,addr, port, txt,flags): + global services + global hosts + print "Host:", host + + if stype == '_workstation._tcp': + host = machines.Host() + host.hostname = name.split()[0] + host.ip = addr + hosts.append(host) + return + + sname = servicenames[stype] + svc = data.Service() + svc.name = name + svc.ip = addr + svc.port = port + services[sname] = svc +# print "services[%s] = %s" % (sname, svc) + +def discover(services=('_ssh._tcp','_workstation._tcp','_nameserver._udp'),timeout=None): + """Discover services of given types via zeroconf. + Returns (blocked) lock, which is released when all services are discovered.""" + global browser + lock = threading.Lock() + lock.acquire() + listener = ZeroconfListener() + browser = ZeroconfThread(args=services) + browser.timeout = timeout + browser.lock = lock + browser.set_listener(listener) + browser.start() + return lock + +def stop(): + "Cancel discovery" + global browser + browser.quit() diff --git a/test.py b/test.py index fd20e47..8afd83f 100755 --- a/test.py +++ b/test.py @@ -9,7 +9,7 @@ from twisted.internet.protocol import Factory from twisted.protocols.basic import LineReceiver from twisted.internet import reactor -import zeroconf +from centrixd import zeroconf import templates import configs import configsets diff --git a/zeroconf.py b/zeroconf.py deleted file mode 100644 index 94eafaa..0000000 --- a/zeroconf.py +++ /dev/null @@ -1,171 +0,0 @@ -import sys -import signal -import threading -import gobject -import dbus -from dbus.mainloop.glib import DBusGMainLoop -import avahi -import socket - -from centrixd import data -from centrixd.cxconfig import config -import machines - -hosts = [] -services = {} - -def err(str): - print "Error:", str - -# Shorter names of services -servicenames = {'_ssh._tcp': 'ssh', - '_http._tcp': 'web', - '_ftp._tcp': 'ftp', - '_workstation._tcp': 'machine', - '_pulse-server._tcp': 'pulseaudio', - '_nameserver._udp': 'dns'} - -class ZeroconfThread(threading.Thread): - "Thread which does all zeroconf work" - - def __init__(self,group=None,target=None,name=None,args=(), kwargs=None,verbose=None): - threading.Thread.__init__(self) - - self.name = config['Centrix']['Node Name'] - self.port = config['Network'].get('Port', 9222) - mode = config['Centrix']['Mode'] - self.master = (mode == 'Master') -# TODO: if this server is not Master, detect Master (via zeroconf or from config) and announce it in TXT record - self.txt = '' - self.hostname = socket.gethostname() + '.local' - self.types = args - self.done = {} - for type in args: - self.done[type] = False - - def _all_done(self): - "Are all services discovered?" - for v in self.done.values(): - if not v: - return False - return True - - def _interface_for_type(self,type): - "Return Dbus interface to Avahi for given service type" - return dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, - self.server.ServiceBrowserNew(avahi.IF_UNSPEC, - avahi.PROTO_UNSPEC, - type, 'local', dbus.UInt32(0))), - avahi.DBUS_INTERFACE_SERVICE_BROWSER) - - def _avahi_connect(self): - "Connect to Avahi via Dbus" - self.bus = dbus.SystemBus() - self.server = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, avahi.DBUS_PATH_SERVER),avahi.DBUS_INTERFACE_SERVER) - - def _service_type(self): - "Return zeroconf service type, dependent on master/relay mode" - - if self.master: - t = 'master' - else: - t = 'relay' - return "_centrix_%s._tcp" % t - - def announce(self): - """Announce centrix server via Zeroconf/Avahi, - for client machines to can find server""" - - self.group = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, self.server.EntryGroupNew()), - avahi.DBUS_INTERFACE_ENTRY_GROUP) -# self.group.connect_to_signal('StateChanged', self._entry_group_state_changed) - print "Hostname:", self.hostname - self.group.AddService(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, 0, - self.name, self._service_type(), 'local', self.hostname, - dbus.UInt16(self.port), - []) - self.group.Commit() - - - def browse(self,types): - print "Browsing for:", types - for type in types: - b = self._interface_for_type(type) - b.connect_to_signal('ItemNew', self.add_service) - b.connect_to_signal('ItemRemove', self.remove_service) - - def add_service(self, interface,proto,name,stype,domain,flags): - print "Found service:", name - self.server.ResolveService(int(interface), int(proto), - name,stype,domain, avahi.PROTO_UNSPEC, dbus.UInt32(0), - reply_handler = self.listener.new_service, - error_handler = err) - self.done[stype] = True - if self._all_done(): - self.lock.release() - - def set_listener(self, ob): - self.listener = ob - - def remove_service(self,iface,proto,name,stype,domain,flags): - print "Removing service", name - - def quit(self): - print "Stop discovery." - self.ml.quit() - try: - self.lock.release() - except: - pass - return False - - def run(self): - DBusGMainLoop(set_as_default=True) - self._avahi_connect() - if self.timeout: - gobject.timeout_add(self.timeout,self.quit) - self.announce() - self.browse(self.types) - self.ml = gobject.MainLoop() - gobject.threads_init() - self.ml.run() - -class ZeroconfListener(object): - def new_service(self, iface,proto,name,stype,domain, host, aproto,addr, port, txt,flags): - global services - global hosts - print "Host:", host - - if stype == '_workstation._tcp': - host = machines.Host() - host.hostname = name.split()[0] - host.ip = addr - hosts.append(host) - return - - sname = servicenames[stype] - svc = data.Service() - svc.name = name - svc.ip = addr - svc.port = port - services[sname] = svc -# print "services[%s] = %s" % (sname, svc) - -def discover(services=('_ssh._tcp','_workstation._tcp','_nameserver._udp'),timeout=None): - """Discover services of given types via zeroconf. - Returns (blocked) lock, which is released when all services are discovered.""" - global browser - lock = threading.Lock() - lock.acquire() - listener = ZeroconfListener() - browser = ZeroconfThread(args=services) - browser.timeout = timeout - browser.lock = lock - browser.set_listener(listener) - browser.start() - return lock - -def stop(): - "Cancel discovery" - global browser - browser.quit() -- 1.7.2.3