creek.scrap.async_utils

Utils to deal with async iteration

Making singledispatch work with runtime-checkable Protocols:

from typing import Generator, Iterator, Iterable, AsyncIterable, AsyncIterator
from functools import singledispatch, partial

from typing import Protocol, runtime_checkable

@runtime_checkable
class IterableType(Protocol):
    """Structural type matching any object that defines ``__iter__``.

    Because it is ``runtime_checkable``, it can be used with ``isinstance``
    and ``issubclass``; the check only looks for the presence of the method.
    """

    def __iter__(self): ...

@runtime_checkable
class CursorFunc(Protocol):
    """Structural type matching any object that defines ``__call__``.

    The declared signature takes no arguments, but the runtime check only
    looks for the presence of ``__call__``, not at its signature.
    """

    def __call__(self): ...

@singledispatch
def to_iterator(x: IterableType):
    """Return an iterator over ``x``.

    This is the fallback ``singledispatch`` implementation: it passes ``x``
    to the builtin ``iter``, so a non-iterable ``x`` raises ``TypeError``.
    """
    iterator = iter(x)
    return iterator

# Sentinel for the two-argument form of ``iter``. A fresh ``object()`` is not
# expected to equal anything a cursor function returns, so iteration is meant
# to end only when the callable itself raises ``StopIteration``.
will_never_happen = object()


@to_iterator.register(CursorFunc)
def _(x: CursorFunc):
    """Return an iterator that yields the results of calling ``x()``
    repeatedly.
    """
    return iter(x, will_never_happen)

# A plain iterable is handled by the default implementation.
assert list(to_iterator([1, 2, 3])) == [1, 2, 3]

# A zero-argument callable is handled by the ``CursorFunc`` registration.
f = partial(next, iter([1, 2, 3]))
assert list(to_iterator(f)) == [1, 2, 3]

Trying to make utilities for async iterators, async iterables, and async cursor functions:

import asyncio


async def ticker(to=3, delay=0.5):
    """Asynchronously yield the integers ``0, 1, ..., to - 1``.

    After every yielded value (including the last one) the generator sleeps
    for ``delay`` seconds before producing the next value or finishing.
    """
    for tick in range(to):
        yield tick
        await asyncio.sleep(delay)

async def my_aiter(async_iterable):
    """Wrap ``async_iterable`` in an async generator that re-yields each of
    its items in order.
    """
    async for item in async_iterable:
        yield item

# NOTE: an async comprehension at top level only runs where top-level
# ``await`` is supported (e.g. IPython/Jupyter or the ``python -m asyncio``
# REPL); in a regular module it has to live inside an ``async def``.
t = [i async for i in my_aiter(ticker(3, 0.2))]
assert t == [0, 1, 2]

# t = list(my_aiter(ticker(3, 0.2)))
# # TypeError: 'async_generator' object is not iterable
# # and
# t = await list(ticker(3, 0.2))
# # TypeError: 'async_generator' object is not iterable

# But an async comprehension inside a coroutine gives an awaitable equivalent of ``list``:

async def alist(async_iterable):
    """Consume ``async_iterable`` and return all of its items as a list.

    This is the awaitable counterpart of calling ``list`` on a regular
    iterable.
    """
    collected = []
    async for element in async_iterable:
        collected.append(element)
    return collected

# NOTE: top-level ``await`` only works in an async-aware REPL (e.g.
# IPython/Jupyter); in a regular script use ``asyncio.run(alist(...))``.
t = await alist(ticker(3, 0.2))
assert t == [0, 1, 2]
creek.scrap.async_utils.aiter_with_sentinel(cursor_func: NewType.<locals>.new_type, sentinel: Any) → AsyncIterator [source]

Like the builtin iter(callable, sentinel), but for async callables.

creek.scrap.async_utils.iterable_to_iterator(iterable: Union[Iterable, AsyncIterable]) → Union[Iterator, AsyncIterator] [source]

Get an iterator from an iterable (whether async or not)

>>> iterable = [1, 2, 3]
>>> iterator = iterable_to_iterator(iterable)
>>> assert isinstance(iterator, Iterator)
>>> assert list(iterator) == iterable