Source code for meshed.slabs

"""Tools to generate slabs.

A slab is a dict that holds data generated by a stream for a given interval of time.

The main object of this module is `Slabs`, an object that defines how to
generate multiple streams, in the form of slabs.
More precisely, it defines how to source streams, operate on and combine them to
create further streams, and even push these streams to further processes, all through
a single, simple interface: an (ordered) list of components that are called in
sequence to either pull data from some sources, compute a new stream based on previous
ones, or push some of the streams to further processes (such as visualization, or
storage systems).

A slab is a collection of items from the same interval of time.
We represent a slab using a `dict` or mapping.
Typically, a slab will be the aggregation of multiple information streams that
happened around the same time.

`Slabs` is a tool that allows you to source multiple streams into a stream of
slabs that can contain the original data, other data computed from it, or both.
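
For example (a minimal sketch, using the `Slabs` class defined further down in this
module; the component names are just illustrative):

>>> slabs = Slabs(count=iter([1, 2]).__next__, double=lambda count: 2 * count)
>>> next(slabs)
{'count': 1, 'double': 2}
>>> next(slabs)
{'count': 2, 'double': 4}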

Note to developers: though the code below is a reduced form of the actual code,
it should be enough to understand the general idea.
For a discussion about the design of Slabs, see
https://github.com/i2mint/meshed/discussions/49.


>>> class Slabs:
...     def _call_on_scope(self, scope):
...         '''
...         Calls the components 1 by 1, sourcing inputs and writing outputs in scope
...         '''
...
...     def __next__(self):
...         '''Get the next slab by calling _call_on_scope on a new empty scope.
...         At least one of the components will have to be argument-less and provide
...         some data for other components to get their inputs from, if any are needed.
...         '''
...         return self._call_on_scope(scope={})
...
...     def __iter__(self):
...         '''Iterates over slabs until a handled exception is raised.'''
...         # Simplified code:
...         with self:  # enter all the contexts that need to be entered
...             while True:  # loop until you encounter a handled exception
...                 try:
...                     yield next(self)
...                 except self.handle_exceptions as exc_val:
...                     # use specific exceptions to signal that iteration should stop
...                     break

"""

from typing import (
    Callable,
    Mapping,
    Iterable,
    Union,
    Any,
    MutableMapping,
    Protocol,
)
from i2 import Sig, ContextFanout
from meshed.base import FuncNode
from meshed.dag import DAG


class ExceptionalException(Exception):
    """Raised when an exception was supposed to be handled, but no matching handler
    was found. See the `_handle_exception` function, where it is raised.
    """


class IteratorExit(BaseException):
    """Raised when an iterator should quit being iterated on, signaling this event
    to any process that cares to catch the signal.

    We chose to inherit directly from `BaseException` instead of `Exception` for the
    same reason that `GeneratorExit` does: because it's not technically an error.
    See: https://docs.python.org/3/library/exceptions.html#GeneratorExit
    """


DFLT_INTERRUPT_EXCEPTIONS = (StopIteration, IteratorExit, KeyboardInterrupt)

DoNotBreak = type('DoNotBreak', (), {})
do_not_break = DoNotBreak()
do_not_break.__doc__ = (
    'Sentinel that should be used to signal Slabs iteration not to break. '
    'This sentinel should be returned by exception handlers if they want to tell '
    'the iteration not to stop (in all other cases, the iteration will stop)'
)

IgnoredOutput = Any
ExceptionHandlerOutput = Union[IgnoredOutput, DoNotBreak]


class ExceptionHandler(Protocol):
    """An exception handler is a callable that is called when a handled exception
    occurs during iteration. Most often, the handler takes no arguments and does
    nothing. Its output will be ignored, unless it is `do_not_break`, which signals
    that the iteration should continue.
    """

    def __call__(self) -> ExceptionHandlerOutput:
        pass


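# Example (an illustrative sketch, not part of the library API): a handler that
# swallows `ValueError` and returns the `do_not_break` sentinel so that iteration
# continues. Handlers may declare `exc_val` and/or `instance` parameters to receive
# the exception and the `Slabs` instance (see `_call_from_dict` below).
#
#     def keep_going_on_value_error(exc_val):
#         print(f'ignoring {exc_val!r}')
#         return do_not_break
#
#     slabs = Slabs(..., handle_exceptions={ValueError: keep_going_on_value_error})

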
# TODO: Make HandledExceptionsMap into a NewType?
# doc: A map between exception types and exception handlers (callbacks)
ExceptionType = type(BaseException)
HandledExceptionsMap = Mapping[ExceptionType, ExceptionHandler]
# doc: If none of the exceptions need handlers, you can just specify a list of them
HandledExceptionsMapSpec = Union[
    HandledExceptionsMap,
    Iterable[BaseException],  # an iterable of exception types
    BaseException,  # or just one exception type
]


def do_nothing():
    pass


def log_and_return(msg, logger=print):
    logger(msg)
    return msg


# TODO: Could consider (topologically) ordering the exceptions to reduce the matching
#  possibilities (see _handle_exception)
def _get_handle_exceptions(
    handle_exceptions: HandledExceptionsMapSpec,
) -> HandledExceptionsMap:
    if isinstance(handle_exceptions, type) and issubclass(
        handle_exceptions, BaseException
    ):
        # Only one exception type? Ensure there's a tuple of exceptions:
        handle_exceptions = (handle_exceptions,)
    if not isinstance(handle_exceptions, Mapping):
        handle_exceptions = {exc_type: do_nothing for exc_type in handle_exceptions}
    return handle_exceptions


def _handle_exception(
    instance, exc_val: BaseException, handle_exceptions: HandledExceptionsMap
) -> ExceptionHandlerOutput:
    """Looks for an exception type matching exc_val and calls the corresponding
    handler with the inputs it asks for (`exc_val` and/or `instance`).
    """
    inputs = dict(exc_val=exc_val, instance=instance)
    if type(exc_val) in handle_exceptions:  # try precise matching first
        exception_handler = handle_exceptions[type(exc_val)]
        return _call_from_dict(inputs, exception_handler, Sig(exception_handler))
    else:  # if not, find the first matching parent
        for exc_type, exception_handler in handle_exceptions.items():
            if isinstance(exc_val, exc_type):
                return _call_from_dict(
                    inputs, exception_handler, Sig(exception_handler)
                )
    # You never should get this far, but if you do, there's a problem, let's scream it:
    raise ExceptionalException(
        f"I couldn't find that exception in my handlers: {exc_val}"
    )


def _call_from_dict(kwargs: MutableMapping, func: Callable, sig: Sig):
    """An `i2.call_forgivingly` optimized for our purpose.

    The sig argument needs to be the Sig(func) to work correctly.

    Two use cases here:

    - using a scope dict as both the source of the `Slabs` components' inputs, and as
      a place to temporarily store the outputs of these components.

    - exception handlers: We'd like the exception handlers to be easy to express.
      Maybe you need the object raising the exception to handle it, maybe you just
      want to log the event. In the first case, the handler needs said object to be
      passed to it; in the second, we don't need any arguments at all.
      With _call_from_dict, we don't have to choose, we just have to impose that the
      handler use specific keywords (namely `exc_val` and/or `instance`) when there
      are inputs.
    """
    args, kwargs = sig.mk_args_and_kwargs(
        kwargs,
        allow_excess=True,
        ignore_kind=True,
        allow_partial=False,
        apply_defaults=True,
    )
    return func(*args, **kwargs)


def _conditional_pluralization(n_items, singular_msg, plural_msg):
    """To route to the right message (or template) according to ``n_items``"""
    if n_items == 1:
        return singular_msg
    else:
        return plural_msg


def _validate_components(components):
    if not all(map(callable, components.values())):
        not_callable = [k for k, v in components.items() if not callable(v)]
        not_callable_keys = ', '.join(not_callable)
        # TODO: Analyze values of components further and enhance error message with
        #  further suggestions. For example, if there's an iterator component c,
        #  suggest that perhaps ``c.__next__`` was intended?
        #  These component-based suggestions should be placed as a default of an
        #  argument of _validate_components so that it can be parametrized.
        msg = _conditional_pluralization(
            len(not_callable),
            f'This component is not callable: {not_callable_keys}',
            f'These components are not callable: {not_callable_keys}',
        )
        raise TypeError(msg)


# TODO: Postelize (or add tooling for) the components specification and add validation.


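# Example (illustrative, hypothetical values): _call_from_dict sources a function's
# arguments from a dict by parameter name, ignoring keys the function doesn't ask for.
# This is how Slabs feeds each component from the scope.
#
#     scope = {'audio': [1, 2, 4], 'light': 126}
#     brightness = lambda light: light / 255
#     _call_from_dict(scope, brightness, Sig(brightness))  # ~0.49; 'audio' is ignored
#
# The same mechanism lets exception handlers ask for `exc_val` and/or `instance`,
# or take no arguments at all.

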
class Slabs:
    """Object to source and manipulate multiple streams.

    A slab is a collection of items from the same interval of time.
    We represent a slab using a `dict` or mapping.
    Typically, a slab will be the aggregation of multiple information streams that
    happened around the same time.

    For example, say an edge device had a microphone, light, and movement sensor.
    An aggregate reading of these sensors could give you something like:

    >>> slab = {'audio': [1, 2, 4], 'light': 126, 'movement': None}

    `movement` is `None` because the sensor is off. If it were on, we'd have True or
    False as values. From this information, you'd like to compute a
    `should_turn_movement_sensor_on` value based on the following formula.

    >>> from statistics import stdev
    >>> vol = stdev
    >>> should_turn_movement_sensor_on = lambda audio, light: vol(audio) * light > 50000

    The product of the volume and the lumens gives you about 192, so you now have...

    >>> slab = {
    ...     'audio': [1, 2, 4],
    ...     'light': 126,
    ...     'should_turn_movement_sensor_on': False,
    ...     'movement': None
    ... }

    The next slab that comes in is

    >>> slab = {'audio': [-96, 89, -92], 'light': 501, 'movement': None}

    which puts us over the threshold, so

    >>> slab = {
    ...     'audio': [-96, 89, -92],
    ...     'light': 501,
    ...     'should_turn_movement_sensor_on': True,
    ...     'movement': None
    ... }

    and the movement sensor is turned on, movement is detected, a `human_presence`
    signal is computed, and a notification is sent if that metric is above a given
    threshold.

    The point here is that we incrementally compute various fields, enhancing our
    slab of information, and we do so iteratively over each slab that streams to us
    from our smart home device.

    `Slabs` is there to help you create such slabs, from source to enhanced.
    The situation above would look something like this:

    >>> from statistics import stdev
    >>>
    >>> vol = stdev
    >>>
    >>> # Making a slabs iter object
    >>> def make_a_slabs_iter():
    ...
    ...     # Mocking the sensor readers
    ...     audio_sensor_read = iter([[1, 2, 3], [-96, 87, -92], [320, -96, 99]]).__next__
    ...     light_sensor_read = iter([126, 501, 523]).__next__
    ...     movement_sensor_read = iter([None, None, True]).__next__
    ...
    ...     return Slabs(
    ...         # The first three components get data from the sensors.
    ...         # The *_read objects are all callable, returning the next
    ...         # chunk of data for that sensor, if any.
    ...         audio=audio_sensor_read,
    ...         light=light_sensor_read,
    ...         movement=movement_sensor_read,
    ...         # The next components compute new streams from the previous ones.
    ...         should_turn_movement_sensor_on=lambda audio, light: vol(audio) * light > 50000,
    ...         human_presence_score=lambda audio, light, movement: movement and sum([vol(audio), light]),
    ...         should_notify=lambda human_presence_score: human_presence_score and human_presence_score > 700,
    ...         notify=lambda should_notify: print('someone is there') if should_notify else None
    ...     )
    ...
    >>>
    >>> si = make_a_slabs_iter()
    >>> next(si)  # doctest: +NORMALIZE_WHITESPACE
    {'audio': [1, 2, 3], 'light': 126, 'movement': None,
     'should_turn_movement_sensor_on': False, 'human_presence_score': None,
     'should_notify': None, 'notify': None}
    >>> next(si)  # doctest: +NORMALIZE_WHITESPACE
    {'audio': [-96, 87, -92], 'light': 501, 'movement': None,
     'should_turn_movement_sensor_on': True, 'human_presence_score': None,
     'should_notify': None, 'notify': None}
    >>> next(si)  # doctest: +NORMALIZE_WHITESPACE
    someone is there
    {'audio': [320, -96, 99], 'light': 523, 'movement': True,
     'should_turn_movement_sensor_on': True, 'human_presence_score': 731.1353726143957,
     'should_notify': True, 'notify': None}

    If you ask for the next slab, you'll get a `StopIteration` (raised by the mocked
    sources since they reached the end of their iterators).

    >>> next(si)  # doctest: +NORMALIZE_WHITESPACE
    Traceback (most recent call last):
      ...
    StopIteration

    That said, if you iterate through a `Slabs` instance that handles the
    `StopIteration` exception (it does by default), you'll reach the end of your
    iteration gracefully.

    >>> si = make_a_slabs_iter()
    >>> for slab in si:
    ...     pass
    someone is there

    >>> si = make_a_slabs_iter()
    >>> slabs = list(si)  # gather all the slabs
    someone is there
    >>> len(slabs)
    3
    >>> slabs[-1]  # doctest: +NORMALIZE_WHITESPACE
    {'audio': [320, -96, 99], 'light': 523, 'movement': True,
     'should_turn_movement_sensor_on': True, 'human_presence_score': 731.1353726143957,
     'should_notify': True, 'notify': None}

    Note that ``Slabs`` uses a "scope" to store the intermediate results of the
    computation. This scope is a `dict` by default, but you can use any
    ``MutableMapping`` by passing its factory to the ``scope_factory`` argument.
    This means that you can use other means to store intermediate results simply by
    wrapping them in a MutableMapping. For example, you could use a message broker
    such as Redis to store the intermediate results, and have the components read and
    write to it. To help you with this, check out the
    `dol <https://pypi.org/project/dol/>`_ and
    `py2store <https://pypi.org/project/py2store/>`_ libraries.
    """

    _output_of_context_enter = None

    def __init__(
        self,
        handle_exceptions: HandledExceptionsMapSpec = DFLT_INTERRUPT_EXCEPTIONS,
        scope_factory: Callable[[], MutableMapping] = dict,
        **components,
    ):
        _validate_components(components)
        self.components = components
        self.handle_exceptions = _get_handle_exceptions(handle_exceptions)
        self.scope_factory = scope_factory
        self._handled_exception_types = tuple(self.handle_exceptions)
        self.sigs = {
            name: Sig.sig_or_default(func) for name, func in self.components.items()
        }
        self.context = ContextFanout(**components)

    def _call_on_scope(self, scope: MutableMapping):
        """Calls the components 1 by 1, sourcing inputs and writing outputs in scope"""
        # for each component
        for name, component in self.components.items():
            # call the component using scope to source any arguments it needs
            # and write the result in scope, under the component's name.
            scope[name] = _call_from_dict(scope, component, self.sigs[name])
        return scope

    def __next__(self):
        """Get the next slab by calling _call_on_scope on a new empty scope.

        At least one of the components will have to be argument-less and provide
        some data for other components to get their inputs from, if any are needed.
""" return self._call_on_scope(scope=self.scope_factory()) # TODO: Extend the flow control capabilities of execption handling # (see https://github.com/i2mint/meshed/discussions/49) def __iter__(self): """Iterates over slabs until a handle exception is raised.""" with self: # enter all the contexts that need to be entered while True: # loop until you encounter a handled exception try: yield next(self) except self._handled_exception_types as exc_val: handler_output = _handle_exception( self, exc_val, self.handle_exceptions ) # break, unless the handler tells us not to if handler_output is not do_not_break: self.exit_value = handler_output # remember, in case useful break def open(self): self._output_of_context_enter = self.context.__enter__() return self def close(self, exc_type=None, exc_val=None, exc_tb=None) -> None: return self._output_of_context_enter.__exit__(exc_type, exc_val, exc_tb) def run(self): for _ in self: pass __enter__ = open __exit__ = close __call__ = run
    @classmethod
    def from_func_nodes(
        cls,
        func_nodes: Iterable[FuncNode],
        *,
        handle_exceptions: HandledExceptionsMapSpec = DFLT_INTERRUPT_EXCEPTIONS,
        scope_factory: Callable[[], MutableMapping] = dict,
    ):
        """Make a `Slabs` instance from an iterable of `FuncNode` objects."""
        func_nodes = list(func_nodes)
        assert all(
            list(fn.bind.keys()) == list(fn.bind.values()) for fn in func_nodes
        ), (
            "You can't use `from_func_nodes` (yet) if your binds are not trivial. "
            "That is, if any of your functions' arguments have different names than "
            "the var nodes they're bound to"
        )
        # TODO: Make it work for non-trivial binds by using i2.wrapper
        components = {n.out: n.func for n in func_nodes}
        return cls(
            handle_exceptions=handle_exceptions,
            scope_factory=scope_factory,
            **components,
        )

    from_dag = from_func_nodes  # TODO: Have a vote if we want this alias.

    def to_func_nodes(self) -> Iterable[FuncNode]:
        for name, func in self.components.items():
            yield FuncNode(func, name=name, out=name)

    def to_dag(self) -> DAG:
        return DAG(list(self.to_func_nodes()))

    # TODO: Add @wraps(dot_digraph_body) to have DAG.dot_digraph signature
    def dot_digraph(self, *args, **kwargs):
        """
        Returns a dot_digraph of the DAG of the Slabs instance (see ``DAG.dot_digraph``)
        """
        dag = self.to_dag()
        return dag.dot_digraph(*args, **kwargs)


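# Example (illustrative): a Slabs instance can be viewed as a meshed DAG, with one
# FuncNode per component whose output var node is the component's name. This is what
# `to_func_nodes`, `to_dag`, and `dot_digraph` build on.
#
#     si = Slabs(x=lambda: 0, y=lambda x: x + 1)
#     fnodes = list(si.to_func_nodes())   # FuncNodes with out='x' and out='y'
#     dag = si.to_dag()                   # a meshed.DAG over those FuncNodes

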
SlabsIter = Slabs # for backward compatibility