Get pipe reader working on windows where we cant wait for data to become available on a non-blocking pipe

This commit is contained in:
Kovid Goyal 2024-10-15 08:55:16 +05:30
parent 93dc4faa8b
commit 6051cf3995
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C

View File

@ -566,7 +566,7 @@ class PiperEmbedded:
lang = get_lang()
lang = canonicalize_lang(lang) or lang
self._default_voice = self._voice_for_lang.get(lang) or self._voice_for_lang['eng']
self._current_voice = self._process = None
self._current_voice = self._process = self._pipe_reader = None
self._current_audio_rate = 0
def resolve_voice(self, lang: str, voice_name: str) -> Voice:
@ -610,10 +610,8 @@ class PiperEmbedded:
self._process.stdin.write(UTTERANCE_SEPARATOR)
self._process.stdin.flush()
stderr_data = b''
buf, piper_done, errors_from_piper = [], [], []
def stdout_callback(data: bytes) -> None:
buf.append(data)
buf = io.BytesIO()
piper_done, errors_from_piper = [], []
def stderr_callback(data: bytes) -> bool:
nonlocal stderr_data
@ -621,11 +619,11 @@ class PiperEmbedded:
return not piper_done
try:
unix_pipe_readers(self._process.stdout, self._process.stderr, stdout_callback, stderr_callback)
self._pipe_reader(buf.write, stderr_callback)
except Exception as e:
raise Exception(f'Reading output from piper process failed with error: {e} and STDERR: ' + '\n'.join(errors_from_piper))
raw_data = b''.join(buf)
raw_data = buf.getvalue()
if needs_conversion:
raw_data = resample_raw_audio_16bit(raw_data, self._current_audio_rate, sample_rate)
yield raw_data, duration_of_raw_audio_data(raw_data, sample_rate)
@ -637,6 +635,8 @@ class PiperEmbedded:
def shutdown(self):
if self._process is not None:
self._pipe_reader.close()
self._pipe_reader = None
self._process.stdin.close()
self._process.stdout.close()
self._process.stderr.close()
@ -652,25 +652,111 @@ class PiperEmbedded:
self._current_audio_rate, cmdline = piper_process_metadata(model_path, config_path, self._embedded_settings, self._current_voice)
import subprocess
self._process = subprocess.Popen(cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
self._pipe_reader = (ThreadedPipeReader if iswindows else PipeReader)(self._process.stdout, self._process.stderr)
def unix_pipe_readers(stdout: BinaryIO, stderr: BinaryIO, stdout_callback, stderr_callback, timeout: float = 10.) -> None:
from select import select
out, err = stdout.fileno(), stderr.fileno()
os.set_blocking(out, False)
os.set_blocking(err, False)
readers = out, err
while True:
r, _, _ = select(readers, (), (), timeout)
if not r:
raise TimeoutError('Timed out waiting for output from piper process')
if out in r:
data = stdout.read(io.DEFAULT_BUFFER_SIZE)
stdout_callback(data)
if err in r:
data = stderr.read(io.DEFAULT_BUFFER_SIZE)
if not stderr_callback(data):
class PipeReader:
TIMEOUT = 10. # seconds
def __init__(self, stdout: BinaryIO, stderr: BinaryIO):
self.stdout_fd = stdout.fileno()
self.stderr_fd = stderr.fileno()
os.set_blocking(self.stdout_fd, False)
os.set_blocking(self.stderr_fd, False)
def close(self):
self.stderr_fd = self.stdout_fd = -1
def __call__(self, stdout_callback, stderr_callback):
from select import select
out, err = self.stdout_fd, self.stderr_fd
readers = out, err
buf = memoryview(bytearray(io.DEFAULT_BUFFER_SIZE))
def readall(fd: int) -> bytes:
output = io.BytesIO()
while True:
try:
num = os.readv(fd, (buf,))
except BlockingIOError:
break
else:
output.write(buf[:num])
return output.getvalue()
def readinto(fd: int, callback) -> None:
while True:
try:
num = os.readv(fd, (buf,))
except BlockingIOError:
break
else:
callback(buf[:num])
while True:
r, _, _ = select(readers, (), (), self.TIMEOUT)
if not r:
raise TimeoutError('Timed out waiting for output from piper process')
if out in r:
readinto(out, stdout_callback)
if err in r:
data = readall(err)
if not stderr_callback(data):
# In case there is new data written to stdout
readinto(out, stdout_callback)
break
class ThreadedPipeReader(PipeReader):
def __init__(self, stdout: BinaryIO, stderr: BinaryIO):
from queue import Queue
from threading import Event, Thread
self.shutting_down = Event()
self.queue = Queue()
self.stdout_thread = t = Thread(target=self._reader, args=(stdout.fileno(), True), daemon=True)
t.start()
self.stderr_thread = t = Thread(target=self._reader, args=(stderr.fileno(), False), daemon=True)
t.start()
def close(self):
self.shutting_down.set()
def __call__(self, stdout_callback, stderr_callback):
from queue import Empty
while True:
data, is_stdout, err = self.queue.get(True, self.TIMEOUT)
if err is not None:
raise err
if data:
if is_stdout:
stdout_callback(data)
else:
if not stderr_callback(data):
# in case more data was written to stdout
while True:
try:
data, is_stdout, err = self.queue.get_nowait()
except Empty:
break
if err is not None:
raise err
if is_stdout:
stdout_callback(data)
break
def _reader(self, pipe_fd: int, is_stdout: bool):
while not self.shutting_down.is_set():
try:
data = os.read(pipe_fd, io.DEFAULT_BUFFER_SIZE)
except OSError as e:
if not self.shutting_down.is_set():
self.queue.put((b'', is_stdout, e))
break
else:
self.queue.put((data, is_stdout, None))
def duration_of_raw_audio_data(data: bytes, sample_rate: int = HIGH_QUALITY_SAMPLE_RATE, bytes_per_sample: int = 2, num_channels: int = 1) -> float:
@ -690,7 +776,7 @@ def develop_embedded():
for data, duration in p.text_to_raw_audio_data((
'Hello, good day to you.', 'This is the second sentence.', 'This is the final sentence.'
)):
print(duration, len(data))
print(f'{duration=} {len(data)=}')
all_data.append(data)
sz += len(data)
all_data[0] = wav_header_for_pcm_data(sz, HIGH_QUALITY_SAMPLE_RATE)