creek.tools

Tools to work with creek objects

class creek.tools.BufferStats(values=(), maxlen: int = <object object>, func: Callable = <built-in function sum>, add_new_val: Callable = <method 'append' of 'collections.deque' objects>)[source]

A callable (FIFO) buffer. Calling it adds the input to the buffer and returns a result computed from its contents.

What “add” means is configurable through the add_new_val argument. The default is deque.append, but it can be deque.extend, etc.
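To make the mechanics concrete, here is a minimal sketch of such a callable buffer built on collections.deque. SketchBufferStats is a hypothetical stand-in written for illustration, not creek's actual implementation; it only assumes that add_new_val is called as add_new_val(buffer, new_val).

```python
from collections import deque
from typing import Any, Callable, Iterable


class SketchBufferStats:
    """Hypothetical stand-in for BufferStats (not creek's implementation)."""

    def __init__(
        self,
        values: Iterable = (),
        maxlen: int = 4,
        func: Callable = sum,
        add_new_val: Callable = deque.append,
    ):
        # A deque with maxlen drops the oldest items once it is full.
        self.buffer = deque(values, maxlen=maxlen)
        self.func = func
        self.add_new_val = add_new_val

    def __call__(self, new_val: Any) -> Any:
        # "Add" the new value, then compute the statistic on the whole buffer.
        self.add_new_val(self.buffer, new_val)
        return self.func(self.buffer)


bs = SketchBufferStats(maxlen=4, func=sum)
first_pass = list(map(bs, range(7)))   # [0, 1, 3, 6, 10, 14, 18]
second_pass = list(map(bs, range(7)))  # [15, 12, 9, 6, 10, 14, 18]
```

The second pass differs from the first because the buffer still holds [3, 4, 5, 6] when it starts.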

>>> bs = BufferStats(maxlen=4, func=sum)
>>> list(map(bs, range(7)))
[0, 1, 3, 6, 10, 14, 18]

See what happens when you feed the same sequence again:

>>> list(map(bs, range(7)))
[15, 12, 9, 6, 10, 14, 18]

More examples:

>>> list(map(BufferStats(maxlen=4, func=''.join), 'abcdefgh'))
['a', 'ab', 'abc', 'abcd', 'bcde', 'cdef', 'defg', 'efgh']
>>> from math import prod
>>> list(map(BufferStats(maxlen=4, func=prod), range(7)))
[0, 0, 0, 0, 24, 120, 360]

With a different add_new_val choice:

>>> from collections import deque
>>> bs = BufferStats(maxlen=4, func=''.join, add_new_val=deque.appendleft)
>>> list(map(bs, 'abcdefgh'))
['a', 'ba', 'cba', 'dcba', 'edcb', 'fedc', 'gfed', 'hgfe']

With add_new_val=deque.extend, data can be fed in chunks. The following also shows how to use iterize to get a function that takes an iterator and returns an iterator:

>>> from creek.util import iterize
>>> window_stats = iterize(BufferStats(
...     maxlen=4, func=''.join, add_new_val=deque.extend)
... )
>>> chks = ['a', 'bc', 'def', 'gh']
>>> for x in window_stats(chks):
...     print(x)
a
abc
cdef
efgh
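The role of iterize in the example above can be pictured with functools.partial and map. The sketch below is an assumption about its behavior, not the code of creek.util.iterize; iterize_sketch and extend_and_join are hypothetical names.

```python
from collections import deque
from functools import partial


def iterize_sketch(func):
    """Turn an item -> result function into an iterable -> iterator function."""
    return partial(map, func)


buffer = deque(maxlen=4)


def extend_and_join(chunk):
    # Feed a whole chunk into the buffer, then summarize the buffer.
    buffer.extend(chunk)
    return ''.join(buffer)


window_stats = iterize_sketch(extend_and_join)
results = list(window_stats(['a', 'bc', 'def', 'gh']))
# results == ['a', 'abc', 'cdef', 'efgh']
```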

Note: To those who might think that they can optimize this for special cases: Yes you can. But SHOULD you? Is it worth the increase in complexity and reduction in flexibility? See https://github.com/thorwhalen/umpyre/blob/master/misc/performance_of_rolling_window_stats.md
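As an illustration of the trade-off raised in the note above, a rolling sum can be special-cased to do constant work per item by keeping a running total. rolling_sum is a hypothetical helper, not part of creek. It is longer than the generic approach and works only for sums, which is exactly the loss of flexibility the note warns about.

```python
from collections import deque


def rolling_sum(values, maxlen):
    """Yield windowed sums, updating a running total instead of re-summing."""
    window = deque(maxlen=maxlen)
    total = 0
    for value in values:
        if len(window) == maxlen:
            # The leftmost value is about to be evicted by append.
            total -= window[0]
        window.append(value)
        total += value
        yield total


rolling = list(rolling_sum(range(7), maxlen=4))
# rolling == [0, 1, 3, 6, 10, 14, 18]
```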

class creek.tools.DynamicIndexer(start: Any = 0, idx_updater: Callable[[Any, DataItem], Any] = <function count_increments>)[source]
Parameters
  • start – The index to start at (the first data item will have this index)

  • idx_updater – The (Index, DataItem) -> Index function that computes the next index from the current index and data item

Let’s take a finite stream of finite iterables (strings here):

>>> stream = ['stream', 'of', 'different', 'sized', 'chunks']

The default DynamicIndexer just does what enumerate does:

>>> counter_index = DynamicIndexer()
>>> list(map(counter_index, stream))
[(0, 'stream'), (1, 'of'), (2, 'different'), (3, 'sized'), (4, 'chunks')]

That’s because the default idx_updater function just increments the index by one. This function, DynamicIndexer.count_increments, is shown below:

>>> def count_increments(current_idx, data_item, step=1):
...     return current_idx + step

To get the index starting at 10, we can specify start=10, and to step the index by 3 we can partialize count_increments:

>>> from functools import partial
>>> step3 = partial(count_increments, step=3)
>>> list(map(DynamicIndexer(start=10, idx_updater=step3), stream))
[(10, 'stream'), (13, 'of'), (16, 'different'), (19, 'sized'), (22, 'chunks')]

You can specify any custom idx_updater you want. The only requirement is that the function takes (current_idx, data_item) as input and returns the next “current index”, that is, the index the next data item will have. Note that count_increments ignores the data_item completely, but sometimes you want to take the data item into account. For example, your data item may contain several elements and you want your index to index these elements; in that case you should increment the index by the number of elements.

DynamicIndexer.size_increments does this; its code is shown below:

>>> def size_increments(current_idx, data_item, size_func=len):
...     return current_idx + size_func(data_item)
>>> size_index = DynamicIndexer(idx_updater=DynamicIndexer.size_increments)
>>> list(map(size_index, stream))
[(0, 'stream'), (6, 'of'), (8, 'different'), (17, 'sized'), (22, 'chunks')]
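The following sketch shows one way a callable with this behavior could be written, and how a custom idx_updater plugs into it. SketchDynamicIndexer and word_increments are hypothetical names used only for illustration; the actual DynamicIndexer may differ.

```python
def count_increments(current_idx, data_item, step=1):
    return current_idx + step


class SketchDynamicIndexer:
    """Hypothetical stand-in: pairs each item with a running index."""

    def __init__(self, start=0, idx_updater=count_increments):
        self.current_idx = start
        self.idx_updater = idx_updater

    def __call__(self, data_item):
        idx = self.current_idx
        # Compute the index that the NEXT item will receive.
        self.current_idx = self.idx_updater(idx, data_item)
        return idx, data_item


def word_increments(current_idx, data_item):
    """Custom updater: advance the index by the number of words in the item."""
    return current_idx + len(data_item.split())


lines = ['the quick brown', 'fox', 'jumps over']
word_indexed = list(map(SketchDynamicIndexer(idx_updater=word_increments), lines))
# word_indexed == [(0, 'the quick brown'), (3, 'fox'), (4, 'jumps over')]
```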

Q: What if I want the index of a data item to be a function of the data item itself?

A: Then you would use that function to make the (idxof(data_item), data_item) pairs directly. DynamicIndexer is for the use case where the index of an item depends on the (number of, sizes of, etc.) items that came before it.
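A small sketch of the distinction made in this answer, using plain Python only. idxof is a hypothetical index function (here, the first character of the item).

```python
stream = ['stream', 'of', 'different', 'sized', 'chunks']


def idxof(data_item):
    """Hypothetical index that depends only on the item itself."""
    return data_item[0]


# Index is a function of the item: build the pairs directly, no state needed.
direct_pairs = [(idxof(item), item) for item in stream]

# Index depends on the items that came before: a running state is needed,
# which is the situation DynamicIndexer is meant for.
running_pairs, offset = [], 0
for item in stream:
    running_pairs.append((offset, item))
    offset += len(item)
```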

class creek.tools.Segmenter(buffer: creek.tools.BufferStats, stats_buffer_callback: Callable[[Any, Iterable], Any] = <function return_buffer_on_stats_condition>)[source]
>>> from functools import partial
>>> gen = iter(range(200))
>>> bs = BufferStats(maxlen=10, func=sum)
>>> return_if_stats_is_odd = partial(
...     return_buffer_on_stats_condition,
...     cond=lambda x: x%2 == 1, else_val='The sum is not odd!'
... )
>>> seg = Segmenter(buffer=bs, stats_buffer_callback=return_if_stats_is_odd)

Since the sum of the values in the buffer [1] is odd, the buffer is returned:

>>> seg(new_val=1)
[1]

The sum 1 + 2 is still odd, so the buffer is returned again:

>>> seg(new_val=2)
[1, 2]

Now, since 1 + 2 + 5 is even, the else_val of return_if_stats_is_odd is returned instead:

>>> seg(new_val=5)
'The sum is not odd!'
stats_buffer_callback(buffer: Iterable, cond: Callable = <function is_not_none>, else_val=None)
>>> return_buffer_on_stats_condition(
... stats=3, buffer=[1,2,3,4], cond=lambda x: x%2 == 1
... )
[1, 2, 3, 4]
>>> return_buffer_on_stats_condition(
... stats=3, buffer=[1,2,3,4], cond=lambda x: x%2 == 0, else_val='3 is not even!'
... )
'3 is not even!'
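Putting the pieces together, the behavior shown in the Segmenter example can be approximated as follows. SketchSegmenter and return_buffer_on_stats_condition_sketch are hypothetical names; this is a reading of the examples above, not creek's source.

```python
from collections import deque
from functools import partial


def return_buffer_on_stats_condition_sketch(stats, buffer, cond, else_val=None):
    """Return the buffer if cond(stats) holds, otherwise else_val."""
    return buffer if cond(stats) else else_val


class SketchSegmenter:
    """Hypothetical stand-in: buffer a value, compute stats, apply a callback."""

    def __init__(self, maxlen, func, stats_buffer_callback):
        self.buffer = deque(maxlen=maxlen)
        self.func = func
        self.stats_buffer_callback = stats_buffer_callback

    def __call__(self, new_val):
        self.buffer.append(new_val)
        stats = self.func(self.buffer)
        # The callback decides what to return from the stats and the buffer.
        return self.stats_buffer_callback(stats, list(self.buffer))


return_if_odd = partial(
    return_buffer_on_stats_condition_sketch,
    cond=lambda x: x % 2 == 1,
    else_val='The sum is not odd!',
)
seg = SketchSegmenter(maxlen=10, func=sum, stats_buffer_callback=return_if_odd)
outputs = [seg(1), seg(2), seg(5)]
# outputs == [[1], [1, 2], 'The sum is not odd!']
```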
creek.tools.alt_dynamically_index(idx_updater: Callable[[Any, DataItem], Any] = <function count_increments>, start=0)[source]

Alternative to dynamically_index using itertools and partial.

>>> def size_increments(current_idx, data_item, size_func=len):
...     return current_idx + size_func(data_item)
...
>>> stream = ['stream', 'of', 'different', 'sized', 'chunks']
>>> indexer = alt_dynamically_index(size_increments)
>>> t = list(indexer(stream))
>>> assert t == [(0, 'stream'), (6, 'of'), (8, 'different'), (17, 'sized'),
...              (22, 'chunks')]
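One way to build such an indexer from itertools and partial is sketched below. This is an illustrative assumption, not necessarily how alt_dynamically_index is implemented; index_with_accumulate is a hypothetical name, and the initial argument of itertools.accumulate requires Python 3.8 or later.

```python
from functools import partial
from itertools import accumulate, tee


def size_increments(current_idx, data_item, size_func=len):
    return current_idx + size_func(data_item)


def index_with_accumulate(stream, idx_updater, start=0):
    # One copy of the stream feeds the index computation, the other the items.
    items, items_for_idx = tee(stream)
    # accumulate yields start, then idx_updater(previous_idx, item) per item.
    indices = accumulate(items_for_idx, idx_updater, initial=start)
    # zip stops when items is exhausted, dropping the one extra trailing index.
    return zip(indices, items)


indexer = partial(index_with_accumulate, idx_updater=size_increments)
indexed = list(indexer(['stream', 'of', 'different', 'sized', 'chunks']))
```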
creek.tools.apply_and_fanout(seq: Sequence, func: Callable[[Any], Iterable], idx: int) → Iterable[tuple][source]

Apply a function (that returns an Iterable) to one element of a sequence and fan out (broadcast) the resulting items, producing an iterable of tuples, each containing one of these items along with ‘a copy’ of the other elements of the sequence.

>>> list(apply_and_fanout([1, 'abc', 3], iter, 1))
[(1, 'a', 3), (1, 'b', 3), (1, 'c', 3)]
>>> list(apply_and_fanout(['bob', 'alice', 2], lambda x: x * ['hi'], 2))
[('bob', 'alice', 'hi'), ('bob', 'alice', 'hi')]
>>> list(apply_and_fanout(["bob", "alice", 2], lambda x: x.upper(), 1))
[('bob', 'A', 2), ('bob', 'L', 2), ('bob', 'I', 2), ('bob', 'C', 2), ('bob', 'E', 2)]

See also

fanout_and_flatten and fanout_and_flatten_dicts
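The fanout described for apply_and_fanout can be sketched in a few lines. apply_and_fanout_sketch is a hypothetical re-implementation for illustration, assuming idx is a non-negative position in seq.

```python
def apply_and_fanout_sketch(seq, func, idx):
    """Yield one tuple per item of func(seq[idx]), copying the other elements."""
    seq = tuple(seq)
    left, right = seq[:idx], seq[idx + 1:]
    for item in func(seq[idx]):
        yield (*left, item, *right)


fanned = list(apply_and_fanout_sketch([1, 'abc', 3], iter, 1))
# fanned == [(1, 'a', 3), (1, 'b', 3), (1, 'c', 3)]
```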

creek.tools.apply_func_to_index(seq, apply_to_idx, func)[source]
>>> apply_func_to_index([1, 2, 3], 1, lambda x: x * 10)
(1, 20, 3)

If you’re going to apply the same function to the same index, you might want to partialize apply_func_to_index to be able to reuse it simply:

>>> from functools import partial
>>> f = partial(apply_func_to_index, apply_to_idx=0, func=str.upper)
>>> list(map(f, ['abc', 'defgh']))
[('A', 'b', 'c'), ('D', 'e', 'f', 'g', 'h')]
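For reference, a function with the behavior shown above could look like the following sketch. apply_func_to_index_sketch is a hypothetical name, and the sketch assumes apply_to_idx is a non-negative position.

```python
from functools import partial


def apply_func_to_index_sketch(seq, apply_to_idx, func):
    """Return seq as a tuple with func applied to the element at apply_to_idx."""
    seq = tuple(seq)
    return (
        *seq[:apply_to_idx],
        func(seq[apply_to_idx]),
        *seq[apply_to_idx + 1:],
    )


scaled = apply_func_to_index_sketch([1, 2, 3], 1, lambda x: x * 10)
# scaled == (1, 20, 3)

upper_first = partial(apply_func_to_index_sketch, apply_to_idx=0, func=str.upper)
uppered = list(map(upper_first, ['abc', 'defgh']))
```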
creek.tools.current_time(current_idx, obj)[source]

Ignores current_idx and obj and simply returns the current time.

creek.tools.dynamically_index(iterable: Iterable, start=0, idx_updater=<function count_increments>)[source]

Generalization of enumerate(iterable) that allows one to specify how the indices should be updated.

The default is the same behavior as enumerate: it starts at 0 and increments by 1.

>>> stream = ['stream', 'of', 'different', 'sized', 'chunks']
>>> assert (list(dynamically_index(stream, start=2))
...     == list(enumerate(stream, start=2))
...     == [(2, 'stream'), (3, 'of'), (4, 'different'), (5, 'sized'), (6, 'chunks')]
... )

Say we wanted to increment the indices according to the size of the last item instead of just incrementing by 1 at every iteration tick:

>>> def size_increments(current_idx, data_item, size_func=len):
...     return current_idx + size_func(data_item)
>>> size_index = DynamicIndexer(idx_updater=DynamicIndexer.size_increments)
>>> list(map(size_index, stream))
[(0, 'stream'), (6, 'of'), (8, 'different'), (17, 'sized'), (22, 'chunks')]
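As a generator, this generalization of enumerate can be sketched as follows. dynamically_index_sketch is a hypothetical re-implementation based on the signature and description above, not creek's source.

```python
def count_increments(current_idx, data_item, step=1):
    return current_idx + step


def size_increments(current_idx, data_item, size_func=len):
    return current_idx + size_func(data_item)


def dynamically_index_sketch(iterable, start=0, idx_updater=count_increments):
    """Like enumerate, but idx_updater decides the next index."""
    current_idx = start
    for data_item in iterable:
        yield current_idx, data_item
        current_idx = idx_updater(current_idx, data_item)


stream = ['stream', 'of', 'different', 'sized', 'chunks']
like_enumerate = list(dynamically_index_sketch(stream, start=2))
by_size = list(dynamically_index_sketch(stream, idx_updater=size_increments))
```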
creek.tools.fanout_and_flatten(iterable_of_seqs, func, idx, aggregator=<built-in method from_iterable of type object>)[source]

Apply apply_and_fanout to an iterable of sequences.

>>> seq_iterable = [('abcdef', 'first'), ('ghij', 'second')]
>>> func = lambda a: zip(*([iter(a)] * 2))  # func is a chunker
>>> assert list(fanout_and_flatten(seq_iterable, func, 0)) == [
...     (('a', 'b'), 'first'),
...     (('c', 'd'), 'first'),
...     (('e', 'f'), 'first'),
...     (('g', 'h'), 'second'),
...     (('i', 'j'), 'second')
... ]
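The composition described here, fanning out each sequence and then flattening, can be sketched with itertools.chain.from_iterable, which the signature above shows as the default aggregator. The _sketch functions and the pairs helper are hypothetical names, not creek's code.

```python
from itertools import chain


def apply_and_fanout_sketch(seq, func, idx):
    seq = tuple(seq)
    for item in func(seq[idx]):
        yield seq[:idx] + (item,) + seq[idx + 1:]


def fanout_and_flatten_sketch(
    iterable_of_seqs, func, idx, aggregator=chain.from_iterable
):
    # One fanout per input sequence; the aggregator merges them into one stream.
    return aggregator(
        apply_and_fanout_sketch(seq, func, idx) for seq in iterable_of_seqs
    )


def pairs(a):
    """Chunk an iterable into consecutive pairs."""
    return zip(*([iter(a)] * 2))


flattened = list(
    fanout_and_flatten_sketch([('abcdef', 'first'), ('ghij', 'second')], pairs, 0)
)
```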
creek.tools.fanout_and_flatten_dicts(iterable_of_dicts, func, fields, idx_field, aggregator=<built-in method from_iterable of type object>)[source]

Apply apply_and_fanout to an iterable of dicts.

>>> iterable_of_dicts = [
...     {'wf': 'abcdef', 'tag': 'first'}, {'wf': 'ghij', 'tag': 'second'}
... ]
>>> func = lambda a: zip(*([iter(a)] * 2))  # func is a chunker
>>> fields = ['wf', 'tag']
>>> idx_field = 'wf'
>>> assert list(
...     fanout_and_flatten_dicts(iterable_of_dicts, func, fields, idx_field)) == [
...         {'wf': ('a', 'b'), 'tag': 'first'},
...         {'wf': ('c', 'd'), 'tag': 'first'},
...         {'wf': ('e', 'f'), 'tag': 'first'},
...         {'wf': ('g', 'h'), 'tag': 'second'},
...         {'wf': ('i', 'j'), 'tag': 'second'}
... ]
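For dicts, the same idea can be sketched directly: apply func to the value under idx_field and emit one copy of the dict per resulting item. fanout_dicts_sketch is a hypothetical simplification that omits the fields argument of the real function and keeps every key of the input dict.

```python
def fanout_dicts_sketch(iterable_of_dicts, func, idx_field):
    """Yield one dict per item of func(d[idx_field]), copying the other keys."""
    for d in iterable_of_dicts:
        for item in func(d[idx_field]):
            yield {**d, idx_field: item}


def pairs(a):
    """Chunk an iterable into consecutive pairs."""
    return zip(*([iter(a)] * 2))


fanned_dicts = list(
    fanout_dicts_sketch(
        [{'wf': 'abcdef', 'tag': 'first'}, {'wf': 'ghij', 'tag': 'second'}],
        pairs,
        'wf',
    )
)
```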
creek.tools.filter_and_index_stream(stream: Iterable, data_item_filt, timestamper: Callable[[DataItem], Tuple[Any, DataItem]] = <class 'enumerate'>)[source]

Index a stream and filter it (based only on the data items).

>>> assert (
... list(filter_and_index_stream('this  is   a   stream', data_item_filt=' ')) == [
... (0, 't'),
... (1, 'h'),
... (2, 'i'),
... (3, 's'),
... (6, 'i'),
... (7, 's'),
... (11, 'a'),
... (15, 's'),
... (16, 't'),
... (17, 'r'),
... (18, 'e'),
... (19, 'a'),
... (20, 'm')
... ])
>>> list(filter_and_index_stream(
...     [1, 2, 3, 4, 5, 6, 7, 8],
...     data_item_filt=lambda x: x % 2))
[(0, 1), (2, 3), (4, 5), (6, 7)]
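The key point in these examples is that items are indexed before they are filtered, so surviving items keep their original positions. A sketch of that order of operations follows; filter_and_index_sketch is a hypothetical name, and treating a non-callable data_item_filt as "drop items equal to this value" is an assumption inferred from the first example.

```python
def filter_and_index_sketch(stream, data_item_filt, timestamper=enumerate):
    if not callable(data_item_filt):
        # Assumption: a non-callable filter is a value to exclude.
        sentinel = data_item_filt
        data_item_filt = lambda item: item != sentinel
    # Index first, then filter on the data item only (second element of a pair).
    return filter(lambda pair: data_item_filt(pair[1]), timestamper(stream))


kept_chars = list(filter_and_index_sketch('a  b', data_item_filt=' '))
# kept_chars == [(0, 'a'), (3, 'b')]
kept_odds = list(filter_and_index_sketch([1, 2, 3, 4, 5, 6, 7, 8], lambda x: x % 2))
# kept_odds == [(0, 1), (2, 3), (4, 5), (6, 7)]
```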
creek.tools.return_buffer_on_stats_condition(stats: Any, buffer: Iterable, cond: Callable = <function is_not_none>, else_val=None)[source]
>>> return_buffer_on_stats_condition(
... stats=3, buffer=[1,2,3,4], cond=lambda x: x%2 == 1
... )
[1, 2, 3, 4]
>>> return_buffer_on_stats_condition(
... stats=3, buffer=[1,2,3,4], cond=lambda x: x%2 == 0, else_val='3 is not even!'
... )
'3 is not even!'
creek.tools.segment_overlaps(bt_tt_segment, query_bt, query_tt)[source]

Returns True if, and only if, bt_tt_segment overlaps the query interval.

A bt_tt_segment must be of the (bt, tt, *data) format, that is, an iterable of at least two elements (the bt and tt), optionally followed by more elements (the actual segment data).

This function is made to be curried, as shown in the following example:

>>> from functools import partial
>>> overlapping_segments_filt = partial(segment_overlaps, query_bt=4, query_tt=8)
>>>
>>> list(filter(overlapping_segments_filt, [
...     (1, 3, 'completely before'),
...     (2, 4, 'still completely before (upper bounds are strict)'),
...     (3, 6, 'partially before, but overlaps bottom'),
...     (4, 5, 'totally', 'inside'),  # <- note this tuple has 4 elements
...     (5, 8),  # <- note this tuple has only the minimum (2) elements,
...     (7, 10, 'partially after, but overlaps top'),
...     (8, 11, 'completely after (strict upper bound)'),
...     (100, 101, 'completely after (obviously)')
... ]))  
[(3, 6, 'partially before, but overlaps bottom'),
(4, 5, 'totally', 'inside'),
(5, 8),
(7, 10, 'partially after, but overlaps top')]
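The results above are consistent with a strict comparison on both bounds: a segment overlaps when it starts before the query ends and ends after the query starts. The sketch below encodes that reading; segment_overlaps_sketch is a hypothetical name, not creek's source.

```python
from functools import partial


def segment_overlaps_sketch(bt_tt_segment, query_bt, query_tt):
    # Only the first two elements matter; any trailing data is ignored.
    bt, tt, *_ = bt_tt_segment
    return bt < query_tt and tt > query_bt


overlaps_4_8 = partial(segment_overlaps_sketch, query_bt=4, query_tt=8)
segments = [
    (1, 3, 'completely before'),
    (2, 4, 'touches the query start only'),
    (3, 6, 'overlaps bottom'),
    (4, 5, 'totally', 'inside'),
    (5, 8),
    (7, 10, 'overlaps top'),
    (8, 11, 'touches the query end only'),
]
overlapping = list(filter(overlaps_4_8, segments))
```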