Content server: Add a checkbox in content server user preferences to prevent a user account from changing its own password via the web interface

Useful if the user account is shared by multiple people and the server
admin doesnt want any of them to be able to change the password without
informing the others.
This commit is contained in:
Kovid Goyal 2025-07-07 12:04:01 +05:30
parent 6855d2e177
commit bb64863046
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
4 changed files with 92 additions and 17 deletions

View File

@ -743,6 +743,12 @@ class User(QWidget):
) )
rw.stateChanged.connect(self.readonly_changed) rw.stateChanged.connect(self.readonly_changed)
l.addWidget(rw) l.addWidget(rw)
self.cpw_text = _('Allow {} to change their password via the web')
self.cpw = cpw = QCheckBox(self)
cpw.setToolTip(_(
'If enabled, allows the user to change their own password via the web interface'))
cpw.stateChanged.connect(self.cpw_changed)
l.addWidget(cpw)
self.access_label = la = QLabel(self) self.access_label = la = QLabel(self)
l.addWidget(la), la.setWordWrap(True) l.addWidget(la), la.setWordWrap(True)
self.cpb = b = QPushButton(_('Change &password')) self.cpb = b = QPushButton(_('Change &password'))
@ -764,6 +770,10 @@ class User(QWidget):
self.user_data[self.username]['readonly'] = not self.rw.isChecked() self.user_data[self.username]['readonly'] = not self.rw.isChecked()
self.changed_signal.emit() self.changed_signal.emit()
def cpw_changed(self):
self.user_data[self.username]['allow_change_password_via_http'] = bool(self.cpw.isChecked())
self.changed_signal.emit()
def update_restriction(self): def update_restriction(self):
username, user_data = self.username, self.user_data username, user_data = self.username, self.user_data
r = user_data[username]['restriction'] r = user_data[username]['restriction']
@ -799,11 +809,17 @@ class User(QWidget):
self.rw.blockSignals(True), self.rw.setChecked( self.rw.blockSignals(True), self.rw.setChecked(
not user_data[username]['readonly'] not user_data[username]['readonly']
), self.rw.blockSignals(False) ), self.rw.blockSignals(False)
self.cpw.setText(self.cpw_text.format(username))
self.cpw.setVisible(True)
self.cpw.blockSignals(True), self.cpw.setChecked(
bool(user_data[username].get('allow_change_password_via_http')))
self.cpw.blockSignals(False)
self.access_label.setVisible(True) self.access_label.setVisible(True)
self.restrict_button.setVisible(True) self.restrict_button.setVisible(True)
self.update_restriction() self.update_restriction()
else: else:
self.rw.setVisible(False) self.rw.setVisible(False)
self.cpw.setVisible(False)
self.access_label.setVisible(False) self.access_label.setVisible(False)
self.restrict_button.setVisible(False) self.restrict_button.setVisible(False)

View File

@ -56,6 +56,31 @@ List all usernames.
print(name) print(name)
def change_set_password(user_manager, args):
p = create_subcommand_parser('change_set_password', _('username set|reset|toggle|show') + '\n\n' + '''\
Restrict the specified user account to prevent it from changing its own password via the web interface. \
The value of set allows the account to change its own password, reset prevents it from changing its \
own password, toggle flips the value and show prints out the current value. \
''')
opts, args = p.parse_args(['calibre-server'] + list(args))
if len(args) < 3:
p.print_help()
raise SystemExit(_('username and operation are required'))
username, op = args[1], args[2]
if op == 'toggle':
val = not user_manager.is_allowed_to_change_password_via_http(username)
elif op == 'set':
val = True
elif op == 'reset':
val = False
elif op == 'show':
print('set' if user_manager.is_allowed_to_change_password_via_http(username) else 'reset', end='')
return
else:
raise SystemExit(f'{op} is an unknown operation')
user_manager.set_allowed_to_change_password_via_http(username, val)
def change_readonly(user_manager, args): def change_readonly(user_manager, args):
p = create_subcommand_parser('readonly', _('username set|reset|toggle|show') + '\n\n' + '''\ p = create_subcommand_parser('readonly', _('username set|reset|toggle|show') + '\n\n' + '''\
Restrict the specified user account to prevent it from making changes. \ Restrict the specified user account to prevent it from making changes. \
@ -180,13 +205,15 @@ def main(user_manager, args):
return list_users(user_manager, rest) return list_users(user_manager, rest)
if q == 'readonly': if q == 'readonly':
return change_readonly(user_manager, rest) return change_readonly(user_manager, rest)
if q == 'change_set_password':
return change_set_password(user_manager, rest)
if q == 'libraries': if q == 'libraries':
return change_libraries(user_manager, rest) return change_libraries(user_manager, rest)
if q != 'help': if q != 'help':
print(_('Unknown command: {}').format(q), file=sys.stderr) print(_('Unknown command: {}').format(q), file=sys.stderr)
print() print()
print(_('Manage the user accounts for calibre-server. Available commands are:')) print(_('Manage the user accounts for calibre-server. Available commands are:'))
print('add, remove, chpass, list') print('add, remove, chpass, list, readonly, change_set_password')
print(_('Use {} for help on individual commands').format('calibre-server --manage-users -- command -h')) print(_('Use {} for help on individual commands').format('calibre-server --manage-users -- command -h'))
raise SystemExit(1) raise SystemExit(1)
@ -307,6 +334,15 @@ def manage_users_cli(path=None, args=()):
if get_input(q.format(username) + '? [y/n]:').lower() == 'y': if get_input(q.format(username) + '? [y/n]:').lower() == 'y':
m.set_readonly(username, not readonly) m.set_readonly(username, not readonly)
def change_set_password(username):
allowed = m.is_allowed_to_change_password_via_http(username)
if allowed:
q = _('Prevent {} from changing their own password via the web')
else:
q = _('Allow {} to change their own password via the web')
if get_input(q.format(username) + '? [y/n]:').lower() == 'y':
m.set_allowed_to_change_password_via_http(username, not allowed)
def change_restriction(username): def change_restriction(username):
r = m.restrictions(username) r = m.restrictions(username)
if r is None: if r is None:
@ -378,19 +414,22 @@ def manage_users_cli(path=None, args=()):
_('Change password for {}').format(username), _('Change password for {}').format(username),
_('Change read/write permission for {}').format(username), _('Change read/write permission for {}').format(username),
_('Change the libraries {} is allowed to access').format(username), _('Change the libraries {} is allowed to access').format(username),
_('Change if {} is allowed to set their own password').format(username),
_('Cancel'), ], _('Cancel'), ],
banner='\n' + _('{0} has {1} access').format( banner='\n' + _('{0} has {1} access').format(
username, username,
_('readonly') if m.is_readonly(username) else _('read-write'))) _('readonly') if m.is_readonly(username) else _('read-write')))
print() print()
if c > 3: if c > 4:
actions.append(toplevel) actions.append(toplevel)
return return
{ {
0: show_password, 0: show_password,
1: change_password, 1: change_password,
2: change_readonly, 2: change_readonly,
3: change_restriction}[c](username) 3: change_restriction,
4: change_set_password,
}[c](username)
actions.append(partial(edit_user, username=username)) actions.append(partial(edit_user, username=username))
def toplevel(): def toplevel():

View File

@ -67,9 +67,11 @@ def validate_password(pw):
return _('The password must contain only ASCII (English) characters and symbols') return _('The password must contain only ASCII (English) characters and symbols')
def create_user_data(pw, readonly=False, restriction=None): def create_user_data(pw, readonly=False, restriction=None, misc_data='{}'):
misc_data = json.loads(misc_data)
return { return {
'pw':pw, 'restriction':parse_restriction(restriction or '{}').copy(), 'readonly': readonly 'pw':pw, 'restriction':parse_restriction(restriction or '{}').copy(), 'readonly': readonly,
'allow_change_password_via_http': bool(misc_data.get('allow_change_password_via_http', True)),
} }
@ -164,15 +166,16 @@ class UserManager:
def validate_password(self, pw): def validate_password(self, pw):
return validate_password(pw) return validate_password(pw)
def add_user(self, username, pw, restriction=None, readonly=False): def add_user(self, username, pw, restriction=None, readonly=False, allow_change_password_via_http=True):
with self.lock: with self.lock:
msg = self.validate_username(username) or self.validate_password(pw) msg = self.validate_username(username) or self.validate_password(pw)
if msg is not None: if msg is not None:
raise ValueError(msg) raise ValueError(msg)
restriction = restriction or {} restriction = restriction or {}
misc_data = {'allow_change_password_via_http': allow_change_password_via_http}
self.conn.cursor().execute( self.conn.cursor().execute(
'INSERT INTO users (name, pw, restriction, readonly) VALUES (?, ?, ?, ?)', 'INSERT INTO users (name, pw, restriction, readonly, misc_data) VALUES (?, ?, ?, ?, ?)',
(username, pw, serialize_restriction(restriction), ('y' if readonly else 'n'))) (username, pw, serialize_restriction(restriction), ('y' if readonly else 'n'), json.dumps(misc_data)))
def remove_user(self, username): def remove_user(self, username):
with self.lock: with self.lock:
@ -189,8 +192,8 @@ class UserManager:
def user_data(self): def user_data(self):
with self.lock: with self.lock:
ans = {} ans = {}
for name, pw, restriction, readonly in self.conn.cursor().execute('SELECT name,pw,restriction,readonly FROM users'): for name, pw, restriction, readonly, misc_data in self.conn.cursor().execute('SELECT name,pw,restriction,readonly,misc_data FROM users'):
ans[name] = create_user_data(pw, readonly.lower() == 'y', restriction) ans[name] = create_user_data(pw, readonly.lower() == 'y', restriction, misc_data)
return ans return ans
@user_data.setter @user_data.setter
@ -200,15 +203,13 @@ class UserManager:
remove = self.all_user_names - set(users) remove = self.all_user_names - set(users)
if remove: if remove:
c.executemany('DELETE FROM users WHERE name=?', [(n,) for n in remove]) c.executemany('DELETE FROM users WHERE name=?', [(n,) for n in remove])
for name, data in iteritems(users): for name, data in users.items():
res = serialize_restriction(data['restriction']) res = serialize_restriction(data['restriction'])
misc_data = json.dumps({'allow_change_password_via_http': bool(data.get('allow_change_password_via_http', True))})
r = 'y' if data['readonly'] else 'n' r = 'y' if data['readonly'] else 'n'
c.execute('UPDATE users SET pw=?, restriction=?, readonly=? WHERE name=?', c.execute('INSERT INTO USERS (name, pw, restriction, readonly, misc_data) VALUES (?, ?, ?, ?, ?) '
(data['pw'], res, r, name)) 'ON CONFLICT DO UPDATE SET pw=?, restriction=?, readonly=?, misc_data=?',
if self.conn.changes() > 0: (name, data['pw'], res, r, misc_data, data['pw'], res, r, misc_data))
continue
c.execute('INSERT INTO USERS (name, pw, restriction, readonly) VALUES (?, ?, ?, ?)',
(name, data['pw'], res, r))
self.refresh() self.refresh()
def refresh(self): def refresh(self):
@ -266,3 +267,20 @@ class UserManager:
return '' return ''
library_name = os.path.basename(library_path).lower() library_name = os.path.basename(library_path).lower()
return r['library_restrictions'].get(library_name) or '' return r['library_restrictions'].get(library_name) or ''
def misc_data(self, username):
with self.lock:
for misc_data, in self.conn.cursor().execute(
'SELECT misc_data FROM users WHERE name=?', (username,)):
return json.loads(misc_data)
return {}
def is_allowed_to_change_password_via_http(self, username: str) -> bool:
return bool(self.misc_data(username).get('allow_change_password_via_http', True))
def set_allowed_to_change_password_via_http(self, username: str, allow: bool = True) -> None:
with self.lock:
md = self.misc_data(username)
md['allow_change_password_via_http'] = allow
self.conn.cursor().execute(
'UPDATE users SET misc_data=? WHERE name=?', (json.dumps(md), username))

View File

@ -16,6 +16,8 @@ def change_pw(ctx, rd):
user = rd.username or None user = rd.username or None
if user is None: if user is None:
raise HTTPForbidden('Anonymous users are not allowed to change passwords') raise HTTPForbidden('Anonymous users are not allowed to change passwords')
if not ctx.user_manager.is_allowed_to_change_password_via_http(user):
raise HTTPForbidden(f'The user {user} is not allowed to change passwords')
try: try:
pw = json.loads(rd.request_body_file.read()) pw = json.loads(rd.request_body_file.read())
oldpw, newpw = pw['oldpw'], pw['newpw'] oldpw, newpw = pw['oldpw'], pw['newpw']