meshed.slabs

Tools to generate slabs.

A slab is a dict that holds data generated by a stream for a given interval of time.

The main object of this module is Slabs, an object that defines how to generate multiple streams in the form of slabs. More precisely, it defines how to source streams, operate on and combine them to create further streams, and even push these streams to further processes, all through a single simple interface: an (ordered) list of components that are called in sequence to either pull data from some sources, compute a new stream based on previous ones, or push some of the streams to further processes (such as visualization or storage systems).

A slab is a collection of items from the same interval of time. We represent a slab using a dict or mapping. Typically, a slab will be the aggregation of multiple information streams that happened around the same time.

Slabs is a tool that allows you to source multiple streams into a stream of slabs that can contain the original data, other data computed from it, or both.
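
To make the "components called in sequence" idea concrete, here is a minimal, self-contained sketch. It is not the library's implementation: `call_on_scope` and the two components are hypothetical, and the sketch assumes that each component's parameter names refer to keys already present in the scope.

```python
import inspect


def call_on_scope(components, scope):
    """Call each component in order, reading its inputs from scope by
    parameter name and writing its output back under the component's name."""
    for name, func in components.items():
        arg_names = inspect.signature(func).parameters
        kwargs = {arg: scope[arg] for arg in arg_names}
        scope[name] = func(**kwargs)
    return scope


# Hypothetical components: one argument-less source and one derived stream.
readings = iter([[1, 2, 3], [4, 5, 6]])
components = {
    'numbers': lambda: next(readings),       # pulls data from a source
    'total': lambda numbers: sum(numbers),   # computed from a previous stream
}

# Each call starts from a fresh, empty scope and returns one slab.
first_slab = call_on_scope(components, scope={})
second_slab = call_on_scope(components, scope={})
```

Because the components are called in order, a component can only depend on names produced by the components listed before it.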

Note to developers: though the code below is a reduced form of the actual code, it should be enough to understand the general idea. For a discussion about the design of Slabs, see https://github.com/i2mint/meshed/discussions/49.

>>> class Slabs:
...     def _call_on_scope(self, scope):
...         '''
...         Calls the components 1 by 1, sourcing inputs and writing outputs in scope
...         '''
...
...     def __next__(self):
...         '''Get the next slab by calling _call_on_scope on a new empty scope.
...         At least one of the components will have to be argument-less and provide
...         some data for other components to get their inputs from, if any are needed.
...         '''
...         return self._call_on_scope(scope={})
...
...     def __iter__(self):
...         '''Iterates over slabs until a handled exception is raised.'''
...         # Simplified code:
...         with self:  # enter all the contexts that need to be entered
...             while True:  # loop until you encounter a handled exception
...                 try:
...                     yield next(self)
...                 except self.handle_exceptions as exc_val:
...                     # use specific exceptions to signal that iteration should stop
...                     break

class meshed.slabs.ExceptionHandler(*args, **kwargs)[source]

An exception handler is an argument-less callable that is called when a handled exception occurs during iteration. Most often, the handler does nothing. Its output is ignored unless it is do_not_break, which signals that the iteration should continue.
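
The following sketch illustrates the handler contract described above in isolation. It does not use the library: the `do_not_break` sentinel, the `iterate` helper and both handlers are hypothetical stand-ins, written only to show how a handler's return value could decide whether iteration continues.

```python
# Hypothetical sentinel, standing in for the do_not_break mentioned above.
do_not_break = object()

skipped = []


def skip_bad_value():
    """Argument-less handler: record the event and ask the loop to continue."""
    skipped.append('bad value')
    return do_not_break


def stop_quietly():
    """Argument-less handler that does nothing, so the loop stops."""


handlers = {ValueError: skip_bad_value, StopIteration: stop_quietly}


def iterate(get_next, handlers):
    """Yield items from get_next until a handler lets the loop stop."""
    while True:
        try:
            yield get_next()
        except tuple(handlers) as exc:
            # Pick the first handler whose exception type matches.
            handler = next(h for t, h in handlers.items() if isinstance(exc, t))
            if handler() is not do_not_break:
                break


raw = iter(['1', 'oops', '3'])
parsed = list(iterate(lambda: int(next(raw)), handlers))
```

Here the `ValueError` raised by `int('oops')` is handled and skipped, while the `StopIteration` from the exhausted source ends the loop.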

exception meshed.slabs.ExceptionalException[source]

Raised when an exception was supposed to be handled, but no matching handler was found.

See the _handle_exception function, where it is raised.

exception meshed.slabs.IteratorExit[source]

Raised when an iterator should quit being iterated on, signaling this event to any process that cares to catch the signal. We chose to inherit directly from BaseException instead of Exception for the same reason that GeneratorExit does: because it’s not technically an error.

See: https://docs.python.org/3/library/exceptions.html#GeneratorExit
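
A short sketch of what inheriting from BaseException buys you. The `IteratorExit` class below is a local stand-in defined for illustration, not an import from meshed, and `read_sensor` and `collect` are hypothetical helpers.

```python
class IteratorExit(BaseException):
    """Local stand-in for a stop signal that is not an error."""


def read_sensor(values):
    """Hypothetical source: returns values until exhausted, then signals exit."""
    it = iter(values)

    def read():
        try:
            return next(it)
        except StopIteration:
            raise IteratorExit('sensor has no more data') from None

    return read


def collect(read):
    items = []
    try:
        while True:
            try:
                items.append(read())
            except Exception:
                # Ordinary errors would be handled here. IteratorExit is not
                # an Exception subclass, so it passes straight through.
                items.append('error')
    except IteratorExit:
        pass  # the stop signal is caught only where we choose to catch it
    return items


collected = collect(read_sensor([1, 2]))
```

A broad `except Exception` clause in intermediate code therefore cannot swallow the stop signal by accident.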

class meshed.slabs.Slabs(handle_exceptions: ~typing.Mapping[type, ~meshed.slabs.ExceptionHandler] | ~typing.Iterable[BaseException] | BaseException = (<class 'StopIteration'>, <class 'meshed.slabs.IteratorExit'>, <class 'KeyboardInterrupt'>), scope_factory: ~typing.Callable[[], ~typing.MutableMapping] = <class 'dict'>, **components)[source]

Object to source and manipulate multiple streams.

A slab is a collection of items from the same interval of time. We represent a slab using a dict or mapping. Typically, a slab will be the aggregation of multiple information streams that happened around the same time.

For example, say an edge device had a microphone, a light sensor, and a movement sensor. An aggregate reading of these sensors could give you something like:

>>> slab = {'audio': [1, 2, 4], 'light': 126, 'movement': None}

movement is None because the sensor is off. If it were on, we’d have True or False as values.

From this information, you’d like to compute a should_turn_movement_sensor_on value based on the following formula:

>>> from statistics import stdev
>>> vol = stdev
>>> should_turn_movement_sensor_on = lambda audio, light: vol(audio) * light > 50000

The product of the volume and the light reading is about 192, well below the threshold of 50000, so you now have:

>>> slab = {
...     'audio': [1, 2, 4],
...     'light': 126,
...     'should_turn_movement_sensor_on': False,
...     'movement': None
... }
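
As a quick check of the arithmetic for this first slab, the snippet below recomputes the value directly. The variable names `volume` and `score` are chosen here for illustration only.

```python
from statistics import stdev

audio, light = [1, 2, 4], 126

volume = stdev(audio)   # sample standard deviation of the audio chunk
score = volume * light  # the quantity compared against the threshold
should_turn_movement_sensor_on = score > 50000
```

The score comes out between 192 and 193, so the flag is False for this slab.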

The next slab that comes in is

>>> slab = {'audio': [-96, 89, -92], 'light': 501, 'movement': None}

which puts us over the threshold so

>>> slab = {
...     'audio': [-96, 89, -92],
...     'light': 501,
...     'should_turn_movement_sensor_on': True,
...     'movement': None
... }

and the movement sensor is turned on, movement is detected, a human_presence_score is computed, and a notification is sent if that score is above a given threshold.

The point here is that we incrementally compute various fields, enhancing our slab of information, and we do so iteratively over each slab that is streaming to us from our smart home device.

Slabs (also available under the alias SlabsIter) is there to help you create such slabs, from source to enhanced.

The situation above would look something like this:

>>> from statistics import stdev
>>>
>>> vol = stdev
>>>
>>> # Making a slabs iter object
>>> def make_a_slabs_iter():
...
...     # Mocking the sensor readers
...     audio_sensor_read = iter([[1, 2, 3], [-96, 87, -92], [320, -96, 99]]).__next__
...     light_sensor_read = iter([126, 501, 523]).__next__
...     movement_sensor_read = iter([None, None, True]).__next__
...
...     return Slabs(
...         # The first three components get data from the sensors.
...         # The *_read objects are all callable, returning the next
...         # chunk of data for that sensor, if any.
...         audio=audio_sensor_read,
...         light=light_sensor_read,
...         movement=movement_sensor_read,
...         # The next components compute new fields from the ones above.
...         should_turn_movement_sensor_on = lambda audio, light: vol(audio) * light > 50000,
...         human_presence_score = lambda audio, light, movement: movement and sum([vol(audio), light]),
...         should_notify = lambda human_presence_score: human_presence_score and human_presence_score > 700,
...         notify = lambda should_notify: print('someone is there') if should_notify else None
...     )
...
>>>
>>> si = make_a_slabs_iter()
>>> next(si)  
{'audio': [1, 2, 3],
 'light': 126,
 'movement': None,
 'should_turn_movement_sensor_on': False,
 'human_presence_score': None,
 'should_notify': None,
 'notify': None}
>>> next(si)  
{'audio': [-96, 87, -92],
 'light': 501,
 'movement': None,
 'should_turn_movement_sensor_on': True,
 'human_presence_score': None,
 'should_notify': None,
 'notify': None}
>>> next(si)  
someone is there
{'audio': [320, -96, 99],
 'light': 523,
 'movement': True,
 'should_turn_movement_sensor_on': True,
 'human_presence_score': 731.1353726143957,
 'should_notify': True,
 'notify': None}

If you ask for the next slab, you’ll get a StopIteration (raised by the mocked sources since they reached the end of their iterators).

>>> next(si)  
Traceback (most recent call last):
  ...
StopIteration

That said, if you iterate through a SlabsIter that handles the StopIteration exception (it does by default), you’ll reach the end of your iteration gracefully.

>>> si = make_a_slabs_iter()
>>> for slab in si:
...     pass
someone is there
>>> si = make_a_slabs_iter()
>>> slabs = list(si)  # gather all the slabs
someone is there
>>> len(slabs)
3
>>> slabs[-1]  
{'audio': [320, -96, 99],
 'light': 523,
 'movement': True,
 'should_turn_movement_sensor_on': True,
 'human_presence_score': 731.1353726143957,
 'should_notify': True,
 'notify': None}

Note that Slabs uses a “scope” to store the intermediate results of the computation. This scope is a dict by default, but you can pass any argument-less callable that returns a MutableMapping as the scope_factory argument. This means that you can use other means to store intermediate results simply by wrapping them in a MutableMapping. For example, you could use a message broker such as Redis to store the intermediate results, and have the components read and write to it.

To help you with this, check out the dol and py2store libraries.
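
As a rough illustration of a custom scope, here is a hypothetical `RecordingScope` that wraps a plain dict and logs every write. It is a sketch only: a real backend would replace the inner dict, and the snippet exercises the mapping directly rather than through Slabs. Given the signature above, it would presumably be passed as `scope_factory=RecordingScope`, but that usage is not verified here.

```python
from collections.abc import MutableMapping


class RecordingScope(MutableMapping):
    """Hypothetical scope that records every write in a shared log.

    A real backend (a database or message broker client) would replace the
    inner dict; only the five abstract methods below need to be implemented.
    """

    write_log = []  # shared across instances, for illustration only

    def __init__(self):
        self._data = {}

    def __getitem__(self, key):
        return self._data[key]

    def __setitem__(self, key, value):
        self.write_log.append((key, value))
        self._data[key] = value

    def __delitem__(self, key):
        del self._data[key]

    def __iter__(self):
        return iter(self._data)

    def __len__(self):
        return len(self._data)


scope_factory = RecordingScope  # an argument-less callable returning a scope
scope = scope_factory()
scope['light'] = 126
scope['is_bright'] = scope['light'] > 100
```

Since the class takes no constructor arguments, the class itself can serve as the factory, in the same way that `dict` does by default.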

dot_digraph(*args, **kwargs)[source]

Returns a dot_digraph of the DAG of the SlabsIter (see DAG.dot_digraph)

classmethod from_dag(func_nodes: ~typing.Iterable[~meshed.base.FuncNode], *, handle_exceptions: ~typing.Mapping[type, ~meshed.slabs.ExceptionHandler] | ~typing.Iterable[BaseException] | BaseException = (<class 'StopIteration'>, <class 'meshed.slabs.IteratorExit'>, <class 'KeyboardInterrupt'>), scope_factory: ~typing.Callable[[], ~typing.MutableMapping] = <class 'dict'>)

Make

classmethod from_func_nodes(func_nodes: ~typing.Iterable[~meshed.base.FuncNode], *, handle_exceptions: ~typing.Mapping[type, ~meshed.slabs.ExceptionHandler] | ~typing.Iterable[BaseException] | BaseException = (<class 'StopIteration'>, <class 'meshed.slabs.IteratorExit'>, <class 'KeyboardInterrupt'>), scope_factory: ~typing.Callable[[], ~typing.MutableMapping] = <class 'dict'>)[source]

Make

meshed.slabs.SlabsIter

alias of Slabs