Keep annotations sorted in CFI order when merging

This commit is contained in:
Kovid Goyal 2020-06-30 11:24:41 +05:30
parent b6cbf67e4c
commit 5075fc2d36
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
3 changed files with 86 additions and 8 deletions

View File

@ -120,6 +120,8 @@ def find_tests(which_tests=None, exclude_tests=None):
a(find_tests())
from calibre.utils.xml_parse import find_tests
a(find_tests())
from calibre.gui2.viewer.annotations import find_tests
a(find_tests())
if ok('misc'):
from calibre.ebooks.metadata.tag_mapper import find_tests
a(find_tests())

View File

@ -191,7 +191,7 @@ def cfi_sort_key(cfi, only_path=True):
return (), (0, (0, 0), 0)
if not pcfi:
import sys
print('Failed to parse CFI: %r' % pcfi, file=sys.stderr)
print('Failed to parse CFI: %r' % cfi, file=sys.stderr)
return (), (0, (0, 0), 0)
steps = get_steps(pcfi)
step_nums = tuple(s.get('num', 0) for s in steps)

View File

@ -6,9 +6,11 @@
import os
from collections import defaultdict
from io import BytesIO
from itertools import chain
from operator import itemgetter
from threading import Thread
from calibre.ebooks.epub.cfi.parse import cfi_sort_key
from calibre.gui2.viewer.convert_book import update_book
from calibre.gui2.viewer.integration import save_annotations_list_to_library
from calibre.gui2.viewer.web_view import viewer_config_dir
@ -23,30 +25,76 @@ from polyglot.builtins import iteritems, itervalues
from polyglot.queue import Queue
# Path of the 'annots' entry inside the viewer configuration directory.
annotations_dir = os.path.join(viewer_config_dir, 'annots')
# Sort key computed once for the CFI '/99999999'. bookmark_sort_key() and
# highlight_sort_key() return it for annotations that carry no usable CFI
# (presumably so that they sort after annotations with a real position --
# confirm against cfi_sort_key).
no_cfi_sort_key = cfi_sort_key('/99999999')
def parse_annotations(raw):
    """Return everything produced by ``_parse_annotations(raw)`` as a list."""
    parsed = _parse_annotations(raw)
    return list(parsed)
def merge_annots_with_identical_field(annots, field='title'):
def bookmark_sort_key(b):
    """Return the sort key used to order bookmark *b* by its position.

    A bookmark whose ``pos_type`` is ``'epubcfi'`` is keyed by the full CFI
    stored in ``b['pos']`` (``only_path=False``); any other bookmark gets the
    shared ``no_cfi_sort_key``.
    """
    has_cfi_position = b.get('pos_type') == 'epubcfi'
    if not has_cfi_position:
        return no_cfi_sort_key
    return cfi_sort_key(b['pos'], only_path=False)
def highlight_sort_key(hl):
    """Return the sort key used to order highlight *hl* by its position.

    The key is derived from the highlight's ``start_cfi`` (``only_path=False``).
    A missing or empty ``start_cfi`` yields the shared ``no_cfi_sort_key``.
    """
    start = hl.get('start_cfi')
    return cfi_sort_key(start, only_path=False) if start else no_cfi_sort_key
def sort_annot_list_by_position_in_book(annots, annot_type):
    """Sort the list *annots* in place using the key function for *annot_type*.

    *annot_type* must be ``'bookmark'`` or ``'highlight'``; any other value
    raises ``KeyError``.
    """
    key_functions = {
        'bookmark': bookmark_sort_key,
        'highlight': highlight_sort_key,
    }
    annots.sort(key=key_functions[annot_type])
def merge_annots_with_identical_field(a, b, field='title'):
    """Merge two annotation lists, keeping one entry per distinct *field* value.

    :param a: first list of annotation dicts
    :param b: second list of annotation dicts
    :param field: key whose value identifies "the same" annotation
    :return: a ``(changed, merged)`` tuple. ``merged`` holds, for every
        distinct ``field`` value, the entry with the greatest ``'timestamp'``,
        in order of first appearance while walking *a* and then *b*.
        ``changed`` is True when some value has entries with differing
        timestamps among its two newest, or when the length of ``merged``
        differs from the length of either input.

    Every annotation must provide both ``field`` and ``'timestamp'``.
    """
    # Bucket every annotation from both lists by the value of the merge field.
    groups = defaultdict(list)
    for annot in chain(a, b):
        groups[annot[field]].append(annot)
    # Newest first within each bucket. The sort is stable, so on equal
    # timestamps the entry coming from ``a`` stays ahead of the one from ``b``.
    for group in groups.values():
        group.sort(key=itemgetter('timestamp'), reverse=True)

    seen = set()
    changed = False
    ans = []
    for annot in chain(a, b):
        key = annot[field]
        if key in seen:
            continue
        seen.add(key)
        group = groups[key]
        # Two candidates with different timestamps mean one of them lost.
        if len(group) > 1 and group[0]['timestamp'] != group[1]['timestamp']:
            changed = True
        ans.append(group[0])
    # Entries present in only one of the lists also alter the result.
    if len(ans) != len(a) or len(ans) != len(b):
        changed = True
    return changed, ans
def merge_annot_lists(a, b, annot_type):
    """Combine the annotation lists *a* and *b* of the given *annot_type*.

    * If either list is empty, a copy of the other one is returned.
    * ``'last-read'`` entries are concatenated and ordered newest first.
    * ``'bookmark'`` entries are merged on ``'title'`` and ``'highlight'``
      entries on ``'uuid'``; the merged list is re-sorted by position in the
      book only when the merge reports a change.
    * Any other type is simply concatenated.
    """
    if not a or not b:
        return list(a or b)
    if annot_type == 'last-read':
        return sorted(a + b, key=itemgetter('timestamp'), reverse=True)
    merge_fields = {'bookmark': 'title', 'highlight': 'uuid'}
    merge_field = merge_fields.get(annot_type)
    if merge_field is None:
        return a + b
    changed, merged = merge_annots_with_identical_field(a, b, merge_field)
    if changed:
        sort_annot_list_by_position_in_book(merged, annot_type)
    return merged
def merge_annotations(annots, annots_map):
amap = {}
for annot in annots:
annot = parse_annotation(annot)
annots_map[annot.pop('type')].append(annot)
atype = annot.pop('type')
amap.setdefault(atype, []).append(annot)
lr = annots_map['last-read']
if lr:
lr.sort(key=itemgetter('timestamp'), reverse=True)
@ -143,3 +191,31 @@ class AnnotationsSaveWorker(Thread):
'pathtoebook': current_book_data['pathtoebook'],
'in_book_file': in_book_file and can_save_in_book_file
})
def find_tests():
    """Build and return the unittest suite for the annotation merging code."""
    import unittest

    def timestamp_for(year):
        return '20{}-06-29T03:21:48.895323+00:00'.format(year)

    def cfi_for(first_cfi_number):
        return 'epubcfi(/{}/4/8)'.format(first_cfi_number)

    def bm(title, bmid, year=20, first_cfi_number=1):
        # Minimal bookmark positioned by an EPUB CFI.
        bookmark = {'title': title, 'id': bmid, 'timestamp': timestamp_for(year)}
        bookmark['pos_type'] = 'epubcfi'
        bookmark['pos'] = cfi_for(first_cfi_number)
        return bookmark

    def hl(uuid, hlid, year=20, first_cfi_number=1):
        # Minimal highlight positioned by its starting CFI.
        highlight = {'uuid': uuid, 'id': hlid, 'timestamp': timestamp_for(year)}
        highlight['start_cfi'] = cfi_for(first_cfi_number)
        return highlight

    class AnnotationsTest(unittest.TestCase):

        def test_merge_annotations(self):
            # The same scenario is run for both mergeable annotation types.
            for atype, f in (('bookmark', bm), ('highlight', hl)):
                a = [f('one', 1, 20, 2), f('two', 2, 20, 4), f('a', 3, 20, 16)]
                b = [f('one', 10, 30, 2), f('two', 20, 10, 4), f('b', 30, 20, 8)]
                c = merge_annot_lists(a, b, atype)
                merged_ids = tuple(annot['id'] for annot in c)
                self.assertEqual(merged_ids, (10, 2, 30, 3))

    loader = unittest.TestLoader()
    return loader.loadTestsFromTestCase(AnnotationsTest)