mirror of
https://github.com/mealie-recipes/mealie.git
synced 2025-06-04 22:25:34 -04:00
fix: user login and creation with LDAP (#2107)
* Corrected if statement to check if a results was returned by the LDAP search. And decoded the user_attributes from binary data to string * removed trailing spaces * Revert asserts in LDAP unit test back Since an empty tuple is still a result, an user is created and the result should not be false. * Simplified code * Extended the LDAP implementation * fix ldap authentication and user creation * modified docs to include new LDAP environment variables * update tests and linting * add libldap-2.4-2 as runtime dependency for the api --------- Co-authored-by: Erik Landkroon <eriklandkroon@gmail.com>
This commit is contained in:
parent
2a929865e2
commit
da60e56982
@ -72,6 +72,7 @@ RUN apt-get update \
|
|||||||
&& apt-get install --no-install-recommends -y \
|
&& apt-get install --no-install-recommends -y \
|
||||||
gosu \
|
gosu \
|
||||||
tesseract-ocr-all \
|
tesseract-ocr-all \
|
||||||
|
libldap-2.4-2 \
|
||||||
&& apt-get autoremove \
|
&& apt-get autoremove \
|
||||||
&& rm -rf /var/lib/apt/lists/*
|
&& rm -rf /var/lib/apt/lists/*
|
||||||
|
|
||||||
|
@ -97,6 +97,7 @@ RUN apt-get update \
|
|||||||
tesseract-ocr-all \
|
tesseract-ocr-all \
|
||||||
curl \
|
curl \
|
||||||
gnupg \
|
gnupg \
|
||||||
|
libldap-2.4-2 \
|
||||||
&& apt-get autoremove \
|
&& apt-get autoremove \
|
||||||
&& rm -rf /var/lib/apt/lists/*
|
&& rm -rf /var/lib/apt/lists/*
|
||||||
|
|
||||||
|
@ -68,6 +68,11 @@ Changing the webworker settings may cause unforeseen memory leak issues with Mea
|
|||||||
| LDAP_SERVER_URL | None | LDAP server URL (e.g. ldap://ldap.example.com) |
|
| LDAP_SERVER_URL | None | LDAP server URL (e.g. ldap://ldap.example.com) |
|
||||||
| LDAP_TLS_INSECURE | False | Do not verify server certificate when using secure LDAP |
|
| LDAP_TLS_INSECURE | False | Do not verify server certificate when using secure LDAP |
|
||||||
| LDAP_TLS_CACERTFILE | None | File path to Certificate Authority used to verify server certificate (e.g. `/path/to/ca.crt`) |
|
| LDAP_TLS_CACERTFILE | None | File path to Certificate Authority used to verify server certificate (e.g. `/path/to/ca.crt`) |
|
||||||
| LDAP_BIND_TEMPLATE | None | Templated DN for users, `{}` will be replaced with the username (e.g. `cn={},dc=example,dc=com`, `{}@example.com`) |
|
|
||||||
| LDAP_BASE_DN | None | Starting point when searching for users authentication (e.g. `CN=Users,DC=xx,DC=yy,DC=de`) |
|
| LDAP_BASE_DN | None | Starting point when searching for users authentication (e.g. `CN=Users,DC=xx,DC=yy,DC=de`) |
|
||||||
|
| LDAP_QUERY_BIND | None | A bind user for LDAP search queries (e.g. `cn=admin,cn=users,dc=example,dc=com`) |
|
||||||
|
| LDAP_QUERY_PASSWORD | None | The password for the bind user used in LDAP_QUERY_BIND |
|
||||||
|
| LDAP_USER_FILTER | None | The LDAP search filter to find users (e.g. `(&( | ({id_attribute}={input})({mail_attribute}={input}))(objectClass=person))`).<br/> **Note** `id_attribute` and `mail_attribute` will be replaced with `LDAP_ID_ATTRIBUTE` and `LDAP_MAIL_ATTRIBUTE`, respectively. `input` will be replaced with either the username or email the user logs in with. |
|
||||||
| LDAP_ADMIN_FILTER | None | Optional LDAP filter, which tells Mealie the LDAP user is an admin (e.g. `(memberOf=cn=admins,dc=example,dc=com)`) |
|
| LDAP_ADMIN_FILTER | None | Optional LDAP filter, which tells Mealie the LDAP user is an admin (e.g. `(memberOf=cn=admins,dc=example,dc=com)`) |
|
||||||
|
| LDAP_ID_ATTRIBUTE | uid | The LDAP attribute that maps to the user's id |
|
||||||
|
| LDAP_NAME_ATTRIBUTE | name | The LDAP attribute that maps to the user's name |
|
||||||
|
| LDAP_MAIL_ATTRIBUTE | mail | The LDAP attribute that maps to the user's email |
|
||||||
|
File diff suppressed because one or more lines are too long
@ -54,6 +54,7 @@ def user_from_ldap(db: AllRepositories, username: str, password: str) -> Private
|
|||||||
|
|
||||||
if settings.LDAP_TLS_INSECURE:
|
if settings.LDAP_TLS_INSECURE:
|
||||||
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
|
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
|
||||||
|
|
||||||
ldap.set_option(ldap.OPT_REFERRALS, 0)
|
ldap.set_option(ldap.OPT_REFERRALS, 0)
|
||||||
ldap.set_option(ldap.OPT_PROTOCOL_VERSION, 3)
|
ldap.set_option(ldap.OPT_PROTOCOL_VERSION, 3)
|
||||||
conn = ldap.initialize(settings.LDAP_SERVER_URL)
|
conn = ldap.initialize(settings.LDAP_SERVER_URL)
|
||||||
@ -61,19 +62,11 @@ def user_from_ldap(db: AllRepositories, username: str, password: str) -> Private
|
|||||||
if settings.LDAP_TLS_CACERTFILE:
|
if settings.LDAP_TLS_CACERTFILE:
|
||||||
conn.set_option(ldap.OPT_X_TLS_CACERTFILE, settings.LDAP_TLS_CACERTFILE)
|
conn.set_option(ldap.OPT_X_TLS_CACERTFILE, settings.LDAP_TLS_CACERTFILE)
|
||||||
conn.set_option(ldap.OPT_X_TLS_NEWCTX, 0)
|
conn.set_option(ldap.OPT_X_TLS_NEWCTX, 0)
|
||||||
user = db.users.get_one(username, "email", any_case=True)
|
|
||||||
|
|
||||||
if not settings.LDAP_BIND_TEMPLATE:
|
|
||||||
return False
|
|
||||||
|
|
||||||
if not user:
|
|
||||||
user_bind = settings.LDAP_BIND_TEMPLATE.format(username)
|
|
||||||
user = db.users.get_one(username, "username", any_case=True)
|
|
||||||
else:
|
|
||||||
user_bind = settings.LDAP_BIND_TEMPLATE.format(user.username)
|
|
||||||
|
|
||||||
|
# Use query user for the search instead of the logged in user
|
||||||
|
# This prevents the need for every user to have query permissions in LDAP
|
||||||
try:
|
try:
|
||||||
conn.simple_bind_s(user_bind, password)
|
conn.simple_bind_s(settings.LDAP_QUERY_BIND, settings.LDAP_QUERY_PASSWORD)
|
||||||
except (ldap.INVALID_CREDENTIALS, ldap.NO_SUCH_OBJECT):
|
except (ldap.INVALID_CREDENTIALS, ldap.NO_SUCH_OBJECT):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@ -83,21 +76,44 @@ def user_from_ldap(db: AllRepositories, username: str, password: str) -> Private
|
|||||||
user_entry = conn.search_s(
|
user_entry = conn.search_s(
|
||||||
settings.LDAP_BASE_DN,
|
settings.LDAP_BASE_DN,
|
||||||
ldap.SCOPE_SUBTREE,
|
ldap.SCOPE_SUBTREE,
|
||||||
f"(&(objectClass=user)(|(cn={username})(sAMAccountName={username})(mail={username})))",
|
settings.LDAP_USER_FILTER.format(
|
||||||
["name", "mail"],
|
id_attribute=settings.LDAP_ID_ATTRIBUTE, mail_attribute=settings.LDAP_MAIL_ATTRIBUTE, input=username
|
||||||
|
),
|
||||||
|
[settings.LDAP_ID_ATTRIBUTE, settings.LDAP_NAME_ATTRIBUTE, settings.LDAP_MAIL_ATTRIBUTE],
|
||||||
)
|
)
|
||||||
if user_entry is not None and len(user_entry[0]) != 0 and user_entry[0][0] is not None:
|
|
||||||
user_dn, user_attr = user_entry[0]
|
if not user_entry:
|
||||||
else:
|
conn.unbind_s()
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
user_dn, user_attr = user_entry[0]
|
||||||
|
|
||||||
|
# Check the credentials of the user
|
||||||
|
try:
|
||||||
|
conn.simple_bind_s(user_dn, password)
|
||||||
|
except (ldap.INVALID_CREDENTIALS, ldap.NO_SUCH_OBJECT):
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Check for existing user
|
||||||
|
user = db.users.get_one(username, "email", any_case=True)
|
||||||
|
if not user:
|
||||||
|
user = db.users.get_one(username, "username", any_case=True)
|
||||||
|
|
||||||
if user is None:
|
if user is None:
|
||||||
|
try:
|
||||||
|
user_id = user_attr[settings.LDAP_ID_ATTRIBUTE][0].decode("utf-8")
|
||||||
|
full_name = user_attr[settings.LDAP_NAME_ATTRIBUTE][0].decode("utf-8")
|
||||||
|
email = user_attr[settings.LDAP_MAIL_ATTRIBUTE][0].decode("utf-8")
|
||||||
|
except KeyError:
|
||||||
|
conn.unbind_s()
|
||||||
|
return False
|
||||||
|
|
||||||
user = db.users.create(
|
user = db.users.create(
|
||||||
{
|
{
|
||||||
"username": username,
|
"username": user_id,
|
||||||
"password": "LDAP",
|
"password": "LDAP",
|
||||||
"full_name": user_attr["name"][0],
|
"full_name": full_name,
|
||||||
"email": user_attr["mail"][0],
|
"email": email,
|
||||||
"admin": False,
|
"admin": False,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
@ -105,6 +121,8 @@ def user_from_ldap(db: AllRepositories, username: str, password: str) -> Private
|
|||||||
if settings.LDAP_ADMIN_FILTER:
|
if settings.LDAP_ADMIN_FILTER:
|
||||||
user.admin = len(conn.search_s(user_dn, ldap.SCOPE_BASE, settings.LDAP_ADMIN_FILTER, [])) > 0
|
user.admin = len(conn.search_s(user_dn, ldap.SCOPE_BASE, settings.LDAP_ADMIN_FILTER, [])) > 0
|
||||||
db.users.update(user.id, user)
|
db.users.update(user.id, user)
|
||||||
|
|
||||||
|
conn.unbind_s()
|
||||||
return user
|
return user
|
||||||
|
|
||||||
|
|
||||||
|
@ -117,18 +117,27 @@ class AppSettings(BaseSettings):
|
|||||||
LDAP_SERVER_URL: NoneStr = None
|
LDAP_SERVER_URL: NoneStr = None
|
||||||
LDAP_TLS_INSECURE: bool = False
|
LDAP_TLS_INSECURE: bool = False
|
||||||
LDAP_TLS_CACERTFILE: NoneStr = None
|
LDAP_TLS_CACERTFILE: NoneStr = None
|
||||||
LDAP_BIND_TEMPLATE: NoneStr = None
|
|
||||||
LDAP_BASE_DN: NoneStr = None
|
LDAP_BASE_DN: NoneStr = None
|
||||||
|
LDAP_QUERY_BIND: NoneStr = None
|
||||||
|
LDAP_QUERY_PASSWORD: NoneStr = None
|
||||||
|
LDAP_USER_FILTER: NoneStr = None
|
||||||
LDAP_ADMIN_FILTER: NoneStr = None
|
LDAP_ADMIN_FILTER: NoneStr = None
|
||||||
|
LDAP_ID_ATTRIBUTE: str = "uid"
|
||||||
|
LDAP_MAIL_ATTRIBUTE: str = "mail"
|
||||||
|
LDAP_NAME_ATTRIBUTE: str = "name"
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def LDAP_ENABLED(self) -> bool:
|
def LDAP_ENABLED(self) -> bool:
|
||||||
"""Validates LDAP settings are all set"""
|
"""Validates LDAP settings are all set"""
|
||||||
required = {
|
required = {
|
||||||
self.LDAP_SERVER_URL,
|
self.LDAP_SERVER_URL,
|
||||||
self.LDAP_BIND_TEMPLATE,
|
|
||||||
self.LDAP_BASE_DN,
|
self.LDAP_BASE_DN,
|
||||||
self.LDAP_ADMIN_FILTER,
|
self.LDAP_USER_FILTER,
|
||||||
|
self.LDAP_QUERY_BIND,
|
||||||
|
self.LDAP_QUERY_PASSWORD,
|
||||||
|
self.LDAP_ID_ATTRIBUTE,
|
||||||
|
self.LDAP_MAIL_ATTRIBUTE,
|
||||||
|
self.LDAP_NAME_ATTRIBUTE,
|
||||||
}
|
}
|
||||||
not_none = None not in required
|
not_none = None not in required
|
||||||
return self.LDAP_AUTH_ENABLED and not_none
|
return self.LDAP_AUTH_ENABLED and not_none
|
||||||
|
17
template.env
17
template.env
@ -36,9 +36,14 @@ LANG=en-US
|
|||||||
|
|
||||||
# Configuration for authentication via an external LDAP server
|
# Configuration for authentication via an external LDAP server
|
||||||
LDAP_AUTH_ENABLED=False
|
LDAP_AUTH_ENABLED=False
|
||||||
LDAP_SERVER_URL=None
|
# LDAP_SERVER_URL=""
|
||||||
LDAP_TLS_INSECURE=False
|
# LDAP_TLS_INSECURE=False
|
||||||
LDAP_TLS_CACERTFILE=None
|
# LDAP_TLS_CACERTFILE=
|
||||||
LDAP_BIND_TEMPLATE=None
|
# LDAP_BASE_DN=""
|
||||||
LDAP_BASE_DN=None
|
# LDAP_QUERY_BIND=""
|
||||||
LDAP_ADMIN_FILTER=None
|
# LDAP_QUERY_PASSWORD=""
|
||||||
|
# LDAP_USER_FILTER=""
|
||||||
|
# LDAP_ADMIN_FILTER=""
|
||||||
|
# LDAP_ID_ATTRIBUTE=uid
|
||||||
|
# LDAP_NAME_ATTRIBUTE=name
|
||||||
|
# LDAP_MAIL_ATTRIBUTE=mail
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
|
import ldap
|
||||||
from pytest import MonkeyPatch
|
from pytest import MonkeyPatch
|
||||||
|
|
||||||
from mealie.core import security
|
from mealie.core import security
|
||||||
@ -9,6 +10,90 @@ from mealie.db.db_setup import session_context
|
|||||||
from tests.utils.factories import random_string
|
from tests.utils.factories import random_string
|
||||||
|
|
||||||
|
|
||||||
|
class LdapConnMock:
|
||||||
|
def __init__(self, user, password, admin, query_bind, query_password, mail, name) -> None:
|
||||||
|
self.app_settings = get_app_settings()
|
||||||
|
self.user = user
|
||||||
|
self.password = password
|
||||||
|
self.query_bind = query_bind
|
||||||
|
self.query_password = query_password
|
||||||
|
self.admin = admin
|
||||||
|
self.mail = mail
|
||||||
|
self.name = name
|
||||||
|
|
||||||
|
def simple_bind_s(self, dn, bind_pw):
|
||||||
|
if dn == "cn={}, {}".format(self.user, self.app_settings.LDAP_BASE_DN):
|
||||||
|
valid_password = self.password
|
||||||
|
elif "cn={}, {}".format(self.query_bind, self.app_settings.LDAP_BASE_DN):
|
||||||
|
valid_password = self.query_password
|
||||||
|
|
||||||
|
if bind_pw == valid_password:
|
||||||
|
return
|
||||||
|
|
||||||
|
raise ldap.INVALID_CREDENTIALS
|
||||||
|
|
||||||
|
# Default search mock implementation
|
||||||
|
def search_s(self, dn, scope, filter, attrlist):
|
||||||
|
if filter == self.app_settings.LDAP_ADMIN_FILTER:
|
||||||
|
assert attrlist == []
|
||||||
|
assert filter == self.app_settings.LDAP_ADMIN_FILTER
|
||||||
|
assert dn == "cn={}, {}".format(self.user, self.app_settings.LDAP_BASE_DN)
|
||||||
|
assert scope == ldap.SCOPE_BASE
|
||||||
|
|
||||||
|
if not self.admin:
|
||||||
|
return []
|
||||||
|
|
||||||
|
return [(dn, {})]
|
||||||
|
|
||||||
|
assert attrlist == [
|
||||||
|
self.app_settings.LDAP_ID_ATTRIBUTE,
|
||||||
|
self.app_settings.LDAP_NAME_ATTRIBUTE,
|
||||||
|
self.app_settings.LDAP_MAIL_ATTRIBUTE,
|
||||||
|
]
|
||||||
|
assert filter == self.app_settings.LDAP_USER_FILTER.format(
|
||||||
|
id_attribute=self.app_settings.LDAP_ID_ATTRIBUTE,
|
||||||
|
mail_attribute=self.app_settings.LDAP_MAIL_ATTRIBUTE,
|
||||||
|
input=self.user,
|
||||||
|
)
|
||||||
|
assert dn == self.app_settings.LDAP_BASE_DN
|
||||||
|
assert scope == ldap.SCOPE_SUBTREE
|
||||||
|
|
||||||
|
return [
|
||||||
|
(
|
||||||
|
"cn={}, {}".format(self.user, self.app_settings.LDAP_BASE_DN),
|
||||||
|
{
|
||||||
|
self.app_settings.LDAP_ID_ATTRIBUTE: [self.user.encode()],
|
||||||
|
self.app_settings.LDAP_NAME_ATTRIBUTE: [self.name.encode()],
|
||||||
|
self.app_settings.LDAP_MAIL_ATTRIBUTE: [self.mail.encode()],
|
||||||
|
},
|
||||||
|
)
|
||||||
|
]
|
||||||
|
|
||||||
|
def set_option(self, option, invalue):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def unbind_s(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def setup_env(monkeypatch: MonkeyPatch):
|
||||||
|
user = random_string(10)
|
||||||
|
mail = random_string(10)
|
||||||
|
name = random_string(10)
|
||||||
|
password = random_string(10)
|
||||||
|
query_bind = random_string(10)
|
||||||
|
query_password = random_string(10)
|
||||||
|
base_dn = "(dc=example,dc=com)"
|
||||||
|
monkeypatch.setenv("LDAP_AUTH_ENABLED", "true")
|
||||||
|
monkeypatch.setenv("LDAP_SERVER_URL", "") # Not needed due to mocking
|
||||||
|
monkeypatch.setenv("LDAP_BASE_DN", base_dn)
|
||||||
|
monkeypatch.setenv("LDAP_QUERY_BIND", query_bind)
|
||||||
|
monkeypatch.setenv("LDAP_QUERY_PASSWORD", query_password)
|
||||||
|
monkeypatch.setenv("LDAP_USER_FILTER", "(&(objectClass=user)(|({id_attribute}={input})({mail_attribute}={input})))")
|
||||||
|
|
||||||
|
return user, mail, name, password, query_bind, query_password
|
||||||
|
|
||||||
|
|
||||||
def test_create_file_token():
|
def test_create_file_token():
|
||||||
file_path = Path(__file__).parent
|
file_path = Path(__file__).parent
|
||||||
file_token = security.create_file_token(file_path)
|
file_token = security.create_file_token(file_path)
|
||||||
@ -17,30 +102,104 @@ def test_create_file_token():
|
|||||||
|
|
||||||
|
|
||||||
def test_ldap_authentication_mocked(monkeypatch: MonkeyPatch):
|
def test_ldap_authentication_mocked(monkeypatch: MonkeyPatch):
|
||||||
import ldap
|
user, mail, name, password, query_bind, query_password = setup_env(monkeypatch)
|
||||||
|
|
||||||
|
def ldap_initialize_mock(url):
|
||||||
|
assert url == ""
|
||||||
|
return LdapConnMock(user, password, False, query_bind, query_password, mail, name)
|
||||||
|
|
||||||
|
monkeypatch.setattr(ldap, "initialize", ldap_initialize_mock)
|
||||||
|
|
||||||
|
get_app_settings.cache_clear()
|
||||||
|
|
||||||
|
with session_context() as session:
|
||||||
|
result = security.authenticate_user(session, user, password)
|
||||||
|
|
||||||
|
assert result
|
||||||
|
assert result.username == user
|
||||||
|
assert result.email == mail
|
||||||
|
assert result.full_name == name
|
||||||
|
assert result.admin is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_ldap_authentication_failed_mocked(monkeypatch: MonkeyPatch):
|
||||||
|
user, mail, name, password, query_bind, query_password = setup_env(monkeypatch)
|
||||||
|
|
||||||
|
def ldap_initialize_mock(url):
|
||||||
|
assert url == ""
|
||||||
|
return LdapConnMock(user, password, False, query_bind, query_password, mail, name)
|
||||||
|
|
||||||
|
monkeypatch.setattr(ldap, "initialize", ldap_initialize_mock)
|
||||||
|
|
||||||
|
get_app_settings.cache_clear()
|
||||||
|
|
||||||
|
with session_context() as session:
|
||||||
|
result = security.authenticate_user(session, user, password + "a")
|
||||||
|
|
||||||
|
assert result is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_ldap_authentication_non_admin_mocked(monkeypatch: MonkeyPatch):
|
||||||
|
user, mail, name, password, query_bind, query_password = setup_env(monkeypatch)
|
||||||
|
monkeypatch.setenv("LDAP_ADMIN_FILTER", "(memberOf=cn=admins,dc=example,dc=com)")
|
||||||
|
|
||||||
|
def ldap_initialize_mock(url):
|
||||||
|
assert url == ""
|
||||||
|
return LdapConnMock(user, password, False, query_bind, query_password, mail, name)
|
||||||
|
|
||||||
|
monkeypatch.setattr(ldap, "initialize", ldap_initialize_mock)
|
||||||
|
|
||||||
|
get_app_settings.cache_clear()
|
||||||
|
|
||||||
|
with session_context() as session:
|
||||||
|
result = security.authenticate_user(session, user, password)
|
||||||
|
|
||||||
|
assert result
|
||||||
|
assert result.username == user
|
||||||
|
assert result.email == mail
|
||||||
|
assert result.full_name == name
|
||||||
|
assert result.admin is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_ldap_authentication_admin_mocked(monkeypatch: MonkeyPatch):
|
||||||
|
user, mail, name, password, query_bind, query_password = setup_env(monkeypatch)
|
||||||
|
monkeypatch.setenv("LDAP_ADMIN_FILTER", "(memberOf=cn=admins,dc=example,dc=com)")
|
||||||
|
|
||||||
|
def ldap_initialize_mock(url):
|
||||||
|
assert url == ""
|
||||||
|
return LdapConnMock(user, password, True, query_bind, query_password, mail, name)
|
||||||
|
|
||||||
|
monkeypatch.setattr(ldap, "initialize", ldap_initialize_mock)
|
||||||
|
|
||||||
|
get_app_settings.cache_clear()
|
||||||
|
|
||||||
|
with session_context() as session:
|
||||||
|
result = security.authenticate_user(session, user, password)
|
||||||
|
|
||||||
|
assert result
|
||||||
|
assert result.username == user
|
||||||
|
assert result.email == mail
|
||||||
|
assert result.full_name == name
|
||||||
|
assert result.admin
|
||||||
|
|
||||||
|
|
||||||
|
def test_ldap_authentication_disabled_mocked(monkeypatch: MonkeyPatch):
|
||||||
|
monkeypatch.setenv("LDAP_AUTH_ENABLED", "False")
|
||||||
|
|
||||||
user = random_string(10)
|
user = random_string(10)
|
||||||
password = random_string(10)
|
password = random_string(10)
|
||||||
bind_template = "cn={},dc=example,dc=com"
|
|
||||||
base_dn = "(dc=example,dc=com)"
|
|
||||||
monkeypatch.setenv("LDAP_AUTH_ENABLED", "true")
|
|
||||||
monkeypatch.setenv("LDAP_SERVER_URL", "") # Not needed due to mocking
|
|
||||||
monkeypatch.setenv("LDAP_BIND_TEMPLATE", bind_template)
|
|
||||||
monkeypatch.setenv("LDAP_BASE_DN", base_dn)
|
|
||||||
|
|
||||||
class LdapConnMock:
|
class LdapConnMock:
|
||||||
def simple_bind_s(self, dn, bind_pw):
|
def simple_bind_s(self, dn, bind_pw):
|
||||||
assert dn == bind_template.format(user)
|
assert False # When LDAP is disabled, this method should not be called
|
||||||
return bind_pw == password
|
|
||||||
|
|
||||||
def search_s(self, dn, scope, filter, attrlist):
|
def search_s(self, dn, scope, filter, attrlist):
|
||||||
assert attrlist == ["name", "mail"]
|
pass
|
||||||
assert filter == f"(&(objectClass=user)(|(cn={user})(sAMAccountName={user})(mail={user})))"
|
|
||||||
assert dn == base_dn
|
|
||||||
assert scope == ldap.SCOPE_SUBTREE
|
|
||||||
return [()]
|
|
||||||
|
|
||||||
def set_option(*args, **kwargs):
|
def set_option(self, option, invalue):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def unbind_s(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def ldap_initialize_mock(url):
|
def ldap_initialize_mock(url):
|
||||||
@ -52,6 +211,4 @@ def test_ldap_authentication_mocked(monkeypatch: MonkeyPatch):
|
|||||||
get_app_settings.cache_clear()
|
get_app_settings.cache_clear()
|
||||||
|
|
||||||
with session_context() as session:
|
with session_context() as session:
|
||||||
result = security.authenticate_user(session, user, password)
|
security.authenticate_user(session, user, password)
|
||||||
|
|
||||||
assert result is False
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user