Added path clean-up and basic extraction method.

This commit is contained in:
Marshall T. Vandegrift 2008-07-19 14:51:31 -04:00
parent 1247adc0e8
commit 3737fd3e13

View File

@ -1,8 +1,10 @@
'''
Support for reading LIT files.
'''
from __future__ import with_statement
__license__ = 'GPL v3'
__copyright__ = '2008, Kovid Goyal <kovid at kovidgoyal.net>'
'''
Support for reading the metadata from a lit file.
'''
import sys, struct, cStringIO, os
import functools
@ -39,6 +41,13 @@ RESET_HDRLEN = 12
RESET_UCLENGTH = 16
RESET_INTERVAL = 32
FLAG_OPENING = 1
FLAG_CLOSING = 2
FLAG_BLOCK = 4
FLAG_HEAD = 8
FLAG_ATOM = 16
XML_ENTITIES = ['&amp;', '&apos;', '&lt;', '&gt;', '&quot;']
def u32(bytes):
return struct.unpack('<L', bytes[:4])[0]
@ -87,13 +96,6 @@ def read_utf8_char(bytes, pos):
c = (c << 6) | (b & 0x3F)
return unichr(c), pos+elsize
FLAG_OPENING = 1
FLAG_CLOSING = 2
FLAG_BLOCK = 4
FLAG_HEAD = 8
FLAG_ATOM = 16
XML_ENTITIES = ['&amp;', '&apos;', '&lt;', '&gt;', '&quot;']
class UnBinary(object):
def __init__(self, bin, manifest, map=OPF_MAP):
self.manifest = manifest
@ -123,7 +125,10 @@ class UnBinary(object):
offset += 4
def item_path(self, internal_id):
return self.manifest.get(internal_id, internal_id)
try:
return self.manifest[internal_id].path
except KeyError:
return internal_id
def __unicode__(self):
return self.raw
@ -325,9 +330,7 @@ class ManifestItem(object):
self.offset = offset
self.root = root
self.state = state
self.prefix = state if state in ('images', 'css') else ''
self.prefix = self.prefix + os.sep if self.prefix else ''
self.path = self.prefix + self.original
self.path = self.original
def __eq__(self, other):
if hasattr(other, 'internal'):
@ -335,7 +338,7 @@ class ManifestItem(object):
return self.internal == other
def __repr__(self):
return "ManifestItem(internal='%s', path='%s')" \
return "ManifestItem(internal=%s, path=%s)" \
% (repr(self.internal), repr(self.path))
def preserve(function):
@ -348,7 +351,7 @@ def preserve(function):
functools.update_wrapper(wrapper, function)
return wrapper
class LitFile(object):
class LitReader(object):
PIECE_SIZE = 16
def magic():
@ -398,7 +401,6 @@ class LitFile(object):
return property(fget=fget)
guid = guid()
def header():
@preserve
def fget(self):
@ -410,8 +412,11 @@ class LitFile(object):
return property(fget=fget)
header = header()
def __init__(self, stream):
self._stream = stream
def __init__(self, filename_or_stream):
if hasattr(filename_or_stream, 'read'):
self._stream = filename_or_stream
else:
self._stream = open(filename_or_stream, 'rb')
if self.magic != 'ITOLITLS':
raise LitError('Not a valid LIT file')
if self.version != 1:
@ -467,7 +472,7 @@ class LitFile(object):
def read_header_pieces(self):
src = self.header[self.hdr_len:]
for i in range(self.num_pieces):
piece = src[i*self.PIECE_SIZE:(i+1)*self.PIECE_SIZE]
piece = src[i * self.PIECE_SIZE:(i + 1) * self.PIECE_SIZE]
if u32(piece[4:]) != 0 or u32(piece[12:]) != 0:
raise LitError('Piece %s has 64bit value' % repr(piece))
offset, size = u32(piece), int32(piece[8:])
@ -495,10 +500,8 @@ class LitFile(object):
if not piece.startswith('IFCM'):
raise LitError('Header piece #1 is not main directory.')
chunk_size, num_chunks = int32(piece[8:12]), int32(piece[24:28])
if (32 + chunk_size * num_chunks) != len(piece):
raise LitError('IFCM HEADER has incorrect length')
for chunk in range(num_chunks):
p = 32 + chunk * chunk_size
if piece[p:p+4] != 'AOLL':
@ -563,46 +566,39 @@ class LitFile(object):
def read_manifest(self, entry):
self.manifest = {}
raw = self._read_content(entry.offset, entry.size)
pos = 0
while pos < len(raw):
size = ord(raw[pos])
if size == 0: break
pos += 1
root = raw[pos:pos+size].decode('utf8')
pos += size
if pos >= len(raw):
raise LitError('Truncated manifest.')
while raw:
slen, raw = ord(raw[0]), raw[1:]
if slen == 0: break
root, raw = raw[:slen].decode('utf8'), raw[slen:]
if not raw:
raise LitError('Truncated manifest')
for state in ['spine', 'not spine', 'css', 'images']:
num_files = int32(raw[pos:pos+4])
pos += 4
num_files, raw = int32(raw), raw[4:]
if num_files == 0: continue
i = 0
while i < num_files:
if pos+5 >= len(raw):
raise LitError('Truncated manifest.')
offset = u32(raw[pos:pos+4])
pos += 4
slen = ord(raw[pos])
pos += 1
internal = raw[pos:pos+slen].decode('utf8')
pos += slen
slen = ord(raw[pos])
pos += 1
original = raw[pos:pos+slen].decode('utf8')
pos += slen
slen = ord(raw[pos])
pos += 1
mime_type = raw[pos:pos+slen].decode('utf8')
pos += slen + 1
self.manifest[internal] = \
ManifestItem(original, internal, mime_type,
offset, root, state)
i += 1
for i in xrange(num_files):
if len(raw) < 5:
raise LitError('Truncated manifest')
offset, raw = u32(raw), raw[4:]
slen, raw = ord(raw[0]), raw[1:]
internal, raw = raw[:slen].decode('utf8'), raw[slen:]
slen, raw = ord(raw[0]), raw[1:]
original, raw = raw[:slen].decode('utf8'), raw[slen:]
slen, raw = ord(raw[0]), raw[1:]
mime_type, raw = raw[:slen].decode('utf8'), raw[slen+1:]
self.manifest[internal] = ManifestItem(
original, internal, mime_type, offset, root, state)
mlist = self.manifest.values()
shared = mlist[0].path
for item in mlist[1:]:
path = item.path
while not path.startswith(shared):
shared = shared[:-1]
if shared == '':
break
else:
slen = len(shared)
for item in mlist:
item.path = item.path[slen:]
def read_meta(self, entry):
raw = self._read_content(entry.offset, entry.size)
@ -610,16 +606,12 @@ class LitFile(object):
self.meta = xml
def read_drm(self):
def exists_file(name):
try: self.get_file(name)
except KeyError: return False
return True
self.drmlevel = 0
if exists_file('/DRMStorage/Licenses/EUL'):
if '/DRMStorage/Licenses/EUL' in self.entries:
self.drmlevel = 5
elif exists_file('/DRMStorage/DRMBookplate'):
elif '/DRMStorage/DRMBookplate' in self.entries:
self.drmlevel = 3
elif exists_file('/DRMStorage/DRMSealed'):
elif '/DRMStorage/DRMSealed' in self.entries:
self.drmlevel = 1
else:
return
@ -686,7 +678,10 @@ class LitFile(object):
content = self._decrypt(content)
control = control[csize:]
elif guid == LZXCOMPRESS_GUID:
content = self._decompress_section(name, control, content)
reset_table = self.get_file(
'/'.join(['::DataSpace/Storage', name, 'Transform',
LZXCOMPRESS_GUID, 'InstanceData/ResetTable']))
content = self._decompress(content, control, reset_table)
control = control[csize:]
else:
raise LitError("Unrecognized transform: %s." % repr(guid))
@ -698,9 +693,14 @@ class LitFile(object):
raise LitError('Cannot extract content from a DRM protected ebook')
return msdes.new(self.bookkey).decrypt(content)
def _decompress_section(self, name, control, content):
def _decompress(self, content, control, reset_table):
if len(control) < 32 or u32(control[CONTROL_TAG:]) != LZXC_TAG:
raise LitError("Invalid ControlData tag value")
if len(reset_table) < (RESET_INTERVAL + 8):
raise LitError("Reset table is too short")
if u32(reset_table[RESET_UCLENGTH + 4:]) != 0:
raise LitError("Reset table has 64bit value for UCLENGTH")
result = []
window_size = 14
@ -712,13 +712,6 @@ class LitFile(object):
raise LitError("Invalid window in ControlData")
lzx.init(window_size)
reset_table = self.get_file('/'.join(
['::DataSpace/Storage', name, 'Transform',
LZXCOMPRESS_GUID, 'InstanceData/ResetTable']))
if len(reset_table) < (RESET_INTERVAL + 8):
raise LitError("Reset table is too short")
if u32(reset_table[RESET_UCLENGTH + 4:]) != 0:
raise LitError("Reset table has 64bit value for UCLENGTH")
ofs_entry = int32(reset_table[RESET_HDRLEN:]) + 8
uclength = int32(reset_table[RESET_UCLENGTH:])
accum = int32(reset_table[RESET_INTERVAL:])
@ -751,9 +744,36 @@ class LitFile(object):
raise LitError("Failed to completely decompress section")
return ''.join(result)
def extract_content(self, output_dir=os.getcwdu()):
output_dir = os.path.abspath(output_dir)
try:
opf_path = os.path.splitext(
os.path.basename(self._stream.name))[0] + '.opf'
except AttributeError:
opf_path = 'content.opf'
opf_path = os.path.join(output_dir, opf_path)
self._ensure_dir(opf_path)
with open(opf_path, 'w') as f:
f.write(self.get_markup_file('/meta').encode('utf-8'))
for entry in self.manifest.values():
path = os.path.join(output_dir, entry.path)
self._ensure_dir(path)
with open(path, 'w') as f:
if 'spine' in entry.state:
name = '/'.join(['/data', entry.internal, 'content'])
f.write(self.get_markup_file(name).encode('utf-8'))
else:
name = '/'.join(['/data', entry.internal])
f.write(self.get_file(name))
def _ensure_dir(self, path):
dir = os.path.dirname(path)
if not os.path.isdir(dir):
os.makedirs(dir)
def get_metadata(stream):
try:
litfile = LitFile(stream)
litfile = LitReader(stream)
src = litfile.meta.encode('utf-8')
mi = OPFReader(cStringIO.StringIO(src), dir=os.getcwd())
cover_url, cover_item = mi.cover, None
@ -775,16 +795,24 @@ def get_metadata(stream):
mi = MetaInformation(title, ['Unknown'])
return mi
def option_parser():
from calibre import OptionParser
parser = OptionParser(usage=_('%prog [options] EBOOK'))
parser.add_option('-o', '--output-dir', default='.',
help=_('Output directory. Defaults to current directory.'))
parser.add_option('--verbose', default=False, action='store_true',
help='Useful for debugging.')
return parser
def main(args=sys.argv):
parser = option_parser()
opts, args = parser.parse_args(args)
if len(args) != 2:
print >>sys.stderr, _('Usage: %s file.lit')%(args[0],)
parser.print_help()
return 1
mi = get_metadata(open(args[1], 'rb'))
print unicode(mi)
if mi.cover_data[1]:
cover = os.path.abspath(os.path.splitext(os.path.basename(args[1]))[0] + '.' + mi.cover_data[0])
open(cover, 'wb').write(mi.cover_data[1])
print _('Cover saved to'), cover
lr = LitReader(args[1])
lr.extract_content(opts.output_dir)
print _('OEB ebook created in'), opts.output_dir
return 0
if __name__ == '__main__':