Terminal UI for managing library restrictions

This commit is contained in:
Kovid Goyal 2017-04-18 14:21:55 +05:30
parent fa0fa3cba9
commit c6cd432d92
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
2 changed files with 143 additions and 21 deletions

View File

@ -7,6 +7,7 @@ __license__ = 'GPL v3'
__copyright__ = '2015, Kovid Goyal <kovid at kovidgoyal.net>'
import sys, os, signal
from functools import partial
from calibre import as_unicode, prints
from calibre.constants import plugins, iswindows, preferred_encoding
@ -87,7 +88,11 @@ def manage_users(path=None):
m = UserManager(path)
enc = getattr(sys.stdin, 'encoding', preferred_encoding) or preferred_encoding
def choice(question, choices, default=None, banner=''):
def get_input(prompt):
prints(prompt, end=' ')
return raw_input().decode(enc)
def choice(question=_('What do you want to do?'), choices=(), default=None, banner=''):
prints(banner)
for i, choice in enumerate(choices):
prints('%d)' % (i+1), choice)
@ -96,10 +101,11 @@ def manage_users(path=None):
prompt = question + ' [1-%d]:' % len(choices)
if default is not None:
prompt = question + ' [1-%d %s: %d]' % (len(choices), _('default'), default+1)
reply = raw_input(prompt)
reply = get_input(prompt)
if not reply and default is not None:
reply = str(default + 1)
if not reply:
prints(_('No choice selected, exiting...'))
raise SystemExit(0)
reply = reply.strip()
try:
@ -112,7 +118,7 @@ def manage_users(path=None):
def get_valid(prompt, invalidq=lambda x: None):
while True:
ans = raw_input(prompt + ': ').strip().decode(enc)
ans = get_input(prompt + ':').strip()
fail_message = invalidq(ans)
if fail_message is None:
return ans
@ -154,24 +160,80 @@ def manage_users(path=None):
def remove_user():
un = get_valid_user()
if raw_input((_('Are you sure you want to remove the user %s?') % un) + ' [y/n]: ').decode(enc) != 'y':
if get_input((_('Are you sure you want to remove the user %s?') % un) + ' [y/n]:') != 'y':
raise SystemExit(0)
m.remove_user(un)
prints(_('User %s successfully removed!') % un)
def edit_user():
username = get_valid_user()
def change_password(username):
pw = get_pass(username)
m.change_password(username, pw)
prints(_('Password for %s successfully changed!') % username)
def show_password():
username = get_valid_user()
def show_password(username):
pw = m.get(username)
prints(_('Password for {0} is: {1}').format(username, pw))
prints(_('Current password for {0} is: {1}').format(username, pw))
{0:add_user, 1:edit_user, 2:remove_user, 3:show_password}[choice(_('What do you want to do?'), [
_('Add a new user'), _('Edit an existing user'), _('Remove a user'), _('Show the password for a user')])]()
def change_readonly(username):
readonly = m.is_readonly(username)
if readonly:
q = _('Allow {} to make changes (i.e. grant write access)?')
else:
q = _('Prevent {} from making changes (i.e. remove write access)?')
if get_input(q.format(username) + ' [y/n]:').lower() == 'y':
m.set_readonly(username, not readonly)
def change_restriction(username):
r = m.restrictions(username) or {}
if r['allowed_library_names']:
prints(_('{} is currently only allowed to access the libraries named: {}').format(
username, ', '.join(r['allowed_library_names'])))
if r['blocked_library_names']:
prints(_('{} is currently not allowed to access the libraries named: {}').format(
username, ', '.join(r['blocked_library_names'])))
c = choice(choices=[
_('Allow access to all libraries'), _('Allow access to only specified libraries'),
_('Allow access to all, except specified libraries'), _('Cancel')])
if c == 0:
m.update_user_restrictions(username, {})
elif c == 3:
pass
else:
names = get_input(_('Enter a comma separated list of library names:'))
names = filter(None, [x.strip() for x in names.split(',')])
w = 'allowed_library_names' if c == 1 else 'blocked_library_names'
t = _('Allowing access only to libraries: {}') if c == 1 else _(
'Allowing access to all libraries, except: {}')
prints(t.format(', '.join(names)))
m.update_user_restrictions(username, {w:names})
def edit_user(username=None):
username = username or get_valid_user()
c = choice(choices=[
_('Show password for {}').format(username),
_('Change password for {}').format(username),
_('Change read/write permission for {}').format(username),
_('Change the libraries {} is allowed to access').format(username),
_('Cancel'),
], banner='\n' + _('{} has {} access').format(
username, _('readonly') if m.is_readonly(username) else _('read-write'))
)
print()
if c > 3:
actions.append(toplevel)
return
{0: show_password, 1: change_password, 2: change_readonly, 3: change_restriction}[c](username)
actions.append(partial(edit_user, username=username))
def toplevel():
{0:add_user, 1:edit_user, 2:remove_user, 3:lambda: None}[choice(choices=[
_('Add a new user'), _('Edit an existing user'), _('Remove a user'),
_('Cancel')])]()
actions = [toplevel]
while actions:
actions[0]()
del actions[0]
# }}}

View File

@ -24,6 +24,24 @@ def load_json(raw):
return {}
def parse_restriction(raw):
r = load_json(raw)
if not isinstance(r, dict):
r = {}
r['allowed_library_names'] = frozenset(r.get('allowed_library_names', ()))
r['blocked_library_names'] = frozenset(r.get('blocked_library_names', ()))
return r
def serialize_restriction(r):
ans = {}
for x in 'allowed_library_names blocked_library_names'.split():
v = r.get(x)
if v:
ans[x] = list(v)
return json.dumps(ans)
class UserManager(object):
lock = RLock()
@ -62,6 +80,7 @@ class UserManager(object):
self.path = os.path.join(config_dir, 'server-users.sqlite') if path is None else path
self._conn = None
self._restrictions = {}
self._readonly = {}
def get_session_data(self, username):
with self.lock:
@ -92,8 +111,9 @@ class UserManager(object):
def validate_username(self, username):
if self.has_user(username):
return _('The username %s already exists') % username
if re.sub(r'[a-zA-Z0-9 ]', '', username):
return _('For maximum compatibility you should use only the letters A-Z, the numbers 0-9 and spaces in the username')
if re.sub(r'[a-zA-Z_0-9 ]', '', username):
return _('For maximum compatibility you should use only the letters A-Z,'
' the numbers 0-9 and spaces or underscores in the username')
def validate_password(self, pw):
try:
@ -107,11 +127,9 @@ class UserManager(object):
if msg is not None:
raise ValueError(msg)
restriction = restriction or {}
if not isinstance(restriction, dict):
raise TypeError('restriction must be a dict')
self.conn.cursor().execute(
'INSERT INTO users (name, pw, restriction, readonly) VALUES (?, ?, ?, ?)',
(username, pw, json.dumps(restriction), ('y' if readonly else 'n')))
(username, pw, serialize_restriction(restriction), ('y' if readonly else 'n')))
def remove_user(self, username):
with self.lock:
@ -124,6 +142,49 @@ class UserManager(object):
return {x for x, in self.conn.cursor().execute(
'SELECT name FROM users')}
@property
def user_data(self):
with self.lock:
ans = {}
for name, pw, restriction, readonly in self.conn.cursor().execute('SELECT name,pw,restriction,readonly FROM users'):
ans[name] = {
'pw':pw, 'restriction':parse_restriction(restriction), 'readonly': readonly.lower() == 'y'
}
return ans
@user_data.setter
def user_data(self, users):
with self.lock, self.conn:
c = self.conn.cursor()
for name, data in users.iteritems():
res = serialize_restriction(data['restriction'])
r = 'y' if data['readonly'] else 'n'
c.execute('UPDATE users SET (pw, restriction, readonly) VALUES (?,?,?) WHERE name=?',
data['pw'], res, r, name)
if self.conn.changes() > 0:
continue
c.execute('INSERT INTO USERS (name, pw, restriction, readonly)', name, data['pw'], res, r)
self._restrictions.clear()
self._readonly.clear()
def is_readonly(self, username):
with self.lock:
try:
return self._readonly[username]
except KeyError:
self._readonly[username] = False
for readonly, in self.conn.cursor().execute(
'SELECT readonly FROM users WHERE name=?', (username,)):
self._readonly[username] = readonly == 'y'
return self._readonly[username]
return False
def set_readonly(self, username, value):
with self.lock:
self.conn.cursor().execute(
'UPDATE users SET readonly=? WHERE name=?', ('y' if value else 'n', username))
self._readonly.pop(username, None)
def change_password(self, username, pw):
with self.lock:
msg = self.validate_password(pw)
@ -136,11 +197,10 @@ class UserManager(object):
with self.lock:
r = self._restrictions.get(username)
if r is None:
r = self._restrictions[username] = parse_restriction('{}')
for restriction, in self.conn.cursor().execute(
'SELECT restriction FROM users WHERE name=?', (username,)):
self._restrictions[username] = r = json.loads(restriction)
r['allowed_library_names'] = frozenset(r.get('allowed_library_names', ()))
r['blocked_library_names'] = frozenset(r.get('blocked_library_names', ()))
self._restrictions[username] = r = parse_restriction(restriction)
break
return r
@ -163,4 +223,4 @@ class UserManager(object):
with self.lock:
self._restrictions.pop(username, None)
self.conn.cursor().execute(
'UPDATE users SET restriction=? WHERE name=?', (json.dumps(restrictions), username))
'UPDATE users SET restriction=? WHERE name=?', (serialize_restriction(restrictions), username))