Saving to disk refactored into separate process

This commit is contained in:
Kovid Goyal 2009-05-16 15:45:53 -07:00
parent 17f4e28d82
commit aa6067f95f
13 changed files with 324 additions and 123 deletions

View File

@ -30,6 +30,9 @@ every time you add an HTML file to the library.\
OptionRecommendation.HIGH)])
of = self.temporary_file('_plugin_html2zip.zip')
opf = glob.glob(os.path.join(tdir, '*.opf'))[0]
ncx = glob.glob(os.path.join(tdir, '*.ncx'))
if ncx:
os.remove(ncx[0])
epub = initialize_container(of.name, os.path.basename(opf))
epub.add_dir(tdir)
epub.close()
@ -291,7 +294,7 @@ class PDFMetadataWriter(MetadataWriterPlugin):
name = 'Set PDF metadata'
file_types = set(['pdf'])
description = _('Set metadata in %s files') % 'PDF'
author = 'John Schember'
author = 'Kovid Goyal'
def set_metadata(self, stream, mi, type):
from calibre.ebooks.metadata.pdf import set_metadata

View File

@ -87,7 +87,7 @@ class ReadMetadata(Thread):
ids.remove(id)
except Empty:
break
job.update()
job.update(consume_notifications=False)
if not job.is_finished:
running = True
@ -119,3 +119,88 @@ def read_metadata(paths, result_queue, chunk=50):
t = ReadMetadata(tasks, result_queue)
t.start()
return t
class SaveWorker(Thread):
def __init__(self, result_queue, db, ids, path, by_author=False,
single_dir=False, single_format=None):
Thread.__init__(self)
self.daemon = True
self.path, self.by_author = path, by_author
self.single_dir, self.single_format = single_dir, single_format
self.ids = ids
self.library_path = db.library_path
self.canceled = False
self.result_queue = result_queue
self.error = None
self.start()
def run(self):
server = Server()
ids = set(self.ids)
tasks = server.split(list(ids))
jobs = set([])
for i, task in enumerate(tasks):
tids = [x[-1] for x in task]
job = ParallelJob('save_book',
'Save books (%d of %d)'%(i, len(tasks)),
lambda x,y:x,
args=[tids, self.library_path, self.path, self.single_dir,
self.single_format, self.by_author])
jobs.add(job)
server.add_job(job)
while not self.canceled:
time.sleep(0.2)
running = False
for job in jobs:
job.update(consume_notifications=False)
while True:
try:
id, title, ok = job.notifications.get_nowait()[0]
if id in ids:
self.result_queue.put((id, title, ok))
ids.remove(id)
except Empty:
break
if not job.is_finished:
running = True
if not running:
break
server.close()
time.sleep(1)
if self.canceled:
return
for job in jobs:
if job.failed:
prints(job.details)
self.error = job.details
if os.path.exists(job.log_path):
os.remove(job.log_path)
def save_book(task, library_path, path, single_dir, single_format,
by_author, notification=lambda x,y:x):
from calibre.library.database2 import LibraryDatabase2
db = LibraryDatabase2(library_path)
def callback(id, title):
notification((id, title, True))
return True
if single_format is None:
failures = []
db.export_to_dir(path, task, index_is_id=True, byauthor=by_author,
callback=callback, single_dir=single_dir)
else:
failures = db.export_single_format_to_dir(path, task, single_format,
index_is_id=True, callback=callback)
for id, title in failures:
notification((id, title, False))

View File

@ -150,7 +150,12 @@ class EbookIterator(object):
if self.opf.path_to_html_toc is not None and \
self.opf.path_to_html_toc not in self.spine:
self.spine.append(SpineItem(self.opf.path_to_html_toc))
try:
self.spine.append(SpineItem(self.opf.path_to_html_toc))
except:
import traceback
traceback.print_exc()
sizes = [i.character_count for i in self.spine]
self.pages = [math.ceil(i/float(self.CHARACTERS_PER_PAGE)) for i in sizes]

View File

@ -65,7 +65,7 @@ class RTFInput(InputFormatPlugin):
accelerators):
from calibre.ebooks.rtf.xsl import xhtml
from calibre.ebooks.metadata.meta import get_metadata
from calibre.ebooks.metadata.opf import OPFCreator
from calibre.ebooks.metadata.opf2 import OPFCreator
from calibre.ebooks.rtf2xml.ParseRtf import RtfInvalidCodeException
self.log = log
self.log('Converting RTF to XML...')

View File

@ -41,7 +41,7 @@ class Adder(QObject):
def __init__(self, parent, db, callback):
QObject.__init__(self, parent)
self.pd = ProgressDialog(_('Add books'), parent=parent)
self.pd = ProgressDialog(_('Adding...'), parent=parent)
self.db = db
self.pd.setModal(True)
self.pd.show()
@ -55,7 +55,7 @@ class Adder(QObject):
def add_recursive(self, root, single=True):
self.path = root
self.pd.set_msg(_('Searching for books in all sub-directories...'))
self.pd.set_msg(_('Searching in all sub-directories...'))
self.pd.set_min(0)
self.pd.set_max(0)
self.pd.value = 0
@ -162,3 +162,64 @@ class Adder(QObject):
self.add_formats(id, formats)
self.number_of_books_added += 1
class Saver(QObject):
def __init__(self, parent, db, callback, rows, path,
by_author=False, single_dir=False, single_format=None):
QObject.__init__(self, parent)
self.pd = ProgressDialog(_('Saving...'), parent=parent)
self.db = db
self.pd.setModal(True)
self.pd.show()
self.pd.set_min(0)
self._parent = parent
self.callback = callback
self.callback_called = False
self.rq = Queue()
self.ids = [x for x in map(db.id, [r.row() for r in rows]) if x is not None]
self.pd.set_max(len(self.ids))
self.pd.value = 0
self.failures = set([])
from calibre.ebooks.metadata.worker import SaveWorker
self.worker = SaveWorker(self.rq, db, self.ids, path, by_author,
single_dir, single_format)
self.connect(self.pd, SIGNAL('canceled()'), self.canceled)
self.timer = QTimer(self)
self.connect(self.timer, SIGNAL('timeout()'), self.update)
self.timer.start(200)
def canceled(self):
if self.timer is not None:
self.timer.stop()
if self.worker is not None:
self.worker.canceled = True
self.pd.hide()
if not self.callback_called:
self.callback(self.worker.path, self.failures, self.worker.error)
self.callback_called = True
def update(self):
if not self.ids or not self.worker.is_alive():
self.timer.stop()
self.pd.hide()
if not self.callback_called:
self.callback(self.worker.path, self.failures, self.worker.error)
self.callback_called = True
return
try:
id, title, ok = self.rq.get_nowait()
except Empty:
return
self.pd.value += 1
self.ids.remove(id)
if not isinstance(title, unicode):
title = str(title).decode('utf-8', preferred_encoding)
self.pd.set_msg(_('Saved')+' '+title)
if not ok:
self.failures.add(title)

View File

@ -204,19 +204,9 @@ class BooksModel(QAbstractTableModel):
''' Return list indices of all cells in index.row()'''
return [ self.index(index.row(), c) for c in range(self.columnCount(None))]
def save_to_disk(self, rows, path, single_dir=False, single_format=None,
callback=None):
rows = [row.row() for row in rows]
if single_format is None:
return self.db.export_to_dir(path, rows,
self.sorted_on[0] == 'authors',
single_dir=single_dir,
callback=callback)
else:
return self.db.export_single_format_to_dir(path, rows,
single_format,
callback=callback)
@property
def by_author(self):
return self.sorted_on[0] == 'authors'
def delete_books(self, indices):
ids = map(self.id, indices)

View File

@ -14,7 +14,7 @@ from PyQt4.Qt import Qt, SIGNAL, QObject, QCoreApplication, QUrl, QTimer, \
QProgressDialog, QMessageBox, QStackedLayout
from PyQt4.QtSvg import QSvgRenderer
from calibre import __version__, __appname__, islinux, sanitize_file_name, \
from calibre import __version__, __appname__, sanitize_file_name, \
iswindows, isosx, prints
from calibre.ptempfile import PersistentTemporaryFile
from calibre.utils.config import prefs, dynamic
@ -29,7 +29,6 @@ from calibre.gui2.cover_flow import CoverFlow, DatabaseImages, pictureflowerror
from calibre.gui2.widgets import ProgressIndicator
from calibre.gui2.dialogs.scheduler import Scheduler
from calibre.gui2.update import CheckForUpdates
from calibre.gui2.dialogs.progress import ProgressDialog
from calibre.gui2.main_window import MainWindow, option_parser as _option_parser
from calibre.gui2.main_ui import Ui_MainWindow
from calibre.gui2.device import DeviceManager, DeviceMenu, DeviceGUI, Emailer
@ -960,54 +959,44 @@ class Main(MainWindow, Ui_MainWindow, DeviceGUI):
self.save_to_disk(checked, True)
def save_to_disk(self, checked, single_dir=False, single_format=None):
rows = self.current_view().selectionModel().selectedRows()
if not rows or len(rows) == 0:
return error_dialog(self, _('Cannot save to disk'),
_('No books selected'), show=True)
progress = ProgressDialog(_('Saving to disk...'), min=0, max=len(rows),
parent=self)
def callback(count, msg):
progress.set_value(count)
progress.set_msg(_('Saved')+' '+msg)
QApplication.processEvents()
QApplication.sendPostedEvents()
QApplication.flush()
return not progress.canceled
dir = choose_dir(self, 'save to disk dialog',
path = choose_dir(self, 'save to disk dialog',
_('Choose destination directory'))
if not dir:
if not path:
return
progress.show()
QApplication.processEvents()
QApplication.sendPostedEvents()
QApplication.flush()
try:
if self.current_view() == self.library_view:
failures = self.current_view().model().save_to_disk(rows, dir,
single_dir=single_dir,
callback=callback,
single_format=single_format)
if failures and single_format is not None:
msg = _('Could not save the following books to disk, '
'because the %s format is not available for them')\
%single_format.upper()
det_msg = ''
for f in failures:
det_msg += '%s\n'%f[1]
warning_dialog(self, _('Could not save some ebooks'),
msg, det_msg).exec_()
QDesktopServices.openUrl(QUrl('file:'+dir))
else:
paths = self.current_view().model().paths(rows)
self.device_manager.save_books(
Dispatcher(self.books_saved), paths, dir)
finally:
progress.hide()
if self.current_view() is self.library_view:
from calibre.gui2.add import Saver
self._saver = Saver(self, self.library_view.model().db,
Dispatcher(self._books_saved), rows, path,
by_author=self.library_view.model().by_author,
single_dir=single_dir,
single_format=single_format)
else:
paths = self.current_view().model().paths(rows)
self.device_manager.save_books(
Dispatcher(self.books_saved), paths, path)
def _books_saved(self, path, failures, error):
single_format = self._saver.worker.single_format
self._saver = None
if error:
return error_dialog(self, _('Error while saving'),
_('There was an error while saving.'),
error, show=True)
if failures and single_format:
single_format = single_format.upper()
warning_dialog(self, _('Could not save some books'),
_('Could not save some books') + ', ' +
(_('as the %s format is not available for them.')%single_format) +
_('Click the show details button to see which ones.'),
'\n'.join(failures), show=True)
QDesktopServices.openUrl(QUrl.fromLocalFile(path))
def books_saved(self, job):
if job.failed:
@ -1681,54 +1670,111 @@ path_to_ebook to the database.
help=_('Log debugging information to console'))
return parser
def init_qt(args):
parser = option_parser()
opts, args = parser.parse_args(args)
if opts.with_library is not None and os.path.isdir(opts.with_library):
prefs.set('library_path', opts.with_library)
print 'Using library at', prefs['library_path']
app = Application(args)
actions = tuple(Main.create_application_menubar())
app.setWindowIcon(QIcon(':/library'))
QCoreApplication.setOrganizationName(ORG_NAME)
QCoreApplication.setApplicationName(APP_UID)
return app, opts, args, actions
def run_gui(opts, args, actions, listener, app):
initialize_file_icon_provider()
main = Main(listener, opts, actions)
sys.excepthook = main.unhandled_exception
if len(args) > 1:
args[1] = os.path.abspath(args[1])
main.add_filesystem_book(args[1])
ret = app.exec_()
if getattr(main, 'restart_after_quit', False):
e = sys.executable if getattr(sys, 'froze', False) else sys.argv[0]
print 'Restarting with:', e, sys.argv
os.execvp(e, sys.argv)
else:
if iswindows:
try:
main.system_tray_icon.hide()
except:
pass
return ret
def cant_start(msg=_('If you are sure it is not running')+', ',
what=None):
d = QMessageBox(QMessageBox.Critical, _('Cannot Start ')+__appname__,
'<p>'+(_('%s is already running.')%__appname__)+'</p>',
QMessageBox.Ok)
base = '<p>%s</p><p>%s %s'
where = __appname__ + ' '+_('may be running in the system tray, in the')+' '
if isosx:
where += _('upper right region of the screen.')
else:
where += _('lower right region of the screen.')
if what is None:
if iswindows:
what = _('try rebooting your computer.')
else:
what = _('try deleting the file')+': '+ADDRESS
d.setInformativeText(base%(where, msg, what))
d.exec_()
raise SystemExit(1)
class RC(Thread):
def run(self):
from multiprocessing.connection import Client
self.done = False
self.conn = Client(ADDRESS)
self.done = True
def communicate(args):
t = RC()
t.start()
time.sleep(3)
if not t.done:
f = os.path.expanduser('~/.calibre_calibre GUI.lock')
cant_start(what=_('try deleting the file')+': '+f)
raise SystemExit(1)
if len(args) > 1:
args[1] = os.path.abspath(args[1])
t.conn.send('launched:'+repr(args))
t.conn.close()
raise SystemExit(0)
def main(args=sys.argv):
pid = os.fork() if False and islinux else -1
if pid <= 0:
parser = option_parser()
opts, args = parser.parse_args(args)
if opts.with_library is not None and os.path.isdir(opts.with_library):
prefs.set('library_path', opts.with_library)
print 'Using library at', prefs['library_path']
app = Application(args)
actions = tuple(Main.create_application_menubar())
app.setWindowIcon(QIcon(':/library'))
QCoreApplication.setOrganizationName(ORG_NAME)
QCoreApplication.setApplicationName(APP_UID)
from multiprocessing.connection import Listener, Client
app, opts, args, actions = init_qt(args)
from calibre.utils.lock import singleinstance
from multiprocessing.connection import Listener
si = singleinstance('calibre GUI')
if si:
try:
listener = Listener(address=ADDRESS)
except socket.error, err:
try:
conn = Client(ADDRESS)
if len(args) > 1:
args[1] = os.path.abspath(args[1])
conn.send('launched:'+repr(args))
conn.close()
except:
extra = '' if iswindows else \
_('If you\'re sure it is not running, delete the file %s')\
%ADDRESS
QMessageBox.critical(None, _('Cannot Start ')+__appname__,
_('<p>%s is already running. %s</p>')%(__appname__, extra))
return 1
initialize_file_icon_provider()
main = Main(listener, opts, actions)
sys.excepthook = main.unhandled_exception
if len(args) > 1:
main.add_filesystem_book(args[1])
ret = app.exec_()
if getattr(main, 'restart_after_quit', False):
e = sys.executable if getattr(sys, 'froze', False) else sys.argv[0]
print 'Restarting with:', e, sys.argv
os.execvp(e, sys.argv)
else:
except socket.error:
if iswindows:
try:
main.system_tray_icon.hide()
except:
pass
return ret
cant_start()
os.remove(ADDRESS)
try:
listener = Listener(address=ADDRESS)
except socket.error:
cant_start()
else:
return run_gui(opts, args, actions, listener, app)
else:
return run_gui(opts, args, actions, listener, app)
try:
listener = Listener(address=ADDRESS)
except socket.error: # Good si is correct
communicate(args)
else:
return run_gui(opts, args, actions, listener, app)
return 0

View File

@ -1491,7 +1491,7 @@ books_series_link feeds
f.close()
count += 1
if callable(callback):
if not callback(count, mi.title):
if not callback(int(id), mi.title):
return
def export_single_format_to_dir(self, dir, indices, format,
@ -1527,7 +1527,7 @@ books_series_link feeds
pass
f.close()
if callable(callback):
if not callback(count, title):
if not callback(int(id), title):
break
return failures

View File

@ -39,7 +39,7 @@ class BaseJob(object):
self._status_text = _('Waiting...')
self._done_called = False
def update(self):
def update(self, consume_notifications=True):
if self.duration is not None:
self._run_state = self.FINISHED
self.percent = 100
@ -62,7 +62,7 @@ class BaseJob(object):
self._run_state = self.RUNNING
self._status_text = _('Working...')
while True:
while consume_notifications:
try:
self.percent, self._message = self.notifications.get_nowait()
self.percent *= 100.

View File

@ -35,7 +35,7 @@ class ConnectedWorker(Thread):
def start_job(self, job):
notification = PARALLEL_FUNCS[job.name][-1] is not None
self.conn.send((job.name, job.args, job.kwargs))
self.conn.send((job.name, job.args, job.kwargs, job.description))
if notification:
self.start()
else:
@ -204,7 +204,7 @@ class Server(Thread):
'''
Split a list into a list of sub lists, with the number of sub lists being
no more than the number of workers this server supports. Each sublist contains
two tuples of the form (i, x) where x is an element from the original list
2-tuples of the form (i, x) where x is an element from the original list
and i is the index of the element x in the original list.
'''
ans, count, pos = [], 0, 0

View File

@ -12,6 +12,7 @@ from threading import Thread
from Queue import Queue
from contextlib import closing
from binascii import unhexlify
from calibre import prints
PARALLEL_FUNCS = {
'lrfviewer' :
@ -28,6 +29,9 @@ PARALLEL_FUNCS = {
'read_metadata' :
('calibre.ebooks.metadata.worker', 'read_metadata_', 'notification'),
'save_book' :
('calibre.ebooks.metadata.worker', 'save_book', 'notification'),
}
class Progress(Thread):
@ -64,9 +68,10 @@ def main():
key = unhexlify(os.environ['CALIBRE_WORKER_KEY'])
resultf = unhexlify(os.environ['CALIBRE_WORKER_RESULT'])
with closing(Client(address, authkey=key)) as conn:
name, args, kwargs = conn.recv()
#print (name, args, kwargs)
#sys.stdout.flush()
name, args, kwargs, desc = conn.recv()
if desc:
prints(desc)
sys.stdout.flush()
func, notification = get_func(name)
notifier = Progress(conn)
if notification:

View File

@ -6,8 +6,6 @@
#include <podofo.h>
using namespace PoDoFo;
#include <string.h>
class podofo_pdfmem_wrapper : public PdfMemDocument {
public:
inline void set_info(PdfInfo *i) { this->SetInfo(i); }
@ -42,6 +40,12 @@ podofo_PDFDoc_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
return (PyObject *)self;
}
static void podofo_set_exception(const PdfError &err) {
const char *msg = PdfError::ErrorMessage(err.GetError());
if (msg == NULL) msg = err.what();
PyErr_SetString(PyExc_ValueError, msg);
}
static PyObject *
podofo_PDFDoc_load(podofo_PDFDoc *self, PyObject *args, PyObject *kwargs) {
char *buffer; Py_ssize_t size;
@ -50,10 +54,10 @@ podofo_PDFDoc_load(podofo_PDFDoc *self, PyObject *args, PyObject *kwargs) {
try {
self->doc->Load(buffer, size);
} catch(const PdfError & err) {
PyErr_SetString(PyExc_ValueError, PdfError::ErrorMessage(err.GetError()));
podofo_set_exception(err);
return NULL;
}
} else return NULL;
}
} else return NULL;
Py_INCREF(Py_None);
@ -68,7 +72,7 @@ podofo_PDFDoc_save(podofo_PDFDoc *self, PyObject *args, PyObject *kwargs) {
try {
self->doc->Write(buffer);
} catch(const PdfError & err) {
PyErr_SetString(PyExc_ValueError, PdfError::ErrorMessage(err.GetError()));
podofo_set_exception(err);
return NULL;
}
} else return NULL;

6
todo
View File

@ -3,6 +3,8 @@
* Rationalize books table. Add a pubdate column, remove the uri column (and associated support in add_books) and convert series_index to a float.
* Refactor save to disk into separate process
* Testing framework
* Welcome wizard
* MOBI navigation indexing support