Source code for py2store.utils.cumul_aggreg_write

"""
Utils for bulk writing: accumulate, aggregate, and write when some condition is met.
"""
import itertools
import time
from collections import defaultdict
from functools import reduce
from operator import add


def flush_on_exit(cls):
    """Class decorator that makes `cls` usable as a context manager,
    calling its `flush_cache` method when the `with` block exits."""

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        return self.flush_cache()

    new_cls = type(cls.__name__, (cls,), {})
    new_cls.__enter__ = __enter__
    new_cls.__exit__ = __exit__
    return new_cls
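

# Illustrative sketch (not part of the original module): a class decorated with
# flush_on_exit can be used as a context manager, so pending items are flushed
# when the `with` block exits. `SimpleCache` here is a made-up example class.
def _example_flush_on_exit():
    @flush_on_exit
    class SimpleCache:
        def __init__(self):
            self.items, self.flushed = [], []

        def flush_cache(self):
            self.flushed.extend(self.items)
            self.items = []

    with SimpleCache() as cache:
        cache.items.append('pending')
    assert cache.flushed == ['pending']  # flush_cache was called on exit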


def let_through(gen):
    """Yield the generator's items as is (no keys added)."""
    yield from gen


def key_count(gen, start=0):
    """Pair each item of the generator with an integer key, counting from `start`."""
    yield from enumerate(gen, start=start)


def join_byte_values_and_key_as_current_utc_milliseconds(gen):
    """Join the generator's bytes values into a single bytes value,
    keyed by the current UTC epoch time in milliseconds."""
    k = int(time.time() * 1000)
    yield k, b''.join(gen)


def join_string_values_and_key_as_current_utc_milliseconds(gen):
    """Join the generator's string values into a single string,
    keyed by the current UTC epoch time in milliseconds."""
    k = int(time.time() * 1000)
    yield k, ''.join(gen)
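

# Quick sketch (illustrative only): both join functions collapse an iterable of
# chunks into a single value, keyed by the current epoch time in milliseconds,
# e.g. to store one blob per flush, timestamped by write time.
def _example_timestamp_join():
    [(k, v)] = join_string_values_and_key_as_current_utc_milliseconds(['a', 'b', 'c'])
    assert v == 'abc' and isinstance(k, int)  # k is "now" in epoch milliseconds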


def mk_kv_from_keygen(keygen=itertools.count()):
    """Make an aggregator that pairs each item of a generator with the next key
    drawn from `keygen`."""

    def aggregate(gen):
        for k, v in zip(keygen, gen):
            yield k, v

    return aggregate


infinite_keycount_kvs = mk_kv_from_keygen(keygen=itertools.count())
no_initial = type('NoInitial', (), {})()  # sentinel marking that no `initial` argument was given
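

# Small sketch (illustrative only) of the default key maker: infinite_keycount_kvs
# pairs each cached item with an auto-incrementing integer key. Note that the
# keygen is shared across calls, so keys keep increasing from call to call.
def _example_keycount_kvs():
    kvs = mk_kv_from_keygen(keygen=itertools.count())
    assert list(kvs(['a', 'b', 'c'])) == [(0, 'a'), (1, 'b'), (2, 'c')]
    # The count continues on the next call (zip already consumed key 3 while
    # probing for a fourth item, so the next key is 4):
    assert list(kvs(['d'])) == [(4, 'd')]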


def mk_group_aggregator(item_to_kv, aggregator_op=add, initial=no_initial):
    """Make a generator transforming function that will (a) make a key for each given item,
    and (b) group all items according to the key.

    Args:
        item_to_kv: Function that takes an item of the generator and outputs the (key, value)
            pair that should be used to group items
        aggregator_op: The aggregation binary function that is used to aggregate two items together.
            The function is used as is by functools.reduce, applied to the sequence of items
            that were collected for a given group
        initial: The "empty" element to start the reduce (aggregation) with, if necessary.

    Returns:
        >>> # Collect words (as a csv string), grouped by the lower case of the first letter
        >>> ag = mk_group_aggregator(lambda item: (item[0].lower(), item),
        ...                          aggregator_op=lambda x, y: ', '.join([x, y]))
        >>> list(ag(['apple', 'banana', 'Airplane']))
        [('a', 'apple, Airplane'), ('b', 'banana')]
        >>> # Collect the 'thing' values in a list, grouped by 'age'
        >>> ag = mk_group_aggregator(lambda item: (item['age'], item['thing']),
        ...                          aggregator_op=lambda x, y: x + [y],
        ...                          initial=[])
        >>> list(ag([{'age': 0, 'thing': 'new'}, {'age': 42, 'thing': 'every'}, {'age': 0, 'thing': 'just born'}]))
        [(0, ['new', 'just born']), (42, ['every'])]
    """
    if initial is no_initial:
        aggregate_reduce = lambda v: reduce(aggregator_op, v)
    else:
        aggregate_reduce = lambda v: reduce(aggregator_op, v, initial)

    def aggregator(gen):
        d = defaultdict(list)
        for k, v in map(item_to_kv, gen):
            d[k].append(v)
        yield from ((k, aggregate_reduce(v)) for k, v in d.items())

    return aggregator


def mk_group_aggregator_with_key_func(
    item_to_key, aggregator_op=add, initial=no_initial
):
    """Make a generator transforming function that will (a) make a key for each given item,
    and (b) group all items according to the key.

    Args:
        item_to_key: Function that takes an item of the generator and outputs the key that should
            be used to group items
        aggregator_op: The aggregation binary function that is used to aggregate two items together.
            The function is used as is by functools.reduce, applied to the sequence of items
            that were collected for a given group
        initial: The "empty" element to start the reduce (aggregation) with, if necessary.

    Returns:
        >>> # Collect words (as a csv string), grouped by the lower case of the first letter
        >>> ag = mk_group_aggregator_with_key_func(lambda item: item[0].lower(),
        ...                                        aggregator_op=lambda x, y: ', '.join([x, y]))
        >>> list(ag(['apple', 'banana', 'Airplane']))
        [('a', 'apple, Airplane'), ('b', 'banana')]
        >>>
        >>> # Collect (and concatenate) characters according to their ascii value modulo 3
        ... ag = mk_group_aggregator_with_key_func(lambda item: (ord(item) % 3))
        >>> list(ag('abcdefghijklmnop'))
        [(1, 'adgjmp'), (2, 'behkn'), (0, 'cfilo')]
        >>>
        >>> # Sum all even and odd numbers separately
        ... ag = mk_group_aggregator_with_key_func(lambda item: (item % 2))
        >>> list(ag([1, 2, 3, 4, 5]))  # sum of evens is 6, and sum of odds is 9
        [(1, 9), (0, 6)]
        >>>
        >>> # If we wanted to collect all odds and evens, we'd need a different aggregator and initial
        ... ag = mk_group_aggregator_with_key_func(lambda item: (item % 2),
        ...                                        aggregator_op=lambda x, y: x + [y], initial=[])
        >>> list(ag([1, 2, 3, 4, 5]))
        [(1, [1, 3, 5]), (0, [2, 4])]
    """
    return mk_group_aggregator(
        item_to_kv=lambda item: (item_to_key(item), item),
        aggregator_op=aggregator_op,
        initial=initial,
    )


@flush_on_exit
class CumulAggregWrite:
    """
    >>> store = dict()
    >>> key_count = lambda gen: enumerate(gen, start=0)
    >>> caw = CumulAggregWrite(store, cache_to_kv=key_count)
    >>> # Adding 3 items...
    >>> caw.append(3)
    >>> caw.append('hi')
    >>> caw.append({'a': complex, 'obj': [1, 2, 3]})
    >>>
    >>> caw.cache  # The cache now has 3 items
    [3, 'hi', {'a': <class 'complex'>, 'obj': [1, 2, 3]}]
    >>> caw.store  # Store is still empty
    {}
    >>> # Flushing the items #############
    >>> caw.flush_cache()
    >>> caw.cache  # The cache now has no more items
    []
    >>> caw.store  # But the store has them.
    {0: 3, 1: 'hi', 2: {'a': <class 'complex'>, 'obj': [1, 2, 3]}}
    >>>
    >>> # One common use case of aggregating is when data is actually grouped and aggregated for storage
    """

    def __init__(self, store, cache_to_kv=infinite_keycount_kvs, mk_cache=list):
        self.store = store
        self.cache_to_kv = cache_to_kv
        self._mk_cache = mk_cache
        self.cache = mk_cache()  # Note: Better a cache factory, or the same object with an empty() method.

    def __setitem__(self, k, v):
        self.cache.append((k, v))

    def append(self, item):
        self.cache.append(item)

    def extend(self, items):
        for item in items:
            self.append(item)

    def flush_cache(self):
        for k, v in self.cache_to_kv(self.cache):
            self.store[k] = v
        self.cache = self._mk_cache()

    def close(self):
        return self.flush_cache()
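

# Illustrative sketch (not part of the original module) of the "grouped and
# aggregated for storage" use case mentioned in CumulAggregWrite's docstring:
# using mk_group_aggregator as cache_to_kv, a flush merges items that share
# a key into a single stored value.
def _example_grouped_flush():
    store = {}
    by_first_letter = mk_group_aggregator(
        lambda word: (word[0].lower(), word),
        aggregator_op=lambda x, y: ', '.join([x, y]),
    )
    with CumulAggregWrite(store, cache_to_kv=by_first_letter) as caw:
        caw.extend(['apple', 'banana', 'Airplane'])  # flushed on exit
    assert store == {'a': 'apple, Airplane', 'b': 'banana'}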


class CumulAggregWriteKvItems(CumulAggregWrite):
    def __init__(self, store):
        super().__init__(store, cache_to_kv=lambda gen: iter(gen), mk_cache=list)


def condition_flush_on_every_write(cache):
    """Boolean function, used as a flush_cache_condition, that triggers a flush whenever the cache is non-empty"""
    return len(cache) > 0


class CumulAggregWriteWithAutoFlush(CumulAggregWrite):
    def __init__(
        self,
        store,
        cache_to_kv=infinite_keycount_kvs,
        mk_cache=list,
        flush_cache_condition=condition_flush_on_every_write,
    ):
        super().__init__(store, cache_to_kv, mk_cache)
        self.flush_cache_condition = flush_cache_condition

    def __setitem__(self, k, v):
        super().__setitem__(k, v)
        if self.flush_cache_condition(self.cache):
            self.flush_cache()

    def append(self, item):
        super().append(item)
        if self.flush_cache_condition(self.cache):
            self.flush_cache()
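

# Illustrative sketch (not part of the original module): a size-based
# flush_cache_condition makes the auto-flush write through in fixed-size
# batches rather than on every single append.
def _example_auto_flush_in_batches():
    store = {}
    caw = CumulAggregWriteWithAutoFlush(
        store, flush_cache_condition=lambda cache: len(cache) >= 2
    )
    caw.append('a')  # only 1 item cached; condition not met, nothing stored yet
    assert store == {}
    caw.append('b')  # cache reaches 2 items, so it is flushed to the store
    # (keys come from the shared module-level counter, so this holds in a fresh session)
    assert store == {0: 'a', 1: 'b'}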