"""Provision NODE.DC branding, groups, a bootstrap admin and OIDC apps in authentik.

Each ``ensure_*`` helper looks a record up by a natural key (name, slug or
domain), creates it when missing and then overwrites the managed fields, so the
script is written to be re-run against the same database.

Environment variables read:

* ``AUTH_DOMAIN`` - brand domain (defaults to ``auth.local.nodedc``).
* ``NODEDC_BOOTSTRAP_ADMIN_EMAIL`` / ``NODEDC_BOOTSTRAP_ADMIN_PASSWORD`` -
  optional bootstrap administrator.
* The per-application variables named by the ``*_env`` keys in ``APP_SPECS``.
"""

from os import environ
from pathlib import Path

from django.db import transaction

from authentik.brands.models import Brand
from authentik.common.oauth.constants import SubModes
from authentik.core.models import Application, Group, User
from authentik.crypto.models import CertificateKeyPair
from authentik.flows.models import Flow, FlowStageBinding
from authentik.policies.models import PolicyBinding
from authentik.providers.oauth2.models import (
    ClientTypes,
    IssuerMode,
    OAuth2LogoutMethod,
    OAuth2Provider,
    RedirectURI,
    RedirectURIMatchingMode,
    ScopeMapping,
)
from authentik.stages.identification.models import IdentificationStage
from authentik.stages.password.models import PasswordStage

BRANDING_CSS_PATH = Path("/templates/branding/nodedc-login.css")

# (group name, is_superuser) pairs created by ensure_groups().
GROUP_SPECS = [
    ("nodedc:superadmin", False),
    ("nodedc:launcher:admin", False),
    ("nodedc:launcher:user", False),
    ("nodedc:taskmanager:admin", False),
    ("nodedc:taskmanager:user", False),
]

# One entry per application/provider pair. Keys ending in ``_env`` name the
# environment variable that supplies (or overrides) the corresponding value.
APP_SPECS = [
    {
        "slug": "launcher",
        "name": "NODE.DC Launcher",
        "provider_name": "NODE.DC Launcher OIDC",
        "client_id_env": "LAUNCHER_OIDC_CLIENT_ID",
        "client_secret_env": "LAUNCHER_OIDC_CLIENT_SECRET",
        "redirect_uri_env": "LAUNCHER_OIDC_REDIRECT_URI",
        "logged_out_redirect_uri_env": "LAUNCHER_OIDC_LOGGED_OUT_REDIRECT_URI",
        "default_logged_out_redirect_uri": "http://launcher.local.nodedc/auth/logged-out",
        "launch_url_env": "LAUNCHER_BASE_URL",
        "launch_url": "http://launcher.local.nodedc",
        "logout_uri_env": "LAUNCHER_LOGOUT_URI",
        "logout_uri": "http://launcher.local.nodedc/logout",
        "groups": ["nodedc:superadmin", "nodedc:launcher:admin", "nodedc:launcher:user"],
        "description": "NODE.DC control plane launcher.",
    },
    {
        "slug": "task-manager",
        "name": "NODE.DC Task Manager",
        "provider_name": "NODE.DC Task Manager OIDC",
        "client_id_env": "PLANE_OIDC_CLIENT_ID",
        "client_secret_env": "PLANE_OIDC_CLIENT_SECRET",
        "redirect_uri_env": "PLANE_OIDC_REDIRECT_URI",
        "launch_url_env": "TASK_BASE_URL",
        "launch_url": "http://task.local.nodedc",
        "logout_uri_env": "TASK_LOGOUT_URI",
        "logout_uri": "http://task.local.nodedc/logout",
        "groups": ["nodedc:superadmin", "nodedc:taskmanager:admin", "nodedc:taskmanager:user"],
        "description": "NODE.DC Plane-based task manager.",
    },
]

_DEFAULT_AUTH_DOMAIN = "auth.local.nodedc"

_GROUPS_SCOPE_EXPRESSION = (
    'return {"groups": [group.name for group in request.user.groups.all()]}'
)

# Stored verbatim as the mapping's expression, so the text must start at
# column 0; only the surrounding blank lines are stripped.
_PROFILE_SCOPE_EXPRESSION = """
attributes = request.user.attributes or {}
display_name = request.user.name or request.user.username
name_parts = display_name.split(" ", 1)
avatar_url = attributes.get("picture") or attributes.get("avatar_url") or attributes.get("avatar")
return {
    "name": display_name,
    "given_name": name_parts[0] if name_parts else display_name,
    "family_name": name_parts[1] if len(name_parts) > 1 else "",
    "preferred_username": request.user.username,
    "nickname": request.user.username,
    "picture": avatar_url,
    "avatar_url": avatar_url,
}
""".strip()


def required_env(name: str) -> str:
    """Return the stripped value of environment variable *name*.

    Raises:
        RuntimeError: if the variable is unset or blank after stripping.
    """
    value = environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"{name} is required")
    return value


def optional_env(name: str, default: str = "") -> str:
    """Return the stripped value of *name*, or stripped *default* when unset.

    A variable that is set but blank yields ``""`` rather than *default*.
    """
    return environ.get(name, default).strip()


def ensure_group(name, is_superuser=False):
    """Fetch or create the group *name* and force its ``is_superuser`` flag."""
    group, _ = Group.objects.get_or_create(name=name)
    group.is_superuser = is_superuser
    group.save()
    return group


def ensure_groups():
    """Ensure every group in ``GROUP_SPECS`` exists; return them keyed by name."""
    return {name: ensure_group(name, is_superuser) for name, is_superuser in GROUP_SPECS}


def ensure_user_groups(groups):
    """Create or update the bootstrap admin and add it to *groups*.

    The admin is identified by ``NODEDC_BOOTSTRAP_ADMIN_EMAIL``; when that
    variable is unset or blank nothing is done and ``None`` is returned.
    An existing user is matched by case-insensitive email first, then by
    username. The password is (re)set only when
    ``NODEDC_BOOTSTRAP_ADMIN_PASSWORD`` is non-empty.

    Args:
        groups: mapping of group name to ``Group`` as returned by
            :func:`ensure_groups`.

    Returns:
        The saved ``User``, or ``None`` when no admin email is configured.
    """
    admin_email = optional_env("NODEDC_BOOTSTRAP_ADMIN_EMAIL")
    if not admin_email:
        return None

    user = (
        User.objects.filter(email__iexact=admin_email).first()
        or User.objects.filter(username=admin_email).first()
    )
    if user is None:
        user = User(username=admin_email, email=admin_email, name=admin_email, type="internal")
    user.username = admin_email
    user.email = admin_email
    user.is_active = True
    user.type = "internal"

    # Read unstripped on purpose: the password value is used exactly as given.
    password = environ.get("NODEDC_BOOTSTRAP_ADMIN_PASSWORD")
    if password:
        user.set_password(password)
    user.save()

    memberships = list(groups.values())
    authentik_admins = Group.objects.filter(name="authentik Admins").first()
    if authentik_admins is not None:
        memberships.append(authentik_admins)
    # One call instead of one add() per group.
    user.groups.add(*memberships)
    return user


def _ensure_scope_mapping(name, scope_name, description, expression):
    """Create the scope mapping *name* or overwrite its managed fields."""
    mapping, _ = ScopeMapping.objects.update_or_create(
        name=name,
        defaults={
            "scope_name": scope_name,
            "description": description,
            "expression": expression,
        },
    )
    return mapping


def ensure_groups_scope_mapping():
    """Ensure the ``groups`` scope mapping that emits the user's group names."""
    return _ensure_scope_mapping(
        name="NODE.DC OAuth Mapping: groups",
        scope_name="groups",
        description="Adds Authentik group names to NODE.DC OIDC tokens.",
        expression=_GROUPS_SCOPE_EXPRESSION,
    )


def ensure_profile_scope_mapping():
    """Ensure the ``profile`` scope mapping that emits name and avatar claims."""
    return _ensure_scope_mapping(
        name="NODE.DC OAuth Mapping: profile context",
        scope_name="profile",
        description="Adds normalized NODE.DC profile claims to OIDC tokens.",
        expression=_PROFILE_SCOPE_EXPRESSION,
    )


def default_scope_mappings():
    """Return the scope mappings attached to every NODE.DC provider.

    These are all existing mappings for the ``openid``, ``email`` and
    ``offline_access`` scopes plus the two NODE.DC mappings, which are
    created or refreshed as a side effect.
    """
    scope_names = ["openid", "email", "offline_access"]
    mappings = list(ScopeMapping.objects.filter(scope_name__in=scope_names))
    mappings.append(ensure_profile_scope_mapping())
    mappings.append(ensure_groups_scope_mapping())
    return mappings


def read_branding_css():
    """Return the contents of ``BRANDING_CSS_PATH``, or ``""`` if it is absent."""
    if BRANDING_CSS_PATH.exists():
        return BRANDING_CSS_PATH.read_text(encoding="utf-8")
    return ""


def ensure_nodedc_brand():
    """Restyle the default authentication flow and make the NODE.DC brand default.

    Raises:
        Flow.DoesNotExist, IdentificationStage.DoesNotExist,
        PasswordStage.DoesNotExist: if the default flows or stages looked up
            by slug/name below are missing.

    Returns:
        The saved ``Brand`` for the configured ``AUTH_DOMAIN``.
    """
    auth_domain = optional_env("AUTH_DOMAIN", _DEFAULT_AUTH_DOMAIN) or _DEFAULT_AUTH_DOMAIN

    authentication_flow = Flow.objects.get(slug="default-authentication-flow")
    invalidation_flow = Flow.objects.get(slug="default-invalidation-flow")
    authentication_flow.name = "NODE.DC authentication"
    authentication_flow.title = "Работайте во всех измерениях."
    authentication_flow.layout = "stacked"
    authentication_flow.background = ""
    authentication_flow.save()

    identification_stage = IdentificationStage.objects.get(
        name="default-authentication-identification"
    )
    password_stage = PasswordStage.objects.get(name="default-authentication-password")
    password_stage.allow_show_password = True
    password_stage.save()

    identification_stage.user_fields = ["email"]
    identification_stage.password_stage = password_stage
    identification_stage.show_matched_user = False
    identification_stage.enable_remember_me = False
    identification_stage.save()

    # The password stage is now referenced from the identification stage, so
    # its own binding to the flow is dropped.
    # NOTE(review): presumably to avoid a second password prompt - confirm.
    FlowStageBinding.objects.filter(target=authentication_flow, stage=password_stage).delete()

    brand = Brand.objects.filter(domain=auth_domain).first()
    if brand is None:
        brand = Brand(domain=auth_domain)
    # Clear the default flag on every other brand before claiming it here.
    Brand.objects.exclude(brand_uuid=brand.brand_uuid).update(default=False)

    brand.default = True
    brand.branding_title = "NODE.DC"
    brand.branding_logo = ""
    brand.branding_favicon = ""
    brand.branding_custom_css = read_branding_css()
    brand.flow_authentication = authentication_flow
    brand.flow_invalidation = invalidation_flow

    # Merge rather than replace, so unrelated attributes/settings/theme keys
    # already stored on the brand survive.
    attributes = brand.attributes or {}
    brand_settings = attributes.get("settings") or {}
    theme = brand_settings.get("theme") or {}
    brand.attributes = {
        **attributes,
        "settings": {
            **brand_settings,
            "locale": "ru",
            "theme": {**theme, "base": "dark"},
        },
    }
    brand.save()
    return brand


def ensure_provider(spec, mappings):
    """Create or update the confidential OAuth2 provider described by *spec*.

    Args:
        spec: one entry of ``APP_SPECS``.
        mappings: scope mappings to set as the provider's property mappings.

    Raises:
        RuntimeError: if a required environment variable is missing or no
            ``CertificateKeyPair`` exists to sign tokens with.
        Flow.DoesNotExist: if the default authorization or invalidation flow
            is missing.

    Returns:
        The saved ``OAuth2Provider``.
    """
    authorization_flow = Flow.objects.get(slug="default-provider-authorization-implicit-consent")
    invalidation_flow = Flow.objects.get(slug="default-invalidation-flow")

    # Prefer the named self-signed certificate, else fall back to any key pair.
    signing_key = (
        CertificateKeyPair.objects.filter(name="authentik Self-signed Certificate").first()
        or CertificateKeyPair.objects.first()
    )
    if signing_key is None:
        raise RuntimeError("No Authentik CertificateKeyPair exists for OIDC signing")

    provider = OAuth2Provider.objects.filter(name=spec["provider_name"]).first()
    if provider is None:
        provider = OAuth2Provider(name=spec["provider_name"])

    provider.client_type = ClientTypes.CONFIDENTIAL
    provider.client_id = required_env(spec["client_id_env"])
    provider.client_secret = required_env(spec["client_secret_env"])

    redirect_uri_values = [required_env(spec["redirect_uri_env"])]
    # Specs without a logged-out redirect yield optional_env("", "") == "".
    logged_out_redirect_uri = optional_env(
        spec.get("logged_out_redirect_uri_env", ""),
        spec.get("default_logged_out_redirect_uri", ""),
    )
    if logged_out_redirect_uri:
        redirect_uri_values.append(logged_out_redirect_uri)
    # dict.fromkeys de-duplicates while keeping the original order.
    provider.redirect_uris = [
        RedirectURI(RedirectURIMatchingMode.STRICT, redirect_uri)
        for redirect_uri in dict.fromkeys(redirect_uri_values)
    ]

    provider.logout_uri = optional_env(spec.get("logout_uri_env", ""), spec["logout_uri"])
    provider.logout_method = OAuth2LogoutMethod.FRONTCHANNEL
    provider.include_claims_in_id_token = True
    provider.sub_mode = SubModes.USER_UUID
    provider.issuer_mode = IssuerMode.PER_PROVIDER
    provider.authorization_flow = authorization_flow
    provider.invalidation_flow = invalidation_flow
    provider.signing_key = signing_key
    provider.save()

    # Many-to-many assignment needs the provider to be saved first.
    provider.property_mappings.set(mappings)
    return provider


def ensure_application(spec, provider, groups):
    """Create or update the application for *spec* and sync its group bindings.

    Bindings on the application whose group name is not listed in
    ``spec["groups"]`` are deleted (this also removes bindings that have no
    group); one enabled binding per listed group is then created or updated,
    ordered as in the spec.

    Args:
        spec: one entry of ``APP_SPECS``.
        provider: the provider returned by :func:`ensure_provider`.
        groups: mapping of group name to ``Group``.

    Raises:
        KeyError: if a group named in ``spec["groups"]`` is absent from *groups*.

    Returns:
        The saved ``Application``.
    """
    application = Application.objects.filter(slug=spec["slug"]).first()
    if application is None:
        application = Application(slug=spec["slug"], name=spec["name"])

    application.name = spec["name"]
    application.group = "NODE.DC"
    application.provider = provider
    application.meta_launch_url = optional_env(spec.get("launch_url_env", ""), spec["launch_url"])
    application.meta_description = spec["description"]
    application.meta_publisher = "NODE.DC"
    application.open_in_new_tab = False
    application.policy_engine_mode = "any"
    application.save()

    PolicyBinding.objects.filter(target=application).exclude(
        group__name__in=spec["groups"]
    ).delete()

    for order, group_name in enumerate(spec["groups"]):
        group = groups[group_name]
        binding = PolicyBinding.objects.filter(target=application, group=group).first()
        if binding is None:
            binding = PolicyBinding(target=application, group=group)
        binding.enabled = True
        binding.negate = False
        binding.timeout = 30
        binding.failure_result = False
        binding.order = order
        binding.save()
    return application


@transaction.atomic
def main():
    """Run the whole provisioning sequence in one transaction and print a summary."""
    brand = ensure_nodedc_brand()
    groups = ensure_groups()
    user = ensure_user_groups(groups)
    mappings = default_scope_mappings()

    applications = []
    providers = []
    for spec in APP_SPECS:
        provider = ensure_provider(spec, mappings)
        application = ensure_application(spec, provider, groups)
        providers.append(provider)
        applications.append(application)

    summary = {
        "groups": list(groups),
        "admin_user": user.email if user else None,
        "brand": brand.domain,
        "applications": [application.slug for application in applications],
        "providers": [provider.name for provider in providers],
    }
    print(summary)


# Runs unconditionally when the file is loaded.
# NOTE(review): presumably executed through a shell/exec-style loader where
# ``__name__`` is not "__main__" - confirm before adding a guard.
main()