Warning: This document is for the development version of python-dispatch.

Source code for pydispatch.aioutils

import asyncio
import threading
from _weakref import ref
from _weakrefset import _IterationGuard

from pydispatch.utils import (
    WeakMethodContainer,
    get_method_vars,
    _remove_dead_weakref,
)

[docs]class AioEmissionHoldLock(object): """Async context manager mixin for :class:`pydispatch.utils.EmissionHoldLock_` Supports use in :keyword:`async with` statements """ @property def aio_locks(self): d = getattr(self, '_aio_locks', None) if d is None: d = self._aio_locks = {} return d async def _build_aio_lock(self): loop = asyncio.get_event_loop() key = id(loop) lock = self.aio_locks.get(key) if lock is None: lock = asyncio.Lock(loop=loop) self.aio_locks[key] = lock return lock async def acquire_async(self): self.acquire() lock = await self._build_aio_lock() if not lock.locked(): await lock.acquire() async def release_async(self): lock = await self._build_aio_lock() if lock.locked: lock.release() self.release() async def __aenter__(self): await self.acquire_async() return self async def __aexit__(self, *args): await self.release_async()
[docs]class AioSimpleLock(object): """:class:`asyncio.Lock` alternative backed by a :class:`threading.Lock` This is a context manager that supports use in both :keyword:`with` and :keyword:`async with` context blocks. Attributes: lock: Instance of :class:`threading.Lock` .. versionadded:: 0.1.0 """ __slots__ = ('lock') def __init__(self): self.lock = threading.Lock()
[docs] def acquire(self, blocking=True, timeout=-1): """Acquire the :attr:`lock` Args: blocking (bool): See :meth:`threading.Lock.acquire` timeout (float): See :meth:`threading.Lock.acquire` Returns: bool: :obj:`True` if the lock was acquired, otherwise :obj:`False` """ result = self.lock.acquire(blocking, timeout) return result
[docs] def release(self): """Release the :attr:`lock` """ self.lock.release()
def __enter__(self): self.acquire() return self def __exit__(self, *args): self.release()
[docs] async def acquire_async(self): """Acquire the :attr:`lock` asynchronously """ r = self.acquire(blocking=False) while not r: await asyncio.sleep(.01) r = self.acquire(blocking=False)
async def __aenter__(self): await self.acquire_async() return self async def __aexit__(self, *args): self.release()
[docs]class AioEventWaiter(object): """Stores necessary information for a single "waiter" Used by :class:`AioEventWaiters` to handle :keyword:`awaiting <await>` an :class:`~pydispatch.dispatch.Event` on a specific :class:`event loop <asyncio.BaseEventLoop>` Attributes: loop: The :class:`EventLoop <asyncio.BaseEventLoop>` instance aio_event: An :class:`asyncio.Event` used to track event emission args (list): The positional arguments attached to the event kwargs (dict): The keyword arguments attached to the event .. versionadded:: 0.1.0 """ __slots__ = ('loop', 'aio_event', 'args', 'kwargs') def __init__(self, loop): self.loop = loop self.aio_event = asyncio.Event(loop=loop) self.args = [] self.kwargs = {}
[docs] def trigger(self, *args, **kwargs): """Called on event emission and notifies the :meth:`wait` method Called by :class:`AioEventWaiters` when the :class:`~pydispatch.dispatch.Event` instance is dispatched. Positional and keyword arguments are stored as instance attributes for use in the :meth:`wait` method and :attr:`aio_event` is set. """ self.args = args self.kwargs = kwargs self.aio_event.set()
[docs] async def wait(self): """Waits for event emission and returns the event parameters Returns: args (list): Positional arguments attached to the event kwargs (dict): Keyword arguments attached to the event """ await self.aio_event.wait() return self.args, self.kwargs
def __await__(self): task = asyncio.ensure_future(self.wait()) return task.__await__()
[docs]class AioEventWaiters(object): """Container used to manage :keyword:`await` use with events Used by :class:`pydispatch.dispatch.Event` when it is :keyword:`awaited <await>` Attributes: waiters (set): Instances of :class:`AioEventWaiter` currently "awaiting" the event lock (AioSimpleLock): A sync/async lock to guard modification to the :attr:`waiters` container during event emission .. versionadded:: 0.1.0 """ __slots__ = ('waiters', 'lock') def __init__(self): self.waiters = set() self.lock = AioSimpleLock()
[docs] async def add_waiter(self): """Add a :class:`AioEventWaiter` to the :attr:`waiters` container The event loop to use for :attr:`AioEventWaiter.loop` is found in the current context using :func:`asyncio.get_event_loop` Returns: waiter: The created :class:`AioEventWaiter` instance """ loop = asyncio.get_event_loop() async with self.lock: waiter = AioEventWaiter(loop) self.waiters.add(waiter) return waiter
[docs] async def wait(self): """Creates a :class:`waiter <AioEventWaiter>` and "awaits" its result This method is used by :class:`pydispatch.dispatch.Event` instances when they are "awaited" and is the primary functionality of :class:`AioEventWaiters` and :class:`AioEventWaiter`. Returns: args (list): Positional arguments attached to the event kwargs (dict): Keyword arguments attached to the event """ waiter = await self.add_waiter() return await waiter
def __await__(self): task = asyncio.ensure_future(self.wait()) return task.__await__()
[docs] def __call__(self, *args, **kwargs): """Triggers any stored :class:`waiters <AioEventWaiter>` Calls :meth:`AioEventWaiter.trigger` method on all instances stored in :attr:`waiters`. After completion, the :attr:`waiters` are removed. Args: *args: Positional arguments to pass to :meth:`AioEventWaiter.trigger` **kwargs: Keyword arguments to pass to :meth:`AioEventWaiter.trigger` """ with self.lock: for waiter in self.waiters: waiter.trigger(*args, **kwargs) self.waiters.clear()
[docs]class AioWeakMethodContainer(WeakMethodContainer): """Storage for coroutine functions as weak references .. versionadded:: 0.1.0 """ def __init__(self): super().__init__() def remove(wr, selfref=ref(self)): self = selfref() if self is not None: if self._iterating: self._pending_removals.append(wr.key) else: # Atomic removal is necessary since this function # can be called asynchronously by the GC _remove_dead_weakref(self.data, wr.key) self._on_weakref_fin(wr.key) self._remove = remove self.event_loop_map = {}
[docs] def add_method(self, loop, callback): """Add a coroutine function Args: loop: The :class:`event loop <asyncio.BaseEventLoop>` instance on which to schedule callbacks callback: The :term:`coroutine function` to add """ f, obj = get_method_vars(callback) wrkey = (f, id(obj)) self[wrkey] = obj self.event_loop_map[wrkey] = loop
[docs] def iter_instances(self): """Iterate over the stored objects .. seealso:: :meth:`pydispatch.utils.WeakMethodContainer.iter_instances` """ with _IterationGuard(self): yield from super().iter_instances()
[docs] def iter_methods(self): """Iterate over stored coroutine functions Yields: Stored :term:`coroutine function` objects .. seealso:: :meth:`pydispatch.utils.WeakMethodContainer.iter_instances` """ for wrkey, obj in self.iter_instances(): f, obj_id = wrkey loop = self.event_loop_map[wrkey] m = getattr(obj, f.__name__) yield loop, m
def _on_weakref_fin(self, key): if key in self.event_loop_map: del self.event_loop_map[key]
[docs] def submit_coroutine(self, coro, loop): """Schedule and await a coroutine on the specified loop The coroutine is wrapped and scheduled using :func:`asyncio.run_coroutine_threadsafe`. While the coroutine is "awaited", the result is not available as method returns immediately. Args: coro: The :term:`coroutine` to schedule loop: The :class:`event loop <asyncio.BaseEventLoop>` on which to schedule the coroutine Note: This method is used internally by :meth:`__call__` and is not meant to be called directly. """ async def _do_call(_coro): with _IterationGuard(self): await _coro asyncio.run_coroutine_threadsafe(_do_call(coro), loop=loop)
[docs] def __call__(self, *args, **kwargs): """Triggers all stored callbacks (coroutines) Args: *args: Positional arguments to pass to callbacks **kwargs: Keyword arguments to pass to callbacks """ for loop, m in self.iter_methods(): coro = m(*args, **kwargs) self.submit_coroutine(coro, loop)
def __delitem__(self, key): if key in self.event_loop_map: del self.event_loop_map[key] return super().__delitem__(key)