mirror of
https://github.com/kovidgoyal/calibre.git
synced 2025-07-08 02:34:06 -04:00
Refactor setting of EPUB metadata
Move it into the new opf module and also avoid several unnecessary mem copies
This commit is contained in:
parent
e214e3c86c
commit
ec2671b809
@ -8,18 +8,15 @@ __copyright__ = '2008, Kovid Goyal <kovid at kovidgoyal.net>'
|
|||||||
import os, re, posixpath
|
import os, re, posixpath
|
||||||
from cStringIO import StringIO
|
from cStringIO import StringIO
|
||||||
from contextlib import closing
|
from contextlib import closing
|
||||||
from future_builtins import map
|
|
||||||
|
|
||||||
from calibre.utils.zipfile import ZipFile, BadZipfile, safe_replace
|
from calibre.utils.zipfile import ZipFile, BadZipfile, safe_replace
|
||||||
from calibre.utils.localunzip import LocalZipFile
|
from calibre.utils.localunzip import LocalZipFile
|
||||||
from calibre.ebooks.BeautifulSoup import BeautifulStoneSoup
|
from calibre.ebooks.BeautifulSoup import BeautifulStoneSoup
|
||||||
from calibre.ebooks.metadata import MetaInformation
|
from calibre.ebooks.metadata.opf import get_metadata as get_metadata_from_opf, set_metadata as set_metadata_opf
|
||||||
from calibre.ebooks.metadata.opf import get_metadata as get_metadata_from_opf
|
|
||||||
from calibre.ebooks.metadata.opf2 import OPF
|
from calibre.ebooks.metadata.opf2 import OPF
|
||||||
from calibre.ptempfile import TemporaryDirectory
|
from calibre.ptempfile import TemporaryDirectory
|
||||||
from calibre import CurrentDir, walk
|
from calibre import CurrentDir, walk
|
||||||
from calibre.constants import isosx
|
from calibre.constants import isosx
|
||||||
from calibre.utils.localization import lang_as_iso639_1
|
|
||||||
|
|
||||||
class EPubException(Exception):
|
class EPubException(Exception):
|
||||||
pass
|
pass
|
||||||
@ -263,61 +260,26 @@ def serialize_cover_data(new_cdata, cpath):
|
|||||||
from calibre.utils.img import save_cover_data_to
|
from calibre.utils.img import save_cover_data_to
|
||||||
return save_cover_data_to(new_cdata, data_fmt=os.path.splitext(cpath)[1][1:])
|
return save_cover_data_to(new_cdata, data_fmt=os.path.splitext(cpath)[1][1:])
|
||||||
|
|
||||||
def normalize_languages(opf_languages, mi_languages):
|
|
||||||
' Preserve original country codes and use 2-letter lang codes where possible '
|
|
||||||
from calibre.spell import parse_lang_code
|
|
||||||
def parse(x):
|
|
||||||
try:
|
|
||||||
return parse_lang_code(x)
|
|
||||||
except ValueError:
|
|
||||||
return None
|
|
||||||
opf_languages = filter(None, map(parse, opf_languages))
|
|
||||||
cc_map = {c.langcode:c.countrycode for c in opf_languages}
|
|
||||||
mi_languages = filter(None, map(parse, mi_languages))
|
|
||||||
def norm(x):
|
|
||||||
lc = x.langcode
|
|
||||||
cc = x.countrycode or cc_map.get(lc, None)
|
|
||||||
lc = lang_as_iso639_1(lc) or lc
|
|
||||||
if cc:
|
|
||||||
lc += '-' + cc
|
|
||||||
return lc
|
|
||||||
return list(map(norm, mi_languages))
|
|
||||||
|
|
||||||
def update_metadata(opf, mi, apply_null=False, update_timestamp=False, force_identifiers=False):
|
|
||||||
for x in ('guide', 'toc', 'manifest', 'spine'):
|
|
||||||
setattr(mi, x, None)
|
|
||||||
if mi.languages:
|
|
||||||
mi.languages = normalize_languages(list(opf.raw_languages) or [], mi.languages)
|
|
||||||
|
|
||||||
opf.smart_update(mi, apply_null=apply_null)
|
|
||||||
if getattr(mi, 'uuid', None):
|
|
||||||
opf.application_id = mi.uuid
|
|
||||||
if apply_null or force_identifiers:
|
|
||||||
opf.set_identifiers(mi.get_identifiers())
|
|
||||||
else:
|
|
||||||
orig = opf.get_identifiers()
|
|
||||||
orig.update(mi.get_identifiers())
|
|
||||||
opf.set_identifiers({k:v for k, v in orig.iteritems() if k and v})
|
|
||||||
if update_timestamp and mi.timestamp is not None:
|
|
||||||
opf.timestamp = mi.timestamp
|
|
||||||
|
|
||||||
def set_metadata(stream, mi, apply_null=False, update_timestamp=False, force_identifiers=False):
|
def set_metadata(stream, mi, apply_null=False, update_timestamp=False, force_identifiers=False):
|
||||||
stream.seek(0)
|
stream.seek(0)
|
||||||
reader = get_zip_reader(stream, root=os.getcwdu())
|
reader = get_zip_reader(stream, root=os.getcwdu())
|
||||||
raster_cover = reader.opf.raster_cover
|
|
||||||
mi = MetaInformation(mi)
|
|
||||||
new_cdata = None
|
new_cdata = None
|
||||||
replacements = {}
|
|
||||||
try:
|
try:
|
||||||
new_cdata = mi.cover_data[1]
|
new_cdata = mi.cover_data[1]
|
||||||
if not new_cdata:
|
if not new_cdata:
|
||||||
raise Exception('no cover')
|
raise Exception('no cover')
|
||||||
except:
|
except Exception:
|
||||||
try:
|
try:
|
||||||
new_cdata = open(mi.cover, 'rb').read()
|
with lopen(mi.cover, 'rb') as f:
|
||||||
except:
|
new_cdata = f.read()
|
||||||
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
opfbytes, ver, raster_cover = set_metadata_opf(
|
||||||
|
reader.read_bytes(reader.opf_path), mi,
|
||||||
|
cover_data=new_cdata, apply_null=apply_null, update_timestamp=update_timestamp, force_identifiers=force_identifiers)
|
||||||
cpath = None
|
cpath = None
|
||||||
|
replacements = {}
|
||||||
if new_cdata and raster_cover:
|
if new_cdata and raster_cover:
|
||||||
try:
|
try:
|
||||||
cpath = posixpath.join(posixpath.dirname(reader.opf_path),
|
cpath = posixpath.join(posixpath.dirname(reader.opf_path),
|
||||||
@ -330,15 +292,11 @@ def set_metadata(stream, mi, apply_null=False, update_timestamp=False, force_ide
|
|||||||
import traceback
|
import traceback
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
|
|
||||||
update_metadata(reader.opf, mi, apply_null=apply_null,
|
|
||||||
update_timestamp=update_timestamp, force_identifiers=force_identifiers)
|
|
||||||
|
|
||||||
newopf = reader.opf.render()
|
|
||||||
if isinstance(reader.archive, LocalZipFile):
|
if isinstance(reader.archive, LocalZipFile):
|
||||||
reader.archive.safe_replace(reader.container[OPF.MIMETYPE], newopf,
|
reader.archive.safe_replace(reader.container[OPF.MIMETYPE], opfbytes,
|
||||||
extra_replacements=replacements)
|
extra_replacements=replacements)
|
||||||
else:
|
else:
|
||||||
safe_replace(stream, reader.container[OPF.MIMETYPE], newopf,
|
safe_replace(stream, reader.container[OPF.MIMETYPE], opfbytes,
|
||||||
extra_replacements=replacements)
|
extra_replacements=replacements)
|
||||||
try:
|
try:
|
||||||
if cpath is not None:
|
if cpath is not None:
|
||||||
|
@ -7,7 +7,8 @@ from __future__ import (unicode_literals, division, absolute_import,
|
|||||||
|
|
||||||
from calibre.ebooks.metadata import parse_opf_version
|
from calibre.ebooks.metadata import parse_opf_version
|
||||||
from calibre.ebooks.metadata.opf2 import OPF
|
from calibre.ebooks.metadata.opf2 import OPF
|
||||||
from calibre.ebooks.metadata.utils import parse_opf
|
from calibre.ebooks.metadata.utils import parse_opf, normalize_languages
|
||||||
|
from calibre.ebooks.metadata import MetaInformation
|
||||||
|
|
||||||
class DummyFile(object):
|
class DummyFile(object):
|
||||||
|
|
||||||
@ -24,3 +25,33 @@ def get_metadata(stream):
|
|||||||
ver = parse_opf_version(root.get('version'))
|
ver = parse_opf_version(root.get('version'))
|
||||||
opf = OPF(None, preparsed_opf=root, read_toc=False)
|
opf = OPF(None, preparsed_opf=root, read_toc=False)
|
||||||
return opf.to_book_metadata(), ver, opf.raster_cover, opf.first_spine_item()
|
return opf.to_book_metadata(), ver, opf.raster_cover, opf.first_spine_item()
|
||||||
|
|
||||||
|
def set_metadata_opf2(root, mi, cover_data=None, apply_null=False, update_timestamp=False, force_identifiers=False):
|
||||||
|
mi = MetaInformation(mi)
|
||||||
|
for x in ('guide', 'toc', 'manifest', 'spine'):
|
||||||
|
setattr(mi, x, None)
|
||||||
|
opf = OPF(None, preparsed_opf=root, read_toc=False)
|
||||||
|
if mi.languages:
|
||||||
|
mi.languages = normalize_languages(list(opf.raw_languages) or [], mi.languages)
|
||||||
|
|
||||||
|
opf.smart_update(mi, apply_null=apply_null)
|
||||||
|
if getattr(mi, 'uuid', None):
|
||||||
|
opf.application_id = mi.uuid
|
||||||
|
if apply_null or force_identifiers:
|
||||||
|
opf.set_identifiers(mi.get_identifiers())
|
||||||
|
else:
|
||||||
|
orig = opf.get_identifiers()
|
||||||
|
orig.update(mi.get_identifiers())
|
||||||
|
opf.set_identifiers({k:v for k, v in orig.iteritems() if k and v})
|
||||||
|
if update_timestamp and mi.timestamp is not None:
|
||||||
|
opf.timestamp = mi.timestamp
|
||||||
|
return opf.render(), opf.raster_cover
|
||||||
|
|
||||||
|
def set_metadata(stream, mi, cover_data=None, apply_null=False, update_timestamp=False, force_identifiers=False):
|
||||||
|
if isinstance(stream, bytes):
|
||||||
|
stream = DummyFile(stream)
|
||||||
|
root = parse_opf(stream)
|
||||||
|
ver = parse_opf_version(root.get('version'))
|
||||||
|
opfbytes, raster_cover = set_metadata_opf2(
|
||||||
|
root, mi, cover_data=cover_data, apply_null=apply_null, update_timestamp=update_timestamp, force_identifiers=force_identifiers)
|
||||||
|
return opfbytes, ver, raster_cover
|
||||||
|
@ -4,9 +4,13 @@
|
|||||||
|
|
||||||
from __future__ import (unicode_literals, division, absolute_import,
|
from __future__ import (unicode_literals, division, absolute_import,
|
||||||
print_function)
|
print_function)
|
||||||
|
from future_builtins import map
|
||||||
|
|
||||||
|
from lxml import etree
|
||||||
|
|
||||||
from calibre.ebooks.chardet import xml_to_unicode
|
from calibre.ebooks.chardet import xml_to_unicode
|
||||||
from lxml import etree
|
from calibre.spell import parse_lang_code
|
||||||
|
from calibre.utils.localization import lang_as_iso639_1
|
||||||
|
|
||||||
PARSER = etree.XMLParser(recover=True, no_network=True)
|
PARSER = etree.XMLParser(recover=True, no_network=True)
|
||||||
|
|
||||||
@ -25,3 +29,22 @@ def parse_opf(stream_or_path):
|
|||||||
return root
|
return root
|
||||||
|
|
||||||
|
|
||||||
|
def normalize_languages(opf_languages, mi_languages):
|
||||||
|
' Preserve original country codes and use 2-letter lang codes where possible '
|
||||||
|
def parse(x):
|
||||||
|
try:
|
||||||
|
return parse_lang_code(x)
|
||||||
|
except ValueError:
|
||||||
|
return None
|
||||||
|
opf_languages = filter(None, map(parse, opf_languages))
|
||||||
|
cc_map = {c.langcode:c.countrycode for c in opf_languages}
|
||||||
|
mi_languages = filter(None, map(parse, mi_languages))
|
||||||
|
def norm(x):
|
||||||
|
lc = x.langcode
|
||||||
|
cc = x.countrycode or cc_map.get(lc, None)
|
||||||
|
lc = lang_as_iso639_1(lc) or lc
|
||||||
|
if cc:
|
||||||
|
lc += '-' + cc
|
||||||
|
return lc
|
||||||
|
return list(map(norm, mi_languages))
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user