# NODEDC_TASKMANAGER/plane-src/apps/api/plane/app/views/codex_agents.py
#
# 293 lines
# 11 KiB
# Python

# Python imports
import os
from urllib.parse import quote
# Third party imports
import requests
from django.core.exceptions import ValidationError
from rest_framework import status
from rest_framework.response import Response
# Module imports
from plane.app.permissions import ROLE, allow_permission
from plane.app.views.base import BaseAPIView
from plane.authentication.nodedc_workspace_policy import get_nodedc_workspace_creation_policy
from plane.db.models import Project, Workspace
def get_gateway_config():
    """Read the Codex Agent Gateway connection settings from the environment.

    The base URL is the first non-blank value among
    ``PLANE_NODEDC_AGENT_GATEWAY_URL``, ``NODEDC_AGENT_GATEWAY_INTERNAL_URL``
    and ``NODEDC_AGENT_GATEWAY_URL``; trailing slashes are removed so that
    request paths starting with ``/`` can be appended directly. The token is
    the first non-blank value among ``PLANE_NODEDC_AGENT_GATEWAY_TOKEN`` and
    ``NODEDC_AGENT_GATEWAY_INTERNAL_TOKEN``.

    The timeout comes from ``PLANE_NODEDC_AGENT_GATEWAY_TIMEOUT_SECONDS``.
    A missing, blank, malformed, non-positive or non-finite value falls back
    to 5 seconds instead of raising, so a bad environment value cannot break
    every request that reads this configuration.

    Returns:
        tuple: ``(base_url, token, timeout)`` where ``base_url`` and ``token``
        are strings (empty when unset) and ``timeout`` is a positive, finite
        float in seconds.
    """
    default_timeout = 5.0

    base_url = (
        os.environ.get("PLANE_NODEDC_AGENT_GATEWAY_URL", "").strip()
        or os.environ.get("NODEDC_AGENT_GATEWAY_INTERNAL_URL", "").strip()
        or os.environ.get("NODEDC_AGENT_GATEWAY_URL", "").strip()
    ).rstrip("/")
    token = (
        os.environ.get("PLANE_NODEDC_AGENT_GATEWAY_TOKEN", "").strip()
        or os.environ.get("NODEDC_AGENT_GATEWAY_INTERNAL_TOKEN", "").strip()
    )

    raw_timeout = os.environ.get("PLANE_NODEDC_AGENT_GATEWAY_TIMEOUT_SECONDS", "").strip()
    try:
        timeout = float(raw_timeout) if raw_timeout else default_timeout
    except ValueError:
        # Malformed value (e.g. "5s"): use the default rather than crash.
        timeout = default_timeout
    # NaN fails both comparisons, so this also rejects "nan" as well as
    # zero, negative and infinite values.
    if not 0 < timeout < float("inf"):
        timeout = default_timeout

    return base_url, token, timeout
def owner_path(user):
    """Return ``user.id`` as a percent-encoded URL path segment.

    ``safe=""`` makes ``quote`` escape ``/`` too, so the id always stays a
    single path segment.
    """
    owner_id = str(user.id)
    return quote(owner_id, safe="")
def agent_path(agent_id):
    """Return ``agent_id`` as a percent-encoded URL path segment.

    The value is stringified first; every reserved character, including
    ``/``, is escaped because no characters are marked as safe.
    """
    segment = str(agent_id)
    return quote(segment, safe="")
def token_path(token_id):
    """Return ``token_id`` as a percent-encoded URL path segment.

    The value is stringified first; every reserved character, including
    ``/``, is escaped because no characters are marked as safe.
    """
    segment = str(token_id)
    return quote(segment, safe="")
def require_gateway_config():
    """Return the gateway configuration, or an error response if incomplete.

    Returns:
        tuple: ``((base_url, token, timeout), None)`` when both the base URL
        and the token are non-empty; otherwise ``(None, response)`` where
        ``response`` is a 503 ``Response`` describing the missing
        configuration.
    """
    base_url, token, timeout = get_gateway_config()

    if base_url and token:
        return (base_url, token, timeout), None

    not_configured = Response(
        {
            "ok": False,
            "error": "codex_agent_gateway_not_configured",
            "message": "NODE.DC Codex Agent Gateway URL/token is not configured.",
        },
        status=status.HTTP_503_SERVICE_UNAVAILABLE,
    )
    return None, not_configured
def require_codex_agent_entitlement(user, slug):
    """Check that the ``codex_agents`` service module is enabled.

    Looks up the workspace creation policy for ``user`` and ``slug`` and
    inspects its ``service_modules`` mapping.

    Returns:
        tuple: ``(workspace_policy, None)`` when ``service_modules`` has a
        truthy ``codex_agents`` entry; otherwise ``(None, response)`` where
        ``response`` is a 403 ``Response``.
    """
    policy = get_nodedc_workspace_creation_policy(user, workspace_slug=slug)
    # A missing or falsy "service_modules" value is treated as "nothing enabled".
    enabled_modules = policy.get("service_modules") or {}

    if enabled_modules.get("codex_agents"):
        return policy, None

    not_entitled = Response(
        {
            "ok": False,
            "error": "codex_agents_not_entitled",
            "message": "Codex Agent API is not enabled for this NODE.DC user/workspace.",
        },
        status=status.HTTP_403_FORBIDDEN,
    )
    return None, not_entitled
def require_workspace(slug):
    """Fetch the workspace with the given slug.

    Returns:
        tuple: ``(workspace, None)`` when found; otherwise ``(None, response)``
        where ``response`` is a 404 ``Response`` with error
        ``workspace_not_found``.
    """
    try:
        workspace = Workspace.objects.get(slug=slug)
    except Workspace.DoesNotExist:
        not_found = Response(
            {"ok": False, "error": "workspace_not_found"},
            status=status.HTTP_404_NOT_FOUND,
        )
        return None, not_found
    return workspace, None
def validate_project_in_workspace(workspace, project_id):
    """Verify that ``project_id`` names a non-archived project of ``workspace``.

    A falsy ``project_id`` means "no project given" and is accepted without
    any lookup.

    Returns:
        ``None`` when no project was given or the project exists; otherwise a
        404 ``Response`` with error ``project_not_found``.
    """
    if not project_id:
        return None

    try:
        found = Project.objects.filter(id=project_id, workspace=workspace, archived_at__isnull=True).exists()
    except ValidationError:
        # Presumably raised for an id the model field cannot parse; such an
        # id is reported the same way as a project that does not exist.
        found = False

    if found:
        return None

    return Response(
        {
            "ok": False,
            "error": "project_not_found",
            "message": "Project is not available in this workspace.",
        },
        status=status.HTTP_404_NOT_FOUND,
    )
def gateway_request(method, path, payload=None):
    """Forward a request to the Codex Agent Gateway and wrap its reply.

    Args:
        method: HTTP method name handed to ``requests.request``.
        path: Request path appended verbatim to the configured base URL.
        payload: Optional body sent through the ``json=`` argument.

    Returns:
        Response: one of
        * the configuration error from ``require_gateway_config`` (503);
        * a 502 ``codex_agent_gateway_unavailable`` error when the HTTP call
          raises ``requests.RequestException``;
        * an empty response when the gateway answers 204 No Content;
        * a ``codex_agent_gateway_invalid_response`` error when the body is
          not JSON, reported as 502 if the gateway claimed success and under
          the gateway's own status code if it was already an error (>= 400);
        * otherwise the gateway's decoded JSON body and status code.
    """
    config, error_response = require_gateway_config()
    if error_response is not None:
        return error_response
    base_url, token, timeout = config

    try:
        response = requests.request(
            method,
            f"{base_url}{path}",
            headers={
                "Authorization": f"Bearer {token}",
                "Accept": "application/json",
            },
            json=payload,
            timeout=timeout,
        )
    except requests.RequestException:
        return Response(
            {
                "ok": False,
                "error": "codex_agent_gateway_unavailable",
                "message": "NODE.DC Codex Agent Gateway is unavailable.",
            },
            status=status.HTTP_502_BAD_GATEWAY,
        )

    upstream_status = response.status_code

    # 204 carries no body by definition; do not try to decode it as JSON.
    if upstream_status == status.HTTP_204_NO_CONTENT:
        return Response(status=upstream_status)

    try:
        data = response.json()
    except ValueError:
        data = {
            "ok": False,
            "error": "codex_agent_gateway_invalid_response",
            "message": "NODE.DC Codex Agent Gateway returned a non-JSON response.",
        }
        # An error payload must not travel under a success status, or clients
        # that branch on the HTTP status would read it as a valid result.
        # Upstream error statuses are kept so the failure kind stays visible.
        if upstream_status < status.HTTP_400_BAD_REQUEST:
            upstream_status = status.HTTP_502_BAD_GATEWAY

    return Response(data, status=upstream_status)
class CodexAgentEntitledEndpoint(BaseAPIView):
    """Base view for endpoints gated by the ``codex_agents`` entitlement."""

    def require_entitlement(self, request, slug):
        """Return the entitlement error ``Response``, or ``None`` if entitled.

        Delegates to ``require_codex_agent_entitlement`` for ``request.user``
        and ``slug`` and discards the policy it returns.
        """
        _policy, denial = require_codex_agent_entitlement(request.user, slug)
        return denial
class CodexAgentListEndpoint(CodexAgentEntitledEndpoint):
    """List and create Codex agents owned by the requesting user.

    Both handlers are decorated with ``allow_permission`` for ``ROLE.ADMIN``
    at ``WORKSPACE`` level and run the entitlement check before contacting
    the gateway.
    """

    @allow_permission(allowed_roles=[ROLE.ADMIN], level="WORKSPACE")
    def get(self, request, slug):
        """Proxy a GET for the requesting user's agents collection."""
        denial = self.require_entitlement(request, slug)
        if denial is not None:
            return denial

        agents_url = f"/api/internal/v1/owners/{owner_path(request.user)}/agents"
        return gateway_request("GET", agents_url)

    @allow_permission(allowed_roles=[ROLE.ADMIN], level="WORKSPACE")
    def post(self, request, slug):
        """Create an agent; responds 400 when ``display_name`` is blank."""
        denial = self.require_entitlement(request, slug)
        if denial is not None:
            return denial

        # Missing / null / empty names all normalize to "" before the check.
        raw_name = request.data.get("display_name") or ""
        display_name = str(raw_name).strip()
        if not display_name:
            return Response(
                {"ok": False, "error": "display_name_required"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        # Empty optional values are sent as null rather than "".
        payload = dict(
            display_name=display_name,
            owner_email=request.user.email or None,
            avatar_url=request.data.get("avatar_url") or None,
        )
        agents_url = f"/api/internal/v1/owners/{owner_path(request.user)}/agents"
        return gateway_request("POST", agents_url, payload)
class CodexAgentDetailEndpoint(CodexAgentEntitledEndpoint):
    """Read and update a single Codex agent of the requesting user.

    Both handlers are decorated with ``allow_permission`` for ``ROLE.ADMIN``
    at ``WORKSPACE`` level and run the entitlement check before contacting
    the gateway.
    """

    @allow_permission(allowed_roles=[ROLE.ADMIN], level="WORKSPACE")
    def get(self, request, slug, agent_id):
        """Proxy a GET for the agent identified by ``agent_id``."""
        denial = self.require_entitlement(request, slug)
        if denial is not None:
            return denial

        agent_url = f"/api/internal/v1/owners/{owner_path(request.user)}/agents/{agent_path(agent_id)}"
        return gateway_request("GET", agent_url)

    @allow_permission(allowed_roles=[ROLE.ADMIN], level="WORKSPACE")
    def patch(self, request, slug, agent_id):
        """Proxy a partial update; only fields present in the body are sent."""
        denial = self.require_entitlement(request, slug)
        if denial is not None:
            return denial

        body = request.data
        payload = {"actor_user_id": str(request.user.id)}
        if "display_name" in body:
            # Forwarded as received (no normalization here).
            payload["display_name"] = body.get("display_name")
        if "avatar_url" in body:
            # An empty avatar URL is sent as null.
            payload["avatar_url"] = body.get("avatar_url") or None

        agent_url = f"/api/internal/v1/owners/{owner_path(request.user)}/agents/{agent_path(agent_id)}"
        return gateway_request("PATCH", agent_url, payload)
class CodexAgentRevokeEndpoint(CodexAgentEntitledEndpoint):
    """Revoke a Codex agent of the requesting user."""

    @allow_permission(allowed_roles=[ROLE.ADMIN], level="WORKSPACE")
    def post(self, request, slug, agent_id):
        """Proxy the revoke call, recording the requesting user as the actor."""
        denial = self.require_entitlement(request, slug)
        if denial is not None:
            return denial

        revoke_url = f"/api/internal/v1/owners/{owner_path(request.user)}/agents/{agent_path(agent_id)}/revoke"
        actor = {"actor_user_id": str(request.user.id)}
        return gateway_request("POST", revoke_url, actor)
class CodexAgentGrantListEndpoint(CodexAgentEntitledEndpoint):
    """List and create grants for a Codex agent of the requesting user.

    Both handlers are decorated with ``allow_permission`` for ``ROLE.ADMIN``
    at ``WORKSPACE`` level and run the entitlement check before contacting
    the gateway.
    """

    @allow_permission(allowed_roles=[ROLE.ADMIN], level="WORKSPACE")
    def get(self, request, slug, agent_id):
        """Proxy a GET for the agent's grants collection."""
        denial = self.require_entitlement(request, slug)
        if denial is not None:
            return denial

        grants_url = f"/api/internal/v1/owners/{owner_path(request.user)}/agents/{agent_path(agent_id)}/grants"
        return gateway_request("GET", grants_url)

    @allow_permission(allowed_roles=[ROLE.ADMIN], level="WORKSPACE")
    def post(self, request, slug, agent_id):
        """Create a grant scoped to this workspace and, optionally, a project.

        The workspace must exist and, when ``project_id`` is supplied, the
        project must pass ``validate_project_in_workspace`` before anything
        is sent to the gateway.
        """
        denial = self.require_entitlement(request, slug)
        if denial is not None:
            return denial

        workspace, workspace_error = require_workspace(slug)
        if workspace_error is not None:
            return workspace_error

        project_id = request.data.get("project_id")
        project_error = validate_project_in_workspace(workspace, project_id)
        if project_error is not None:
            return project_error

        # A falsy project id is sent as null.
        if project_id:
            grant_project = str(project_id)
        else:
            grant_project = None

        payload = {
            "workspace_slug": slug,
            "project_id": grant_project,
            "scopes": request.data.get("scopes") or [],
            "mode": request.data.get("mode") or "voluntary",
        }
        grants_url = f"/api/internal/v1/owners/{owner_path(request.user)}/agents/{agent_path(agent_id)}/grants"
        return gateway_request("POST", grants_url, payload)
class CodexAgentTokenListEndpoint(CodexAgentEntitledEndpoint):
    """List and create tokens for a Codex agent of the requesting user.

    Both handlers are decorated with ``allow_permission`` for ``ROLE.ADMIN``
    at ``WORKSPACE`` level and run the entitlement check before contacting
    the gateway.
    """

    @allow_permission(allowed_roles=[ROLE.ADMIN], level="WORKSPACE")
    def get(self, request, slug, agent_id):
        """Proxy a GET for the agent's tokens collection."""
        denial = self.require_entitlement(request, slug)
        if denial is not None:
            return denial

        tokens_url = f"/api/internal/v1/owners/{owner_path(request.user)}/agents/{agent_path(agent_id)}/tokens"
        return gateway_request("GET", tokens_url)

    @allow_permission(allowed_roles=[ROLE.ADMIN], level="WORKSPACE")
    def post(self, request, slug, agent_id):
        """Create a token; a missing name defaults to "Local Codex token"."""
        denial = self.require_entitlement(request, slug)
        if denial is not None:
            return denial

        token_name = request.data.get("name") or "Local Codex token"
        # A falsy expiry is sent as null.
        expires_at = request.data.get("expires_at") or None
        payload = {"name": token_name, "expires_at": expires_at}

        tokens_url = f"/api/internal/v1/owners/{owner_path(request.user)}/agents/{agent_path(agent_id)}/tokens"
        return gateway_request("POST", tokens_url, payload)
class CodexAgentTokenRevokeEndpoint(CodexAgentEntitledEndpoint):
    """Revoke a single token of a Codex agent of the requesting user."""

    @allow_permission(allowed_roles=[ROLE.ADMIN], level="WORKSPACE")
    def post(self, request, slug, agent_id, token_id):
        """Proxy the token revoke call, recording the requesting user as the actor."""
        denial = self.require_entitlement(request, slug)
        if denial is not None:
            return denial

        revoke_url = f"/api/internal/v1/owners/{owner_path(request.user)}/agents/{agent_path(agent_id)}/tokens/{token_path(token_id)}/revoke"
        actor = {"actor_user_id": str(request.user.id)}
        return gateway_request("POST", revoke_url, actor)
class CodexAgentSetupEndpoint(CodexAgentEntitledEndpoint):
    """Fetch the gateway's ``setup`` resource for a Codex agent."""

    @allow_permission(allowed_roles=[ROLE.ADMIN], level="WORKSPACE")
    def get(self, request, slug, agent_id):
        """Proxy a GET for the agent's ``setup`` resource."""
        denial = self.require_entitlement(request, slug)
        if denial is not None:
            return denial

        setup_url = f"/api/internal/v1/owners/{owner_path(request.user)}/agents/{agent_path(agent_id)}/setup"
        return gateway_request("GET", setup_url)