MOBI inspect: Dump secondary index header

This commit is contained in:
Kovid Goyal 2011-07-30 23:57:01 -06:00
parent 0f88fd4f0d
commit c7e10229d4

View File

@ -413,6 +413,109 @@ class TagX(object): # {{{
self.num_values, bin(self.bitmask), self.eof)
# }}}
class SecondaryIndexHeader(object): # {{{
def __init__(self, record):
self.record = record
raw = self.record.raw
#open('/t/index_header.bin', 'wb').write(raw)
if raw[:4] != b'INDX':
raise ValueError('Invalid Secondary Index Record')
self.header_length, = struct.unpack('>I', raw[4:8])
self.unknown1 = raw[8:16]
self.index_type, = struct.unpack('>I', raw[16:20])
self.index_type_desc = {0: 'normal', 2:
'inflection', 6: 'calibre'}.get(self.index_type, 'unknown')
self.idxt_start, = struct.unpack('>I', raw[20:24])
self.index_count, = struct.unpack('>I', raw[24:28])
self.index_encoding_num, = struct.unpack('>I', raw[28:32])
self.index_encoding = {65001: 'utf-8', 1252:
'cp1252'}.get(self.index_encoding_num, 'unknown')
if self.index_encoding == 'unknown':
raise ValueError(
'Unknown index encoding: %d'%self.index_encoding_num)
self.unknown2 = raw[32:36]
self.num_index_entries, = struct.unpack('>I', raw[36:40])
self.ordt_start, = struct.unpack('>I', raw[40:44])
self.ligt_start, = struct.unpack('>I', raw[44:48])
self.num_of_ligt_entries, = struct.unpack('>I', raw[48:52])
self.num_of_cncx_blocks, = struct.unpack('>I', raw[52:56])
self.unknown3 = raw[56:180]
self.tagx_offset, = struct.unpack(b'>I', raw[180:184])
if self.tagx_offset != self.header_length:
raise ValueError('TAGX offset and header length disagree')
self.unknown4 = raw[184:self.header_length]
tagx = raw[self.header_length:]
if not tagx.startswith(b'TAGX'):
raise ValueError('Invalid TAGX section')
self.tagx_header_length, = struct.unpack('>I', tagx[4:8])
self.tagx_control_byte_count, = struct.unpack('>I', tagx[8:12])
tag_table = tagx[12:self.tagx_header_length]
if len(tag_table) % 4 != 0:
raise ValueError('Invalid Tag table')
num_tagx_entries = len(tag_table) // 4
self.tagx_entries = []
for i in range(num_tagx_entries):
self.tagx_entries.append(TagX(tag_table[i*4:(i+1)*4],
self.tagx_control_byte_count))
if self.tagx_entries and not self.tagx_entries[-1].is_eof:
raise ValueError('TAGX last entry is not EOF')
idxt0_pos = self.header_length+self.tagx_header_length
num = ord(raw[idxt0_pos])
count_pos = idxt0_pos+1+num
self.last_entry = raw[idxt0_pos+1:count_pos]
self.ncx_count, = struct.unpack(b'>H', raw[count_pos:count_pos+2])
# There may be some alignment zero bytes between the end of the idxt0
# and self.idxt_start
idxt = raw[self.idxt_start:]
if idxt[:4] != b'IDXT':
raise ValueError('Invalid IDXT header')
length_check, = struct.unpack(b'>H', idxt[4:6])
if length_check != self.header_length + self.tagx_header_length:
raise ValueError('Length check failed')
if idxt[6:].replace(b'\0', b''):
raise ValueError('Non null trailing bytes after IDXT')
def __str__(self):
ans = ['*'*20 + ' Secondary Index Header '+ '*'*20]
a = ans.append
def u(w):
a('Unknown: %r (%d bytes) (All zeros: %r)'%(w,
len(w), not bool(w.replace(b'\0', b'')) ))
a('Header length: %d'%self.header_length)
u(self.unknown1)
a('Index Type: %s (%d)'%(self.index_type_desc, self.index_type))
a('Offset to IDXT start: %d'%self.idxt_start)
a('Number of index records: %d'%self.index_count)
a('Index encoding: %s (%d)'%(self.index_encoding,
self.index_encoding_num))
u(self.unknown2)
a('Number of index entries: %d'% self.num_index_entries)
a('ORDT start: %d'%self.ordt_start)
a('LIGT start: %d'%self.ligt_start)
a('Number of LIGT entries: %d'%self.num_of_ligt_entries)
a('Number of cncx blocks: %d'%self.num_of_cncx_blocks)
u(self.unknown3)
a('TAGX offset: %d'%self.tagx_offset)
u(self.unknown4)
a('\n\n')
a('*'*20 + ' TAGX Header (%d bytes)'%self.tagx_header_length+ '*'*20)
a('Header length: %d'%self.tagx_header_length)
a('Control byte count: %d'%self.tagx_control_byte_count)
for i in self.tagx_entries:
a('\t' + repr(i))
a('Text of last entry in NCX: %s'% self.last_entry)
a('Number of entries in the NCX: %d'% self.ncx_count)
return '\n'.join(ans)
# }}}
class IndexHeader(object): # {{{
def __init__(self, record):
@ -462,7 +565,6 @@ class IndexHeader(object): # {{{
self.tagx_control_byte_count))
if self.tagx_entries and not self.tagx_entries[-1].is_eof:
raise ValueError('TAGX last entry is not EOF')
self.tagx_entries = self.tagx_entries[:-1]
idxt0_pos = self.header_length+self.tagx_header_length
last_num, consumed = decode_hex_number(raw[idxt0_pos:])
@ -480,6 +582,9 @@ class IndexHeader(object): # {{{
length_check, = struct.unpack(b'>H', idxt[4:6])
if length_check != self.header_length + self.tagx_header_length:
raise ValueError('Length check failed')
if idxt[6:].replace(b'\0', b''):
raise ValueError('Non null trailing bytes after IDXT')
def __str__(self):
ans = ['*'*20 + ' Index Header '+ '*'*20]
@ -624,7 +729,8 @@ class IndexEntry(object): # {{{
0x3f : 'article',
}
def __init__(self, ident, entry_type, raw, cncx, tagx_entries):
def __init__(self, ident, entry_type, raw, cncx, tagx_entries,
control_byte_count):
self.index = ident
self.raw = raw
self.tags = []
@ -638,14 +744,15 @@ class IndexEntry(object): # {{{
except KeyError:
raise ValueError('Unknown Index Entry type: %s'%hex(entry_type))
if control_byte_count not in (1, 2):
raise ValueError('Unknown control byte count: %d'%
control_byte_count)
self.flags = 0
if self.entry_type in ('periodical', 'article'):
large_tags = [t for t in tagx_entries if t.tag > 64]
if large_tags:
self.flags = ord(raw[0])
raw = raw[1:]
if control_byte_count == 2:
self.flags = ord(raw[0])
raw = raw[1:]
expected_tags = [tag for tag in tagx_entries if tag.bitmask &
entry_type]
@ -789,7 +896,8 @@ class IndexRecord(object): # {{{
pos = off+consumed+1
idxe = IndexEntry(index, entry_type,
indxt[pos:next_off], cncx,
index_header.tagx_entries)
index_header.tagx_entries,
index_header.tagx_control_byte_count)
self.indices.append(idxe)
rest = indxt[pos+self.indices[-1].consumed:]
@ -1172,6 +1280,10 @@ class MOBIFile(object): # {{{
self.index_header, self.cncx)
self.indexing_record_nums = set(xrange(pir,
pir+2+self.index_header.num_of_cncx_blocks))
self.secondary_index_record = self.secondary_index_record = None
sir = self.mobi_header.secondary_index_record
if sir != 0xffffffff:
self.secondary_index_header = SecondaryIndexHeader(self.records[sir])
ntr = self.mobi_header.number_of_text_records
@ -1247,6 +1359,9 @@ def inspect_mobi(path_or_stream, prefix='decompiled'): # {{{
print('\n\n', file=out)
print(str(f.cncx).encode('utf-8'), file=out)
print('\n\n', file=out)
if f.secondary_index_header is not None:
print(str(f.secondary_index_header).encode('utf-8'), file=out)
print('\n\n', file=out)
print(str(f.index_record), file=out)
with open(os.path.join(ddir, 'tbs_indexing.txt'), 'wb') as out:
print(str(f.tbs_indexing), file=out)