# Source code for creek.util

"""Utils for creek"""

from functools import (
    WRAPPER_ASSIGNMENTS,
    partial,
    singledispatch,
    update_wrapper as _update_wrapper,
    wraps as _wraps,
)
from itertools import islice

from typing import (
    Any,
    Callable,
    Generator,
    Iterable,
    Iterator,
    NewType,
    Optional,
    Protocol,
    Tuple,
    Type,
    Union,
    runtime_checkable,
)


def iterate_skipping_errors(
    g: Iterable,
    error_callback: Optional[Callable[[BaseException], Any]] = None,
    caught_exceptions: Tuple[Type[BaseException], ...] = (Exception,),
) -> Generator:
    """
    Iterate over a generator, skipping errors and calling an error callback if provided.

    :param g: The iterable to iterate over
    :param error_callback: A callback to call when an error is encountered.
    :param caught_exceptions: The exception classes to catch and skip.
    :return: A generator that yields the values of the original generator, skipping errors.

    >>> list(iterate_skipping_errors(map(lambda x: 1 / x, [1, 0, 2])))
    [1.0, 0.5]
    >>> list(iterate_skipping_errors(map(lambda x: 1 / x, [1, 0, 2]), print))
    division by zero
    [1.0, 0.5]

    See https://github.com/i2mint/creek/issues/6 for more info.
    """
    # Note: the annotation of `caught_exceptions` was `Tuple[BaseException]`,
    # i.e. a 1-tuple of exception *instances*; it is a variadic tuple of
    # exception *classes*, as required by an `except` clause.
    iterator = iter(g)
    while True:
        try:
            yield next(iterator)
        except StopIteration:
            # Normal exhaustion of the underlying iterator: stop cleanly.
            break
        except caught_exceptions as e:
            # Swallow the error (optionally reporting it) and keep consuming.
            if error_callback:
                error_callback(e)
def iterize(func, name=None):
    """Lift an ``In -> Out`` function into an ``Iterator[In] -> Iterator[Out]`` one.

    The lifted function simply maps ``func`` over whatever iterator(s) it is given.

    >>> f = lambda x: x * 10
    >>> f(2)
    20
    >>> iterized_f = iterize(f)
    >>> list(iterized_f(iter([1, 2, 3])))
    [10, 20, 30]

    :param func: The item-level function to lift.
    :param name: Optional ``__name__`` to attach to the lifted function.
    """
    lifted = partial(map, func)
    if name is not None:
        # `functools.partial` objects accept arbitrary attributes.
        lifted.__name__ = name
    return lifted
# Alias for whatever items an iterator yields (deliberately loose).
IteratorItem = Any


@runtime_checkable
class IterableType(Protocol):
    """Structural iterable type that actually works with ``isinstance`` checks
    (and hence with ``functools.singledispatch`` registration).

    >>> assert isinstance([1, 2, 3], IterableType)
    >>> assert not isinstance(2, IterableType)
    """

    def __iter__(self) -> Iterable[IteratorItem]:
        ...
@runtime_checkable
class IteratorType(Protocol):
    """Structural iterator type that actually works with ``isinstance`` checks
    (and hence with ``functools.singledispatch`` registration).

    >>> assert isinstance(iter([1, 2, 3]), IteratorType)
    >>> assert not isinstance([1, 2, 3], IteratorType)
    """

    def __next__(self) -> IteratorItem:
        ...
@runtime_checkable
class CursorFunc(Protocol):
    """An argument-less callable that produces an iterator's values, one per call."""

    def __call__(self) -> IteratorItem:
        """Return the next item, advancing the underlying cursor."""
# Union of the three "stream source" shapes this module juggles:
# iterators, iterables, and cursor functions.
# NOTE(review): `NewType` over a `Union` is non-standard (static type checkers
# reject it); at runtime it yields an identity wrapper — confirm intent.
IterType = NewType('IterType', Union[IteratorType, IterableType, CursorFunc])
IterType.__doc__ = 'A type that can be made into an iterator'

# Extend functools' default wrapper-attribute copying so wrappers also carry
# over the wrapped function's positional and keyword-only defaults.
wrapper_assignments = (*WRAPPER_ASSIGNMENTS, '__defaults__', '__kwdefaults__')

# Drop-in replacements for functools.update_wrapper / functools.wraps that use
# the extended assignment list above.
update_wrapper = partial(_update_wrapper, assigned=wrapper_assignments)
wraps = partial(_wraps, assigned=wrapper_assignments)

# ---------------------------------------------------------------------------------------
# iteratable, iterator, cursors

# Unique sentinel instances: let callers distinguish "argument not given" from
# any real value (including None).
# TODO: If bring i2 as dependency, use mk_sentinel here
no_sentinel = type('no_sentinel', (), {})()
no_default = type('no_default', (), {})()
class IteratorExit(BaseException):
    """Raised when an iterator should quit being iterated on, signaling this event
    to any process that cares to catch the signal.

    We chose to inherit directly from `BaseException` instead of `Exception`
    for the same reason that `GeneratorExit` does: Because it's not technically
    an error.

    See: https://docs.python.org/3/library/exceptions.html#GeneratorExit
    """


# Default exceptions that should end (rather than propagate out of) an
# `iterate_until_exception` loop.
DFLT_INTERRUPT_EXCEPTIONS = (StopIteration, IteratorExit, KeyboardInterrupt)


def iterate_until_exception(iterator, interrupt_exceptions=DFLT_INTERRUPT_EXCEPTIONS):
    """Consume ``iterator`` until one of ``interrupt_exceptions`` is raised.

    Items are discarded; this is for iterators run for their side effects.

    :param iterator: The iterator to consume (``next`` is called repeatedly).
    :param interrupt_exceptions: Exception classes that end the loop silently.
    """
    # Fix: removed a stray debug `print('ending')` that fired on every
    # interruption — a library helper should not write to stdout.
    while True:
        try:
            next(iterator)
        except interrupt_exceptions:
            break
def iterable_to_iterator(iterable: Iterable, sentinel=no_sentinel) -> Iterator:
    """Make an iterator out of an iterable.

    >>> iterable = [1, 2, 3]
    >>> iterator = iterable_to_iterator(iterable)
    >>> assert isinstance(iterator, Iterator)
    >>> assert list(iterator) == iterable

    If a ``sentinel`` is given, the iterator stops right before it would yield
    that value:

    >>> iterable = [1, 2, 3, 4, None, None, 7]
    >>> iterator = iterable_to_iterator(iterable, None)
    >>> assert isinstance(iterator, Iterator)
    >>> list(iterator)
    [1, 2, 3, 4]
    """
    if sentinel is not no_sentinel:
        # Two-argument iter: call __next__ repeatedly until it returns sentinel.
        return iter(iter(iterable).__next__, sentinel)
    return iter(iterable)
def iterator_to_cursor(iterator: Iterator, default=no_default) -> CursorFunc:
    """Wrap ``iterator`` in a cursor function: an argument-less callable that
    returns successive items of the iterator.

    >>> cursor = iterator_to_cursor(iter([1, 2, 3]))
    >>> assert callable(cursor)
    >>> cursor()
    1
    >>> assert list(cursor_to_iterator(cursor)) == [2, 3]

    Note that a cursor is not itself an iterator (``list(iter(cursor))`` would
    not work); use ``cursor_to_iterator`` to consume it as one.

    ``default`` plays the same role as it does for ``next``: once the iterator
    is exhausted, the cursor keeps returning the default instead of raising
    ``StopIteration``.

    >>> cursor = iterator_to_cursor(iter([1, 2]), None)
    >>> cursor()
    1
    >>> cursor()
    2
    >>> assert cursor() is None
    >>> assert cursor() is None

    ...and so on, forever.
    """
    if default is not no_default:
        return partial(next, iterator, default)
    return partial(next, iterator)
def cursor_to_iterator(cursor: CursorFunc, sentinel=no_sentinel) -> Iterator:
    """Make an iterator out of a cursor function.

    A cursor function is a callable taking no arguments that returns one data
    item per call -- a common interface in live IO contexts.

    >>> cursor = iter([1, 2, 3]).__next__
    >>> assert not isinstance(cursor, Iterator)
    >>> assert not isinstance(cursor, Iterable)
    >>> assert callable(cursor)

    To consume such a stream as an iterator instead:

    >>> iterator = cursor_to_iterator(cursor)
    >>> assert isinstance(iterator, Iterator)
    >>> list(iterator)
    [1, 2, 3]

    With a ``sentinel``, iteration stops (without a fuss) as soon as the cursor
    returns that value:

    >>> cursor = iter([1, 2, None, None, 3]).__next__
    >>> list(cursor_to_iterator(cursor, sentinel=None))
    [1, 2]
    """
    # Two-argument iter calls `cursor()` repeatedly until it returns `sentinel`
    # (and forever, if `sentinel` is the never-returned `no_sentinel` object).
    return iter(cursor, sentinel)
def iterable_to_cursor(iterable: Iterable) -> CursorFunc:
    """Get a cursor function from an iterable: an argument-less callable
    returning the iterable's items one call at a time."""
    return iterator_to_cursor(iterable_to_iterator(iterable))
@singledispatch
def to_iterator(x: IteratorType, sentinel=no_sentinel):
    """Make an iterator from an iterator, an iterable, or a cursor function.

    >>> from typing import Iterator
    >>> it = to_iterator([1, 2, 3])
    >>> assert isinstance(it, Iterator)
    >>> list(it), list(it)
    ([1, 2, 3], [])

    >>> cursor = iter([1, 2, 3]).__next__
    >>> assert isinstance(cursor, CursorFunc)
    >>> it = to_iterator(cursor)
    >>> assert isinstance(it, Iterator)
    >>> list(it), list(it)
    ([1, 2, 3], [])

    Sentinels stop the iteration just before the sentinel value would appear:

    >>> list(to_iterator([1, 2, None, 4], sentinel=None))
    [1, 2]
    >>> list(to_iterator(iter([1, 2, 3, 4, 5]).__next__, sentinel=4))
    [1, 2, 3]
    """
    # Base implementation: `x` is assumed to already be an iterator.
    if sentinel is no_sentinel:
        return x
    return iter(x.__next__, sentinel)
@to_iterator.register
def _(x: IterableType, sentinel=no_sentinel):
    # Iterables: convert to an iterator and delegate to the (unwrapped) base
    # implementation, which handles the sentinel logic.
    return to_iterator.__wrapped__(iter(x), sentinel)
    # TODO: Use of __wrapped__ seems hacky. Better way?
    # TODO: Why does to_iterator(iter(x), sentinel) lead to infinite recursion?


@to_iterator.register
def _(x: CursorFunc, sentinel=no_sentinel):
    # Cursor functions: two-argument iter calls x() until sentinel is returned.
    return iter(x, sentinel)


# ---------------------------------------------------------------------------------------

# Sentinel used to signal "no such item" situations.
no_such_item = type('NoSuchItem', (), {})()


class stream_util:
    # Namespace of small reusable stream callbacks.
    # NOTE(review): `always_true` and `do_nothing` take no `self` while
    # `rewind`/`skip_lines` do — presumably they are used as plain callables;
    # confirm this asymmetry is intentional.
    def always_true(*args, **kwargs):
        return True

    def do_nothing(*args, **kwargs):
        pass

    def rewind(self, instance):
        # Reset a seekable stream to its start.
        instance.seek(0)

    def skip_lines(self, instance, n_lines_to_skip=0):
        # NOTE(review): `n_lines_to_skip` is ignored — this only seeks to 0,
        # identical to `rewind`. Looks like lines were meant to be skipped
        # after the seek; confirm/fix upstream.
        instance.seek(0)


class PreIter:
    def skip_items(self, instance, n):
        # Lazily drop the first `n` items of the stream.
        return islice(instance, n, None)


def cls_wrap(cls, obj):
    # If `obj` is a class, build a subclass of `cls` whose constructor makes an
    # `obj(...)` instance and hands it to `cls.__init__`; otherwise wrap the
    # given instance directly. `wraps(obj, updated=())` copies the wrapped
    # class's metadata without touching its __dict__.
    if isinstance(obj, type):

        @wraps(obj, updated=())
        class Wrap(cls):
            @wraps(obj.__init__)
            def __init__(self, *args, **kwargs):
                wrapped = obj(*args, **kwargs)
                super().__init__(wrapped)

            # Wrap.__signature__ = signature(obj)

        return Wrap
    else:
        return cls(obj)


# TODO: Make identity_func "identifiable". If we use the following one, we can use == to detect it's use,
# TODO: ... but there may be a way to annotate, register, or type any identity function so it can be detected.
def identity_func(x):
    return x


# Ready-made staticmethod wrapper of the identity, for use in class bodies.
static_identity_method = staticmethod(identity_func)


from inspect import signature, Signature

# Note: Pipe code is completely independent (with inspect imports signature & Signature)
# If you only need simple pipelines, use this, or even copy/paste it where needed.
# TODO: Public interface mis-aligned with i2. funcs list here, in i2 it's dict. Align?
# If we do so, it would be a breaking change since any dependents that expect funcs
# to be a list of funcs will iterate over a iterable of names instead.
class Pipe:
    """Compose functions into a single callable:
    ``Pipe(f_1, ..., f_n)(x)`` computes ``f_n(...f_2(f_1(x))...)``.

    >>> def foo(a, b=2):
    ...     return a + b
    >>> f = Pipe(foo, lambda x: print(f"x: {x}"))
    >>> f(3)
    x: 5
    >>> len(f)
    2

    Functions may be given names, but only for documentation purposes -- the
    names are otherwise ignored:

    >>> g = Pipe(
    ...     add_numbers = lambda x, y: x + y,
    ...     multiply_by_2 = lambda x: x * 2,
    ...     stringify = str
    ... )
    >>> g(2, 3)
    '10'
    >>> len(g)
    3

    A single function works too, but at least one is required:

    >>> Pipe(lambda x: x + 1)(2)
    3
    >>> Pipe()
    Traceback (most recent call last):
    ...
    ValueError: You need to specify at least one function!

    The reserved keyword names ``__name__`` and ``__doc__`` set the instance's
    name and docstring instead of adding functions (so they can't be used as
    function names):

    >>> f = Pipe(map, add_it=sum, __name__='map_and_sum', __doc__='Apply func and add')
    >>> f(lambda x: x * 10, [1, 2, 3])
    60
    >>> f.__name__
    'map_and_sum'
    >>> f.__doc__
    'Apply func and add'

    Notes:

    - ``Pipe`` instances don't automatically have a ``__name__`` etc., so some
      expectations of normal functions are not met.
    - ``Pipe`` instances are picklable (as long as their component functions are).
    - The instance's ``__signature__`` combines the first function's parameters
      with the last function's return annotation.
    """

    funcs = ()  # class-level default so every instance exposes `funcs`

    def __init__(self, *funcs, **named_funcs):
        # Reserved keywords (__name__, __doc__) configure the instance; the
        # remaining keyword names are ignored, only their values are used.
        remaining = self._process_reserved_names(named_funcs)
        pipeline = [*funcs, *remaining.values()]
        self.funcs = pipeline
        if not pipeline:
            raise ValueError('You need to specify at least one function!')
        if len(pipeline) == 1:
            head = pipeline[0]
            tail = ()
            last = head
        else:
            head, *tail = pipeline
            last = tail[-1]
        self.__signature__ = Pipe._signature_from_first_and_last_func(head, last)
        self.first_func = head
        self.other_funcs = tail

    _reserved_names = ('__name__', '__doc__')

    def _process_reserved_names(self, named_funcs):
        """Pop reserved keys out of ``named_funcs``, applying them to the instance."""
        for reserved in self._reserved_names:
            value = named_funcs.pop(reserved, None)
            if value is not None:
                setattr(self, reserved, value)
        return named_funcs

    def __call__(self, *args, **kwargs):
        # Feed the call arguments to the first function, then thread the single
        # result through the rest of the pipeline.
        result = self.first_func(*args, **kwargs)
        for step in self.other_funcs:
            result = step(result)
        return result

    def __len__(self):
        return len(self.funcs)

    # Fallback signature for callables that don't expose one (e.g. some builtins).
    _dflt_signature = Signature.from_callable(lambda *args, **kwargs: None)

    @staticmethod
    def _signature_from_first_and_last_func(first_func, last_func):
        try:
            input_params = signature(first_func).parameters.values()
        except ValueError:  # no retrievable signature: fall back to (*args, **kwargs)
            input_params = Pipe._dflt_signature.parameters.values()
        try:
            return_annotation = signature(last_func).return_annotation
        except ValueError:  # no retrievable signature: fall back to empty annotation
            return_annotation = Pipe._dflt_signature.return_annotation
        return Signature(tuple(input_params), return_annotation=return_annotation)