From a165b34d3c162963f9baf1555f7d22254233ab44 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Sun, 20 Feb 2022 10:20:52 +0530 Subject: [PATCH] prepare for mocking of fts workers --- src/calibre/db/fts/pool.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/src/calibre/db/fts/pool.py b/src/calibre/db/fts/pool.py index 6ec411be43..305e75dbdf 100644 --- a/src/calibre/db/fts/pool.py +++ b/src/calibre/db/fts/pool.py @@ -3,12 +3,16 @@ # License: GPL v3 Copyright: 2022, Kovid Goyal -import os, sys -import traceback, subprocess +import os +import subprocess +import sys +import traceback from contextlib import suppress from queue import Queue from threading import Thread +from time import monotonic +from calibre import human_readable from calibre.utils.ipc.simple_worker import start_pipe_worker check_for_work = object() @@ -46,6 +50,9 @@ class Result: class Worker(Thread): + code_to_exec = 'from calibre.db.fts.text import main; main({!r})' + max_duration = 30 # minutes + def __init__(self, jobs_queue, supervise_queue): super().__init__(name='FTSWorker', daemon=True) self.currently_working = False @@ -73,20 +80,24 @@ class Worker(Thread): self.working = False def run_job(self, job): + time_limit = monotonic() + (self.max_duration * 60) txtpath = job.path + '.txt' errpath = job.path + '.error' try: with open(errpath, 'wb') as error: p = start_pipe_worker( - f'from calibre.db.fts.text import main; main({job.path!r})', + self.code_to_exec.format(job.path), stdout=subprocess.DEVNULL, stderr=error, stdin=subprocess.DEVNULL, priority='low', ) - while self.keep_going: + while self.keep_going and monotonic() <= time_limit: with suppress(subprocess.TimeoutExpired): p.wait(0.1) break if p.returncode is None: p.kill() + if monotonic() > time_limit: + return Result(job, _('Extracting text from the {0} file of size {1} took too long').format( + job.fmt, human_readable(job.fmt_size))) return if os.path.exists(txtpath): return Result(job)