creek.base

The base objects of creek

class creek.base.Creek(stream)

A layer-able version of the stream interface

There are three layering methods: pre_iter, data_to_obj, and post_iter. The examples below demonstrate their use, starting with data_to_obj:

>>> from io import StringIO
>>>
>>> src = StringIO(
... '''a, b, c
... 1,2, 3
... 4, 5,6
... '''
... )
>>>
>>> from creek.base import Creek
>>>
>>> class MyCreek(Creek):
...     def data_to_obj(self, line):
...         return [x.strip() for x in line.strip().split(',')]
...
>>> stream = MyCreek(src)
>>>
>>> list(stream)
[['a', 'b', 'c'], ['1', '2', '3'], ['4', '5', '6']]

If we try that again, we’ll get an empty list, since the cursor is now at the end of the stream:

>>> list(stream)
[]

But if the underlying stream has a seek method, so does the creek, so we can “rewind”:

>>> stream.seek(0)
0
>>> list(stream)
[['a', 'b', 'c'], ['1', '2', '3'], ['4', '5', '6']]

You can also use next to get stream items one by one:

>>> stream.seek(0)  # rewind again to get back to the beginning
0
>>> next(stream)
['a', 'b', 'c']
>>> next(stream)
['1', '2', '3']
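
The behaviour shown above (per-item transformation, an exhausted cursor, and seek being available on the wrapper) can be sketched with a small stand-in class. This is only an illustration under stated assumptions, not creek's actual implementation: MiniCreek and CsvRows are hypothetical names, and the hook order (pre_iter on the raw stream, data_to_obj on each item, post_iter on the resulting objs) is inferred from the descriptions in this section.

```python
from io import StringIO


class MiniCreek:
    """Hypothetical stand-in for Creek; NOT the library's implementation."""

    def __init__(self, stream):
        self.stream = stream

    # Default hooks are no-ops; subclasses override the ones they need.
    def pre_iter(self, stream):
        return stream

    def data_to_obj(self, data):
        return data

    def post_iter(self, objs):
        return objs

    def __iter__(self):
        # Assumed order: pre_iter on raw items, data_to_obj on each item,
        # then post_iter on the resulting objs.
        raw_items = self.pre_iter(self.stream)
        return iter(self.post_iter(map(self.data_to_obj, raw_items)))

    def __getattr__(self, name):
        # Anything not defined here (e.g. seek) is looked up on the wrapped stream.
        return getattr(self.stream, name)


class CsvRows(MiniCreek):
    def data_to_obj(self, line):
        return [x.strip() for x in line.strip().split(',')]


rows = CsvRows(StringIO('a, b, c\n1,2, 3\n4, 5,6\n'))
first_pass = list(rows)    # all three rows
second_pass = list(rows)   # empty: the cursor is at the end of the stream
rows.seek(0)               # delegated to the underlying StringIO
third_pass = list(rows)    # all three rows again
```

Delegating unknown attributes to the wrapped stream is one simple way to get "if the stream has a seek, so does the wrapper"; other designs are possible.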

Let’s add a filter! There are two kinds you can use: one applied to the raw line before data_to_obj transforms it (in pre_iter), and one applied afterwards to the resulting obj (in post_iter). The example below uses post_iter:

>>> from creek.base import Creek
>>> from io import StringIO
>>>
>>> src = StringIO(
...     '''a, b, c
... 1,2, 3
... 4, 5,6
... ''')
>>> class MyFilteredCreek(MyCreek):
...     def post_iter(self, objs):
...         yield from filter(lambda obj: str.isnumeric(obj[0]), objs)
...
>>> s = MyFilteredCreek(src)
>>>
>>> list(s)
[['1', '2', '3'], ['4', '5', '6']]
>>> s.seek(0)
0
>>> next(s)
['1', '2', '3']
>>> next(s)
['4', '5', '6']
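
The other kind of filter, applied to the raw line before data_to_obj sees it, could look like the sketch below. To stay self-contained it uses a hypothetical stand-in, MiniCreek, rather than creek itself, and it assumes that pre_iter receives the underlying stream and returns an iterable of raw items; PreFilteredRows is likewise an illustrative name.

```python
from io import StringIO


class MiniCreek:
    """Hypothetical stand-in for Creek, only to keep this sketch self-contained."""

    def __init__(self, stream):
        self.stream = stream

    def pre_iter(self, stream):
        return stream

    def data_to_obj(self, data):
        return data

    def post_iter(self, objs):
        return objs

    def __iter__(self):
        # Assumed hook order: pre_iter, then data_to_obj per item, then post_iter.
        raw_items = self.pre_iter(self.stream)
        return iter(self.post_iter(map(self.data_to_obj, raw_items)))


class PreFilteredRows(MiniCreek):
    def pre_iter(self, lines):
        # Filter the raw lines: keep only those whose first character is a digit.
        return filter(lambda line: line[:1].isdigit(), lines)

    def data_to_obj(self, line):
        return [x.strip() for x in line.strip().split(',')]


src = StringIO('a, b, c\n1,2, 3\n4, 5,6\n')
pre_filtered = list(PreFilteredRows(src))
# pre_filtered == [['1', '2', '3'], ['4', '5', '6']]
```

Filtering in pre_iter means rejected lines are never parsed, which can matter when data_to_obj is expensive or would fail on malformed input; filtering in post_iter is more convenient when the condition is easier to express on the parsed obj.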

Recipes:

  • pre_iter: use itertools.islice to skip header lines

  • pre_iter: use enumerate to get line indices in the stream iterator

  • pre_iter = functools.partial(map, pre_proc_func) to preprocess all stream items with pre_proc_func

  • pre_iter: apply a filter to the raw items, before data_to_obj converts them to objs

  • post_iter: use itertools.chain.from_iterable to flatten a chunked/segmented stream

  • post_iter = functools.partial(filter, condition) to filter the yielded objs
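
Several of these recipes can be tried out without creek by composing the callables by hand. In the sketch below, layered is a hypothetical helper (not part of creek) that applies the hooks in the order this section describes, and split_row and split_indexed are illustrative names.

```python
from functools import partial
from itertools import chain, islice


def layered(stream, pre_iter=iter, data_to_obj=lambda x: x, post_iter=iter):
    """Hypothetical helper mimicking the assumed hook order; not part of creek."""
    return post_iter(map(data_to_obj, pre_iter(stream)))


def split_row(line):
    return [x.strip() for x in line.split(',')]


def split_indexed(pair):
    index, line = pair
    return index, split_row(line)


lines = ['a, b, c\n', '1,2, 3\n', '4, 5,6\n']

# pre_iter with islice: skip the header line.
no_header = list(
    layered(lines, pre_iter=lambda s: islice(s, 1, None), data_to_obj=split_row)
)

# pre_iter with enumerate: each raw item becomes an (index, line) pair.
indexed = list(layered(lines, pre_iter=enumerate, data_to_obj=split_indexed))

# pre_iter = partial(map, func): preprocess every raw item.
stripped = list(layered(lines, pre_iter=partial(map, str.strip)))

# post_iter with chain.from_iterable: flatten the rows into single cells.
flat = list(layered(lines, data_to_obj=split_row, post_iter=chain.from_iterable))

# post_iter = partial(filter, condition): keep rows whose first cell is numeric.
numeric = list(
    layered(
        lines,
        data_to_obj=split_row,
        post_iter=partial(filter, lambda row: row[0].isnumeric()),
    )
)
```

When one of these partials is assigned as a class attribute on a Creek subclass, wrapping it in staticmethod(...) makes it explicit that the callable should receive only the stream and not self.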