Add announce() method to ZeroconfThread.

portnov [2009-02-27 08:40:49]
Add announce() method to ZeroconfThread.

Also, read port number from config.
TODO: default port number must be in one place (`data' module?)
Filename
centrixd/__init__.py
centrixd/cxproto.py
centrixd/data.py
zeroconf.py
diff --git a/centrixd/__init__.py b/centrixd/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/centrixd/cxproto.py b/centrixd/cxproto.py
index 6c80ffc..9006636 100644
--- a/centrixd/cxproto.py
+++ b/centrixd/cxproto.py
@@ -8,8 +8,9 @@ from threading import Thread
 from cxutils import log
 from xmlserialize import serialize
 from data import CXMessage
+from cxconfig import config

-CXPORT = 9222
+CXPORT = config['Network'].get('Port', 9222)

 class CXProto(Protocol):
     status = None
diff --git a/centrixd/data.py b/centrixd/data.py
new file mode 100644
index 0000000..db3cbec
--- /dev/null
+++ b/centrixd/data.py
@@ -0,0 +1,36 @@
+import xmlserialize as xml
+
+#####################################
+# Some classes
+
+class Service(object):
+    def __init__(self):
+        self.ip = ''
+        self.port = 0
+        self.proto = ''
+        self.stype = ''
+        self.name = ''
+
+    def __str__(self):
+        if self.port:
+            return "%s:%s" % (self.ip, self.port)
+        else:
+            return self.ip
+
+    def __repr__(self):
+        return "<Service %s on %s:%s>" % (self.name,self.ip,self.port)
+
+class CXMessage(object):
+    def __init__(self,type=None,**kwargs):
+        self.type = type
+        self.__dict__.update(kwargs)
+
+    def __repr__(self):
+        return "<Message of type %s>" % self.type
+
+    def to_xml(self):
+        return xml.serialize(self)
+
+    @classmethod
+    def parse(cls, text):
+        return xml.deserialize(text,cls=cls)
diff --git a/zeroconf.py b/zeroconf.py
new file mode 100644
index 0000000..6ebb590
--- /dev/null
+++ b/zeroconf.py
@@ -0,0 +1,149 @@
+import sys
+import signal
+import threading
+import gobject
+import dbus
+from dbus.mainloop.glib import DBusGMainLoop
+import avahi
+import socket
+
+from centrixd import data
+from centrixd.cxconfig import config
+import machines
+
+hosts = []
+services = {}
+
+def err(str):
+    print "Error:", str
+
+# Shorter names of services
+servicenames = {'_ssh._tcp':  'ssh',
+                '_http._tcp': 'web',
+                '_ftp._tcp':  'ftp',
+                '_workstation._tcp': 'machine',
+                '_pulse-server._tcp':  'pulseaudio',
+                '_nameserver._udp': 'dns'}
+
+class ZeroconfThread(threading.Thread):
+    "Thread which does all zeroconf work"
+
+    def __init__(self,group=None,target=None,name=None,args=(), kwargs=None,verbose=None):
+        threading.Thread.__init__(self)
+        self.name = config['Centrix']['Node Name']
+        self.port = config['Network'].get('Port', 9222)
+        self.hostname = socket.gethostname()
+        self.types = args
+        self.done = {}
+        for type in args:
+            self.done[type] = False
+
+    def _all_done(self):
+        for v in self.done.values():
+            if not v:
+                return False
+        return True
+
+    def _interface_for_type(self,type):
+        return dbus.Interface(self.bus.get_object(avahi.DBUS_NAME,
+                self.server.ServiceBrowserNew(avahi.IF_UNSPEC,
+                        avahi.PROTO_UNSPEC,
+                        type, 'local', dbus.UInt32(0))),
+                avahi.DBUS_INTERFACE_SERVICE_BROWSER)
+
+    def _avahi_connect(self):
+        self.bus = dbus.SystemBus()
+        self.server = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, avahi.DBUS_PATH_SERVER),avahi.DBUS_INTERFACE_SERVER)
+
+
+    def announce(self):
+        self.group = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, self.server.EntryGroupNew()),
+                                    avahi.DBUS_INTERFACE_ENTRY_GROUP)
+#         self.group.connect_to_signal('StateChanged', self._entry_group_state_changed)
+        self.group.AddService(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, 0,
+             self.name, self._service_type(), 'local', self.hostname,
+             dbus.UInt16(self.port),
+             avahi.string_array_to_txt_array(self.txt))
+        self.group.Commit()
+
+
+    def browse(self,types):
+        print "Browsing for:", types
+        for type in types:
+            b = self._interface_for_type(type)
+            b.connect_to_signal('ItemNew', self.add_service)
+            b.connect_to_signal('ItemRemove', self.remove_service)
+
+    def add_service(self, interface,proto,name,stype,domain,flags):
+        print "Found service:", name
+        self.server.ResolveService(int(interface), int(proto),
+             name,stype,domain, avahi.PROTO_UNSPEC, dbus.UInt32(0),
+             reply_handler = self.listener.new_service,
+             error_handler = err)
+        self.done[stype] = True
+        if self._all_done():
+            self.lock.release()
+
+    def set_listener(self, ob):
+        self.listener = ob
+
+    def remove_service(self,iface,proto,name,stype,domain,flags):
+        print "Removing service", name
+
+    def quit(self):
+        print "Stop discovery."
+        self.ml.quit()
+        try:
+            self.lock.release()
+        except:
+            pass
+        return False
+
+    def run(self):
+        DBusGMainLoop(set_as_default=True)
+        if self.timeout:
+            gobject.timeout_add(self.timeout,self.quit)
+        self._avahi_connect()
+        self.browse(self.types)
+        self.ml = gobject.MainLoop()
+        gobject.threads_init()
+        self.ml.run()
+
+class ZeroconfListener(object):
+    def new_service(self, iface,proto,name,stype,domain, host, aproto,addr, port, txt,flags):
+        global services
+        global hosts
+
+        if stype == '_workstation._tcp':
+            host = machines.Host()
+            host.hostname = name.split()[0]
+            host.ip = addr
+            hosts.append(host)
+            return
+
+        sname = servicenames[stype]
+        svc = data.Service()
+        svc.name = name
+        svc.ip = addr
+        svc.port = port
+        services[sname] = svc
+#         print "services[%s] = %s" % (sname, svc)
+
+def discover(services=('_ssh._tcp','_workstation._tcp','_nameserver._udp'),timeout=None):
+    """Discover services of given types via zeroconf.
+    Returns (blocked) lock, which is released when all services are discovered."""
+    global browser
+    lock = threading.Lock()
+    lock.acquire()
+    listener = ZeroconfListener()
+    browser = ZeroconfThread(args=services)
+    browser.timeout = timeout
+    browser.lock = lock
+    browser.set_listener(listener)
+    browser.start()
+    return lock
+
+def stop():
+    "Cancel discovery"
+    global browser
+    browser.quit()
ViewGit