253 lines
8.5 KiB
Python
253 lines
8.5 KiB
Python
from os import environ
|
|
|
|
from django.db import transaction
|
|
|
|
from authentik.common.oauth.constants import SubModes
|
|
from authentik.core.models import Application, Group, User
|
|
from authentik.crypto.models import CertificateKeyPair
|
|
from authentik.flows.models import Flow
|
|
from authentik.policies.models import PolicyBinding
|
|
from authentik.providers.oauth2.models import (
|
|
ClientTypes,
|
|
IssuerMode,
|
|
OAuth2LogoutMethod,
|
|
OAuth2Provider,
|
|
RedirectURI,
|
|
RedirectURIMatchingMode,
|
|
ScopeMapping,
|
|
)
|
|
|
|
# Groups managed by this script, as (group name, is_superuser) pairs.
# ensure_groups() creates or updates one authentik group per entry.
# Every entry currently leaves the superuser flag off, including
# "nodedc:superadmin".
GROUP_SPECS = [
    ("nodedc:superadmin", False),
    ("nodedc:launcher:admin", False),
    ("nodedc:launcher:user", False),
    ("nodedc:taskmanager:admin", False),
    ("nodedc:taskmanager:user", False),
]
|
|
|
|
# One entry per application/provider pair provisioned by main().
#
# Keys:
#   slug / name        - identity of the authentik Application.
#   provider_name      - name of the OAuth2Provider backing the application.
#   client_id_env,
#   client_secret_env,
#   redirect_uri_env   - names of the environment variables that hold the
#                        client id, client secret and redirect URI; they are
#                        read with required_env() in ensure_provider().
#   launch_url         - stored as the application's meta_launch_url.
#   logout_uri         - stored as the provider's logout_uri.
#   groups             - names (keys of GROUP_SPECS) that get a policy binding
#                        to the application in ensure_application().
#   description        - stored as the application's meta_description.
APP_SPECS = [
    {
        "slug": "launcher",
        "name": "NODE.DC Launcher",
        "provider_name": "NODE.DC Launcher OIDC",
        "client_id_env": "LAUNCHER_OIDC_CLIENT_ID",
        "client_secret_env": "LAUNCHER_OIDC_CLIENT_SECRET",
        "redirect_uri_env": "LAUNCHER_OIDC_REDIRECT_URI",
        "launch_url": "http://launcher.local.nodedc",
        "logout_uri": "http://launcher.local.nodedc/logout",
        "groups": ["nodedc:superadmin", "nodedc:launcher:admin", "nodedc:launcher:user"],
        "description": "NODE.DC control plane launcher.",
    },
    {
        "slug": "task-manager",
        "name": "NODE.DC Task Manager",
        "provider_name": "NODE.DC Task Manager OIDC",
        "client_id_env": "PLANE_OIDC_CLIENT_ID",
        "client_secret_env": "PLANE_OIDC_CLIENT_SECRET",
        "redirect_uri_env": "PLANE_OIDC_REDIRECT_URI",
        "launch_url": "http://task.local.nodedc",
        "logout_uri": "http://task.local.nodedc/logout",
        "groups": ["nodedc:superadmin", "nodedc:taskmanager:admin", "nodedc:taskmanager:user"],
        "description": "NODE.DC Plane-based task manager.",
    },
]
|
|
|
|
|
|
def required_env(name):
    """Return the whitespace-stripped value of environment variable *name*.

    Raises:
        RuntimeError: if the variable is unset, empty, or only whitespace.
    """
    cleaned = environ.get(name, "").strip()
    if cleaned:
        return cleaned
    raise RuntimeError(f"{name} is required")
|
|
|
|
|
|
def ensure_group(name, is_superuser=False):
    """Return the group called *name*, creating it when it does not exist.

    The group's ``is_superuser`` flag is always overwritten with the given
    value and the group is saved, whether or not it was just created.
    """
    group = Group.objects.get_or_create(name=name)[0]
    group.is_superuser = is_superuser
    group.save()
    return group
|
|
|
|
|
|
def ensure_groups():
    """Ensure every group in GROUP_SPECS exists and return them keyed by name."""
    return {
        group_name: ensure_group(group_name, superuser_flag)
        for group_name, superuser_flag in GROUP_SPECS
    }
|
|
|
|
|
|
def ensure_user_groups(groups):
    """Create or update the bootstrap admin user and add it to *groups*.

    The admin is identified by the ``NODEDC_BOOTSTRAP_ADMIN_EMAIL`` environment
    variable. An existing user is matched by case-insensitive email first, then
    by exact username; when neither matches, a new internal user is built.
    Username, email, active flag and type are overwritten on every run.

    If ``NODEDC_BOOTSTRAP_ADMIN_PASSWORD`` is set, the password is (re)set to
    it. If it is not set, an existing user's password is left alone and a newly
    created user is marked as having no usable password rather than being
    stored with a blank password hash.

    Args:
        groups: mapping of group name to group object; the user is added to
            every value.

    Returns:
        The saved user, or ``None`` when no admin email is configured.
    """
    admin_email = environ.get("NODEDC_BOOTSTRAP_ADMIN_EMAIL", "").strip()
    if not admin_email:
        return None

    user = (
        User.objects.filter(email__iexact=admin_email).first()
        or User.objects.filter(username=admin_email).first()
    )
    created = user is None
    if created:
        user = User(username=admin_email, email=admin_email, name=admin_email, type="internal")

    user.username = admin_email
    user.email = admin_email
    user.is_active = True
    user.type = "internal"

    # Read the variable once so the check and the value used cannot diverge.
    password = environ.get("NODEDC_BOOTSTRAP_ADMIN_PASSWORD")
    if password:
        user.set_password(password)
    elif created:
        # A brand-new account without a configured password must not be saved
        # with an empty password field.
        user.set_unusable_password()
    user.save()

    # The built-in admin group is optional: it is only joined when it exists.
    authentik_admins = Group.objects.filter(name="authentik Admins").first()
    if authentik_admins:
        user.groups.add(authentik_admins)

    # One relation write for all managed groups instead of one per group.
    if groups:
        user.groups.add(*groups.values())
    return user
|
|
|
|
|
|
def ensure_groups_scope_mapping():
    """Create or refresh the ``groups`` scope mapping and return it.

    The mapping is looked up by its fixed name. Its scope name, description and
    expression are overwritten with the values below on every run, so the
    stored row always matches this script.

    Returns:
        The saved ``ScopeMapping`` instance.
    """
    # Single definition of the managed fields, used both when the row is
    # created and when an existing row is updated.
    managed_fields = {
        "scope_name": "groups",
        "description": "Adds Authentik group names to NODE.DC OIDC tokens.",
        "expression": 'return {"groups": [group.name for group in request.user.groups.all()]}',
    }
    mapping, _ = ScopeMapping.objects.update_or_create(
        name="NODE.DC OAuth Mapping: groups",
        defaults=managed_fields,
    )
    return mapping
|
|
|
|
|
|
def ensure_profile_scope_mapping():
    """Create or refresh the ``profile`` scope mapping and return it.

    The mapping is looked up by its fixed name. Its scope name, description and
    expression are overwritten with the values below on every run.

    The expression builds name, username and avatar claims from the requesting
    user: the display name falls back to the username, it is split on the first
    space into given/family name, and the avatar is the first non-empty of the
    ``picture``, ``avatar_url`` and ``avatar`` user attributes.

    Returns:
        The saved ``ScopeMapping`` instance.
    """
    # The literal stays at column 0 so the stored expression carries no extra
    # leading indentation; .strip() removes the surrounding newlines.
    expression = """
attributes = request.user.attributes or {}
display_name = request.user.name or request.user.username
name_parts = display_name.split(" ", 1)
avatar_url = attributes.get("picture") or attributes.get("avatar_url") or attributes.get("avatar")

return {
    "name": display_name,
    "given_name": name_parts[0] if name_parts else display_name,
    "family_name": name_parts[1] if len(name_parts) > 1 else "",
    "preferred_username": request.user.username,
    "nickname": request.user.username,
    "picture": avatar_url,
    "avatar_url": avatar_url,
}
""".strip()
    # Single definition of the managed fields, used both when the row is
    # created and when an existing row is updated.
    managed_fields = {
        "scope_name": "profile",
        "description": "Adds normalized NODE.DC profile claims to OIDC tokens.",
        "expression": expression,
    }
    mapping, _ = ScopeMapping.objects.update_or_create(
        name="NODE.DC OAuth Mapping: profile context",
        defaults=managed_fields,
    )
    return mapping
|
|
|
|
|
|
def default_scope_mappings():
    """Return the scope mappings attached to every NODE.DC provider.

    The list holds every existing mapping whose scope name is ``openid``,
    ``email`` or ``offline_access``, followed by the NODE.DC profile mapping
    and then the NODE.DC groups mapping (both created or refreshed here).
    """
    stock_scopes = ["openid", "email", "offline_access"]
    return [
        *ScopeMapping.objects.filter(scope_name__in=stock_scopes),
        ensure_profile_scope_mapping(),
        ensure_groups_scope_mapping(),
    ]
|
|
|
|
|
|
def ensure_provider(spec, mappings):
    """Create or update the OAuth2 provider described by *spec*.

    The provider is matched by ``spec["provider_name"]``. All managed fields
    are overwritten on every run, the provider is saved, and its property
    mappings are replaced with *mappings*.

    Args:
        spec: one entry of APP_SPECS.
        mappings: scope mappings to attach to the provider.

    Returns:
        The saved ``OAuth2Provider``.

    Raises:
        RuntimeError: if no certificate/key pair exists, or if one of the
            client id / client secret / redirect URI environment variables
            named in *spec* is missing.
    """
    # Flow.objects.get raises if either flow is absent.
    authorization_flow = Flow.objects.get(slug="default-provider-authorization-implicit-consent")
    invalidation_flow = Flow.objects.get(slug="default-provider-invalidation-flow")

    # Prefer the named self-signed certificate, otherwise take the first pair.
    # NOTE(review): the fallback does not check that the pair has a private
    # key - confirm that is acceptable for token signing.
    signing_key = CertificateKeyPair.objects.filter(name="authentik Self-signed Certificate").first()
    if not signing_key:
        signing_key = CertificateKeyPair.objects.first()
    if signing_key is None:
        raise RuntimeError("No Authentik CertificateKeyPair exists for OIDC signing")

    provider_name = spec["provider_name"]
    provider = OAuth2Provider.objects.filter(name=provider_name).first()
    if provider is None:
        provider = OAuth2Provider(name=provider_name)

    # Values are evaluated top to bottom, so the environment variables are
    # read in the order client id, client secret, redirect URI.
    managed_fields = {
        "name": provider_name,
        "client_type": ClientTypes.CONFIDENTIAL,
        "client_id": required_env(spec["client_id_env"]),
        "client_secret": required_env(spec["client_secret_env"]),
        "redirect_uris": [
            RedirectURI(RedirectURIMatchingMode.STRICT, required_env(spec["redirect_uri_env"]))
        ],
        "logout_uri": spec["logout_uri"],
        "logout_method": OAuth2LogoutMethod.FRONTCHANNEL,
        "include_claims_in_id_token": True,
        "sub_mode": SubModes.USER_UUID,
        "issuer_mode": IssuerMode.PER_PROVIDER,
        "authorization_flow": authorization_flow,
        "invalidation_flow": invalidation_flow,
        "signing_key": signing_key,
    }
    for field_name, field_value in managed_fields.items():
        setattr(provider, field_name, field_value)
    provider.save()

    # The provider is saved before its mappings are replaced.
    provider.property_mappings.set(mappings)
    return provider
|
|
|
|
|
|
def ensure_application(spec, provider, groups):
    """Create or update the application described by *spec* and its bindings.

    The application is matched by slug, its managed fields are overwritten and
    it is saved. Policy bindings on the application are then reconciled:
    bindings not tied to one of ``spec["groups"]`` are deleted, and one enabled
    binding per listed group is created or updated, ordered by the group's
    position in the list.

    Args:
        spec: one entry of APP_SPECS.
        provider: provider to attach to the application.
        groups: mapping of group name to group object.

    Returns:
        The saved ``Application``.
    """
    slug = spec["slug"]
    application = Application.objects.filter(slug=slug).first()
    if application is None:
        application = Application(slug=slug, name=spec["name"])

    managed_fields = {
        "name": spec["name"],
        "slug": slug,
        "group": "NODE.DC",
        "provider": provider,
        "meta_launch_url": spec["launch_url"],
        "meta_description": spec["description"],
        "meta_publisher": "NODE.DC",
        "open_in_new_tab": False,
        # NOTE(review): presumably lets membership in any one bound group
        # grant access - confirm against authentik's policy engine modes.
        "policy_engine_mode": "any",
    }
    for field_name, field_value in managed_fields.items():
        setattr(application, field_name, field_value)
    application.save()

    allowed_group_names = spec["groups"]

    # Remove every binding on this application that does not point at one of
    # the allowed groups.
    stale_bindings = PolicyBinding.objects.filter(target=application).exclude(
        group__name__in=allowed_group_names
    )
    stale_bindings.delete()

    for position, group_name in enumerate(allowed_group_names):
        group = groups[group_name]
        binding = PolicyBinding.objects.filter(target=application, group=group).first()
        if binding is None:
            binding = PolicyBinding(target=application, group=group)
        binding.enabled = True
        binding.negate = False
        binding.timeout = 30
        binding.failure_result = False
        binding.order = position
        binding.save()

    return application
|
|
|
|
|
|
@transaction.atomic
def main():
    """Provision groups, the bootstrap admin, scope mappings, providers and apps.

    Runs inside ``transaction.atomic``. For each entry of APP_SPECS the
    provider is ensured first and then its application. A summary dict of
    what was ensured is printed at the end.
    """
    groups = ensure_groups()
    user = ensure_user_groups(groups)
    mappings = default_scope_mappings()

    # (provider, application) pairs in APP_SPECS order.
    provisioned = []
    for spec in APP_SPECS:
        provider = ensure_provider(spec, mappings)
        application = ensure_application(spec, provider, groups)
        provisioned.append((provider, application))

    summary = {
        "groups": list(groups),
        "admin_user": None if not user else user.email,
        "applications": [application.slug for _, application in provisioned],
        "providers": [provider.name for provider, _ in provisioned],
    }
    print(summary)
|
|
|
|
|
|
# Provisioning runs as soon as this file is executed; there is no
# `if __name__ == "__main__"` guard.
# NOTE(review): presumably the script is fed to a Django/authentik shell, where
# __name__ is not "__main__" - confirm how it is invoked before adding a guard.
main()
|