# calibre/src/pyj/read_book/search.pyj (442 lines, 16 KiB, plaintext)

# vim:fileencoding=utf-8
# License: GPL v3 Copyright: 2017, Kovid Goyal <kovid at kovidgoyal.net>
from __python__ import bound_methods, hash_literals
from elementmaker import E
from book_list.globals import get_session_data
from book_list.theme import get_color
from book_list.top_bar import create_top_bar
from complete import create_search_bar
from dom import clear, add_extra_css, build_rule
from gettext import gettext as _, ngettext
from modals import error_dialog
from read_book.globals import current_book, ui_operations
from read_book.search_worker import (
CONNECT_FAILED, DB_ERROR, GET_SPINE_FAILED, UNHANDLED_ERROR, worker_main
)
from read_book.shortcuts import shortcut_for_key_event
from widgets import create_button, create_spinner
from worker import start_worker
def parse_error_msg(msg):
    # Convert an error message received from the search worker into a
    # (user visible message, details) pair.
    #
    # msg.code is compared against the error codes imported from
    # read_book.search_worker. For DB_ERROR both parts are taken from
    # msg.error, for every other code the details are msg.msg.
    code = msg.code
    details = msg.msg
    if code is GET_SPINE_FAILED:
        return _('Loading text from the book failed.'), details
    if code is CONNECT_FAILED:
        return _('Connecting to database storing the local copy of the book failed in the worker thread.'), details
    if code is UNHANDLED_ERROR:
        return _('There was an unhandled error while searching.'), details
    if code is DB_ERROR:
        db_error = msg.error
        return db_error.msg, db_error.details
    # Any code not listed above
    return _('Unknown error'), details
def get_toc_data(book):
    # Build lookup tables describing the Table of Contents of book, read
    # from book.manifest.spine and book.manifest.toc.
    #
    # Returns an object with the keys:
    #   spine         - book.manifest.spine itself
    #   spine_toc_map - spine name -> array of ToC nodes whose dest is that name
    #   spine_idx_map - spine name -> index of that name in the spine
    #   parent_map    - ToC node id -> parent ToC node
    #   toc_id_map    - ToC node id -> ToC node
    spine = book.manifest.spine
    toc_id_map = {}
    parent_map = {}
    spine_toc_map = {name: v'[]' for name in spine}

    def visit(node):
        # Record node, then recurse into its children depth first
        toc_id_map[node.id] = node
        bucket = spine_toc_map[node.dest]
        # A node whose dest has no entry in spine_toc_map is only recorded in toc_id_map
        if bucket:
            bucket.push(node)
        kids = node.children
        if not kids:
            return
        for kid in kids:
            parent_map[kid.id] = node
            visit(kid)

    root = book.manifest.toc
    if root:
        visit(root)
    spine_idx_map = {name: idx for idx, name in enumerate(spine)}
    return {
        'spine': spine, 'spine_toc_map': spine_toc_map,
        'spine_idx_map': spine_idx_map,
        'parent_map': parent_map, 'toc_id_map': toc_id_map
    }
class SearchOverlay:
    # Overlay used to search the text of the book being read. Queries are
    # posted to a worker started on demand ('read_book.search'); the results
    # it sends back are listed inside the overlay, grouped by Table of
    # Contents node.

    # CSS display value applied by show() to make the overlay visible
    display_type = 'flex'
    # id of the DOM element hosting the overlay. It is only looked up here,
    # never created, so it has to exist before the constructor runs.
    CONTAINER_ID = 'book-search-overlay'

    def __init__(self, view):
        # view is the object used to talk to the displayed book. This class
        # uses its currently_showing, keyboard_shortcut_map, get_current_cfi(),
        # goto_cfi(), show_search_result(), discover_search_result() and
        # hide_loading() members.
        self.view = view
        # 'id' is the request number whose results are currently accepted
        self.search_in_flight = {'id': None, 'mode': 'contains', 'case_sensitive': False}
        self._worker = None
        self.request_counter = 0
        # result_num -> result object, for the results of the latest search
        self.result_map = {}
        c = self.container
        c.style.backgroundColor = get_color('window-background')
        c.style.maxHeight = '100vh'
        c.style.minHeight = '100vh'
        c.style.flexDirection = 'column'
        c.style.alignItems = 'stretch'
        c.style.overflow = 'hidden'
        c.style.userSelect = 'none'
        c.addEventListener('keydown', self.onkeydown)
        c.addEventListener('keyup', self.onkeyup)
        # When set, on_worker_message() forwards every worker message to it
        self.result_handler = None

        create_top_bar(c, title=_('Search in book'), action=self.hide, icon='close')

        # Row 1: the search box and the search button
        search_button = create_button(_('Search'), 'search')
        c.appendChild(E.div(
            style='display: flex; padding: 1rem; padding-bottom: 0.5rem; overflow: hidden',
            create_search_bar(self.run_search, 'search-in-book', button=search_button),
            E.div('\xa0\xa0'), search_button,
        ))
        c.lastChild.firstChild.style.flexGrow = '100'

        # Row 2: search options, initialised from and saved to the session data
        sd = get_session_data()
        mode = sd.get('book_search_mode')
        c.appendChild(E.div(
            style='display: flex; padding: 1rem; padding-top: 0.5rem; padding-bottom: 0; align-items: center; overflow: hidden',
            E.label(
                _('Search type:') + '\xa0',
                E.select(
                    name='mode',
                    title=_('''Type of search:
Contains: Search for the entered text anywhere
Whole words: Search for whole words that equal the entered text
Regex: Interpret the entered text as a regular expression
'''),
                    E.option(_('Contains'), value='contains', selected=mode=='contains'),
                    E.option(_('Whole words'), value='word', selected=mode=='word'),
                    E.option(_('Regex'), value='regex', selected=mode=='regex'),
                    onchange=def(event):
                        get_session_data().set('book_search_mode', event.target.value)
                ),
            ),
            E.div('\xa0\xa0'),
            E.label(E.input(
                type='checkbox', name='case_sensitive', checked=bool(sd.get('book_search_case_sensitive'))),
                onchange=def(event):
                    get_session_data().set('book_search_case_sensitive', event.target.checked)
                , _('Case sensitive'),
            ),
            E.div('\xa0\xa0'),
            create_button(_('Return'), 'chevron-left', action=self.return_to_original_position, tooltip=_('Go back to where you were before searching'))
        ))
        c.appendChild(E.hr())

        # Bottom area: first child is the "please wait" display, last child
        # receives the results. bottom_container and results_container rely
        # on this being the last child of the container.
        c.appendChild(E.div(
            style='display: none; overflow: auto', tabindex='0',
            E.div(
                style='text-align: center',
                E.div(create_spinner('4em', '4em')),
                E.div(_('Searching, please wait…'), style='margin-top: 1ex'),
            ),
            E.div(),
        ))
        # Everything except the bottom area keeps its size
        for child in c.childNodes:
            if child is not c.lastChild:
                child.style.flexShrink = '0'

    @property
    def current_query(self):
        # The query described by the controls: {'mode', 'case_sensitive', 'text'}
        c = self.container
        return {
            'mode': c.querySelector('select[name=mode]').value,
            'case_sensitive': c.querySelector('input[name=case_sensitive]').checked,
            'text': c.querySelector('input[type=search]').value
        }

    @current_query.setter
    def current_query(self, q):
        # Load q into the controls; missing fields fall back to
        # 'contains', unchecked and empty text respectively
        c = self.container
        c.querySelector('select[name=mode]').value = q.mode or 'contains'
        c.querySelector('input[name=case_sensitive]').checked = bool(q.case_sensitive)
        c.querySelector('input[type=search]').value = q.text or ''

    @property
    def bottom_container(self):
        # The element holding the wait display and the results
        return self.container.lastChild

    @property
    def results_container(self):
        # The element the result groups are added to
        return self.bottom_container.lastChild

    def show_wait(self):
        # Show the wait display and hide the results
        c = self.bottom_container
        c.style.display = 'block'
        c.firstChild.style.display = 'block'
        c.lastChild.style.display = 'none'

    def show_results(self):
        # Show the results, hide the wait display and focus the bottom area
        c = self.bottom_container
        c.style.display = 'block'
        c.firstChild.style.display = 'none'
        c.lastChild.style.display = 'block'
        c.focus()

    def clear_results(self):
        # Remove all displayed results and forget the stored result objects
        clear(self.results_container)
        self.result_map = {}

    @property
    def worker(self):
        # The search worker, started on first access
        if not self._worker:
            self._worker = start_worker('read_book.search')
            self._worker.onmessage = self.on_worker_message
            # _worker is set now, so this also sends the book data to the worker
            self.clear_caches()
        return self._worker

    def do_initial_search(self, text, query):
        # Start a case sensitive 'contains' search for text that asks for only
        # the first match. Worker messages are routed to
        # handle_initial_search_result() until that search ends. query is used
        # only in the message shown when nothing is found.
        q = {'mode': 'contains', 'case_sensitive': True, 'text': text, 'only_first_match': True}
        self.request_counter += 1
        self.initial_search_result_counter = 0
        self.initial_search_human_readable_query = query
        self.search_in_flight.id = self.request_counter
        self.result_handler = self.handle_initial_search_result
        self.worker.postMessage({
            'type': 'search', 'current_name': '', 'id': self.request_counter, 'query': q
        })

    def handle_initial_search_result(self, msg):
        # Worker message handler used while a search started by
        # do_initial_search() is running
        if msg.type is 'error':
            emsg, details = parse_error_msg(msg)
            error_dialog(_('Could not search'), emsg, details)
            self.result_handler = None
            self.initial_search_result_counter = 0
        elif msg.id is self.search_in_flight.id:
            if msg.type is 'search_complete':
                self.search_in_flight.id = None
                self.result_handler = None
                if self.initial_search_result_counter is 0:
                    self.view.hide_loading()
                    window.alert(_('The full text search query {} was not found in the book, try a manual search.').format(self.initial_search_human_readable_query))
                self.initial_search_result_counter = 0
            elif msg.type is 'search_result':
                self.initial_search_result_counter += 1
                # Only the first result is passed on to the view
                if self.initial_search_result_counter is 1:
                    self.view.discover_search_result(msg.result)
                    self.view.hide_loading()

    def queue_search(self, query, book, current_name):
        # Send query to the worker as a new request and show the wait display.
        # The current position is remembered first so that
        # return_to_original_position() can go back to it. book is not used.
        self.request_counter += 1
        self.original_position = self.view.currently_showing.bookpos
        # set_original_pos() replaces the value above when the view answers
        self.view.get_current_cfi('search-original-pos', self.set_original_pos)
        self.search_in_flight.id = self.request_counter
        self.worker.postMessage({
            'type': 'search', 'current_name': current_name, 'id': self.request_counter, 'query': query
        })
        self.clear_results()
        self.show_wait()

    def set_original_pos(self, request_id, data):
        # Callback for view.get_current_cfi(), stores the reported CFI
        self.original_position = data.cfi

    def return_to_original_position(self):
        # Go back to the position recorded by the last search, if there is
        # one, and close the overlay
        if self.original_position:
            self.view.goto_cfi(self.original_position)
        self.hide()

    def on_worker_message(self, evt):
        # Handle a message from the worker. Messages whose id differs from the
        # search in flight are ignored, errors are always reported.
        msg = evt.data
        if self.result_handler:
            return self.result_handler(msg)
        if msg.type is 'error':
            emsg, details = parse_error_msg(msg)
            error_dialog(_('Could not search'), emsg, details)
        elif msg.id is self.search_in_flight.id:
            if msg.type is 'search_complete':
                self.search_in_flight.id = None
                if Object.keys(self.result_map).length is 0:
                    self.no_result_received()
            elif msg.type is 'search_result':
                self.result_received(msg.result)

    def no_result_received(self):
        # Replace the wait display with a "nothing found" message
        self.show_results()
        self.results_container.appendChild(E.div(
            style='margin: 1rem',
            _('No matching results found')))

    def result_received(self, result):
        # Store result, pass a copy to the view and add an entry for it to the
        # list. Entries are grouped by the first ToC node of the result (-1
        # when it has none).
        self.show_results()
        self.result_map[result.result_num] = result
        sr = Object.assign({}, result)
        self.view.discover_search_result(sr)
        toc_node_id = result.toc_nodes[0] if result.toc_nodes.length else -1
        toc_node = self.toc_data.toc_id_map[toc_node_id]
        c = self.results_container
        group = c.querySelector(f'[data-toc-node-id="{toc_node_id}"]')
        if not group:
            group = E.div(
                data_toc_node_id=toc_node_id + '',
                data_spine_index=result.spine_idx + '',
                E.div(
                    E.span('+\xa0', style='display: none'),
                    E.span(toc_node?.title or _('Unknown')),
                    title=_('Click to show/hide the results in this chapter'),
                    onclick=def(ev):
                        ev.target.closest('[data-toc-node-id]').classList.toggle('collapsed')
                ),
                E.ul()
            )
            # Insert the new group before the first existing group that has a
            # larger spine index, or at the end when there is none
            appended = False
            for child in c.querySelectorAll('[data-spine-index]'):
                csi = parseInt(child.dataset.spineIndex)
                if csi > result.spine_idx:
                    appended = True
                    c.insertBefore(group, child)
                    break
            if not appended:
                c.appendChild(group)
        ul = group.getElementsByTagName('ul')[0]
        # Tooltip listing the ToC nodes of the result, one per line, each
        # indented by its position in result.toc_nodes
        tt = ''
        if result.toc_nodes.length:
            lines = v'[]'
            for i, node_id in enumerate(result.toc_nodes):
                # repeat() is used because the * operator on a string and a
                # number is JavaScript numeric multiplication, which produces
                # a number here instead of the indentation
                lines.push('\xa0\xa0'.repeat(i) + '➤ ' + (self.toc_data.toc_id_map[node_id]?.title or _('Unknown')))
            tt = ngettext('Table of Contents section:', 'Table of Contents sections:', lines.length)
            tt += '\n' + lines.join('\n')
        rnum = result.result_num
        entry = E.li(title=tt, data_result_num=rnum + '', onclick=self.result_clicked.bind(None, rnum))
        if result.before:
            entry.appendChild(E.span('…' + result.before))
        entry.appendChild(E.strong(result.text))
        if result.after:
            entry.appendChild(E.span(result.after + '…'))
        ul.appendChild(entry)

    @property
    def current_result_container(self):
        # The element carrying the 'current' class, if any
        return self.container.querySelector('.current')

    def make_result_current(self, result_num):
        # Mark the entry for result_num as current and scroll it into view,
        # unmark every other entry
        q = result_num + ''
        for li in self.container.querySelectorAll('[data-result-num]'):
            if li.dataset.resultNum is q:
                li.classList.add('current')
                li.scrollIntoView()
            else:
                li.classList.remove('current')

    def search_result_discovered(self, sr):
        # Highlight the list entry belonging to sr
        self.make_result_current(sr.result_num)

    def search_result_not_found(self, sr):
        # Report that sr could not be shown in the book, then show the
        # overlay. Nothing is done when sr.on_discovery is truthy.
        if sr.on_discovery:
            return
        error_dialog(
            _('Search result not found'), _(
                'This search result matches text that is hidden in the book and cannot be displayed'))
        self.show()

    def select_search_result_in_book(self, result_num):
        # Ask the view to show a copy of the stored result result_num, with
        # on_discovery set to 0
        sr = Object.assign({}, self.result_map[result_num])
        sr.on_discovery = 0
        self.view.show_search_result(sr)

    def result_clicked(self, rnum):
        # Click handler of a result entry: select it, show it in the book and
        # close the overlay
        self.make_result_current(rnum)
        self.select_search_result_in_book(rnum)
        self.hide()

    def clear_caches(self, book):
        # Discard the displayed results and, if the worker has been started,
        # send it the data of book (current_book() when book is not given)
        self.clear_results()
        self.bottom_container.style.display = 'none'
        if self._worker:
            book = book or current_book()
            self.toc_data = get_toc_data(book)
            data = {
                'book_hash': book.book_hash, 'stored_files': book.stored_files, 'spine': book.manifest.spine,
                'toc_data': self.toc_data
            }
            self.worker.postMessage({'type': 'clear_caches', 'book': data})

    def next_match(self, delta):
        # Move the selection by delta results (1 when delta is falsy),
        # wrapping around at both ends, and show the newly selected result in
        # the book. Without a current selection result number 1 is selected.
        delta = delta or 1
        num_of_results = Object.keys(self.result_map).length
        c = self.current_result_container
        if c:
            # The arithmetic treats result numbers as running from 1 to num_of_results
            rnum = parseInt(c.dataset.resultNum) - 1
            rnum = (rnum + delta + num_of_results) % num_of_results
            rnum += 1
        else:
            rnum = 1
        self.make_result_current(rnum)
        self.results_container.focus()
        cr = self.current_result_container
        if cr:
            self.select_search_result_in_book(cr.dataset.resultNum)

    def onkeyup(self, event):
        # Escape closes the overlay
        if event.key is 'Escape' or event.key is 'Esc':
            self.hide()
            event.stopPropagation(), event.preventDefault()

    def onkeydown(self, event):
        # Handle the next_match and previous_match keyboard shortcuts
        sc_name = shortcut_for_key_event(event, self.view.keyboard_shortcut_map)
        if sc_name is 'next_match':
            self.next_match(1)
            event.stopPropagation(), event.preventDefault()
            return
        if sc_name is 'previous_match':
            self.next_match(-1)
            event.stopPropagation(), event.preventDefault()
            return

    def find_next(self, backwards):
        # Select the previous result when backwards is truthy, else the next
        self.next_match(-1 if backwards else 1)

    @property
    def container(self):
        # The DOM element with id CONTAINER_ID
        return document.getElementById(self.CONTAINER_ID)

    @property
    def is_visible(self):
        return self.container.style.display is not 'none'

    def set_text(self, text):
        # Put text (or an empty string) into the search box
        self.container.querySelector('input[type=search]').value = text or ''

    def hide(self):
        # Hide the overlay and give the focus back to the book
        self.container.style.display = 'none'
        ui_operations.focus_iframe()

    def show(self):
        # Show the overlay, then focus and select the first input in it
        c = self.container
        c.style.display = self.display_type
        inp = c.querySelector('input')
        inp.focus(), inp.select()

    def run_search(self):
        # Run the query described by the controls. With an empty search text
        # the result list is just cleared.
        q = self.current_query
        if not q.text:
            self.clear_results()
            self.show_results()
        else:
            self.queue_search(q, current_book(), self.view.currently_showing.name)
# Register the stylesheet rules for the result list shown inside the overlay
add_extra_css(def():
    sel = f'#{SearchOverlay.CONTAINER_ID} ' + ' div[data-toc-node-id]'
    accent = 'link-foreground'
    rules = [
        # One block per Table of Contents node
        build_rule(sel, margin='1rem'),
        # A collapsed block shows its hidden "+" marker and hides its results
        sel + '.collapsed > div > span { display: inline !important; }',
        build_rule(sel + '.collapsed > ul', display='none'),
        # Clickable heading of a block
        build_rule(sel + ' > div', font_style='italic', font_weight='bold', cursor='pointer'),
        # Individual results, the current one and the matched text
        build_rule(sel + ' li', list_style_type='none', margin='1rem', margin_right='0', cursor='pointer'),
        build_rule(sel + ' li.current', border_left='solid 2px ' + get_color(accent), padding_left='2px'),
        build_rule(sel + ' li strong', color=get_color(accent), font_style='italic'),
    ]
    return rules.join('')
)

# Module level alias of the worker_main imported from read_book.search_worker
main = worker_main