creek.util

Utils for creek

class creek.util.CursorFunc(*args, **kwargs)[source]

An argument-less function returning an iterator’s values

class creek.util.IterableType(*args, **kwargs)[source]

An iterable type that can actually be used in singledispatch

>>> assert isinstance([1, 2, 3], IterableType)
>>> assert not isinstance(2, IterableType)
exception creek.util.IteratorExit[source]

Raised when an iterator should quit being iterated on, signaling this event any process that cares to catch the signal. We chose to inherit directly from BaseException instead of Exception for the same reason that GeneratorExit does: Because it’s not technically an error.

See: https://docs.python.org/3/library/exceptions.html#GeneratorExit

class creek.util.IteratorType(*args, **kwargs)[source]

An iterator type that can actually be used in singledispatch

>>> assert isinstance(iter([1, 2, 3]), IteratorType)
>>> assert not isinstance([1, 2, 3], IteratorType)
class creek.util.Pipe(*funcs, **named_funcs)[source]

Simple function composition. That is, gives you a callable that implements input -> f_1 -> … -> f_n -> output.

>>> def foo(a, b=2):
...     return a + b
>>> f = Pipe(foo, lambda x: print(f"x: {x}"))
>>> f(3)
x: 5
>>> len(f)
2

You can name functions, but this would just be for documentation purposes. The names are completely ignored.

>>> g = Pipe(
...     add_numbers = lambda x, y: x + y,
...     multiply_by_2 = lambda x: x * 2,
...     stringify = str
... )
>>> g(2, 3)
'10'
>>> len(g)
3

Notes

  • Pipe instances don’t have a __name__ etc. So some expectations of normal functions are not met.

  • Pipe instance are pickalable (as long as the functions that compose them are)

You can specify a single functions:

>>> Pipe(lambda x: x + 1)(2)
3

but

>>> Pipe()
Traceback (most recent call last):
  ...
ValueError: You need to specify at least one function!

You can specify an instance name and/or doc with the special (reserved) argument names __name__ and __doc__ (which therefore can’t be used as function names):

>>> f = Pipe(map, add_it=sum, __name__='map_and_sum', __doc__='Apply func and add')
>>> f(lambda x: x * 10, [1, 2, 3])
60
>>> f.__name__
'map_and_sum'
>>> f.__doc__
'Apply func and add'
creek.util.cursor_to_iterator(cursor: creek.util.CursorFunc, sentinel=<creek.util.no_sentinel object>) → Iterator[source]

Get an iterator from a cursor function.

A cursor function is a callable that you call (without arguments) to get items of data one by one.

Sometimes, especially in live io contexts, that’s the kind interface you’re given to consume a stream.

>>> cursor = iter([1, 2, 3]).__next__
>>> assert not isinstance(cursor, Iterator)
>>> assert not isinstance(cursor, Iterable)
>>> assert callable(cursor)

If you want to consume your stream as an iterator instead, use cursor_to_iterator.

>>> iterator = cursor_to_iterator(cursor)
>>> assert isinstance(iterator, Iterator)
>>> list(iterator)
[1, 2, 3]

If you want your iterator to stop (without a fuss) as soon as the cursor returns a particular element (called a sentinel), say it:

>>> cursor = iter([1, 2, None, None, 3]).__next__
>>> iterator = cursor_to_iterator(cursor, sentinel=None)
>>> list(iterator)
[1, 2]
creek.util.iterable_to_cursor(iterable: Iterable)creek.util.CursorFunc[source]

Get a cursor function from an iterable.

creek.util.iterable_to_iterator(iterable: Iterable, sentinel=<creek.util.no_sentinel object>) → Iterator[source]

Get an iterator from an iterable

>>> iterable = [1, 2, 3]
>>> iterator = iterable_to_iterator(iterable)
>>> assert isinstance(iterator, Iterator)
>>> assert list(iterator) == iterable

You can also specify a sentinel, which will result in the iterator stoping just before it encounters that sentinel value

>>> iterable = [1, 2, 3, 4, None, None, 7]
>>> iterator = iterable_to_iterator(iterable, None)
>>> assert isinstance(iterator, Iterator)
>>> list(iterator)
[1, 2, 3, 4]
creek.util.iterate_skipping_errors(g: Iterable, error_callback: Optional[Callable[[BaseException], Any]] = None, caught_exceptions: Tuple[BaseException] = (<class 'Exception'>,)) → Generator[source]

Iterate over a generator, skipping errors and calling an error callback if provided.

Parameters
  • g – The generator to iterate over

  • error_callback – A callback to call when an error is encountered.

  • caught_exceptions – The exceptions to catch and skip.

Returns

A generator that yields the values of the original generator,

skipping errors.

>>> list(iterate_skipping_errors(map(lambda x: 1 / x, [1, 0, 2])))
[1.0, 0.5]
>>> list(iterate_skipping_errors(map(lambda x: 1 / x, [1, 0, 2]), print))
division by zero
[1.0, 0.5]

See https://github.com/i2mint/creek/issues/6 for more info.

creek.util.iterator_to_cursor(iterator: Iterator, default=<creek.util.no_default object>)creek.util.CursorFunc[source]

Get a cursor function for the input iterator.

>>> iterator = iter([1, 2, 3])
>>> cursor = iterator_to_cursor(iterator)
>>> assert callable(cursor)
>>> assert cursor() == 1
>>> assert list(cursor_to_iterator(cursor)) == [2, 3]

Note how we consumed the cursor till the end; by using cursor_to_iterator. Indeed, list(iter(cursor)) wouldn’t have worked since a cursor isn’t a iterator, but a callable to get the items an the iterator would give you.

You can specify a default. The default has the same role that it has for the next function: It makes the cursor function return that default when the iterator has been “consumed” (i.e. would raise a StopIteration).

>>> iterator = iter([1, 2])
>>> cursor = iterator_to_cursor(iterator, None)
>>> assert callable(cursor)
>>> cursor()
1
>>> cursor()
2

And then…

>>> assert cursor() is None
>>> assert cursor() is None

forever.

creek.util.iterize(func, name=None)[source]

From an In->Out function, makes a Iterator[In]->Itertor[Out] function.

>>> f = lambda x: x * 10
>>> f(2)
20
>>> iterized_f = iterize(f)
>>> list(iterized_f(iter([1,2,3])))
[10, 20, 30]
creek.util.to_iterator(x: creek.util.IteratorType, sentinel=<creek.util.no_sentinel object>)[source]
creek.util.to_iterator(x: creek.util.IterableType, sentinel=<creek.util.no_sentinel object>)
creek.util.to_iterator(x: creek.util.CursorFunc, sentinel=<creek.util.no_sentinel object>)

Get an iterator from an iterable or a cursor function

>>> from typing import Iterator
>>> it = to_iterator([1, 2, 3])
>>> assert isinstance(it, Iterator)
>>> list(it)
[1, 2, 3]
>>> list(it)
[]
>>> cursor = iter([1, 2, 3]).__next__
>>> assert isinstance(cursor, CursorFunc)
>>> it = to_iterator(cursor)
>>> assert isinstance(it, Iterator)
>>> list(it)
[1, 2, 3]
>>> list(it)
[]

You can use sentinels too

>>> list(to_iterator([1, 2, None, 4], sentinel=None))
[1, 2]
>>> cursor = iter([1, 2, 3, 4, 5]).__next__
>>> list(to_iterator(cursor, sentinel=4))
[1, 2, 3]