creek.tools
Tools to work with creek objects
class creek.tools.BufferStats(values=(), maxlen: int = <object object>, func: Callable = sum, add_new_val: Callable = deque.append)[source]

A callable (FIFO) buffer. Each call adds the input to the buffer, but also returns a result computed from its contents.

What "add" means is configurable (through the add_new_val arg). The default is deque.append, but it can be deque.extend, etc.

>>> bs = BufferStats(maxlen=4, func=sum)
>>> list(map(bs, range(7)))
[0, 1, 3, 6, 10, 14, 18]
See what happens when you feed the same sequence again:
>>> list(map(bs, range(7)))
[15, 12, 9, 6, 10, 14, 18]
More examples:
>>> list(map(BufferStats(maxlen=4, func=''.join), 'abcdefgh'))
['a', 'ab', 'abc', 'abcd', 'bcde', 'cdef', 'defg', 'efgh']

>>> from math import prod
>>> list(map(BufferStats(maxlen=4, func=prod), range(7)))
[0, 0, 0, 0, 24, 120, 360]
With a different add_new_val choice:

>>> from collections import deque
>>> bs = BufferStats(maxlen=4, func=''.join, add_new_val=deque.appendleft)
>>> list(map(bs, 'abcdefgh'))
['a', 'ba', 'cba', 'dcba', 'edcb', 'fedc', 'gfed', 'hgfe']
With add_new_val=deque.extend, data can be fed in chunks. In the following, also see how we use iterize to get a function that takes an iterator and returns an iterator:

>>> from creek.util import iterize
>>> window_stats = iterize(BufferStats(
...     maxlen=4, func=''.join, add_new_val=deque.extend)
... )
>>> chks = ['a', 'bc', 'def', 'gh']
>>> for x in window_stats(chks):
...     print(x)
a
abc
cdef
efgh
Note: To those who might think that they can optimize this for special cases: Yes you can. But SHOULD you? Is it worth the increase in complexity and reduction in flexibility? See https://github.com/thorwhalen/umpyre/blob/master/misc/performance_of_rolling_window_stats.md
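To make the mechanics concrete, here is a minimal plain-Python sketch of the rolling-window pattern that BufferStats packages as a callable object. This is an illustration using only the standard library, not creek's actual implementation:

```python
from collections import deque

def rolling_stat(values, maxlen=4, func=sum):
    # A FIFO buffer: once full, appending a new value drops the oldest one.
    buf = deque(maxlen=maxlen)
    for v in values:
        buf.append(v)    # the default "add" of BufferStats
        yield func(buf)  # compute the statistic on the current window

# Reproduces the first BufferStats example above:
# list(rolling_stat(range(7))) == [0, 1, 3, 6, 10, 14, 18]
```

BufferStats keeps the buffer as instance state so the window survives across calls (which is why feeding the same sequence twice gives different results); the generator above keeps that state in a local variable instead.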
class creek.tools.DynamicIndexer(start: Any = 0, idx_updater: Callable[[Any, DataItem], Any] = count_increments)[source]

Parameters:
- start – The index to start at (the first data item will have this index)
- idx_updater – An (Index, DataItem) -> Index function that computes the next index from the current index and the current data item
Let’s take a finite stream of finite iterables (strings here):
>>> stream = ['stream', 'of', 'different', 'sized', 'chunks']
The default DynamicIndexer just does what enumerate does:

>>> counter_index = DynamicIndexer()
>>> list(map(counter_index, stream))
[(0, 'stream'), (1, 'of'), (2, 'different'), (3, 'sized'), (4, 'chunks')]
That's because the default idx_updater function just increments by one. This function, DynamicIndexer.count_increments, is shown below:

>>> def count_increments(current_idx, data_item, step=1):
...     return current_idx + step
To get the index starting at 10, we can specify start=10, and to step the index by 3 we can partialize count_increments:

>>> from functools import partial
>>> step3 = partial(count_increments, step=3)
>>> list(map(DynamicIndexer(start=10, idx_updater=step3), stream))
[(10, 'stream'), (13, 'of'), (16, 'different'), (19, 'sized'), (22, 'chunks')]
You can specify any custom idx_updater you want. The requirements: the function should take (current_idx, data_item) as input and return the next "current index", that is, the index the next data item will get. Note that count_increments ignores data_item completely, but sometimes you want to take the data item into account. For example, your data item may contain several elements, and you want your index to index these elements, so you should update your index by incrementing it with the number of elements. We have DynamicIndexer.size_increments for that; the code is shown below:

>>> def size_increments(current_idx, data_item, size_func=len):
...     return current_idx + size_func(data_item)
>>> size_index = DynamicIndexer(idx_updater=DynamicIndexer.size_increments)
>>> list(map(size_index, stream))
[(0, 'stream'), (6, 'of'), (8, 'different'), (17, 'sized'), (22, 'chunks')]
Q: What if I want the index of a data item to be a function of the data item itself?

A: Then you would use that function to make the (idxof(data_item), data_item) pairs directly. DynamicIndexer is for the use case where the index of an item depends on the (number of, sizes of, etc.) items that came before it.
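To illustrate the answer above (the helper name here is ours, purely for illustration): when each index depends only on its own item, a plain comprehension suffices, with no running state:

```python
def index_by_item(stream, idxof):
    # No running state needed: pair each item directly with idxof(item).
    return [(idxof(item), item) for item in stream]

# e.g. index each chunk by its own length:
# index_by_item(['stream', 'of'], idxof=len) == [(6, 'stream'), (2, 'of')]
```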
class creek.tools.Segmenter(buffer: creek.tools.BufferStats, stats_buffer_callback: Callable[[Any, Iterable], Any] = return_buffer_on_stats_condition)[source]

>>> from functools import partial
>>> gen = iter(range(200))
>>> bs = BufferStats(maxlen=10, func=sum)
>>> return_if_stats_is_odd = partial(
...     return_buffer_on_stats_condition,
...     cond=lambda x: x%2 == 1, else_val='The sum is not odd!'
... )
>>> seg = Segmenter(buffer=bs, stats_buffer_callback=return_if_stats_is_odd)
Since the sum of the values in the buffer [1] is odd, the buffer is returned:
>>> seg(new_val=1)
[1]
The sum 1 + 2 is still odd, so:

>>> seg(new_val=2)
[1, 2]
Now, since 1 + 2 + 5 is even, the else_val of return_if_stats_is_odd is returned instead:

>>> seg(new_val=5)
'The sum is not odd!'
stats_buffer_callback(stats: Any, buffer: Iterable, cond: Callable = is_not_none, else_val=None)

>>> return_buffer_on_stats_condition(
...     stats=3, buffer=[1,2,3,4], cond=lambda x: x%2 == 1
... )
[1, 2, 3, 4]
>>> return_buffer_on_stats_condition(
...     stats=3, buffer=[1,2,3,4], cond=lambda x: x%2 == 0, else_val='3 is not even!'
... )
'3 is not even!'
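For intuition, here is a stdlib-only sketch of the pattern Segmenter implements (our reconstruction for illustration, not creek's code): a rolling buffer, a per-step statistic, and a conditional buffer return:

```python
from collections import deque

def segment_stream(stream, maxlen, stats_func, cond, else_val=None):
    # Mirrors Segmenter(buffer=BufferStats(maxlen, stats_func),
    # stats_buffer_callback=...): on each new value, update the rolling
    # buffer, compute the statistic, and return the buffer (or else_val)
    # depending on a condition on that statistic.
    buf = deque(maxlen=maxlen)
    for new_val in stream:
        buf.append(new_val)
        stats = stats_func(buf)
        yield list(buf) if cond(stats) else else_val

# Mirrors the doctest above:
# list(segment_stream([1, 2, 5], 10, sum, lambda s: s % 2 == 1,
#                     'The sum is not odd!'))
# == [[1], [1, 2], 'The sum is not odd!']
```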
creek.tools.alt_dynamically_index(idx_updater: Callable[[Any, DataItem], Any] = count_increments, start=0)[source]

Alternative to dynamically_index using itertools and partial.

>>> def size_increments(current_idx, data_item, size_func=len):
...     return current_idx + size_func(data_item)
...
>>> stream = ['stream', 'of', 'different', 'sized', 'chunks']
>>> indexer = alt_dynamically_index(size_increments)
>>> t = list(indexer(stream))
>>> assert t == [(0, 'stream'), (6, 'of'), (8, 'different'), (17, 'sized'),
...     (22, 'chunks')]
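The docstring says this variant uses itertools and partial; one plausible way to build it with the standard library (our sketch, not necessarily creek's code) is to feed the idx_updater to itertools.accumulate as the accumulation function:

```python
from itertools import accumulate, tee

def dynamic_index_sketch(idx_updater, start=0):
    def indexer(iterable):
        items, for_idx = tee(iterable)
        # accumulate yields: start, idx_updater(start, x0),
        # idx_updater(idx_updater(start, x0), x1), ...
        indices = accumulate(for_idx, idx_updater, initial=start)
        # zip truncates the extra final index that accumulate produces.
        return zip(indices, items)
    return indexer

def size_increments(current_idx, data_item, size_func=len):
    return current_idx + size_func(data_item)

# list(dynamic_index_sketch(size_increments)(['stream', 'of']))
# == [(0, 'stream'), (6, 'of')]
```

The accumulated sequence is shifted by one relative to the items, which is exactly what we want: each item is paired with the index computed from everything before it.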
creek.tools.apply_and_fanout(seq: Sequence, func: Callable[[Any], Iterable], idx: int) → Iterable[tuple][source]

Apply a function (that returns an iterable) to one element of a sequence and fan out (broadcast) the resulting items, producing an iterable of tuples, each containing one of these items along with a copy of the other tuple elements.

>>> list(apply_and_fanout([1, 'abc', 3], iter, 1))
[(1, 'a', 3), (1, 'b', 3), (1, 'c', 3)]
>>> list(apply_and_fanout(['bob', 'alice', 2], lambda x: x * ['hi'], 2))
[('bob', 'alice', 'hi'), ('bob', 'alice', 'hi')]
>>> list(apply_and_fanout(["bob", "alice", 2], lambda x: x.upper(), 1))
[('bob', 'A', 2), ('bob', 'L', 2), ('bob', 'I', 2), ('bob', 'C', 2), ('bob', 'E', 2)]

See also fanout_and_flatten and fanout_and_flatten_dicts.
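The broadcasting described above can be sketched in a few lines (an illustration of the behavior, not creek's implementation):

```python
def fanout(seq, func, idx):
    seq = tuple(seq)
    # For each item produced by func(seq[idx]), emit a copy of the tuple
    # with position idx replaced by that item.
    for item in func(seq[idx]):
        yield seq[:idx] + (item,) + seq[idx + 1:]

# list(fanout([1, 'abc', 3], iter, 1)) == [(1, 'a', 3), (1, 'b', 3), (1, 'c', 3)]
```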
creek.tools.apply_func_to_index(seq, apply_to_idx, func)[source]

>>> apply_func_to_index([1, 2, 3], 1, lambda x: x * 10)
(1, 20, 3)

If you're going to apply the same function to the same index, you might want to partialize apply_func_to_index to be able to reuse it simply:

>>> from functools import partial
>>> f = partial(apply_func_to_index, apply_to_idx=0, func=str.upper)
>>> list(map(f, ['abc', 'defgh']))
[('A', 'b', 'c'), ('D', 'e', 'f', 'g', 'h')]
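Behaviorally, this amounts to a one-line comprehension (our sketch of equivalent behavior, matching the doctests above; the name is ours):

```python
def apply_at_index(seq, apply_to_idx, func):
    # Rebuild the sequence as a tuple, transforming only the chosen position.
    return tuple(func(x) if i == apply_to_idx else x for i, x in enumerate(seq))

# apply_at_index([1, 2, 3], 1, lambda x: x * 10) == (1, 20, 3)
```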
creek.tools.current_time(current_idx, obj)[source]

Doesn't even look at current_idx or obj. Just gives the current time.
creek.tools.dynamically_index(iterable: Iterable, start=0, idx_updater=count_increments)[source]

Generalization of enumerate(iterable) that allows one to specify how the indices should be updated.

The default is the same behavior as enumerate: start with 0 and increment by 1.

>>> stream = ['stream', 'of', 'different', 'sized', 'chunks']
>>> assert (list(dynamically_index(stream, start=2))
...     == list(enumerate(stream, start=2))
...     == [(2, 'stream'), (3, 'of'), (4, 'different'), (5, 'sized'), (6, 'chunks')]
... )

Say we wanted to increment the indices according to the size of the last item instead of just incrementing by 1 at every iteration tick:

>>> def size_increments(current_idx, data_item, size_func=len):
...     return current_idx + size_func(data_item)
>>> size_index = DynamicIndexer(idx_updater=DynamicIndexer.size_increments)
>>> list(map(size_index, stream))
[(0, 'stream'), (6, 'of'), (8, 'different'), (17, 'sized'), (22, 'chunks')]
creek.tools.fanout_and_flatten(iterable_of_seqs, func, idx, aggregator=chain.from_iterable)[source]

Apply apply_and_fanout to an iterable of sequences.

>>> seq_iterable = [('abcdef', 'first'), ('ghij', 'second')]
>>> func = lambda a: zip(*([iter(a)] * 2))  # func is a chunker
>>> assert list(fanout_and_flatten(seq_iterable, func, 0)) == [
...     (('a', 'b'), 'first'),
...     (('c', 'd'), 'first'),
...     (('e', 'f'), 'first'),
...     (('g', 'h'), 'second'),
...     (('i', 'j'), 'second')
... ]
creek.tools.fanout_and_flatten_dicts(iterable_of_dicts, func, fields, idx_field, aggregator=chain.from_iterable)[source]

Apply apply_and_fanout to an iterable of dicts.

>>> iterable_of_dicts = [
...     {'wf': 'abcdef', 'tag': 'first'}, {'wf': 'ghij', 'tag': 'second'}
... ]
>>> func = lambda a: zip(*([iter(a)] * 2))  # func is a chunker
>>> fields = ['wf', 'tag']
>>> idx_field = 'wf'
>>> assert list(
...     fanout_and_flatten_dicts(iterable_of_dicts, func, fields, idx_field)) == [
...     {'wf': ('a', 'b'), 'tag': 'first'},
...     {'wf': ('c', 'd'), 'tag': 'first'},
...     {'wf': ('e', 'f'), 'tag': 'first'},
...     {'wf': ('g', 'h'), 'tag': 'second'},
...     {'wf': ('i', 'j'), 'tag': 'second'}
... ]
creek.tools.filter_and_index_stream(stream: Iterable, data_item_filt, timestamper: Callable[[DataItem], Tuple[Any, DataItem]] = enumerate)[source]

Index a stream and filter it (based only on the data items).

>>> assert (
...     list(filter_and_index_stream('this is a stream', data_item_filt=' ')) == [
...         (0, 't'),
...         (1, 'h'),
...         (2, 'i'),
...         (3, 's'),
...         (6, 'i'),
...         (7, 's'),
...         (11, 'a'),
...         (15, 's'),
...         (16, 't'),
...         (17, 'r'),
...         (18, 'e'),
...         (19, 'a'),
...         (20, 'm')
...     ])

>>> list(filter_and_index_stream(
...     [1, 2, 3, 4, 5, 6, 7, 8],
...     data_item_filt=lambda x: x % 2))
[(0, 1), (2, 3), (4, 5), (6, 7)]
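When the filter is a callable, the second example's behavior amounts to enumerating first and filtering second, so surviving items keep their original indices (a stdlib sketch for illustration; the name is ours):

```python
def filter_and_index(stream, keep):
    # Index first (enumerate), then filter on the data item only,
    # so the surviving items keep their original indices.
    return ((i, x) for i, x in enumerate(stream) if keep(x))

# list(filter_and_index([1, 2, 3, 4, 5, 6, 7, 8], lambda x: x % 2))
# == [(0, 1), (2, 3), (4, 5), (6, 7)]
```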
creek.tools.return_buffer_on_stats_condition(stats: Any, buffer: Iterable, cond: Callable = is_not_none, else_val=None)[source]

>>> return_buffer_on_stats_condition(
...     stats=3, buffer=[1,2,3,4], cond=lambda x: x%2 == 1
... )
[1, 2, 3, 4]
>>> return_buffer_on_stats_condition(
...     stats=3, buffer=[1,2,3,4], cond=lambda x: x%2 == 0, else_val='3 is not even!'
... )
'3 is not even!'
creek.tools.segment_overlaps(bt_tt_segment, query_bt, query_tt)[source]

Returns True if, and only if, bt_tt_segment overlaps the query interval.

A bt_tt_segment must be of the (bt, tt, *data) format: an iterable of at least two elements (the bt and tt) followed by more elements (the actual segment data).

This function is made to be curried, as shown in the following example:

>>> from functools import partial
>>> overlapping_segments_filt = partial(segment_overlaps, query_bt=4, query_tt=8)
>>>
>>> list(filter(overlapping_segments_filt, [
...     (1, 3, 'completely before'),
...     (2, 4, 'still completely before (upper bounds are strict)'),
...     (3, 6, 'partially before, but overlaps bottom'),
...     (4, 5, 'totally', 'inside'),  # <- note this tuple has 4 elements
...     (5, 8),  # <- note this tuple has only the minimum (2) elements
...     (7, 10, 'partially after, but overlaps top'),
...     (8, 11, 'completely after (strict upper bound)'),
...     (100, 101, 'completely after (obviously)')
... ]))
[(3, 6, 'partially before, but overlaps bottom'), (4, 5, 'totally', 'inside'), (5, 8), (7, 10, 'partially after, but overlaps top')]
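Consistent with the doctest above (upper bounds are strict on both sides), the overlap test amounts to the standard check for half-open intervals; a sketch:

```python
def overlaps(bt_tt_segment, query_bt, query_tt):
    # Two half-open intervals [bt, tt) and [query_bt, query_tt) overlap
    # iff each starts strictly before the other ends.
    bt, tt = bt_tt_segment[0], bt_tt_segment[1]
    return bt < query_tt and query_bt < tt

# overlaps((2, 4, 'x'), 4, 8) == False  (segment ends exactly where the query starts)
# overlaps((3, 6, 'x'), 4, 8) == True
```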