Finish up tests for per library restrictions

This commit is contained in:
Kovid Goyal 2017-05-18 17:59:19 +05:30
parent df9449207d
commit 07e368873a
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
7 changed files with 68 additions and 29 deletions

View File

@ -15,7 +15,7 @@ from calibre.library.field_metadata import category_icon_map
from calibre.db.view import sanitize_sort_field_name from calibre.db.view import sanitize_sort_field_name
from calibre.ebooks.metadata import title_sort from calibre.ebooks.metadata import title_sort
from calibre.ebooks.metadata.book.json_codec import JsonCodec from calibre.ebooks.metadata.book.json_codec import JsonCodec
from calibre.srv.errors import HTTPNotFound from calibre.srv.errors import HTTPNotFound, BookNotFound
from calibre.srv.routes import endpoint, json from calibre.srv.routes import endpoint, json
from calibre.srv.content import get as get_content, icon as get_icon from calibre.srv.content import get as get_content, icon as get_icon
from calibre.srv.utils import http_date, custom_fields_to_display, encode_name, decode_name, get_db from calibre.srv.utils import http_date, custom_fields_to_display, encode_name, decode_name, get_db
@ -169,7 +169,7 @@ def book(ctx, rd, book_id, library_id):
except Exception: except Exception:
book_id = None book_id = None
if book_id is None or not ctx.has_id(rd, db, book_id): if book_id is None or not ctx.has_id(rd, db, book_id):
raise HTTPNotFound('Book with id %r does not exist' % oid) raise BookNotFound(oid, db)
category_urls = rd.query.get('category_urls', 'true').lower() category_urls = rd.query.get('category_urls', 'true').lower()
device_compatible = rd.query.get('device_compatible', 'false').lower() device_compatible = rd.query.get('device_compatible', 'false').lower()
device_for_template = rd.query.get('device_for_template', None) device_for_template = rd.query.get('device_for_template', None)

View File

@ -16,7 +16,7 @@ from calibre.constants import cache_dir, iswindows
from calibre.customize.ui import plugin_for_input_format from calibre.customize.ui import plugin_for_input_format
from calibre.srv.metadata import book_as_json from calibre.srv.metadata import book_as_json
from calibre.srv.render_book import RENDER_VERSION from calibre.srv.render_book import RENDER_VERSION
from calibre.srv.errors import HTTPNotFound from calibre.srv.errors import HTTPNotFound, BookNotFound
from calibre.srv.routes import endpoint, json from calibre.srv.routes import endpoint, json
from calibre.srv.utils import get_library_data, get_db from calibre.srv.utils import get_library_data, get_db
@ -131,7 +131,7 @@ def book_manifest(ctx, rd, book_id, fmt):
if plugin_for_input_format(fmt) is None: if plugin_for_input_format(fmt) is None:
raise HTTPNotFound('The format %s cannot be viewed' % fmt.upper()) raise HTTPNotFound('The format %s cannot be viewed' % fmt.upper())
if not ctx.has_id(rd, db, book_id): if not ctx.has_id(rd, db, book_id):
raise HTTPNotFound('No book with id: %s in library: %s' % (book_id, library_id)) raise BookNotFound(book_id, db)
with db.safe_read_lock: with db.safe_read_lock:
fm = db.format_metadata(book_id, fmt) fm = db.format_metadata(book_id, fmt)
if not fm: if not fm:
@ -167,7 +167,7 @@ def book_manifest(ctx, rd, book_id, fmt):
def book_file(ctx, rd, book_id, fmt, size, mtime, name): def book_file(ctx, rd, book_id, fmt, size, mtime, name):
db, library_id = get_library_data(ctx, rd)[:2] db, library_id = get_library_data(ctx, rd)[:2]
if not ctx.has_id(rd, db, book_id): if not ctx.has_id(rd, db, book_id):
raise HTTPNotFound('No book with id: %s in library: %s' % (book_id, library_id)) raise BookNotFound(book_id, db)
bhash = book_hash(db.library_id, book_id, fmt, size, mtime) bhash = book_hash(db.library_id, book_id, fmt, size, mtime)
base = abspath(os.path.join(books_cache_dir(), 'f')) base = abspath(os.path.join(books_cache_dir(), 'f'))
mpath = abspath(os.path.join(base, bhash, name)) mpath = abspath(os.path.join(base, bhash, name))
@ -209,7 +209,7 @@ def set_last_read_position(ctx, rd, library_id, book_id, fmt):
db = get_db(ctx, rd, library_id) db = get_db(ctx, rd, library_id)
user = rd.username or None user = rd.username or None
if not ctx.has_id(rd, db, book_id): if not ctx.has_id(rd, db, book_id):
raise HTTPNotFound('No book with id {} found'.format(book_id)) raise BookNotFound(book_id, db)
try: try:
data = jsonlib.load(rd.request_body_file) data = jsonlib.load(rd.request_body_file)
device, cfi, pos_frac = data['device'], data['cfi'], data['pos_frac'] device, cfi, pos_frac = data['device'], data['cfi'], data['pos_frac']

View File

@ -28,6 +28,9 @@ def cdb_run(ctx, rd, which, version):
raise HTTPNotFound(('The module {} is not available in version: {}.' raise HTTPNotFound(('The module {} is not available in version: {}.'
'Make sure the version of calibre used for the' 'Make sure the version of calibre used for the'
' server and calibredb match').format(which, version)) ' server and calibredb match').format(which, version))
db = get_library_data(ctx, rd, strict_library_id=True)[0]
if ctx.restriction_for(rd, db):
raise HTTPForbidden('Cannot use the command-line db interface with a user who has per library restrictions')
raw = rd.read() raw = rd.read()
ct = rd.inheaders.get('Content-Type', all=True) ct = rd.inheaders.get('Content-Type', all=True)
try: try:
@ -37,9 +40,6 @@ def cdb_run(ctx, rd, which, version):
args = json_loads(raw) args = json_loads(raw)
except Exception: except Exception:
raise HTTPBadRequest('args are not valid encoded data') raise HTTPBadRequest('args are not valid encoded data')
db = get_library_data(ctx, rd, strict_library_id=True)[0]
if ctx.restriction_for(rd, db):
raise HTTPForbidden('Cannot use the command-line db interface with a user who has per library restrictions')
if getattr(m, 'needs_srv_ctx', False): if getattr(m, 'needs_srv_ctx', False):
args = [ctx] + list(args) args = [ctx] + list(args)
try: try:

View File

@ -10,7 +10,7 @@ from calibre import as_unicode
from calibre.customize.ui import available_input_formats from calibre.customize.ui import available_input_formats
from calibre.db.view import sanitize_sort_field_name from calibre.db.view import sanitize_sort_field_name
from calibre.srv.ajax import search_result from calibre.srv.ajax import search_result
from calibre.srv.errors import HTTPNotFound, HTTPBadRequest from calibre.srv.errors import HTTPNotFound, HTTPBadRequest, BookNotFound
from calibre.srv.metadata import book_as_json, categories_as_json, icon_map, categories_settings from calibre.srv.metadata import book_as_json, categories_as_json, icon_map, categories_settings
from calibre.srv.routes import endpoint, json from calibre.srv.routes import endpoint, json
from calibre.srv.utils import get_library_data, get_use_roman from calibre.srv.utils import get_library_data, get_use_roman
@ -308,17 +308,14 @@ def book_metadata(ctx, rd, book_id):
''' '''
library_id, db, sorts, orders, vl = get_basic_query_data(ctx, rd) library_id, db, sorts, orders, vl = get_basic_query_data(ctx, rd)
def notfound():
raise HTTPNotFound(_('No book with id: {} in library: {}').format(book_id, library_id))
if not book_id: if not book_id:
all_ids = ctx.allowed_book_ids(rd, db) all_ids = ctx.allowed_book_ids(rd, db)
book_id = random.choice(tuple(all_ids)) book_id = random.choice(tuple(all_ids))
elif not ctx.has_id(rd, db, book_id): elif not ctx.has_id(rd, db, book_id):
notfound() raise BookNotFound(book_id, db)
data = book_as_json(db, book_id) data = book_as_json(db, book_id)
if data is None: if data is None:
notfound() raise BookNotFound(book_id, db)
data['id'] = book_id # needed for random book view (when book_id=0) data['id'] = book_id # needed for random book view (when book_id=0)
return data return data

View File

@ -21,7 +21,7 @@ from calibre.ebooks.metadata import authors_to_string
from calibre.ebooks.metadata.meta import set_metadata from calibre.ebooks.metadata.meta import set_metadata
from calibre.ebooks.metadata.opf2 import metadata_to_opf from calibre.ebooks.metadata.opf2 import metadata_to_opf
from calibre.library.save_to_disk import find_plugboard from calibre.library.save_to_disk import find_plugboard
from calibre.srv.errors import HTTPNotFound from calibre.srv.errors import HTTPNotFound, BookNotFound
from calibre.srv.routes import endpoint, json from calibre.srv.routes import endpoint, json
from calibre.srv.utils import http_date, get_db, get_use_roman from calibre.srv.utils import http_date, get_db, get_use_roman
from calibre.utils.config_base import tweaks from calibre.utils.config_base import tweaks
@ -281,7 +281,7 @@ def get(ctx, rd, what, book_id, library_id):
raise HTTPNotFound('Library %r not found' % library_id) raise HTTPNotFound('Library %r not found' % library_id)
with db.safe_read_lock: with db.safe_read_lock:
if not ctx.has_id(rd, db, book_id): if not ctx.has_id(rd, db, book_id):
raise HTTPNotFound('Book with id %r does not exist' % book_id) raise BookNotFound(book_id, db)
library_id = db.server_library_id # in case library_id was None library_id = db.server_library_id # in case library_id was None
if what == 'thumb': if what == 'thumb':
sz = rd.query.get('sz') sz = rd.query.get('sz')

View File

@ -56,3 +56,9 @@ class HTTPForbidden(HTTPSimpleResponse):
def __init__(self, http_message='', close_connection=True): def __init__(self, http_message='', close_connection=True):
HTTPSimpleResponse.__init__(self, httplib.FORBIDDEN, http_message, close_connection) HTTPSimpleResponse.__init__(self, httplib.FORBIDDEN, http_message, close_connection)
class BookNotFound(HTTPNotFound):
def __init__(self, book_id, db):
HTTPNotFound.__init__(self, 'No book with id: {} in library: {}'.format(book_id, db.server_library_id))

View File

@ -9,15 +9,15 @@ __copyright__ = '2015, Kovid Goyal <kovid at kovidgoyal.net>'
import httplib, zlib, json, base64, os import httplib, zlib, json, base64, os
from functools import partial from functools import partial
from urllib import urlencode from urllib import urlencode
from httplib import OK, NOT_FOUND from httplib import OK, NOT_FOUND, FORBIDDEN
from calibre.srv.tests.base import LibraryBaseTest from calibre.srv.tests.base import LibraryBaseTest
def make_request(conn, url, headers={}, prefix='/ajax', username=None, password=None): def make_request(conn, url, headers={}, prefix='/ajax', username=None, password=None, method='GET'):
if username and password: if username and password:
headers[b'Authorization'] = b'Basic ' + base64.standard_b64encode((username + ':' + password).encode('utf-8')) headers[b'Authorization'] = b'Basic ' + base64.standard_b64encode((username + ':' + password).encode('utf-8'))
conn.request('GET', prefix + url, headers=headers) conn.request(method, prefix + url, headers=headers)
r = conn.getresponse() r = conn.getresponse()
data = r.read() data = r.read()
if r.status == httplib.OK and data and data[0] in b'{[': if r.status == httplib.OK and data and data[0] in b'{[':
@ -88,27 +88,63 @@ class ContentTest(LibraryBaseTest):
db = server.handler.router.ctx.library_broker.get(None) db = server.handler.router.ctx.library_broker.get(None)
db.set_pref('virtual_libraries', {'1':'id:1', '12':'id:1 or id:2'}) db.set_pref('virtual_libraries', {'1':'id:1', '12':'id:1 or id:2'})
db.set_field('tags', {1: ['present'], 3: ['missing']}) db.set_field('tags', {1: ['present'], 3: ['missing']})
self.assertTrue(db.has_id(3))
server.handler.ctx.user_manager.add_user('12', 'test', restriction={ server.handler.ctx.user_manager.add_user('12', 'test', restriction={
'library_restrictions':{os.path.basename(db.backend.library_path): 'id:1 or id:2'}}) 'library_restrictions':{os.path.basename(db.backend.library_path): 'id:1 or id:2'}})
conn = server.connect() conn = server.connect()
url_for = server.handler.router.url_for
def r(path, status=OK): def url_for(path, **kw):
r, data = make_request(conn, path, username='12', password='test', prefix='') p, q = path.partition('?')[::2]
self.assertEqual(status, r.status) ans = server.handler.router.url_for(p, **kw)
if q:
ans += '?' + q
return ans
ae = self.assertEqual
def r(path, status=OK, method='GET'):
r, data = make_request(conn, path, username='12', password='test', prefix='', method=method)
ae(status, r.status)
if status == NOT_FOUND:
p = data.partition(':')[0]
ae(p, 'No book with id')
return data return data
ok = r ok = r
nf = partial(r, status=NOT_FOUND) nf = partial(r, status=NOT_FOUND)
# ajax.py
ok(url_for('/ajax/book', book_id=1)) ok(url_for('/ajax/book', book_id=1))
nf(url_for('/ajax/book', book_id=3)) nf(url_for('/ajax/book', book_id=3))
data = ok(url_for('/ajax/books')) data = ok(url_for('/ajax/books'))
self.assertIsNone(data['3']) self.assertIsNone(data['3'])
for i in '12': for i in '12':
self.assertIsNotNone(data[i]) self.assertIsNotNone(data[i])
self.assertEqual(set(r('/ajax/search')['book_ids']), {1,2}) ae(set(r(url_for('/ajax/search'))['book_ids']), {1,2})
self.assertEqual(set(r('/ajax/search?query=id:2')['book_ids']), {2}) ae(set(r(url_for('/ajax/search?query=id:2'))['book_ids']), {2})
self.assertEqual(set(r('/ajax/search?vl=1')['book_ids']), {1}) ae(set(r(url_for('/ajax/search?vl=1'))['book_ids']), {1})
nf(url_for('/book-manifest', book_id=3, fmt='x')) # books.py
nf(url_for('/book-file', book_id=3, fmt='x', size=1, mtime=1, name='x')) nf(url_for('/book-manifest', book_id=3, fmt='TXT'))
nf(url_for('/book-file', book_id=3, fmt='TXT', size=1, mtime=1, name='x'))
data = ok(url_for('/book-get-last-read-position', library_id=db.server_library_id, which='1-TXT_3-TXT'))
ae(set(data), {'1:TXT'})
nf(url_for('/book-set-last-read-position', book_id=3, library_id=db.server_library_id, fmt='TXT'), method='POST')
# cdb.py
r(url_for('/cdb/cmd', which='list'), status=FORBIDDEN)
# code.py
def sr(path, **k):
return set(ok(url_for(path, **k))['search_result']['book_ids'])
for q in 'books-init init get-books'.split():
ae(sr('/interface-data/' + q), {1, 2})
ae(sr('/interface-data/get-books?vl=1'), {1})
ok(url_for('/interface-data/book-metadata', book_id=1))
nf(url_for('/interface-data/book-metadata', book_id=3))
# content.py
ok(url_for('/get', what='thumb', book_id=1))
nf(url_for('/get', what='thumb', book_id=3))
# Not going test legacy and opds as they are to painful