Test replacement search query parser

This commit is contained in:
Charles Haley 2013-04-16 13:33:08 +02:00
parent 3631011901
commit a5b66f0b1c
2 changed files with 3756 additions and 3683 deletions

File diff suppressed because it is too large Load Diff

View File

@ -16,11 +16,9 @@ methods :method:`SearchQueryParser.universal_set` and
If this module is run, it will perform a series of unit tests. If this module is run, it will perform a series of unit tests.
''' '''
import sys, operator, weakref import sys, operator, weakref, re
from calibre.utils.pyparsing import (CaselessKeyword, Group, Forward, from calibre.utils.pyparsing import ParseException
CharsNotIn, Suppress, OneOrMore, MatchFirst, CaselessLiteral,
Optional, NoMatch, ParseException, QuotedString)
from calibre.constants import preferred_encoding from calibre.constants import preferred_encoding
from calibre.utils.icu import sort_key from calibre.utils.icu import sort_key
from calibre import prints from calibre import prints
@ -96,6 +94,138 @@ def saved_searches():
global ss global ss
return ss return ss
'''
Parse a search expression into a series of potentially recursive operations.
The syntax is a bit twisted.
prog ::= or_expression
or_expression ::= and_expression [ 'or' or_expression ]
and_expression ::= not_expression [ ( [ 'and' ] and_expression ) | ( '(' or_expression ')' ) ]
not_expression ::= [ 'not' ] base_token
base_token ::= location_expression | ( '(' or_expression ')' )
location_expression ::= [ word [ ':' word ]*
'''
class Parser(object):
def __init__(self):
self.current_token = 0
self.tokens = None
OPCODE = 1
WORD = 2
EOF = 3
# Had to translate named constants to numeric values
lex_scanner = re.Scanner([
(r'[():]', lambda x,t: (1, t)),
(r'[^ "():]+', lambda x,t: (2, unicode(t))),
(r'".*?((?<!\\)")', lambda x,t: (2, t[1:-1])),
(r'\s', None)
], flags=re.DOTALL)
def token(self):
if self.is_eof():
return None
return self.tokens[self.current_token][1]
def token_type(self):
if self.is_eof():
return self.EOF
return self.tokens[self.current_token][0]
def is_eof(self):
return self.current_token >= len(self.tokens)
def advance(self):
self.current_token += 1
def parse(self, expr, locations):
self.locations = locations
self.tokens = self.lex_scanner.scan(icu_lower(expr))[0]
self.current_token = 0
prog = self.or_expression()
if not self.is_eof():
raise ParseException(_('Extra characters at end of search'))
# prints(self.tokens, '\n', prog)
return prog
def or_expression(self):
lhs = self.and_expression()
if self.is_eof():
return lhs
if self.token() == 'or':
self.advance()
return ['or', lhs, self.or_expression()]
return lhs
def and_expression(self):
lhs = self.not_expression()
if self.is_eof():
return lhs
if self.token() == 'and':
self.advance()
return ['and', lhs, self.and_expression()]
# Account for the optional 'and'
if self.token_type() == self.WORD and self.token() != 'or':
return ['and', lhs, self.and_expression()]
elif self.token() == '(':
self.advance()
rhs = self.or_expression()
if self.token() != ')':
raise ParseException('missing )')
else:
self.advance();
return ['and', lhs, rhs]
return lhs
def not_expression(self):
if self.token() == 'not':
self.advance()
return ['not', self.not_expression()]
return self.base_token()
def base_token(self):
if self.token() == '(':
self.advance()
res = self.or_expression()
if self.token() != ')':
raise ParseException('missing )')
self.advance()
return res
if self.token_type() != self.WORD:
raise ParseException('Invalid syntax. Expected a lookup name or a word')
return self.location_expression()
def location_expression(self):
loc = self.token()
self.advance()
if self.token() == ':':
if loc in self.locations:
val = ''
else:
val = loc + ':'
loc = 'all'
self.advance()
while True:
val += self.token()
self.advance()
if self.token() == ':':
val += ':'
self.advance()
else:
break
return ['token', loc, val]
return ['token', 'all', loc]
class SearchQueryParser(object): class SearchQueryParser(object):
''' '''
Parses a search query. Parses a search query.
@ -139,65 +269,9 @@ class SearchQueryParser(object):
self.sqp_initialize(locations, optimize=self.optimize) self.sqp_initialize(locations, optimize=self.optimize)
def sqp_initialize(self, locations, test=False, optimize=False): def sqp_initialize(self, locations, test=False, optimize=False):
self.locations = locations
self._tests_failed = False self._tests_failed = False
self.optimize = optimize self.optimize = optimize
# Define a token
standard_locations = map(lambda x : CaselessLiteral(x)+Suppress(':'),
locations)
location = NoMatch()
for l in standard_locations:
location |= l
location = Optional(location, default='all')
word_query = CharsNotIn(u'\t\r\n\u00a0 ' + u'()')
#quoted_query = Suppress('"')+CharsNotIn('"')+Suppress('"')
quoted_query = QuotedString('"', escChar='\\')
query = quoted_query | word_query
Token = Group(location + query).setResultsName('token')
if test:
print 'Testing Token parser:'
Token.validate()
failed = SearchQueryParser.run_tests(Token, 'token',
(
('tag:asd', ['tag', 'asd']),
(u'ddsä', ['all', u'ddsä']),
('"one \\"two"', ['all', 'one "two']),
('title:"one \\"1.5\\" two"', ['title', 'one "1.5" two']),
('title:abc"def', ['title', 'abc"def']),
)
)
Or = Forward()
Parenthesis = Group(
Suppress('(') + Or + Suppress(')')
).setResultsName('parenthesis') | Token
Not = Forward()
Not << (Group(
Suppress(CaselessKeyword("not")) + Not
).setResultsName("not") | Parenthesis)
And = Forward()
And << (Group(
Not + Suppress(CaselessKeyword("and")) + And
).setResultsName("and") | Group(
Not + OneOrMore(~MatchFirst(list(map(CaselessKeyword,
('and', 'or')))) + And)
).setResultsName("and") | Not)
Or << (Group(
And + Suppress(CaselessKeyword("or")) + Or
).setResultsName("or") | And)
if test:
#Or.validate()
self._tests_failed = bool(failed)
self._parser = Or
self._parser.setDebug(False)
def parse(self, query): def parse(self, query):
# empty the list of searches used for recursion testing # empty the list of searches used for recursion testing
@ -213,9 +287,8 @@ class SearchQueryParser(object):
def _parse(self, query, candidates=None): def _parse(self, query, candidates=None):
self.recurse_level += 1 self.recurse_level += 1
try: try:
res = self._parser.parseString(query)[0] res = Parser().parse(query, self.locations)
except RuntimeError: except RuntimeError:
import repr
raise ParseException('Failed to parse query, recursion limit reached: %s'%repr(query)) raise ParseException('Failed to parse query, recursion limit reached: %s'%repr(query))
if candidates is None: if candidates is None:
candidates = self.universal_set() candidates = self.universal_set()
@ -227,7 +300,7 @@ class SearchQueryParser(object):
return getattr(self, 'evaluate_'+group_name) return getattr(self, 'evaluate_'+group_name)
def evaluate(self, parse_result, candidates): def evaluate(self, parse_result, candidates):
return self.method(parse_result.getName())(parse_result, candidates) return self.method(parse_result[0])(parse_result[1:], candidates)
def evaluate_and(self, argument, candidates): def evaluate_and(self, argument, candidates):
# RHS checks only those items matched by LHS # RHS checks only those items matched by LHS
@ -249,8 +322,8 @@ class SearchQueryParser(object):
# return self.universal_set().difference(self.evaluate(argument[0])) # return self.universal_set().difference(self.evaluate(argument[0]))
return candidates.difference(self.evaluate(argument[0], candidates)) return candidates.difference(self.evaluate(argument[0], candidates))
def evaluate_parenthesis(self, argument, candidates): # def evaluate_parenthesis(self, argument, candidates):
return self.evaluate(argument[0], candidates) # return self.evaluate(argument[0], candidates)
def evaluate_token(self, argument, candidates): def evaluate_token(self, argument, candidates):
location = argument[0] location = argument[0]