Run bulk metadata downloads in a separate process to work around memory leaks in third-party metadata download plugins. This also removes the need to batch metadata downloads into groups of 100 books at a time.
commit 927b7471b7
parent 310c5c17d2
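The fix hinges on process isolation: each batch of downloads runs in a short-lived worker forked via calibre's fork_job, so whatever memory a third-party plugin leaks is reclaimed when the worker exits. A minimal sketch of the pattern, condensed from the download() changes below; run_batch is an illustrative name, not part of the commit, and the fork_job keyword arguments shown are only those visible in this diff:

    from calibre.utils.ipc.simple_worker import fork_job, WorkerError

    def run_batch(do_identify, covers, metadata, ensure_fields,
            tdir, abort, heartbeat):
        # Fork a clean Python process; it imports the named module, calls
        # main() with the given args, and exits, taking any leaked memory
        # with it. The parent keeps only the small result tuple.
        try:
            ret = fork_job('calibre.ebooks.metadata.sources.worker', 'main',
                    (do_identify, covers, metadata, ensure_fields),
                    cwd=tdir, abort=abort, heartbeat=heartbeat, no_output=True)
        except WorkerError as e:
            if e.orig_tb:
                raise Exception('Failed to download metadata. Original '
                        'traceback:\n\n' + e.orig_tb)
            raise
        return ret['result']  # (failed_ids, failed_covers, all_failed)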
src/calibre/ebooks/metadata/sources/worker.py (new file, 95 lines)
@@ -0,0 +1,95 @@
+#!/usr/bin/env python
+# vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:ai
+from __future__ import (unicode_literals, division, absolute_import,
+                        print_function)
+
+__license__ = 'GPL v3'
+__copyright__ = '2012, Kovid Goyal <kovid@kovidgoyal.net>'
+__docformat__ = 'restructuredtext en'
+
+import os
+from threading import Event
+from io import BytesIO
+
+from calibre.utils.date import as_utc
+from calibre.ebooks.metadata.sources.identify import identify, msprefs
+from calibre.ebooks.metadata.book.base import Metadata
+from calibre.customize.ui import metadata_plugins
+from calibre.ebooks.metadata.sources.covers import download_cover
+from calibre.utils.logging import GUILog
+from calibre.ebooks.metadata.opf2 import metadata_to_opf, OPF
+
+def merge_result(oldmi, newmi, ensure_fields=None):
+    dummy = Metadata(_('Unknown'))
+    for f in msprefs['ignore_fields']:
+        if ':' in f or (ensure_fields and f in ensure_fields):
+            continue
+        setattr(newmi, f, getattr(dummy, f))
+    fields = set()
+    for plugin in metadata_plugins(['identify']):
+        fields |= plugin.touched_fields
+
+    def is_equal(x, y):
+        if hasattr(x, 'tzinfo'):
+            x = as_utc(x)
+        if hasattr(y, 'tzinfo'):
+            y = as_utc(y)
+        return x == y
+
+    for f in fields:
+        # Optimize so that set_metadata does not have to do extra work later
+        if not f.startswith('identifier:'):
+            if (not newmi.is_null(f) and is_equal(getattr(newmi, f),
+                    getattr(oldmi, f))):
+                setattr(newmi, f, getattr(dummy, f))
+
+    return newmi
+
+def main(do_identify, covers, metadata, ensure_fields):
+    failed_ids = set()
+    failed_covers = set()
+    all_failed = True
+    log = GUILog()
+
+    for book_id, mi in metadata.iteritems():
+        mi = OPF(BytesIO(mi), basedir=os.getcwdu(),
+                populate_spine=False).to_book_metadata()
+        title, authors, identifiers = mi.title, mi.authors, mi.identifiers
+        cdata = None
+        log.clear()
+
+        if do_identify:
+            results = []
+            try:
+                results = identify(log, Event(), title=title, authors=authors,
+                        identifiers=identifiers)
+            except:
+                pass
+            if results:
+                all_failed = False
+                mi = merge_result(mi, results[0], ensure_fields=ensure_fields)
+                identifiers = mi.identifiers
+                if not mi.is_null('rating'):
+                    # set_metadata expects a rating out of 10
+                    mi.rating *= 2
+                with open('%d.mi'%book_id, 'wb') as f:
+                    f.write(metadata_to_opf(mi, default_lang='und'))
+            else:
+                log.error('Failed to download metadata for', title)
+                failed_ids.add(book_id)
+
+        if covers:
+            cdata = download_cover(log, title=title, authors=authors,
+                    identifiers=identifiers)
+            if cdata is None:
+                failed_covers.add(book_id)
+            else:
+                with open('%d.cover'%book_id, 'wb') as f:
+                    f.write(cdata[-1])
+                all_failed = False
+
+        with open('%d.log'%book_id, 'wb') as f:
+            f.write(log.html.encode('utf-8'))
+
+    return failed_ids, failed_covers, all_failed
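main() communicates with the parent through the filesystem: for each book it writes <book_id>.mi (an OPF serialization of the merged metadata), <book_id>.cover (raw image bytes) and <book_id>.log (the HTML log) into its working directory, and returns only the failure sets. The GUI-side consumer of the .mi and .cover files is not part of this diff; a hedged sketch of how such a reader might look, with read_worker_results being illustrative only:

    import os
    from calibre.ebooks.metadata.opf2 import OPF

    def read_worker_results(tdir, book_id):
        mi = cover = None
        opf_path = os.path.join(tdir, '%d.mi' % book_id)
        if os.path.exists(opf_path):
            with open(opf_path, 'rb') as f:
                # Round-trips the OPF written by metadata_to_opf() in main()
                mi = OPF(f, basedir=tdir,
                        populate_spine=False).to_book_metadata()
        cover_path = os.path.join(tdir, '%d.cover' % book_id)
        if os.path.exists(cover_path):
            with open(cover_path, 'rb') as f:
                cover = f.read()  # raw image bytes from download_cover()
        return mi, cover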
@@ -7,20 +7,17 @@ __license__ = 'GPL v3'
 __copyright__ = '2011, Kovid Goyal <kovid@kovidgoyal.net>'
 __docformat__ = 'restructuredtext en'
 
-import os
+import os, time, shutil
 from functools import partial
 from itertools import izip
 from threading import Event
 
 from PyQt4.Qt import (QIcon, QDialog,
         QDialogButtonBox, QLabel, QGridLayout, QPixmap, Qt)
 
 from calibre.gui2.threaded_jobs import ThreadedJob
-from calibre.ebooks.metadata.sources.identify import identify, msprefs
-from calibre.ebooks.metadata.sources.covers import download_cover
-from calibre.ebooks.metadata.book.base import Metadata
-from calibre.customize.ui import metadata_plugins
-from calibre.ptempfile import PersistentTemporaryFile
-from calibre.utils.date import as_utc
+from calibre.ebooks.metadata.opf2 import metadata_to_opf
+from calibre.utils.ipc.simple_worker import fork_job, WorkerError
+from calibre.ptempfile import (PersistentTemporaryDirectory,
+        PersistentTemporaryFile)
 
 # Start download {{{
 def show_config(gui, parent):
@@ -105,18 +102,18 @@ def start_download(gui, ids, callback, ensure_fields=None):
     if ret != d.Accepted:
         return
 
-    for batch in split_jobs(ids):
-        job = ThreadedJob('metadata bulk download',
-            _('Download metadata for %d books')%len(batch),
-            download, (batch, gui.current_db, d.identify, d.covers,
-                ensure_fields), {}, callback)
-        gui.job_manager.run_threaded_job(job)
+    job = ThreadedJob('metadata bulk download',
+        _('Download metadata for %d books')%len(ids),
+        download, (ids, gui.current_db, d.identify, d.covers,
+            ensure_fields), {}, callback)
+    gui.job_manager.run_threaded_job(job)
     gui.status_bar.show_message(_('Metadata download started'), 3000)
 
 # }}}
 
 def get_job_details(job):
-    id_map, failed_ids, failed_covers, title_map, all_failed = job.result
+    (aborted, good_ids, tdir, log_file, failed_ids, failed_covers, title_map,
+            lm_map, all_failed) = job.result
     det_msg = []
     for i in failed_ids | failed_covers:
         title = title_map[i]
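With batching moved out of the GUI (the hunk above now queues a single ThreadedJob for all ids), split_jobs is called only inside download() below. Its definition is not part of this diff; a plausible sketch, consistent with both the old call split_jobs(ids) and the new call split_jobs(all_ids, batch_size=batch_size):

    def split_jobs(ids, batch_size=100):
        # Chop the id list into consecutive batches of at most batch_size
        ans, ids = [], list(ids)
        while ids:
            ans.append(ids[:batch_size])
            ids = ids[batch_size:]
        return ans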
@@ -126,92 +123,90 @@ def get_job_details(job):
             title += (' ' + _('(Failed cover)'))
         det_msg.append(title)
     det_msg = '\n'.join(det_msg)
-    return id_map, failed_ids, failed_covers, all_failed, det_msg
+    return (aborted, good_ids, tdir, log_file, failed_ids, failed_covers,
+            all_failed, det_msg, lm_map)
 
-def merge_result(oldmi, newmi, ensure_fields=None):
-    dummy = Metadata(_('Unknown'))
-    for f in msprefs['ignore_fields']:
-        if ':' in f or (ensure_fields and f in ensure_fields):
-            continue
-        setattr(newmi, f, getattr(dummy, f))
-    fields = set()
-    for plugin in metadata_plugins(['identify']):
-        fields |= plugin.touched_fields
+class HeartBeat(object):
+    CHECK_INTERVAL = 300 # seconds
+    ''' Check that the file count in tdir changes every five minutes '''
 
-    def is_equal(x, y):
-        if hasattr(x, 'tzinfo'):
-            x = as_utc(x)
-        if hasattr(y, 'tzinfo'):
-            y = as_utc(y)
-        return x == y
+    def __init__(self, tdir):
+        self.tdir = tdir
+        self.last_count = len(os.listdir(self.tdir))
+        self.last_time = time.time()
 
-    for f in fields:
-        # Optimize so that set_metadata does not have to do extra work later
-        if not f.startswith('identifier:'):
-            if (not newmi.is_null(f) and is_equal(getattr(newmi, f),
-                    getattr(oldmi, f))):
-                setattr(newmi, f, getattr(dummy, f))
+    def __call__(self):
+        if time.time() - self.last_time > self.CHECK_INTERVAL:
+            c = len(os.listdir(self.tdir))
+            if c == self.last_count:
+                return False
+            self.last_count = c
+            self.last_time = time.time()
+        return True
 
-    newmi.last_modified = oldmi.last_modified
+# Fix log viewer, get_job_details, database update code
+# Test: abort, covers only, metadata only, both, 200 entry download, memory
+# consumption, all errors and on and on
 
-    return newmi
 
-def download(ids, db, do_identify, covers, ensure_fields,
+def download(all_ids, db, do_identify, covers, ensure_fields,
         log=None, abort=None, notifications=None):
-    ids = list(ids)
-    metadata = [db.get_metadata(i, index_is_id=True, get_user_categories=False)
-        for i in ids]
+    batch_size = 10
+    batches = split_jobs(all_ids, batch_size=batch_size)
+    tdir = PersistentTemporaryDirectory('_metadata_bulk_')
+    tf = PersistentTemporaryFile('_metadata_bulk_log_')
+    tf.close()
+    tf = tf.name
+    heartbeat = HeartBeat(tdir)
+
     failed_ids = set()
     failed_covers = set()
     title_map = {}
-    ans = {}
-    count = 0
+    lm_map = {}
+    ans = set()
     all_failed = True
-    '''
-    # Test apply dialog
-    all_failed = do_identify = covers = False
-    '''
-    for i, mi in izip(ids, metadata):
+    aborted = False
+    count = 0
+
+    for ids in batches:
         if abort.is_set():
             log.error('Aborting...')
             break
-        title, authors, identifiers = mi.title, mi.authors, mi.identifiers
-        title_map[i] = title
-        if do_identify:
-            results = []
-            try:
-                results = identify(log, Event(), title=title, authors=authors,
-                        identifiers=identifiers)
-            except:
-                pass
-            if results:
-                all_failed = False
-                mi = merge_result(mi, results[0], ensure_fields=ensure_fields)
-                identifiers = mi.identifiers
-                if not mi.is_null('rating'):
-                    # set_metadata expects a rating out of 10
-                    mi.rating *= 2
-            else:
-                log.error('Failed to download metadata for', title)
-                failed_ids.add(i)
-                # We don't want set_metadata operating on anything but covers
-                mi = merge_result(mi, mi, ensure_fields=ensure_fields)
-        if covers:
-            cdata = download_cover(log, title=title, authors=authors,
-                    identifiers=identifiers)
-            if cdata is not None:
-                with PersistentTemporaryFile('.jpg', 'downloaded-cover-') as f:
-                    f.write(cdata[-1])
-                mi.cover = f.name
-                all_failed = False
-            else:
-                failed_covers.add(i)
-        ans[i] = mi
-        count += 1
+        metadata = {i:db.get_metadata(i, index_is_id=True,
+            get_user_categories=False) for i in ids}
+        for i in ids:
+            title_map[i] = metadata[i].title
+            lm_map[i] = metadata[i].last_modified
+        metadata = {i:metadata_to_opf(mi, default_lang='und') for i, mi in
+                metadata.iteritems()}
+        try:
+            ret = fork_job('calibre.ebooks.metadata.sources.worker', 'main',
+                    (do_identify, covers, metadata, ensure_fields),
+                    cwd=tdir, abort=abort, heartbeat=heartbeat, no_output=True)
+        except WorkerError as e:
+            if e.orig_tb:
+                raise Exception('Failed to download metadata. Original '
+                        'traceback: \n\n'+e.orig_tb)
+            raise
+        count += batch_size
         notifications.put((count/len(ids),
-            _('Downloaded %(num)d of %(tot)d')%dict(num=count, tot=len(ids))))
+            _('Downloaded %(num)d of %(tot)d')%dict(
+                num=count, tot=len(all_ids))))
+
+        fids, fcovs, allf = ret['result']
+        if not allf:
+            all_failed = False
+        failed_ids = failed_ids.union(fids)
+        failed_covers = failed_covers.union(fcovs)
+        ans = ans.union(set(ids) - fids)
+        for book_id in ids:
+            lp = os.path.join(tdir, '%d.log'%book_id)
+            if os.path.exists(lp):
+                with open(lp, 'rb') as f, open(tf, 'ab') as d:
+                    shutil.copyfileobj(f, d)
+
+    if abort.is_set():
+        aborted = True
     log('Download complete, with %d failures'%len(failed_ids))
-    return (ans, failed_ids, failed_covers, title_map, all_failed)
+    return (aborted, ans, tdir, tf, failed_ids, failed_covers, title_map,
+            lm_map, all_failed)
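The HeartBeat object added above gives fork_job a liveness check that needs no extra IPC: the worker drops a .mi/.cover/.log file into tdir for each book it processes, so a file count that is static for five minutes suggests a hung worker. A self-contained illustration of the contract, assuming (as the heartbeat= argument suggests) that the job monitor calls the object periodically and treats a False return as dead; HeartBeat is the class from the hunk above, and CHECK_INTERVAL is shrunk purely for the demo:

    import os, tempfile, time

    tdir = tempfile.mkdtemp()
    hb = HeartBeat(tdir)
    hb.CHECK_INTERVAL = 0         # force a check on every call (demo only)

    time.sleep(0.01)
    assert hb() is False          # no new files since last check: looks hung
    open(os.path.join(tdir, '1.log'), 'wb').close()
    time.sleep(0.01)
    assert hb() is True           # file count changed: worker is alive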