Source code for rtlsdr.rtlsdraio

"""
This module adds :mod:`asyncio` support for reading samples from the device.

The main functionality can be found in the :meth:`~rtlsdr.rtlsdraio.RtlSdrAio.stream`
method of :class:`rtlsdr.rtlsdraio.RtlSdrAio`.

Example:
    .. code-block:: python

       import asyncio
       from rtlsdr import RtlSdr

       async def streaming():
           sdr = RtlSdr()

           async for samples in sdr.stream():
               # do something with samples
               # ...

           # to stop streaming:
           await sdr.stop()

           # done
           sdr.close()

       loop = asyncio.get_event_loop()
       loop.run_until_complete(streaming())

"""

import logging
try:
    import asyncio
    AIO_AVAILABLE = True
except ImportError:
    AIO_AVAILABLE = False
from .rtlsdr import RtlSdr


log = logging.getLogger(__name__)

_CLASS_TEMPLATE = """
class AsyncCallbackIter:
    '''Convert a callback-based legacy async function into one supporting asyncio
    and Python 3.5+

    The queued data can be iterated using ``async for``

    Arguments:
        func_start: A callable which should take a single callback that will be
            passed data. Will be run in a seperate thread in case it blocks.
        func_stop (optional): A callable to stop ``func_start`` from calling the
            callback. Will be run in a seperate thread in case it blocks.
        queue_size (:obj:`int`, optional): The maximum amount of data
            that will be buffered.
        loop (optional): The ``asyncio.event_loop`` to use. If not supplied,
            :func:`asyncio.get_event_loop` will be used.

    '''

    def __init__(self, func_start, func_stop=None, queue_size=20, *, loop=None):
        self.queue = asyncio.Queue(queue_size)
        self.loop = loop if loop else asyncio.get_event_loop()
        self.func_stop = func_stop
        self.func_start = func_start

        self.running = False

    async def add_to_queue(self, *args):
        '''Add items to the queue

        Arguments:
            *args: Arguments to be added

        This method is a :obj:`~asyncio.coroutine`
        '''
        try:
            self.queue.put_nowait(args)
        except asyncio.QueueFull:
            log.info('extra callback data lost')

    def _callback(self, *args):
        if not self.running:
            return
        asyncio.run_coroutine_threadsafe(self.add_to_queue(*args), self.loop)

    async def start(self):
        '''Start the execution

        The callback given by ``func_start`` will be called by
        :meth:`asyncio.AbstractEventLoop.run_in_executor` and will continue
        until :meth:`stop` is called.

        This method is a :obj:`~asyncio.coroutine`
        '''

        assert(not self.running)

        # start legacy async function
        future = self.loop.run_in_executor(None, self.func_start, self._callback)
        asyncio.ensure_future(future, loop=self.loop)
        self.executor_task = future
        self.running = True

    async def stop(self):
        '''Stop the running executor task

        If ``func_stop`` was supplied, it will be called after the queue has
        been exhausted.

        This method is a :obj:`~asyncio.coroutine`
        '''

        assert(self.running)

        self.running = False

        # send a signal to stop
        iter_stopped = False
        while not iter_stopped:
            try:
                self.queue.put_nowait((StopAsyncIteration(),))
                iter_stopped = True
            except asyncio.QueueFull:
                try:
                    self.queue.task_done()
                except ValueError:
                    pass
        if self.func_stop:
            # stop legacy async function
            await self.loop.run_in_executor(None, self.func_stop)
        await self.executor_task

    def __aiter__(self):
        return self

    async def __anext__(self):
        val = await self.queue.get()
        self.queue.task_done()

        if isinstance(val[0], StopAsyncIteration):
            raise StopAsyncIteration

        #return val if len(val) > 1 else val[0]

        # slight hack for rtlsdr to ignore context object
        return val[0]
"""


if AIO_AVAILABLE:
    try:
        exec('async def test_for_async(): pass')
        exec('def test_unpack_operators(a, *, b): pass')
    except SyntaxError:
        AIO_AVAILABLE = False

if AIO_AVAILABLE:
    exec(_CLASS_TEMPLATE, globals(), locals())

[docs]class RtlSdrAio(RtlSdr): DEFAULT_READ_SIZE = 128*1024
[docs] def stream(self, num_samples_or_bytes=DEFAULT_READ_SIZE, format='samples', loop=None): """Start async streaming from SDR and return an async iterator (Python 3.5+). The :meth:`read_samples_async` method is called in an :class:`~concurrent.futures.Excecutor` instance using :meth:`asyncio.AbstractEventLoop.run_in_executor`. The returned asynchronous iterable can then used to retrieve sample data using ``async for`` syntax. Calling the :meth:`~rtlsdr.rtlsdraio.RtlSdrAio.stop` method will stop the ``read_samples_async`` session and close the ``Excecutor`` task. Arguments: num_samples_or_bytes (int): The number of bytes/samples that will be returned each iteration format (:obj:`str`, optional): Specifies whether raw data ("bytes") or IQ samples ("samples") will be returned loop (optional): An asyncio event loop Returns: An ``asynchronous iterator`` to yield sample data """ if format == 'samples': func_start = self.read_samples_async elif format == 'bytes': func_start = self.read_bytes_async else: raise ValueError('format "%s" not supported' % format) self.async_iter = AsyncCallbackIter(func_start=lambda cb: func_start(cb, num_samples_or_bytes), func_stop=self.cancel_read_async, loop=loop) asyncio.ensure_future(self.async_iter.start(), loop=loop) return self.async_iter
[docs] def stop(self): """Stop async stream Stops the ``read_samples_async`` and ``Excecutor`` task created by :meth:`stream`. """ return asyncio.ensure_future(self.async_iter.stop(), loop=self.async_iter.loop)