import json import os import time from secrets import compare_digest from django.contrib.auth import logout from django.conf import settings from django.core.cache import cache from django.db import transaction from django.http import HttpResponse, HttpResponseRedirect, JsonResponse from django.utils import timezone from django.utils.decorators import method_decorator from django.views import View from django.views.decorators.csrf import csrf_exempt from plane.authentication.utils.host import user_ip from plane.db.models import ExternalIdentityLink, IssueAssignee, ProjectMember, Session, User, WorkspaceMember OIDC_PROVIDER = "authentik" DEFAULT_LOGOUT_GUARD_SECONDS = 30 def get_nodedc_global_logout_url(): value = os.environ.get("PLANE_NODEDC_GLOBAL_LOGOUT_URL", "").strip() return value or None def get_logout_redirect_url(default_url): return get_nodedc_global_logout_url() or default_url def get_nodedc_internal_token(): return ( os.environ.get("PLANE_NODEDC_ACCESS_TOKEN", "").strip() or os.environ.get("NODEDC_INTERNAL_ACCESS_TOKEN", "").strip() ) def is_internal_logout_request_authorized(request): expected_token = get_nodedc_internal_token() authorization = request.headers.get("Authorization", "") bearer_token = ( authorization.split(" ", 1)[1].strip() if authorization.lower().startswith("bearer ") else "" ) header_token = request.headers.get("X-NODEDC-Internal-Token", "").strip() request_token = bearer_token or header_token return bool(expected_token and request_token and compare_digest(request_token, expected_token)) def get_logout_guard_seconds(): value = os.environ.get("PLANE_NODEDC_LOGOUT_GUARD_SECONDS", "").strip() try: return max(1, int(value)) if value else DEFAULT_LOGOUT_GUARD_SECONDS except ValueError: return DEFAULT_LOGOUT_GUARD_SECONDS def normalize_guard_value(value): return value.strip().lower() if isinstance(value, str) else "" def get_logout_guard_cache_key(kind, value): normalized_value = normalize_guard_value(value) return f"nodedc:logout-guard:{kind}:{normalized_value}" if normalized_value else "" def collect_logout_guard_keys(user=None, payload=None): payload = payload or {} keys = set() for kind in ("subject", "email"): cache_key = get_logout_guard_cache_key(kind, payload.get(kind)) if cache_key: keys.add(cache_key) if user is None: return keys email_key = get_logout_guard_cache_key("email", user.email) if email_key: keys.add(email_key) links = ExternalIdentityLink.objects.filter( provider=OIDC_PROVIDER, user=user, status=ExternalIdentityLink.Status.ACTIVE, ).values_list("subject", "email") for subject, email in links: subject_key = get_logout_guard_cache_key("subject", subject) email_key = get_logout_guard_cache_key("email", email) if subject_key: keys.add(subject_key) if email_key: keys.add(email_key) return keys def mark_logout_guard(user=None, payload=None): guard_keys = collect_logout_guard_keys(user=user, payload=payload) if not guard_keys: return [] logout_time = int(time.time()) ttl = get_logout_guard_seconds() for guard_key in guard_keys: cache.set(guard_key, logout_time, timeout=ttl) return sorted(guard_keys) def logout_current_user(request): if request.user and request.user.is_authenticated: try: user = User.objects.get(pk=request.user.id) mark_logout_guard(user=user) user.last_logout_ip = user_ip(request=request) user.last_logout_time = timezone.now() user.save() except Exception: pass logout(request) def clear_nodedc_auth_cookies(response, request=None): cookie_names = ( getattr(settings, "SESSION_COOKIE_NAME", "session-id"), getattr(settings, "CSRF_COOKIE_NAME", "csrftoken"), getattr(settings, "ADMIN_SESSION_COOKIE_NAME", "admin-session-id"), "sessionid", "session-id", "csrftoken", ) domain = getattr(settings, "SESSION_COOKIE_DOMAIN", None) or getattr(settings, "CSRF_COOKIE_DOMAIN", None) if request is not None: host = request.get_host().split(":", 1)[0].lower() for suffix in (".local.nodedc", ".local.notdc", ".notdc.ru", ".nodedc.ru"): if host.endswith(suffix): domain = domain or suffix break for cookie_name in filter(None, cookie_names): response.delete_cookie(cookie_name, path="/") if domain: session_cookie_name = getattr(settings, "SESSION_COOKIE_NAME", "session-id") response["Set-Cookie"] = ( f'{session_cookie_name}=""; Domain={domain}; ' "expires=Thu, 01 Jan 1970 00:00:00 GMT; Max-Age=0; Path=/" ) response["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0" response["Clear-Site-Data"] = '"cookies", "storage"' response["Pragma"] = "no-cache" return response def parse_json_body(request): if not request.body: return {} return json.loads(request.body.decode("utf-8")) def find_logout_user(payload): user_id = payload.get("planeUserId") subject = payload.get("subject") email = payload.get("email") user_id = user_id.strip() if isinstance(user_id, str) else "" subject = subject.strip() if isinstance(subject, str) else "" email = email.strip().lower() if isinstance(email, str) else "" if user_id: user = User.objects.filter(id=user_id).first() if user is not None: return user if subject: link = ExternalIdentityLink.objects.filter( provider=OIDC_PROVIDER, subject=subject, status=ExternalIdentityLink.Status.ACTIVE, ).select_related("user").first() if link is not None: return link.user if email: user = User.objects.filter(email__iexact=email).first() if user is not None: return user link = ExternalIdentityLink.objects.filter( provider=OIDC_PROVIDER, email__iexact=email, status=ExternalIdentityLink.Status.ACTIVE, ).select_related("user").first() if link is not None: return link.user return None def invalidate_user_sessions(user, request): deleted_count, _ = Session.objects.filter(user_id=str(user.id)).delete() try: user.last_logout_ip = user_ip(request=request) user.last_logout_time = timezone.now() user.save(update_fields=["last_logout_ip", "last_logout_time", "updated_at"]) except Exception: pass return deleted_count def revoke_user_external_identity_links(user): deleted_count, _ = ExternalIdentityLink.objects.filter( provider=OIDC_PROVIDER, user=user, status=ExternalIdentityLink.Status.ACTIVE, ).delete() return deleted_count def delete_queryset(queryset): result = queryset.delete() return result[0] if isinstance(result, tuple) else result def revoke_user_tasker_access(user): deleted_issue_assignees = delete_queryset(IssueAssignee.objects.filter(assignee=user)) deleted_project_memberships = delete_queryset(ProjectMember.objects.filter(member=user)) deleted_workspace_memberships = delete_queryset(WorkspaceMember.objects.filter(member=user)) return { "workspaceMemberships": deleted_workspace_memberships, "projectMemberships": deleted_project_memberships, "issueAssignees": deleted_issue_assignees, } class NodeDCFrontChannelLogoutEndpoint(View): def get(self, request): logout_current_user(request) response = HttpResponse( "" "NODE.DC Task session closed.", content_type="text/html", ) return clear_nodedc_auth_cookies(response, request) def post(self, request): logout_current_user(request) response = HttpResponseRedirect(get_logout_redirect_url("/")) return clear_nodedc_auth_cookies(response, request) @method_decorator(csrf_exempt, name="dispatch") class NodeDCInternalSessionLogoutEndpoint(View): http_method_names = ["post"] def post(self, request): if not is_internal_logout_request_authorized(request): return JsonResponse( { "ok": False, "error": "internal_logout_unauthorized" if get_nodedc_internal_token() else "internal_logout_not_configured", }, status=401 if get_nodedc_internal_token() else 503, ) try: payload = parse_json_body(request) except (TypeError, ValueError, json.JSONDecodeError): return JsonResponse({"ok": False, "error": "invalid_json"}, status=400) user = find_logout_user(payload) if user is None: mark_logout_guard(payload=payload) return JsonResponse({"ok": True, "deletedSessions": 0, "user": None}) guard_keys = mark_logout_guard(user=user, payload=payload) with transaction.atomic(): deleted_sessions = invalidate_user_sessions(user, request) deleted_identity_links = 0 deleted_tasker_access = { "workspaceMemberships": 0, "projectMemberships": 0, "issueAssignees": 0, } if payload.get("revokeIdentityLinks") is True or payload.get("revokeIdentityLink") is True: deleted_identity_links = revoke_user_external_identity_links(user) if payload.get("revokeTaskerAccess") is True or payload.get("revokeMemberships") is True: deleted_tasker_access = revoke_user_tasker_access(user) return JsonResponse( { "ok": True, "deletedSessions": deleted_sessions, "deletedIdentityLinks": deleted_identity_links, "deletedTaskerAccess": deleted_tasker_access, "guardKeys": len(guard_keys), "user": { "id": str(user.id), "email": user.email, }, } )