Source code for vidhubcontrol.discovery

import asyncio
import ipaddress
import logging

from pydispatch import Dispatcher, Property
from pydispatch.properties import DictProperty

logger = logging.getLogger(__name__)

try:
    import zeroconf
    ZEROCONF_AVAILABLE = True
except ImportError: # pragma: no cover
    zeroconf = None
    ZEROCONF_AVAILABLE = False

from vidhubcontrol.utils import find_ip_addresses

PUBLISH_TTL = 60

def convert_bytes_dict(d):
    return {str(k, 'UTF-8'):str(d[k], 'UTF-8') for k in d.keys()}

def convert_dict_bytes(d):
    return {bytes(k, 'UTF-8'):bytes(d[k], 'UTF-8') for k in d.keys()}

[docs]class ServiceInfo(Dispatcher): """Container for Zeroconf service information Closely related to :class:`zeroconf.ServiceInfo` Attributes: type (str): Fully qualified service type name (str): Fully qualified service name server (str): Fully qualified name for service host (defaults to :attr:`name`) address (:class:`ipaddress.IPv4Address`): The service ip address port (int): The service port properties (dict): Custom properties for the service """ properties = DictProperty() _attrs = ['type', 'name', 'server', 'address', 'port', 'properties'] def __init__(self, **kwargs): for attr in self._attrs: setattr(self, attr, kwargs.get(attr))
[docs] @classmethod def from_zc_info(cls, info): """Creates an instance from a :class:`zeroconf.ServiceInfo` object Arguments: info (:class:`zeroconf.ServiceInfo`): Returns: An instance of :class:`ServiceInfo` """ kwargs = {} for attr in cls._attrs: val = getattr(info, attr) if attr == 'properties': val = convert_bytes_dict(val) elif attr == 'address': val = ipaddress.ip_address(val) kwargs[attr] = val return cls(**kwargs)
@property def id(self): """Unique id for the service as a ``tuple`` of (:attr:`type`, :attr:`name`) """ return (self.type, self.name)#, self.address, self.port)
[docs] def to_zc_info(self): """Creates a copy as an instance of :class:`zeroconf.ServiceInfo` """ kwargs = {} for attr in self._attrs: val = getattr(self, attr) if attr == 'properties': val = convert_dict_bytes(val) elif attr == 'address': if isinstance(val, ipaddress.IPv4Interface): val = val.ip.packed elif isinstance(val, ipaddress.IPv4Address): val = val.packed else: val = ipaddress.ip_address(val).packed kwargs[attr] = val type_ = kwargs.pop('type') name = kwargs.pop('name') return zeroconf.ServiceInfo(type_, name, **kwargs)
[docs] def update(self, other): """Updates the :attr:`properties` from another :class:`ServiceInfo` instance """ if self.properties == other.properties: return self.properties = other.properties.copy()
def __hash__(self): return hash(self.id) def __eq__(self, other): return self.id == other.id def __repr__(self): return '<{self.__class__.__name__}> {self}'.format(self=self) def __str__(self): return '{self.name}: {self.type} ({self.address}:{self.port}), properties={self.properties}'.format(self=self)
[docs]class Message(object): """A message to communicate actions to and from :class:`Listener` Attributes: info: The :class:`ServiceInfo` related to the message Note: This class and its subclasses are not meant to be used directly. They are used internally in :class:`Listener` methods. """ def __init__(self, info): self.info = info def __repr__(self): return str(self) def __str__(self): return '{self.__class__.__name__}: {self.info}'.format(self=self)
[docs]class AddedMessage(Message): pass
[docs]class RemovedMessage(Message): pass
[docs]class PublishMessage(Message): def __init__(self, info, ttl=PUBLISH_TTL): super().__init__(info) self.ttl = ttl
[docs]class UnPublishMessage(Message): pass
[docs]class Listener(Dispatcher): """An async zeroconf service listener Allows async communication with :class:`zeroconf.Zeroconf` through :meth:`asyncio.AbstractEventLoop.run_in_executor` calls. Arguments: mainloop (:class:`asyncio.BaseEventLoop`): asyncio event loop instance service_type (str): The fully qualified service type name to subscribe to Attributes: services (dict): All services currently discovered as instances of :class:`ServiceInfo`. Stored using :attr:`ServiceInfo.id` as keys message_queue (:class:`asyncio.Queue`): Used to communicate actions and events with instances of :class:`Message` published_services (dict): Stores services that have been published using :meth:`publish_service` as :class:`ServiceInfo` instances. """ _events_ = ['service_added', 'service_removed'] services = DictProperty() def __init__(self, mainloop, service_type): self.mainloop = mainloop self.service_type = service_type self.running = False self.stopped = asyncio.Event() self.message_queue = asyncio.Queue() self.zeroconf = None self.published_services = {}
[docs] async def start(self): """Starts the service listener Runs :class:`zeroconf.Zeroconf` in an :class:`~concurrent.futures.Executor` instance through `asyncio.AbstractEventLoop.run_in_executor` (see :meth:`run_zeroconf`). """ await self.mainloop.run_in_executor(None, self.run_zeroconf) self.running = True self.run_future = asyncio.ensure_future(self.run(), loop=self.mainloop)
[docs] async def run(self): """Main loop for communicating with :class:`zeroconf.Zeroconf` Waits for messages on the :attr:`message_queue` and processes them. The loop will exit if an object placed on the queue is not an instance of :class:`Message`. When the loop exits, the :class:`zeroconf.Zeroconf` instance will be closed. """ while self.running: msg = await self.message_queue.get() self.message_queue.task_done() if not isinstance(msg, Message): self.running = False break elif isinstance(msg, AddedMessage): if msg.info.id in self.services: self.services[msg.info.id].update(msg.info) else: await self.add_service_info(msg.info) elif isinstance(msg, RemovedMessage): if msg.info.id in self.services: await self.remove_service_info(msg.info) elif isinstance(msg, PublishMessage): if not ZEROCONF_AVAILABLE: continue zc_info = msg.info.to_zc_info() await self.mainloop.run_in_executor( None, self.zeroconf.register_service, zc_info, msg.ttl, ) elif isinstance(msg, UnPublishMessage): zc_info = msg.info.to_zc_info() await self.mainloop.run_in_executor( None, self.zeroconf.unregister_service, zc_info, ) await self.mainloop.run_in_executor(None, self.stop_zeroconf) self.stopped.set()
[docs] async def stop(self): """Stops the loop in :meth:`run` """ if not self.running: return self.message_queue.put_nowait(None) await self.stopped.wait()
[docs] def run_zeroconf(self): """Starts :class:`zeroconf.Zeroconf` and :class:`zeroconf.ServiceBrowser` instances This is meant to be called inside of an :class:`concurrent.futures.Executor` and not used directly. """ if not ZEROCONF_AVAILABLE: return self.zeroconf = zeroconf.Zeroconf() self.zeroconf.listener = self self.browser = zeroconf.ServiceBrowser(self.zeroconf, self.service_type, self)
[docs] def stop_zeroconf(self): """Closes the :class:`zeroconf.Zeroconf` instance This is meant to be called inside of an :class:`concurrent.futures.Executor` and not used directly. """ if self.zeroconf is None: return self.zeroconf.close()
[docs] async def add_message(self, msg): """Adds a message to the :attr:`message_queue` Arguments: msg (:class:`Message`): Message to send """ await self.message_queue.put(msg)
async def add_service_info(self, info, **kwargs): self.services[info.id] = info self.emit('service_added', info, **kwargs) async def remove_service_info(self, info, **kwargs): del self.services[info.id] self.emit('service_removed', info, **kwargs) def remove_service(self, zc, type_, name): info = ServiceInfo(type=type_, name=name) msg = RemovedMessage(info) asyncio.run_coroutine_threadsafe(self.add_message(msg), loop=self.mainloop) def add_service(self, zc, type_, name): info = zc.get_service_info(type_, name) info = ServiceInfo.from_zc_info(info) msg = AddedMessage(info) asyncio.run_coroutine_threadsafe(self.add_message(msg), loop=self.mainloop) async def get_local_ifaces(self, refresh=False): ifaces = getattr(self, '_local_ifaces', None) if ifaces is not None and not refresh: return ifaces ifaces = self._local_ifaces = [iface for iface_name, iface in find_ip_addresses()] return ifaces async def get_local_hostname(self): name = getattr(self, '_local_hostname', None) if name is not None: return name name = None for iface in await self.get_local_ifaces(): _name, srv = await self.mainloop.getnameinfo((str(iface.ip), 80)) if _name is not None and _name != 'localhost': name = _name break if name is None: name = 'localhost' self._local_hostname = name return name
[docs] async def publish_service(self, type_, port, name=None, addresses=None, properties=None, ttl=PUBLISH_TTL): """Publishes a service on the network Arguments: type_ (str): Fully qualified service type port (int): The service port name (str, optional): Fully qualified service name. If not provided, this will be generated from the ``type_`` and the hostname detected by :meth:`get_local_hostname` addresses (optional): If provided, an ``iterable`` of IP addresses to publish. Can be :class:`ipaddress.IPv4Address` or any type that can be parsed by :func:`ipaddress.ip_address` properties (dict, optional): Custom properties for the service ttl (int, optional): The TTL value to publish. Defaults to :const:`PUBLISH_TTL` """ hostname = await self.get_local_hostname() if name is None: name = '.'.join([hostname, type_]) if addresses is None: addresses = await self.get_local_ifaces() if properties is None: properties = {} info_kwargs = { 'type':type_, 'port':port, 'name':name, 'properties':properties, } for addr in addresses: if not isinstance(addr, ipaddress.IPv4Address): addr = ipaddress.IPv4Address(addr) info_kwargs['address'] = addr info = ServiceInfo(**info_kwargs) if info.id not in self.published_services: self.published_services[info.id] = {} if info.address in self.published_services[info.id]: continue self.published_services[info.id][info.address] = info msg = PublishMessage(info, ttl) asyncio.run_coroutine_threadsafe(self.add_message(msg), loop=self.mainloop)
[docs] async def unpublish_service(self, type_, port, name=None, addresses=None, properties=None): """Removes a service published through :meth:`publish_service` Arguments: type_ (str): Fully qualified service type port (int): The service port name (str, optional): Fully qualified service name. If not provided, this will be generated from the ``type_`` and the hostname detected by :meth:`get_local_hostname` addresses (optional): If provided, an ``iterable`` of IP addresses to unpublish. Can be :class:`ipaddress.IPv4Address` or any type that can be parsed by :func:`ipaddress.ip_address` properties (dict, optional): Custom properties for the service """ hostname = await self.get_local_hostname() if name is None: name = '.'.join([hostname, type_]) if addresses is None: addresses = await self.get_local_ifaces() if properties is None: properties = {} info_kwargs = { 'type':type_, 'port':port, 'name':name, 'properties':properties, } for addr in addresses: if not isinstance(addr, ipaddress.IPv4Address): addr = ipaddress.IPv4Address(addr) info_kwargs['address'] = addr info = ServiceInfo(**info_kwargs) if info.id not in self.published_services: continue if info.address not in self.published_services[info.id]: continue del self.published_services[info.id][info.address] msg = PublishMessage(info) asyncio.run_coroutine_threadsafe(self.add_message(msg), loop=self.mainloop)
[docs]class BMDDiscovery(Listener): """Zeroconf listener for Blackmagic devices Attributes: vidhubs (dict): Contains discovered Videohub devices. This :class:`~pydispatch.properties.DictProperty` can be used to subscribe to changes. smart_views (dict): Contains discovered SmartView devices. This :class:`~pydispatch.properties.DictProperty` can be used to subscribe to changes. smart_scopes (dict): Contains discovered SmartScope devices. This :class:`~pydispatch.properties.DictProperty` can be used to subscribe to changes. """ vidhubs = DictProperty() smart_views = DictProperty() smart_scopes = DictProperty() def __init__(self, mainloop, service_type='_blackmagic._tcp.local.'): super().__init__(mainloop, service_type) async def add_service_info(self, info, **kwargs): device_cls = info.properties.get('class') bmd_id = info.properties.get('unique id', '').upper() if device_cls == 'Videohub': self.vidhubs[bmd_id] = info kwargs.update({'class':device_cls, 'id':bmd_id, 'device_type':'vidhub'}) elif info.properties.get('class') == 'SmartView': if 'SmartScope' in info.properties.get('name', ''): self.smart_scopes[bmd_id] = info kwargs['device_type'] = 'smartscope' else: self.smart_views[bmd_id] = info kwargs['device_type'] = 'smartview' kwargs.update({'class':device_cls, 'id':bmd_id}) await super().add_service_info(info, **kwargs) async def remove_service_info(self, info, **kwargs): device_cls = info.properties.get('class') bmd_id = info.properties.get('unique id', '').upper() if bmd_id in self.vidhubs and device_cls == 'Videohub': del self.vidhubs[bmd_id] kwargs.update({'class':device_cls, 'id':bmd_id, 'device_type':'vidhub'}) elif bmd_id in self.smart_views and device_cls == 'SmartView': del self.smart_views[bmd_id] kwargs.update({'class':device_cls, 'id':bmd_id, 'device_type':'smartview'}) elif bmd_id in self.smart_scopes and device_cls == 'SmartView': del self.smart_scopes[bmd_id] kwargs.update({'class':device_cls, 'id':bmd_id, 'device_type':'smartscope'}) await super().remove_service_info(info, **kwargs)
def main(): loop = asyncio.get_event_loop() loop.set_debug(True) listener = BMDDiscovery(loop) def on_service_added(info, **kwargs): logger.info('Added: {}'.format(info)) def on_service_removed(info, **kwargs): logger.info('Removed: {}'.format(info)) listener.bind(service_added=on_service_added, service_removed=on_service_removed) async def run(): await listener.start() await asyncio.sleep(5) logger.info(str(listener.services)) await listener.stop() loop.run_until_complete(run()) return listener if __name__ == '__main__': main()