Implement reading of dates

This commit is contained in:
Kovid Goyal 2016-06-21 20:35:12 +05:30
parent 00e255b91a
commit 71171486b6
3 changed files with 164 additions and 38 deletions

View File

@ -14,21 +14,11 @@ from calibre.ebooks.metadata import check_isbn, authors_to_string, string_to_aut
from calibre.ebooks.metadata.book.base import Metadata from calibre.ebooks.metadata.book.base import Metadata
from calibre.ebooks.metadata.utils import parse_opf, pretty_print_opf, ensure_unique, normalize_languages from calibre.ebooks.metadata.utils import parse_opf, pretty_print_opf, ensure_unique, normalize_languages
from calibre.ebooks.oeb.base import OPF2_NSMAP, OPF, DC from calibre.ebooks.oeb.base import OPF2_NSMAP, OPF, DC
from calibre.utils.date import parse_date as parse_date_, fix_only_date, is_date_undefined, isoformat
from calibre.utils.iso8601 import parse_iso8601
from calibre.utils.localization import canonicalize_lang from calibre.utils.localization import canonicalize_lang
# Utils {{{ # Utils {{{
# http://www.idpf.org/epub/vocab/package/pfx/
reserved_prefixes = {
'dcterms': 'http://purl.org/dc/terms/',
'epubsc': 'http://idpf.org/epub/vocab/sc/#',
'marc': 'http://id.loc.gov/vocabulary/',
'media': 'http://www.idpf.org/epub/vocab/overlays/#',
'onix': 'http://www.editeur.org/ONIX/book/codelists/current.html#',
'rendition':'http://www.idpf.org/vocab/rendition/#',
'schema': 'http://schema.org/',
'xsd': 'http://www.w3.org/2001/XMLSchema#',
}
_xpath_cache = {} _xpath_cache = {}
_re_cache = {} _re_cache = {}
@ -122,6 +112,20 @@ def simple_text(f):
# Prefixes {{{ # Prefixes {{{
# http://www.idpf.org/epub/vocab/package/pfx/
# Maps a prefix name to the IRI it stands for. ensure_prefix() falls back to
# this table when it is called without an explicit value, and the date readers
# build the expanded names they match against from these IRIs, for example
# '%s:w3cdtf' % reserved_prefixes['dcterms'].
reserved_prefixes = {
'dcterms': 'http://purl.org/dc/terms/',
'epubsc': 'http://idpf.org/epub/vocab/sc/#',
'marc': 'http://id.loc.gov/vocabulary/',
'media': 'http://www.idpf.org/epub/vocab/overlays/#',
'onix': 'http://www.editeur.org/ONIX/book/codelists/current.html#',
'rendition':'http://www.idpf.org/vocab/rendition/#',
'schema': 'http://schema.org/',
'xsd': 'http://www.w3.org/2001/XMLSchema#',
}
# IRI registered for the 'calibre' prefix by set_timestamp(), via
# ensure_prefix(root, prefixes, 'calibre', CALIBRE_PREFIX). It is not part of
# the reserved table above, so it must always be passed explicitly.
CALIBRE_PREFIX = 'https://calibre-ebook.com'
def parse_prefixes(x): def parse_prefixes(x):
return {m.group(1):m.group(2) for m in re.finditer(r'(\S+): \s*(\S+)', x)} return {m.group(1):m.group(2) for m in re.finditer(r'(\S+): \s*(\S+)', x)}
@ -131,7 +135,7 @@ def read_prefixes(root):
return ans return ans
def expand_prefix(raw, prefixes): def expand_prefix(raw, prefixes):
return regex(r'(\S+)\s*:\s*(\S+)').sub(lambda m:(prefixes.get(m.group(1), m.group(1)) + ':' + m.group(2)), raw) return regex(r'(\S+)\s*:\s*(\S+)').sub(lambda m:(prefixes.get(m.group(1), m.group(1)) + ':' + m.group(2)), raw or '')
def ensure_prefix(root, prefixes, prefix, value=None): def ensure_prefix(root, prefixes, prefix, value=None):
prefixes[prefix] = value or reserved_prefixes[prefix] prefixes[prefix] = value or reserved_prefixes[prefix]
@ -437,6 +441,91 @@ def set_book_producers(root, prefixes, refines, producers):
metadata.append(m) metadata.append(m)
# }}} # }}}
# Dates {{{
def parse_date(raw, is_w3cdtf=False):
    '''
    Parse the date string ``raw`` and return the resulting datetime.

    ``raw`` is stripped of surrounding whitespace first. When ``is_w3cdtf`` is
    true it is parsed with parse_iso8601(), otherwise with the generic
    parse_date_(); both are called with ``assume_utc=True``.

    A value that has no time part is additionally passed through
    fix_only_date(). "No time part" means that ``raw`` contains neither a
    ``T`` nor a space and, for the generic parser only, that the parsed time
    is exactly 00:00:00.
    '''
    raw = raw.strip()
    has_time_separator = 'T' in raw or ' ' in raw
    if is_w3cdtf:
        ans = parse_iso8601(raw, assume_utc=True)
        date_only = not has_time_separator
    else:
        ans = parse_date_(raw, assume_utc=True)
        # The generic parser gets an extra check: the parsed time must also be
        # midnight before the value is treated as a pure date
        date_only = not has_time_separator and (ans.hour, ans.minute, ans.second) == (0, 0, 0)
    if date_only:
        ans = fix_only_date(ans)
    return ans
def read_pubdate(root, prefixes, refines):
    '''
    Return the publication date from the first <dc:date> element in the
    package metadata whose text can be parsed by parse_date().

    Empty elements and elements whose text fails to parse are skipped. If no
    element qualifies the function falls off the end and returns None.
    ``prefixes`` and ``refines`` are accepted but not used here.
    '''
    for elem in XPath('./opf:metadata/dc:date')(root):
        text = (elem.text or '').strip()
        if not text:
            continue
        try:
            return parse_date(text)
        except Exception:
            # Best effort: an unparseable value just means we try the next one
            pass
def set_pubdate(root, prefixes, refines, val):
    '''
    Replace every <dc:date> element in the package metadata with a single one
    holding ``val`` formatted by isoformat().

    Existing <dc:date> elements are always removed. A new element is appended
    to the first <metadata> element only when is_date_undefined(val) is false.
    '''
    for existing in XPath('./opf:metadata/dc:date')(root):
        remove_element(existing, refines)
    if is_date_undefined(val):
        return
    text = isoformat(val)
    metadata = XPath('./opf:metadata')(root)[0]
    elem = metadata.makeelement(DC('date'))
    elem.text = text
    metadata.append(elem)
def read_timestamp(root, prefixes, refines):
    '''
    Return the calibre timestamp stored in the package metadata, or None.

    Two representations are tried, in this order:

    1. <meta property="..."> elements whose prefix-expanded, lower-cased
       property equals CALIBRE_PREFIX + ':timestamp'. The element text is
       the value; it is parsed as W3CDTF only when the expanded ``scheme``
       attribute is the dcterms w3cdtf scheme.
    2. <meta name="calibre:timestamp" content="..."/> elements, whose
       ``content`` attribute is always parsed as W3CDTF.

    Elements with an empty value or a value that fails to parse are skipped.
    '''
    timestamp_prop = '%s:timestamp' % CALIBRE_PREFIX
    w3cdtf_scheme = '%s:w3cdtf' % reserved_prefixes['dcterms']

    for meta in XPath('./opf:metadata/opf:meta[@property]')(root):
        text = (meta.text or '').strip()
        if not text:
            continue
        if expand_prefix(meta.get('property'), prefixes).lower() != timestamp_prop:
            continue
        scheme = expand_prefix(meta.get('scheme'), prefixes).lower()
        try:
            return parse_date(text, is_w3cdtf=scheme == w3cdtf_scheme)
        except Exception:
            # Unparseable, look at the next candidate
            pass

    for meta in XPath('./opf:metadata/opf:meta[@name="calibre:timestamp"]')(root):
        content = meta.get('content')
        if not content:
            continue
        try:
            return parse_date(content, is_w3cdtf=True)
        except Exception:
            pass
def set_timestamp(root, prefixes, refines, val):
    '''
    Store ``val`` as the calibre timestamp in the package metadata.

    The 'calibre' and 'dcterms' prefixes are registered via ensure_prefix()
    unconditionally. Every existing timestamp <meta> element is removed,
    whether it uses the ``property`` form or the ``name="calibre:timestamp"``
    form. A new <meta property="calibre:timestamp" scheme="dcterms:W3CDTF">
    element containing isoformat(val) is appended to the first <metadata>
    element only when is_date_undefined(val) is false.
    '''
    ensure_prefix(root, prefixes, 'calibre', CALIBRE_PREFIX)
    ensure_prefix(root, prefixes, 'dcterms')
    timestamp_prop = '%s:timestamp' % CALIBRE_PREFIX

    for meta in XPath('./opf:metadata/opf:meta')(root):
        expanded = expand_prefix(meta.get('property'), prefixes).lower()
        if expanded == timestamp_prop or meta.get('name') == 'calibre:timestamp':
            remove_element(meta, refines)

    if is_date_undefined(val):
        return
    text = isoformat(val)
    metadata = XPath('./opf:metadata')(root)[0]
    attrs = {'property':'calibre:timestamp', 'scheme':'dcterms:W3CDTF'}
    elem = metadata.makeelement(OPF('meta'), attrib=attrs)
    elem.text = text
    metadata.append(elem)
def read_last_modified(root, prefixes, refines):
    '''
    Return the last modified date stored in the package metadata, or None.

    Looks at <meta property="..."> elements whose prefix-expanded, lower-cased
    property equals the dcterms IRI followed by ':modified'. The element text
    is the value; it is parsed as W3CDTF only when the expanded ``scheme``
    attribute is the dcterms w3cdtf scheme. Elements with empty text or text
    that fails to parse are skipped.
    '''
    modified_prop = '%s:modified' % reserved_prefixes['dcterms']
    w3cdtf_scheme = '%s:w3cdtf' % reserved_prefixes['dcterms']

    for meta in XPath('./opf:metadata/opf:meta[@property]')(root):
        text = (meta.text or '').strip()
        if not text:
            continue
        if expand_prefix(meta.get('property'), prefixes).lower() != modified_prop:
            continue
        scheme = expand_prefix(meta.get('scheme'), prefixes).lower()
        try:
            return parse_date(text, is_w3cdtf=scheme == w3cdtf_scheme)
        except Exception:
            # Unparseable, look at the next candidate
            pass
# }}}
def read_metadata(root): def read_metadata(root):
ans = Metadata(_('Unknown'), [_('Unknown')]) ans = Metadata(_('Unknown'), [_('Unknown')])
prefixes, refines = read_prefixes(root), read_refines(root) prefixes, refines = read_prefixes(root), read_refines(root)
@ -459,6 +548,15 @@ def read_metadata(root):
bkp = read_book_producers(root, prefixes, refines) bkp = read_book_producers(root, prefixes, refines)
if bkp: if bkp:
ans.book_producer = bkp[0] ans.book_producer = bkp[0]
pd = read_pubdate(root, prefixes, refines)
if not is_date_undefined(pd):
ans.pubdate = pd
ts = read_timestamp(root, prefixes, refines)
if not is_date_undefined(ts):
ans.timestamp = ts
lm = read_last_modified(root, prefixes, refines)
if not is_date_undefined(lm):
ans.last_modified = lm
return ans return ans
def get_metadata(stream): def get_metadata(stream):
@ -475,6 +573,8 @@ def apply_metadata(root, mi, cover_prefix='', cover_data=None, apply_null=False,
for i, aut in enumerate(mi.authors): for i, aut in enumerate(mi.authors):
authors.append(Author(aut, aus[i] if i < len(aus) else None)) authors.append(Author(aut, aus[i] if i < len(aus) else None))
set_authors(root, prefixes, refines, authors) set_authors(root, prefixes, refines, authors)
set_pubdate(root, prefixes, refines, mi.pubdate)
set_timestamp(root, prefixes, refines, mi.timestamp)
pretty_print_opf(root) pretty_print_opf(root)

View File

@ -14,10 +14,11 @@ from calibre.ebooks.metadata.opf3 import (
read_metadata, set_identifiers, XPath, set_application_id, read_title, read_metadata, set_identifiers, XPath, set_application_id, read_title,
read_refines, set_title, read_title_sort, read_languages, set_languages, read_refines, set_title, read_title_sort, read_languages, set_languages,
read_authors, Author, set_authors, ensure_prefix, read_prefixes, read_authors, Author, set_authors, ensure_prefix, read_prefixes,
read_book_producers, set_book_producers read_book_producers, set_book_producers, read_timestamp, set_timestamp,
read_pubdate, set_pubdate, CALIBRE_PREFIX, read_last_modified
) )
TEMPLATE = '''<package xmlns="http://www.idpf.org/2007/opf" version="3.0" unique-identifier="uid"><metadata xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:opf="http://www.idpf.org/2007/opf">{metadata}</metadata></package>''' # noqa TEMPLATE = '''<package xmlns="http://www.idpf.org/2007/opf" version="3.0" prefix="calibre: %s" unique-identifier="uid"><metadata xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:opf="http://www.idpf.org/2007/opf">{metadata}</metadata></package>''' % CALIBRE_PREFIX # noqa
default_refines = defaultdict(list) default_refines = defaultdict(list)
class TestOPF3(unittest.TestCase): class TestOPF3(unittest.TestCase):
@ -35,7 +36,7 @@ class TestOPF3(unittest.TestCase):
('xxx:onix', 'xxx:onix'), ('xxx:onix', 'xxx:onix'),
('xxx', 'xxx'), ('xxx', 'xxx'),
): ):
self.ae(expand_prefix(raw, reserved_prefixes), expanded) self.ae(expand_prefix(raw, reserved_prefixes.copy()), expanded)
root = self.get_opf() root = self.get_opf()
ensure_prefix(root, read_prefixes(root), 'calibre', 'https://calibre-ebook.com') ensure_prefix(root, read_prefixes(root), 'calibre', 'https://calibre-ebook.com')
ensure_prefix(root, read_prefixes(root), 'marc', reserved_prefixes['marc']) ensure_prefix(root, read_prefixes(root), 'marc', reserved_prefixes['marc'])
@ -46,7 +47,7 @@ class TestOPF3(unittest.TestCase):
def idt(val, scheme=None, iid=''): def idt(val, scheme=None, iid=''):
return '<dc:identifier id="{id}" {scheme}>{val}</dc:identifier>'.format(scheme=('opf:scheme="%s"'%scheme if scheme else ''), val=val, id=iid) return '<dc:identifier id="{id}" {scheme}>{val}</dc:identifier>'.format(scheme=('opf:scheme="%s"'%scheme if scheme else ''), val=val, id=iid)
def ri(root): def ri(root):
return dict(read_identifiers(root, reserved_prefixes, default_refines)) return dict(read_identifiers(root, read_prefixes(root), default_refines))
for m, result in ( for m, result in (
(idt('abc', 'ISBN'), {}), (idt('abc', 'ISBN'), {}),
@ -67,11 +68,11 @@ class TestOPF3(unittest.TestCase):
self.ae(mi.application_id, 'y') self.ae(mi.application_id, 'y')
root = self.get_opf(metadata=idt('i:1', iid='uid') + idt('r:1') + idt('o:1')) root = self.get_opf(metadata=idt('i:1', iid='uid') + idt('r:1') + idt('o:1'))
set_identifiers(root, reserved_prefixes, default_refines, {'i':'2', 'o':'2'}) set_identifiers(root, read_prefixes(root), default_refines, {'i':'2', 'o':'2'})
self.ae({'i':['2', '1'], 'r':['1'], 'o':['2']}, ri(root)) self.ae({'i':['2', '1'], 'r':['1'], 'o':['2']}, ri(root))
self.ae(1, len(XPath('//dc:identifier[@id="uid"]')(root))) self.ae(1, len(XPath('//dc:identifier[@id="uid"]')(root)))
root = self.get_opf(metadata=idt('i:1', iid='uid') + idt('r:1') + idt('o:1')) root = self.get_opf(metadata=idt('i:1', iid='uid') + idt('r:1') + idt('o:1'))
set_identifiers(root, reserved_prefixes, default_refines, {'i':'2', 'o':'2'}, force_identifiers=True) set_identifiers(root, read_prefixes(root), default_refines, {'i':'2', 'o':'2'}, force_identifiers=True)
self.ae({'i':['2', '1'], 'o':['2']}, ri(root)) self.ae({'i':['2', '1'], 'o':['2']}, ri(root))
root = self.get_opf(metadata=idt('i:1', iid='uid') + idt('r:1') + idt('o:1')) root = self.get_opf(metadata=idt('i:1', iid='uid') + idt('r:1') + idt('o:1'))
set_application_id(root, default_refines, 'y') set_application_id(root, default_refines, 'y')
@ -81,26 +82,26 @@ class TestOPF3(unittest.TestCase):
def test_title(self): # {{{ def test_title(self): # {{{
def rt(root): def rt(root):
return read_title(root, reserved_prefixes, read_refines(root)) return read_title(root, read_prefixes(root), read_refines(root))
def st(root, title, title_sort=None): def st(root, title, title_sort=None):
set_title(root, reserved_prefixes, read_refines(root), title, title_sort) set_title(root, read_prefixes(root), read_refines(root), title, title_sort)
return rt(root) return rt(root)
root = self.get_opf('''<dc:title/><dc:title id='t'>xxx</dc:title>''') root = self.get_opf('''<dc:title/><dc:title id='t'>xxx</dc:title>''')
self.ae(rt(root), 'xxx') self.ae(rt(root), 'xxx')
self.ae(st(root, 'abc', 'cba'), 'abc') self.ae(st(root, 'abc', 'cba'), 'abc')
self.ae(read_title_sort(root, reserved_prefixes, read_refines(root)), 'cba') self.ae(read_title_sort(root, read_prefixes(root), read_refines(root)), 'cba')
root = self.get_opf('''<dc:title>yyy</dc:title><dc:title id='t'>x xx root = self.get_opf('''<dc:title>yyy</dc:title><dc:title id='t'>x xx
</dc:title><meta refines='#t' property='title-type'>main</meta><meta name="calibre:title_sort" content="sorted"/>''') </dc:title><meta refines='#t' property='title-type'>main</meta><meta name="calibre:title_sort" content="sorted"/>''')
self.ae(rt(root), 'x xx') self.ae(rt(root), 'x xx')
self.ae(read_title_sort(root, reserved_prefixes, read_refines(root)), 'sorted') self.ae(read_title_sort(root, read_prefixes(root), read_refines(root)), 'sorted')
self.ae(st(root, 'abc'), 'abc') self.ae(st(root, 'abc'), 'abc')
# }}} # }}}
def test_languages(self): # {{{ def test_languages(self): # {{{
def rl(root): def rl(root):
return read_languages(root, reserved_prefixes, read_refines(root)) return read_languages(root, read_prefixes(root), read_refines(root))
def st(root, languages): def st(root, languages):
set_languages(root, reserved_prefixes, read_refines(root), languages) set_languages(root, read_prefixes(root), read_refines(root), languages)
return rl(root) return rl(root)
root = self.get_opf('''<dc:language>en-US</dc:language><dc:language>fr</dc:language>''') root = self.get_opf('''<dc:language>en-US</dc:language><dc:language>fr</dc:language>''')
self.ae(['eng', 'fra'], rl(root)) self.ae(['eng', 'fra'], rl(root))
@ -111,9 +112,9 @@ class TestOPF3(unittest.TestCase):
def test_authors(self): # {{{ def test_authors(self): # {{{
def rl(root): def rl(root):
return read_authors(root, reserved_prefixes, read_refines(root)) return read_authors(root, read_prefixes(root), read_refines(root))
def st(root, authors): def st(root, authors):
set_authors(root, reserved_prefixes, read_refines(root), authors) set_authors(root, read_prefixes(root), read_refines(root), authors)
return rl(root) return rl(root)
root = self.get_opf('''<dc:creator>a b</dc:creator>''') root = self.get_opf('''<dc:creator>a b</dc:creator>''')
self.ae([Author('a b', None)], rl(root)) self.ae([Author('a b', None)], rl(root))
@ -128,14 +129,14 @@ class TestOPF3(unittest.TestCase):
self.ae([Author('a b', 'b, a'), Author('c d', 'd, c')], rl(root)) self.ae([Author('a b', 'b, a'), Author('c d', 'd, c')], rl(root))
authors = [Author('x y', 'y, x'), Author('u i', None)] authors = [Author('x y', 'y, x'), Author('u i', None)]
self.ae(authors, st(root, authors)) self.ae(authors, st(root, authors))
self.assertIsNone(root.get('prefix')) self.ae(root.get('prefix'), 'calibre: %s' % CALIBRE_PREFIX)
# }}} # }}}
def test_book_producer(self): # {{{ def test_book_producer(self): # {{{
def rl(root): def rl(root):
return read_book_producers(root, reserved_prefixes, read_refines(root)) return read_book_producers(root, read_prefixes(root), read_refines(root))
def st(root, producers): def st(root, producers):
set_book_producers(root, reserved_prefixes, read_refines(root), producers) set_book_producers(root, read_prefixes(root), read_refines(root), producers)
return rl(root) return rl(root)
for scheme in ('scheme="marc:relators"', ''): for scheme in ('scheme="marc:relators"', ''):
root = self.get_opf('''<dc:contributor>a b</dc:contributor><dc:contributor id="1">c d</dc:contributor>''' root = self.get_opf('''<dc:contributor>a b</dc:contributor><dc:contributor id="1">c d</dc:contributor>'''
@ -146,6 +147,30 @@ class TestOPF3(unittest.TestCase):
self.ae('12'.split(), st(root, '12'.split())) self.ae('12'.split(), st(root, '12'.split()))
# }}} # }}}
def test_dates(self):  # {{{
    '''
    Check reading and writing of pubdate and timestamp, plus reading of
    the last modified date.
    '''
    from calibre.utils.date import utcnow

    def read_dates(root):
        pubdate = read_pubdate(root, read_prefixes(root), read_refines(root))
        timestamp = read_timestamp(root, read_prefixes(root), read_refines(root))
        return pubdate, timestamp

    def write_dates(root, pd, ts):
        # Prefixes and refines are re-read before each setter because the
        # previous setter has just modified the tree
        set_pubdate(root, read_prefixes(root), read_refines(root), pd)
        set_timestamp(root, read_prefixes(root), read_refines(root), ts)
        return read_dates(root)

    def assert_years(root, pubdate_year=None, timestamp_year=None):
        # An expected year of None means the date must be absent
        expected_years = (pubdate_year, timestamp_year)
        for actual, expected in zip(read_dates(root), expected_years):
            if expected is None:
                self.assertIsNone(actual)
            else:
                self.ae(expected, getattr(actual, 'year', None))

    root = self.get_opf('''<dc:date>1999-3-2</dc:date><meta property="calibre:timestamp" scheme="dcterms:W3CDTF">2001</meta>''')
    assert_years(root, 1999, 2001)
    n = utcnow()
    self.ae(write_dates(root, n, n), (n, n))
    root = self.get_opf('''<dc:date>1999-3-2</dc:date><meta name="calibre:timestamp" content="2001-1-1"/>''')
    assert_years(root, 1999, 2001)
    root = self.get_opf('''<meta property="dcterms:modified">2003</meta>''')
    self.ae(read_last_modified(root, read_prefixes(root), read_refines(root)).year, 2003)
# }}}
# Run tests {{{ # Run tests {{{

View File

@ -107,6 +107,14 @@ def parse_date(date_string, assume_utc=False, as_utc=True, default=None):
dt = dt.replace(tzinfo=_utc_tz if assume_utc else _local_tz) dt = dt.replace(tzinfo=_utc_tz if assume_utc else _local_tz)
return dt.astimezone(_utc_tz if as_utc else _local_tz) return dt.astimezone(_utc_tz if as_utc else _local_tz)
def fix_only_date(val):
    '''
    Move a date that falls on the first or last day of its month one day
    towards the middle of that month and return the result.

    * Last day of the month: the day is decremented by one.
    * First day of the month: the day is set to 2.

    All other dates are returned unchanged. ``val`` must support adding a
    timedelta and ``replace(day=...)``, i.e. a date or datetime.

    NOTE(review): presumably this keeps a later timezone conversion from
    pushing a date-only value into a neighbouring month. Confirm against
    the callers.
    '''
    n = val + timedelta(days=1)
    # Compare with != rather than >: on 31 December the following day is in
    # month 1, which is not greater than 12, so a > test would miss the year
    # boundary.
    if n.month != val.month:
        val = val.replace(day=val.day-1)
    if val.day == 1:
        val = val.replace(day=2)
    return val
def parse_only_date(raw, assume_utc=True, as_utc=True): def parse_only_date(raw, assume_utc=True, as_utc=True):
''' '''
Parse a date string that contains no time information in a manner that Parse a date string that contains no time information in a manner that
@ -116,14 +124,7 @@ def parse_only_date(raw, assume_utc=True, as_utc=True):
f = utcnow if assume_utc else now f = utcnow if assume_utc else now
default = f().replace(hour=0, minute=0, second=0, microsecond=0, default = f().replace(hour=0, minute=0, second=0, microsecond=0,
day=15) day=15)
ans = parse_date(raw, default=default, assume_utc=assume_utc, as_utc=as_utc) return fix_only_date(parse_date(raw, default=default, assume_utc=assume_utc, as_utc=as_utc))
n = ans + timedelta(days=1)
if n.month > ans.month:
ans = ans.replace(day=ans.day-1)
if ans.day == 1:
ans = ans.replace(day=2)
return ans
def strptime(val, fmt, assume_utc=False, as_utc=True): def strptime(val, fmt, assume_utc=False, as_utc=True):
dt = datetime.strptime(val, fmt) dt = datetime.strptime(val, fmt)