NODEDC_TASKMANAGER/plane-src/apps/api/plane/authentication/views/app/oidc.py

265 lines
8.9 KiB
Python

import base64
import hashlib
import logging
import os
import secrets
from urllib.parse import urlencode

import jwt
import requests
from django.http import HttpResponseRedirect
from django.utils import timezone
from django.views import View

from plane.authentication.utils.host import base_host
from plane.authentication.utils.login import user_login
from plane.authentication.utils.redirection_path import get_redirection_path
from plane.db.models import ExternalIdentityLink, User
from plane.utils.path_validator import get_safe_redirect_url, validate_next_path
# Session key under which the in-flight OIDC handshake data is stored.
OIDC_SESSION_KEY = "nodedc_oidc"

# Provider label written to identity links and to the user's last login medium.
OIDC_PROVIDER = "authentik"

# Comma-separated fallback used when PLANE_OIDC_REQUIRED_GROUPS is not set;
# membership in any one of these groups is enough to pass the group check.
DEFAULT_REQUIRED_GROUPS = ",".join(
    (
        "nodedc:superadmin",
        "nodedc:taskmanager:admin",
        "nodedc:taskmanager:user",
    )
)
class NodeDCOIDCInitiateEndpoint(View):
    """Start the OIDC authorization-code flow and redirect to the provider."""

    def get(self, request):
        """Store fresh handshake values in the session and redirect.

        Generates a random ``state``, ``nonce`` and PKCE ``code_verifier``,
        saves them (together with the validated ``next_path`` query value)
        under ``OIDC_SESSION_KEY``, and redirects to the provider's
        ``authorization_endpoint`` taken from the discovery document.

        A ``prompt=login`` query parameter on the incoming request is
        forwarded to the provider; any other ``prompt`` value is ignored.
        """
        config = get_oidc_config()
        next_path = validate_next_path(request.GET.get("next_path", ""))
        discovery = load_discovery(config["issuer"])

        state = secrets.token_urlsafe(32)
        nonce = secrets.token_urlsafe(32)
        code_verifier = secrets.token_urlsafe(64)

        # PKCE S256: unpadded base64url of the SHA-256 digest of the verifier.
        verifier_digest = hashlib.sha256(code_verifier.encode()).digest()
        code_challenge = base64.urlsafe_b64encode(verifier_digest).decode().rstrip("=")

        handshake = {
            "state": state,
            "nonce": nonce,
            "code_verifier": code_verifier,
            "next_path": next_path,
        }
        request.session[OIDC_SESSION_KEY] = handshake
        request.session.save()

        params = {
            "response_type": "code",
            "client_id": config["client_id"],
            "redirect_uri": config["redirect_uri"],
            "scope": config["scope"],
            "state": state,
            "nonce": nonce,
            "code_challenge": code_challenge,
            "code_challenge_method": "S256",
        }
        wants_login_prompt = request.GET.get("prompt") == "login"
        if wants_login_prompt:
            params["prompt"] = "login"

        authorization_endpoint = discovery["authorization_endpoint"]
        query_string = urlencode(params)
        return HttpResponseRedirect(f"{authorization_endpoint}?{query_string}")
class NodeDCOIDCCallbackEndpoint(View):
    """Handle the provider's redirect back and log the linked user in."""

    def get(self, request):
        """Complete the authorization-code flow started by the initiate view.

        Every failure is reported by redirecting to the app with an ``error``
        query parameter:

        * ``oidc_provider_error`` - the provider returned ``error``, or
          discovery / token exchange / ID-token verification failed.
        * ``oidc_state_failed`` - ``state`` is missing or does not match the
          session, or ``code`` is missing.
        * ``oidc_access_denied`` - none of the required groups is present.
        * ``oidc_user_not_linked`` - no linked user, or the user is inactive.

        On success the handshake data is removed from the session, the user
        is logged in and redirected to ``next_path`` (or the default
        redirection path for that user).
        """
        config = get_oidc_config()
        oidc_session = request.session.get(OIDC_SESSION_KEY) or {}
        next_path = oidc_session.get("next_path", "")
        base_url = base_host(request=request, is_app=True)

        if request.GET.get("error"):
            return oidc_error_redirect(base_url, next_path, "oidc_provider_error")

        state = request.GET.get("state")
        code = request.GET.get("code")
        if not state or state != oidc_session.get("state") or not code:
            return oidc_error_redirect(base_url, next_path, "oidc_state_failed")

        # Network failures, malformed provider responses and token validation
        # errors used to escape as an unhandled exception (HTTP 500). Send the
        # user back to the app with an error code instead, and keep the
        # traceback in the log for diagnosis.
        try:
            discovery = load_discovery(config["issuer"])
            token_set = exchange_code(discovery, config, code, oidc_session.get("code_verifier"))
            claims = verify_id_token(discovery, config, token_set["id_token"], oidc_session.get("nonce"))
        except (requests.RequestException, jwt.PyJWTError, RuntimeError, KeyError, ValueError):
            logging.getLogger(__name__).exception("OIDC callback failed during token exchange or verification")
            return oidc_error_redirect(base_url, next_path, "oidc_provider_error")

        groups = normalize_groups(claims.get("groups"))
        if not has_required_group(groups):
            return oidc_error_redirect(base_url, next_path, "oidc_access_denied")

        user = resolve_linked_user(
            claims=claims,
            groups=groups,
            auto_link=config["auto_link_email"],
            sync_profile=config["sync_profile"],
        )
        if user is None or not user.is_active:
            return oidc_error_redirect(base_url, next_path, "oidc_user_not_linked")

        # The handshake values are single-use once the login succeeds.
        request.session.pop(OIDC_SESSION_KEY, None)
        user_login(request=request, user=user, is_app=True)

        path = next_path or get_redirection_path(user=user)
        return HttpResponseRedirect(get_safe_redirect_url(base_url=base_url, next_path=path, params={}))
def get_oidc_config():
    """Read the OIDC settings from ``PLANE_OIDC_*`` environment variables.

    Returns:
        A dict with ``issuer`` (normalised to end in exactly one ``/``),
        ``client_id``, ``client_secret``, ``redirect_uri``, ``scope`` and the
        boolean flags ``auto_link_email`` and ``sync_profile``.

    Raises:
        RuntimeError: if issuer, client id, client secret or redirect URI is
            missing or blank after stripping whitespace.
    """
    env = os.environ.get

    issuer = env("PLANE_OIDC_ISSUER", "").strip()
    client_id = env("PLANE_OIDC_CLIENT_ID", "").strip()
    client_secret = env("PLANE_OIDC_CLIENT_SECRET", "").strip()
    redirect_uri = env("PLANE_OIDC_REDIRECT_URI", "").strip()

    mandatory = (issuer, client_id, client_secret, redirect_uri)
    if not all(mandatory):
        raise RuntimeError("Plane OIDC is not configured")

    # Flags are enabled only by the exact string "1".
    auto_link_enabled = env("PLANE_OIDC_AUTO_LINK_EMAIL", "0") == "1"
    sync_profile_enabled = env("PLANE_OIDC_SYNC_PROFILE", "1") == "1"

    config = {
        "issuer": issuer.rstrip("/") + "/",
        "client_id": client_id,
        "client_secret": client_secret,
        "redirect_uri": redirect_uri,
    }
    config["scope"] = env("PLANE_OIDC_SCOPE", "openid email profile groups")
    config["auto_link_email"] = auto_link_enabled
    config["sync_profile"] = sync_profile_enabled
    return config
def load_discovery(issuer):
    """Fetch and return the provider's OpenID discovery document as parsed JSON.

    ``issuer`` is concatenated directly with ``.well-known/openid-configuration``,
    so it is expected to already end with ``/``.

    Raises whatever ``requests`` raises for connection problems or a non-2xx
    status (via ``raise_for_status``).
    """
    discovery_url = f"{issuer}.well-known/openid-configuration"
    reply = requests.get(discovery_url, timeout=10)
    reply.raise_for_status()
    return reply.json()
def exchange_code(discovery, config, code, code_verifier):
    """Exchange an authorization code for a token set at the token endpoint.

    The client id and secret are sent through the ``auth`` argument of
    ``requests.post``; the PKCE ``code_verifier`` is sent in the form body.

    Returns:
        The parsed JSON token response, guaranteed to contain ``id_token``.

    Raises:
        RuntimeError: if the response has no (or an empty) ``id_token``.
        Errors from ``requests`` (including non-2xx status) propagate.
    """
    token_endpoint = discovery["token_endpoint"]
    form = {
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": config["redirect_uri"],
        "code_verifier": code_verifier,
    }
    client_credentials = (config["client_id"], config["client_secret"])

    reply = requests.post(token_endpoint, data=form, auth=client_credentials, timeout=10)
    reply.raise_for_status()

    payload = reply.json()
    if payload.get("id_token"):
        return payload
    raise RuntimeError("OIDC token response does not contain id_token")
def verify_id_token(discovery, config, id_token, nonce):
    """Verify the ID token's signature and claims, and return the claims.

    The signing key is looked up from the provider's ``jwks_uri``. The token
    must be RS256-signed, carry ``exp`` and ``iat``, be issued for our
    ``client_id`` and come from the issuer named in the discovery document
    (falling back to the configured issuer).

    Args:
        discovery: Parsed discovery document; ``jwks_uri`` is required.
        config: OIDC configuration dict (``client_id``, ``issuer``).
        id_token: The encoded JWT from the token response.
        nonce: The nonce stored in the session when the flow was started.

    Raises:
        RuntimeError: if the expected nonce is empty or does not match the
            token's ``nonce`` claim.
        jwt.PyJWTError: for signature, key lookup or claim validation errors.
    """
    jwks_client = jwt.PyJWKClient(discovery["jwks_uri"])
    signing_key = jwks_client.get_signing_key_from_jwt(id_token)
    claims = jwt.decode(
        id_token,
        signing_key.key,
        algorithms=["RS256"],
        audience=config["client_id"],
        issuer=discovery.get("issuer", config["issuer"]),
        # PyJWT validates "exp"/"iat" only when they are present; an ID token
        # without them must be rejected rather than treated as never expiring.
        options={"require": ["exp", "iat"]},
    )
    # A missing expected nonce must fail: otherwise a token with no "nonce"
    # claim would compare equal (None == None) and slip through.
    if not nonce or claims.get("nonce") != nonce:
        raise RuntimeError("OIDC nonce validation failed")
    return claims
def normalize_groups(groups):
    """Coerce the ``groups`` claim into a list of unique strings.

    * a list  -> its string items, de-duplicated, first-seen order kept
      (non-string items are dropped);
    * a non-empty string -> a one-element list;
    * anything else (including ``""`` and ``None``) -> ``[]``.
    """
    if isinstance(groups, str):
        return [groups] if groups else []
    if not isinstance(groups, list):
        return []

    seen = set()
    unique = []
    for entry in groups:
        if isinstance(entry, str) and entry not in seen:
            seen.add(entry)
            unique.append(entry)
    return unique
def has_required_group(groups):
    """Return True when ``groups`` contains at least one required group.

    The required groups come from the comma-separated
    ``PLANE_OIDC_REQUIRED_GROUPS`` environment variable, falling back to
    ``DEFAULT_REQUIRED_GROUPS``. Entries are stripped and blanks ignored, so
    an empty setting yields no required groups and the result is False.
    """
    raw_setting = os.environ.get("PLANE_OIDC_REQUIRED_GROUPS", DEFAULT_REQUIRED_GROUPS)

    required = set()
    for item in raw_setting.split(","):
        name = item.strip()
        if name:
            required.add(name)

    return not required.isdisjoint(set(groups))
def resolve_linked_user(claims, groups, auto_link, sync_profile):
    """Find (or optionally create) the identity link for the token subject.

    Looks up an ACTIVE ``ExternalIdentityLink`` for ``OIDC_PROVIDER`` and the
    token's ``sub``. When none exists and ``auto_link`` is enabled, an active
    user whose email matches the ``email`` claim (case-insensitively) is
    linked automatically.

    Args:
        claims: Verified ID-token claims.
        groups: Normalised group list to store on the link.
        auto_link: Whether to create a link by matching the email claim.
        sync_profile: Whether to copy profile claims onto the user.

    Returns:
        The linked ``User`` with login metadata updated, or ``None`` when
        there is no subject, no usable link, or the existing link for this
        subject is not ACTIVE.
    """
    subject = str(claims.get("sub") or "")
    email = str(claims.get("email") or "").strip().lower()
    if not subject:
        return None

    link = (
        ExternalIdentityLink.objects.select_related("user")
        .filter(
            provider=OIDC_PROVIDER,
            subject=subject,
            status=ExternalIdentityLink.Status.ACTIVE,
        )
        .first()
    )

    if link is None and auto_link and email:
        # NOTE(review): the email claim is trusted as-is here; consider also
        # requiring the provider's "email_verified" claim before auto-linking.
        user = User.objects.filter(email__iexact=email, is_active=True).first()
        if user:
            link, created = ExternalIdentityLink.objects.get_or_create(
                provider=OIDC_PROVIDER,
                subject=subject,
                defaults={
                    "user": user,
                    "email": email,
                    "groups": groups,
                    # Created links are used for login immediately, so make
                    # the status explicit rather than relying on the default.
                    "status": ExternalIdentityLink.Status.ACTIVE,
                },
            )
            # get_or_create matches on (provider, subject) only, so it can hand
            # back a link the ACTIVE filter above excluded. Such a link must
            # not be revived through the auto-link path.
            if not created and link.status != ExternalIdentityLink.Status.ACTIVE:
                return None

    if link is None:
        return None

    now = timezone.now()

    link.email = email or link.email
    link.groups = groups
    link.last_login_at = now
    link.save(update_fields=["email", "groups", "last_login_at", "updated_at"])

    user = link.user
    user.last_login_medium = OIDC_PROVIDER
    user.last_login_time = now
    update_fields = ["last_login_medium", "last_login_time", "updated_at"]
    if sync_profile:
        update_fields.extend(sync_user_profile_from_claims(user, claims))
    # dict.fromkeys drops duplicate field names while keeping their order.
    user.save(update_fields=list(dict.fromkeys(update_fields)))
    return user
def sync_user_profile_from_claims(user, claims):
    """Copy profile claims onto ``user`` and report which fields changed.

    The user object is mutated in place but not saved. The returned list
    holds the names of the attributes that were changed, in the order
    ``display_name``, ``first_name``, ``last_name``, ``avatar``.

    When no ``given_name`` claim is present, the display name is split at the
    first space: the head becomes the first name and the remainder (possibly
    empty) is used as the last name unless a ``family_name`` claim exists.
    """
    changed = []

    def apply(attribute, value):
        # Assign only on an actual difference so unchanged fields are not saved.
        if getattr(user, attribute) != value:
            setattr(user, attribute, value)
            changed.append(attribute)

    display_name = first_string_claim(claims, "name", "preferred_username")
    given_name = first_string_claim(claims, "given_name")
    family_name = first_string_claim(claims, "family_name")
    avatar_url = first_string_claim(claims, "picture", "avatar_url", "avatar")

    if display_name:
        apply("display_name", display_name)

    if display_name and not given_name:
        head, _, remainder = display_name.partition(" ")
        given_name = head
        family_name = family_name or remainder

    if given_name:
        apply("first_name", given_name)
    # An empty string is a valid value here (it clears the last name);
    # only a missing claim (None) leaves the field untouched.
    if family_name is not None:
        apply("last_name", family_name)
    if avatar_url:
        apply("avatar", avatar_url)

    return changed
def first_string_claim(claims, *keys):
    """Return the first non-empty string found in ``claims`` under ``keys``.

    Keys are tried in the order given; values that are missing, empty or not
    strings are skipped. Returns ``None`` when nothing qualifies.
    """
    candidates = (claims.get(key) for key in keys)
    return next(
        (value for value in candidates if isinstance(value, str) and value),
        None,
    )
def oidc_error_redirect(base_url, next_path, error_code):
    """Build a redirect to the app that carries ``error_code`` as ``error``.

    The target URL is produced by ``get_safe_redirect_url`` from ``base_url``
    and ``next_path``.
    """
    target_url = get_safe_redirect_url(
        base_url=base_url,
        next_path=next_path,
        params={"error": error_code},
    )
    return HttpResponseRedirect(target_url)