diff --git a/opentakserver/app.py b/opentakserver/app.py index 1fb34690..ba542468 100644 --- a/opentakserver/app.py +++ b/opentakserver/app.py @@ -39,7 +39,16 @@ from opentakserver.controllers.meshtastic_controller import MeshtasticController from opentakserver.defaultconfig import DefaultConfig from opentakserver.EmailValidator import EmailValidator -from opentakserver.extensions import apscheduler, babel, db, ldap_manager, logger, mail, socketio +from opentakserver.extensions import ( + apscheduler, + babel, + db, + ldap_manager, + logger, + mail, + oidc, + socketio, +) from opentakserver.models.Group import Group, GroupTypeEnum from opentakserver.models.Icon import Icon from opentakserver.models.role import Role @@ -67,6 +76,52 @@ def get_timezone(): return pytz.timezone("UTC") +def _normalize_samesite(value): + if value is None: + return None + + value = str(value).strip() + return value.lower() or None + + +def _init_oidc(app): + if not app.config.get("OTS_ENABLE_OIDC"): + return + + logger.info("Enabling OIDC via flask-oidc") + if oidc is None: + raise RuntimeError( + "OTS_ENABLE_OIDC is enabled but flask-oidc is not installed. Install flask-oidc to use OIDC." + ) + + if _normalize_samesite(app.config.get("SESSION_COOKIE_SAMESITE")) == "strict": + logger.warning( + "SESSION_COOKIE_SAMESITE=strict breaks browser OIDC callbacks; overriding it to Lax" + ) + app.config["SESSION_COOKIE_SAMESITE"] = "Lax" + + oidc.init_app(app) + + +def _build_security_identity_attributes(app): + identity_attributes = [ + {"username": {"mapper": uia_username_mapper, "case_insensitive": True}} + ] + + if app.config.get("OTS_ENABLE_EMAIL"): + identity_attributes.append( + {"email": {"mapper": uia_email_mapper, "case_insensitive": True}} + ) + + if app.config.get("OTS_ENABLE_LDAP"): + identity_attributes.append({"ldap": {}}) + + if app.config.get("OTS_ENABLE_OIDC"): + identity_attributes.append({"oidc": {}}) + + return identity_attributes + + def init_extensions(app): db.init_app(app) Migrate(app, db) @@ -95,13 +150,10 @@ def init_extensions(app): } } ) - identity_attributes = [{"username": {"mapper": uia_username_mapper, "case_insensitive": True}}] + identity_attributes = _build_security_identity_attributes(app) # Don't allow registration unless email is enabled if app.config.get("OTS_ENABLE_EMAIL"): - identity_attributes.append( - {"email": {"mapper": uia_email_mapper, "case_insensitive": True}} - ) app.config.update( { "SECURITY_REGISTERABLE": True, @@ -123,7 +175,8 @@ def init_extensions(app): if app.config.get("OTS_ENABLE_LDAP"): logger.info("Enabling LDAP") ldap_manager.init_app(app) - identity_attributes.append({"ldap": {}}) + + _init_oidc(app) app.config.update({"SECURITY_USER_IDENTITY_ATTRIBUTES": identity_attributes}) @@ -304,8 +357,6 @@ def create_app(cli=True): app.register_blueprint(scheduler_blueprint) - app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_host=1) - else: from opentakserver.blueprints.cli import ots, translate @@ -355,6 +406,7 @@ def create_app(cli=True): app.register_blueprint(scheduler_blueprint) + app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_host=1, x_proto=1, x_port=1) return app diff --git a/opentakserver/blueprints/ots_api/__init__.py b/opentakserver/blueprints/ots_api/__init__.py index e56a0be0..f97f9858 100644 --- a/opentakserver/blueprints/ots_api/__init__.py +++ b/opentakserver/blueprints/ots_api/__init__.py @@ -22,6 +22,7 @@ from .language_api import language_api from .ldap_api import ldap_blueprint from .tak_gov_link_api import tak_gov_link_blueprint +from .oidc_api import oidc_blueprint ots_api = Blueprint("ots_api", __name__) ots_api.register_blueprint(api_blueprint) @@ -41,5 +42,6 @@ ots_api.register_blueprint(plugin_blueprint) ots_api.register_blueprint(token_api_blueprint) ots_api.register_blueprint(ldap_blueprint) +ots_api.register_blueprint(oidc_blueprint) ots_api.register_blueprint(tak_gov_link_blueprint) ots_api.register_blueprint(language_api) diff --git a/opentakserver/blueprints/ots_api/oidc_api.py b/opentakserver/blueprints/ots_api/oidc_api.py new file mode 100644 index 00000000..0bac02f7 --- /dev/null +++ b/opentakserver/blueprints/ots_api/oidc_api.py @@ -0,0 +1,537 @@ +import unicodedata +from urllib.parse import urlparse + +from flask import Blueprint +from flask import current_app as app +from flask import jsonify, redirect, request, session, url_for +from flask_security import login_user + +from opentakserver.UsernameValidator import UsernameValidator +from opentakserver.extensions import logger, oidc +from opentakserver.oidc import _resolve_configured_issuer + + +oidc_blueprint = Blueprint("oidc_blueprint", __name__) + + +def _to_bool(value): + return str(value).lower() in ["true", "1", "yes", "on"] + + +def _as_list(value): + if not value: + return [] + + if isinstance(value, (list, tuple, set)): + return [str(item).strip() for item in value if str(item).strip()] + + return [item.strip() for item in str(value).split(",") if item.strip()] + + +def _sanitize_next_url(value, default="/"): + if not value: + return default + + parsed = urlparse(value) + if parsed.scheme or parsed.netloc: + return default + + if not value.startswith("/"): + return default + + if value.startswith("//"): + return default + + return value + + +def _read_claim(data, claim): + if not claim or not isinstance(data, dict): + return None + + value = data + for key in claim.split("."): + if not isinstance(value, dict): + return None + value = value.get(key) + if value is None: + return None + + return value + + +def _read_string_claim(data, claim): + value = _read_claim(data, claim) + if value is None: + return None + + value = str(value).strip() + return value or None + + +def _read_configured_string_claim(claims, config_key, default_claim): + return _read_string_claim(claims, app.config.get(config_key, default_claim)) + + +def _iter_configured_string_claims(claims, config_key, default_claims): + for claim in _as_list(app.config.get(config_key, default_claims)): + value = _read_string_claim(claims, claim) + if value: + yield claim, value + + +def _get_oidc_client(): + if not app.config.get("OTS_ENABLE_OIDC") or oidc is None: + return None + + oauth = getattr(oidc, "oauth", None) + if oauth is None: + return None + + return getattr(oauth, "oidc", None) + + +def _get_callback_url(): + callback = app.config.get("OTS_OIDC_REDIRECT_URI", "/api/oidc/callback") + if callback.startswith("http://") or callback.startswith("https://"): + return callback + + if callback.startswith("/"): + return f"{request.host_url.rstrip('/')}{callback}" + + return url_for(".oidc_callback", _external=True) + + +def _extract_issuer(claims): + return _read_configured_string_claim(claims, "OTS_OIDC_ISSUER_CLAIM", "iss") or _resolve_configured_issuer(app) + + +def _extract_subject(claims): + return _read_configured_string_claim(claims, "OTS_OIDC_SUBJECT_CLAIM", "sub") + + +def _extract_oidc_identity(claims): + return _extract_issuer(claims), _extract_subject(claims) + + +def _iter_username_claim_values(claims): + yield from _iter_configured_string_claims( + claims, + "OTS_OIDC_USERNAME_CLAIMS", + "preferred_username, sub", + ) + + +def _normalize_username_candidate(value): + if value is None: + return None + + raw_username = str(value).strip() + if not raw_username: + return None + + username_validator = UsernameValidator(app) + error, normalized_username = username_validator.validate(raw_username) + if not error and normalized_username: + return normalized_username + + safe_username = [] + last_was_separator = False + for character in raw_username: + if unicodedata.category(character)[0] in ["L", "N"] or character in ["_", "."]: + safe_character = character + else: + safe_character = "." + + if safe_character == ".": + if last_was_separator: + continue + last_was_separator = True + else: + last_was_separator = False + + safe_username.append(safe_character) + + fallback_username = "".join(safe_username).strip("._") + if not fallback_username: + return None + + fallback_error, fallback_normalized_username = username_validator.validate(fallback_username) + if fallback_error or not fallback_normalized_username: + return None + + logger.warning( + "Normalized OIDC username %s to local username %s", + raw_username, + fallback_normalized_username, + ) + return fallback_normalized_username + + +def _extract_username(claims): + for claim, raw_username in _iter_username_claim_values(claims): + username = _normalize_username_candidate(raw_username) + if username: + return username + + logger.warning( + "OIDC username claim %s could not be normalized into a valid local username", + claim, + ) + + return None + + +def _extract_email(claims): + return _read_configured_string_claim(claims, "OTS_OIDC_EMAIL_CLAIM", "email") + + +def _extract_roles(claims): + role_claim = _read_claim(claims, app.config.get("OTS_OIDC_ROLE_CLAIM", "groups")) + if role_claim is None: + return [] + + if isinstance(role_claim, (list, tuple, set)): + return [str(role).strip() for role in role_claim if str(role).strip()] + + if isinstance(role_claim, dict): + return [str(key).strip() for key in role_claim.keys() if str(key).strip()] + + return [item.strip() for item in str(role_claim).split(",") if item.strip()] + + +def _normalize_roles(roles): + normalized = [] + seen = set() + for role in roles: + normalized_role = str(role).strip() + normalized_role_key = normalized_role.lower() + + if not normalized_role or normalized_role_key in seen: + continue + + normalized.append(normalized_role) + seen.add(normalized_role_key) + + return normalized + + +def _resolve_oidc_roles(claims): + roles = _extract_roles(claims) + if not roles: + roles = _as_list(app.config.get("OTS_OIDC_DEFAULT_ROLES", "user")) + + admin_roles = {role.lower() for role in _as_list(app.config.get("OTS_OIDC_ADMIN_ROLES"))} + if admin_roles.intersection({role.lower() for role in roles}): + roles.append("administrator") + + roles = _normalize_roles(roles) + if roles: + return roles + + return _normalize_roles(_as_list(app.config.get("OTS_OIDC_DEFAULT_ROLES", "user"))) + + +def _apply_roles_to_user(user, roles): + for role in list(user.roles): + app.security.datastore.remove_role_from_user(user, role) + + for role_name in roles: + role = app.security.datastore.find_or_create_role(role_name) + app.security.datastore.add_role_to_user(user, role) + + +def _apply_no_store_headers(response): + response.headers["Cache-Control"] = "no-store, no-cache, max-age=0, must-revalidate, private" + response.headers["Pragma"] = "no-cache" + response.headers["Expires"] = "0" + response.headers["Referrer-Policy"] = "no-referrer" + response.headers["X-Content-Type-Options"] = "nosniff" + return response + + +def _oidc_error_response(message, status_code, log_message=None, level="warning"): + if log_message: + getattr(logger, level)(log_message) + + return _apply_no_store_headers(jsonify({"success": False, "error": message})), status_code + + +def _build_oidc_callback_payload(user): + payload = user.serialize() + payload["success"] = True + payload["identity_attributes"] = { + "oidc": { + "provider": app.config.get("OTS_OIDC_NAME", "oidc"), + } + } + + if _to_bool(app.config.get("OTS_OIDC_INCLUDE_AUTH_TOKEN_IN_CALLBACK_JSON", False)): + payload["token"] = user.get_auth_token() + + return payload + + +def _find_linked_oidc_user(issuer, subject): + return app.security.datastore.find_user(oidc_issuer=issuer, oidc_subject=subject) + + +def _create_oidc_user(username, email, issuer, subject): + create_kwargs = { + "username": username, + "password": None, + "oidc_issuer": issuer, + "oidc_subject": subject, + } + if email: + create_kwargs["email"] = email + + return app.security.datastore.create_user(**create_kwargs) + + +def _update_oidc_user_profile(user, username, email, subject): + if user.username != username: + logger.warning( + "OIDC username claim changed for linked subject %s; keeping local username %s", + subject, + user.username, + ) + + if email and user.email != email: + user.email = email + + +def _resolve_or_create_oidc_user(issuer, subject, username, email): + user = _find_linked_oidc_user(issuer, subject) + if not user: + existing_user = app.security.datastore.find_user(username=username) + if existing_user: + logger.error("OIDC username collision for %s", username) + return None + + return _create_oidc_user(username, email, issuer, subject) + + _update_oidc_user_profile(user, username, email, subject) + return user + + +def _sync_oidc_user(claims): + issuer, subject = _extract_oidc_identity(claims) + if not issuer or not subject: + logger.error("OIDC callback did not include a stable issuer/subject identity") + return None + + username = _extract_username(claims) + if not username: + logger.error("OIDC callback did not include a usable username claim") + return None + + email = _extract_email(claims) + + user = _resolve_or_create_oidc_user(issuer, subject, username, email) + if not user: + return None + + _apply_roles_to_user(user, _resolve_oidc_roles(claims)) + app.security.datastore.commit() + return user + + +def _clear_oidc_flow_state(): + session.pop("ots_oidc_next", None) + session.pop("ots_oidc_return_json", None) + # OTS uses the Flask-Security session for local login state and should not + # persist provider-issued OIDC tokens in the browser session cookie. + session.pop("oidc_auth_token", None) + session.pop("oidc_auth_profile", None) + + +def _pop_oidc_flow_state(): + session.pop("oidc_auth_token", None) + session.pop("oidc_auth_profile", None) + return ( + _to_bool(session.pop("ots_oidc_return_json", False)), + _sanitize_next_url(session.pop("ots_oidc_next", "/")), + ) + + +def _fetch_oidc_userinfo(client, token): + try: + return client.userinfo(token=token) + except TypeError: + return client.userinfo() + + +def _merge_claim_sets(primary_claims, secondary_claims): + merged_claims = dict(primary_claims or {}) + if isinstance(secondary_claims, dict): + for key, value in secondary_claims.items(): + if value is not None: + merged_claims[key] = value + + return merged_claims + + +def _fetch_oidc_claims(client): + try: + token = client.authorize_access_token() + except Exception as e: + return None, _oidc_error_response( + "Invalid OIDC callback", + 400, + log_message=f"OIDC token exchange failed: {e}", + level="warning", + ) + + embedded_claims = token.get("userinfo") if isinstance(token, dict) else None + if embedded_claims is not None and not isinstance(embedded_claims, dict): + return None, _oidc_error_response( + "Invalid OIDC user info", + 400, + log_message="OIDC user info payload was not a JSON object", + level="warning", + ) + + claims = dict(embedded_claims or {}) + + if isinstance(token, dict) and token.get("access_token"): + try: + userinfo_claims = _fetch_oidc_userinfo(client, token) + except Exception as e: + if claims: + logger.debug( + "Falling back to embedded OIDC claims after userinfo fetch failed: %s", + e, + ) + return claims, None + + return None, _oidc_error_response( + "Failed to fetch OIDC user info", + 400, + log_message=f"Failed to read OIDC user info: {e}", + level="warning", + ) + + if not isinstance(userinfo_claims, dict): + if claims: + logger.debug( + "Ignoring non-object OIDC userinfo payload and using embedded claims instead" + ) + return claims, None + + return None, _oidc_error_response( + "Invalid OIDC user info", + 400, + log_message="OIDC user info payload was not a JSON object", + level="warning", + ) + + claims = _merge_claim_sets(claims, userinfo_claims) + + if claims: + return claims, None + + return None, _oidc_error_response( + "Invalid OIDC user info", + 400, + log_message="OIDC callback did not provide user claims", + level="warning", + ) + + +def _complete_oidc_login(user): + remember = bool(app.config.get("SECURITY_DEFAULT_REMEMBER_ME", False)) + if login_user(user, remember=remember, authn_via=["oidc"]): + return None + + return _oidc_error_response( + "Failed to establish OIDC session", + 403, + log_message=f"OIDC login_user failed for {user.username}", + level="error", + ) + + +def _build_oidc_success_response(user, return_json, next_url): + payload = _build_oidc_callback_payload(user) + if return_json: + return _apply_no_store_headers(jsonify(payload)) + + return _apply_no_store_headers(redirect(next_url)) + + +@oidc_blueprint.route("/api/oidc/login", methods=["GET"]) +def oidc_login(): + if not app.config.get("OTS_ENABLE_OIDC"): + return jsonify({"success": False, "error": "OIDC is not enabled"}), 503 + + client = _get_oidc_client() + if not client: + return jsonify({"success": False, "error": "OIDC client is not configured"}), 503 + + _clear_oidc_flow_state() + + next_url = request.args.get("next") + if next_url: + session["ots_oidc_next"] = _sanitize_next_url(next_url) + + session["ots_oidc_return_json"] = _to_bool(request.args.get("return_json", "False")) + + try: + return client.authorize_redirect(_get_callback_url()) + except Exception as e: + logger.error(f"OIDC authorize redirect failed: {e}") + _clear_oidc_flow_state() + return jsonify({"success": False, "error": "OIDC login failed"}), 500 + + +@oidc_blueprint.route("/api/oidc/callback", methods=["GET"]) +def oidc_callback(): + return_json, next_url = _pop_oidc_flow_state() + + if not app.config.get("OTS_ENABLE_OIDC"): + return _oidc_error_response("OIDC is not enabled", 503) + + client = _get_oidc_client() + if not client: + return _oidc_error_response("OIDC client is not configured", 503) + + provider_error = request.args.get("error") + if provider_error: + provider_error_description = request.args.get("error_description", "") + log_message = f"OIDC provider returned error={provider_error!r}" + if provider_error_description: + log_message += f", description={provider_error_description!r}" + return _oidc_error_response( + "OIDC provider rejected authentication", + 400, + log_message=log_message, + level="warning", + ) + + if not request.args.get("code"): + return _oidc_error_response( + "Invalid OIDC callback", + 400, + log_message="OIDC callback missing authorization code", + level="warning", + ) + + claims, error_response = _fetch_oidc_claims(client) + if error_response: + return error_response + + user = _sync_oidc_user(claims) + if not user: + return _oidc_error_response( + "Failed to sync OIDC user", + 400, + log_message="OIDC user synchronization failed", + level="warning", + ) + + error_response = _complete_oidc_login(user) + if error_response: + return error_response + + return _build_oidc_success_response(user, return_json, next_url) diff --git a/opentakserver/defaultconfig.py b/opentakserver/defaultconfig.py index 32ce462a..7c0dc254 100644 --- a/opentakserver/defaultconfig.py +++ b/opentakserver/defaultconfig.py @@ -107,6 +107,43 @@ class DefaultConfig: LDAP_BIND_USER_DN = "cn=admin,ou=users=dc=example,dc=com" LDAP_BIND_USER_PASSWORD = "password" + # OIDC / SSO settings + OTS_ENABLE_OIDC = os.getenv("OTS_ENABLE_OIDC", "False").lower() in ["true", "1", "yes"] + OTS_OIDC_NAME = os.getenv("OTS_OIDC_NAME", "oidc") + OTS_OIDC_CLIENT_ID = os.getenv("OTS_OIDC_CLIENT_ID", "") + OTS_OIDC_CLIENT_SECRET = os.getenv("OTS_OIDC_CLIENT_SECRET", "") + OTS_OIDC_TOKEN_ENDPOINT_AUTH_METHOD = os.getenv("OTS_OIDC_TOKEN_ENDPOINT_AUTH_METHOD", "") + + # Authlib uses this for discovery and to discover OpenID endpoints + OTS_OIDC_METADATA_URL = os.getenv("OTS_OIDC_METADATA_URL", "") + + # Manual OpenID endpoints (used when metadata is not configured) + OTS_OIDC_AUTHORIZATION_ENDPOINT = os.getenv("OTS_OIDC_AUTHORIZATION_ENDPOINT", "") + OTS_OIDC_TOKEN_ENDPOINT = os.getenv("OTS_OIDC_TOKEN_ENDPOINT", "") + OTS_OIDC_USERINFO_ENDPOINT = os.getenv("OTS_OIDC_USERINFO_ENDPOINT", "") + + OTS_OIDC_SCOPE = os.getenv("OTS_OIDC_SCOPE", "openid profile email") + OTS_OIDC_REDIRECT_URI = os.getenv("OTS_OIDC_REDIRECT_URI", "/api/oidc/callback") + OTS_OIDC_ISSUER = os.getenv("OTS_OIDC_ISSUER", "") + OTS_OIDC_USE_PKCE = os.getenv("OTS_OIDC_USE_PKCE", "False").lower() in ["true", "1", "yes"] + OTS_OIDC_PKCE_METHOD = os.getenv("OTS_OIDC_PKCE_METHOD", "S256") + OTS_OIDC_INCLUDE_AUTH_TOKEN_IN_CALLBACK_JSON = os.getenv( + "OTS_OIDC_INCLUDE_AUTH_TOKEN_IN_CALLBACK_JSON", "False" + ).lower() in ["true", "1", "yes"] + + # Claims used to create and update OTS users + OTS_OIDC_USERNAME_CLAIMS = os.getenv( + "OTS_OIDC_USERNAME_CLAIMS", "preferred_username, upn, email, sub" + ) + OTS_OIDC_EMAIL_CLAIM = os.getenv("OTS_OIDC_EMAIL_CLAIM", "email") + OTS_OIDC_ROLE_CLAIM = os.getenv("OTS_OIDC_ROLE_CLAIM", "groups") + OTS_OIDC_SUBJECT_CLAIM = os.getenv("OTS_OIDC_SUBJECT_CLAIM", "sub") + OTS_OIDC_ISSUER_CLAIM = os.getenv("OTS_OIDC_ISSUER_CLAIM", "iss") + + # Role mapping behavior + OTS_OIDC_ADMIN_ROLES = os.getenv("OTS_OIDC_ADMIN_ROLES", "administrator") + OTS_OIDC_DEFAULT_ROLES = os.getenv("OTS_OIDC_DEFAULT_ROLES", "user") + # See https://docs.python.org/3/library/logging.handlers.html#logging.handlers.TimedRotatingFileHandler OTS_LOG_ROTATE_WHEN = os.getenv("OTS_LOG_ROTATE_WHEN", "midnight") OTS_LOG_ROTATE_INTERVAL = int(os.getenv("OTS_LOG_ROTATE_INTERVAL", 0)) diff --git a/opentakserver/extensions.py b/opentakserver/extensions.py index 60d0a211..1d4ab55a 100644 --- a/opentakserver/extensions.py +++ b/opentakserver/extensions.py @@ -23,4 +23,13 @@ ldap_manager = LDAP3LoginManager() +# OIDC helper used by the OIDC blueprint. LDAP remains supported as-is and this client +# is only initialized when OTS_ENABLE_OIDC is enabled. +try: + from opentakserver.oidc import OpenTAKOIDCExtension + + oidc = OpenTAKOIDCExtension() +except ModuleNotFoundError: + oidc = None + babel = Babel() diff --git a/opentakserver/migrations/versions/c2f8e2e0c1b1_add_oidc_identity_to_user.py b/opentakserver/migrations/versions/c2f8e2e0c1b1_add_oidc_identity_to_user.py new file mode 100644 index 00000000..dcde67e8 --- /dev/null +++ b/opentakserver/migrations/versions/c2f8e2e0c1b1_add_oidc_identity_to_user.py @@ -0,0 +1,30 @@ +"""add oidc identity to user + +Revision ID: c2f8e2e0c1b1 +Revises: 6a7929c07690 +Create Date: 2026-04-10 20:45:00.000000 + +""" + +from alembic import op +import sqlalchemy as sa + +# revision identifiers, used by Alembic. +revision = "c2f8e2e0c1b1" +down_revision = "6a7929c07690" +branch_labels = None +depends_on = None + + +def upgrade(): + with op.batch_alter_table("user", schema=None) as batch_op: + batch_op.add_column(sa.Column("oidc_issuer", sa.String(length=255), nullable=True)) + batch_op.add_column(sa.Column("oidc_subject", sa.String(length=255), nullable=True)) + batch_op.create_unique_constraint("uq_user_oidc_identity", ["oidc_issuer", "oidc_subject"]) + + +def downgrade(): + with op.batch_alter_table("user", schema=None) as batch_op: + batch_op.drop_constraint("uq_user_oidc_identity", type_="unique") + batch_op.drop_column("oidc_subject") + batch_op.drop_column("oidc_issuer") diff --git a/opentakserver/models/user.py b/opentakserver/models/user.py index e55f4fa8..44443975 100644 --- a/opentakserver/models/user.py +++ b/opentakserver/models/user.py @@ -1,7 +1,7 @@ from dataclasses import dataclass from flask_security.models import fsqla_v3 as fsqla -from sqlalchemy import String +from sqlalchemy import String, UniqueConstraint from sqlalchemy.orm import relationship from opentakserver.extensions import db @@ -14,7 +14,11 @@ @dataclass class User(db.Model, fsqla.FsUserMixin): + __table_args__ = (UniqueConstraint("oidc_issuer", "oidc_subject", name="uq_user_oidc_identity"),) + email = db.Column(String(255), nullable=True) + oidc_issuer = db.Column(String(255), nullable=True) + oidc_subject = db.Column(String(255), nullable=True) video_streams = relationship("VideoStream", back_populates="user") euds = relationship("EUD", back_populates="user") data_packages = relationship("DataPackage", back_populates="user") diff --git a/opentakserver/oidc.py b/opentakserver/oidc.py new file mode 100644 index 00000000..cf0e5714 --- /dev/null +++ b/opentakserver/oidc.py @@ -0,0 +1,184 @@ +import logging + +from authlib.integrations.flask_client import OAuth +from flask_oidc import OpenIDConnect +from werkzeug.utils import import_string + + +logger = logging.getLogger("OpenTAKServer") + + +def _normalize_optional_string(value): + if value is None: + return None + + value = str(value).strip() + return value or None + + +def _derive_issuer_from_metadata_url(metadata_url): + metadata_url = _normalize_optional_string(metadata_url) + if not metadata_url: + return None + + suffix = "/.well-known/openid-configuration" + if metadata_url.endswith(suffix): + issuer = metadata_url[: -len(suffix)].rstrip("/") + return issuer or None + + return None + + +def _resolve_configured_issuer(app): + issuer = _normalize_optional_string(app.config.get("OTS_OIDC_ISSUER")) + if issuer: + return issuer + + issuer = _derive_issuer_from_metadata_url(app.config.get("OTS_OIDC_METADATA_URL")) + if issuer: + return issuer + + oidc_client_secrets = app.config.get("OIDC_CLIENT_SECRETS") or {} + if isinstance(oidc_client_secrets, dict): + web_secrets = oidc_client_secrets.get("web") or {} + if isinstance(web_secrets, dict): + issuer = _normalize_optional_string(web_secrets.get("issuer")) + if issuer: + return issuer + + return None + + +def _configured_token_endpoint_auth_method(app): + return _normalize_optional_string(app.config.get("OTS_OIDC_TOKEN_ENDPOINT_AUTH_METHOD")) + + +def _build_client_registration(app): + client_id = _normalize_optional_string(app.config.get("OTS_OIDC_CLIENT_ID")) + client_secret = _normalize_optional_string(app.config.get("OTS_OIDC_CLIENT_SECRET")) + metadata_url = _normalize_optional_string(app.config.get("OTS_OIDC_METADATA_URL")) + + client_kwargs = {"scope": app.config.get("OTS_OIDC_SCOPE")} + + token_endpoint_auth_method = _configured_token_endpoint_auth_method(app) + if token_endpoint_auth_method: + logger.info( + "Using configured OIDC token endpoint auth method: %s", + token_endpoint_auth_method, + ) + client_kwargs["token_endpoint_auth_method"] = token_endpoint_auth_method + elif not client_secret: + client_kwargs["token_endpoint_auth_method"] = "none" + + use_pkce = bool(app.config.get("OTS_OIDC_USE_PKCE")) or not client_secret + if use_pkce: + pkce_method = _normalize_optional_string(app.config.get("OTS_OIDC_PKCE_METHOD")) or "S256" + if pkce_method != "S256": + raise RuntimeError("OTS_OIDC_PKCE_METHOD must be S256") + + client_kwargs["code_challenge_method"] = pkce_method + if client_secret: + logger.info("Enabling OIDC PKCE with %s", pkce_method) + else: + logger.info("Configuring OIDC as a public client with PKCE %s", pkce_method) + + registration = { + "client_id": client_id, + "client_secret": client_secret, + "client_kwargs": client_kwargs, + } + + if metadata_url: + logger.info("Using OIDC metadata URL: %s", metadata_url) + registration["server_metadata_url"] = metadata_url + return registration + + authorization_endpoint = _normalize_optional_string(app.config.get("OTS_OIDC_AUTHORIZATION_ENDPOINT")) + token_endpoint = _normalize_optional_string(app.config.get("OTS_OIDC_TOKEN_ENDPOINT")) + userinfo_endpoint = _normalize_optional_string(app.config.get("OTS_OIDC_USERINFO_ENDPOINT")) + + if not all([authorization_endpoint, token_endpoint, userinfo_endpoint]): + raise RuntimeError( + "OTS_ENABLE_OIDC is enabled but metadata or endpoint settings are missing. " + "Configure OTS_OIDC_METADATA_URL or the per-endpoint URLs." + ) + + registration.update( + { + "authorize_url": authorization_endpoint, + "access_token_url": token_endpoint, + "userinfo_endpoint": userinfo_endpoint, + } + ) + return registration + + +def _build_client_secrets(app): + issuer = _resolve_configured_issuer(app) + if not issuer: + raise RuntimeError( + "OTS_ENABLE_OIDC is enabled but OTS_OIDC_ISSUER is not configured and could not be " + "derived from OTS_OIDC_METADATA_URL." + ) + + return { + "web": { + "client_id": _normalize_optional_string(app.config.get("OTS_OIDC_CLIENT_ID")) or "", + "client_secret": _normalize_optional_string(app.config.get("OTS_OIDC_CLIENT_SECRET")) + or "", + "issuer": issuer, + } + } + + +def _build_extension_config(app): + if not _normalize_optional_string(app.config.get("OTS_OIDC_NAME")): + raise RuntimeError("OTS_ENABLE_OIDC is enabled but OTS_OIDC_NAME is not configured") + + if not app.config.get("OTS_OIDC_CLIENT_ID") and not app.config.get("OTS_OIDC_METADATA_URL"): + logger.warning( + "OTS_OIDC_CLIENT_ID is empty. Public clients without client_id may fail for some providers." + ) + + registration = _build_client_registration(app) + secrets = _build_client_secrets(app) + + token_endpoint_auth_method = _configured_token_endpoint_auth_method(app) + if not token_endpoint_auth_method: + token_endpoint_auth_method = ( + "none" if not registration.get("client_secret") else "client_secret_basic" + ) + + return { + "OIDC_ENABLED": True, + "OIDC_CLIENT_SECRETS": secrets, + "OIDC_CLIENT_ID": secrets["web"]["client_id"], + "OIDC_CLIENT_SECRET": secrets["web"]["client_secret"], + "OIDC_SCOPES": app.config.get("OTS_OIDC_SCOPE"), + "OIDC_USER_INFO_ENABLED": True, + "OIDC_RESOURCE_SERVER_ONLY": True, + "OIDC_INTROSPECTION_AUTH_METHOD": token_endpoint_auth_method, + "OIDC_CLOCK_SKEW": 60, + "OTS_OIDC_CLIENT_REGISTRATION": registration, + } + + +class OpenTAKOIDCExtension(OpenIDConnect): + def init_app(self, app, prefix=None): + app.config.update(_build_extension_config(app)) + self.client_secrets = app.config["OIDC_CLIENT_SECRETS"]["web"] + + if "openid" not in app.config["OIDC_SCOPES"]: + raise ValueError('The value "openid" must be in the OIDC_SCOPES') + + registration = dict(app.config["OTS_OIDC_CLIENT_REGISTRATION"]) + registration["client_kwargs"] = dict(registration.get("client_kwargs") or {}) + + self.oauth = OAuth(app) + self.oauth.register(name="oidc", update_token=self._update_token, **registration) + + app.config.setdefault("OIDC_USER_CLASS", "flask_oidc.model.User") + if app.config["OIDC_USER_CLASS"]: + app.extensions["_oidc_user_class"] = import_string(app.config["OIDC_USER_CLASS"]) + + app.before_request(self._before_request) diff --git a/poetry.lock b/poetry.lock index d8a56e71..6ca3be0c 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.2.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.3.2 and should not be changed by hand. [[package]] name = "adsbxcot" @@ -474,6 +474,21 @@ files = [ {file = "attrs-25.4.0.tar.gz", hash = "sha256:16d5969b87f0859ef33a48b35d55ac1be6e42ae49d5e853b597db70c35c57e11"}, ] +[[package]] +name = "authlib" +version = "1.6.9" +description = "The ultimate Python library in building OAuth and OpenID Connect servers and clients." +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "authlib-1.6.9-py2.py3-none-any.whl", hash = "sha256:f08b4c14e08f0861dc18a32357b33fbcfd2ea86cfe3fe149484b4d764c4a0ac3"}, + {file = "authlib-1.6.9.tar.gz", hash = "sha256:d8f2421e7e5980cc1ddb4e32d3f5fa659cfaf60d8eaf3281ebed192e4ab74f04"}, +] + +[package.dependencies] +cryptography = "*" + [[package]] name = "babel" version = "2.17.0" @@ -2055,6 +2070,24 @@ Flask-SQLAlchemy = ">=1.0" dev = ["flake8", "pytest", "tox"] docs = ["sphinx"] +[[package]] +name = "flask-oidc" +version = "2.4.0" +description = "OpenID Connect extension for Flask" +optional = false +python-versions = "<4.0,>=3.8" +groups = ["main"] +files = [ + {file = "flask_oidc-2.4.0-py3-none-any.whl", hash = "sha256:5aa398dd3e593c5ed9d94f23a948c635606d619b974b5df25de8aeb725b6ea8d"}, + {file = "flask_oidc-2.4.0.tar.gz", hash = "sha256:e72378fcea8f90ccab874836a61f1783092a900c965e52587c1307e38b2e2153"}, +] + +[package.dependencies] +authlib = ">=1.2.0,<2.0.0" +blinker = ">=1.4.0,<2.0.0" +flask = ">=0.12.2,<4.0.0" +requests = ">=2.20.0,<3.0.0" + [[package]] name = "flask-principal" version = "0.4.0" @@ -3101,7 +3134,7 @@ librabbitmq = ["librabbitmq (>=2.0.0) ; python_version < \"3.11\""] mongodb = ["pymongo (==4.15.3)"] msgpack = ["msgpack (==1.1.2)"] pyro = ["pyro4 (==4.82)"] -qpid = ["qpid-python (==1.36.0-1)", "qpid-tools (==1.36.0-1)"] +qpid = ["qpid-python (==1.36.0.post1)", "qpid-tools (==1.36.0.post1)"] redis = ["redis (>=4.5.2,!=4.5.5,!=5.0.2,<6.5)"] slmq = ["softlayer_messaging (>=1.0.3)"] sqlalchemy = ["sqlalchemy (>=1.4.48,<2.1)"] @@ -3190,7 +3223,7 @@ colorama = {version = ">=0.3.4", markers = "sys_platform == \"win32\""} win32-setctime = {version = ">=1.0.0", markers = "sys_platform == \"win32\""} [package.extras] -dev = ["Sphinx (==8.1.3) ; python_version >= \"3.11\"", "build (==1.2.2) ; python_version >= \"3.11\"", "colorama (==0.4.5) ; python_version < \"3.8\"", "colorama (==0.4.6) ; python_version >= \"3.8\"", "exceptiongroup (==1.1.3) ; python_version >= \"3.7\" and python_version < \"3.11\"", "freezegun (==1.1.0) ; python_version < \"3.8\"", "freezegun (==1.5.0) ; python_version >= \"3.8\"", "mypy (==v0.910) ; python_version < \"3.6\"", "mypy (==v0.971) ; python_version == \"3.6\"", "mypy (==v1.13.0) ; python_version >= \"3.8\"", "mypy (==v1.4.1) ; python_version == \"3.7\"", "myst-parser (==4.0.0) ; python_version >= \"3.11\"", "pre-commit (==4.0.1) ; python_version >= \"3.9\"", "pytest (==6.1.2) ; python_version < \"3.8\"", "pytest (==8.3.2) ; python_version >= \"3.8\"", "pytest-cov (==2.12.1) ; python_version < \"3.8\"", "pytest-cov (==5.0.0) ; python_version == \"3.8\"", "pytest-cov (==6.0.0) ; python_version >= \"3.9\"", "pytest-mypy-plugins (==1.9.3) ; python_version >= \"3.6\" and python_version < \"3.8\"", "pytest-mypy-plugins (==3.1.0) ; python_version >= \"3.8\"", "sphinx-rtd-theme (==3.0.2) ; python_version >= \"3.11\"", "tox (==3.27.1) ; python_version < \"3.8\"", "tox (==4.23.2) ; python_version >= \"3.8\"", "twine (==6.0.1) ; python_version >= \"3.11\""] +dev = ["Sphinx (==8.1.3) ; python_version >= \"3.11\"", "build (==1.2.2) ; python_version >= \"3.11\"", "colorama (==0.4.5) ; python_version < \"3.8\"", "colorama (==0.4.6) ; python_version >= \"3.8\"", "exceptiongroup (==1.1.3) ; python_version >= \"3.7\" and python_version < \"3.11\"", "freezegun (==1.1.0) ; python_version < \"3.8\"", "freezegun (==1.5.0) ; python_version >= \"3.8\"", "mypy (==0.910) ; python_version < \"3.6\"", "mypy (==0.971) ; python_version == \"3.6\"", "mypy (==1.13.0) ; python_version >= \"3.8\"", "mypy (==1.4.1) ; python_version == \"3.7\"", "myst-parser (==4.0.0) ; python_version >= \"3.11\"", "pre-commit (==4.0.1) ; python_version >= \"3.9\"", "pytest (==6.1.2) ; python_version < \"3.8\"", "pytest (==8.3.2) ; python_version >= \"3.8\"", "pytest-cov (==2.12.1) ; python_version < \"3.8\"", "pytest-cov (==5.0.0) ; python_version == \"3.8\"", "pytest-cov (==6.0.0) ; python_version >= \"3.9\"", "pytest-mypy-plugins (==1.9.3) ; python_version >= \"3.6\" and python_version < \"3.8\"", "pytest-mypy-plugins (==3.1.0) ; python_version >= \"3.8\"", "sphinx-rtd-theme (==3.0.2) ; python_version >= \"3.11\"", "tox (==3.27.1) ; python_version < \"3.8\"", "tox (==4.23.2) ; python_version >= \"3.8\"", "twine (==6.0.1) ; python_version >= \"3.11\""] [[package]] name = "lxml" @@ -7079,9 +7112,9 @@ files = [ ] [package.extras] -cffi = ["cffi (>=1.17,<2.0) ; platform_python_implementation != \"PyPy\" and python_version < \"3.14\"", "cffi (>=2.0.0b) ; platform_python_implementation != \"PyPy\" and python_version >= \"3.14\""] +cffi = ["cffi (>=1.17,<2.0) ; platform_python_implementation != \"PyPy\" and python_version < \"3.14\"", "cffi (>=2.0.0b0) ; platform_python_implementation != \"PyPy\" and python_version >= \"3.14\""] [metadata] lock-version = "2.1" python-versions = ">=3.10, <3.15" -content-hash = "2c46326fcf86504b2377219f4f6276349da3fd08253dad1dc68b708d7c7061b0" +content-hash = "f0058e6caed72b4034099b10c6788540fb085c685ae694ffa4c71269ea11ca28" diff --git a/pyproject.toml b/pyproject.toml index 0c6db25c..9afde979 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -61,6 +61,7 @@ yt-dlp = "*" # Keep zope-event at 5.1.1, DO NOT UPGRADE zope-event = "5.1.1" zope-interface = "8.1.1" +flask-oidc = "2.4.0" [tool.poetry.group.dev.dependencies] isort = "^7.0.0" diff --git a/tests/test_oidc_api.py b/tests/test_oidc_api.py new file mode 100644 index 00000000..c09be23e --- /dev/null +++ b/tests/test_oidc_api.py @@ -0,0 +1,1070 @@ +from importlib import import_module +from importlib import util +from pathlib import Path + +import pytest + +from opentakserver.extensions import db + + +def _load_oidc_module(): + module_path = Path(__file__).resolve().parents[1] / "opentakserver/blueprints/ots_api/oidc_api.py" + spec = util.spec_from_file_location("test_oidc_blueprint", module_path) + assert spec is not None + module = util.module_from_spec(spec) + loader = spec.loader + assert loader is not None + loader.exec_module(module) + return module + + +oidc_api = _load_oidc_module() + +_extract_roles = oidc_api._extract_roles +_extract_username = oidc_api._extract_username +_normalize_roles = oidc_api._normalize_roles +_sync_oidc_user = oidc_api._sync_oidc_user +_sanitize_next_url = oidc_api._sanitize_next_url + + +@pytest.fixture +def real_oidc_api(app): + return import_module("opentakserver.blueprints.ots_api.oidc_api") + + +@pytest.fixture +def app_with_db(app): + app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite://" + db.init_app(app) + + with app.app_context(): + db.create_all() + yield app + db.session.remove() + db.drop_all() + + +class _MockOIDCClient: + def __init__(self, token=None, fallback_userinfo=None, redirect_to="/idp/authorize"): + self.token = token or {} + self.fallback_userinfo = fallback_userinfo or {} + self.redirect_to = redirect_to + self.redirect_uri = None + self.authorize_access_token_called = False + self.userinfo_called = False + + def authorize_redirect(self, redirect_uri): + self.redirect_uri = redirect_uri + return ("", 302, {"Location": self.redirect_to}) + + def authorize_access_token(self): + self.authorize_access_token_called = True + return self.token + + def userinfo(self, *args, **kwargs): + self.userinfo_called = True + return self.fallback_userinfo + + +@pytest.fixture +def client_with_db(app_with_db): + with app_with_db.test_client() as client: + yield client + + + +def test_oidc_login_is_disabled(client): + response = client.get("/api/oidc/login") + + assert response.status_code == 503 + assert response.json["success"] is False + + +def test_oidc_claim_helpers_extract_values(app): + with app.app_context(): + app.config["OTS_OIDC_USERNAME_CLAIMS"] = "nested.name, email" + app.config["OTS_OIDC_ROLE_CLAIM"] = "claims.roles" + + claims = { + "nested": {"name": "sso-user"}, + "email": "sso-user@example.com", + "claims": {"roles": "Admin,admin,operator"}, + } + + assert _extract_username(claims) == "sso.user" + assert _extract_roles(claims) == ["Admin", "admin", "operator"] + assert _normalize_roles([" Admin ", "admin", "", "Admin"]) == ["Admin"] + + +def test_oidc_extract_username_uses_normalized_email_claim(app): + with app.app_context(): + app.config["OTS_OIDC_USERNAME_CLAIMS"] = "email" + + claims = { + "email": "oidc.user+ops@example.com", + } + + assert _extract_username(claims) == "oidc.user.ops.example.com" + + +def test_oidc_extract_username_rejects_unusable_claims(app): + with app.app_context(): + app.config["OTS_OIDC_USERNAME_CLAIMS"] = "preferred_username" + + claims = { + "preferred_username": "!!!", + } + + assert _extract_username(claims) is None + + +def test_oidc_sync_user_rejects_missing_username_claim(app_with_db): + app = app_with_db + + with app.app_context(): + app.config["OTS_OIDC_USERNAME_CLAIMS"] = "preferred_username2, upn2" + synced = _sync_oidc_user({"iss": "https://issuer.example", "sub": "user-123"}) + + assert synced is None + + +def test_oidc_sync_user_rejects_missing_issuer_identity(app_with_db): + app = app_with_db + + with app.app_context(): + synced = _sync_oidc_user({"sub": "user-123", "preferred_username": "issuerless_user"}) + + assert synced is None + + +def test_oidc_sync_user_binds_user_to_stable_oidc_identity(app_with_db): + app = app_with_db + + with app.app_context(): + synced = _sync_oidc_user( + { + "iss": "https://issuer.example", + "sub": "user-123", + "preferred_username": "stable_user", + "email": "stable-user@example.com", + } + ) + + assert synced is not None + assert synced.username == "stable_user" + assert synced.email == "stable-user@example.com" + assert synced.oidc_issuer == "https://issuer.example" + assert synced.oidc_subject == "user-123" + + db.session.delete(synced) + db.session.commit() + + +def test_oidc_sync_user_normalizes_username_from_email_claim(app_with_db): + app = app_with_db + + with app.app_context(): + app.config["OTS_OIDC_USERNAME_CLAIMS"] = "email" + + synced = _sync_oidc_user( + { + "iss": "https://issuer.example", + "sub": "user-456", + "email": "oidc.user+ops@example.com", + } + ) + + assert synced is not None + assert synced.username == "oidc.user.ops.example.com" + assert synced.email == "oidc.user+ops@example.com" + + db.session.delete(synced) + db.session.commit() + + +def test_oidc_sync_user_rejects_username_collision_for_unlinked_local_user(app_with_db): + app = app_with_db + + with app.app_context(): + existing_user = app.security.datastore.create_user( + username="existing_user", + email="original@example.com", + password=None, + ) + db.session.commit() + + synced = _sync_oidc_user( + { + "iss": "https://issuer.example", + "sub": "user-123", + "preferred_username": "existing_user", + } + ) + + assert synced is None + + db.session.delete(existing_user) + db.session.commit() + + +def test_oidc_sync_user_preserves_linked_username_and_updates_roles(app_with_db): + app = app_with_db + + app.config["OTS_OIDC_DEFAULT_ROLES"] = "user" + app.config["OTS_OIDC_ADMIN_ROLES"] = "administrator,global-admin" + + with app.app_context(): + user = app.security.datastore.create_user( + username="existing_user", + email="original@example.com", + password=None, + oidc_issuer="https://issuer.example", + oidc_subject="user-123", + ) + app.security.datastore.add_role_to_user( + user, + app.security.datastore.find_or_create_role("administrator"), + ) + db.session.commit() + + synced = _sync_oidc_user( + { + "iss": "https://issuer.example", + "sub": "user-123", + "preferred_username": "renamed_user", + "groups": ["admin", "user"], + } + ) + + assert synced is not None + role_names = {role.name for role in synced.roles} + assert synced.username == "existing_user" + assert synced.email == "original@example.com" + assert role_names == {"admin", "user"} + + db.session.delete(synced) + db.session.commit() + + +def test_oidc_sync_user_adds_administrator_role_for_mapped_groups(app_with_db): + app = app_with_db + + with app.app_context(): + app.config["OTS_OIDC_DEFAULT_ROLES"] = "user" + app.config["OTS_OIDC_ADMIN_ROLES"] = "global-admin" + + synced = _sync_oidc_user( + { + "iss": "https://issuer.example", + "sub": "admin-user-123", + "preferred_username": "admin_map_user", + "email": "admin@example.com", + "groups": ["global-admin", "operator"], + } + ) + + assert synced is not None + role_names = {role.name for role in synced.roles} + assert "administrator" in role_names + assert "global-admin" in role_names + assert "operator" in role_names + + db.session.delete(synced) + db.session.commit() + + +def test_oidc_login_redirects_to_provider_when_enabled( + real_oidc_api, app_with_db, client_with_db, monkeypatch +): + app = app_with_db + app.config["OTS_ENABLE_OIDC"] = True + + mock_client = _MockOIDCClient(redirect_to="/idp/authorize") + monkeypatch.setattr(real_oidc_api, "_get_oidc_client", lambda: mock_client) + + response = client_with_db.get("/api/oidc/login?return_json=True&next=/dashboard") + + assert response.status_code == 302 + assert response.headers["Location"] == "/idp/authorize" + assert mock_client.redirect_uri == "http://localhost/api/oidc/callback" + + +def test_oidc_login_uses_forwarded_proto_for_callback_url( + real_oidc_api, app_with_db, client_with_db, monkeypatch +): + app = app_with_db + app.config["OTS_ENABLE_OIDC"] = True + + mock_client = _MockOIDCClient(redirect_to="/idp/authorize") + monkeypatch.setattr(real_oidc_api, "_get_oidc_client", lambda: mock_client) + + response = client_with_db.get( + "/api/oidc/login", + headers={ + "X-Forwarded-Proto": "https", + "X-Forwarded-Host": "ots.example.com", + }, + ) + + assert response.status_code == 302 + assert mock_client.redirect_uri == "https://ots.example.com/api/oidc/callback" + + +def test_oidc_login_clears_existing_flow_state_when_args_are_missing( + real_oidc_api, app_with_db, client_with_db, monkeypatch +): + app = app_with_db + app.config["OTS_ENABLE_OIDC"] = True + + mock_client = _MockOIDCClient(redirect_to="/idp/authorize") + monkeypatch.setattr(real_oidc_api, "_get_oidc_client", lambda: mock_client) + + with client_with_db.session_transaction() as session: + session["ots_oidc_next"] = "/previous" + session["ots_oidc_return_json"] = True + + response = client_with_db.get("/api/oidc/login") + + assert response.status_code == 302 + with client_with_db.session_transaction() as session: + assert "ots_oidc_next" not in session + assert session["ots_oidc_return_json"] is False + + +def test_oidc_next_url_is_sanitized(): + assert _sanitize_next_url("/dashboard") == "/dashboard" + assert _sanitize_next_url("dashboard") == "/" + assert _sanitize_next_url("https://malicious.example/callback") == "/" + assert _sanitize_next_url("//malicious.example/callback") == "/" + assert _sanitize_next_url("") == "/" + assert _sanitize_next_url(None) == "/" + + +def test_oidc_login_stores_sanitized_next(real_oidc_api, app_with_db, client_with_db, monkeypatch): + app = app_with_db + app.config["OTS_ENABLE_OIDC"] = True + + mock_client = _MockOIDCClient(redirect_to="/idp/authorize") + monkeypatch.setattr(real_oidc_api, "_get_oidc_client", lambda: mock_client) + + response = client_with_db.get("/api/oidc/login?next=https://malicious.example/callback") + + assert response.status_code == 302 + with client_with_db.session_transaction() as session: + assert session["ots_oidc_next"] == "/" + + +def test_oidc_login_authorize_redirect_failure_returns_error( + real_oidc_api, app_with_db, client_with_db, monkeypatch +): + app = app_with_db + app.config["OTS_ENABLE_OIDC"] = True + + class BrokenOIDCClient(_MockOIDCClient): + def authorize_redirect(self, redirect_uri): # type: ignore[override] + raise RuntimeError("provider unavailable") + + monkeypatch.setattr(real_oidc_api, "_get_oidc_client", lambda: BrokenOIDCClient()) + + with client_with_db.session_transaction() as session: + session["ots_oidc_next"] = "/previous" + session["ots_oidc_return_json"] = True + + response = client_with_db.get("/api/oidc/login") + + assert response.status_code == 500 + assert response.json["success"] is False + assert response.json["error"] == "OIDC login failed" + with client_with_db.session_transaction() as session: + assert "ots_oidc_next" not in session + assert "ots_oidc_return_json" not in session + + +def test_oidc_callback_token_exchange_failure_clears_flow_state( + real_oidc_api, client_with_db, app_with_db, monkeypatch +): + app = app_with_db + app.config["OTS_ENABLE_OIDC"] = True + + class BrokenOIDCClient(_MockOIDCClient): + def authorize_access_token(self): # type: ignore[override] + raise RuntimeError("bad callback") + + monkeypatch.setattr(real_oidc_api, "_get_oidc_client", lambda: BrokenOIDCClient()) + + with client_with_db.session_transaction() as session: + session["ots_oidc_next"] = "/previous" + session["ots_oidc_return_json"] = True + + response = client_with_db.get("/api/oidc/callback?code=test-code") + + assert response.status_code == 400 + assert response.json["success"] is False + assert response.json["error"] == "Invalid OIDC callback" + with client_with_db.session_transaction() as session: + assert "ots_oidc_next" not in session + assert "ots_oidc_return_json" not in session + + +def test_oidc_callback_rejects_provider_error_response(real_oidc_api, client_with_db, app_with_db, monkeypatch): + app = app_with_db + app.config["OTS_ENABLE_OIDC"] = True + + mock_client = _MockOIDCClient() + monkeypatch.setattr(real_oidc_api, "_get_oidc_client", lambda: mock_client) + + with client_with_db.session_transaction() as session: + session["ots_oidc_return_json"] = True + + response = client_with_db.get( + "/api/oidc/callback?error=access_denied&error_description=user%20cancelled" + ) + + assert response.status_code == 400 + assert response.headers["Cache-Control"] == "no-store, no-cache, max-age=0, must-revalidate, private" + assert response.headers["X-Content-Type-Options"] == "nosniff" + assert response.json["success"] is False + assert response.json["error"] == "OIDC provider rejected authentication" + + +def test_oidc_callback_rejects_missing_authorization_code( + real_oidc_api, client_with_db, app_with_db, monkeypatch +): + app = app_with_db + app.config["OTS_ENABLE_OIDC"] = True + + mock_client = _MockOIDCClient() + monkeypatch.setattr(real_oidc_api, "_get_oidc_client", lambda: mock_client) + + with client_with_db.session_transaction() as session: + session["ots_oidc_return_json"] = True + + response = client_with_db.get("/api/oidc/callback") + + assert response.status_code == 400 + assert response.headers["Cache-Control"] == "no-store, no-cache, max-age=0, must-revalidate, private" + assert response.headers["X-Content-Type-Options"] == "nosniff" + assert response.json["success"] is False + assert response.json["error"] == "Invalid OIDC callback" + + +def test_oidc_callback_syncs_user_and_returns_json( + real_oidc_api, client_with_db, app_with_db, monkeypatch +): + app = app_with_db + app.config["OTS_ENABLE_OIDC"] = True + app.config["OTS_OIDC_DEFAULT_ROLES"] = "user" + + claims = { + "iss": "https://issuer.example", + "sub": "callback-user-123", + "preferred_username": "callback_user", + "email": "callback-user@example.com", + "groups": ["operator", "admin"], + } + + mock_client = _MockOIDCClient(token={"userinfo": claims}) + monkeypatch.setattr(real_oidc_api, "_get_oidc_client", lambda: mock_client) + + with client_with_db.session_transaction() as session: + session["ots_oidc_return_json"] = True + + response = client_with_db.get("/api/oidc/callback?code=test-code") + + assert response.status_code == 200 + assert response.headers["Cache-Control"] == "no-store, no-cache, max-age=0, must-revalidate, private" + assert response.headers["Pragma"] == "no-cache" + assert response.headers["Referrer-Policy"] == "no-referrer" + assert response.headers["X-Content-Type-Options"] == "nosniff" + payload = response.json + assert payload["success"] is True + assert payload["identity_attributes"]["oidc"]["provider"] == "oidc" + assert payload["username"] == "callback_user" + assert payload["email"] == "callback-user@example.com" + assert "token" not in payload + + role_names = {role["name"] for role in payload["roles"]} + assert role_names == {"operator", "admin"} + + with client_with_db.session_transaction() as session: + assert "oidc_auth_token" not in session + assert "oidc_auth_profile" not in session + + with app.app_context(): + user = app.security.datastore.find_user(username="callback_user") + assert user is not None + assert user.oidc_issuer == "https://issuer.example" + assert user.oidc_subject == "callback-user-123" + + +def test_oidc_callback_clears_oidc_session_artifacts( + real_oidc_api, client_with_db, app_with_db, monkeypatch +): + app = app_with_db + app.config["OTS_ENABLE_OIDC"] = True + + claims = { + "iss": "https://issuer.example", + "sub": "session-artifact-user-123", + "preferred_username": "session_artifact_user", + "email": "session-artifact-user@example.com", + "groups": ["user"], + } + + mock_client = _MockOIDCClient(token={"userinfo": claims}) + monkeypatch.setattr(real_oidc_api, "_get_oidc_client", lambda: mock_client) + + with client_with_db.session_transaction() as session: + session["ots_oidc_return_json"] = True + session["oidc_auth_token"] = {"access_token": "placeholder-token"} + session["oidc_auth_profile"] = {"sub": "placeholder-subject"} + + response = client_with_db.get("/api/oidc/callback?code=test-code") + + assert response.status_code == 200 + with client_with_db.session_transaction() as session: + assert "oidc_auth_token" not in session + assert "oidc_auth_profile" not in session + + +def test_oidc_callback_redirects_to_return_url(real_oidc_api, app_with_db, client_with_db, monkeypatch): + app = app_with_db + app.config["OTS_ENABLE_OIDC"] = True + + claims = { + "iss": "https://issuer.example", + "sub": "redirect-user-123", + "preferred_username": "redirect_user", + "email": "redirect-user@example.com", + "groups": ["user"], + } + + mock_client = _MockOIDCClient(token={"userinfo": claims}) + monkeypatch.setattr(real_oidc_api, "_get_oidc_client", lambda: mock_client) + + with client_with_db.session_transaction() as session: + session["ots_oidc_next"] = "/dashboard" + + response = client_with_db.get("/api/oidc/callback?code=test-code", follow_redirects=False) + + assert response.status_code == 302 + assert response.headers["Location"] == "/dashboard" + assert response.headers["Cache-Control"] == "no-store, no-cache, max-age=0, must-revalidate, private" + assert response.headers["Pragma"] == "no-cache" + assert response.headers["Referrer-Policy"] == "no-referrer" + assert response.headers["X-Content-Type-Options"] == "nosniff" + + +def test_oidc_callback_can_include_auth_token_when_explicitly_enabled( + real_oidc_api, client_with_db, app_with_db, monkeypatch +): + app = app_with_db + app.config["OTS_ENABLE_OIDC"] = True + app.config["OTS_OIDC_INCLUDE_AUTH_TOKEN_IN_CALLBACK_JSON"] = True + + claims = { + "iss": "https://issuer.example", + "sub": "token-user-123", + "preferred_username": "token_user", + "email": "token-user@example.com", + "groups": ["user"], + } + + mock_client = _MockOIDCClient(token={"userinfo": claims}) + monkeypatch.setattr(real_oidc_api, "_get_oidc_client", lambda: mock_client) + + with client_with_db.session_transaction() as session: + session["ots_oidc_return_json"] = True + + response = client_with_db.get("/api/oidc/callback?code=test-code") + + assert response.status_code == 200 + payload = response.json + assert payload["success"] is True + assert payload["username"] == "token_user" + assert "token" in payload + assert payload["token"] + + +def test_oidc_callback_returns_error_when_login_user_fails( + real_oidc_api, client_with_db, app_with_db, monkeypatch +): + app = app_with_db + app.config["OTS_ENABLE_OIDC"] = True + + claims = { + "iss": "https://issuer.example", + "sub": "login-fail-user-123", + "preferred_username": "login_fail_user", + "email": "login-fail-user@example.com", + "groups": ["user"], + } + + mock_client = _MockOIDCClient(token={"userinfo": claims}) + monkeypatch.setattr(real_oidc_api, "_get_oidc_client", lambda: mock_client) + monkeypatch.setattr(real_oidc_api, "login_user", lambda *args, **kwargs: False) + + with client_with_db.session_transaction() as session: + session["ots_oidc_return_json"] = True + + response = client_with_db.get("/api/oidc/callback?code=test-code") + + assert response.status_code == 403 + assert response.headers["Cache-Control"] == "no-store, no-cache, max-age=0, must-revalidate, private" + assert response.headers["X-Content-Type-Options"] == "nosniff" + assert response.json["success"] is False + assert response.json["error"] == "Failed to establish OIDC session" + + +def test_oidc_callback_fetches_userinfo_when_token_missing( + real_oidc_api, client_with_db, app_with_db, monkeypatch +): + app = app_with_db + app.config["OTS_ENABLE_OIDC"] = True + app.config["OTS_OIDC_DEFAULT_ROLES"] = "user" + + claims = { + "iss": "https://issuer.example", + "sub": "userinfo-user-123", + "preferred_username": "userinfo_user", + "email": "userinfo-user@example.com", + } + + mock_client = _MockOIDCClient(token={"access_token": "tok"}, fallback_userinfo=claims) + monkeypatch.setattr(real_oidc_api, "_get_oidc_client", lambda: mock_client) + + with client_with_db.session_transaction() as session: + session["ots_oidc_return_json"] = True + + response = client_with_db.get("/api/oidc/callback?code=test-code") + + assert response.status_code == 200 + assert response.headers["Cache-Control"] == "no-store, no-cache, max-age=0, must-revalidate, private" + assert response.headers["X-Content-Type-Options"] == "nosniff" + payload = response.json + assert payload["success"] is True + assert payload["username"] == "userinfo_user" + assert payload["email"] == "userinfo-user@example.com" + assert "token" not in payload + + role_names = {role["name"] for role in payload["roles"]} + assert role_names == {"user"} + + assert mock_client.authorize_access_token_called is True + assert mock_client.userinfo_called is True + + + +def test_oidc_callback_merges_userinfo_endpoint_claims_with_embedded_claims( + real_oidc_api, client_with_db, app_with_db, monkeypatch +): + app = app_with_db + app.config["OTS_ENABLE_OIDC"] = True + app.config["OTS_OIDC_DEFAULT_ROLES"] = "user" + app.config["OTS_OIDC_ADMIN_ROLES"] = "ots-admin" + + embedded_claims = { + "iss": "https://issuer.example", + "sub": "merged-user-123", + } + userinfo_claims = { + "iss": "https://issuer.example", + "sub": "merged-user-123", + "preferred_username": "merged_user", + "email": "merged-user@example.com", + "groups": ["ots-admin"], + } + + mock_client = _MockOIDCClient( + token={"access_token": "tok", "userinfo": embedded_claims}, + fallback_userinfo=userinfo_claims, + ) + monkeypatch.setattr(real_oidc_api, "_get_oidc_client", lambda: mock_client) + + with client_with_db.session_transaction() as session: + session["ots_oidc_return_json"] = True + + response = client_with_db.get("/api/oidc/callback?code=test-code") + + assert response.status_code == 200 + payload = response.json + assert payload["success"] is True + assert payload["username"] == "merged_user" + assert payload["email"] == "merged-user@example.com" + role_names = {role["name"] for role in payload["roles"]} + assert role_names == {"ots-admin", "administrator"} + assert mock_client.userinfo_called is True + + +@pytest.fixture +def app_module(): + return import_module("opentakserver.app") + + +@pytest.fixture +def oidc_module(): + return import_module("opentakserver.oidc") + + +def test_app_can_start_without_oidc_configuration(app_module, monkeypatch, tmp_path): + monkeypatch.setattr(app_module.DefaultConfig, "OTS_DATA_FOLDER", str(tmp_path)) + (tmp_path / "config.yml").write_text("OTS_ENABLE_OIDC: false\nSQLALCHEMY_DATABASE_URI: sqlite://\n") + + app = app_module.create_app() + app.config["TESTING"] = True + + with app.test_client() as client: + response = client.get("/api/oidc/login") + + assert response.status_code == 503 + assert response.json["success"] is False + assert response.json["error"] == "OIDC is not enabled" + + +def test_build_oidc_client_registration_enables_pkce_for_public_client(app, oidc_module): + app.config.update( + { + "OTS_OIDC_CLIENT_ID": "public-client", + "OTS_OIDC_CLIENT_SECRET": "", + "OTS_OIDC_METADATA_URL": "https://issuer.example/.well-known/openid-configuration", + "OTS_OIDC_SCOPE": "openid profile email", + "OTS_OIDC_TOKEN_ENDPOINT_AUTH_METHOD": "", + "OTS_OIDC_USE_PKCE": False, + } + ) + + registration = oidc_module._build_client_registration(app) + + assert registration["client_id"] == "public-client" + assert registration["client_secret"] is None + assert ( + registration["server_metadata_url"] + == "https://issuer.example/.well-known/openid-configuration" + ) + assert registration["client_kwargs"]["scope"] == "openid profile email" + assert registration["client_kwargs"]["token_endpoint_auth_method"] == "none" + assert registration["client_kwargs"]["code_challenge_method"] == "S256" + + +def test_build_oidc_client_registration_can_enable_pkce_for_confidential_client(app, oidc_module): + app.config.update( + { + "OTS_OIDC_CLIENT_ID": "confidential-client", + "OTS_OIDC_CLIENT_SECRET": "super-secret", + "OTS_OIDC_METADATA_URL": "https://issuer.example/.well-known/openid-configuration", + "OTS_OIDC_SCOPE": "openid profile email", + "OTS_OIDC_TOKEN_ENDPOINT_AUTH_METHOD": "", + "OTS_OIDC_USE_PKCE": True, + "OTS_OIDC_PKCE_METHOD": "S256", + } + ) + + registration = oidc_module._build_client_registration(app) + + assert registration["client_secret"] == "super-secret" + assert registration["client_kwargs"]["code_challenge_method"] == "S256" + assert "token_endpoint_auth_method" not in registration["client_kwargs"] + + +def test_build_oidc_client_registration_can_override_token_endpoint_auth_method(app, oidc_module): + app.config.update( + { + "OTS_OIDC_CLIENT_ID": "confidential-client", + "OTS_OIDC_CLIENT_SECRET": "super-secret", + "OTS_OIDC_METADATA_URL": "https://issuer.example/.well-known/openid-configuration", + "OTS_OIDC_SCOPE": "openid profile email", + "OTS_OIDC_TOKEN_ENDPOINT_AUTH_METHOD": "client_secret_post", + } + ) + + registration = oidc_module._build_client_registration(app) + + assert registration["client_kwargs"]["token_endpoint_auth_method"] == "client_secret_post" + + +@pytest.mark.parametrize("pkce_method", ["plain", "s256", "S512"]) +def test_build_oidc_client_registration_rejects_invalid_pkce_method(app, oidc_module, pkce_method): + app.config.update( + { + "OTS_OIDC_CLIENT_ID": "public-client", + "OTS_OIDC_CLIENT_SECRET": "", + "OTS_OIDC_METADATA_URL": "https://issuer.example/.well-known/openid-configuration", + "OTS_OIDC_SCOPE": "openid profile email", + "OTS_OIDC_USE_PKCE": True, + "OTS_OIDC_PKCE_METHOD": pkce_method, + } + ) + + with pytest.raises(RuntimeError, match="OTS_OIDC_PKCE_METHOD must be S256"): + oidc_module._build_client_registration(app) + + +def test_build_oidc_client_registration_supports_manual_endpoints(app, oidc_module): + app.config.update( + { + "OTS_OIDC_CLIENT_ID": "public-client", + "OTS_OIDC_CLIENT_SECRET": "", + "OTS_OIDC_METADATA_URL": "", + "OTS_OIDC_SCOPE": "openid profile email", + "OTS_OIDC_AUTHORIZATION_ENDPOINT": "https://issuer.example/authorize", + "OTS_OIDC_TOKEN_ENDPOINT": "https://issuer.example/token", + "OTS_OIDC_USERINFO_ENDPOINT": "https://issuer.example/userinfo", + } + ) + + registration = oidc_module._build_client_registration(app) + + assert registration["authorize_url"] == "https://issuer.example/authorize" + assert registration["access_token_url"] == "https://issuer.example/token" + assert registration["userinfo_endpoint"] == "https://issuer.example/userinfo" + assert registration["client_kwargs"]["code_challenge_method"] == "S256" + + +def test_build_oidc_client_secrets_uses_configured_issuer(app, oidc_module): + app.config.update( + { + "OTS_OIDC_NAME": "main-oidc", + "OTS_OIDC_CLIENT_ID": "public-client", + "OTS_OIDC_CLIENT_SECRET": "", + "OTS_OIDC_ISSUER": "https://issuer.example", + } + ) + + secrets = oidc_module._build_client_secrets(app) + + assert secrets == { + "web": { + "client_id": "public-client", + "client_secret": "", + "issuer": "https://issuer.example", + } + } + + +def test_build_oidc_client_secrets_can_derive_issuer_from_metadata_url(app, oidc_module): + app.config.update( + { + "OTS_OIDC_NAME": "main-oidc", + "OTS_OIDC_CLIENT_ID": "public-client", + "OTS_OIDC_CLIENT_SECRET": "", + "OTS_OIDC_ISSUER": "", + "OTS_OIDC_METADATA_URL": "https://issuer.example/.well-known/openid-configuration", + } + ) + + secrets = oidc_module._build_client_secrets(app) + + assert secrets["web"]["issuer"] == "https://issuer.example" + + +def test_resolve_configured_issuer_can_use_internal_oidc_client_secrets(app, oidc_module): + app.config.update( + { + "OTS_OIDC_ISSUER": "", + "OTS_OIDC_METADATA_URL": "", + "OIDC_CLIENT_SECRETS": {"web": {"issuer": "https://issuer.example"}}, + } + ) + + assert oidc_module._resolve_configured_issuer(app) == "https://issuer.example" + + +def test_build_oidc_client_secrets_requires_real_issuer(app, oidc_module): + app.config.update( + { + "OTS_OIDC_NAME": "main-oidc", + "OTS_OIDC_CLIENT_ID": "public-client", + "OTS_OIDC_CLIENT_SECRET": "", + "OTS_OIDC_ISSUER": "", + "OTS_OIDC_METADATA_URL": "", + } + ) + + with pytest.raises( + RuntimeError, + match=( + "OTS_ENABLE_OIDC is enabled but OTS_OIDC_ISSUER is not configured and could not be " + "derived from OTS_OIDC_METADATA_URL." + ), + ): + oidc_module._build_client_secrets(app) + + +def test_init_oidc_calls_extension_init_app(app, app_module, monkeypatch): + class FakeOIDC: + def __init__(self): + self.init_app_called = False + + def init_app(self, flask_app): + self.init_app_called = True + + fake_oidc = FakeOIDC() + monkeypatch.setattr(app_module, "oidc", fake_oidc) + + app.config.update( + { + "OTS_ENABLE_OIDC": True, + "OTS_OIDC_NAME": "main-oidc", + "OTS_OIDC_CLIENT_ID": "public-client", + "OTS_OIDC_CLIENT_SECRET": "", + "OTS_OIDC_METADATA_URL": "https://issuer.example/.well-known/openid-configuration", + "OTS_OIDC_SCOPE": "openid profile email", + "OTS_OIDC_TOKEN_ENDPOINT_AUTH_METHOD": "", + } + ) + + app_module._init_oidc(app) + + assert fake_oidc.init_app_called is True + + +def test_init_oidc_relaxes_strict_session_cookie_same_site_to_lax( + app, app_module, monkeypatch +): + class FakeOIDC: + def init_app(self, flask_app): + return None + + monkeypatch.setattr(app_module, "oidc", FakeOIDC()) + + app.config.update( + { + "OTS_ENABLE_OIDC": True, + "OTS_OIDC_NAME": "main-oidc", + "OTS_OIDC_CLIENT_ID": "public-client", + "OTS_OIDC_CLIENT_SECRET": "", + "OTS_OIDC_METADATA_URL": "https://issuer.example/.well-known/openid-configuration", + "OTS_OIDC_SCOPE": "openid profile email", + "OTS_OIDC_TOKEN_ENDPOINT_AUTH_METHOD": "", + "SESSION_COOKIE_SAMESITE": "strict", + } + ) + + app_module._init_oidc(app) + + assert app.config["SESSION_COOKIE_SAMESITE"] == "Lax" + + + +def test_build_security_identity_attributes_adds_oidc_when_enabled(app, app_module): + app.config.update( + { + "OTS_ENABLE_EMAIL": True, + "OTS_ENABLE_LDAP": True, + "OTS_ENABLE_OIDC": True, + } + ) + + identity_attributes = app_module._build_security_identity_attributes(app) + + assert [next(iter(attribute)) for attribute in identity_attributes] == [ + "username", + "email", + "ldap", + "oidc", + ] + + + +def test_login_form_identity_attributes_expose_oidc_when_enabled(app, app_module): + app.config["OTS_ENABLE_OIDC"] = True + app.config["SECURITY_USER_IDENTITY_ATTRIBUTES"] = app_module._build_security_identity_attributes(app) + + with app.test_client() as client: + response = client.get( + "/api/login?include_auth_token", + headers={"Accept": "application/json", "Content-Type": "application/json"}, + ) + + assert response.status_code == 200 + assert "oidc" in response.json["response"]["identity_attributes"] + + + +def test_oidc_extension_init_app_populates_internal_oidc_settings(app, oidc_module): + extension = oidc_module.OpenTAKOIDCExtension() + app.config.update( + { + "OTS_OIDC_NAME": "main-oidc", + "OTS_OIDC_CLIENT_ID": "public-client", + "OTS_OIDC_CLIENT_SECRET": "", + "OTS_OIDC_METADATA_URL": "https://issuer.example/.well-known/openid-configuration", + "OTS_OIDC_SCOPE": "openid profile email", + "OTS_OIDC_TOKEN_ENDPOINT_AUTH_METHOD": "", + } + ) + + extension.init_app(app) + + assert app.config["OIDC_ENABLED"] is True + assert app.config["OIDC_RESOURCE_SERVER_ONLY"] is True + assert app.config["OIDC_SCOPES"] == "openid profile email" + assert app.config["OIDC_INTROSPECTION_AUTH_METHOD"] == "none" + assert app.config["OIDC_CLIENT_SECRETS"]["web"]["issuer"] == "https://issuer.example" + assert app.config["OTS_OIDC_CLIENT_REGISTRATION"]["client_kwargs"]["code_challenge_method"] == "S256" + + + +def _load_defaultconfig_module(): + module_path = Path(__file__).resolve().parents[1] / "opentakserver/defaultconfig.py" + spec = util.spec_from_file_location("test_defaultconfig_module", module_path) + assert spec is not None + module = util.module_from_spec(spec) + loader = spec.loader + assert loader is not None + loader.exec_module(module) + return module + + +def test_defaultconfig_disables_oidc_by_default(monkeypatch): + monkeypatch.delenv("OTS_ENABLE_OIDC", raising=False) + + module = _load_defaultconfig_module() + cfg = module.DefaultConfig + + assert cfg.OTS_ENABLE_OIDC is False + + +def test_create_app_defaults_oidc_to_disabled(app): + assert app.config["OTS_ENABLE_OIDC"] is False + + +def test_defaultconfig_reads_oidc_claim_and_role_mapping_from_env(monkeypatch): + monkeypatch.setenv("OTS_OIDC_USERNAME_CLAIMS", "preferred_username,email") + monkeypatch.setenv("OTS_OIDC_EMAIL_CLAIM", "mail") + monkeypatch.setenv("OTS_OIDC_ROLE_CLAIM", "realm_access.roles") + monkeypatch.setenv("OTS_OIDC_ADMIN_ROLES", "global-admin,ots-admin") + monkeypatch.setenv("OTS_OIDC_DEFAULT_ROLES", "viewer") + monkeypatch.setenv("OTS_OIDC_TOKEN_ENDPOINT_AUTH_METHOD", "client_secret_post") + + module = _load_defaultconfig_module() + cfg = module.DefaultConfig + + assert cfg.OTS_OIDC_USERNAME_CLAIMS == "preferred_username,email" + assert cfg.OTS_OIDC_EMAIL_CLAIM == "mail" + assert cfg.OTS_OIDC_ROLE_CLAIM == "realm_access.roles" + assert cfg.OTS_OIDC_ADMIN_ROLES == "global-admin,ots-admin" + assert cfg.OTS_OIDC_DEFAULT_ROLES == "viewer" + assert cfg.OTS_OIDC_TOKEN_ENDPOINT_AUTH_METHOD == "client_secret_post" + + +def test_defaultconfig_oidc_claim_and_role_mapping_defaults_when_env_absent(monkeypatch): + for key in [ + "OTS_OIDC_USERNAME_CLAIMS", + "OTS_OIDC_EMAIL_CLAIM", + "OTS_OIDC_ROLE_CLAIM", + "OTS_OIDC_ADMIN_ROLES", + "OTS_OIDC_DEFAULT_ROLES", + "OTS_OIDC_TOKEN_ENDPOINT_AUTH_METHOD", + ]: + monkeypatch.delenv(key, raising=False) + + module = _load_defaultconfig_module() + cfg = module.DefaultConfig + + assert cfg.OTS_OIDC_USERNAME_CLAIMS == "preferred_username, upn, email, sub" + assert cfg.OTS_OIDC_EMAIL_CLAIM == "email" + assert cfg.OTS_OIDC_ROLE_CLAIM == "groups" + assert cfg.OTS_OIDC_ADMIN_ROLES == "administrator" + assert cfg.OTS_OIDC_DEFAULT_ROLES == "user" + assert cfg.OTS_OIDC_TOKEN_ENDPOINT_AUTH_METHOD == ""