stream2py
[Diagram: SourceReader -> StreamBuffer -> BufferReader1, BufferReader2, BufferReader3]
The Rationale
Stream2py lets you create a single asynchronous SourceReader and hand out thread-safe readers to one or more consumers of the same data window. In other words, you can read and work on data without worrying about missing any of it.
The usual way to process an audio recording:
import pyaudio

CHUNK = 1024
WIDTH = 2
CHANNELS = 1
RATE = 44100
RECORD_SECONDS = 5

p = pyaudio.PyAudio()
stream = p.open(
    format=p.get_format_from_width(WIDTH),
    channels=CHANNELS,
    rate=RATE,
    input=True,
    frames_per_buffer=CHUNK,
)
for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
    data = stream.read(CHUNK)
    # risk of buffer overflow before next read
    long_calculations(data)  # placeholder for your own slow processing
stream.stop_stream()
stream.close()
p.terminate()
The stream2py way to process an audio recording:
from stream2py import StreamBuffer
from stream2py.sources.audio import PyAudioSourceReader

CHUNK = 1024
WIDTH = 2
CHANNELS = 1
RATE = 44100
RECORD_SECONDS = 5

source_reader = PyAudioSourceReader(
    rate=RATE,
    width=WIDTH,
    channels=CHANNELS,
    frames_per_buffer=CHUNK,
)
with StreamBuffer(
    source_reader=source_reader, maxlen=int(RATE / CHUNK * RECORD_SECONDS)
) as stream_buffer:
    buffer_reader = stream_buffer.mk_reader()
    for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
        data = next(buffer_reader)
        long_calculations(data)  # placeholder for your own slow processing
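The difference between the two approaches can be sketched with the standard library alone. The following is a minimal illustration of the idea, not stream2py's implementation: a background thread (the hypothetical `fill_buffer`) keeps draining a fast source into a shared buffer while a slow consumer (the hypothetical `consume_slowly`) catches up at its own pace, so no item is lost.

```python
import threading
import time


def fill_buffer(source, buffer, lock, done):
    """Producer: copy every item from the source into the shared buffer."""
    for item in source:
        with lock:
            buffer.append(item)
        time.sleep(0.001)  # simulate a device that delivers data quickly
    done.set()


def consume_slowly(buffer, lock, done):
    """Consumer: slower than the producer, but reads by position so nothing is skipped."""
    seen = []
    cursor = 0  # index of the next unread item
    while True:
        # Read the flag before the buffer: if it was already set,
        # every item is guaranteed to be in the buffer by now.
        finished = done.is_set()
        with lock:
            has_item = cursor < len(buffer)
            item = buffer[cursor] if has_item else None
        if has_item:
            seen.append(item)
            cursor += 1
            time.sleep(0.005)  # simulate a long calculation
        elif finished:
            break
        else:
            time.sleep(0.001)  # nothing new yet, wait briefly
    return seen


buffer, lock, done = [], threading.Lock(), threading.Event()
producer = threading.Thread(target=fill_buffer, args=(range(20), buffer, lock, done))
producer.start()
received = consume_slowly(buffer, lock, done)
producer.join()
print(received == list(range(20)))
```

The consumer never talks to the source directly, which is why its slow processing cannot cause a missed read.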
How to use
There are three components: the SourceReader, the StreamBuffer, and the BufferReader.
1. The SourceReader
A SourceReader defines how to get data through the open(), read(), and close() methods, how the data is ordered through the key() method, and an info property that describes the instance.
from typing import Tuple

from stream2py import SourceReader

EnumeratedReadData = Tuple[int, str]


class LineSourceReader(SourceReader):
    def __init__(self, file, mode='rt'):
        self.file = file
        self.mode = mode
        self.line_count = 0
        self._f = None

    def open(self):
        # reset the counter so keys start at 0 on every open
        self.line_count = 0
        self._f = open(self.file, self.mode)

    def read(self) -> EnumeratedReadData:
        # pair each line with its line number so it can be ordered by key()
        read_data = (self.line_count, self._f.readline())
        self.line_count += 1
        return read_data

    def close(self):
        self._f.close()

    def key(self, data: EnumeratedReadData):
        # the line number is the sort key
        return data[0]

    @property
    def info(self):
        return dict(file=self.file, mode=self.mode)
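To make the contract concrete, the calls can be exercised by hand. This sketch uses a hypothetical `InMemoryLineReader` that mirrors the five members above but reads from a string and does not subclass SourceReader, so it runs without stream2py installed. It only illustrates the order of calls: open, then read and key, then close.

```python
import io
from typing import Optional, Tuple

EnumeratedReadData = Tuple[int, str]


class InMemoryLineReader:
    """Hypothetical stand-in with the same members as LineSourceReader."""

    def __init__(self, text: str):
        self.text = text
        self.line_count = 0
        self._f: Optional[io.StringIO] = None

    def open(self) -> None:
        self.line_count = 0
        self._f = io.StringIO(self.text)

    def read(self) -> EnumeratedReadData:
        read_data = (self.line_count, self._f.readline())
        self.line_count += 1
        return read_data

    def close(self) -> None:
        self._f.close()

    def key(self, data: EnumeratedReadData) -> int:
        return data[0]

    @property
    def info(self) -> dict:
        return dict(length=len(self.text))


reader = InMemoryLineReader("To be,\nor not to be\n")
reader.open()
first = reader.read()
second = reader.read()
keys = [reader.key(first), reader.key(second)]
reader.close()
print(first, second, keys)
```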
2. The StreamBuffer
A StreamBuffer has two jobs. First, it manages the open, read, and close of a SourceReader and puts the data it reads onto a thread-safe buffer. Second, it is a factory of BufferReader instances for multiple consumers.
# ...Continued from above
from stream2py import StreamBuffer

# download txt from: https://www.gutenberg.org/cache/epub/1524/pg1524.txt
FILE = 'hamlet.txt'
source_reader = LineSourceReader(FILE)
with StreamBuffer(
    source_reader=source_reader,
    maxlen=None,  # None should only be used when reading objects with fixed lengths, like a file
) as stream_buffer:
    buffer_reader = stream_buffer.mk_reader()
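The maxlen argument deserves a closer look. As a rough illustration, assuming the buffer discards its oldest items once full in the way `collections.deque(maxlen=...)` does (an assumption about behavior, not a statement about stream2py internals), the sketch below works through the sizing arithmetic from the audio example and shows what a bound means for items that have not been read yet.

```python
from collections import deque

RATE = 44100
CHUNK = 1024
RECORD_SECONDS = 5

# Number of CHUNK-sized reads needed to cover RECORD_SECONDS of audio:
# 44100 / 1024 = 43.07 reads per second, times 5 seconds, rounded down.
maxlen = int(RATE / CHUNK * RECORD_SECONDS)
print(maxlen)

# A bounded buffer silently drops the oldest items when it overflows.
bounded = deque(maxlen=3)
for i in range(5):
    bounded.append(i)
print(list(bounded))

# An unbounded buffer keeps everything, so memory grows with the source.
unbounded = deque(maxlen=None)
for i in range(5):
    unbounded.append(i)
print(list(unbounded))
```

Under that assumption, an unbounded buffer is only safe when the source itself is finite, which matches the comment on maxlen=None above.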
3. The BufferReader
A BufferReader gives data access to any number of consumers and provides methods to seek data, such as next(), range(), head(), and tail(). Each BufferReader instance has its own cursor that keeps track of what data was last seen.
# ...Continued from above
line_1 = buffer_reader.next()
line_2 = next(buffer_reader)
line_1_10 = buffer_reader.range(start=1, stop=10)
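The cursor idea can be pictured with a toy model. The hypothetical `ToyReader` below is not stream2py's BufferReader; it only shows how several readers can share one key-sorted buffer of (key, value) items while each keeps an independent position, and how a key-based range lookup might work. Treating stop as inclusive is an assumption of this sketch.

```python
from bisect import bisect_left, bisect_right


class ToyReader:
    """Hypothetical reader over a shared, key-sorted list of (key, value) items."""

    def __init__(self, items):
        self._items = items  # shared with other readers, not copied
        self._cursor = 0     # index of the next unread item, private to this reader

    def next(self):
        """Return the next unread item, or None when nothing new is available."""
        if self._cursor >= len(self._items):
            return None
        item = self._items[self._cursor]
        self._cursor += 1
        return item

    def range(self, start, stop):
        """Return items whose key lies in [start, stop] (inclusive by assumption)."""
        keys = [key for key, _ in self._items]
        lo = bisect_left(keys, start)
        hi = bisect_right(keys, stop)
        return self._items[lo:hi]


shared = [(i, f"line {i}") for i in range(6)]
reader_a = ToyReader(shared)
reader_b = ToyReader(shared)

a1 = reader_a.next()
a2 = reader_a.next()
b1 = reader_b.next()  # reader_b starts from the beginning, unaffected by reader_a
window = reader_a.range(start=2, stop=4)
print(a1, a2, b1, window)
```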
The All-in-One Helper
As a bonus, the mk_stream_buffer helper function lets you skip writing a SourceReader class by passing the open, read, close, and key behavior as functions.
from time import sleep

from stream2py import mk_stream_buffer

FILE = 'hamlet.txt'
with mk_stream_buffer(
    read_stream=lambda open_inst: next(open_inst, None),
    open_stream=lambda: open(FILE, 'rt'),
    close_stream=lambda open_inst: open_inst.close(),
    key=lambda read_data: read_data[0],
    maxlen=None,
) as stream_buffer:
    buffer_reader = stream_buffer.mk_reader()
    sleep(1)
    # do stuff with buffer_reader
    print(buffer_reader.range(start=40, stop=50))
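It can help to evaluate the callables by hand to see what each one produces. The sketch below does not call mk_stream_buffer and makes no claim about how it invokes these functions. It swaps the file for an in-memory `io.StringIO` (an assumption made so the example is self-contained) and shows that a key of read_data[0] applied to a plain text line yields that line's first character, whereas reading from an `enumerate(...)` wrapper yields (index, line) pairs whose first element is a line number.

```python
import io

TEXT = "To be,\nor not to be,\nthat is the question\n"

# Same shapes as the callables above; StringIO stands in for open(FILE, 'rt').
open_stream = lambda: io.StringIO(TEXT)
read_stream = lambda open_inst: next(open_inst, None)
close_stream = lambda open_inst: open_inst.close()
key = lambda read_data: read_data[0]

open_inst = open_stream()
line = read_stream(open_inst)    # first line of text, a str
line_key = key(line)             # first character of that line
rest = [read_stream(open_inst) for _ in range(3)]  # two more lines, then None
close_stream(open_inst)

# Reading from an enumerated stream gives (line_number, line) pairs instead.
numbered = enumerate(io.StringIO(TEXT))
pair = read_stream(numbered)
pair_key = key(pair)             # the line number

print(repr(line), repr(line_key), pair, pair_key)
```

If you want numeric keys for calls such as range(start=40, stop=50), the data returned by read_stream needs to carry a number in its first position.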