NODEDC_PLATFORM/infra/authentik/bootstrap-dev.py

339 lines
12 KiB
Python

from os import environ
from pathlib import Path
from django.db import transaction
from authentik.brands.models import Brand
from authentik.common.oauth.constants import SubModes
from authentik.core.models import Application, Group, User
from authentik.crypto.models import CertificateKeyPair
from authentik.flows.models import Flow, FlowStageBinding
from authentik.policies.models import PolicyBinding
from authentik.providers.oauth2.models import (
ClientTypes,
IssuerMode,
OAuth2LogoutMethod,
OAuth2Provider,
RedirectURI,
RedirectURIMatchingMode,
ScopeMapping,
)
from authentik.stages.identification.models import IdentificationStage
from authentik.stages.password.models import PasswordStage
BRANDING_CSS_PATH = Path("/templates/branding/nodedc-login.css")
GROUP_SPECS = [
("nodedc:superadmin", False),
("nodedc:launcher:admin", False),
("nodedc:launcher:user", False),
("nodedc:taskmanager:admin", False),
("nodedc:taskmanager:user", False),
]
APP_SPECS = [
{
"slug": "launcher",
"name": "NODE.DC Launcher",
"provider_name": "NODE.DC Launcher OIDC",
"client_id_env": "LAUNCHER_OIDC_CLIENT_ID",
"client_secret_env": "LAUNCHER_OIDC_CLIENT_SECRET",
"redirect_uri_env": "LAUNCHER_OIDC_REDIRECT_URI",
"logged_out_redirect_uri_env": "LAUNCHER_OIDC_LOGGED_OUT_REDIRECT_URI",
"default_logged_out_redirect_uri": "http://launcher.local.nodedc/auth/logged-out",
"launch_url_env": "LAUNCHER_BASE_URL",
"launch_url": "http://launcher.local.nodedc",
"logout_uri_env": "LAUNCHER_LOGOUT_URI",
"logout_uri": "http://launcher.local.nodedc/logout",
"groups": ["nodedc:superadmin", "nodedc:launcher:admin", "nodedc:launcher:user"],
"description": "NODE.DC control plane launcher.",
},
{
"slug": "task-manager",
"name": "NODE.DC Task Manager",
"provider_name": "NODE.DC Task Manager OIDC",
"client_id_env": "PLANE_OIDC_CLIENT_ID",
"client_secret_env": "PLANE_OIDC_CLIENT_SECRET",
"redirect_uri_env": "PLANE_OIDC_REDIRECT_URI",
"launch_url_env": "TASK_BASE_URL",
"launch_url": "http://task.local.nodedc",
"logout_uri_env": "TASK_LOGOUT_URI",
"logout_uri": "http://task.local.nodedc/logout",
"groups": ["nodedc:superadmin", "nodedc:taskmanager:admin", "nodedc:taskmanager:user"],
"description": "NODE.DC Plane-based task manager.",
},
]
def required_env(name):
value = environ.get(name, "").strip()
if not value:
raise RuntimeError(f"{name} is required")
return value
def optional_env(name, default=""):
return environ.get(name, default).strip()
def ensure_group(name, is_superuser=False):
group, _ = Group.objects.get_or_create(name=name)
group.is_superuser = is_superuser
group.save()
return group
def ensure_groups():
groups = {}
for name, is_superuser in GROUP_SPECS:
groups[name] = ensure_group(name, is_superuser)
return groups
def ensure_user_groups(groups):
admin_email = environ.get("NODEDC_BOOTSTRAP_ADMIN_EMAIL", "").strip()
if not admin_email:
return None
user = User.objects.filter(email__iexact=admin_email).first() or User.objects.filter(
username=admin_email
).first()
if user is None:
user = User(username=admin_email, email=admin_email, name=admin_email, type="internal")
user.username = admin_email
user.email = admin_email
user.is_active = True
user.type = "internal"
if environ.get("NODEDC_BOOTSTRAP_ADMIN_PASSWORD"):
user.set_password(environ["NODEDC_BOOTSTRAP_ADMIN_PASSWORD"])
user.save()
authentik_admins = Group.objects.filter(name="authentik Admins").first()
if authentik_admins:
user.groups.add(authentik_admins)
for name in groups:
user.groups.add(groups[name])
return user
def ensure_groups_scope_mapping():
mapping, _ = ScopeMapping.objects.get_or_create(
name="NODE.DC OAuth Mapping: groups",
defaults={
"scope_name": "groups",
"description": "Adds Authentik group names to NODE.DC OIDC tokens.",
"expression": 'return {"groups": [group.name for group in request.user.groups.all()]}',
},
)
mapping.scope_name = "groups"
mapping.description = "Adds Authentik group names to NODE.DC OIDC tokens."
mapping.expression = 'return {"groups": [group.name for group in request.user.groups.all()]}'
mapping.save()
return mapping
def ensure_profile_scope_mapping():
expression = """
attributes = request.user.attributes or {}
display_name = request.user.name or request.user.username
name_parts = display_name.split(" ", 1)
avatar_url = attributes.get("picture") or attributes.get("avatar_url") or attributes.get("avatar")
return {
"name": display_name,
"given_name": name_parts[0] if name_parts else display_name,
"family_name": name_parts[1] if len(name_parts) > 1 else "",
"preferred_username": request.user.username,
"nickname": request.user.username,
"picture": avatar_url,
"avatar_url": avatar_url,
}
""".strip()
mapping, _ = ScopeMapping.objects.get_or_create(
name="NODE.DC OAuth Mapping: profile context",
defaults={
"scope_name": "profile",
"description": "Adds normalized NODE.DC profile claims to OIDC tokens.",
"expression": expression,
},
)
mapping.scope_name = "profile"
mapping.description = "Adds normalized NODE.DC profile claims to OIDC tokens."
mapping.expression = expression
mapping.save()
return mapping
def default_scope_mappings():
scope_names = ["openid", "email", "offline_access"]
mappings = list(ScopeMapping.objects.filter(scope_name__in=scope_names))
mappings.append(ensure_profile_scope_mapping())
mappings.append(ensure_groups_scope_mapping())
return mappings
def read_branding_css():
if BRANDING_CSS_PATH.exists():
return BRANDING_CSS_PATH.read_text(encoding="utf-8")
return ""
def ensure_nodedc_brand():
auth_domain = environ.get("AUTH_DOMAIN", "auth.local.nodedc").strip() or "auth.local.nodedc"
authentication_flow = Flow.objects.get(slug="default-authentication-flow")
invalidation_flow = Flow.objects.get(slug="default-invalidation-flow")
authentication_flow.name = "NODE.DC authentication"
authentication_flow.title = "Работайте во всех измерениях."
authentication_flow.layout = "stacked"
authentication_flow.background = ""
authentication_flow.save()
identification_stage = IdentificationStage.objects.get(
name="default-authentication-identification"
)
password_stage = PasswordStage.objects.get(name="default-authentication-password")
password_stage.allow_show_password = True
password_stage.save()
identification_stage.user_fields = ["email"]
identification_stage.password_stage = password_stage
identification_stage.show_matched_user = False
identification_stage.enable_remember_me = False
identification_stage.save()
FlowStageBinding.objects.filter(target=authentication_flow, stage=password_stage).delete()
brand = Brand.objects.filter(domain=auth_domain).first()
if brand is None:
brand = Brand(domain=auth_domain)
Brand.objects.exclude(brand_uuid=brand.brand_uuid).update(default=False)
brand.default = True
brand.domain = auth_domain
brand.branding_title = "NODE.DC"
brand.branding_logo = ""
brand.branding_favicon = ""
brand.branding_custom_css = read_branding_css()
brand.flow_authentication = authentication_flow
brand.flow_invalidation = invalidation_flow
brand.attributes = {
**(brand.attributes or {}),
"settings": {
**((brand.attributes or {}).get("settings") or {}),
"locale": "ru",
"theme": {
**(((brand.attributes or {}).get("settings") or {}).get("theme") or {}),
"base": "dark",
},
},
}
brand.save()
return brand
def ensure_provider(spec, mappings):
authorization_flow = Flow.objects.get(slug="default-provider-authorization-implicit-consent")
invalidation_flow = Flow.objects.get(slug="default-invalidation-flow")
signing_key = (
CertificateKeyPair.objects.filter(name="authentik Self-signed Certificate").first()
or CertificateKeyPair.objects.first()
)
if signing_key is None:
raise RuntimeError("No Authentik CertificateKeyPair exists for OIDC signing")
provider = OAuth2Provider.objects.filter(name=spec["provider_name"]).first()
if provider is None:
provider = OAuth2Provider(name=spec["provider_name"])
provider.name = spec["provider_name"]
provider.client_type = ClientTypes.CONFIDENTIAL
provider.client_id = required_env(spec["client_id_env"])
provider.client_secret = required_env(spec["client_secret_env"])
redirect_uri_values = [required_env(spec["redirect_uri_env"])]
logged_out_redirect_uri = optional_env(
spec.get("logged_out_redirect_uri_env", ""),
spec.get("default_logged_out_redirect_uri", ""),
)
if logged_out_redirect_uri:
redirect_uri_values.append(logged_out_redirect_uri)
provider.redirect_uris = [
RedirectURI(RedirectURIMatchingMode.STRICT, redirect_uri)
for redirect_uri in dict.fromkeys(redirect_uri_values)
]
provider.logout_uri = optional_env(spec.get("logout_uri_env", ""), spec["logout_uri"])
provider.logout_method = OAuth2LogoutMethod.FRONTCHANNEL
provider.include_claims_in_id_token = True
provider.sub_mode = SubModes.USER_UUID
provider.issuer_mode = IssuerMode.PER_PROVIDER
provider.authorization_flow = authorization_flow
provider.invalidation_flow = invalidation_flow
provider.signing_key = signing_key
provider.save()
provider.property_mappings.set(mappings)
return provider
def ensure_application(spec, provider, groups):
application = Application.objects.filter(slug=spec["slug"]).first()
if application is None:
application = Application(slug=spec["slug"], name=spec["name"])
application.name = spec["name"]
application.slug = spec["slug"]
application.group = "NODE.DC"
application.provider = provider
application.meta_launch_url = optional_env(spec.get("launch_url_env", ""), spec["launch_url"])
application.meta_description = spec["description"]
application.meta_publisher = "NODE.DC"
application.open_in_new_tab = False
application.policy_engine_mode = "any"
application.save()
PolicyBinding.objects.filter(target=application).exclude(
group__name__in=spec["groups"]
).delete()
for order, group_name in enumerate(spec["groups"]):
binding = PolicyBinding.objects.filter(target=application, group=groups[group_name]).first()
if binding is None:
binding = PolicyBinding(target=application, group=groups[group_name])
binding.enabled = True
binding.negate = False
binding.timeout = 30
binding.failure_result = False
binding.order = order
binding.save()
return application
@transaction.atomic
def main():
brand = ensure_nodedc_brand()
groups = ensure_groups()
user = ensure_user_groups(groups)
mappings = default_scope_mappings()
applications = []
providers = []
for spec in APP_SPECS:
provider = ensure_provider(spec, mappings)
application = ensure_application(spec, provider, groups)
providers.append(provider)
applications.append(application)
summary = {
"groups": list(groups),
"admin_user": user.email if user else None,
"brand": brand.domain,
"applications": [application.slug for application in applications],
"providers": [provider.name for provider in providers],
}
print(summary)
main()