prepare for mocking of fts workers

This commit is contained in:
Kovid Goyal 2022-02-20 10:20:52 +05:30
parent 4f90074eeb
commit a165b34d3c
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C

View File

@ -3,12 +3,16 @@
# License: GPL v3 Copyright: 2022, Kovid Goyal <kovid at kovidgoyal.net> # License: GPL v3 Copyright: 2022, Kovid Goyal <kovid at kovidgoyal.net>
import os, sys import os
import traceback, subprocess import subprocess
import sys
import traceback
from contextlib import suppress from contextlib import suppress
from queue import Queue from queue import Queue
from threading import Thread from threading import Thread
from time import monotonic
from calibre import human_readable
from calibre.utils.ipc.simple_worker import start_pipe_worker from calibre.utils.ipc.simple_worker import start_pipe_worker
check_for_work = object() check_for_work = object()
@ -46,6 +50,9 @@ class Result:
class Worker(Thread): class Worker(Thread):
code_to_exec = 'from calibre.db.fts.text import main; main({!r})'
max_duration = 30 # minutes
def __init__(self, jobs_queue, supervise_queue): def __init__(self, jobs_queue, supervise_queue):
super().__init__(name='FTSWorker', daemon=True) super().__init__(name='FTSWorker', daemon=True)
self.currently_working = False self.currently_working = False
@ -73,20 +80,24 @@ class Worker(Thread):
self.working = False self.working = False
def run_job(self, job): def run_job(self, job):
time_limit = monotonic() + (self.max_duration * 60)
txtpath = job.path + '.txt' txtpath = job.path + '.txt'
errpath = job.path + '.error' errpath = job.path + '.error'
try: try:
with open(errpath, 'wb') as error: with open(errpath, 'wb') as error:
p = start_pipe_worker( p = start_pipe_worker(
f'from calibre.db.fts.text import main; main({job.path!r})', self.code_to_exec.format(job.path),
stdout=subprocess.DEVNULL, stderr=error, stdin=subprocess.DEVNULL, priority='low', stdout=subprocess.DEVNULL, stderr=error, stdin=subprocess.DEVNULL, priority='low',
) )
while self.keep_going: while self.keep_going and monotonic() <= time_limit:
with suppress(subprocess.TimeoutExpired): with suppress(subprocess.TimeoutExpired):
p.wait(0.1) p.wait(0.1)
break break
if p.returncode is None: if p.returncode is None:
p.kill() p.kill()
if monotonic() > time_limit:
return Result(job, _('Extracting text from the {0} file of size {1} took too long').format(
job.fmt, human_readable(job.fmt_size)))
return return
if os.path.exists(txtpath): if os.path.exists(txtpath):
return Result(job) return Result(job)