import json import os import time from secrets import compare_digest from django.contrib.auth import logout from django.conf import settings from django.core.cache import cache from django.http import HttpResponse, HttpResponseRedirect, JsonResponse from django.utils import timezone from django.utils.decorators import method_decorator from django.views import View from django.views.decorators.csrf import csrf_exempt from plane.authentication.utils.host import user_ip from plane.db.models import ExternalIdentityLink, Session, User OIDC_PROVIDER = "authentik" DEFAULT_LOGOUT_GUARD_SECONDS = 30 def get_nodedc_global_logout_url(): value = os.environ.get("PLANE_NODEDC_GLOBAL_LOGOUT_URL", "").strip() return value or None def get_logout_redirect_url(default_url): return get_nodedc_global_logout_url() or default_url def get_nodedc_internal_token(): return ( os.environ.get("PLANE_NODEDC_ACCESS_TOKEN", "").strip() or os.environ.get("NODEDC_INTERNAL_ACCESS_TOKEN", "").strip() or os.environ.get("PLANE_OIDC_CLIENT_SECRET", "").strip() ) def is_internal_logout_request_authorized(request): expected_token = get_nodedc_internal_token() authorization = request.headers.get("Authorization", "") bearer_token = ( authorization.split(" ", 1)[1].strip() if authorization.lower().startswith("bearer ") else "" ) header_token = request.headers.get("X-NODEDC-Internal-Token", "").strip() request_token = bearer_token or header_token return bool(expected_token and request_token and compare_digest(request_token, expected_token)) def get_logout_guard_seconds(): value = os.environ.get("PLANE_NODEDC_LOGOUT_GUARD_SECONDS", "").strip() try: return max(1, int(value)) if value else DEFAULT_LOGOUT_GUARD_SECONDS except ValueError: return DEFAULT_LOGOUT_GUARD_SECONDS def normalize_guard_value(value): return value.strip().lower() if isinstance(value, str) else "" def get_logout_guard_cache_key(kind, value): normalized_value = normalize_guard_value(value) return f"nodedc:logout-guard:{kind}:{normalized_value}" if normalized_value else "" def collect_logout_guard_keys(user=None, payload=None): payload = payload or {} keys = set() for kind in ("subject", "email"): cache_key = get_logout_guard_cache_key(kind, payload.get(kind)) if cache_key: keys.add(cache_key) if user is None: return keys email_key = get_logout_guard_cache_key("email", user.email) if email_key: keys.add(email_key) links = ExternalIdentityLink.objects.filter( provider=OIDC_PROVIDER, user=user, status=ExternalIdentityLink.Status.ACTIVE, ).values_list("subject", "email") for subject, email in links: subject_key = get_logout_guard_cache_key("subject", subject) email_key = get_logout_guard_cache_key("email", email) if subject_key: keys.add(subject_key) if email_key: keys.add(email_key) return keys def mark_logout_guard(user=None, payload=None): guard_keys = collect_logout_guard_keys(user=user, payload=payload) if not guard_keys: return [] logout_time = int(time.time()) ttl = get_logout_guard_seconds() for guard_key in guard_keys: cache.set(guard_key, logout_time, timeout=ttl) return sorted(guard_keys) def logout_current_user(request): if request.user and request.user.is_authenticated: try: user = User.objects.get(pk=request.user.id) mark_logout_guard(user=user) user.last_logout_ip = user_ip(request=request) user.last_logout_time = timezone.now() user.save() except Exception: pass logout(request) def clear_nodedc_auth_cookies(response, request=None): cookie_names = ( getattr(settings, "SESSION_COOKIE_NAME", "session-id"), getattr(settings, "CSRF_COOKIE_NAME", "csrftoken"), getattr(settings, "ADMIN_SESSION_COOKIE_NAME", "admin-session-id"), "sessionid", "session-id", "csrftoken", ) domain = getattr(settings, "SESSION_COOKIE_DOMAIN", None) or getattr(settings, "CSRF_COOKIE_DOMAIN", None) if request is not None: host = request.get_host().split(":", 1)[0].lower() for suffix in (".local.nodedc", ".local.notdc", ".notdc.ru", ".nodedc.ru"): if host.endswith(suffix): domain = domain or suffix break for cookie_name in filter(None, cookie_names): response.delete_cookie(cookie_name, path="/") if domain: session_cookie_name = getattr(settings, "SESSION_COOKIE_NAME", "session-id") response["Set-Cookie"] = ( f'{session_cookie_name}=""; Domain={domain}; ' "expires=Thu, 01 Jan 1970 00:00:00 GMT; Max-Age=0; Path=/" ) response["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0" response["Clear-Site-Data"] = '"cookies", "storage"' response["Pragma"] = "no-cache" return response def parse_json_body(request): if not request.body: return {} return json.loads(request.body.decode("utf-8")) def find_logout_user(payload): user_id = payload.get("planeUserId") subject = payload.get("subject") email = payload.get("email") user_id = user_id.strip() if isinstance(user_id, str) else "" subject = subject.strip() if isinstance(subject, str) else "" email = email.strip().lower() if isinstance(email, str) else "" if user_id: user = User.objects.filter(id=user_id).first() if user is not None: return user if subject: link = ExternalIdentityLink.objects.filter( provider=OIDC_PROVIDER, subject=subject, status=ExternalIdentityLink.Status.ACTIVE, ).select_related("user").first() if link is not None: return link.user if email: user = User.objects.filter(email__iexact=email).first() if user is not None: return user link = ExternalIdentityLink.objects.filter( provider=OIDC_PROVIDER, email__iexact=email, status=ExternalIdentityLink.Status.ACTIVE, ).select_related("user").first() if link is not None: return link.user return None def invalidate_user_sessions(user, request): deleted_count, _ = Session.objects.filter(user_id=str(user.id)).delete() try: user.last_logout_ip = user_ip(request=request) user.last_logout_time = timezone.now() user.save(update_fields=["last_logout_ip", "last_logout_time", "updated_at"]) except Exception: pass return deleted_count class NodeDCFrontChannelLogoutEndpoint(View): def get(self, request): logout_current_user(request) response = HttpResponse( "" "NODE.DC Task session closed.", content_type="text/html", ) return clear_nodedc_auth_cookies(response, request) def post(self, request): logout_current_user(request) response = HttpResponseRedirect(get_logout_redirect_url("/")) return clear_nodedc_auth_cookies(response, request) @method_decorator(csrf_exempt, name="dispatch") class NodeDCInternalSessionLogoutEndpoint(View): http_method_names = ["post"] def post(self, request): if not is_internal_logout_request_authorized(request): return JsonResponse( { "ok": False, "error": "internal_logout_unauthorized" if get_nodedc_internal_token() else "internal_logout_not_configured", }, status=401 if get_nodedc_internal_token() else 503, ) try: payload = parse_json_body(request) except (TypeError, ValueError, json.JSONDecodeError): return JsonResponse({"ok": False, "error": "invalid_json"}, status=400) user = find_logout_user(payload) if user is None: mark_logout_guard(payload=payload) return JsonResponse({"ok": True, "deletedSessions": 0, "user": None}) guard_keys = mark_logout_guard(user=user, payload=payload) deleted_sessions = invalidate_user_sessions(user, request) return JsonResponse( { "ok": True, "deletedSessions": deleted_sessions, "guardKeys": len(guard_keys), "user": { "id": str(user.id), "email": user.email, }, } )