From d1f27b449f00e91c85056423cf83ecb204371d95 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Sun, 27 Sep 2015 07:54:21 +0530 Subject: [PATCH] Code to parse metadata.kfx files for basic metadata --- src/calibre/ebooks/metadata/kfx.py | 329 +++++++++++++++++++++++++++++ 1 file changed, 329 insertions(+) create mode 100644 src/calibre/ebooks/metadata/kfx.py diff --git a/src/calibre/ebooks/metadata/kfx.py b/src/calibre/ebooks/metadata/kfx.py new file mode 100644 index 0000000000..201bfd8818 --- /dev/null +++ b/src/calibre/ebooks/metadata/kfx.py @@ -0,0 +1,329 @@ +#!/usr/bin/env python2 +# vim:fileencoding=utf-8 +# License: GPLv3 Copyright: 2015, Kovid Goyal , John Howell ' + +from __future__ import (unicode_literals, division, absolute_import, + print_function) + +# Based on work of John Howell reversing the KFX format +# http://www.mobileread.com/forums/showpost.php?p=3176029&postcount=89 + +import struct, sys, base64, re +from collections import defaultdict + +from calibre.ebooks.metadata.book.base import Metadata +from calibre.utils.cleantext import clean_xml_chars +from calibre.utils.config_base import tweaks +from calibre.utils.date import parse_only_date +from calibre.utils.localization import canonicalize_lang +from calibre.utils.magick.draw import identify_data + +class InvalidKFX(ValueError): + pass + +# magic numbers for data structures +CONTAINER_MAGIC = b'CONT' +ENTITY_MAGIC = b'ENTY' +ION_MAGIC = b'\xe0\x01\x00\xea' + +# ION data types (comment shows equivalent python data type produced) +DT_BOOLEAN = 1 # True/False +DT_INTEGER = 2 # int +# str (using non-unicode to distinguish symbols from strings) +DT_PROPERTY = 7 +DT_STRING = 8 # unicode +DT_STRUCT = 11 # tuple +DT_LIST = 12 # list +DT_OBJECT = 13 # dict of property/value pairs + +# property names (non-unicode strings to distinguish them from ION strings in this program) +# These are place holders. The correct property names are unknown. 
+PROP_METADATA = b'P258' +PROP_METADATA2 = b'P490' +PROP_METADATA3 = b'P491' +PROP_METADATA_KEY = b'P492' +PROP_METADATA_VALUE = b'P307' +PROP_IMAGE = b'P417' + +METADATA_PROPERTIES = { + b'P10' : "languages", + b'P153': "title", + b'P154': "description", + b'P222': "authors", + b'P232': "publisher", +} + +COVER_KEY = "cover_image_base64" + + +def hexs(string, sep=' '): + return sep.join('%02x' % ord(b) for b in string) + +class PackedData(object): + + ''' + Simplify unpacking of packed binary data structures + ''' + + def __init__(self, data): + self.buffer = data + self.offset = 0 + + def unpack_one(self, fmt, advance=True): + return self.unpack_multi(fmt, advance)[0] + + def unpack_multi(self, fmt, advance=True): + fmt = fmt.encode('ascii') + result = struct.unpack_from(fmt, self.buffer, self.offset) + if advance: + self.advance(struct.calcsize(fmt)) + return result + + def extract(self, size): + data = self.buffer[self.offset:self.offset + size] + self.advance(size) + return data + + def advance(self, size): + self.offset += size + + def remaining(self): + return len(self.buffer) - self.offset + + +class PackedBlock(PackedData): + + ''' + Common header structure of container and entity blocks + ''' + + def __init__(self, data, magic): + PackedData.__init__(self, data) + + self.magic = self.unpack_one('4s') + if self.magic != magic: + raise InvalidKFX('%s magic number is incorrect (%s)' % + (magic, hexs(self.magic))) + + self.version = self.unpack_one('> 4 + data_len = cmd & 0x0f + if data_len == 14: + data_len = self.unpack_number() + + # print('cmd=%02x, len=%s: %s' % (cmd, data_len, hexs(self.buffer[self.offset:][:data_len]))) + + if data_type == DT_BOOLEAN: + return data_len != 0 # length is actually value + + if data_type == DT_INTEGER: + return self.unpack_unsigned_int(data_len) + + if data_type == DT_PROPERTY: + return property_name(self.unpack_unsigned_int(data_len)) + + if data_type == DT_STRING: + return self.extract(data_len).decode('utf8') + + if 
data_type == DT_STRUCT or data_type == DT_LIST: + ion = PackedIon(self.extract(data_len)) + result = [] + + while ion.remaining(): + result.append(ion.unpack_typed_value()) + + if data_type == DT_STRUCT: + result = tuple(result) + + return result + + if data_type == DT_OBJECT: + ion = PackedIon(self.extract(data_len)) + result = {} + + while (ion.remaining()): + symbol = property_name(ion.unpack_number()) + result[symbol] = ion.unpack_typed_value() + + return result + + # ignore unknown types + self.advance(data_len) + return None + + def unpack_number(self): + # variable length numbers, MSB first, 7 bits per byte, last byte is + # flagged by MSB set + number = 0 + while (True): + byte = self.unpack_one('B') + number = (number << 7) | (byte & 0x7f) + if byte >= 0x80: + return number + + def unpack_unsigned_int(self, length): + # unsigned big-endian (MSB first) + return struct.unpack_from(b'>Q', chr(0) * (8 - length) + self.extract(length))[0] + + +def property_name(property_number): + # This should be changed to translate property numbers to the proper + # strings using a symbol table + return b"P%d" % property_number + +def extract_metadata(container_data): + metadata = defaultdict(list) + + # locate book metadata within the container data structures + + for entity_type, entity_id, entity_value in container_data: + if entity_type == PROP_METADATA: + for key, value in entity_value.items(): + if key in METADATA_PROPERTIES: + metadata[METADATA_PROPERTIES[key]].append(value) + + elif entity_type == PROP_METADATA2: + for value1 in entity_value[PROP_METADATA3]: + for meta in value1[PROP_METADATA]: + metadata[meta[PROP_METADATA_KEY]].append(meta[PROP_METADATA_VALUE]) + + elif entity_type == PROP_IMAGE and COVER_KEY not in metadata: + # assume first image is the cover + metadata[COVER_KEY] = entity_value + + return metadata + +def dump_metadata(m): + d = dict(m) + d[COVER_KEY] = bool(d.get(COVER_KEY)) + from pprint import pprint + pprint(d) + +def 
def read_metadata_kfx(stream, read_cover=True):
    '''
    Read the metadata.kfx file that is found in the sdr book folder for KFX
    files and return its contents as a calibre Metadata object.

    :param stream: File-like object; its whole content is read and handed to
        Container for decoding.
    :param read_cover: When true, the image stored under COVER_KEY is base64
        decoded and, if identify_data() recognises it, stored in
        ``mi.cover_data`` as ``(format, raw_data)``.
    :return: Metadata object. Title and authors fall back to the localized
        'Unknown' when no usable value is present.
    '''
    c = Container(stream.read())
    m = extract_metadata(c.decode())
    # dump_metadata(m)

    def get(x, single=True):
        # Values come straight from the decoded ION data and may be falsy:
        # unpack_typed_value returns None for ION types it does not
        # understand, and strings may be empty. Falsy entries are never
        # passed to clean_xml_chars, and entries that are empty after
        # cleaning are dropped from list results.
        raw = m.get(x) or []  # .get() avoids inserting keys into the defaultdict
        if single:
            first = raw[0] if raw else None
            return clean_xml_chars(first) if first else ''
        cleaned = (clean_xml_chars(y) for y in raw if y)
        return [y for y in cleaned if y]

    title = get('title') or _('Unknown')
    authors = get('authors', False) or [_('Unknown')]
    auth_pat = re.compile(r'([^,]+?)\s*,\s+([^,]+)$')

    def fix_author(x):
        # Turn "Last, First" into "First Last" unless the
        # author_sort_copy_method tweak is set to 'copy'.
        if tweaks['author_sort_copy_method'] != 'copy':
            match = auth_pat.match(x.strip())
            if match is not None:
                return match.group(2) + ' ' + match.group(1)
        return x

    mi = Metadata(title, [fix_author(x) for x in authors])

    # Each optional field is looked up once and only applied when the
    # cleaned value is non-empty.
    author_sort = get('author')
    if author_sort:
        mi.author_sort = author_sort

    # ASIN takes precedence; content_id is used only when no ASIN is usable.
    asin = get('ASIN') or get('content_id')
    if asin:
        mi.set_identifier('mobi-asin', asin)

    langs = [lang for lang in
             (canonicalize_lang(x) for x in get('languages', False)) if lang]
    if langs:
        mi.languages = langs

    issue_date = get('issue_date')
    if issue_date:
        try:
            mi.pubdate = parse_only_date(issue_date)
        except Exception:
            # Best effort: an unparsable date just leaves pubdate unset.
            pass

    publisher = get('publisher')
    if publisher and publisher != 'Unknown':
        mi.publisher = publisher

    cover = m.get(COVER_KEY)
    if read_cover and cover:
        data = None
        try:
            data = base64.standard_b64decode(cover)
            w, h, fmt = identify_data(data)
        except Exception:
            # Undecodable or unrecognised image data: skip the cover.
            w, h, fmt = 0, 0, None
        if fmt and w and h:
            mi.cover_data = (fmt, data)

    return mi


if __name__ == '__main__':
    # Debug entry point: print the metadata of the file named by the last
    # command line argument. unicode() is the Python 2 builtin; this module
    # declares a python2 shebang.
    from calibre import prints
    with open(sys.argv[-1], 'rb') as f:
        mi = read_metadata_kfx(f)
        prints(unicode(mi))