#!/usr/bin/env python3 from __future__ import annotations import argparse import hashlib import json import os import re import subprocess from datetime import datetime, timezone from pathlib import Path from typing import Any REPO_ROOT = Path(__file__).resolve().parents[1] EFFECTIVE_RUNTIME_SCHEMA_VERSION = "agent_effective_runtime_v1" PROMPT_REGISTRY_HEALTH_SCHEMA_VERSION = "prompt_registry_health_v1" EFFECTIVE_RUNTIME_FILE_NAME = "effective_runtime.json" CONFIG_TS = REPO_ROOT / "llm_normalizer" / "backend" / "src" / "config.ts" PROMPT_BUILDER_TS = REPO_ROOT / "llm_normalizer" / "backend" / "src" / "services" / "promptBuilder.ts" PROMPTS_DIR = REPO_ROOT / "llm_normalizer" / "prompts" PRESETS_DIR = REPO_ROOT / "llm_normalizer" / "data" / "presets" SHARED_LLM_CONNECTION_CONFIG = REPO_ROOT / "llm_normalizer" / "data" / "shared_llm_connection.json" DEFAULT_MCP_PROXY_URL = "http://127.0.0.1:6003" ASSISTANT_RUNTIME_PROMPT_VERSIONS = {"address_query_runtime_v1"} BUILTIN_PROMPT_FILES: dict[str, dict[str, str]] = { "normalizer_v1": { "system": "system/default.txt", "developer": "developer/default.txt", "domain": "domain/default.txt", "fewshot": "fewshot/default.txt", }, "normalizer_v1_1": { "system": "system/default.txt", "developer": "developer/normalizer_v1_1.txt", "domain": "domain/normalizer_domain_v1_1.txt", "fewshot": "fewshot/normalizer_fewshot_v1_1.txt", }, "normalizer_v1_1_1": { "system": "system/default.txt", "developer": "developer/normalizer_v1_1_1.txt", "domain": "domain/normalizer_domain_v1_1.txt", "fewshot": "fewshot/normalizer_fewshot_v1_1_1.txt", }, "normalizer_v1_1_2": { "system": "system/default.txt", "developer": "developer/normalizer_v1_1_2.txt", "domain": "domain/normalizer_domain_v1_1.txt", "fewshot": "fewshot/normalizer_fewshot_v1_1_2.txt", }, "normalizer_v1_1_2_1": { "system": "system/default.txt", "developer": "developer/normalizer_v1_1_2_1.txt", "domain": "domain/normalizer_domain_v1_1.txt", "fewshot": "fewshot/normalizer_fewshot_v1_1_2_1.txt", }, "normalizer_v2": { "system": "system/default.txt", "developer": "developer/normalizer_v2.txt", "domain": "domain/normalizer_domain_v1_1.txt", "fewshot": "fewshot/normalizer_v2.txt", }, "normalizer_v2_0_1": { "system": "system/default.txt", "developer": "developer/normalizer_v2_0_1.txt", "domain": "domain/normalizer_domain_v1_1.txt", "fewshot": "fewshot/normalizer_v2_0_1.txt", }, "normalizer_v2_0_2": { "system": "system/default.txt", "developer": "developer/normalizer_v2_0_2.txt", "domain": "domain/normalizer_domain_v1_1.txt", "fewshot": "fewshot/normalizer_v2_0_2.txt", }, } def now_utc_iso() -> str: return datetime.now(timezone.utc).replace(microsecond=0).isoformat() def repo_relative(path: Path, repo_root: Path = REPO_ROOT) -> str: try: return str(path.resolve().relative_to(repo_root.resolve())).replace("\\", "/") except ValueError: return str(path.resolve()) def read_json_object(path: Path) -> dict[str, Any]: parsed = json.loads(path.read_text(encoding="utf-8")) return parsed if isinstance(parsed, dict) else {} def write_json(path: Path, payload: Any) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8", newline="\n") def git_sha(repo_root: Path = REPO_ROOT) -> str: try: result = subprocess.run( ["git", "rev-parse", "HEAD"], cwd=str(repo_root), text=True, encoding="utf-8", errors="replace", capture_output=True, check=False, timeout=10, ) except (OSError, subprocess.SubprocessError): return "unknown" if result.returncode != 0: return "unknown" return result.stdout.strip() or "unknown" def read_default_prompt_version(repo_root: Path = REPO_ROOT) -> str | None: config_path = repo_root / "llm_normalizer" / "backend" / "src" / "config.ts" if not config_path.exists(): return None text = config_path.read_text(encoding="utf-8", errors="replace") match = re.search(r"DEFAULT_PROMPT_VERSION\s*=\s*process\.env\.DEFAULT_PROMPT_VERSION\s*\?\?\s*\"([^\"]+)\"", text) return match.group(1) if match else None def load_shared_llm_connection(repo_root: Path = REPO_ROOT) -> dict[str, Any]: config_path = repo_root / "llm_normalizer" / "data" / "shared_llm_connection.json" if not config_path.exists(): return {} try: raw = read_json_object(config_path) except (OSError, json.JSONDecodeError): return {} connection = raw.get("connection") return dict(connection) if isinstance(connection, dict) else {} def _env_bool(value: str | None, default_value: bool) -> bool: if value is None or value.strip() == "": return default_value lowered = value.strip().lower() return lowered not in {"0", "false", "off", "no"} def collect_feature_flags(repo_root: Path = REPO_ROOT) -> dict[str, Any]: config_path = repo_root / "llm_normalizer" / "backend" / "src" / "config.ts" if not config_path.exists(): return {} text = config_path.read_text(encoding="utf-8", errors="replace") pattern = re.compile( r"export\s+const\s+(FEATURE_[A-Z0-9_]+)\s*=\s*toBooleanFlag\(\s*" r"process\.env\.\1\s*,\s*(true|false)\s*\)", re.DOTALL, ) flags: dict[str, Any] = {} for name, default_raw in pattern.findall(text): default_value = default_raw == "true" flags[name] = { "value": _env_bool(os.environ.get(name), default_value), "source": "env" if name in os.environ else "default", "default": default_value, } return flags def _hash_prompt_files(files: list[dict[str, Any]], repo_root: Path) -> str | None: present_files = [item for item in files if item.get("exists") is True] if not present_files: return None digest = hashlib.sha256() for item in sorted(present_files, key=lambda entry: str(entry.get("relative_path") or "")): path = repo_root / str(item["relative_path"]) digest.update(str(item["relative_path"]).replace("\\", "/").encode("utf-8")) digest.update(b"\0") digest.update(path.read_bytes()) digest.update(b"\0") return digest.hexdigest() def _prompt_files_for_version(repo_root: Path, prompt_version: str) -> list[dict[str, Any]]: definitions = BUILTIN_PROMPT_FILES.get(prompt_version) if not definitions: return [] files: list[dict[str, Any]] = [] for slot, relative_prompt_path in definitions.items(): relative_path = Path("llm_normalizer") / "prompts" / Path(relative_prompt_path) file_path = repo_root / relative_path files.append( { "slot": slot, "prompt_path": relative_prompt_path.replace("\\", "/"), "relative_path": relative_path.as_posix(), "exists": file_path.exists(), "size_bytes": file_path.stat().st_size if file_path.exists() else None, } ) return files def _preset_prompt_versions(repo_root: Path) -> list[dict[str, Any]]: presets_dir = repo_root / "llm_normalizer" / "data" / "presets" if not presets_dir.exists(): return [] presets: list[dict[str, Any]] = [] for path in sorted(presets_dir.glob("*.json")): try: payload = read_json_object(path) except (OSError, json.JSONDecodeError): presets.append( { "path": repo_relative(path, repo_root), "prompt_version": None, "status": "invalid_json", } ) continue presets.append( { "path": repo_relative(path, repo_root), "prompt_version": str(payload.get("prompt_version") or "").strip() or None, "status": "ok", } ) return presets def build_prompt_registry_health( repo_root: Path = REPO_ROOT, *, prompt_version: str | None = None, strict_preset_match: bool = True, ) -> dict[str, Any]: active_prompt_version = prompt_version or read_default_prompt_version(repo_root) or "unknown" default_prompt_version = read_default_prompt_version(repo_root) files = _prompt_files_for_version(repo_root, active_prompt_version) failures: list[str] = [] warnings: list[str] = [] if active_prompt_version not in BUILTIN_PROMPT_FILES: failures.append(f"unknown_prompt_version:{active_prompt_version}") missing_files = [ str(item.get("relative_path")) for item in files if item.get("exists") is not True ] if missing_files: failures.append("prompt_files_missing:" + ",".join(missing_files)) prompt_hash = _hash_prompt_files(files, repo_root) if not prompt_hash: failures.append("prompt_hash_unavailable") preset_versions = _preset_prompt_versions(repo_root) mismatched_presets = [ item for item in preset_versions if item.get("status") == "ok" and item.get("prompt_version") and default_prompt_version and item.get("prompt_version") != default_prompt_version ] if mismatched_presets: message = "preset_version_mismatch:" + ",".join( f"{item['path']}={item['prompt_version']}" for item in mismatched_presets ) if strict_preset_match: failures.append(message) else: warnings.append(message) invalid_presets = [item for item in preset_versions if item.get("status") != "ok"] if invalid_presets: failures.append("preset_json_invalid:" + ",".join(str(item.get("path")) for item in invalid_presets)) source = "file" if files and not missing_files else ("unknown" if not files else "partial_file") status = "pass" if not failures else "fail" return { "schema_version": PROMPT_REGISTRY_HEALTH_SCHEMA_VERSION, "status": status, "default_prompt_version": default_prompt_version, "active_prompt_version": active_prompt_version, "prompt_source": source, "prompt_hash": prompt_hash, "prompt_files": files, "prompt_builder": repo_relative(PROMPT_BUILDER_TS, repo_root), "config": repo_relative(CONFIG_TS, repo_root), "preset_versions": preset_versions, "failures": failures, "warnings": warnings, "checked_at": now_utc_iso(), } def resolve_effective_prompt_version(repo_root: Path, requested_prompt_version: str | None) -> tuple[str, dict[str, Any]]: requested = str(requested_prompt_version or "").strip() default_prompt_version = read_default_prompt_version(repo_root) if not requested: resolved = default_prompt_version or "unknown" return resolved, { "mode": "default_prompt_version", "requested_prompt_version": None, "resolved_prompt_version": resolved, } if requested in BUILTIN_PROMPT_FILES: return requested, { "mode": "requested_prompt_version", "requested_prompt_version": requested, "resolved_prompt_version": requested, } if requested in ASSISTANT_RUNTIME_PROMPT_VERSIONS: resolved = default_prompt_version or "unknown" return resolved, { "mode": "assistant_runtime_schema_uses_default_normalizer_prompt", "requested_prompt_version": requested, "resolved_prompt_version": resolved, "assistant_runtime_prompt_version": requested, } return requested, { "mode": "unknown_prompt_version", "requested_prompt_version": requested, "resolved_prompt_version": requested, } def _get_arg(args: argparse.Namespace | None, name: str, default: Any = None) -> Any: if args is None: return default return getattr(args, name, default) def build_effective_runtime_manifest( *, runner: str, args: argparse.Namespace | None = None, repo_root: Path = REPO_ROOT, spec_path: Path | None = None, output_dir: Path | None = None, run_id: str | None = None, extra: dict[str, Any] | None = None, ) -> dict[str, Any]: requested_prompt_version = str(_get_arg(args, "prompt_version", "") or "").strip() or None prompt_version, prompt_resolution = resolve_effective_prompt_version(repo_root, requested_prompt_version) prompt_health = build_prompt_registry_health(repo_root, prompt_version=prompt_version, strict_preset_match=False) shared_llm = load_shared_llm_connection(repo_root) llm_provider = str(_get_arg(args, "llm_provider", "") or shared_llm.get("llmProvider") or "unknown") llm_model = str(_get_arg(args, "llm_model", "") or shared_llm.get("model") or "unknown") llm_base_url = str(_get_arg(args, "llm_base_url", "") or shared_llm.get("baseUrl") or "") temperature = _get_arg(args, "temperature", shared_llm.get("temperature")) max_output_tokens = _get_arg(args, "max_output_tokens", shared_llm.get("maxOutputTokens")) manifest: dict[str, Any] = { "schema_version": EFFECTIVE_RUNTIME_SCHEMA_VERSION, "git_sha": git_sha(repo_root), "runner": runner, "run_id": run_id, "spec_path": repo_relative(spec_path, repo_root) if spec_path else None, "output_dir": repo_relative(output_dir, repo_root) if output_dir else None, "backend_url": _get_arg(args, "backend_url"), "mcp_proxy_url": _get_arg(args, "mcp_proxy_url", os.environ.get("MCP_PROXY_URL") or DEFAULT_MCP_PROXY_URL), "mcp_channel": _get_arg(args, "mcp_channel", os.environ.get("MCP_CHANNEL")), "llm_provider": llm_provider, "llm_model": llm_model, "llm_base_url": llm_base_url or None, "temperature": temperature, "max_output_tokens": max_output_tokens, "requested_prompt_version": prompt_resolution.get("requested_prompt_version"), "prompt_version": prompt_version, "prompt_resolution": prompt_resolution, "assistant_runtime_prompt_version": prompt_resolution.get("assistant_runtime_prompt_version"), "prompt_source": prompt_health.get("prompt_source"), "prompt_hash": prompt_health.get("prompt_hash"), "prompt_registry_status": prompt_health.get("status"), "prompt_registry_failures": prompt_health.get("failures") or [], "prompt_registry_warnings": prompt_health.get("warnings") or [], "prompt_files": prompt_health.get("prompt_files") or [], "feature_flags": collect_feature_flags(repo_root), "shared_llm_connection": { "path": repo_relative(repo_root / "llm_normalizer" / "data" / "shared_llm_connection.json", repo_root), "exists": (repo_root / "llm_normalizer" / "data" / "shared_llm_connection.json").exists(), "connection": shared_llm, }, "use_mock": bool(_get_arg(args, "use_mock", False)), "created_at": now_utc_iso(), } if extra: manifest["extra"] = extra return manifest def write_effective_runtime_manifest(output_dir: Path, manifest: dict[str, Any]) -> Path: manifest_path = output_dir / EFFECTIVE_RUNTIME_FILE_NAME write_json(manifest_path, manifest) return manifest_path def write_effective_runtime( output_dir: Path, *, runner: str, args: argparse.Namespace | None = None, repo_root: Path = REPO_ROOT, spec_path: Path | None = None, run_id: str | None = None, extra: dict[str, Any] | None = None, ) -> dict[str, Any]: manifest = build_effective_runtime_manifest( runner=runner, args=args, repo_root=repo_root, spec_path=spec_path, output_dir=output_dir, run_id=run_id, extra=extra, ) write_effective_runtime_manifest(output_dir, manifest) return manifest def validate_effective_runtime_manifest(manifest: dict[str, Any], *, manifest_path: Path | None = None) -> None: location = f": {manifest_path}" if manifest_path else "" required_fields = ( "git_sha", "runner", "llm_model", "temperature", "max_output_tokens", "prompt_version", "prompt_source", "prompt_hash", ) missing_fields = [ field_name for field_name in required_fields if manifest.get(field_name) is None or str(manifest.get(field_name)).strip() == "" ] if missing_fields: raise RuntimeError( f"{EFFECTIVE_RUNTIME_FILE_NAME} is incomplete{location}: missing " + ", ".join(missing_fields) ) if manifest.get("prompt_registry_status") != "pass": failures = manifest.get("prompt_registry_failures") failure_text = ",".join(str(item) for item in failures) if isinstance(failures, list) else str(failures or "") raise RuntimeError( f"{EFFECTIVE_RUNTIME_FILE_NAME} has failing prompt registry status{location}: " f"{manifest.get('prompt_registry_status')}; {failure_text}" ) def load_effective_runtime_manifest(run_dir: Path) -> dict[str, Any]: manifest_path = run_dir / EFFECTIVE_RUNTIME_FILE_NAME if not manifest_path.exists(): raise RuntimeError(f"{EFFECTIVE_RUNTIME_FILE_NAME} not found: {manifest_path}") try: manifest = read_json_object(manifest_path) except json.JSONDecodeError as exc: raise RuntimeError(f"{EFFECTIVE_RUNTIME_FILE_NAME} is invalid JSON: {manifest_path}") from exc if manifest.get("schema_version") != EFFECTIVE_RUNTIME_SCHEMA_VERSION: raise RuntimeError( f"{EFFECTIVE_RUNTIME_FILE_NAME} has unsupported schema_version={manifest.get('schema_version')!r}" ) validate_effective_runtime_manifest(manifest, manifest_path=manifest_path) return manifest