IGN:Created lxml based OPF framework for epub-meta and cleaned up rtf2html code

This commit is contained in:
Kovid Goyal 2008-09-07 22:07:41 -07:00
parent 0bfde7ee2c
commit 66a6d60901
8 changed files with 287 additions and 95 deletions

View File

@ -2,7 +2,7 @@
__license__ = 'GPL v3'
__copyright__ = '2008, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
from lxml import etree # Needed on OSX to ensure the correct libxml2 is loaded
import sys, os, re, logging, time, subprocess, mechanize, atexit
from htmlentitydefs import name2codepoint
from math import floor

View File

@ -2,7 +2,7 @@ __license__ = 'GPL v3'
__copyright__ = '2008, Kovid Goyal kovid@kovidgoyal.net'
__docformat__ = 'restructuredtext en'
__appname__ = 'calibre'
__version__ = '0.4.84b9'
__version__ = '0.4.84b10'
__author__ = "Kovid Goyal <kovid@kovidgoyal.net>"
'''
Various run time constants.

View File

@ -29,7 +29,13 @@ def detect(aBuf):
return u.result
# Added by Kovid
def xml_to_unicode(raw, verbose=False):
ENCODING_PATS = [
re.compile(r'<[^<>]+encoding=[\'"](.*?)[\'"][^<>]*>', re.IGNORECASE),
re.compile(r'<meta.*?content=[\'"].*?charset=([^\s\'"]+).*?[\'"].*?>', re.IGNORECASE)
]
ENTITY_PATTERN = re.compile(r'&(\S+?);')
def xml_to_unicode(raw, verbose=False, strip_encoding_pats=False, resolve_entities=False):
'''
Force conversion of byte string to unicode. Tries to look for XML/HTML
encoding declaration first, if not found uses the chardet library and
@ -41,11 +47,14 @@ def xml_to_unicode(raw, verbose=False):
return u'', encoding
if isinstance(raw, unicode):
return raw, encoding
match = re.compile(r'<[^<>]+encoding=[\'"](.*?)[\'"][^<>]*>', re.IGNORECASE).search(raw)
if match is None:
match = re.compile(r'<meta.*?content=[\'"].*?charset=([^\s\'"]+).*?[\'"]', re.IGNORECASE).search(raw)
if match is not None:
encoding = match.group(1)
for pat in ENCODING_PATS:
match = pat.search(raw)
if match:
encoding = match.group(1)
break
if strip_encoding_pats:
for pat in ENCODING_PATS:
raw = pat.sub('', raw)
if encoding is None:
try:
chardet = detect(raw)
@ -65,4 +74,12 @@ def xml_to_unicode(raw, verbose=False):
encoding = CHARSET_ALIASES[encoding]
if encoding == 'ascii':
encoding = 'utf-8'
return raw.decode(encoding, 'ignore'), encoding
raw = raw.decode(encoding, 'replace')
if resolve_entities:
from calibre import entity_to_unicode
from functools import partial
f = partial(entity_to_unicode, exceptions=['amp', 'apos', 'quot', 'lt', 'gt'])
raw = ENTITY_PATTERN.sub(f, raw)
return raw, encoding

View File

@ -13,7 +13,7 @@ from lxml.etree import XPath
get_text = XPath("//text()")
from calibre import LoggingInterface, unicode_path
from calibre.ebooks.chardet import xml_to_unicode
from calibre.ebooks.chardet import xml_to_unicode, ENCODING_PATS
from calibre.utils.config import Config, StringConfig
from calibre.ebooks.metadata.opf import OPFReader, OPFCreator
from calibre.ebooks.metadata import MetaInformation
@ -287,10 +287,6 @@ class PreProcessor(object):
class Parser(PreProcessor, LoggingInterface):
ENCODING_PATS = [re.compile(r'<[^<>]+encoding=[\'"](.*?)[\'"][^<>]*>', re.IGNORECASE),
re.compile(r'<meta.*?content=[\'"].*?charset=([^\s\'"]+).*?[\'"].*?>', re.IGNORECASE)]
def __init__(self, htmlfile, opts, tdir, resource_map, htmlfiles, name='htmlparser'):
LoggingInterface.__init__(self, logging.getLogger(name))
self.setup_cli_handler(opts.verbose)
@ -332,7 +328,7 @@ class Parser(PreProcessor, LoggingInterface):
src = open(self.htmlfile.path, 'rb').read().decode(self.htmlfile.encoding, 'replace')
src = self.preprocess(src)
# lxml chokes on unicode input when it contains encoding declarations
for pat in self.ENCODING_PATS:
for pat in ENCODING_PATS:
src = pat.sub('', src)
try:
self.root = html.document_fromstring(src)

View File

@ -1,11 +1,12 @@
__license__ = 'GPL v3'
__copyright__ = '2008, Kovid Goyal <kovid at kovidgoyal.net>'
import os, sys, tempfile, subprocess, shutil, logging, glob
import os, sys, tempfile, shutil, logging, glob
from lxml import etree
from calibre.ebooks.lrf import option_parser as lrf_option_parser
from calibre.ebooks.metadata.meta import get_metadata
from calibre.ebooks.lrf.html.convert_from import process_file as html_process_file
from calibre.ebooks import ConversionError
from calibre import isosx, setup_cli_handlers, __appname__
from calibre.libwand import convert, WandException
from calibre.ebooks.BeautifulSoup import BeautifulStoneSoup
@ -37,32 +38,6 @@ def convert_images(html, logger):
continue
return html
def generate_html(rtfpath, logger):
tdir = tempfile.mkdtemp(prefix=__appname__+'_')
cwd = os.path.abspath(os.getcwd())
os.chdir(tdir)
try:
logger.info('Converting to HTML...')
sys.stdout.flush()
handle, path = tempfile.mkstemp(dir=tdir, suffix='.html')
file = os.fdopen(handle, 'wb')
cmd = ' '.join([UNRTF, '"'+rtfpath+'"'])
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
raw = p.stdout.read()
ret = p.wait()
if ret != 0:
if len(raw) > 1000: #unrtf crashes occassionally on OSX and windows but still convert correctly
raw += '</body>\n</html>'
else:
logger.critical(p.stderr.read())
raise ConversionError, 'unrtf failed with error code: %d'%(ret,)
file.write(convert_images(raw, logger))
file.close()
return path
finally:
os.chdir(cwd)
def process_file(path, options, logger=None):
if logger is None:
level = logging.DEBUG if options.verbose else logging.INFO
@ -72,7 +47,7 @@ def process_file(path, options, logger=None):
f = open(rtf, 'rb')
mi = get_metadata(f, 'rtf')
f.close()
html = generate_html2(rtf, logger)
html = generate_html(rtf, logger)
tdir = os.path.dirname(html)
cwd = os.getcwdu()
try:
@ -162,8 +137,7 @@ def generate_xml(rtfpath):
return ofile
def generate_html2(rtfpath, logger):
from lxml import etree
def generate_html(rtfpath, logger):
logger.info('Converting RTF to XML...')
xml = generate_xml(rtfpath)
tdir = os.path.dirname(xml)

View File

@ -171,14 +171,13 @@ class MetaInformation(object):
if hasattr(mi, attr):
setattr(ans, attr, getattr(mi, attr))
def __init__(self, title, authors=[_('Unknown')]):
'''
@param title: title or "Unknown" or a MetaInformation object
@param authors: List of strings or []
'''
mi = None
if isinstance(title, MetaInformation):
if hasattr(title, 'title') and hasattr(title, 'authors'):
mi = title
title = mi.title
authors = mi.authors
@ -186,26 +185,15 @@ class MetaInformation(object):
self.author = authors # Needed for backward compatibility
#: List of strings or []
self.authors = authors
#: Sort text for author
self.author_sort = None if not mi else mi.author_sort
self.title_sort = None if not mi else mi.title_sort
self.comments = None if not mi else mi.comments
self.category = None if not mi else mi.category
self.publisher = None if not mi else mi.publisher
self.series = None if not mi else mi.series
self.series_index = None if not mi else mi.series_index
self.rating = None if not mi else mi.rating
self.isbn = None if not mi else mi.isbn
self.tags = [] if not mi else mi.tags
self.language = None if not mi else mi.language # Typically a string describing the language
self.tags = getattr(mi, 'tags', [])
#: mi.cover_data = (ext, data)
self.cover_data = mi.cover_data if (mi and hasattr(mi, 'cover_data')) else (None, None)
self.application_id = mi.application_id if (mi and hasattr(mi, 'application_id')) else None
self.manifest = getattr(mi, 'manifest', None)
self.toc = getattr(mi, 'toc', None)
self.spine = getattr(mi, 'spine', None)
self.guide = getattr(mi, 'guide', None)
self.cover = getattr(mi, 'cover', None)
self.cover_data = getattr(mi, 'cover_data', (None, None))
for x in ('author_sort', 'title_sort', 'comments', 'category', 'publisher',
'series', 'series_index', 'rating', 'isbn', 'language',
'application_id', 'manifest', 'toc', 'spine', 'guide', 'cover'
):
setattr(self, x, getattr(mi, x, None))
def smart_update(self, mi):
'''

View File

@ -6,14 +6,14 @@ __copyright__ = '2008, Kovid Goyal <kovid at kovidgoyal.net>'
'''Read meta information from epub files'''
import sys, os
from calibre.utils.zipfile import ZipFile, BadZipfile, safe_replace
from cStringIO import StringIO
from contextlib import closing
from calibre.utils.zipfile import ZipFile, BadZipfile, safe_replace
from calibre.ebooks.BeautifulSoup import BeautifulStoneSoup
from calibre.ebooks.metadata.opf import OPF, OPFReader, OPFCreator
from calibre.ebooks.metadata import get_parser, MetaInformation
from calibre.ebooks.metadata.opf2 import OPF
class EPubException(Exception):
pass
@ -49,14 +49,15 @@ class OCF(object):
def __init__(self):
raise NotImplementedError('Abstract base class')
class OCFReader(OCF):
def __init__(self):
try:
mimetype = self.open('mimetype').read().rstrip()
if mimetype != OCF.MIMETYPE:
raise EPubException
except (KeyError, EPubException):
raise EPubException("not an .epub OCF container")
print 'WARNING: Invalid mimetype declaration', mimetype
except:
print 'WARNING: Epub doesn\'t contain a mimetype declaration'
try:
with closing(self.open(OCF.CONTAINER_PATH)) as f:
@ -66,37 +67,26 @@ class OCFReader(OCF):
try:
with closing(self.open(self.container[OPF.MIMETYPE])) as f:
self.opf = OPFReader(f, self.root)
self.opf = OPF(f, self.root)
except KeyError:
raise EPubException("missing OPF package file")
class OCFZipReader(OCFReader):
def __init__(self, stream, mode='r'):
def __init__(self, stream, mode='r', root=None):
try:
self.archive = ZipFile(stream, mode=mode)
except BadZipfile:
raise EPubException("not a ZIP .epub OCF container")
self.root = getattr(stream, 'name', os.getcwd())
self.root = root
if self.root is None:
self.root = os.getcwdu()
if hasattr(stream, 'name'):
self.root = os.path.abspath(os.path.dirname(stream.name))
super(OCFZipReader, self).__init__()
def open(self, name, mode='r'):
return StringIO(self.archive.read(name))
class OCFZipWriter(object):
def __init__(self, stream):
reader = OCFZipReader(stream)
self.opf = reader.container[OPF.MIMETYPE]
self.stream = stream
self.root = getattr(stream, 'name', os.getcwd())
def set_metadata(self, mi):
stream = StringIO()
opf = OPFCreator(self.root, mi)
opf.render(stream)
stream.seek(0)
safe_replace(self.stream, self.opf, stream)
class OCFDirReader(OCFReader):
def __init__(self, path):
self.root = path
@ -111,12 +101,17 @@ def get_metadata(stream):
return OCFZipReader(stream).opf
def set_metadata(stream, mi):
OCFZipWriter(stream).set_metadata(mi)
reader = OCFZipReader(stream, root=os.getcwdu())
reader.opf.smart_update(mi)
newopf = StringIO(reader.opf.render())
safe_replace(stream, reader.container[OPF.MIMETYPE], newopf)
print newopf.getvalue()
def option_parser():
parser = get_parser('epub')
parser.remove_option('--category')
parser.add_option('--tags', default=None, help=_('A comma separated list of tags to set'))
parser.add_option('--tags', default=None,
help=_('A comma separated list of tags to set'))
return parser
def main(args=sys.argv):
@ -126,7 +121,7 @@ def main(args=sys.argv):
parser.print_help()
return 1
stream = open(args[1], 'r+b')
mi = MetaInformation(OCFZipReader(stream).opf)
mi = MetaInformation(OCFZipReader(stream, root=os.getcwdu()).opf)
if opts.title:
mi.title = opts.title
if opts.authors:
@ -136,8 +131,10 @@ def main(args=sys.argv):
if opts.comment:
mi.comments = opts.comment
stream.seek(0)
set_metadata(stream, mi)
print unicode(mi)
stream.close()
return 0
if __name__ == '__main__':

View File

@ -0,0 +1,220 @@
#!/usr/bin/env python
__license__ = 'GPL v3'
__copyright__ = '2008, Kovid Goyal kovid@kovidgoyal.net'
__docformat__ = 'restructuredtext en'
'''
lxml based OPF parser.
'''
import sys, unittest, functools, os
from lxml import etree
from calibre.ebooks.chardet import xml_to_unicode
class MetadataField(object):
    '''
    Descriptor that exposes the text of one child element of ``<metadata>``
    as a plain attribute of the owning object.

    The owner must provide ``get_metadata_element(name)``,
    ``get_text(elem)`` and ``create_metadata_element(name, ns=...)``.

    :param name: Name used to look up (and, if needed, create) the element.
    :param is_dc: If True a missing element is created in the ``dc``
                  namespace, otherwise in the ``opf`` namespace.
    '''

    def __init__(self, name, is_dc=True):
        self.name = name
        self.is_dc = is_dc

    def __get__(self, obj, type=None):
        '''
        Return the text of the element, or ``u''`` if the element is absent.
        Accessed on the class itself (``obj is None``) the descriptor object
        is returned.
        '''
        if obj is None:
            # Class level access (e.g. introspection): there is no document
            # to query, so follow the usual descriptor convention.
            return self
        ans = obj.get_metadata_element(self.name)
        if ans is None:
            return u''
        return obj.get_text(ans)

    def __set__(self, obj, val):
        '''
        Set the text of the element to ``unicode(val)``, creating the element
        first if it does not exist yet.
        '''
        elem = obj.get_metadata_element(self.name)
        if elem is None:
            elem = obj.create_metadata_element(self.name, ns='dc' if self.is_dc else 'opf')
        elem.text = unicode(val)
class OPF(object):
    '''
    lxml based reader/writer for the ``<metadata>`` section of an OPF package
    document.

    Simple text fields are exposed through :class:`MetadataField`
    descriptors; ``authors``, ``author_sort``, ``tags`` and ``isbn`` are
    properties with custom logic. Changes are made directly on the parsed
    tree and serialized with :meth:`render`.
    '''
    MIMETYPE = 'application/oebps-package+xml'
    # recover=True asks lxml to try to parse documents that are not quite
    # well-formed instead of failing outright.
    PARSER = etree.XMLParser(recover=True)
    NAMESPACES = {
        None: "http://www.idpf.org/2007/opf",
        'dc': "http://purl.org/dc/elements/1.1/",
        'opf': "http://www.idpf.org/2007/opf",
    }

    # Namespace map for XPath expressions: the default (None) prefix is
    # dropped and the EXSLT regular expression namespace is added so that
    # re:match() can be used for case insensitive name matching.
    xpn = NAMESPACES.copy()
    xpn.pop(None)
    xpn['re'] = 'http://exslt.org/regular-expressions'
    XPath = functools.partial(etree.XPath, namespaces=xpn)

    TEXT = XPath('string()')
    metadata_path = XPath('/opf:package/opf:metadata')
    metadata_elem_path = XPath('/opf:package/opf:metadata/*[re:match(name(), $name, "i")]')
    authors_path = XPath('/opf:package/opf:metadata/*' + \
            '[re:match(name(), "creator", "i") and (@role="aut" or @opf:role="aut")]')
    tags_path = XPath('/opf:package/opf:metadata/*[re:match(name(), "subject", "i")]')
    isbn_path = XPath('/opf:package/opf:metadata/*[re:match(name(), "identifier", "i") and '+
            '(re:match(@scheme, "isbn", "i") or re:match(@opf:scheme, "isbn", "i"))]')

    title = MetadataField('title')
    publisher = MetadataField('publisher')
    language = MetadataField('language')
    comments = MetadataField('description')
    category = MetadataField('category')
    series = MetadataField('series', is_dc=False)
    series_index = MetadataField('series_index', is_dc=False)
    rating = MetadataField('rating', is_dc=False)

    def __init__(self, stream, basedir):
        '''
        Parse an OPF document.

        :param stream: File like object; its entire contents are read.
        :param basedir: Stored as ``self.basedir``.
        :raises ValueError: If the document has no ``<metadata>`` element.
        '''
        self.basedir = basedir
        # Decode to unicode, asking xml_to_unicode to strip encoding
        # declarations and resolve entities before lxml sees the text.
        raw, self.encoding = xml_to_unicode(stream.read(), strip_encoding_pats=True, resolve_entities=True)
        self.tree = etree.fromstring(raw, self.PARSER)
        self.metadata = self.metadata_path(self.tree)
        if not self.metadata:
            raise ValueError('Malformed OPF file: No <metadata> element')
        self.metadata = self.metadata[0]

    def get_text(self, elem):
        '''Return the string value of `elem` (XPath ``string()``) as unicode.'''
        return u''.join(self.TEXT(elem))

    @apply
    def authors():
        # List of author names. Reading splits the text of every creator
        # element with role "aut" on commas; writing replaces all such
        # elements with one dc:creator per name.

        def fget(self):
            ans = []
            for elem in self.authors_path(self.tree):
                ans.extend([x.strip() for x in self.get_text(elem).split(',')])
            return ans

        def fset(self, val):
            remove = list(self.authors_path(self.tree))
            for elem in remove:
                self.metadata.remove(elem)
            for author in val:
                elem = self.create_metadata_element('creator', ns='dc',
                        attrib={'{%s}role'%self.NAMESPACES['opf']:'aut'})
                elem.text = author

        return property(fget=fget, fset=fset)

    @apply
    def author_sort():
        # The file-as value of the first author element. Nothing is read or
        # written when the document has no author element.

        def fget(self):
            matches = self.authors_path(self.tree)
            if matches:
                # lxml addresses namespaced attributes as {uri}name; a
                # literal 'opf:file-as' key would never match.
                ans = matches[0].get('{%s}file-as'%self.NAMESPACES['opf'], None)
                return ans if ans else matches[0].get('file-as', None)

        def fset(self, val):
            matches = self.authors_path(self.tree)
            if matches:
                # Write to the attribute the getter will read back: the
                # namespaced one if it is present and non-empty, else the
                # plain one.
                key = '{%s}file-as'%self.NAMESPACES['opf']
                if not matches[0].get(key, None):
                    key = 'file-as'
                matches[0].set(key, unicode(val))

        return property(fget=fget, fset=fset)

    @apply
    def tags():
        # List of the text of all subject elements. Writing replaces every
        # existing subject element with one dc:subject per tag.

        def fget(self):
            ans = []
            for tag in self.tags_path(self.tree):
                ans.append(self.get_text(tag))
            return ans

        def fset(self, val):
            for tag in list(self.tags_path(self.tree)):
                self.metadata.remove(tag)
            for tag in val:
                elem = self.create_metadata_element('subject', ns='dc')
                elem.text = unicode(tag)

        return property(fget=fget, fset=fset)

    @apply
    def isbn():
        # Text of the first identifier element whose scheme is ISBN, or None
        # if there is no such element or it is empty.

        def fget(self):
            for match in self.isbn_path(self.tree):
                return match.text if match.text else None

        def fset(self, val):
            matches = self.isbn_path(self.tree)
            if not matches:
                # The element is dc:identifier: local name 'identifier' in
                # the dc namespace (a tag name cannot contain a prefix).
                matches = [self.create_metadata_element('identifier', ns='dc',
                        attrib={'{%s}scheme'%self.NAMESPACES['opf']:'ISBN'})]
            matches[0].text = unicode(val)

        return property(fget=fget, fset=fset)

    def get_metadata_element(self, name):
        '''
        Return the first child of ``<metadata>`` whose tag name is `name`
        (case insensitive, with or without a namespace prefix), or None.
        '''
        # Anchor the pattern so that e.g. 'series' does not also match a
        # 'series_index' element.
        matches = self.metadata_elem_path(self.tree, name='(?:^|:)%s$'%name)
        if matches:
            return matches[0]

    def create_metadata_element(self, name, attrib=None, ns='opf'):
        '''
        Append a new element to ``<metadata>`` and return it.

        :param name: Local name of the element.
        :param attrib: Optional dictionary of attributes.
        :param ns: Key into :attr:`NAMESPACES` selecting the namespace.
        '''
        elem = etree.SubElement(self.metadata, '{%s}%s'%(self.NAMESPACES[ns], name),
                                attrib=attrib, nsmap=self.NAMESPACES)
        elem.tail = '\n'
        return elem

    def render(self, encoding='utf-8'):
        '''Return the document serialized as a byte string in `encoding`.'''
        return etree.tostring(self.tree, encoding=encoding, pretty_print=True)

    def smart_update(self, mi):
        '''
        Copy metadata from `mi` into this document. Attributes of `mi` that
        are missing or false are skipped, except that an empty list is
        applied (clearing tags); an empty author list is always skipped.
        '''
        # 'authors' comes before 'author_sort' because the sort value is
        # stored on the first author element.
        for attr in ('title', 'authors', 'author_sort', 'title_sort',
                     'comments', 'category', 'publisher', 'series',
                     'series_index', 'rating', 'isbn', 'language', 'tags'):
            val = getattr(mi, attr, None)
            if attr == 'authors':
                # Rewriting the creator elements discards their other
                # attributes, so only do it for a real change.
                if not val or list(val) == self.authors:
                    continue
            if val or val == []:
                setattr(self, attr, val)
class OPFTest(unittest.TestCase):
    '''Exercise reading and writing of OPF metadata on a small sample document.'''

    def setUp(self):
        # Build a fresh OPF object from an in-memory package for every test.
        import cStringIO
        sample = (
'''\
<?xml version="1.0" encoding="UTF-8"?>
<package version="2.0" xmlns="http://www.idpf.org/2007/opf" >
<metadata xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:opf="http://www.idpf.org/2007/opf">
<dc:title>A Cool &amp; &copy; &#223; Title</dc:title>
<creator opf:role="aut" file-as="Monkey">Monkey Kitchen, Next</creator>
<dc:subject>One</dc:subject><dc:subject>Two</dc:subject>
<dc:identifier scheme="ISBN">123456789</dc:identifier>
</metadata>
</package>
'''
        )
        self.stream = cStringIO.StringIO(sample)
        self.opf = OPF(self.stream, os.getcwd())

    def testReading(self):
        # Every field must come back as stored in the sample document.
        self.assertEqual(self.opf.title, u'A Cool & \xa9 \xdf Title')
        self.assertEqual(self.opf.authors, u'Monkey Kitchen,Next'.split(','))
        self.assertEqual(self.opf.author_sort, 'Monkey')
        self.assertEqual(self.opf.tags, ['One', 'Two'])
        self.assertEqual(self.opf.isbn, '123456789')

    def testWriting(self):
        # Each value written must be read back unchanged.
        cases = [('title', 'New & Title'), ('authors', ['One', 'Two']),
                 ('author_sort', "Kitchen"), ('tags', ['Three']),
                 ('isbn', 'a'), ('rating', '3')]
        for field, value in cases:
            setattr(self.opf, field, value)
            self.assertEqual(getattr(self.opf, field), value)

        # Serialization of the modified tree must not raise.
        self.opf.render()
def suite():
    '''Return a test suite containing every test method of OPFTest.'''
    loader = unittest.TestLoader()
    return loader.loadTestsFromTestCase(OPFTest)
def test():
    '''
    Run the OPF test suite with verbose output.

    :return: 0 if every test passed, 1 otherwise, so that the value can be
             handed to ``sys.exit`` and failures produce a non-zero exit
             status.
    '''
    result = unittest.TextTestRunner(verbosity=2).run(suite())
    return 0 if result.wasSuccessful() else 1
def main(args=sys.argv):
    '''Placeholder command line entry point: ignores `args` and returns 0.'''
    status = 0
    return status
if __name__ == '__main__':
    # Running this module as a script runs the unit tests; the process exit
    # status is whatever test() returns.
    sys.exit(test())