Save to disk: Refactor to not open a database connection in the worker process. Also fix a bug that could lead to save failures not being reported.

This commit is contained in:
Kovid Goyal 2010-12-06 00:33:37 -07:00
parent 90fa43bf37
commit e31735960b
3 changed files with 160 additions and 55 deletions

View File

@ -8,12 +8,12 @@ __docformat__ = 'restructuredtext en'
from threading import Thread
from Queue import Empty
import os, time, sys, shutil
import os, time, sys, shutil, json
from calibre.utils.ipc.job import ParallelJob
from calibre.utils.ipc.server import Server
from calibre.ptempfile import PersistentTemporaryDirectory, TemporaryDirectory
from calibre import prints
from calibre import prints, isbytestring
from calibre.constants import filesystem_encoding
@ -194,14 +194,44 @@ class SaveWorker(Thread):
self.daemon = True
self.path, self.opts = path, opts
self.ids = ids
self.library_path = db.library_path
self.db = db
self.canceled = False
self.result_queue = result_queue
self.error = None
self.spare_server = spare_server
self.start()
def collect_data(self, ids):
from calibre.ebooks.metadata.opf2 import metadata_to_opf
data = {}
for i in set(ids):
mi = self.db.get_metadata(i, index_is_id=True, get_cover=True)
opf = metadata_to_opf(mi)
if isbytestring(opf):
opf = opf.decode('utf-8')
cpath = None
if mi.cover:
cpath = mi.cover
if isbytestring(cpath):
cpath = cpath.decode(filesystem_encoding)
formats = {}
fmts = self.db.formats(i, index_is_id=True, verify_formats=False)
if fmts:
fmts = fmts.split(',')
for fmt in fmts:
fpath = self.db.format_abspath(i, fmt, index_is_id=True)
if fpath is not None:
if isbytestring(fpath):
fpath = fpath.decode(filesystem_encoding)
formats[fmt.lower()] = fpath
data[i] = [opf, cpath, formats]
return data
def run(self):
with TemporaryDirectory('save_to_disk_data') as tdir:
self._run(tdir)
def _run(self, tdir):
from calibre.library.save_to_disk import config
server = Server() if self.spare_server is None else self.spare_server
ids = set(self.ids)
@ -212,12 +242,19 @@ class SaveWorker(Thread):
for pref in c.preferences:
recs[pref.name] = getattr(self.opts, pref.name)
plugboards = self.db.prefs.get('plugboards', {})
for i, task in enumerate(tasks):
tids = [x[-1] for x in task]
data = self.collect_data(tids)
dpath = os.path.join(tdir, '%d.json'%i)
with open(dpath, 'wb') as f:
f.write(json.dumps(data, ensure_ascii=False).encode('utf-8'))
job = ParallelJob('save_book',
'Save books (%d of %d)'%(i, len(tasks)),
lambda x,y:x,
args=[tids, self.library_path, self.path, recs])
args=[tids, dpath, plugboards, self.path, recs])
jobs.add(job)
server.add_job(job)
@ -226,21 +263,19 @@ class SaveWorker(Thread):
time.sleep(0.2)
running = False
for job in jobs:
job.update(consume_notifications=False)
while True:
try:
id, title, ok, tb = job.notifications.get_nowait()[0]
if id in ids:
self.result_queue.put((id, title, ok, tb))
ids.remove(id)
except Empty:
break
self.get_notifications(job, ids)
if not job.is_finished:
running = True
if not running:
break
for job in jobs:
for id_, title, ok, tb in job.result:
if id_ in ids:
self.result_queue.put((id_, title, ok, tb))
ids.remove(id_)
server.close()
time.sleep(1)
@ -257,21 +292,39 @@ class SaveWorker(Thread):
except:
pass
def get_notifications(self, job, ids):
job.update(consume_notifications=False)
while True:
try:
id, title, ok, tb = job.notifications.get_nowait()[0]
if id in ids:
self.result_queue.put((id, title, ok, tb))
ids.remove(id)
except Empty:
break
def save_book(task, library_path, path, recs, notification=lambda x,y:x):
from calibre.library.database2 import LibraryDatabase2
db = LibraryDatabase2(library_path)
from calibre.library.save_to_disk import config, save_to_disk
def save_book(ids, dpath, plugboards, path, recs, notification=lambda x,y:x):
from calibre.library.save_to_disk import config, save_serialized_to_disk
from calibre.customize.ui import apply_null_metadata
opts = config().parse()
for name in recs:
setattr(opts, name, recs[name])
results = []
def callback(id, title, failed, tb):
results.append((id, title, not failed, tb))
notification((id, title, not failed, tb))
return True
with apply_null_metadata:
save_to_disk(db, task, path, opts, callback)
data_ = json.loads(open(dpath, 'rb').read().decode('utf-8'))
data = {}
for k, v in data_.iteritems():
data[int(k)] = v
with apply_null_metadata:
save_serialized_to_disk(ids, data, plugboards, path, opts, callback)
return results

View File

@ -427,11 +427,23 @@ class Saver(QObject): # {{{
if not self.ids or not self.worker.is_alive():
self.timer.stop()
self.pd.hide()
while self.ids:
before = len(self.ids)
self.get_result()
if before == len(self.ids):
for i in list(self.ids):
self.failures.add(('id:%d'%i, 'Unknown error'))
self.ids.remove(i)
break
if not self.callback_called:
self.callback(self.worker.path, self.failures, self.worker.error)
self.callback_called = True
return
self.get_result()
def get_result(self):
try:
id, title, ok, tb = self.rq.get_nowait()
except Empty:
@ -441,6 +453,7 @@ class Saver(QObject): # {{{
if not isinstance(title, unicode):
title = str(title).decode(preferred_encoding, 'replace')
self.pd.set_msg(_('Saved')+' '+title)
if not ok:
self.failures.add((title, tb))
# }}}

View File

@ -6,7 +6,7 @@ __license__ = 'GPL v3'
__copyright__ = '2009, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
import os, traceback, cStringIO, re
import os, traceback, cStringIO, re, shutil
from calibre.constants import DEBUG
from calibre.utils.config import Config, StringConfig, tweaks
@ -203,31 +203,49 @@ def get_components(template, mi, id, timefmt='%b %Y', length=250,
return shorten_components_to(length, components)
def save_book_to_disk(id, db, root, opts, length):
mi = db.get_metadata(id, index_is_id=True)
def save_book_to_disk(id_, db, root, opts, length):
mi = db.get_metadata(id_, index_is_id=True)
cover = db.cover(id_, index_is_id=True, as_path=True)
plugboards = db.prefs.get('plugboards', {})
available_formats = db.formats(id, index_is_id=True)
available_formats = db.formats(id_, index_is_id=True)
if not available_formats:
available_formats = []
else:
available_formats = [x.lower().strip() for x in
available_formats.split(',')]
formats = {}
fmts = db.formats(id_, index_is_id=True, verify_formats=False)
if fmts:
fmts = fmts.split(',')
for fmt in fmts:
fpath = db.format_abspath(id_, fmt, index_is_id=True)
if fpath is not None:
formats[fmt.lower()] = fpath
return do_save_book_to_disk(id_, mi, cover, plugboards,
formats, root, opts, length)
def do_save_book_to_disk(id_, mi, cover, plugboards,
format_map, root, opts, length):
available_formats = [x.lower().strip() for x in format_map.keys()]
if opts.formats == 'all':
asked_formats = available_formats
else:
asked_formats = [x.lower().strip() for x in opts.formats.split(',')]
formats = set(available_formats).intersection(set(asked_formats))
if not formats:
return True, id, mi.title
return True, id_, mi.title
components = get_components(opts.template, mi, id, opts.timefmt, length,
components = get_components(opts.template, mi, id_, opts.timefmt, length,
ascii_filename if opts.asciiize else sanitize_file_name,
to_lowercase=opts.to_lowercase,
replace_whitespace=opts.replace_whitespace)
base_path = os.path.join(root, *components)
base_name = os.path.basename(base_path)
dirpath = os.path.dirname(base_path)
# Don't test for existence first are the test could fail but
# Don't test for existence first as the test could fail but
# another worker process could create the directory before
# the call to makedirs
try:
@ -236,29 +254,23 @@ def save_book_to_disk(id, db, root, opts, length):
if not os.path.exists(dirpath):
raise
cdata = db.cover(id, index_is_id=True)
if opts.save_cover:
if cdata is not None:
with open(base_path+'.jpg', 'wb') as f:
f.write(cdata)
mi.cover = base_name+'.jpg'
else:
mi.cover = None
if opts.save_cover and cover and os.access(cover, os.R_OK):
with open(base_path+'.jpg', 'wb') as f:
with open(cover, 'rb') as s:
shutil.copyfileobj(s, f)
mi.cover = base_name+'.jpg'
else:
mi.cover = None
if opts.write_opf:
opf = metadata_to_opf(mi)
with open(base_path+'.opf', 'wb') as f:
f.write(opf)
if cdata is not None:
mi.cover_data = ('jpg', cdata)
mi.cover = None
written = False
for fmt in formats:
global plugboard_save_to_disk_value, plugboard_any_format_value
dev_name = plugboard_save_to_disk_value
plugboards = db.prefs.get('plugboards', {})
cpb = None
if fmt in plugboards:
cpb = plugboards[fmt]
@ -275,11 +287,12 @@ def save_book_to_disk(id, db, root, opts, length):
# Leave this here for a while, in case problems arise.
if cpb is not None:
prints('Save-to-disk using plugboard:', fmt, cpb)
data = db.format(id, fmt, index_is_id=True)
if data is None:
fp = format_map.get(fmt, None)
if fp is None:
continue
else:
written = True
with open(fp, 'rb') as f:
data = f.read()
written = True
if opts.update_metadata:
stream = cStringIO.StringIO()
stream.write(data)
@ -300,9 +313,21 @@ def save_book_to_disk(id, db, root, opts, length):
with open(fmt_path, 'wb') as f:
f.write(data)
return not written, id, mi.title
return not written, id_, mi.title
def _sanitize_args(root, opts):
if opts is None:
opts = config().parse()
if isinstance(root, unicode):
root = root.encode(filesystem_encoding)
root = os.path.abspath(root)
opts.template = preprocess_template(opts.template)
length = 1000 if supports_long_names(root) else 250
length -= len(root)
if length < 5:
raise ValueError('%r is too long.'%root)
return root, opts, length
def save_to_disk(db, ids, root, opts=None, callback=None):
'''
@ -316,17 +341,7 @@ def save_to_disk(db, ids, root, opts=None, callback=None):
:return: A list of failures. Each element of the list is a tuple
(id, title, traceback)
'''
if opts is None:
opts = config().parse()
if isinstance(root, unicode):
root = root.encode(filesystem_encoding)
root = os.path.abspath(root)
opts.template = preprocess_template(opts.template)
length = 1000 if supports_long_names(root) else 250
length -= len(root)
if length < 5:
raise ValueError('%r is too long.'%root)
root, opts, length = _sanitize_args(root, opts)
failures = []
for x in ids:
tb = ''
@ -343,4 +358,28 @@ def save_to_disk(db, ids, root, opts=None, callback=None):
break
return failures
def save_serialized_to_disk(ids, data, plugboards, root, opts, callback):
from calibre.ebooks.metadata.opf2 import OPF
root, opts, length = _sanitize_args(root, opts)
failures = []
for x in ids:
opf, cover, format_map = data[x]
if isinstance(opf, unicode):
opf = opf.encode('utf-8')
mi = OPF(cStringIO.StringIO(opf)).to_book_metadata()
tb = ''
try:
failed, id, title = do_save_book_to_disk(x, mi, cover, plugboards,
format_map, root, opts, length)
tb = _('Requested formats not available')
except:
failed, id, title = True, x, mi.title
tb = traceback.format_exc()
if failed:
failures.append((id, title, tb))
if callable(callback):
if not callback(int(id), title, failed, tb):
break
return failures