"""Bootstrap NODE.DC groups, an optional admin user, OIDC providers and applications.

The script is idempotent by construction: every object is looked up by a stable
key (group name, provider name, application slug, mapping name) and then
created or updated in place. Everything runs inside a single database
transaction (see ``main``).

Environment variables read:

* ``NODEDC_BOOTSTRAP_ADMIN_EMAIL`` / ``NODEDC_BOOTSTRAP_ADMIN_PASSWORD`` (optional)
* the ``client_id_env`` / ``client_secret_env`` / ``redirect_uri_env`` names
  listed in ``APP_SPECS`` (required)
"""

from os import environ

from django.db import transaction

from authentik.common.oauth.constants import SubModes
from authentik.core.models import Application, Group, User
from authentik.crypto.models import CertificateKeyPair
from authentik.flows.models import Flow
from authentik.policies.models import PolicyBinding
from authentik.providers.oauth2.models import (
    ClientTypes,
    IssuerMode,
    OAuth2LogoutMethod,
    OAuth2Provider,
    RedirectURI,
    RedirectURIMatchingMode,
    ScopeMapping,
)

# (group name, is_superuser) pairs that must exist.
GROUP_SPECS = [
    ("nodedc:superadmin", False),
    ("nodedc:launcher:admin", False),
    ("nodedc:launcher:user", False),
    ("nodedc:taskmanager:admin", False),
    ("nodedc:taskmanager:user", False),
]

# One entry per application/provider pair. The ``*_env`` values are names of
# environment variables, not the values themselves.
APP_SPECS = [
    {
        "slug": "launcher",
        "name": "NODE.DC Launcher",
        "provider_name": "NODE.DC Launcher OIDC",
        "client_id_env": "LAUNCHER_OIDC_CLIENT_ID",
        "client_secret_env": "LAUNCHER_OIDC_CLIENT_SECRET",
        "redirect_uri_env": "LAUNCHER_OIDC_REDIRECT_URI",
        "launch_url": "http://launcher.local.nodedc",
        "logout_uri": "http://launcher.local.nodedc/logout",
        "groups": ["nodedc:superadmin", "nodedc:launcher:admin", "nodedc:launcher:user"],
        "description": "NODE.DC control plane launcher.",
    },
    {
        "slug": "task-manager",
        "name": "NODE.DC Task Manager",
        "provider_name": "NODE.DC Task Manager OIDC",
        "client_id_env": "PLANE_OIDC_CLIENT_ID",
        "client_secret_env": "PLANE_OIDC_CLIENT_SECRET",
        "redirect_uri_env": "PLANE_OIDC_REDIRECT_URI",
        "launch_url": "http://task.local.nodedc",
        "logout_uri": "http://task.local.nodedc/logout",
        "groups": ["nodedc:superadmin", "nodedc:taskmanager:admin", "nodedc:taskmanager:user"],
        "description": "NODE.DC Plane-based task manager.",
    },
]

# Single source of truth for the custom "groups" scope mapping, so the values
# used on creation and on update cannot drift apart.
_GROUPS_MAPPING_NAME = "NODE.DC OAuth Mapping: groups"
_GROUPS_MAPPING_FIELDS = {
    "scope_name": "groups",
    "description": "Adds Authentik group names to NODE.DC OIDC tokens.",
    "expression": 'return {"groups": [group.name for group in request.user.groups.all()]}',
}


def required_env(name: str) -> str:
    """Return the stripped value of environment variable *name*.

    Raises:
        RuntimeError: if the variable is unset or blank after stripping.
    """
    value = environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"{name} is required")
    return value


def ensure_group(name, is_superuser=False):
    """Create the group called *name* or update its ``is_superuser`` flag.

    Returns the resulting ``Group`` instance.
    """
    # update_or_create writes once; get_or_create + save() wrote twice on creation.
    group, _ = Group.objects.update_or_create(
        name=name,
        defaults={"is_superuser": is_superuser},
    )
    return group


def ensure_groups():
    """Ensure every group in ``GROUP_SPECS`` exists; return them keyed by name."""
    return {name: ensure_group(name, is_superuser) for name, is_superuser in GROUP_SPECS}


def ensure_user_groups(groups):
    """Create or update the bootstrap admin user and add it to *groups*.

    The user is identified by ``NODEDC_BOOTSTRAP_ADMIN_EMAIL`` (matched
    case-insensitively on email first, then exactly on username). When
    ``NODEDC_BOOTSTRAP_ADMIN_PASSWORD`` is set, the password is (re)set on
    every run.

    Args:
        groups: mapping of group name to ``Group``; the user is added to all
            of its values.

    Returns:
        The saved ``User``, or ``None`` when no admin email is configured.
    """
    admin_email = environ.get("NODEDC_BOOTSTRAP_ADMIN_EMAIL", "").strip()
    if not admin_email:
        return None

    user = (
        User.objects.filter(email__iexact=admin_email).first()
        or User.objects.filter(username=admin_email).first()
    )
    if user is None:
        # ``name`` is only set for newly created users; existing names are kept.
        user = User(username=admin_email, email=admin_email, name=admin_email, type="internal")

    user.username = admin_email
    user.email = admin_email
    user.is_active = True
    user.type = "internal"

    admin_password = environ.get("NODEDC_BOOTSTRAP_ADMIN_PASSWORD")
    if admin_password:
        user.set_password(admin_password)
    user.save()

    authentik_admins = Group.objects.filter(name="authentik Admins").first()
    if authentik_admins:
        user.groups.add(authentik_admins)
    # One call for all groups; add() with no arguments is a no-op.
    user.groups.add(*groups.values())
    return user


def ensure_groups_scope_mapping():
    """Create or update the scope mapping that exposes group names as ``groups``.

    Returns the resulting ``ScopeMapping``.
    """
    mapping, _ = ScopeMapping.objects.update_or_create(
        name=_GROUPS_MAPPING_NAME,
        defaults=dict(_GROUPS_MAPPING_FIELDS),
    )
    return mapping


def default_scope_mappings():
    """Return the scope mappings to attach to every provider.

    This is every existing ``ScopeMapping`` whose ``scope_name`` is one of
    ``openid``, ``email``, ``profile`` or ``offline_access``, followed by the
    NODE.DC ``groups`` mapping. Scopes with no existing mapping are silently
    absent from the result.
    """
    scope_names = ["openid", "email", "profile", "offline_access"]
    mappings = list(ScopeMapping.objects.filter(scope_name__in=scope_names))
    mappings.append(ensure_groups_scope_mapping())
    return mappings


def ensure_provider(spec, mappings):
    """Create or update the OAuth2 provider described by *spec*.

    Args:
        spec: one entry of ``APP_SPECS``.
        mappings: scope mappings to set as the provider's property mappings
            (replaces whatever was set before).

    Returns:
        The saved ``OAuth2Provider``.

    Raises:
        RuntimeError: if no ``CertificateKeyPair`` exists or a required
            environment variable is missing.
        Flow.DoesNotExist: if either of the referenced default flows is missing.
    """
    authorization_flow = Flow.objects.get(slug="default-provider-authorization-implicit-consent")
    invalidation_flow = Flow.objects.get(slug="default-provider-invalidation-flow")

    # Prefer the well-known self-signed certificate, fall back to any key pair.
    signing_key = (
        CertificateKeyPair.objects.filter(name="authentik Self-signed Certificate").first()
        or CertificateKeyPair.objects.first()
    )
    if signing_key is None:
        raise RuntimeError("No Authentik CertificateKeyPair exists for OIDC signing")

    provider = OAuth2Provider.objects.filter(name=spec["provider_name"]).first()
    if provider is None:
        provider = OAuth2Provider(name=spec["provider_name"])

    provider.name = spec["provider_name"]
    provider.client_type = ClientTypes.CONFIDENTIAL
    provider.client_id = required_env(spec["client_id_env"])
    provider.client_secret = required_env(spec["client_secret_env"])
    provider.redirect_uris = [
        RedirectURI(RedirectURIMatchingMode.STRICT, required_env(spec["redirect_uri_env"]))
    ]
    provider.logout_uri = spec["logout_uri"]
    provider.logout_method = OAuth2LogoutMethod.FRONTCHANNEL
    provider.include_claims_in_id_token = True
    provider.sub_mode = SubModes.USER_UUID
    provider.issuer_mode = IssuerMode.PER_PROVIDER
    provider.authorization_flow = authorization_flow
    provider.invalidation_flow = invalidation_flow
    provider.signing_key = signing_key
    provider.save()

    # Many-to-many relations can only be set once the provider has been saved.
    provider.property_mappings.set(mappings)
    return provider


def ensure_application(spec, provider, groups):
    """Create or update the application for *spec* and bind it to its groups.

    Args:
        spec: one entry of ``APP_SPECS``.
        provider: the provider to attach to the application.
        groups: mapping of group name to ``Group``; must contain every name in
            ``spec["groups"]``.

    Returns:
        The saved ``Application``.
    """
    application = Application.objects.filter(slug=spec["slug"]).first()
    if application is None:
        application = Application(slug=spec["slug"], name=spec["name"])

    application.name = spec["name"]
    application.slug = spec["slug"]
    application.group = "NODE.DC"
    application.provider = provider
    application.meta_launch_url = spec["launch_url"]
    application.meta_description = spec["description"]
    application.meta_publisher = "NODE.DC"
    application.open_in_new_tab = False
    application.policy_engine_mode = "any"
    application.save()

    # Remove bindings on this application that are not for one of the expected
    # group names.
    # NOTE(review): this presumably also removes bindings that have no group at
    # all (policy/user bindings) — confirm that is intended.
    PolicyBinding.objects.filter(target=application).exclude(
        group__name__in=spec["groups"]
    ).delete()

    # Binding order follows the order of names in spec["groups"].
    for order, group_name in enumerate(spec["groups"]):
        group = groups[group_name]
        binding = PolicyBinding.objects.filter(target=application, group=group).first()
        if binding is None:
            binding = PolicyBinding(target=application, group=group)
        binding.enabled = True
        binding.negate = False
        binding.timeout = 30
        binding.failure_result = False
        binding.order = order
        binding.save()

    return application


@transaction.atomic
def main():
    """Run the whole bootstrap in one transaction and print a summary dict.

    Any exception raised along the way rolls back every change made so far.
    """
    groups = ensure_groups()
    user = ensure_user_groups(groups)
    mappings = default_scope_mappings()

    applications = []
    providers = []
    for spec in APP_SPECS:
        provider = ensure_provider(spec, mappings)
        application = ensure_application(spec, provider, groups)
        providers.append(provider)
        applications.append(application)

    summary = {
        "groups": list(groups),
        "admin_user": user.email if user else None,
        "applications": [application.slug for application in applications],
        "providers": [provider.name for provider in providers],
    }
    print(summary)


# Runs unconditionally when the file is executed.
# NOTE(review): presumably this script is fed to a Django/authentik shell, where
# ``__name__`` is not "__main__", so no entry-point guard is used — confirm
# before adding one.
main()