Pull from llasram so I can grok the requirements for the LIT->MOBI chain

This commit is contained in:
Kovid Goyal 2009-02-19 19:29:11 -08:00
commit 87ff17b50f
13 changed files with 1357 additions and 820 deletions

View File

@ -7,21 +7,25 @@ __license__ = 'GPL v3'
__copyright__ = '2008, Kovid Goyal <kovid at kovidgoyal.net> ' \
'and Marshall T. Vandegrift <llasram@gmail.com>'
import sys, struct, cStringIO, os
import sys, struct, os
import functools
import re
from urlparse import urldefrag
from cStringIO import StringIO
from urllib import unquote as urlunquote
from lxml import etree
from calibre.ebooks.lit import LitError
from calibre.ebooks.lit.maps import OPF_MAP, HTML_MAP
import calibre.ebooks.lit.mssha1 as mssha1
from calibre.ebooks.oeb.base import urlnormalize
from calibre.ebooks.oeb.reader import OEBReader
from calibre.ebooks import DRMError
from calibre import plugins
lzx, lxzerror = plugins['lzx']
msdes, msdeserror = plugins['msdes']
__all__ = ["LitReader"]
XML_DECL = """<?xml version="1.0" encoding="UTF-8" ?>
"""
OPF_DECL = """<?xml version="1.0" encoding="UTF-8" ?>
@ -109,6 +113,9 @@ def consume_sized_utf8_string(bytes, zpad=False):
pos += 1
return u''.join(result), bytes[pos:]
def encode(string):
    """Return *string* as ASCII bytes.

    The argument is first converted to unicode text; every character that
    has no ASCII representation is emitted as an XML numeric character
    reference (``&#NNN;``).
    """
    text = unicode(string)
    return text.encode('ascii', 'xmlcharrefreplace')
class UnBinary(object):
AMPERSAND_RE = re.compile(
r'&(?!(?:#[0-9]+|#x[0-9a-fA-F]+|[a-zA-Z_:][a-zA-Z0-9.-_:]+);)')
@ -119,13 +126,13 @@ class UnBinary(object):
def __init__(self, bin, path, manifest={}, map=HTML_MAP):
self.manifest = manifest
self.tag_map, self.attr_map, self.tag_to_attr_map = map
self.opf = map is OPF_MAP
self.bin = bin
self.is_html = map is HTML_MAP
self.dir = os.path.dirname(path)
self.buf = cStringIO.StringIO()
self.binary_to_text()
self.raw = self.buf.getvalue().lstrip().decode('utf-8')
buf = StringIO()
self.binary_to_text(bin, buf)
self.raw = buf.getvalue().lstrip()
self.escape_reserved()
self._tree = None
def escape_reserved(self):
raw = self.raw
@ -152,18 +159,20 @@ class UnBinary(object):
return '/'.join(relpath)
def __unicode__(self):
return self.raw.decode('utf-8')
def __str__(self):
return self.raw
def binary_to_text(self, base=0, depth=0):
def binary_to_text(self, bin, buf, index=0, depth=0):
tag_name = current_map = None
dynamic_tag = errors = 0
in_censorship = is_goingdown = False
state = 'text'
index = base
flags = 0
while index < len(self.bin):
c, index = read_utf8_char(self.bin, index)
while index < len(bin):
c, index = read_utf8_char(bin, index)
oc = ord(c)
if state == 'text':
@ -176,7 +185,7 @@ class UnBinary(object):
c = '>>'
elif c == '<':
c = '<<'
self.buf.write(c.encode('ascii', 'xmlcharrefreplace'))
buf.write(encode(c))
elif state == 'get flags':
if oc == 0:
@ -189,7 +198,7 @@ class UnBinary(object):
state = 'text' if oc == 0 else 'get attr'
if flags & FLAG_OPENING:
tag = oc
self.buf.write('<')
buf.write('<')
if not (flags & FLAG_CLOSING):
is_goingdown = True
if tag == 0x8000:
@ -206,7 +215,7 @@ class UnBinary(object):
tag_name = '?'+unichr(tag)+'?'
current_map = self.tag_to_attr_map[tag]
print 'WARNING: tag %s unknown' % unichr(tag)
self.buf.write(unicode(tag_name).encode('utf-8'))
buf.write(encode(tag_name))
elif flags & FLAG_CLOSING:
if depth == 0:
raise LitError('Extra closing tag')
@ -218,15 +227,14 @@ class UnBinary(object):
if not is_goingdown:
tag_name = None
dynamic_tag = 0
self.buf.write(' />')
buf.write(' />')
else:
self.buf.write('>')
index = self.binary_to_text(base=index, depth=depth+1)
buf.write('>')
index = self.binary_to_text(bin, buf, index, depth+1)
is_goingdown = False
if not tag_name:
raise LitError('Tag ends before it begins.')
self.buf.write(u''.join(
('</', tag_name, '>')).encode('utf-8'))
buf.write(encode(u''.join(('</', tag_name, '>'))))
dynamic_tag = 0
tag_name = None
state = 'text'
@ -246,7 +254,7 @@ class UnBinary(object):
in_censorship = True
state = 'get value length'
continue
self.buf.write(' ' + unicode(attr).encode('utf-8') + '=')
buf.write(' ' + encode(attr) + '=')
if attr in ['href', 'src']:
state = 'get href length'
else:
@ -254,40 +262,39 @@ class UnBinary(object):
elif state == 'get value length':
if not in_censorship:
self.buf.write('"')
buf.write('"')
count = oc - 1
if count == 0:
if not in_censorship:
self.buf.write('"')
buf.write('"')
in_censorship = False
state = 'get attr'
continue
state = 'get value'
if oc == 0xffff:
continue
if count < 0 or count > (len(self.bin) - index):
if count < 0 or count > (len(bin) - index):
raise LitError('Invalid character count %d' % count)
elif state == 'get value':
if count == 0xfffe:
if not in_censorship:
self.buf.write('%s"' % (oc - 1))
buf.write('%s"' % (oc - 1))
in_censorship = False
state = 'get attr'
elif count > 0:
if not in_censorship:
self.buf.write(c.encode(
'ascii', 'xmlcharrefreplace'))
buf.write(encode(c))
count -= 1
if count == 0:
if not in_censorship:
self.buf.write('"')
buf.write('"')
in_censorship = False
state = 'get attr'
elif state == 'get custom length':
count = oc - 1
if count <= 0 or count > len(self.bin)-index:
if count <= 0 or count > len(bin)-index:
raise LitError('Invalid character count %d' % count)
dynamic_tag += 1
state = 'get custom'
@ -297,26 +304,26 @@ class UnBinary(object):
tag_name += c
count -= 1
if count == 0:
self.buf.write(unicode(tag_name).encode('utf-8'))
buf.write(encode(tag_name))
state = 'get attr'
elif state == 'get attr length':
count = oc - 1
if count <= 0 or count > (len(self.bin) - index):
if count <= 0 or count > (len(bin) - index):
raise LitError('Invalid character count %d' % count)
self.buf.write(' ')
buf.write(' ')
state = 'get custom attr'
elif state == 'get custom attr':
self.buf.write(unicode(c).encode('utf-8'))
buf.write(encode(c))
count -= 1
if count == 0:
self.buf.write('=')
buf.write('=')
state = 'get value length'
elif state == 'get href length':
count = oc - 1
if count <= 0 or count > (len(self.bin) - index):
if count <= 0 or count > (len(bin) - index):
raise LitError('Invalid character count %d' % count)
href = ''
state = 'get href'
@ -330,10 +337,11 @@ class UnBinary(object):
if frag:
path = '#'.join((path, frag))
path = urlnormalize(path)
self.buf.write((u'"%s"' % path).encode('utf-8'))
buf.write(encode(u'"%s"' % path))
state = 'get attr'
return index
class DirectoryEntry(object):
def __init__(self, name, section, offset, size):
self.name = name
@ -348,6 +356,7 @@ class DirectoryEntry(object):
def __str__(self):
return repr(self)
class ManifestItem(object):
def __init__(self, original, internal, mime_type, offset, root, state):
self.original = original
@ -375,65 +384,87 @@ class ManifestItem(object):
% (self.internal, self.path, self.mime_type, self.offset,
self.root, self.state)
def preserve(function):
def wrapper(self, *args, **kwargs):
opos = self._stream.tell()
opos = self.stream.tell()
try:
return function(self, *args, **kwargs)
finally:
self._stream.seek(opos)
self.stream.seek(opos)
functools.update_wrapper(wrapper, function)
return wrapper
class LitReader(object):
class LitFile(object):
PIECE_SIZE = 16
XML_PARSER = etree.XMLParser(
recover=True, resolve_entities=False)
def __init__(self, filename_or_stream):
if hasattr(filename_or_stream, 'read'):
self.stream = filename_or_stream
else:
self.stream = open(filename_or_stream, 'rb')
try:
self.opf_path = os.path.splitext(
os.path.basename(self.stream.name))[0] + '.opf'
except AttributeError:
self.opf_path = 'content.opf'
if self.magic != 'ITOLITLS':
raise LitError('Not a valid LIT file')
if self.version != 1:
raise LitError('Unknown LIT version %d' % (self.version,))
self.read_secondary_header()
self.read_header_pieces()
self.read_section_names()
self.read_manifest()
self.read_drm()
def warn(self, msg):
print "WARNING: %s" % (msg,)
def magic():
@preserve
def fget(self):
self._stream.seek(0)
return self._stream.read(8)
self.stream.seek(0)
return self.stream.read(8)
return property(fget=fget)
magic = magic()
def version():
def fget(self):
self._stream.seek(8)
return u32(self._stream.read(4))
self.stream.seek(8)
return u32(self.stream.read(4))
return property(fget=fget)
version = version()
def hdr_len():
@preserve
def fget(self):
self._stream.seek(12)
return int32(self._stream.read(4))
self.stream.seek(12)
return int32(self.stream.read(4))
return property(fget=fget)
hdr_len = hdr_len()
def num_pieces():
@preserve
def fget(self):
self._stream.seek(16)
return int32(self._stream.read(4))
self.stream.seek(16)
return int32(self.stream.read(4))
return property(fget=fget)
num_pieces = num_pieces()
def sec_hdr_len():
@preserve
def fget(self):
self._stream.seek(20)
return int32(self._stream.read(4))
self.stream.seek(20)
return int32(self.stream.read(4))
return property(fget=fget)
sec_hdr_len = sec_hdr_len()
def guid():
@preserve
def fget(self):
self._stream.seek(24)
return self._stream.read(16)
self.stream.seek(24)
return self.stream.read(16)
return property(fget=fget)
guid = guid()
@ -443,44 +474,27 @@ class LitReader(object):
size = self.hdr_len \
+ (self.num_pieces * self.PIECE_SIZE) \
+ self.sec_hdr_len
self._stream.seek(0)
return self._stream.read(size)
self.stream.seek(0)
return self.stream.read(size)
return property(fget=fget)
header = header()
def __init__(self, filename_or_stream):
if hasattr(filename_or_stream, 'read'):
self._stream = filename_or_stream
else:
self._stream = open(filename_or_stream, 'rb')
if self.magic != 'ITOLITLS':
raise LitError('Not a valid LIT file')
if self.version != 1:
raise LitError('Unknown LIT version %d' % (self.version,))
self.entries = {}
self._read_secondary_header()
self._read_header_pieces()
self._read_section_names()
self._read_manifest()
self._read_meta()
self._read_drm()
@preserve
def __len__(self):
self._stream.seek(0, 2)
return self._stream.tell()
self.stream.seek(0, 2)
return self.stream.tell()
@preserve
def _read_raw(self, offset, size):
self._stream.seek(offset)
return self._stream.read(size)
def read_raw(self, offset, size):
self.stream.seek(offset)
return self.stream.read(size)
def _read_content(self, offset, size):
return self._read_raw(self.content_offset + offset, size)
def read_content(self, offset, size):
return self.read_raw(self.content_offset + offset, size)
def _read_secondary_header(self):
def read_secondary_header(self):
offset = self.hdr_len + (self.num_pieces * self.PIECE_SIZE)
bytes = self._read_raw(offset, self.sec_hdr_len)
bytes = self.read_raw(offset, self.sec_hdr_len)
offset = int32(bytes[4:])
while offset < len(bytes):
blocktype = bytes[offset:offset+4]
@ -508,21 +522,21 @@ class LitReader(object):
if not hasattr(self, 'content_offset'):
raise LitError('Could not figure out the content offset')
def _read_header_pieces(self):
def read_header_pieces(self):
src = self.header[self.hdr_len:]
for i in xrange(self.num_pieces):
piece = src[i * self.PIECE_SIZE:(i + 1) * self.PIECE_SIZE]
if u32(piece[4:]) != 0 or u32(piece[12:]) != 0:
raise LitError('Piece %s has 64bit value' % repr(piece))
offset, size = u32(piece), int32(piece[8:])
piece = self._read_raw(offset, size)
piece = self.read_raw(offset, size)
if i == 0:
continue # Dont need this piece
elif i == 1:
if u32(piece[8:]) != self.entry_chunklen or \
u32(piece[12:]) != self.entry_unknown:
raise LitError('Secondary header does not match piece')
self._read_directory(piece)
self.read_directory(piece)
elif i == 2:
if u32(piece[8:]) != self.count_chunklen or \
u32(piece[12:]) != self.count_unknown:
@ -533,12 +547,13 @@ class LitReader(object):
elif i == 4:
self.piece4_guid = piece
def _read_directory(self, piece):
def read_directory(self, piece):
if not piece.startswith('IFCM'):
raise LitError('Header piece #1 is not main directory.')
chunk_size, num_chunks = int32(piece[8:12]), int32(piece[24:28])
if (32 + (num_chunks * chunk_size)) != len(piece):
raise LitError('IFCM HEADER has incorrect length')
raise LitError('IFCM header has incorrect length')
self.entries = {}
for i in xrange(num_chunks):
offset = 32 + (i * chunk_size)
chunk = piece[offset:offset + chunk_size]
@ -572,17 +587,17 @@ class LitReader(object):
entry = DirectoryEntry(name, section, offset, size)
self.entries[name] = entry
def _read_section_names(self):
def read_section_names(self):
if '::DataSpace/NameList' not in self.entries:
raise LitError('Lit file does not have a valid NameList')
raw = self.get_file('::DataSpace/NameList')
if len(raw) < 4:
raise LitError('Invalid Namelist section')
pos = 4
self.num_sections = u16(raw[2:pos])
self.section_names = [""]*self.num_sections
self.section_data = [None]*self.num_sections
for section in xrange(self.num_sections):
num_sections = u16(raw[2:pos])
self.section_names = [""] * num_sections
self.section_data = [None] * num_sections
for section in xrange(num_sections):
size = u16(raw[pos:pos+2])
pos += 2
size = size*2 + 2
@ -592,11 +607,12 @@ class LitReader(object):
raw[pos:pos+size].decode('utf-16-le').rstrip('\000')
pos += size
def _read_manifest(self):
def read_manifest(self):
if '/manifest' not in self.entries:
raise LitError('Lit file does not have a valid manifest')
raw = self.get_file('/manifest')
self.manifest = {}
self.paths = {self.opf_path: None}
while raw:
slen, raw = ord(raw[0]), raw[1:]
if slen == 0: break
@ -637,28 +653,9 @@ class LitReader(object):
for item in mlist:
if item.path[0] == '/':
item.path = os.path.basename(item.path)
self.paths[item.path] = item
def _pretty_print(self, xml):
f = cStringIO.StringIO(xml.encode('utf-8'))
doc = etree.parse(f, parser=self.XML_PARSER)
pretty = etree.tostring(doc, encoding='ascii', pretty_print=True)
return XML_DECL + unicode(pretty)
def _read_meta(self):
path = 'content.opf'
raw = self.get_file('/meta')
xml = OPF_DECL
try:
xml += unicode(UnBinary(raw, path, self.manifest, OPF_MAP))
except LitError:
if 'PENGUIN group' not in raw: raise
print "WARNING: attempting PENGUIN malformed OPF fix"
raw = raw.replace(
'PENGUIN group', '\x00\x01\x18\x00PENGUIN group', 1)
xml += unicode(UnBinary(raw, path, self.manifest, OPF_MAP))
self.meta = xml
def _read_drm(self):
def read_drm(self):
self.drmlevel = 0
if '/DRMStorage/Licenses/EUL' in self.entries:
self.drmlevel = 5
@ -669,7 +666,7 @@ class LitReader(object):
else:
return
if self.drmlevel < 5:
msdes.deskey(self._calculate_deskey(), msdes.DE1)
msdes.deskey(self.calculate_deskey(), msdes.DE1)
bookkey = msdes.des(self.get_file('/DRMStorage/DRMSealed'))
if bookkey[0] != '\000':
raise LitError('Unable to decrypt title key!')
@ -677,7 +674,7 @@ class LitReader(object):
else:
raise DRMError("Cannot access DRM-protected book")
def _calculate_deskey(self):
def calculate_deskey(self):
hashfiles = ['/meta', '/DRMStorage/DRMSource']
if self.drmlevel == 3:
hashfiles.append('/DRMStorage/DRMBookplate')
@ -701,18 +698,18 @@ class LitReader(object):
def get_file(self, name):
entry = self.entries[name]
if entry.section == 0:
return self._read_content(entry.offset, entry.size)
return self.read_content(entry.offset, entry.size)
section = self.get_section(entry.section)
return section[entry.offset:entry.offset+entry.size]
def get_section(self, section):
data = self.section_data[section]
if not data:
data = self._get_section(section)
data = self.get_section_uncached(section)
self.section_data[section] = data
return data
def _get_section(self, section):
def get_section_uncached(self, section):
name = self.section_names[section]
path = '::DataSpace/Storage/' + name
transform = self.get_file(path + '/Transform/List')
@ -724,29 +721,29 @@ class LitReader(object):
raise LitError("ControlData is too short")
guid = msguid(transform)
if guid == DESENCRYPT_GUID:
content = self._decrypt(content)
content = self.decrypt(content)
control = control[csize:]
elif guid == LZXCOMPRESS_GUID:
reset_table = self.get_file(
'/'.join(('::DataSpace/Storage', name, 'Transform',
LZXCOMPRESS_GUID, 'InstanceData/ResetTable')))
content = self._decompress(content, control, reset_table)
content = self.decompress(content, control, reset_table)
control = control[csize:]
else:
raise LitError("Unrecognized transform: %s." % repr(guid))
transform = transform[16:]
return content
def _decrypt(self, content):
def decrypt(self, content):
length = len(content)
extra = length & 0x7
if extra > 0:
self._warn("content length not a multiple of block size")
self.warn("content length not a multiple of block size")
content += "\0" * (8 - extra)
msdes.deskey(self.bookkey, msdes.DE1)
return msdes.des(content)
def _decompress(self, content, control, reset_table):
def decompress(self, content, control, reset_table):
if len(control) < 32 or control[CONTROL_TAG:CONTROL_TAG+4] != "LZXC":
raise LitError("Invalid ControlData tag value")
if len(reset_table) < (RESET_INTERVAL + 8):
@ -787,7 +784,7 @@ class LitReader(object):
result.append(
lzx.decompress(content[base:size], window_bytes))
except lzx.LZXError:
self._warn("LZX decompression error; skipping chunk")
self.warn("LZX decompression error; skipping chunk")
bytes_remaining -= window_bytes
base = size
accum += int32(reset_table[RESET_INTERVAL:])
@ -797,80 +794,57 @@ class LitReader(object):
try:
result.append(lzx.decompress(content[base:], bytes_remaining))
except lzx.LZXError:
self._warn("LZX decompression error; skipping chunk")
self.warn("LZX decompression error; skipping chunk")
bytes_remaining = 0
if bytes_remaining > 0:
raise LitError("Failed to completely decompress section")
return ''.join(result)
def get_entry_content(self, entry, pretty_print=False):
if 'spine' in entry.state:
name = '/'.join(('/data', entry.internal, 'content'))
path = entry.path
raw = self.get_file(name)
decl, map = (OPF_DECL, OPF_MAP) \
if name == '/meta' else (HTML_DECL, HTML_MAP)
content = decl + unicode(UnBinary(raw, path, self.manifest, map))
if pretty_print:
content = self._pretty_print(content)
content = content.encode('utf-8')
class LitContainer(object):
"""Simple Container-interface, read-only accessor for LIT files."""
def __init__(self, filename_or_stream):
self._litfile = LitFile(filename_or_stream)
def namelist(self):
return self._litfile.paths.keys()
def exists(self, name):
return urlunquote(name) in self._litfile.paths
def read(self, name):
entry = self._litfile.paths[urlunquote(name)] if name else None
if entry is None:
content = OPF_DECL + self._read_meta()
elif 'spine' in entry.state:
internal = '/'.join(('/data', entry.internal, 'content'))
raw = self._litfile.get_file(internal)
unbin = UnBinary(raw, name, self._litfile.manifest, HTML_MAP)
content = HTML_DECL + str(unbin)
else:
name = '/'.join(('/data', entry.internal))
content = self.get_file(name)
internal = '/'.join(('/data', entry.internal))
content = self._litfile.get_file(internal)
return content
def extract_content(self, output_dir=os.getcwdu(), pretty_print=False):
output_dir = os.path.abspath(output_dir)
def _read_meta(self):
path = 'content.opf'
raw = self._litfile.get_file('/meta')
try:
opf_path = os.path.splitext(
os.path.basename(self._stream.name))[0] + '.opf'
except AttributeError:
opf_path = 'content.opf'
opf_path = os.path.join(output_dir, opf_path)
self._ensure_dir(opf_path)
with open(opf_path, 'wb') as f:
xml = self.meta
if pretty_print:
xml = self._pretty_print(xml)
f.write(xml.encode('utf-8'))
for entry in self.manifest.values():
path = os.path.join(output_dir, entry.path)
self._ensure_dir(path)
with open(path, 'wb') as f:
f.write(self.get_entry_content(entry, pretty_print))
unbin = UnBinary(raw, path, self._litfile.manifest, OPF_MAP)
except LitError:
if 'PENGUIN group' not in raw: raise
print "WARNING: attempting PENGUIN malformed OPF fix"
raw = raw.replace(
'PENGUIN group', '\x00\x01\x18\x00PENGUIN group', 1)
unbin = UnBinary(raw, path, self._litfile.manifest, OPF_MAP)
return str(unbin)
def _ensure_dir(self, path):
dir = os.path.dirname(path)
if not os.path.isdir(dir):
os.makedirs(dir)
def _warn(self, msg):
print "WARNING: %s" % (msg,)
class LitReader(OEBReader):
    """OEBReader variant that accesses book files through LitContainer."""

    # Renderer profile name used for content read with this reader.
    DEFAULT_PROFILE = 'MSReader'

    # Container type instantiated to access the book's files.
    Container = LitContainer
def option_parser():
from calibre.utils.config import OptionParser
parser = OptionParser(usage=_('%prog [options] LITFILE'))
parser.add_option(
'-o', '--output-dir', default='.',
help=_('Output directory. Defaults to current directory.'))
parser.add_option(
'-p', '--pretty-print', default=False, action='store_true',
help=_('Legibly format extracted markup. May modify meaningful whitespace.'))
parser.add_option(
'--verbose', default=False, action='store_true',
help=_('Useful for debugging.'))
return parser
def main(args=sys.argv):
parser = option_parser()
opts, args = parser.parse_args(args)
if len(args) != 2:
parser.print_help()
return 1
lr = LitReader(args[1])
lr.extract_content(opts.output_dir, opts.pretty_print)
print _('OEB ebook created in'), opts.output_dir
return 0
try:
import psyco
@ -878,6 +852,3 @@ try:
psyco.bind(UnBinary.binary_to_text)
except ImportError:
pass
if __name__ == '__main__':
sys.exit(main())

View File

@ -312,7 +312,7 @@ class LitWriter(object):
cover = None
if oeb.metadata.cover:
id = str(oeb.metadata.cover[0])
cover = oeb.manifest[id]
cover = oeb.manifest.ids[id]
for type, title in ALL_MS_COVER_TYPES:
if type not in oeb.guide:
oeb.guide.add(type, title, cover.href)

View File

@ -82,7 +82,20 @@ class MobiMLizer(object):
def __init__(self, ignore_tables=False):
self.ignore_tables = ignore_tables
def transform(self, oeb, context):
@classmethod
def config(cls, cfg):
    """Register the Mobipocket markup options on *cfg* and return *cfg*."""
    add_option = cfg.add_group('mobiml', _('Mobipocket markup options.'))
    description = _('Render HTML tables as blocks of text instead of actual '
                    'tables. This is neccessary if the HTML contains very '
                    'large or complex tables.')
    add_option('ignore_tables', ['--ignore-tables'], default=False,
               help=description)
    return cfg
@classmethod
def generate(cls, opts):
    """Build an instance whose ``ignore_tables`` flag comes from *opts*."""
    ignore_tables = opts.ignore_tables
    return cls(ignore_tables=ignore_tables)
def __call__(self, oeb, context):
oeb.logger.info('Converting XHTML to Mobipocket markup...')
self.oeb = oeb
self.profile = profile = context.dest

View File

@ -292,9 +292,29 @@ class Serializer(object):
buffer.seek(hoff)
buffer.write('%010d' % ioff)
class MobiFlattener(object):
    """Transform that runs a CSSFlattener configured from the destination
    profile of the conversion context.

    ``config`` and ``generate`` are instance methods, so an instance of this
    class acts as its own transform factory.
    """

    def config(self, cfg):
        """Register no options; hand *cfg* back unchanged."""
        return cfg

    def generate(self, opts):
        """Ignore *opts* and return this instance as the transform."""
        return self

    def __call__(self, oeb, context):
        """Flatten the CSS of *oeb* and return the flattener's result.

        The font base and the font-size keys are read from ``context.dest``;
        floats and tables are always un-floated / un-tabled.
        """
        flatten = CSSFlattener(
            fbase=context.dest.fbase,
            fkey=context.dest.fnums.values(),
            unfloat=True,
            untable=True)
        return flatten(oeb, context)
class MobiWriter(object):
COLLAPSE_RE = re.compile(r'[ \t\r\n\v]+')
DEFAULT_PROFILE = 'CybookG3'
TRANSFORMS = [HTMLTOCAdder, CaseMangler, MobiFlattener(), SVGRasterizer,
ManifestTrimmer, MobiMLizer]
def __init__(self, compression=None, imagemax=None,
prefer_author_sort=False):
@ -302,7 +322,32 @@ class MobiWriter(object):
self._imagemax = imagemax or OTHER_MAX_IMAGE_SIZE
self._prefer_author_sort = prefer_author_sort
def dump(self, oeb, path):
@classmethod
def config(cls, cfg):
    """Add any book-writing options to the :class:`Config` object
    :param:`cfg`.

    Returns *cfg* after registering the ``mobipocket`` option group.
    """
    add_option = cfg.add_group(
        'mobipocket', _('Mobipocket-specific options.'))
    # (option name, command-line switch, help text); every option is a
    # flag that defaults to False.
    flags = (
        ('compress', '--compress',
         _('Compress file text using PalmDOC compression. '
           'Results in smaller files, but takes a long time to run.')),
        ('rescale_images', '--rescale-images',
         _('Modify images to meet Palm device size limitations.')),
        ('prefer_author_sort', '--prefer-author-sort',
         _('When present, use the author sorting information for '
           'generating the Mobipocket author metadata.')),
    )
    for name, switch, description in flags:
        add_option(name, [switch], default=False, help=description)
    return cfg
@classmethod
def generate(cls, opts):
    """Generate a Writer instance from command-line options."""
    if opts.compress:
        compression = PALMDOC
    else:
        compression = UNCOMPRESSED
    # None lets the constructor fall back to its own default image limit.
    if opts.rescale_images:
        imagemax = PALM_MAX_IMAGE_SIZE
    else:
        imagemax = None
    return cls(
        compression=compression,
        imagemax=imagemax,
        prefer_author_sort=opts.prefer_author_sort)
def __call__(self, oeb, path):
if hasattr(path, 'write'):
return self._dump_stream(oeb, path)
with open(path, 'w+b') as stream:
@ -542,21 +587,6 @@ def config(defaults=None):
else:
c = StringConfig(defaults, desc)
mobi = c.add_group('mobipocket', _('Mobipocket-specific options.'))
mobi('compress', ['--compress'], default=False,
help=_('Compress file text using PalmDOC compression. '
'Results in smaller files, but takes a long time to run.'))
mobi('rescale_images', ['--rescale-images'], default=False,
help=_('Modify images to meet Palm device size limitations.'))
mobi('toc_title', ['--toc-title'], default=None,
help=_('Title for any generated in-line table of contents.'))
mobi('ignore_tables', ['--ignore-tables'], default=False,
help=_('Render HTML tables as blocks of text instead of actual '
'tables. This is neccessary if the HTML contains very large '
'or complex tables.'))
mobi('prefer_author_sort', ['--prefer-author-sort'], default=False,
help=_('When present, use the author sorting information for '
'generating the Mobipocket author metadata.'))
profiles = c.add_group('profiles', _('Device renderer profiles. '
'Affects conversion of font sizes, image rescaling and rasterization '
'of tables. Valid profiles are: %s.') % ', '.join(_profiles))

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,98 @@
'''
Registry associating file extensions with Reader classes.
'''
from __future__ import with_statement
__license__ = 'GPL v3'
__copyright__ = '2008, Marshall T. Vandegrift <llasram@gmail.com>'
import sys, os, logging
from itertools import chain
from calibre.ebooks.oeb.base import OEBError
from calibre.ebooks.oeb.reader import OEBReader
from calibre.ebooks.oeb.writer import OEBWriter
from calibre.ebooks.lit.reader import LitReader
from calibre.ebooks.lit.writer import LitWriter
from calibre.ebooks.mobi.reader import MobiReader
from calibre.ebooks.mobi.writer import MobiWriter
from calibre.ebooks.oeb.base import Logger, OEBBook
from calibre.ebooks.oeb.profile import Context
from calibre.utils.config import Config
# Names exported by ``from <module> import *``.  Every entry must be defined
# in this module: the previous value listed ``get_reader``, which does not
# exist here, so a star-import raised AttributeError.
__all__ = ['ReaderFactory', 'WriterFactory']
REGISTRY = {
'.opf': (OEBReader, None),
'.lit': (LitReader, LitWriter),
'.mobi': (MobiReader, MobiWriter),
}
def ReaderFactory(path):
    """Return the Reader class to use for the book at *path*.

    A directory is always read with OEBReader.  Otherwise the lower-cased
    file extension is looked up in REGISTRY.

    Raises OEBError when no Reader is registered for the extension.
    """
    if os.path.isdir(path):
        return OEBReader
    extension = os.path.splitext(path)[1].lower()
    try:
        reader_class = REGISTRY[extension][0]
    except KeyError:
        reader_class = None
    if reader_class is not None:
        return reader_class
    raise OEBError('Unknown e-book file extension %r' % extension)
def WriterFactory(path):
    """Return the Writer class to use for producing output at *path*.

    OEBWriter is chosen when *path* is an existing directory, or when it
    does not exist yet and has no file extension.  Otherwise the
    lower-cased extension is looked up in REGISTRY.

    Raises OEBError when no Writer is registered for the extension.
    """
    if os.path.isdir(path):
        return OEBWriter
    extension = os.path.splitext(path)[1].lower()
    if not (os.path.exists(path) or extension):
        return OEBWriter
    try:
        writer_class = REGISTRY[extension][1]
    except KeyError:
        writer_class = None
    if writer_class is not None:
        return writer_class
    raise OEBError('Unknown e-book file extension %r' % extension)
def option_parser(Reader, Writer):
    """Build the command-line option parser for one conversion.

    The Reader, each transform listed by the Reader and then by the Writer,
    and finally the Writer register their options on a shared Config; the
    generic conversion options are then added to the resulting parser.
    """
    cfg = Config('ebook-convert', _('Options to control e-book conversion.'))
    stages = chain([Reader], Reader.TRANSFORMS, Writer.TRANSFORMS, [Writer])
    for stage in stages:
        stage.config(cfg)
    parser = cfg.option_parser()
    add_option = parser.add_option
    add_option(
        '--encoding', default=None,
        help=_('Character encoding for input. Default is to auto detect.'))
    add_option(
        '-o', '--output', default=None,
        help=_('Output file. Default is derived from input filename.'))
    add_option(
        '-p', '--pretty-print', action='store_true', default=False,
        help=_('Produce more human-readable XML output.'))
    add_option(
        '-v', '--verbose', default=0, action='count',
        help=_('Useful for debugging.'))
    return parser
def main(argv=sys.argv):
    """Command-line entry point: convert ``argv[1]`` into ``argv[2]``.

    Returns 0 on success and 1 when the arguments are unusable (fewer than
    two paths, or stray positional arguments after the options).
    """
    if len(argv) < 3:
        print(_("Usage: ebook-convert INFILE OUTFILE [OPTIONS..]"))
        return 1
    inpath, outpath = argv[1:3]
    Reader = ReaderFactory(inpath)
    Writer = WriterFactory(outpath)
    parser = option_parser(Reader, Writer)
    opts, leftover = parser.parse_args(argv[3:])
    if leftover:
        parser.print_help()
        return 1
    logger = Logger(logging.getLogger('ebook-convert'))
    logger.setup_cli_handler(opts.verbose)
    oeb = OEBBook(
        encoding=opts.encoding,
        pretty_print=opts.pretty_print,
        logger=logger)
    context = Context(Reader.DEFAULT_PROFILE, Writer.DEFAULT_PROFILE)
    # Instantiate every stage before any work starts.
    reader = Reader.generate(opts)
    writer = Writer.generate(opts)
    transforms = [
        Transform.generate(opts)
        for Transform in chain(Reader.TRANSFORMS, Writer.TRANSFORMS)]
    # Pipeline: read, apply each transform in order, write.
    reader(oeb, inpath)
    for transform in transforms:
        transform(oeb, context)
    writer(oeb, outpath)
    return 0
if __name__ == '__main__':
sys.exit(main())

View File

@ -0,0 +1,562 @@
"""
Container-/OPF-based input OEBBook reader.
"""
from __future__ import with_statement
__license__ = 'GPL v3'
__copyright__ = '2008, Marshall T. Vandegrift <llasram@gmail.com>'
import sys, os, uuid, copy
from itertools import izip, chain
from urlparse import urldefrag, urlparse
from urllib import unquote as urlunquote
from mimetypes import guess_type
from collections import defaultdict
from lxml import etree
from calibre.ebooks.oeb.base import OPF1_NS, OPF2_NS, OPF2_NSMAP, DC11_NS, \
DC_NSES, OPF
from calibre.ebooks.oeb.base import OEB_DOCS, OEB_STYLES, OEB_IMAGES, \
PAGE_MAP_MIME, JPEG_MIME, NCX_MIME, SVG_MIME
from calibre.ebooks.oeb.base import XMLDECL_RE, COLLAPSE_RE, CSSURL_RE, \
ENTITY_RE, LINK_SELECTORS, MS_COVER_TYPE
from calibre.ebooks.oeb.base import namespace, barename, qname, XPath, xpath
from calibre.ebooks.oeb.base import urlnormalize, xml2str
from calibre.ebooks.oeb.base import OEBError, OEBBook, DirContainer
from calibre.ebooks.oeb.writer import OEBWriter
from calibre.ebooks.oeb.entitydefs import ENTITYDEFS
from calibre.ebooks.metadata.epub import CoverRenderer
from calibre.startup import get_lang
from calibre.ptempfile import TemporaryDirectory
__all__ = ['OEBReader']
class OEBReader(object):
"""Read an OEBPS 1.x or OPF/OPS 2.0 file collection."""
COVER_SVG_XP = XPath('h:body//svg:svg[position() = 1]')
COVER_OBJECT_XP = XPath('h:body//h:object[@data][position() = 1]')
Container = DirContainer
"""Container type used to access book files. Override in sub-classes."""
DEFAULT_PROFILE = 'PRS505'
"""Default renderer profile for content read with this Reader."""
TRANSFORMS = []
"""List of transforms to apply to content read with this Reader."""
def __init__(self):
return
@classmethod
def config(cls, cfg):
"""Add any book-reading options to the :class:`Config` object
:param:`cfg`.
"""
return
@classmethod
def generate(cls, opts):
"""Generate a Reader instance from command-line options."""
return cls()
def __call__(self, oeb, path):
"""Read the book at :param:`path` into the :class:`OEBBook` object
:param:`oeb`.
"""
self.oeb = oeb
self.logger = oeb.logger
oeb.container = self.Container(path)
opf = self._read_opf()
self._all_from_opf(opf)
return oeb
def _clean_opf(self, opf):
    """Return a new OPF 2 ``package`` element rebuilt from *opf*.

    Elements of *opf* are re-tagged and moved into the new tree, so *opf*
    itself is modified in the process.
    """
    # Collect every namespace mapping declared anywhere in the document
    # before any tag is rewritten.
    collected = {}
    for node in opf.iter(tag=etree.Element):
        collected.update(node.nsmap)
    # Move un-namespaced and OPF 1 elements into the OPF 2 namespace.
    for node in opf.iter(tag=etree.Element):
        if namespace(node.tag) in ('', OPF1_NS):
            node.tag = OPF(barename(node.tag))
    collected.update(OPF2_NSMAP)
    package = etree.Element(
        OPF('package'), nsmap={None: OPF2_NS}, attrib=dict(opf.attrib))
    metadata = etree.SubElement(package, OPF('metadata'), nsmap=collected)
    # The wrapper elements themselves are dropped; their descendants are
    # still visited by the ``//*`` query and kept.
    wrappers = (OPF('dc-metadata'), OPF('x-metadata'))
    for node in xpath(opf, 'o2:metadata//*'):
        if node.tag not in wrappers:
            if namespace(node.tag) in DC_NSES:
                # Normalise Dublin Core terms: DC 1.1 namespace, lower case.
                node.tag = '{%s}%s' % (DC11_NS, barename(node.tag).lower())
            metadata.append(node)
    for meta in xpath(opf, 'o2:metadata//o2:meta'):
        metadata.append(meta)
    for section in ('o2:manifest', 'o2:spine', 'o2:tours', 'o2:guide'):
        for node in xpath(opf, section):
            package.append(node)
    return package
def _read_opf(self):
    """Read, parse and normalise the book's OPF; return its root element.

    The OPF is requested from the container with the name ``None``.  If the
    first parse fails, named entities are substituted from ENTITYDEFS and
    the parse is retried once.

    Raises OEBError when the root element is in an unexpected namespace.
    """
    raw = self.oeb.container.read(None)
    text = XMLDECL_RE.sub('', self.oeb.decode(raw))
    try:
        root = etree.fromstring(text)
    except etree.XMLSyntaxError:
        def substitute(match):
            # Unknown entity names are left exactly as they were.
            return ENTITYDEFS.get(match.group(1), match.group(0))
        text = ENTITY_RE.sub(substitute, text)
        root = etree.fromstring(text)
        self.logger.warn('OPF contains invalid HTML named entities')
    ns = namespace(root.tag)
    if ns not in ('', OPF1_NS, OPF2_NS):
        raise OEBError('Invalid namespace %r for OPF document' % ns)
    return self._clean_opf(root)
def _metadata_from_opf(self, opf):
    """Fill ``self.oeb.metadata`` and ``self.oeb.uid`` from the OPF tree.

    Every element below ``metadata`` is added as a metadata item; ``meta``
    elements contribute their ``name``/``content`` attribute pair instead
    of tag and text.  A UUID identifier is added unless the book already
    has both a ``urn:uuid:`` identifier and an identifier with an ``id``.
    Missing language, creator and title entries get defaults, with a
    warning logged for each.
    """
    uid = opf.get('unique-identifier', None)
    self.oeb.uid = None
    metadata = self.oeb.metadata
    for elem in xpath(opf, '/o2:package/o2:metadata//*'):
        term = elem.tag
        value = elem.text
        attrib = dict(elem.attrib)
        nsmap = elem.nsmap
        if term == OPF('meta'):
            term = qname(attrib.pop('name', None), nsmap)
            value = attrib.pop('content', None)
        if value:
            value = COLLAPSE_RE.sub(' ', value.strip())
        if term and (value or attrib):
            metadata.add(term, value, attrib, nsmap=nsmap)
    haveuuid = haveid = False
    for ident in metadata.identifier:
        if unicode(ident).startswith('urn:uuid:'):
            haveuuid = True
        if 'id' in ident.attrib:
            haveid = True
    if not (haveuuid and haveid):
        bookid = "urn:uuid:%s" % str(uuid.uuid4())
        metadata.add('identifier', bookid, id='calibre-uuid')
    if uid is None:
        self.logger.warn(u'Unique-identifier not specified')
    for item in metadata.identifier:
        if not item.id:
            continue
        if uid is None or item.id == uid:
            self.oeb.uid = item
            break
    else:
        self.logger.warn(u'Unique-identifier %r not found' % uid)
        for ident in metadata.identifier:
            if 'id' in ident.attrib:
                # Use the identifier that actually carries an id; the
                # first identifier in the list may not have one.
                self.oeb.uid = ident
                break
    if not metadata.language:
        self.logger.warn(u'Language not specified')
        metadata.add('language', get_lang())
    if not metadata.creator:
        self.logger.warn('Creator not specified')
        metadata.add('creator', self.oeb.translate(__('Unknown')))
    if not metadata.title:
        self.logger.warn('Title not specified')
        metadata.add('title', self.oeb.translate(__('Unknown')))
def _manifest_add_missing(self):
    """Add files that manifest items link to but the manifest omits.

    Documents and XML items are scanned with ``LINK_SELECTORS`` and
    stylesheets with ``CSSURL_RE``.  Newly added items are scanned in turn
    until a pass finds nothing new.
    """
    manifest = self.oeb.manifest
    known = set(manifest.hrefs)
    pending = set(manifest.values())
    while pending:
        discovered = set()
        for item in pending:
            is_markup = (item.media_type in OEB_DOCS or
                         item.media_type[-4:] in ('/xml', '+xml'))
            if is_markup and item.data is not None:
                matches = [select(item.data) for select in LINK_SELECTORS]
                for link in chain(*matches):
                    target = urldefrag(link)[0]
                    if not target:
                        continue
                    target = item.abshref(urlnormalize(target))
                    # Anything with a URL scheme is an external resource.
                    if urlparse(target).scheme:
                        continue
                    if target not in known:
                        discovered.add(target)
            elif item.media_type in OEB_STYLES:
                for found in CSSURL_RE.finditer(item.data):
                    target = urldefrag(found.group('url'))[0]
                    target = item.abshref(urlnormalize(target))
                    if urlparse(target).scheme:
                        continue
                    if target not in known:
                        discovered.add(target)
        pending.clear()
        for target in discovered:
            known.add(target)
            if not self.oeb.container.exists(target):
                self.logger.warn('Referenced file %r not found' % target)
                continue
            self.logger.warn('Referenced file %r not in manifest' % target)
            new_id, _unused = manifest.generate(id='added')
            media_type = guess_type(target)[0] or BINARY_MIME
            # Queue the added item so its own links are followed too.
            pending.add(manifest.add(new_id, target, media_type))
def _manifest_from_opf(self, opf):
    """Populate the book manifest from the OPF ``manifest`` items.

    Duplicate hrefs and items missing from the container are skipped with
    a warning; a duplicate id is replaced by a generated id/href pair.
    Linked-but-unlisted files are added afterwards.
    """
    manifest = self.oeb.manifest
    for entry in xpath(opf, '/o2:package/o2:manifest/o2:item'):
        item_id = entry.get('id')
        href = entry.get('href')
        media_type = entry.get('media-type', None)
        if media_type is None:
            # Fall back to the hyphen-less attribute spelling.
            media_type = entry.get('mediatype', None)
        if media_type is None or media_type == 'text/xml':
            media_type = guess_type(href)[0] or media_type or BINARY_MIME
        fallback = entry.get('fallback')
        if href in manifest.hrefs:
            self.logger.warn(u'Duplicate manifest entry for %r' % href)
            continue
        if not self.oeb.container.exists(href):
            self.logger.warn(u'Manifest item %r not found' % href)
            continue
        if item_id in manifest.ids:
            self.logger.warn(u'Duplicate manifest id %r' % item_id)
            item_id, href = manifest.generate(item_id, href)
        manifest.add(item_id, href, media_type, fallback)
    self._manifest_add_missing()
def _spine_add_extra(self):
    """Append documents linked from spine items as non-linear spine items.

    Links are followed transitively through ``h:body//h:a/@href``.  For
    OPF version 2 and later a warning is logged for each file added.
    """
    manifest = self.oeb.manifest
    spine = self.oeb.spine
    find_links = XPath('h:body//h:a/@href')
    extras = set()
    frontier = set(spine)
    while frontier:
        reached = set()
        for item in frontier:
            if item.media_type not in OEB_DOCS:
                # TODO: handle fallback chains
                continue
            for link in find_links(item.data):
                target = urldefrag(link)[0]
                if not target:
                    continue
                target = item.abshref(urlnormalize(target))
                if target not in manifest.hrefs:
                    continue
                found = manifest.hrefs[target]
                if found.media_type not in OEB_DOCS:
                    continue
                if found in spine or found in extras:
                    continue
                reached.add(found)
        extras.update(reached)
        frontier = reached
    version = int(self.oeb.version[0])
    for item in sorted(extras):
        if version >= 2:
            self.logger.warn(
                'Spine-referenced file %r not in spine' % item.href)
        spine.add(item, linear=False)
def _spine_from_opf(self, opf):
    """Populate the book spine from the OPF ``itemref`` elements.

    References to unknown manifest ids are skipped with a warning.
    Raises :class:`OEBError` if no spine item could be added.
    """
    spine = self.oeb.spine
    manifest = self.oeb.manifest
    for ref in xpath(opf, '/o2:package/o2:spine/o2:itemref'):
        idref = ref.get('idref')
        if idref in manifest.ids:
            spine.add(manifest.ids[idref], ref.get('linear'))
        else:
            self.logger.warn(u'Spine item %r not found' % idref)
    if len(spine) == 0:
        raise OEBError("Spine is empty")
    self._spine_add_extra()
def _guide_from_opf(self, opf):
    """Populate the book guide from the OPF ``reference`` elements.

    A reference is kept only when its href, minus any fragment, names a
    manifest item; the href is stored with its fragment intact.
    """
    guide = self.oeb.guide
    manifest = self.oeb.manifest
    for ref in xpath(opf, '/o2:package/o2:guide/o2:reference'):
        href = ref.get('href')
        target = urldefrag(href)[0]
        if target in manifest.hrefs:
            guide.add(ref.get('type'), ref.get('title'), href)
        else:
            self.logger.warn(u'Guide reference %r not found' % href)
def _find_ncx(self, opf):
    """Locate the NCX item, remove it from the manifest and return it.

    The spine ``toc`` attribute is tried first; if the spine has no such
    attribute, the first manifest item whose media type is ``NCX_MIME`` is
    used.  Returns ``None`` when the referenced id is unknown or no NCX
    item exists.
    """
    manifest = self.oeb.manifest
    toc_ids = xpath(opf, '/o2:package/o2:spine/@toc')
    if toc_ids:
        ncx_id = toc_ids[0]
        if ncx_id in manifest.ids:
            ncx = manifest.ids[ncx_id]
            manifest.remove(ncx)
            return ncx
        return None
    for candidate in manifest.values():
        if candidate.media_type == NCX_MIME:
            manifest.remove(candidate)
            return candidate
    return None
def _toc_from_navpoint(self, item, toc, navpoint):
    """Recursively add the ``navPoint`` children of :param:`navpoint` to
    the TOC node :param:`toc`.

    :param:`item` is the NCX manifest item; it resolves relative hrefs.
    Children without a title or target, or pointing outside the manifest,
    are skipped together with their descendants.
    """
    hrefs = self.oeb.manifest.hrefs
    for point in xpath(navpoint, 'ncx:navPoint'):
        label = ''.join(xpath(point, 'ncx:navLabel/ncx:text/text()'))
        label = COLLAPSE_RE.sub(' ', label.strip())
        sources = xpath(point, 'ncx:content/@src')
        if not (label and sources):
            continue
        href = item.abshref(urlnormalize(sources[0]))
        if urldefrag(href)[0] not in hrefs:
            self.logger.warn('TOC reference %r not found' % href)
            continue
        node = toc.add(label, href, id=point.get('id'),
                       klass=point.get('class'))
        self._toc_from_navpoint(item, node, point)
def _toc_from_ncx(self, item):
    """Build the book TOC from the NCX manifest item :param:`item`.

    Returns ``False`` when :param:`item` is ``None`` and ``True``
    otherwise.  The TOC title comes from ``docTitle``, falling back to the
    first title in the book metadata.
    """
    if item is None:
        return False
    ncx = item.data
    heading = ''.join(xpath(ncx, 'ncx:docTitle/ncx:text/text()'))
    heading = COLLAPSE_RE.sub(' ', heading.strip())
    if not heading:
        heading = unicode(self.oeb.metadata.title[0])
    toc = self.oeb.toc
    toc.title = heading
    for navmap in xpath(ncx, 'ncx:navMap'):
        self._toc_from_navpoint(item, toc, navmap)
    return True
def _toc_from_tour(self, opf):
    """Build the book TOC from the first OPF ``tour``, if there is one.

    Returns ``False`` when the OPF has no tour and ``True`` otherwise.
    Sites lacking a title or href, or pointing outside the manifest, are
    skipped.
    """
    tours = xpath(opf, 'o2:tours/o2:tour')
    if not tours:
        return False
    tour = tours[0]
    toc = self.oeb.toc
    toc.title = tour.get('title')
    for site in xpath(tour, 'o2:site'):
        label = site.get('title')
        href = site.get('href')
        if not (label and href):
            continue
        target = urldefrag(urlnormalize(href))[0]
        if target not in self.oeb.manifest.hrefs:
            self.logger.warn('TOC reference %r not found' % href)
            continue
        toc.add(label, href, id=site.get('id'))
    return True
def _toc_from_html(self, opf):
    """Build the book TOC from the HTML document named by the ``toc``
    guide reference.

    Returns ``False`` when the guide has no ``toc`` entry and ``True``
    otherwise.  The text of all anchors sharing one target is joined into
    a single entry, in order of first appearance.
    """
    if 'toc' not in self.oeb.guide:
        return False
    itempath, frag = urldefrag(self.oeb.guide['toc'].href)
    item = self.oeb.manifest.hrefs[itempath]
    scope = item.data
    if frag:
        # Narrow the search to the element named by the fragment, then
        # widen to ancestors until some anchor is in scope.
        hits = xpath(scope, './/*[@id="%s"]' % frag)
        if not hits:
            hits = xpath(scope, './/*[@name="%s"]' % frag)
        node = hits[0] if hits else scope
        while node != scope and not xpath(node, './/h:a[@href]'):
            node = node.getparent()
        scope = node
    labels = defaultdict(list)
    ordered = []
    for anchor in xpath(scope, './/h:a[@href]'):
        href = item.abshref(urlnormalize(anchor.attrib['href']))
        target = urldefrag(href)[0]
        if target not in self.oeb.manifest.hrefs:
            continue
        text = ' '.join(xpath(anchor, './/text()'))
        text = COLLAPSE_RE.sub(' ', text.strip())
        if href not in labels:
            ordered.append(href)
        labels[href].append(text)
    toc = self.oeb.toc
    for href in ordered:
        toc.add(' '.join(labels[href]), href)
    return True
def _toc_from_spine(self, opf):
    """Build a fallback TOC with one entry per linear spine item.

    Each entry is labelled with the document ``<title>``.  If two
    documents share a title, every entry is instead labelled with the
    first non-empty h1-h5/strong text of its document, or a placeholder.
    A document without a title gets its own header label.  Always returns
    ``True``.
    """
    toc = self.oeb.toc
    # Labels are collected for linear items only, so they must later be
    # paired with this same list rather than with the whole spine.
    linear = [item for item in self.oeb.spine if item.linear]
    titles = []
    headers = []
    for item in linear:
        html = item.data
        title = ''.join(xpath(html, '/h:html/h:head/h:title/text()'))
        # An empty title is kept as a placeholder so that titles stays
        # index-aligned with linear and headers.
        titles.append(COLLAPSE_RE.sub(' ', title.strip()))
        headers.append('(unlabled)')
        for tag in ('h1', 'h2', 'h3', 'h4', 'h5', 'strong'):
            expr = '/h:html/h:body//h:%s[position()=1]/text()'
            header = ''.join(xpath(html, expr % tag))
            header = COLLAPSE_RE.sub(' ', header.strip())
            if header:
                headers[-1] = header
                break
    found = [title for title in titles if title]
    if len(found) > len(set(found)):
        # Duplicate titles cannot tell entries apart; use headers instead.
        labels = headers
    else:
        labels = [title or header
                  for title, header in zip(titles, headers)]
    for label, item in zip(labels, linear):
        toc.add(label, item.href)
    return True
def _toc_from_opf(self, opf, item):
    """Build the book TOC from the first source that yields one.

    Sources are tried in order: the NCX item :param:`item`, the OPF tours,
    the HTML document named by the guide, and finally the spine.  A
    warning is logged once both metadata sources have failed.
    """
    if self._toc_from_ncx(item) or self._toc_from_tour(opf):
        return
    self.logger.warn('No metadata table of contents found')
    if not self._toc_from_html(opf):
        self._toc_from_spine(opf)
def _pages_from_ncx(self, opf, item):
    """Populate the book page list from the NCX ``pageList``.

    Returns ``False`` when :param:`item` is ``None`` or the NCX has no
    page targets, ``True`` otherwise.  Targets without a ``content``
    source are skipped.
    """
    if item is None:
        return False
    targets = xpath(item.data, 'ncx:pageList/ncx:pageTarget')
    if not targets:
        return False
    pages = self.oeb.pages
    for target in targets:
        label = ''.join(xpath(target, 'ncx:navLabel/ncx:text/text()'))
        label = COLLAPSE_RE.sub(' ', label.strip())
        sources = xpath(target, 'ncx:content/@src')
        if not sources:
            continue
        href = item.abshref(urlnormalize(sources[0]))
        pages.add(label, href,
                  type=target.get('type', 'normal'),
                  id=target.get('id'),
                  klass=target.get('class'))
    return True
def _find_page_map(self, opf):
    """Locate the page-map item, remove it from the manifest and return it.

    The spine ``page-map`` attribute is tried first; if the spine has no
    such attribute, the first manifest item whose media type is
    ``PAGE_MAP_MIME`` is used.  Returns ``None`` when the referenced id is
    unknown or no page-map item exists.
    """
    manifest = self.oeb.manifest
    map_ids = xpath(opf, '/o2:package/o2:spine/@page-map')
    if map_ids:
        map_id = map_ids[0]
        if map_id in manifest.ids:
            page_map = manifest.ids[map_id]
            manifest.remove(page_map)
            return page_map
        return None
    for candidate in manifest.values():
        if candidate.media_type == PAGE_MAP_MIME:
            manifest.remove(candidate)
            return candidate
    return None
def _pages_from_page_map(self, opf):
    """Populate the book page list from a page-map manifest item.

    Returns ``False`` when no page-map item is found, ``True`` otherwise.
    Pages without a name are typed ``special``, names made up solely of
    the letters ``ivxlcdm`` (any case) are typed ``front``, and all others
    ``normal``.
    """
    item = self._find_page_map(opf)
    if item is None:
        return False
    pages = self.oeb.pages
    for page in xpath(item.data, 'o2:page'):
        label = page.get('name', '')
        href = page.get('href')
        if not href:
            continue
        label = COLLAPSE_RE.sub(' ', label.strip())
        href = item.abshref(urlnormalize(href))
        if not label:
            kind = 'special'
        elif label.lower().strip('ivxlcdm') == '':
            kind = 'front'
        else:
            kind = 'normal'
        pages.add(label, href, type=kind)
    return True
def _pages_from_opf(self, opf, item):
    """Populate the book page list from the NCX item :param:`item`, or
    failing that from a page-map; neither succeeding is not an error.
    """
    if not self._pages_from_ncx(opf, item):
        self._pages_from_page_map(opf)
def _cover_from_html(self, hcover):
    """Render the document item :param:`hcover` to an image, add that
    image to the manifest as a JPEG cover and return the new item.
    """
    with TemporaryDirectory('_html_cover') as tdir:
        # The book is written out so the renderer can load the page from
        # disk; the image data is taken before the directory goes away.
        OEBWriter()(self.oeb, tdir)
        page = os.path.join(tdir, urlunquote(hcover.href))
        data = CoverRenderer(page).image_data
    manifest = self.oeb.manifest
    cover_id, href = manifest.generate('cover', 'cover.jpeg')
    return manifest.add(cover_id, href, JPEG_MIME, data=data)
def _locate_cover_image(self):
    """Return a manifest item holding the book's cover image.

    Candidates are tried in this order:

    1. the manifest item named by the ``cover`` metadata entry;
    2. the ``cover`` guide reference, if it points at an image;
    3. the ``MS_COVER_TYPE`` guide reference, if it points at an image;
    4. an SVG element found by ``COVER_SVG_XP`` in the cover document,
       copied into a new manifest item;
    5. an image referenced by an element found by ``COVER_OBJECT_XP``;
    6. a rendering of the cover document itself.

    The cover document is the first spine item unless the ``cover`` guide
    reference points at a document.
    """
    manifest = self.oeb.manifest
    guide = self.oeb.guide
    if self.oeb.metadata.cover:
        cover_id = str(self.oeb.metadata.cover[0])
        item = manifest.ids.get(cover_id, None)
        if item is not None and item.media_type in OEB_IMAGES:
            return item
        self.logger.warn('Invalid cover image @id %r' % cover_id)
    hcover = self.oeb.spine[0]
    if 'cover' in guide:
        # Guide hrefs may carry a fragment, and the manifest is keyed by
        # path only, so strip it and tolerate a missing entry.
        path = urldefrag(guide['cover'].href)[0]
        item = manifest.hrefs.get(path, None)
        if item is not None:
            if item.media_type in OEB_IMAGES:
                return item
            if item.media_type in OEB_DOCS:
                hcover = item
    html = hcover.data
    if MS_COVER_TYPE in guide:
        path = urldefrag(guide[MS_COVER_TYPE].href)[0]
        item = manifest.hrefs.get(path, None)
        if item is not None and item.media_type in OEB_IMAGES:
            return item
    svgs = self.COVER_SVG_XP(html)
    if svgs:
        # Copy the element so the cover document itself is left intact.
        svg = copy.deepcopy(svgs[0])
        href = os.path.splitext(hcover.href)[0] + '.svg'
        svg_id, href = manifest.generate(hcover.id, href)
        return manifest.add(svg_id, href, SVG_MIME, data=svg)
    objects = self.COVER_OBJECT_XP(html)
    if objects:
        data_href = objects[0].get('data')
        if data_href:
            item = manifest.hrefs.get(hcover.abshref(data_href), None)
            if item is not None and item.media_type in OEB_IMAGES:
                return item
    return self._cover_from_html(hcover)
def _ensure_cover_image(self):
    """Record the located cover image's id in the book metadata.

    An existing ``cover`` metadata entry has its value overwritten;
    otherwise a new entry is added.
    """
    cover = self._locate_cover_image()
    metadata = self.oeb.metadata
    if metadata.cover:
        metadata.cover[0].value = cover.id
    else:
        metadata.add('cover', cover.id)
def _all_from_opf(self, opf):
    """Populate every part of the book from the cleaned OPF tree.

    The order matters: the manifest is filled before the spine and guide
    that refer to it, and the NCX item is located before the TOC and page
    list are built from it.
    """
    self.oeb.version = opf.get('version', '1.2')
    for populate in (self._metadata_from_opf, self._manifest_from_opf,
                     self._spine_from_opf, self._guide_from_opf):
        populate(opf)
    ncx = self._find_ncx(opf)
    self._toc_from_opf(opf, ncx)
    self._pages_from_opf(opf, ncx)
    self._ensure_cover_image()
def main(argv=sys.argv):
reader = OEBReader()
for arg in argv[1:]:
oeb = reader(OEBBook(), arg)
for name, doc in oeb.to_opf1().values():
print etree.tostring(doc, pretty_print=True)
for name, doc in oeb.to_opf2(page_map=True).values():
print etree.tostring(doc, pretty_print=True)
return 0
if __name__ == '__main__':
    # Script entry point: exit with the status returned by main().
    status = main()
    sys.exit(status)

View File

@ -94,7 +94,15 @@ class CSSFlattener(object):
self.unfloat = unfloat
self.untable = untable
def transform(self, oeb, context):
@classmethod
def config(cls, cfg):
    """Hook for registering options on :param:`cfg`; this transform adds
    none and returns :param:`cfg` unchanged.
    """
    return cfg
@classmethod
def generate(cls, opts):
    """Build an instance with default arguments; :param:`opts` is not
    consulted.
    """
    instance = cls()
    return instance
def __call__(self, oeb, context):
oeb.logger.info('Flattening CSS and remapping font sizes...')
self.oeb = oeb
self.context = context

View File

@ -52,7 +52,18 @@ class HTMLTOCAdder(object):
self.title = title
self.style = style
def transform(self, oeb, context):
@classmethod
def config(cls, cfg):
    """Register the ``--toc-title`` option in an ``htmltoc`` option group
    on :param:`cfg` and return :param:`cfg`.
    """
    add_option = cfg.add_group('htmltoc', _('HTML TOC generation options.'))
    add_option(
        'toc_title', ['--toc-title'], default=None,
        help=_('Title for any generated in-line table of contents.'))
    return cfg
@classmethod
def generate(cls, opts):
    """Build an instance whose title is taken from ``opts.toc_title``."""
    title = opts.toc_title
    return cls(title=title)
def __call__(self, oeb, context):
if 'toc' in oeb.guide:
return
oeb.logger.info('Generating in-line TOC...')

View File

@ -29,7 +29,15 @@ CASE_MANGLER_CSS = """
TEXT_TRANSFORMS = set(['capitalize', 'uppercase', 'lowercase'])
class CaseMangler(object):
def transform(self, oeb, context):
@classmethod
def config(cls, cfg):
    """Hook for registering options on :param:`cfg`; this transform adds
    none and returns :param:`cfg` unchanged.
    """
    return cfg
@classmethod
def generate(cls, opts):
    """Build an instance with default arguments; :param:`opts` is not
    consulted.
    """
    instance = cls()
    return instance
def __call__(self, oeb, context):
oeb.logger.info('Applying case-transforming CSS...')
self.oeb = oeb
self.profile = context.source

View File

@ -34,7 +34,15 @@ class SVGRasterizer(object):
if QApplication.instance() is None:
QApplication([])
def transform(self, oeb, context):
@classmethod
def config(cls, cfg):
    """Hook for registering options on :param:`cfg`; this transform adds
    none and returns :param:`cfg` unchanged.
    """
    return cfg
@classmethod
def generate(cls, opts):
    """Build an instance with default arguments; :param:`opts` is not
    consulted.
    """
    instance = cls()
    return instance
def __call__(self, oeb, context):
oeb.logger.info('Rasterizing SVG images...')
self.oeb = oeb
self.profile = context.dest

View File

@ -13,7 +13,15 @@ from calibre.ebooks.oeb.base import LINK_SELECTORS, CSSURL_RE
from calibre.ebooks.oeb.base import urlnormalize
class ManifestTrimmer(object):
def transform(self, oeb, context):
@classmethod
def config(cls, cfg):
    """Hook for registering options on :param:`cfg`; this transform adds
    none and returns :param:`cfg` unchanged.
    """
    return cfg
@classmethod
def generate(cls, opts):
    """Build an instance with default arguments; :param:`opts` is not
    consulted.
    """
    instance = cls()
    return instance
def __call__(self, oeb, context):
oeb.logger.info('Trimming unused files from manifest...')
used = set()
hrefs = oeb.manifest.hrefs

View File

@ -0,0 +1,75 @@
'''
Directory output OEBBook writer.
'''
from __future__ import with_statement
__license__ = 'GPL v3'
__copyright__ = '2008, Marshall T. Vandegrift <llasram@gmail.com>'
import sys, os, logging
from calibre.ebooks.oeb.base import OPF_MIME, xml2str
from calibre.ebooks.oeb.base import Logger, DirContainer, OEBBook, OEBError
__all__ = ['OEBWriter']
class OEBWriter(object):
    """Write an :class:`OEBBook` out as a directory of loose files."""

    DEFAULT_PROFILE = 'PRS505'
    """Default renderer profile for content written with this Writer."""

    TRANSFORMS = []
    """List of transforms to apply to content written with this Writer."""

    def __init__(self, version='2.0', page_map=False, pretty_print=False):
        """Store the output settings.

        :param version: OPF version string; only its first character is
            interpreted when writing.
        :param page_map: passed to ``to_opf2`` when writing OPF 2.
        :param pretty_print: passed to ``xml2str`` for metadata documents.
        """
        self.version = version
        self.page_map = page_map
        self.pretty_print = pretty_print

    @classmethod
    def config(cls, cfg):
        """Add any book-writing options to the :class:`Config` object
        :param:`cfg`.
        """
        oeb = cfg.add_group('oeb', _('OPF/NCX/etc. generation options.'))
        versions = ['1.2', '2.0']
        oeb('opf_version', ['--opf-version'], default='2.0', choices=versions,
            help=_('OPF version to generate. Default is %default.'))
        oeb('adobe_page_map', ['--adobe-page-map'], default=False,
            help=_('Generate an Adobe "page-map" file if pagination '
                   'information is avaliable.'))
        return cfg

    @classmethod
    def generate(cls, opts):
        """Generate a Writer instance from command-line options."""
        version = opts.opf_version
        page_map = opts.adobe_page_map
        pretty_print = opts.pretty_print
        return cls(version=version, page_map=page_map,
                   pretty_print=pretty_print)

    def __call__(self, oeb, path):
        """Write the book in the :class:`OEBBook` object :param:`oeb` to
        the directory at :param:`path`.

        If :param:`path` ends in ``.opf``, the book is written to that
        file's directory and the OPF document takes that file name.
        Missing output directories are created.

        Raises :class:`OEBError` for an OPF major version other than 1 or
        2, before anything is written.
        """
        version = int(self.version[0])
        if version not in (1, 2):
            # Fail before touching the filesystem so a bad version does
            # not leave a half-written book behind.
            raise OEBError("Unrecognized OPF version %r" % self.version)
        opfname = None
        if os.path.splitext(path)[1].lower() == '.opf':
            opfname = os.path.basename(path)
            # A bare "book.opf" has an empty dirname; use the current
            # directory rather than trying to create ''.
            path = os.path.dirname(path) or os.curdir
        if not os.path.isdir(path):
            os.makedirs(path)
        output = DirContainer(path)
        for item in oeb.manifest.values():
            output.write(item.href, str(item))
        if version == 1:
            metadata = oeb.to_opf1()
        else:
            metadata = oeb.to_opf2(page_map=self.page_map)
        pretty_print = self.pretty_print
        for mime, (href, data) in metadata.items():
            if opfname and mime == OPF_MIME:
                href = opfname
            output.write(href, xml2str(data, pretty_print=pretty_print))
        return