"""OpenID Connect sign-in views (authorization code flow with PKCE).

The initiate view redirects the browser to the provider's authorization
endpoint; the callback view exchanges the returned code for tokens, validates
the ID token, checks group membership and signs in the linked local user.
"""

import base64
import hashlib
import os
import secrets
from urllib.parse import urlencode

import jwt
import requests
from django.http import HttpResponseRedirect
from django.utils import timezone
from django.views import View

from plane.authentication.utils.host import base_host
from plane.authentication.utils.login import user_login
from plane.authentication.utils.redirection_path import get_redirection_path
from plane.db.models import ExternalIdentityLink, User
from plane.utils.path_validator import get_safe_redirect_url, validate_next_path

# Session key under which the per-login OIDC state is stored.
OIDC_SESSION_KEY = "nodedc_oidc"
# Provider identifier stored on identity links and on the user's last login medium.
OIDC_PROVIDER = "authentik"
# Comma-separated fallback used when PLANE_OIDC_REQUIRED_GROUPS is not set.
DEFAULT_REQUIRED_GROUPS = "nodedc:superadmin,nodedc:taskmanager:admin,nodedc:taskmanager:user"


class NodeDCOIDCInitiateEndpoint(View):
    """Start the OIDC login by redirecting to the provider's authorization endpoint."""

    def get(self, request):
        """Generate state/nonce/PKCE values, store them in the session and redirect.

        Query parameters read from the request:
            next_path: post-login destination, passed through ``validate_next_path``.
            prompt: forwarded to the provider only when it equals ``"login"``.
        """
        config = get_oidc_config()
        next_path = validate_next_path(request.GET.get("next_path", ""))
        discovery = load_discovery(config["issuer"])

        state = secrets.token_urlsafe(32)
        nonce = secrets.token_urlsafe(32)
        code_verifier = secrets.token_urlsafe(64)
        # PKCE S256 challenge: unpadded base64url of SHA-256(code_verifier).
        code_challenge = (
            base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode()).digest()).decode().rstrip("=")
        )

        request.session[OIDC_SESSION_KEY] = {
            "state": state,
            "nonce": nonce,
            "code_verifier": code_verifier,
            "next_path": next_path,
        }
        request.session.save()

        params = {
            "response_type": "code",
            "client_id": config["client_id"],
            "redirect_uri": config["redirect_uri"],
            "scope": config["scope"],
            "state": state,
            "nonce": nonce,
            "code_challenge": code_challenge,
            "code_challenge_method": "S256",
        }
        if request.GET.get("prompt") == "login":
            params["prompt"] = "login"

        return HttpResponseRedirect(f"{discovery['authorization_endpoint']}?{urlencode(params)}")


class NodeDCOIDCCallbackEndpoint(View):
    """Handle the provider's redirect back to the application."""

    def get(self, request):
        """Validate the callback, resolve the linked user and sign them in.

        Redirects with an ``error`` query parameter when the provider reports
        an error (``oidc_provider_error``), the state or code is missing or
        mismatched (``oidc_state_failed``), the user is in none of the required
        groups (``oidc_access_denied``), or no active linked user is found
        (``oidc_user_not_linked``).

        NOTE(review): failures raised by ``get_oidc_config``, ``load_discovery``,
        ``exchange_code`` and ``verify_id_token`` are not caught here and
        propagate to the caller.
        """
        config = get_oidc_config()
        oidc_session = request.session.get(OIDC_SESSION_KEY) or {}
        next_path = oidc_session.get("next_path", "")
        base_url = base_host(request=request, is_app=True)

        if request.GET.get("error"):
            return oidc_error_redirect(base_url, next_path, "oidc_provider_error")

        state = request.GET.get("state")
        code = request.GET.get("code")
        if not state or state != oidc_session.get("state") or not code:
            return oidc_error_redirect(base_url, next_path, "oidc_state_failed")

        discovery = load_discovery(config["issuer"])
        token_set = exchange_code(discovery, config, code, oidc_session.get("code_verifier"))
        claims = verify_id_token(discovery, config, token_set["id_token"], oidc_session.get("nonce"))

        groups = normalize_groups(claims.get("groups"))
        if not has_required_group(groups):
            return oidc_error_redirect(base_url, next_path, "oidc_access_denied")

        user = resolve_linked_user(
            claims=claims,
            groups=groups,
            auto_link=config["auto_link_email"],
            sync_profile=config["sync_profile"],
        )
        if user is None or not user.is_active:
            return oidc_error_redirect(base_url, next_path, "oidc_user_not_linked")

        # The one-time login state is only cleared once the login has succeeded.
        request.session.pop(OIDC_SESSION_KEY, None)
        user_login(request=request, user=user, is_app=True)

        path = next_path or get_redirection_path(user=user)
        return HttpResponseRedirect(get_safe_redirect_url(base_url=base_url, next_path=path, params={}))


def get_oidc_config():
    """Read the OIDC settings from environment variables.

    Returns:
        dict with keys ``issuer`` (normalised to end with exactly one ``/``),
        ``client_id``, ``client_secret``, ``redirect_uri``, ``scope``,
        ``auto_link_email`` and ``sync_profile`` (the last two are booleans,
        true only when the variable equals ``"1"``).

    Raises:
        RuntimeError: if the issuer, client id, client secret or redirect URI
            is missing or blank.
    """
    issuer = os.environ.get("PLANE_OIDC_ISSUER", "").strip()
    client_id = os.environ.get("PLANE_OIDC_CLIENT_ID", "").strip()
    client_secret = os.environ.get("PLANE_OIDC_CLIENT_SECRET", "").strip()
    redirect_uri = os.environ.get("PLANE_OIDC_REDIRECT_URI", "").strip()

    if not issuer or not client_id or not client_secret or not redirect_uri:
        raise RuntimeError("Plane OIDC is not configured")

    return {
        "issuer": issuer.rstrip("/") + "/",
        "client_id": client_id,
        "client_secret": client_secret,
        "redirect_uri": redirect_uri,
        "scope": os.environ.get("PLANE_OIDC_SCOPE", "openid email profile groups"),
        "auto_link_email": os.environ.get("PLANE_OIDC_AUTO_LINK_EMAIL", "0") == "1",
        "sync_profile": os.environ.get("PLANE_OIDC_SYNC_PROFILE", "1") == "1",
    }


def load_discovery(issuer):
    """Fetch and return the provider's OpenID discovery document as a dict.

    Args:
        issuer: issuer URL; expected to end with ``/`` as produced by
            ``get_oidc_config``, because the well-known path is appended as-is.

    Raises:
        requests.HTTPError: if the provider answers with an error status.
    """
    response = requests.get(f"{issuer}.well-known/openid-configuration", timeout=10)
    response.raise_for_status()
    return response.json()


def exchange_code(discovery, config, code, code_verifier):
    """Exchange the authorization code for tokens at the token endpoint.

    The client authenticates with HTTP basic auth (client id / client secret)
    and sends the PKCE ``code_verifier`` alongside the code.

    Returns:
        The decoded JSON token response.

    Raises:
        requests.HTTPError: if the token endpoint answers with an error status.
        RuntimeError: if the response carries no ``id_token``.
    """
    response = requests.post(
        discovery["token_endpoint"],
        data={
            "grant_type": "authorization_code",
            "code": code,
            "redirect_uri": config["redirect_uri"],
            "code_verifier": code_verifier,
        },
        auth=(config["client_id"], config["client_secret"]),
        timeout=10,
    )
    response.raise_for_status()

    token_set = response.json()
    if not token_set.get("id_token"):
        raise RuntimeError("OIDC token response does not contain id_token")
    return token_set


def verify_id_token(discovery, config, id_token, nonce):
    """Validate the ID token and return its claims.

    The signing key is looked up from the provider's JWKS; the token must be
    RS256-signed, addressed to this client and issued by the discovered issuer
    (falling back to the configured issuer when discovery omits it).

    Raises:
        RuntimeError: if no expected nonce is available or the token's nonce
            does not match it.
        jwt.PyJWTError: on key lookup, signature or claim validation failure
            (raised by PyJWT).
    """
    jwks_client = jwt.PyJWKClient(discovery["jwks_uri"])
    signing_key = jwks_client.get_signing_key_from_jwt(id_token)
    claims = jwt.decode(
        id_token,
        signing_key.key,
        algorithms=["RS256"],
        audience=config["client_id"],
        issuer=discovery.get("issuer", config["issuer"]),
    )
    # A missing expected nonce must fail closed: otherwise a token without a
    # nonce claim would compare equal (None == None) and be accepted.
    if not nonce or claims.get("nonce") != nonce:
        raise RuntimeError("OIDC nonce validation failed")
    return claims


def normalize_groups(groups):
    """Coerce the ``groups`` claim into a de-duplicated list of strings.

    A list keeps its string members in first-seen order, a non-empty string
    becomes a one-element list, and anything else yields an empty list.
    """
    if isinstance(groups, list):
        return list(dict.fromkeys(group for group in groups if isinstance(group, str)))
    if isinstance(groups, str) and groups:
        return [groups]
    return []


def has_required_group(groups):
    """Return True when ``groups`` contains at least one required group.

    Required groups come from the comma-separated PLANE_OIDC_REQUIRED_GROUPS
    variable, defaulting to ``DEFAULT_REQUIRED_GROUPS``. Blank entries are
    ignored, so a value with no usable entries matches nothing.
    """
    required_groups = {
        group.strip()
        for group in os.environ.get("PLANE_OIDC_REQUIRED_GROUPS", DEFAULT_REQUIRED_GROUPS).split(",")
        if group.strip()
    }
    return bool(required_groups.intersection(set(groups)))


def resolve_linked_user(claims, groups, auto_link, sync_profile):
    """Return the local user linked to the token's subject, or None.

    Args:
        claims: validated ID token claims; ``sub`` and ``email`` are read here.
        groups: normalised group list, stored on the identity link.
        auto_link: when true and no active link exists, link the subject to an
            active user whose email matches the ``email`` claim.
        sync_profile: when true, copy profile claims onto the user.

    Returns:
        The linked user with login metadata updated and saved, or None when
        the subject is missing, no link can be resolved, or the subject's
        existing link is not active.
    """
    subject = str(claims.get("sub") or "")
    email = str(claims.get("email") or "").strip().lower()
    if not subject:
        return None

    link = (
        ExternalIdentityLink.objects.select_related("user")
        .filter(
            provider=OIDC_PROVIDER,
            subject=subject,
            status=ExternalIdentityLink.Status.ACTIVE,
        )
        .first()
    )

    if link is None and auto_link and email:
        # NOTE(review): the email claim is trusted as-is; `email_verified` is
        # not checked before linking - confirm the provider only emits
        # verified addresses.
        user = User.objects.filter(email__iexact=email, is_active=True).first()
        if user:
            link, created = ExternalIdentityLink.objects.get_or_create(
                provider=OIDC_PROVIDER,
                subject=subject,
                defaults={"user": user, "email": email, "groups": groups},
            )
            # get_or_create does not filter on status, so it can hand back a
            # link that the lookup above skipped for not being active. Such a
            # link must not be reused to sign the user in.
            if not created and link.status != ExternalIdentityLink.Status.ACTIVE:
                return None

    if link is None:
        return None

    now = timezone.now()

    link.email = email or link.email
    link.groups = groups
    link.last_login_at = now
    link.save(update_fields=["email", "groups", "last_login_at", "updated_at"])

    user = link.user
    user.last_login_medium = OIDC_PROVIDER
    user.last_login_time = now
    update_fields = ["last_login_medium", "last_login_time", "updated_at"]
    if sync_profile:
        update_fields.extend(sync_user_profile_from_claims(user, claims))
    # De-duplicate while keeping order before handing the list to save().
    user.save(update_fields=list(dict.fromkeys(update_fields)))
    return user


def sync_user_profile_from_claims(user, claims):
    """Copy name and avatar claims onto ``user`` without saving.

    Returns:
        The list of user field names that were changed, for use as
        ``update_fields`` by the caller.
    """
    updated_fields = []
    display_name = first_string_claim(claims, "name", "preferred_username")
    given_name = first_string_claim(claims, "given_name")
    family_name = first_string_claim(claims, "family_name")
    avatar_url = first_string_claim(claims, "picture", "avatar_url", "avatar")

    if display_name and user.display_name != display_name:
        user.display_name = display_name
        updated_fields.append("display_name")

    # Without a given_name claim, derive first/last name from the display
    # name: first word, then the remainder (empty when there is only one word).
    if not given_name and display_name:
        name_parts = display_name.split(" ", 1)
        given_name = name_parts[0]
        family_name = family_name or (name_parts[1] if len(name_parts) > 1 else "")

    if given_name and user.first_name != given_name:
        user.first_name = given_name
        updated_fields.append("first_name")

    # An empty string is a valid value here (it clears the last name); only
    # None, meaning no family name could be determined, is skipped.
    if family_name is not None and user.last_name != family_name:
        user.last_name = family_name
        updated_fields.append("last_name")

    if avatar_url and user.avatar != avatar_url:
        user.avatar = avatar_url
        updated_fields.append("avatar")

    return updated_fields


def first_string_claim(claims, *keys):
    """Return the first non-empty string found in ``claims`` under ``keys``, else None."""
    for key in keys:
        value = claims.get(key)
        if isinstance(value, str) and value:
            return value
    return None


def oidc_error_redirect(base_url, next_path, error_code):
    """Build a redirect via ``get_safe_redirect_url`` carrying ``error=<error_code>``."""
    return HttpResponseRedirect(
        get_safe_redirect_url(base_url=base_url, next_path=next_path, params={"error": error_code})
    )