creek.infinite_sequence

Objects that support some list-like read operations on an unbounded stream. Essentially, they try to give you the impression that you have read access to an infinite list, with some (parametrizable) limitations.

class creek.infinite_sequence.BufferedGetter(buffer_len, prefill=(), input_data_trans=<function asis>, query_trans=<function asis>, slice_get_postproc: Callable = <class 'list'>)[source]

BufferedGetter is intended to be a more general (but not optimized) class that offers a query-interface to a buffer, intended to be used when the buffer is being filled by a (possibly live) stream of data items.

By contrast, IndexedBuffer is a particular case in which the queries are slices and the index being sliced is the enumeration index of the stream. InfiniteSeq combines an IndexedBuffer with a data source it can pull data from (according to the demands of the query).

>>> from creek.infinite_sequence import BufferedGetter
>>>
>>>
>>> b = BufferedGetter(20)
>>> b.extend([
...     (1, 3, 'completely before'),
...     (2, 4, 'still completely before (upper bounds are strict)'),
...     (3, 6, 'partially before, but overlaps bottom'),
...     (4, 5, 'totally', 'inside'),  # <- note this tuple has 4 elements
...     (5, 8),  # <- note this tuple has only the minimum (2) elements
...     (7, 10, 'partially after, but overlaps top'),
...     (8, 11, 'completely after (strict upper bound)'),
...     (100, 101, 'completely after (obviously)')
... ])
>>> b[lambda x: 3 < x[0] < 8]  
[(4, 5, 'totally', 'inside'),
 (5, 8),
 (7, 10, 'partially after, but overlaps top')]
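The query interface can be pictured with a small stand-alone sketch. It does not use creek: FilterQueryBuffer is a hypothetical name, and the sketch only assumes that a query is a callable applied to each buffered item, as in the example above.

```python
from collections import deque


class FilterQueryBuffer:
    """Hypothetical stand-in: a bounded buffer queried with a predicate."""

    def __init__(self, buffer_len):
        # deque(maxlen=...) silently drops the oldest items once it is full
        self._buffer = deque(maxlen=buffer_len)

    def extend(self, items):
        self._buffer.extend(items)

    def __getitem__(self, predicate):
        # The "query" is any callable returning True for the items to keep
        return [item for item in self._buffer if predicate(item)]


buf = FilterQueryBuffer(3)
buf.extend([(1, 3), (4, 5), (5, 8), (7, 10)])  # (1, 3) falls off: only 3 fit
selected = buf[lambda x: 3 < x[0] < 8]
```

Here selected holds the three items still in the buffer whose first element lies strictly between 3 and 8.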
class creek.infinite_sequence.ExceptionRaiserCallbackMixin(*args, **kwargs)[source]

Makes the instance callable, with the effect of raising the instance. Meant to be mixed into an exception class so that instances of that class can be used as callbacks that raise the error.
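A minimal sketch of the idea, not the library’s code: RaiseWhenCalled, BufferMissError and lookup are hypothetical names introduced only for illustration.

```python
class RaiseWhenCalled:
    """Hypothetical mixin: calling the instance raises it."""

    def __call__(self, *args, **kwargs):
        raise self


class BufferMissError(RaiseWhenCalled, IndexError):
    """Hypothetical error type used only for this illustration."""


def lookup(key, data, if_missing):
    # if_missing is any callable; an exception instance now works as one
    if key not in data:
        return if_missing(key)
    return data[key]


on_miss = BufferMissError('key not in buffer')
try:
    lookup('b', {'a': 1}, if_missing=on_miss)
    outcome = 'no error'
except BufferMissError as e:
    outcome = f'raised: {e}'
```

The same if_missing slot could just as well take a function returning a default value, which is presumably why a callback is used rather than a hard-coded raise.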

class creek.infinite_sequence.IndexedBuffer(buffer_len, prefill=(), if_overlaps_past=OverlapsPastError('Some of the data requested was in the past or in the future'), if_overlaps_future=OverlapsFutureError('Some of the data requested was in the past or in the future'), slice_get_postproc: Callable = <class 'list'>)[source]

A list-like object that gives a limited-past read view of an unbounded stream

For example, say we had the stream of increasing integers 0, 1, 2, … that is being fed to an IndexedBuffer.

What IndexedBuffer(buffer_len=4) offers is access to the buffer’s contents, but using the indices that the stream (if it were one big list in memory) would use instead of the buffer’s own indices. In the picture below, the bracketed items are the ones currently in the buffer, so only stream indices 4 to 7 can be read.

0 1 2 3 [4 5 6 7] 8 9

IndexedBuffer uses collections.deque, exposing the append, extend, and clear methods, updating the index reference in a thread-safe manner.
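The index translation can be sketched in a few lines. This is a simplified illustration rather than the actual implementation: StreamIndexedWindow, min_idx and max_idx are assumed names, only integer indices are handled, and no locking is done.

```python
from collections import deque


class StreamIndexedWindow:
    """Hypothetical sketch: read a bounded window using stream-wide indices."""

    def __init__(self, buffer_len):
        self._buffer = deque(maxlen=buffer_len)
        self.max_idx = 0  # number of items seen so far (exclusive upper index)

    def extend(self, items):
        for item in items:
            self._buffer.append(item)
            self.max_idx += 1

    @property
    def min_idx(self):
        # Stream index of the oldest item still held
        return self.max_idx - len(self._buffer)

    def __getitem__(self, idx):
        if not self.min_idx <= idx < self.max_idx:
            raise IndexError(
                f'{idx} is outside the buffered range {self.min_idx}:{self.max_idx}'
            )
        return self._buffer[idx - self.min_idx]  # shift to a buffer position


w = StreamIndexedWindow(4)
w.extend(range(8))  # the window now holds stream items 4, 5, 6, 7
item_at_5 = w[5]
```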

>>> from creek.infinite_sequence import IndexedBuffer
>>> s = IndexedBuffer(buffer_len=4)
>>> s.extend(range(4))  # adding 4 elements in bulk (filling the buffer completely)
>>> list(s)
[0, 1, 2, 3]
>>> s[2]
2
>>> s[1:2]
[1]
>>> s[1:1]
[]

Let’s add two more elements (using append this time), making the buffer “shift”

>>> s.append(4)
>>> s.append(5)
>>> list(s)
[2, 3, 4, 5]
>>> s[2]
2
>>> s[5]
5
>>> s[2:5]
[2, 3, 4]
>>> s[3:6]
[3, 4, 5]
>>> assert s[2:6] == list(range(2, 6))

You can slice with step:

>>> s[2:6:2]
[2, 4]

You can slice with negatives:

>>> s[2:-2]
[2, 3]

On the other hand, if you ask for something that is not in the buffer (anymore, or yet), you’ll get an error that tells you so:

>>> # element for idx 1 is missing in [2, 3, 4, 5]
>>> s[1:4]  
Traceback (most recent call last):
    ...
OverlapsPastError: You asked for slice(1, 4, None), but the buffer only contains the index range: 2:6
>>> # elements for 0:2 are missing (as well as 6:9, but OverlapsPastError trumps OverlapsFutureError)
>>> s[0:9]  
Traceback (most recent call last):
    ...
OverlapsPastError: You asked for slice(0, 9, None), but the buffer only contains the index range: 2:6
>>> # elements for 6:9 are missing in [2, 3, 4, 5]
>>> s[4:9]  
Traceback (most recent call last):
    ...
OverlapsFutureError: You asked for slice(4, 9, None), but the buffer only contains the index range: 2:6
extend(iterable: Iterable) → None

Extend buffer with an iterable of items

class creek.infinite_sequence.InfiniteSeq(iterator: Iterator, buffer_len: int)[source]

A list-like (read) view of an unbounded sequence/stream.

It is the combination of IndexedBuffer and an iterator that will be used to source the buffer according to the slices that are requested.

If a slice is requested whose data is “in the future”, the iterator will be consumed until the buffer can satisfy that request. If the requested slice has any part of it that is “in the past”, that is, has already been iterated through and is not in the buffer anymore, an OverlapsPastError will be raised.

Therefore, InfiniteSeq is meant for ordered slice queries of size no more than the buffer size. If these conditions are satisfied, an InfiniteSeq will behave (with i:j queries) as if it were one long list in memory.

Can be used with a live stream of data as long as the buffer size is big enough to handle the data production and query rates.
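The pull-on-demand behaviour described above can be sketched roughly as follows. This is an illustration under simplifying assumptions, not the library’s implementation: LazyWindow is a hypothetical name, only i:j slices with explicit non-negative bounds are supported, and a plain IndexError stands in for OverlapsPastError.

```python
from collections import deque
from itertools import count, islice


class LazyWindow:
    """Hypothetical sketch: a bounded window that pulls from an iterator on demand."""

    def __init__(self, iterator, buffer_len):
        self._iterator = iterator
        self._buffer = deque(maxlen=buffer_len)
        self.max_idx = 0  # count of items consumed from the iterator

    def __getitem__(self, s):
        missing = s.stop - self.max_idx
        if missing > 0:  # part of the request is "in the future": fetch it
            fetched = list(islice(self._iterator, missing))
            self._buffer.extend(fetched)
            self.max_idx += len(fetched)
        min_idx = self.max_idx - len(self._buffer)
        if s.start < min_idx:  # part of the request was already dropped
            raise IndexError(
                f'{s} starts before the buffered range {min_idx}:{self.max_idx}'
            )
        return list(self._buffer)[s.start - min_idx:s.stop - min_idx]


seq = LazyWindow(count(), buffer_len=5)
first = seq[2:4]   # consumes items 0..3
second = seq[6:9]  # consumes items 4..8; the window now holds 4..8
```

After the second query, asking again for 2:4 fails in this sketch, because those items have already left the window.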

For example, take an iterator that cycles from 0 to 99 forever:

>>> from itertools import cycle
>>> iterator = cycle(range(100))

Let’s make an InfiniteSeq instance for this stream, accommodating a view of up to 11 items.

>>> s = InfiniteSeq(iterator, buffer_len=11)

Let’s ask for element 15, which is the (15 + 1)th element and should have a value of 15.

>>> s[15]
15

Now, to get this value, the iterator will move forward up to that point; that is, until the buffer’s head (i.e. most recent) item contains that requested (15 + 1)th element. But the buffer is of size 11, so we still have access to a few previous elements:

>>> s[11]
11
>>> s[5:15]
[5, 6, 7, 8, 9, 10, 11, 12, 13, 14]

But if we asked for anything before index 5…

>>> s[2:7]  
Traceback (most recent call last):
    ...
OverlapsPastError: You asked for slice(2, 7, None), but the buffer only contains the index range: 5:16

So we can’t go backwards. But we can always go forwards:

>>> s[95:105]
[95, 96, 97, 98, 99, 0, 1, 2, 3, 4]

You can also use slices with step and with negative integers (referencing the head of the buffer)

>>> s[120:130:2]
[20, 22, 24, 26, 28]
>>> s[120:130]
[20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
>>> s[-8:-2]
[22, 23, 24, 25, 26, 27]

But you cannot slice farther back than the buffer:

>>> try:
...     s[-20:-2]
... except OverlapsPastError as e:
...     msg_text = str(e)
>>> print(msg_text)
You asked for slice(110, 128, None), but the buffer only contains the index range: 119:130

Sometimes the source provides data in chunks. Sometimes these chunks are not even of fixed size. In those situations, you can use itertools.chain to “flatten” the iterator as in the following example:

>>> from creek.infinite_sequence import InfiniteSeq
>>> from typing import Mapping
>>>
>>> class Source(Mapping):
...     n = 100
...
...     __len__ = lambda self: self.n
...
...     def __iter__(self):
...         yield from range(self.n)
...
...     def __getitem__(self, k):
...         print(f"Asking for {k}")
...         return list(range(k * 10, (k + 1) * 10))
...
>>>
>>> source = Source()
>>>

See that when we ask for a chunk of data, there’s a print notification about it.

>>> assert source[3] == [30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
Asking for 3

Now let’s make an iterator of the data and an InfiniteSeq (with buffer length 10) on top of it.

>>> from itertools import chain
>>> iterator = chain.from_iterable(source.values())
>>> s = InfiniteSeq(iterator, 10)

Notice that when you ask for :5, chunk 0 is requested.

>>> s[:5]
Asking for 0
[0, 1, 2, 3, 4]

If you ask for something that’s already in the buffer, you won’t see the print notification though.

>>> s[4:8]
[4, 5, 6, 7]

The following shows you how InfiniteSeq “hits” the data source as it’s getting the data it needs for the request.

>>> s[8:12]
Asking for 1
[8, 9, 10, 11]
>>>
>>> s[40:42]
Asking for 2
Asking for 3
Asking for 4
[40, 41]
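The example above uses chunks of a fixed size. The flattening itself does not depend on that, as this stand-alone sketch with a made-up list of uneven chunks suggests (only itertools is used, no creek objects).

```python
from itertools import chain, islice

# Hypothetical chunked source: each chunk has a different length
chunks = iter([[0, 1, 2], [3], [4, 5, 6, 7], [8, 9]])

# chain.from_iterable is lazy: a chunk is pulled only when its items are needed
flat = chain.from_iterable(chunks)

first_five = list(islice(flat, 5))  # reaches into the third chunk
the_rest = list(flat)               # drains what is left
```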
exception creek.infinite_sequence.NotDuringError(*args, **kwargs)[source]

IndexError that indicates that there was an attempt to index some data that is not contained in the buffer (i.e. a part of the request is NO LONGER, or NOT YET, covered by the buffer)

exception creek.infinite_sequence.OverlapsFutureError(*args, **kwargs)[source]

IndexError that indicates that there was an attempt to index some data that is in the FUTURE (i.e. is NOT YET completely covered by the buffer)

exception creek.infinite_sequence.OverlapsPastError(*args, **kwargs)[source]

IndexError that indicates that there was an attempt to index some data that is in the PAST (i.e. is NO LONGER completely covered by the buffer)

exception creek.infinite_sequence.RelationNotHandledError[source]

TypeError that indicates that a relation is either not a valid one, or not handled by a conditional clause.

class creek.infinite_sequence.Relations(value)[source]

Point-interval and interval-interval relations.

See Allen’s interval algebra for (some of the) interval relations (https://en.wikipedia.org/wiki/Allen%27s_interval_algebra).

creek.infinite_sequence.absolute_item(item, max_idx)[source]

Returns an item with absolute references: i.e. with negative indices idx resolved to max_idx + idx

>>> absolute_item(-1, 10)
9
>>> absolute_item(slice(-4, -2, 2), 10)
slice(6, 8, 2)

But anything else that’s not a slice or int will be left untouched (and will probably result in errors if you use it with IndexedBuffer).

>>> absolute_item((-7, -2), 10)
(-7, -2)
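The behaviour shown above could be reproduced with something like the following. resolve_negatives is a hypothetical re-implementation written for illustration; in particular, passing None slice bounds through unchanged is an assumption, not something the documentation states.

```python
def resolve_negatives(item, max_idx):
    """Hypothetical re-implementation of the behaviour described above."""

    def resolve(idx):
        # None (an open slice bound) and non-negative values pass through
        if idx is not None and idx < 0:
            return max_idx + idx
        return idx

    if isinstance(item, slice):
        return slice(resolve(item.start), resolve(item.stop), item.step)
    if isinstance(item, int):
        return resolve(item)
    return item  # anything else is left untouched


resolved_int = resolve_negatives(-1, 10)
resolved_slice = resolve_negatives(slice(-4, -2, 2), 10)
untouched = resolve_negatives((-7, -2), 10)
```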
creek.infinite_sequence.consume(gen, n)[source]

Consume n iterations of generator (without returning elements)
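One common way to do this with the standard library is sketched below. skip_items is a hypothetical name, and this is not necessarily how consume is written.

```python
from collections import deque
from itertools import islice


def skip_items(iterator, n):
    """Advance iterator by up to n steps, discarding what it yields."""
    # A zero-length deque consumes the islice without storing anything
    deque(islice(iterator, n), maxlen=0)


numbers = iter(range(10))
skip_items(numbers, 3)
next_value = next(numbers)  # the first item that was not skipped
```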

creek.infinite_sequence.none_safe_addition(x, y)[source]

Adds the two numbers if x is not None; returns None otherwise.
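A one-line equivalent, with a guess at where such a helper is handy: shifting slice bounds, where None means “open”. add_unless_none is a hypothetical name, and the slice use case is an assumption rather than something stated in the documentation.

```python
def add_unless_none(x, y):
    """Hypothetical equivalent: propagate a None x instead of failing."""
    return None if x is None else x + y


# An open slice bound (None) stays open after shifting by an offset
shifted = slice(add_unless_none(None, 5), add_unless_none(7, 5))
```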

creek.infinite_sequence.simple_interval_relationship(x: Tuple[Union[int, float], Union[int, float]], y: Tuple[Union[int, float], Union[int, float]], above_bt: Callable = <built-in function ge>, below_tt: Callable = <built-in function lt>)[source]

Get the simple relationship between intervals x and y.

Parameters
  • x – A point (a number) or an interval (a 2-tuple of numbers).

  • y – An interval; a 2-tuple of numbers.

  • above_bt – an above_bt(x_bt, y_bt) boolean function (ge or gt) deciding if x starts after y does.

  • below_tt – a below_tt(x_tt, y_tt) boolean function (lt or le) deciding if x ends before y does.

Returns

One of three relations: Relations.BEFORE if some of x is below y, Relations.AFTER if some of x is above y, or Relations.DURING if x is entirely within y.

The target interval y is expressed only by its bounds, which does not say whether those bounds are inclusive. The above_bt and below_tt functions settle this by defining when x counts as starting above y’s lower bound (bt) and as ending below y’s upper bound (tt).
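How the two comparison functions decide the outcome can be sketched as follows. This is a guess at the logic that is consistent with the documented outputs, not the library’s source: Rel and relate are hypothetical names, only 2-tuples are handled, and checking the lower bound first is an assumption.

```python
from enum import Enum
from operator import ge, gt, le, lt


class Rel(Enum):  # hypothetical stand-in for Relations
    BEFORE = 'before'
    DURING = 'during'
    AFTER = 'after'


def relate(x, y, above_bt=ge, below_tt=lt):
    """Sketch: classify interval x against target interval y."""
    x_bt, x_tt = x
    y_bt, y_tt = y
    if not above_bt(x_bt, y_bt):  # x starts too early
        return Rel.BEFORE
    if not below_tt(x_tt, y_tt):  # x ends too late
        return Rel.AFTER
    return Rel.DURING


target = (4, 8)
half_open = relate((4, 8), target)  # defaults: ge and lt
closed = relate((4, 8), target, above_bt=ge, below_tt=le)
open_ = relate((4, 8), target, above_bt=gt, below_tt=lt)
```

With the same query and target, the three choices of comparison functions give three different answers, which is the point of making them parameters.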

The function is meant to be curried (partial), for example:

>>> from functools import partial
>>> from operator import le, lt, ge, gt
>>> default = simple_interval_relationship  # uses above_bt=ge, below_tt=lt
>>> including_bounds = partial(simple_interval_relationship, above_bt=ge, below_tt=le)
>>> excluding_bounds = partial(simple_interval_relationship, above_bt=gt, below_tt=lt)

Take (4, 8) as the target interval and query the relationship of other points and intervals with it. Whichever function is used, the results always agree for points and intervals that share no bounds with the target.

>>> for relation_func in (default, including_bounds, excluding_bounds):
...     print (
...         relation_func(3, (4, 8)),
...         relation_func(5, (4, 8)),
...         relation_func(9, (4, 8)),
...         relation_func((3, 7), (4, 8)),
...         relation_func((5, 7), (4, 8)),
...         relation_func((7, 9), (4, 8))
... )
Relations.BEFORE Relations.DURING Relations.AFTER Relations.BEFORE Relations.DURING Relations.AFTER
Relations.BEFORE Relations.DURING Relations.AFTER Relations.BEFORE Relations.DURING Relations.AFTER
Relations.BEFORE Relations.DURING Relations.AFTER Relations.BEFORE Relations.DURING Relations.AFTER

But if the two intervals share some bounds, these functions will diverge.

>>> for relation_func in (default, including_bounds, excluding_bounds):
...     print (
...         relation_func(4, (4, 8)),
...         relation_func(8, (4, 8)),
...         relation_func((4, 7), (4, 8)),
...         relation_func((4, 8), (4, 8)),
...         relation_func((5, 8), (4, 8))
... )
Relations.DURING Relations.AFTER Relations.DURING Relations.AFTER Relations.AFTER
Relations.DURING Relations.DURING Relations.DURING Relations.DURING Relations.DURING
Relations.BEFORE Relations.AFTER Relations.BEFORE Relations.BEFORE Relations.AFTER

The function can be used with the FIRST argument being a slice object as well. This can then be used to enable [i:j] access.

>>> for relation_func in (default, including_bounds, excluding_bounds):
...     print (
...         relation_func(slice(4, 7), (4, 8)),
...         relation_func(slice(4, 8), (4, 8)),
...         relation_func(slice(5, 8), (4, 8))
... )
Relations.DURING Relations.AFTER Relations.AFTER
Relations.DURING Relations.DURING Relations.DURING
Relations.BEFORE Relations.BEFORE Relations.AFTER
creek.infinite_sequence.validate_interval(interval)[source]

Asserts that input is a valid interval, raising a ValueError if not