Refactor OPF de-serialization into OEBReader.

This commit is contained in:
Marshall T. Vandegrift 2009-02-07 10:03:00 -05:00
parent 73d76f4eb6
commit 105d431f6c
4 changed files with 742 additions and 564 deletions

View File

@ -6,22 +6,18 @@ from __future__ import with_statement
__license__ = 'GPL v3'
__copyright__ = '2008, Marshall T. Vandegrift <llasram@gmail.com>'
import os, sys, re, uuid, copy
from mimetypes import types_map, guess_type
import os, sys, re, uuid
from mimetypes import types_map
from collections import defaultdict
from types import StringTypes
from itertools import izip, count, chain
from itertools import count
from urlparse import urldefrag, urlparse, urlunparse
from urllib import unquote as urlunquote
from lxml import etree, html
import calibre
from calibre import LoggingInterface
from calibre.translations.dynamic import translate
from calibre.startup import get_lang
from calibre.ebooks.chardet import xml_to_unicode
from calibre.ebooks.oeb.entitydefs import ENTITYDEFS
from calibre.ebooks.metadata.epub import CoverRenderer
from calibre.ptempfile import TemporaryDirectory
XML_NS = 'http://www.w3.org/XML/1998/namespace'
XHTML_NS = 'http://www.w3.org/1999/xhtml'
@ -39,14 +35,13 @@ NCX_NS = 'http://www.daisy.org/z3986/2005/ncx/'
SVG_NS = 'http://www.w3.org/2000/svg'
XLINK_NS = 'http://www.w3.org/1999/xlink'
CALIBRE_NS = 'http://calibre.kovidgoyal.net/2009/metadata'
XPNSMAP = {
'h' : XHTML_NS, 'o1' : OPF1_NS, 'o2' : OPF2_NS,
'd09': DC09_NS, 'd10': DC10_NS, 'd11': DC11_NS,
'xsi': XSI_NS, 'dt' : DCTERMS_NS, 'ncx': NCX_NS,
'svg': SVG_NS, 'xl' : XLINK_NS
}
DC_PREFIXES = ('d11', 'd10', 'd09')
XPNSMAP = {'h' : XHTML_NS, 'o1' : OPF1_NS, 'o2' : OPF2_NS,
'd09': DC09_NS, 'd10': DC10_NS, 'd11': DC11_NS,
'xsi': XSI_NS, 'dt' : DCTERMS_NS, 'ncx': NCX_NS,
'svg': SVG_NS, 'xl' : XLINK_NS}
OPF1_NSMAP = {'dc': DC11_NS, 'oebpackage': OPF1_NS}
OPF2_NSMAP = {'opf': OPF2_NS, 'dc': DC11_NS, 'dcterms': DCTERMS_NS,
'xsi': XSI_NS, 'calibre': CALIBRE_NS}
def XML(name):
return '{%s}%s' % (XML_NS, name)
@ -105,7 +100,8 @@ SVG_MIME = types_map['.svg']
BINARY_MIME = 'application/octet-stream'
OEB_STYLES = set([CSS_MIME, OEB_CSS_MIME, 'text/x-oeb-css'])
OEB_DOCS = set([XHTML_MIME, 'text/html', OEB_DOC_MIME, 'text/x-oeb-document'])
OEB_DOCS = set([XHTML_MIME, 'text/html', OEB_DOC_MIME,
'text/x-oeb-document'])
OEB_RASTER_IMAGES = set([GIF_MIME, JPEG_MIME, PNG_MIME])
OEB_IMAGES = set([GIF_MIME, JPEG_MIME, PNG_MIME, SVG_MIME])
@ -167,8 +163,9 @@ def XPath(expr):
def xpath(elem, expr):
return elem.xpath(expr, namespaces=XPNSMAP)
def xml2str(root):
return etree.tostring(root, encoding='utf-8', xml_declaration=True)
def xml2str(root, pretty_print=False):
return etree.tostring(root, encoding='utf-8', xml_declaration=True,
pretty_print=pretty_print)
ASCII_CHARS = set(chr(x) for x in xrange(128))
UNIBYTE_CHARS = set(chr(x) for x in xrange(256))
@ -213,16 +210,38 @@ class Logger(LoggingInterface, object):
return object.__getattribute__(self, 'log_' + name)
class AbstractContainer(object):
def read_xml(self, path):
return etree.fromstring(
self.read(path), base_url=os.path.dirname(path))
class NullContainer(object):
def read(self, path):
raise OEBError('Attempt to read from NullContainer')
class DirContainer(AbstractContainer):
def __init__(self, rootdir):
self.rootdir = unicode(rootdir)
def write(self, path):
raise OEBError('Attempt to write to NullContainer')
def exists(self, path):
return False
def namelist(self):
return []
class DirContainer(object):
def __init__(self, path):
path = unicode(path)
ext = os.path.splitext(path)[1].lower()
if ext == '.opf':
self.opfname = os.path.basename(path)
self.rootdir = os.path.dirname(path)
return
self.rootdir = path
for path in self.namelist():
ext = os.path.splitext(path)[1].lower()
if ext == '.opf':
self.opfname = fname
return
self.opfname = None
def read(self, path):
if path is None:
path = self.opfname
path = os.path.join(self.rootdir, path)
with open(urlunquote(path), 'rb') as f:
return f.read()
@ -239,33 +258,14 @@ class DirContainer(AbstractContainer):
path = os.path.join(self.rootdir, path)
return os.path.isfile(urlunquote(path))
class DirWriter(object):
def __init__(self, version='2.0', page_map=False):
self.version = version
self.page_map = page_map
def dump(self, oeb, path):
version = int(self.version[0])
opfname = None
if os.path.splitext(path)[1].lower() == '.opf':
opfname = os.path.basename(path)
path = os.path.dirname(path)
if not os.path.isdir(path):
os.mkdir(path)
output = DirContainer(path)
for item in oeb.manifest.values():
output.write(item.href, str(item))
if version == 1:
metadata = oeb.to_opf1()
elif version == 2:
metadata = oeb.to_opf2(page_map=self.page_map)
else:
raise OEBError("Unrecognized OPF version %r" % self.version)
for mime, (href, data) in metadata.items():
if opfname and mime == OPF_MIME:
href = opfname
output.write(href, xml2str(data))
return
def namelist(self):
names = []
for root, dirs, files in os.walk(self.rootdir):
for fname in files:
fname = os.path.join(root, fname)
fname = fname.replace('\\', '/')
names.append(fname)
return names
class Metadata(object):
@ -279,9 +279,6 @@ class Metadata(object):
OPF_ATTRS = {'role': OPF('role'), 'file-as': OPF('file-as'),
'scheme': OPF('scheme'), 'event': OPF('event'),
'type': XSI('type'), 'lang': XML('lang'), 'id': 'id'}
OPF1_NSMAP = {'dc': DC11_NS, 'oebpackage': OPF1_NS}
OPF2_NSMAP = {'opf': OPF2_NS, 'dc': DC11_NS, 'dcterms': DCTERMS_NS,
'xsi': XSI_NS, 'calibre': CALIBRE_NS}
class Item(object):
@ -337,18 +334,20 @@ class Metadata(object):
if attr != nsattr:
attrib[nsattr] = attrib.pop(attr)
scheme = Attribute(lambda term : 'scheme' if term == OPF('meta') else OPF('scheme'),
scheme = Attribute(lambda term: 'scheme' if \
term == OPF('meta') else OPF('scheme'),
[DC('identifier'), OPF('meta')])
file_as = Attribute(OPF('file-as'), [DC('creator'), DC('contributor')])
role = Attribute(OPF('role'), [DC('creator'), DC('contributor')])
event = Attribute(OPF('event'), [DC('date')])
id = Attribute('id')
type = Attribute(XSI('type'), [DC('date'), DC('format'), DC('type')])
type = Attribute(XSI('type'), [DC('date'), DC('format'),
DC('type')])
lang = Attribute(XML('lang'), [DC('contributor'), DC('coverage'),
DC('creator'), DC('publisher'),
DC('relation'), DC('rights'),
DC('source'), DC('subject'),
OPF('meta')])
DC('creator'), DC('publisher'),
DC('relation'), DC('rights'),
DC('source'), DC('subject'),
OPF('meta')])
def __getitem__(self, key):
return self.attrib[key]
@ -445,21 +444,19 @@ class Metadata(object):
return nsmap
return property(fget=fget)
@apply
def _opf2_nsmap():
def fget(self):
nsmap = self._nsmap
nsmap.update(self.OPF2_NSMAP)
nsmap.update(OPF2_NSMAP)
return nsmap
return property(fget=fget)
def to_opf1(self, parent=None):
nsmap = self._opf1_nsmap
nsrmap = dict((value, key) for key, value in nsmap.items())
elem = element(parent, 'metadata', nsmap=nsmap)
dcmeta = element(elem, 'dc-metadata', nsmap=self.OPF1_NSMAP)
dcmeta = element(elem, 'dc-metadata', nsmap=OPF1_NSMAP)
xmeta = element(elem, 'x-metadata')
for term in self.items:
for item in self.items[term]:
@ -608,7 +605,7 @@ class Manifest(object):
def __str__(self):
data = self.data
if isinstance(data, etree._Element):
return xml2str(data)
return xml2str(data, pretty_print=self.oeb.pretty_print)
if isinstance(data, unicode):
return data.encode('utf-8')
return str(data)
@ -756,7 +753,7 @@ class Spine(object):
self.items = []
def _linear(self, linear):
if isinstance(linear, StringTypes):
if isinstance(linear, basestring):
linear = linear.lower()
if linear is None or linear in ('yes', 'true'):
linear = True
@ -838,7 +835,7 @@ class Guide(object):
('text', __('Main Text'))]
TYPES = set(t for t, _ in _TYPES_TITLES)
TITLES = dict(_TYPES_TITLES)
ORDER = dict((t, i) for (t, _), i in izip(_TYPES_TITLES, count(0)))
ORDER = dict((t, i) for i, (t, _) in enumerate(_TYPES_TITLES))
def __init__(self, oeb, type, title, href):
self.oeb = oeb
@ -1044,493 +1041,25 @@ class PageList(object):
class OEBBook(object):
COVER_SVG_XP = XPath('h:body//svg:svg[position() = 1]')
COVER_OBJECT_XP = XPath('h:body//h:object[@data][position() = 1]')
def __init__(self, opfpath=None, container=None, encoding=None,
logger=FauxLogger()):
if opfpath and not container:
container = DirContainer(os.path.dirname(opfpath))
opfpath = os.path.basename(opfpath)
self.container = container
def __init__(self, encoding=None, pretty_print=False, logger=FauxLogger()):
self.encoding = encoding
self.pretty_print = pretty_print
self.logger = logger
if opfpath or container:
opf = self._read_opf(opfpath)
self._all_from_opf(opf)
def _clean_opf(self, opf):
nsmap = {}
for elem in opf.iter(tag=etree.Element):
nsmap.update(elem.nsmap)
for elem in opf.iter(tag=etree.Element):
if namespace(elem.tag) in ('', OPF1_NS):
elem.tag = OPF(barename(elem.tag))
nsmap.update(Metadata.OPF2_NSMAP)
attrib = dict(opf.attrib)
nroot = etree.Element(OPF('package'),
nsmap={None: OPF2_NS}, attrib=attrib)
metadata = etree.SubElement(nroot, OPF('metadata'), nsmap=nsmap)
ignored = (OPF('dc-metadata'), OPF('x-metadata'))
for elem in xpath(opf, 'o2:metadata//*'):
if elem.tag in ignored:
continue
if namespace(elem.tag) in DC_NSES:
tag = barename(elem.tag).lower()
elem.tag = '{%s}%s' % (DC11_NS, tag)
metadata.append(elem)
for element in xpath(opf, 'o2:metadata//o2:meta'):
metadata.append(element)
for tag in ('o2:manifest', 'o2:spine', 'o2:tours', 'o2:guide'):
for element in xpath(opf, tag):
nroot.append(element)
return nroot
def _read_opf(self, opfpath):
data = self.container.read(opfpath)
data = self.decode(data)
data = XMLDECL_RE.sub('', data)
try:
opf = etree.fromstring(data)
except etree.XMLSyntaxError:
repl = lambda m: ENTITYDEFS.get(m.group(1), m.group(0))
data = ENTITY_RE.sub(repl, data)
opf = etree.fromstring(data)
self.logger.warn('OPF contains invalid HTML named entities')
ns = namespace(opf.tag)
if ns not in ('', OPF1_NS, OPF2_NS):
raise OEBError('Invalid namespace %r for OPF document' % ns)
opf = self._clean_opf(opf)
return opf
def _metadata_from_opf(self, opf):
uid = opf.get('unique-identifier', None)
self.version = '2.0'
self.container = NullContainer()
self.metadata = Metadata(self)
self.uid = None
self.metadata = metadata = Metadata(self)
for elem in xpath(opf, '/o2:package/o2:metadata//*'):
term = elem.tag
value = elem.text
attrib = dict(elem.attrib)
nsmap = elem.nsmap
if term == OPF('meta'):
term = qname(attrib.pop('name', None), nsmap)
value = attrib.pop('content', None)
if value:
value = COLLAPSE_RE.sub(' ', value.strip())
if term and (value or attrib):
metadata.add(term, value, attrib, nsmap=nsmap)
haveuuid = haveid = False
for ident in metadata.identifier:
if unicode(ident).startswith('urn:uuid:'):
haveuuid = True
if 'id' in ident.attrib:
haveid = True
if not (haveuuid and haveid):
bookid = "urn:uuid:%s" % str(uuid.uuid4())
metadata.add('identifier', bookid, id='calibre-uuid')
if uid is None:
self.logger.warn(u'Unique-identifier not specified')
for item in metadata.identifier:
if not item.id:
continue
if uid is None or item.id == uid:
self.uid = item
break
else:
self.logger.warn(u'Unique-identifier %r not found' % uid)
for ident in metadata.identifier:
if 'id' in ident.attrib:
self.uid = metadata.identifier[0]
break
if not metadata.language:
self.logger.warn(u'Language not specified')
metadata.add('language', get_lang())
if not metadata.creator:
self.logger.warn('Creator not specified')
metadata.add('creator', self.translate(__('Unknown')))
if not metadata.title:
self.logger.warn('Title not specified')
metadata.add('title', self.translate(__('Unknown')))
def _manifest_add_missing(self):
manifest = self.manifest
known = set(manifest.hrefs)
unchecked = set(manifest.values())
while unchecked:
new = set()
for item in unchecked:
if (item.media_type in OEB_DOCS or
item.media_type[-4:] in ('/xml', '+xml')) and \
item.data is not None:
hrefs = [sel(item.data) for sel in LINK_SELECTORS]
for href in chain(*hrefs):
href, _ = urldefrag(href)
if not href:
continue
href = item.abshref(urlnormalize(href))
scheme = urlparse(href).scheme
if not scheme and href not in known:
new.add(href)
elif item.media_type in OEB_STYLES:
for match in CSSURL_RE.finditer(item.data):
href, _ = urldefrag(match.group('url'))
href = item.abshref(urlnormalize(href))
scheme = urlparse(href).scheme
if not scheme and href not in known:
new.add(href)
unchecked.clear()
for href in new:
known.add(href)
if not self.container.exists(href):
self.logger.warn('Referenced file %r not found' % href)
continue
self.logger.warn('Referenced file %r not in manifest' % href)
id, _ = manifest.generate(id='added')
guessed = guess_type(href)[0]
media_type = guessed or BINARY_MIME
added = manifest.add(id, href, media_type)
unchecked.add(added)
def _manifest_from_opf(self, opf):
self.manifest = manifest = Manifest(self)
for elem in xpath(opf, '/o2:package/o2:manifest/o2:item'):
id = elem.get('id')
href = elem.get('href')
media_type = elem.get('media-type', None)
if media_type is None:
media_type = elem.get('mediatype', None)
if media_type is None or media_type == 'text/xml':
guessed = guess_type(href)[0]
media_type = guessed or media_type or BINARY_MIME
fallback = elem.get('fallback')
if href in manifest.hrefs:
self.logger.warn(u'Duplicate manifest entry for %r' % href)
continue
if not self.container.exists(href):
self.logger.warn(u'Manifest item %r not found' % href)
continue
if id in manifest.ids:
self.logger.warn(u'Duplicate manifest id %r' % id)
id, href = manifest.generate(id, href)
manifest.add(id, href, media_type, fallback)
self._manifest_add_missing()
def _spine_add_extra(self):
manifest = self.manifest
spine = self.spine
unchecked = set(spine)
selector = XPath('h:body//h:a/@href')
extras = set()
while unchecked:
new = set()
for item in unchecked:
if item.media_type not in OEB_DOCS:
# TODO: handle fallback chains
continue
for href in selector(item.data):
href, _ = urldefrag(href)
if not href:
continue
href = item.abshref(urlnormalize(href))
if href not in manifest.hrefs:
continue
found = manifest.hrefs[href]
if found.media_type not in OEB_DOCS or \
found in spine or found in extras:
continue
new.add(found)
extras.update(new)
unchecked = new
version = int(self.version[0])
for item in sorted(extras):
if version >= 2:
self.logger.warn(
'Spine-referenced file %r not in spine' % item.href)
spine.add(item, linear=False)
def _spine_from_opf(self, opf):
self.spine = spine = Spine(self)
for elem in xpath(opf, '/o2:package/o2:spine/o2:itemref'):
idref = elem.get('idref')
if idref not in self.manifest:
self.logger.warn(u'Spine item %r not found' % idref)
continue
item = self.manifest[idref]
spine.add(item, elem.get('linear'))
if len(spine) == 0:
raise OEBError("Spine is empty")
self._spine_add_extra()
def _guide_from_opf(self, opf):
self.guide = guide = Guide(self)
for elem in xpath(opf, '/o2:package/o2:guide/o2:reference'):
href = elem.get('href')
path = urldefrag(href)[0]
if path not in self.manifest.hrefs:
self.logger.warn(u'Guide reference %r not found' % href)
continue
guide.add(elem.get('type'), elem.get('title'), href)
def _find_ncx(self, opf):
result = xpath(opf, '/o2:package/o2:spine/@toc')
if result:
id = result[0]
if id not in self.manifest.ids:
return None
item = self.manifest.ids[id]
self.manifest.remove(item)
return item
for item in self.manifest.values():
if item.media_type == NCX_MIME:
self.manifest.remove(item)
return item
return None
def _toc_from_navpoint(self, item, toc, navpoint):
children = xpath(navpoint, 'ncx:navPoint')
for child in children:
title = ''.join(xpath(child, 'ncx:navLabel/ncx:text/text()'))
title = COLLAPSE_RE.sub(' ', title.strip())
href = xpath(child, 'ncx:content/@src')
if not title or not href:
continue
href = item.abshref(urlnormalize(href[0]))
path, _ = urldefrag(href)
if path not in self.manifest.hrefs:
self.logger.warn('TOC reference %r not found' % href)
continue
id = child.get('id')
klass = child.get('class')
node = toc.add(title, href, id=id, klass=klass)
self._toc_from_navpoint(item, node, child)
def _toc_from_ncx(self, item):
if item is None:
return False
ncx = item.data
title = ''.join(xpath(ncx, 'ncx:docTitle/ncx:text/text()'))
title = COLLAPSE_RE.sub(' ', title.strip())
title = title or unicode(self.metadata.title[0])
self.toc = toc = TOC(title)
navmaps = xpath(ncx, 'ncx:navMap')
for navmap in navmaps:
self._toc_from_navpoint(item, toc, navmap)
return True
def _toc_from_tour(self, opf):
result = xpath(opf, 'o2:tours/o2:tour')
if not result:
return False
tour = result[0]
self.toc = toc = TOC(tour.get('title'))
sites = xpath(tour, 'o2:site')
for site in sites:
title = site.get('title')
href = site.get('href')
if not title or not href:
continue
path, _ = urldefrag(urlnormalize(href))
if path not in self.manifest.hrefs:
self.logger.warn('TOC reference %r not found' % href)
continue
id = site.get('id')
toc.add(title, href, id=id)
return True
def _toc_from_html(self, opf):
if 'toc' not in self.guide:
return False
self.toc = toc = TOC()
itempath, frag = urldefrag(self.guide['toc'].href)
item = self.manifest.hrefs[itempath]
html = item.data
if frag:
elems = xpath(html, './/*[@id="%s"]' % frag)
if not elems:
elems = xpath(html, './/*[@name="%s"]' % frag)
elem = elems[0] if elems else html
while elem != html and not xpath(elem, './/h:a[@href]'):
elem = elem.getparent()
html = elem
titles = defaultdict(list)
order = []
for anchor in xpath(html, './/h:a[@href]'):
href = anchor.attrib['href']
href = item.abshref(urlnormalize(href))
path, frag = urldefrag(href)
if path not in self.manifest.hrefs:
continue
title = ' '.join(xpath(anchor, './/text()'))
title = COLLAPSE_RE.sub(' ', title.strip())
if href not in titles:
order.append(href)
titles[href].append(title)
for href in order:
toc.add(' '.join(titles[href]), href)
return True
def _toc_from_spine(self, opf):
self.toc = toc = TOC()
titles = []
headers = []
for item in self.spine:
if not item.linear: continue
html = item.data
title = ''.join(xpath(html, '/h:html/h:head/h:title/text()'))
title = COLLAPSE_RE.sub(' ', title.strip())
if title:
titles.append(title)
headers.append('(unlabled)')
for tag in ('h1', 'h2', 'h3', 'h4', 'h5', 'strong'):
expr = '/h:html/h:body//h:%s[position()=1]/text()'
header = ''.join(xpath(html, expr % tag))
header = COLLAPSE_RE.sub(' ', header.strip())
if header:
headers[-1] = header
break
use = titles
if len(titles) > len(set(titles)):
use = headers
for title, item in izip(use, self.spine):
if not item.linear: continue
toc.add(title, item.href)
return True
def _toc_from_opf(self, opf, item):
if self._toc_from_ncx(item): return
if self._toc_from_tour(opf): return
self.logger.warn('No metadata table of contents found')
if self._toc_from_html(opf): return
self._toc_from_spine(opf)
def _pages_from_ncx(self, opf, item):
if item is None:
return False
ncx = item.data
ptargets = xpath(ncx, 'ncx:pageList/ncx:pageTarget')
if not ptargets:
return False
pages = self.pages = PageList()
for ptarget in ptargets:
name = ''.join(xpath(ptarget, 'ncx:navLabel/ncx:text/text()'))
name = COLLAPSE_RE.sub(' ', name.strip())
href = xpath(ptarget, 'ncx:content/@src')
if not href:
continue
href = item.abshref(urlnormalize(href[0]))
id = ptarget.get('id')
type = ptarget.get('type', 'normal')
klass = ptarget.get('class')
pages.add(name, href, type=type, id=id, klass=klass)
return True
def _find_page_map(self, opf):
result = xpath(opf, '/o2:package/o2:spine/@page-map')
if result:
id = result[0]
if id not in self.manifest.ids:
return None
item = self.manifest.ids[id]
self.manifest.remove(item)
return item
for item in self.manifest.values():
if item.media_type == PAGE_MAP_MIME:
self.manifest.remove(item)
return item
return None
def _pages_from_page_map(self, opf):
item = self._find_page_map(opf)
if item is None:
return False
pmap = item.data
pages = self.pages = PageList()
for page in xpath(pmap, 'o2:page'):
name = page.get('name', '')
href = page.get('href')
if not href:
continue
name = COLLAPSE_RE.sub(' ', name.strip())
href = item.abshref(urlnormalize(href))
type = 'normal'
if not name:
type = 'special'
elif name.lower().strip('ivxlcdm') == '':
type = 'front'
pages.add(name, href, type=type)
return True
def _pages_from_opf(self, opf, item):
if self._pages_from_ncx(opf, item): return
if self._pages_from_page_map(opf): return
self.manifest = Manifest(self)
self.spine = Spine(self)
self.guide = Guide(self)
self.toc = TOC()
self.pages = PageList()
return
def _cover_from_html(self, hcover):
with TemporaryDirectory('_html_cover') as tdir:
writer = DirWriter()
writer.dump(self, tdir)
path = os.path.join(tdir, urlunquote(hcover.href))
renderer = CoverRenderer(path)
data = renderer.image_data
id, href = self.manifest.generate('cover', 'cover.jpeg')
item = self.manifest.add(id, href, JPEG_MIME, data=data)
return item
def _locate_cover_image(self):
if self.metadata.cover:
id = str(self.metadata.cover[0])
item = self.manifest.ids.get(id, None)
if item is not None and item.media_type in OEB_IMAGES:
return item
else:
self.logger.warn('Invalid cover image @id %r' % id)
hcover = self.spine[0]
if 'cover' in self.guide:
href = self.guide['cover'].href
item = self.manifest.hrefs[href]
media_type = item.media_type
if media_type in OEB_IMAGES:
return item
elif media_type in OEB_DOCS:
hcover = item
html = hcover.data
if MS_COVER_TYPE in self.guide:
href = self.guide[MS_COVER_TYPE].href
item = self.manifest.hrefs.get(href, None)
if item is not None and item.media_type in OEB_IMAGES:
return item
if self.COVER_SVG_XP(html):
svg = copy.deepcopy(self.COVER_SVG_XP(html)[0])
href = os.path.splitext(hcover.href)[0] + '.svg'
id, href = self.manifest.generate(hcover.id, href)
item = self.manifest.add(id, href, SVG_MIME, data=svg)
return item
if self.COVER_OBJECT_XP(html):
object = self.COVER_OBJECT_XP(html)[0]
href = hcover.abshref(object.get('data'))
item = self.manifest.hrefs.get(href, None)
if item is not None and item.media_type in OEB_IMAGES:
return item
return self._cover_from_html(hcover)
def _ensure_cover_image(self):
cover = self._locate_cover_image()
if self.metadata.cover:
self.metadata.cover[0].value = cover.id
return
self.metadata.add('cover', cover.id)
def _all_from_opf(self, opf):
self.version = opf.get('version', '1.2')
self._metadata_from_opf(opf)
self._manifest_from_opf(opf)
self._spine_from_opf(opf)
self._guide_from_opf(opf)
item = self._find_ncx(opf)
self._toc_from_opf(opf, item)
self._pages_from_opf(opf, item)
self._ensure_cover_image()
@classmethod
def generate(cls, opts):
encoding = opts.encoding
pretty_print = opts.pretty_print
return cls(encoding=encoding, pretty_print=pretty_print)
def translate(self, text):
lang = str(self.metadata.language[0])
@ -1652,16 +1181,3 @@ class OEBBook(object):
spine.attrib['page-map'] = id
results[PAGE_MAP_MIME] = (href, self.pages.to_page_map())
return results
def main(argv=sys.argv):
for arg in argv[1:]:
oeb = OEBBook(arg)
for name, doc in oeb.to_opf1().values():
print etree.tostring(doc, pretty_print=True)
for name, doc in oeb.to_opf2(page_map=True).values():
print etree.tostring(doc, pretty_print=True)
return 0
if __name__ == '__main__':
sys.exit(main())

View File

@ -0,0 +1,20 @@
from __future__ import with_statement
__license__ = 'GPL v3'
__copyright__ = '2008, Marshall T. Vandegrift <llasram@gmail.com>'
import os
from calibre.ebooks.oeb.base import OEBError
from calibre.ebooks.oeb.reader import OEBReader
__all__ = ['get_reader']
READER_REGISTRY = {
'.opf': OEBReader,
}
def ReaderFactory(path):
ext = os.path.splitext(path)[1].lower()
if not ext:
return OEBReader
return READER_REGISTRY[ext]()

View File

@ -0,0 +1,535 @@
from __future__ import with_statement
__license__ = 'GPL v3'
__copyright__ = '2008, Marshall T. Vandegrift <llasram@gmail.com>'
import sys, os, uuid, copy
from itertools import izip, chain
from urlparse import urldefrag, urlparse
from urllib import unquote as urlunquote
from mimetypes import guess_type
from collections import defaultdict
from lxml import etree
from calibre.ebooks.oeb.base import OPF1_NS, OPF2_NS, OPF2_NSMAP, DC11_NS, \
DC_NSES, OPF
from calibre.ebooks.oeb.base import OEB_DOCS, OEB_STYLES, OEB_IMAGES, \
PAGE_MAP_MIME, JPEG_MIME, NCX_MIME, SVG_MIME
from calibre.ebooks.oeb.base import XMLDECL_RE, COLLAPSE_RE, CSSURL_RE, \
ENTITY_RE, LINK_SELECTORS, MS_COVER_TYPE
from calibre.ebooks.oeb.base import namespace, barename, qname, XPath, xpath
from calibre.ebooks.oeb.base import urlnormalize, xml2str
from calibre.ebooks.oeb.base import OEBError, OEBBook, DirContainer
from calibre.ebooks.oeb.writer import OEBWriter
from calibre.ebooks.oeb.entitydefs import ENTITYDEFS
from calibre.ebooks.metadata.epub import CoverRenderer
from calibre.startup import get_lang
from calibre.ptempfile import TemporaryDirectory
__all__ = ['OEBReader']
class OEBReader(object):
COVER_SVG_XP = XPath('h:body//svg:svg[position() = 1]')
COVER_OBJECT_XP = XPath('h:body//h:object[@data][position() = 1]')
Container = DirContainer
DEFAULT_PROFILE = 'PRS505'
def __call__(self, oeb, path):
self.oeb = oeb
self.logger = oeb.logger
oeb.container = self.Container(path)
opf = self._read_opf()
self._all_from_opf(opf)
return oeb
def _clean_opf(self, opf):
nsmap = {}
for elem in opf.iter(tag=etree.Element):
nsmap.update(elem.nsmap)
for elem in opf.iter(tag=etree.Element):
if namespace(elem.tag) in ('', OPF1_NS):
elem.tag = OPF(barename(elem.tag))
nsmap.update(OPF2_NSMAP)
attrib = dict(opf.attrib)
nroot = etree.Element(OPF('package'),
nsmap={None: OPF2_NS}, attrib=attrib)
metadata = etree.SubElement(nroot, OPF('metadata'), nsmap=nsmap)
ignored = (OPF('dc-metadata'), OPF('x-metadata'))
for elem in xpath(opf, 'o2:metadata//*'):
if elem.tag in ignored:
continue
if namespace(elem.tag) in DC_NSES:
tag = barename(elem.tag).lower()
elem.tag = '{%s}%s' % (DC11_NS, tag)
metadata.append(elem)
for element in xpath(opf, 'o2:metadata//o2:meta'):
metadata.append(element)
for tag in ('o2:manifest', 'o2:spine', 'o2:tours', 'o2:guide'):
for element in xpath(opf, tag):
nroot.append(element)
return nroot
def _read_opf(self):
data = self.oeb.container.read(None)
data = self.oeb.decode(data)
data = XMLDECL_RE.sub('', data)
try:
opf = etree.fromstring(data)
except etree.XMLSyntaxError:
repl = lambda m: ENTITYDEFS.get(m.group(1), m.group(0))
data = ENTITY_RE.sub(repl, data)
opf = etree.fromstring(data)
self.logger.warn('OPF contains invalid HTML named entities')
ns = namespace(opf.tag)
if ns not in ('', OPF1_NS, OPF2_NS):
raise OEBError('Invalid namespace %r for OPF document' % ns)
opf = self._clean_opf(opf)
return opf
def _metadata_from_opf(self, opf):
uid = opf.get('unique-identifier', None)
self.oeb.uid = None
metadata = self.oeb.metadata
for elem in xpath(opf, '/o2:package/o2:metadata//*'):
term = elem.tag
value = elem.text
attrib = dict(elem.attrib)
nsmap = elem.nsmap
if term == OPF('meta'):
term = qname(attrib.pop('name', None), nsmap)
value = attrib.pop('content', None)
if value:
value = COLLAPSE_RE.sub(' ', value.strip())
if term and (value or attrib):
metadata.add(term, value, attrib, nsmap=nsmap)
haveuuid = haveid = False
for ident in metadata.identifier:
if unicode(ident).startswith('urn:uuid:'):
haveuuid = True
if 'id' in ident.attrib:
haveid = True
if not (haveuuid and haveid):
bookid = "urn:uuid:%s" % str(uuid.uuid4())
metadata.add('identifier', bookid, id='calibre-uuid')
if uid is None:
self.logger.warn(u'Unique-identifier not specified')
for item in metadata.identifier:
if not item.id:
continue
if uid is None or item.id == uid:
self.oeb.uid = item
break
else:
self.logger.warn(u'Unique-identifier %r not found' % uid)
for ident in metadata.identifier:
if 'id' in ident.attrib:
self.oeb.uid = metadata.identifier[0]
break
if not metadata.language:
self.logger.warn(u'Language not specified')
metadata.add('language', get_lang())
if not metadata.creator:
self.logger.warn('Creator not specified')
metadata.add('creator', self.oeb.translate(__('Unknown')))
if not metadata.title:
self.logger.warn('Title not specified')
metadata.add('title', self.oeb.translate(__('Unknown')))
def _manifest_add_missing(self):
manifest = self.oeb.manifest
known = set(manifest.hrefs)
unchecked = set(manifest.values())
while unchecked:
new = set()
for item in unchecked:
if (item.media_type in OEB_DOCS or
item.media_type[-4:] in ('/xml', '+xml')) and \
item.data is not None:
hrefs = [sel(item.data) for sel in LINK_SELECTORS]
for href in chain(*hrefs):
href, _ = urldefrag(href)
if not href:
continue
href = item.abshref(urlnormalize(href))
scheme = urlparse(href).scheme
if not scheme and href not in known:
new.add(href)
elif item.media_type in OEB_STYLES:
for match in CSSURL_RE.finditer(item.data):
href, _ = urldefrag(match.group('url'))
href = item.abshref(urlnormalize(href))
scheme = urlparse(href).scheme
if not scheme and href not in known:
new.add(href)
unchecked.clear()
for href in new:
known.add(href)
if not self.oeb.container.exists(href):
self.logger.warn('Referenced file %r not found' % href)
continue
self.logger.warn('Referenced file %r not in manifest' % href)
id, _ = manifest.generate(id='added')
guessed = guess_type(href)[0]
media_type = guessed or BINARY_MIME
added = manifest.add(id, href, media_type)
unchecked.add(added)
def _manifest_from_opf(self, opf):
manifest = self.oeb.manifest
for elem in xpath(opf, '/o2:package/o2:manifest/o2:item'):
id = elem.get('id')
href = elem.get('href')
media_type = elem.get('media-type', None)
if media_type is None:
media_type = elem.get('mediatype', None)
if media_type is None or media_type == 'text/xml':
guessed = guess_type(href)[0]
media_type = guessed or media_type or BINARY_MIME
fallback = elem.get('fallback')
if href in manifest.hrefs:
self.logger.warn(u'Duplicate manifest entry for %r' % href)
continue
if not self.oeb.container.exists(href):
self.logger.warn(u'Manifest item %r not found' % href)
continue
if id in manifest.ids:
self.logger.warn(u'Duplicate manifest id %r' % id)
id, href = manifest.generate(id, href)
manifest.add(id, href, media_type, fallback)
self._manifest_add_missing()
def _spine_add_extra(self):
manifest = self.oeb.manifest
spine = self.oeb.spine
unchecked = set(spine)
selector = XPath('h:body//h:a/@href')
extras = set()
while unchecked:
new = set()
for item in unchecked:
if item.media_type not in OEB_DOCS:
# TODO: handle fallback chains
continue
for href in selector(item.data):
href, _ = urldefrag(href)
if not href:
continue
href = item.abshref(urlnormalize(href))
if href not in manifest.hrefs:
continue
found = manifest.hrefs[href]
if found.media_type not in OEB_DOCS or \
found in spine or found in extras:
continue
new.add(found)
extras.update(new)
unchecked = new
version = int(self.oeb.version[0])
for item in sorted(extras):
if version >= 2:
self.logger.warn(
'Spine-referenced file %r not in spine' % item.href)
spine.add(item, linear=False)
def _spine_from_opf(self, opf):
spine = self.oeb.spine
manifest = self.oeb.manifest
for elem in xpath(opf, '/o2:package/o2:spine/o2:itemref'):
idref = elem.get('idref')
if idref not in manifest.ids:
self.logger.warn(u'Spine item %r not found' % idref)
continue
item = manifest.ids[idref]
spine.add(item, elem.get('linear'))
if len(spine) == 0:
raise OEBError("Spine is empty")
self._spine_add_extra()
def _guide_from_opf(self, opf):
guide = self.oeb.guide
manifest = self.oeb.manifest
for elem in xpath(opf, '/o2:package/o2:guide/o2:reference'):
href = elem.get('href')
path = urldefrag(href)[0]
if path not in manifest.hrefs:
self.logger.warn(u'Guide reference %r not found' % href)
continue
guide.add(elem.get('type'), elem.get('title'), href)
def _find_ncx(self, opf):
result = xpath(opf, '/o2:package/o2:spine/@toc')
if result:
id = result[0]
if id not in self.oeb.manifest.ids:
return None
item = self.oeb.manifest.ids[id]
self.oeb.manifest.remove(item)
return item
for item in self.oeb.manifest.values():
if item.media_type == NCX_MIME:
self.oeb.manifest.remove(item)
return item
return None
def _toc_from_navpoint(self, item, toc, navpoint):
children = xpath(navpoint, 'ncx:navPoint')
for child in children:
title = ''.join(xpath(child, 'ncx:navLabel/ncx:text/text()'))
title = COLLAPSE_RE.sub(' ', title.strip())
href = xpath(child, 'ncx:content/@src')
if not title or not href:
continue
href = item.abshref(urlnormalize(href[0]))
path, _ = urldefrag(href)
if path not in self.oeb.manifest.hrefs:
self.logger.warn('TOC reference %r not found' % href)
continue
id = child.get('id')
klass = child.get('class')
node = toc.add(title, href, id=id, klass=klass)
self._toc_from_navpoint(item, node, child)
def _toc_from_ncx(self, item):
if item is None:
return False
ncx = item.data
title = ''.join(xpath(ncx, 'ncx:docTitle/ncx:text/text()'))
title = COLLAPSE_RE.sub(' ', title.strip())
title = title or unicode(self.oeb.metadata.title[0])
toc = self.oeb.toc
toc.title = title
navmaps = xpath(ncx, 'ncx:navMap')
for navmap in navmaps:
self._toc_from_navpoint(item, toc, navmap)
return True
def _toc_from_tour(self, opf):
result = xpath(opf, 'o2:tours/o2:tour')
if not result:
return False
tour = result[0]
toc = self.oeb.toc
toc.title = tour.get('title')
sites = xpath(tour, 'o2:site')
for site in sites:
title = site.get('title')
href = site.get('href')
if not title or not href:
continue
path, _ = urldefrag(urlnormalize(href))
if path not in self.oeb.manifest.hrefs:
self.logger.warn('TOC reference %r not found' % href)
continue
id = site.get('id')
toc.add(title, href, id=id)
return True
def _toc_from_html(self, opf):
if 'toc' not in self.oeb.guide:
return False
itempath, frag = urldefrag(self.oeb.guide['toc'].href)
item = self.oeb.manifest.hrefs[itempath]
html = item.data
if frag:
elems = xpath(html, './/*[@id="%s"]' % frag)
if not elems:
elems = xpath(html, './/*[@name="%s"]' % frag)
elem = elems[0] if elems else html
while elem != html and not xpath(elem, './/h:a[@href]'):
elem = elem.getparent()
html = elem
titles = defaultdict(list)
order = []
for anchor in xpath(html, './/h:a[@href]'):
href = anchor.attrib['href']
href = item.abshref(urlnormalize(href))
path, frag = urldefrag(href)
if path not in self.oeb.manifest.hrefs:
continue
title = ' '.join(xpath(anchor, './/text()'))
title = COLLAPSE_RE.sub(' ', title.strip())
if href not in titles:
order.append(href)
titles[href].append(title)
toc = self.oeb.toc
for href in order:
toc.add(' '.join(titles[href]), href)
return True
def _toc_from_spine(self, opf):
toc = self.oeb.toc
titles = []
headers = []
for item in self.oeb.spine:
if not item.linear: continue
html = item.data
title = ''.join(xpath(html, '/h:html/h:head/h:title/text()'))
title = COLLAPSE_RE.sub(' ', title.strip())
if title:
titles.append(title)
headers.append('(unlabled)')
for tag in ('h1', 'h2', 'h3', 'h4', 'h5', 'strong'):
expr = '/h:html/h:body//h:%s[position()=1]/text()'
header = ''.join(xpath(html, expr % tag))
header = COLLAPSE_RE.sub(' ', header.strip())
if header:
headers[-1] = header
break
use = titles
if len(titles) > len(set(titles)):
use = headers
for title, item in izip(use, self.oeb.spine):
if not item.linear: continue
toc.add(title, item.href)
return True
def _toc_from_opf(self, opf, item):
if self._toc_from_ncx(item): return
if self._toc_from_tour(opf): return
self.logger.warn('No metadata table of contents found')
if self._toc_from_html(opf): return
self._toc_from_spine(opf)
def _pages_from_ncx(self, opf, item):
if item is None:
return False
ncx = item.data
ptargets = xpath(ncx, 'ncx:pageList/ncx:pageTarget')
if not ptargets:
return False
pages = self.oeb.pages
for ptarget in ptargets:
name = ''.join(xpath(ptarget, 'ncx:navLabel/ncx:text/text()'))
name = COLLAPSE_RE.sub(' ', name.strip())
href = xpath(ptarget, 'ncx:content/@src')
if not href:
continue
href = item.abshref(urlnormalize(href[0]))
id = ptarget.get('id')
type = ptarget.get('type', 'normal')
klass = ptarget.get('class')
pages.add(name, href, type=type, id=id, klass=klass)
return True
def _find_page_map(self, opf):
result = xpath(opf, '/o2:package/o2:spine/@page-map')
if result:
id = result[0]
if id not in self.oeb.manifest.ids:
return None
item = self.oeb.manifest.ids[id]
self.oeb.manifest.remove(item)
return item
for item in self.oeb.manifest.values():
if item.media_type == PAGE_MAP_MIME:
self.oeb.manifest.remove(item)
return item
return None
def _pages_from_page_map(self, opf):
item = self._find_page_map(opf)
if item is None:
return False
pmap = item.data
pages = self.oeb.pages
for page in xpath(pmap, 'o2:page'):
name = page.get('name', '')
href = page.get('href')
if not href:
continue
name = COLLAPSE_RE.sub(' ', name.strip())
href = item.abshref(urlnormalize(href))
type = 'normal'
if not name:
type = 'special'
elif name.lower().strip('ivxlcdm') == '':
type = 'front'
pages.add(name, href, type=type)
return True
def _pages_from_opf(self, opf, item):
if self._pages_from_ncx(opf, item): return
if self._pages_from_page_map(opf): return
return
def _cover_from_html(self, hcover):
with TemporaryDirectory('_html_cover') as tdir:
writer = OEBWriter()
writer(self.oeb, tdir)
path = os.path.join(tdir, urlunquote(hcover.href))
renderer = CoverRenderer(path)
data = renderer.image_data
id, href = self.oeb.manifest.generate('cover', 'cover.jpeg')
item = self.oeb.manifest.add(id, href, JPEG_MIME, data=data)
return item
def _locate_cover_image(self):
if self.oeb.metadata.cover:
id = str(self.oeb.metadata.cover[0])
item = self.oeb.manifest.ids.get(id, None)
if item is not None and item.media_type in OEB_IMAGES:
return item
else:
self.logger.warn('Invalid cover image @id %r' % id)
hcover = self.oeb.spine[0]
if 'cover' in self.oeb.guide:
href = self.oeb.guide['cover'].href
item = self.oeb.manifest.hrefs[href]
media_type = item.media_type
if media_type in OEB_IMAGES:
return item
elif media_type in OEB_DOCS:
hcover = item
html = hcover.data
if MS_COVER_TYPE in self.oeb.guide:
href = self.oeb.guide[MS_COVER_TYPE].href
item = self.oeb.manifest.hrefs.get(href, None)
if item is not None and item.media_type in OEB_IMAGES:
return item
if self.COVER_SVG_XP(html):
svg = copy.deepcopy(self.COVER_SVG_XP(html)[0])
href = os.path.splitext(hcover.href)[0] + '.svg'
id, href = self.oeb.manifest.generate(hcover.id, href)
item = self.oeb.manifest.add(id, href, SVG_MIME, data=svg)
return item
if self.COVER_OBJECT_XP(html):
object = self.COVER_OBJECT_XP(html)[0]
href = hcover.abshref(object.get('data'))
item = self.oeb.manifest.hrefs.get(href, None)
if item is not None and item.media_type in OEB_IMAGES:
return item
return self._cover_from_html(hcover)
def _ensure_cover_image(self):
cover = self._locate_cover_image()
if self.oeb.metadata.cover:
self.oeb.metadata.cover[0].value = cover.id
return
self.oeb.metadata.add('cover', cover.id)
def _all_from_opf(self, opf):
self.oeb.version = opf.get('version', '1.2')
self._metadata_from_opf(opf)
self._manifest_from_opf(opf)
self._spine_from_opf(opf)
self._guide_from_opf(opf)
item = self._find_ncx(opf)
self._toc_from_opf(opf, item)
self._pages_from_opf(opf, item)
self._ensure_cover_image()
def main(argv=sys.argv):
reader = OEBReader()
for arg in argv[1:]:
oeb = reader(OEBBook(), arg)
for name, doc in oeb.to_opf1().values():
print etree.tostring(doc, pretty_print=True)
for name, doc in oeb.to_opf2(page_map=True).values():
print etree.tostring(doc, pretty_print=True)
return 0
if __name__ == '__main__':
sys.exit(main())

View File

@ -0,0 +1,107 @@
from __future__ import with_statement
__license__ = 'GPL v3'
__copyright__ = '2008, Marshall T. Vandegrift <llasram@gmail.com>'
import sys, os, logging
from calibre.ebooks.oeb.base import OPF_MIME, xml2str
from calibre.ebooks.oeb.base import Logger, DirContainer, OEBBook
from calibre.utils.config import Config
__all__ = ['OEBWriter']
class OEBWriter(object):
DEFAULT_PROFILE = 'PRS505'
def __init__(self, version='2.0', page_map=False, pretty_print=False):
self.version = version
self.page_map = page_map
self.pretty_print = pretty_print
@classmethod
def config(cls, cfg):
oeb = cfg.add_group('oeb', _('OPF/NCX/etc. generation options.'))
versions = ['1.2', '2.0']
oeb('opf_version', ['--opf-version'], default='2.0', choices=versions,
help=_('OPF version to generate. Default is %default.'))
oeb('adobe_page_map', ['--adobe-page-map'], default=False,
help=_('Generate an Adobe "page-map" file if pagination '
'information is avaliable.'))
return cfg
@classmethod
def generate(cls, opts):
version = opts.opf_version
page_map = opts.adobe_page_map
pretty_print = opts.pretty_print
return cls(version=version, page_map=page_map,
pretty_print=pretty_print)
def __call__(self, oeb, path):
version = int(self.version[0])
opfname = None
if os.path.splitext(path)[1].lower() == '.opf':
opfname = os.path.basename(path)
path = os.path.dirname(path)
if not os.path.isdir(path):
os.mkdir(path)
output = DirContainer(path)
for item in oeb.manifest.values():
output.write(item.href, str(item))
if version == 1:
metadata = oeb.to_opf1()
elif version == 2:
metadata = oeb.to_opf2(page_map=self.page_map)
else:
raise OEBError("Unrecognized OPF version %r" % self.version)
pretty_print = self.pretty_print
for mime, (href, data) in metadata.items():
if opfname and mime == OPF_MIME:
href = opfname
output.write(href, xml2str(data, pretty_print=pretty_print))
return
def option_parser():
cfg = Config('oeb', _('Options to control OEB conversion.'))
OEBWriter.config(cfg)
parser = cfg.option_parser()
parser.add_option('--encoding', default=None,
help=_('Character encoding for files. Default is to auto detect.'))
parser.add_option('-o', '--output', default=None,
help=_('Output file. Default is derived from input filename.'))
parser.add_option('-p', '--pretty-print', action='store_true',
default=False, help=_('Produce more human-readable XML output.'))
parser.add_option('-v', '--verbose', default=0, action='count',
help=_('Useful for debugging.'))
return parser
def any2oeb(opts, inpath):
from calibre.ebooks.oeb.factory import ReaderFactory
logger = Logger(logging.getLogger('any2oeb'))
logger.setup_cli_handler(opts.verbose)
outpath = opts.output
if outpath is None:
outpath = os.path.basename(inpath)
outpath = os.path.splitext(outpath)[0]
encoding = opts.encoding
pretty_print = opts.pretty_print
oeb = OEBBook(encoding=encoding, pretty_print=pretty_print, logger=logger)
reader = ReaderFactory(inpath)
reader(oeb, inpath)
writer = OEBWriter.generate(opts)
writer(oeb, outpath)
return 0
def main(argv=sys.argv):
parser = option_parser()
opts, args = parser.parse_args(argv[1:])
if len(args) != 1:
parser.print_help()
return 1
inpath = args[0]
retval = any2oeb(opts, inpath)
return retval
if __name__ == '__main__':
sys.exit(main())