Source code for py2store.examples.write_caches
"""
stores that implement various write caching algorithms
"""
from py2store.utils.cumul_aggreg_write import (
join_byte_values_and_key_as_current_utc_milliseconds,
CumulAggregWriteWithAutoFlush,
)
def append_and_print_state(store, data):
store.append(data)
print(f'Appended [{data}].\tcache: {store.cache},\tstore: {store.store}')
def timestamp_on_store():
s = CumulAggregWriteWithAutoFlush(
store={},
cache_to_kv=join_byte_values_and_key_as_current_utc_milliseconds,
flush_cache_condition=lambda x: len(x) >= 3,
)
append_and_print_state(s, b'Hello')
append_and_print_state(s, b'World')
append_and_print_state(s, b'!')
append_and_print_state(s, b'Wassup?')
[docs]def timestamp_on_cache_and_concatenate_all_values():
"""The cache timestamps (with system clock) every item on insertion (append) and uses the min timestamp as
a key for storage."""
from collections import UserList
import time
class TimestampedItemsCache(UserList):
def append(self, item):
ts = time.time()
super().append((ts, item))
def min_key_joined_values(items):
sorted_items = sorted(items, key=lambda x: x[0])
k = sorted_items[0][0]
join_char = sorted_items[0][1][
0:0
] # better way to get '' or b'' according to data type?
yield k, join_char.join(x[1] for x in sorted_items)
s = CumulAggregWriteWithAutoFlush(
store={},
cache_to_kv=min_key_joined_values,
flush_cache_condition=lambda x: len(x) >= 3,
mk_cache=TimestampedItemsCache,
)
append_and_print_state(s, b'Hello')
time.sleep(0.1)
append_and_print_state(s, b'World')
time.sleep(0.1)
append_and_print_state(s, b'!')
time.sleep(0.1)
append_and_print_state(s, b'Wassup?')
time.sleep(0.1)