"""Bootstrap NODE.DC groups, OIDC providers and applications in authentik.

The script creates or updates the groups, the optional bootstrap admin user,
the "groups" scope mapping, one OAuth2 provider per application and the
applications with their group policy bindings.  ``main()`` is invoked when the
module is executed and runs inside a single database transaction.

Environment variables read:

* ``NODEDC_BOOTSTRAP_ADMIN_EMAIL`` / ``NODEDC_BOOTSTRAP_ADMIN_PASSWORD``
  (both optional),
* the ``*_OIDC_CLIENT_ID`` / ``*_OIDC_CLIENT_SECRET`` / ``*_OIDC_REDIRECT_URI``
  variables named in ``APP_SPECS`` (all required).
"""

from os import environ

from django.db import transaction

from authentik.common.oauth.constants import SubModes
from authentik.core.models import Application, Group, User
from authentik.flows.models import Flow
from authentik.policies.models import PolicyBinding
from authentik.providers.oauth2.models import (
    ClientTypes,
    IssuerMode,
    OAuth2LogoutMethod,
    OAuth2Provider,
    RedirectURI,
    RedirectURIMatchingMode,
    ScopeMapping,
)

# (group name, is_superuser) pairs; every group is created with
# is_superuser=False.
GROUP_SPECS = [
    ("nodedc:superadmin", False),
    ("nodedc:launcher:admin", False),
    ("nodedc:launcher:user", False),
    ("nodedc:taskmanager:admin", False),
    ("nodedc:taskmanager:user", False),
]

# One entry per application.  The ``*_env`` keys hold the NAMES of the
# environment variables to read, not the values themselves.  ``groups`` lists
# the groups that get a policy binding on the application.
APP_SPECS = [
    {
        "slug": "launcher",
        "name": "NODE.DC Launcher",
        "provider_name": "NODE.DC Launcher OIDC",
        "client_id_env": "LAUNCHER_OIDC_CLIENT_ID",
        "client_secret_env": "LAUNCHER_OIDC_CLIENT_SECRET",
        "redirect_uri_env": "LAUNCHER_OIDC_REDIRECT_URI",
        "launch_url": "http://launcher.local.nodedc",
        "logout_uri": "http://launcher.local.nodedc/logout",
        "groups": ["nodedc:superadmin", "nodedc:launcher:admin", "nodedc:launcher:user"],
        "description": "NODE.DC control plane launcher.",
    },
    {
        "slug": "task-manager",
        "name": "NODE.DC Task Manager",
        "provider_name": "NODE.DC Task Manager OIDC",
        "client_id_env": "PLANE_OIDC_CLIENT_ID",
        "client_secret_env": "PLANE_OIDC_CLIENT_SECRET",
        "redirect_uri_env": "PLANE_OIDC_REDIRECT_URI",
        "launch_url": "http://task.local.nodedc",
        "logout_uri": "http://task.local.nodedc/logout",
        "groups": ["nodedc:superadmin", "nodedc:taskmanager:admin", "nodedc:taskmanager:user"],
        "description": "NODE.DC Plane-based task manager.",
    },
]


def required_env(name):
    """Return the stripped value of environment variable *name*.

    Raises:
        RuntimeError: if the variable is unset or blank after stripping.
    """
    value = environ.get(name, "").strip()
    if value:
        return value
    raise RuntimeError(f"{name} is required")


def ensure_group(name, is_superuser=False):
    """Fetch or create the group called *name* and set its superuser flag.

    The group is saved on every call, whether or not the flag changed.
    """
    group, _created = Group.objects.get_or_create(name=name)
    group.is_superuser = is_superuser
    group.save()
    return group


def ensure_groups():
    """Ensure every group in ``GROUP_SPECS`` exists.

    Returns:
        A dict mapping group name to the group object, in ``GROUP_SPECS`` order.
    """
    return {name: ensure_group(name, is_superuser) for name, is_superuser in GROUP_SPECS}


def ensure_user_groups(groups):
    """Create or update the bootstrap admin user and add it to *groups*.

    Does nothing and returns ``None`` when ``NODEDC_BOOTSTRAP_ADMIN_EMAIL`` is
    unset or blank.  Otherwise the user is looked up by case-insensitive email,
    then by exact username, and created if neither matches.

    Args:
        groups: dict mapping group name to group object.

    Returns:
        The saved user, or ``None`` when no admin email is configured.
    """
    admin_email = environ.get("NODEDC_BOOTSTRAP_ADMIN_EMAIL", "").strip()
    if not admin_email:
        return None

    user = User.objects.filter(email__iexact=admin_email).first()
    if user is None:
        user = User.objects.filter(username=admin_email).first()
    if user is None:
        user = User(username=admin_email, email=admin_email, name=admin_email, type="internal")

    # Applied to existing users as well, so a matched account is normalised.
    user.username = admin_email
    user.email = admin_email
    user.is_active = True
    user.type = "internal"

    # The password is only (re)set when one is supplied.
    password = environ.get("NODEDC_BOOTSTRAP_ADMIN_PASSWORD")
    if password:
        user.set_password(password)
    user.save()

    # Joined only if a group with this exact name already exists.
    authentik_admins = Group.objects.filter(name="authentik Admins").first()
    if authentik_admins:
        user.groups.add(authentik_admins)
    for group in groups.values():
        user.groups.add(group)
    return user


def ensure_groups_scope_mapping():
    """Create or update the ``groups`` scope mapping used by the providers.

    The mapping's scope name, description and expression are overwritten on
    every run, so manual edits to those fields are reset.
    """
    desired = {
        "scope_name": "groups",
        "description": "Adds Authentik group names to NODE.DC OIDC tokens.",
        "expression": 'return {"groups": [group.name for group in request.user.groups.all()]}',
    }
    mapping, _created = ScopeMapping.objects.get_or_create(
        name="NODE.DC OAuth Mapping: groups",
        defaults=desired,
    )
    for field, value in desired.items():
        setattr(mapping, field, value)
    mapping.save()
    return mapping


def default_scope_mappings():
    """Return the scope mappings to attach to every provider.

    These are all existing mappings whose scope name is ``openid``, ``email``,
    ``profile`` or ``offline_access``, followed by the NODE.DC ``groups``
    mapping.
    """
    scope_names = ["openid", "email", "profile", "offline_access"]
    existing = ScopeMapping.objects.filter(scope_name__in=scope_names)
    return [*existing, ensure_groups_scope_mapping()]


def ensure_provider(spec, mappings):
    """Create or update the OAuth2 provider described by *spec*.

    Args:
        spec: one entry of ``APP_SPECS``.
        mappings: scope mappings to set as the provider's property mappings.

    Returns:
        The saved provider.

    Raises:
        RuntimeError: if a required environment variable is missing.
        Flow.DoesNotExist: if either flow looked up by slug is absent.
    """
    authorization_flow = Flow.objects.get(slug="default-provider-authorization-implicit-consent")
    invalidation_flow = Flow.objects.get(slug="default-provider-invalidation-flow")

    provider_name = spec["provider_name"]
    provider = OAuth2Provider.objects.filter(name=provider_name).first()
    if provider is None:
        provider = OAuth2Provider(name=provider_name)

    client_id = required_env(spec["client_id_env"])
    client_secret = required_env(spec["client_secret_env"])
    redirect_uri = required_env(spec["redirect_uri_env"])

    provider.name = provider_name
    provider.client_type = ClientTypes.CONFIDENTIAL
    provider.client_id = client_id
    provider.client_secret = client_secret
    provider.redirect_uris = [RedirectURI(RedirectURIMatchingMode.STRICT, redirect_uri)]
    provider.logout_uri = spec["logout_uri"]
    provider.logout_method = OAuth2LogoutMethod.FRONTCHANNEL
    provider.include_claims_in_id_token = True
    provider.sub_mode = SubModes.USER_UUID
    provider.issuer_mode = IssuerMode.PER_PROVIDER
    provider.authorization_flow = authorization_flow
    provider.invalidation_flow = invalidation_flow
    provider.save()

    # Must follow save(): the original sets the mappings on a saved provider.
    provider.property_mappings.set(mappings)
    return provider


def ensure_application(spec, provider, groups):
    """Create or update the application for *spec* and sync its bindings.

    Args:
        spec: one entry of ``APP_SPECS``.
        provider: the provider to attach to the application.
        groups: dict mapping group name to group object; must contain every
            name listed in ``spec["groups"]``.

    Returns:
        The saved application.
    """
    slug = spec["slug"]
    application = Application.objects.filter(slug=slug).first()
    if application is None:
        application = Application(slug=slug, name=spec["name"])

    application.name = spec["name"]
    application.slug = slug
    application.group = "NODE.DC"
    application.provider = provider
    application.meta_launch_url = spec["launch_url"]
    application.meta_description = spec["description"]
    application.meta_publisher = "NODE.DC"
    application.open_in_new_tab = False
    application.policy_engine_mode = "any"
    application.save()

    allowed_group_names = spec["groups"]

    # Remove bindings on this application whose group is not in the spec.
    # NOTE(review): bindings that have no group at all appear to match this
    # exclude() as well and would be deleted - confirm that is intended.
    stale_bindings = PolicyBinding.objects.filter(target=application).exclude(
        group__name__in=allowed_group_names
    )
    stale_bindings.delete()

    # One binding per listed group, ordered by its position in the spec.
    for position, group_name in enumerate(allowed_group_names):
        group = groups[group_name]
        binding = PolicyBinding.objects.filter(target=application, group=group).first()
        if binding is None:
            binding = PolicyBinding(target=application, group=group)
        binding.enabled = True
        binding.negate = False
        binding.timeout = 30
        binding.failure_result = False
        binding.order = position
        binding.save()
    return application


@transaction.atomic
def main():
    """Run the whole bootstrap in one transaction and print a summary dict."""
    groups = ensure_groups()
    user = ensure_user_groups(groups)
    mappings = default_scope_mappings()

    providers = []
    applications = []
    for spec in APP_SPECS:
        provider = ensure_provider(spec, mappings)
        application = ensure_application(spec, provider, groups)
        providers.append(provider)
        applications.append(application)

    if user:
        admin_email = user.email
    else:
        admin_email = None

    summary = {
        "groups": list(groups),
        "admin_user": admin_email,
        "applications": [application.slug for application in applications],
        "providers": [provider.name for provider in providers],
    }
    print(summary)


# Executed unconditionally, as in the original script: the bootstrap runs as
# soon as this module is executed.
main()