NODEDC_1C/scripts/agent_runtime_manifest.py

468 lines
18 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import hashlib
import json
import os
import re
import subprocess
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
# Repository root: the parent of the directory that contains this script.
REPO_ROOT = Path(__file__).resolve().parents[1]
# Values written to / expected in the "schema_version" field of the two payload kinds.
EFFECTIVE_RUNTIME_SCHEMA_VERSION = "agent_effective_runtime_v1"
PROMPT_REGISTRY_HEALTH_SCHEMA_VERSION = "prompt_registry_health_v1"
# File name of the manifest stored inside a run's output directory.
EFFECTIVE_RUNTIME_FILE_NAME = "effective_runtime.json"
# Locations of the llm_normalizer sources and data, resolved against REPO_ROOT.
CONFIG_TS = REPO_ROOT / "llm_normalizer" / "backend" / "src" / "config.ts"
PROMPT_BUILDER_TS = REPO_ROOT / "llm_normalizer" / "backend" / "src" / "services" / "promptBuilder.ts"
PROMPTS_DIR = REPO_ROOT / "llm_normalizer" / "prompts"
PRESETS_DIR = REPO_ROOT / "llm_normalizer" / "data" / "presets"
SHARED_LLM_CONNECTION_CONFIG = REPO_ROOT / "llm_normalizer" / "data" / "shared_llm_connection.json"
# Fallback MCP proxy URL, used when neither the runner arguments nor the
# MCP_PROXY_URL environment variable supply one.
DEFAULT_MCP_PROXY_URL = "http://127.0.0.1:6003"
# Requested prompt versions in this set are not normalizer prompt sets; they
# are resolved to the default normalizer prompt version instead.
ASSISTANT_RUNTIME_PROMPT_VERSIONS = {"address_query_runtime_v1"}
# Registry of known prompt versions. Each version maps its four prompt slots
# (system, developer, domain, fewshot) to a file path that is joined onto the
# llm_normalizer/prompts directory when the files are inspected.
BUILTIN_PROMPT_FILES: dict[str, dict[str, str]] = {
    "normalizer_v1": {
        "system": "system/default.txt",
        "developer": "developer/default.txt",
        "domain": "domain/default.txt",
        "fewshot": "fewshot/default.txt",
    },
    "normalizer_v1_1": {
        "system": "system/default.txt",
        "developer": "developer/normalizer_v1_1.txt",
        "domain": "domain/normalizer_domain_v1_1.txt",
        "fewshot": "fewshot/normalizer_fewshot_v1_1.txt",
    },
    "normalizer_v1_1_1": {
        "system": "system/default.txt",
        "developer": "developer/normalizer_v1_1_1.txt",
        "domain": "domain/normalizer_domain_v1_1.txt",
        "fewshot": "fewshot/normalizer_fewshot_v1_1_1.txt",
    },
    "normalizer_v1_1_2": {
        "system": "system/default.txt",
        "developer": "developer/normalizer_v1_1_2.txt",
        "domain": "domain/normalizer_domain_v1_1.txt",
        "fewshot": "fewshot/normalizer_fewshot_v1_1_2.txt",
    },
    "normalizer_v1_1_2_1": {
        "system": "system/default.txt",
        "developer": "developer/normalizer_v1_1_2_1.txt",
        "domain": "domain/normalizer_domain_v1_1.txt",
        "fewshot": "fewshot/normalizer_fewshot_v1_1_2_1.txt",
    },
    "normalizer_v2": {
        "system": "system/default.txt",
        "developer": "developer/normalizer_v2.txt",
        "domain": "domain/normalizer_domain_v1_1.txt",
        "fewshot": "fewshot/normalizer_v2.txt",
    },
    "normalizer_v2_0_1": {
        "system": "system/default.txt",
        "developer": "developer/normalizer_v2_0_1.txt",
        "domain": "domain/normalizer_domain_v1_1.txt",
        "fewshot": "fewshot/normalizer_v2_0_1.txt",
    },
    "normalizer_v2_0_2": {
        "system": "system/default.txt",
        "developer": "developer/normalizer_v2_0_2.txt",
        "domain": "domain/normalizer_domain_v1_1.txt",
        "fewshot": "fewshot/normalizer_v2_0_2.txt",
    },
}
def now_utc_iso() -> str:
    """Return the current UTC time as an ISO 8601 string without microseconds."""
    current = datetime.now(timezone.utc)
    whole_seconds = current.replace(microsecond=0)
    return whole_seconds.isoformat()
def repo_relative(path: Path, repo_root: Path = REPO_ROOT) -> str:
    """Express ``path`` relative to ``repo_root`` using forward slashes.

    Both paths are resolved first. When the resolved path does not lie under
    the resolved repository root, the resolved absolute path is returned
    unchanged as a string.
    """
    absolute = path.resolve()
    try:
        inside_repo = absolute.relative_to(repo_root.resolve())
    except ValueError:
        # Not located under the repository root: fall back to the absolute form.
        return str(absolute)
    return str(inside_repo).replace("\\", "/")
def read_json_object(path: Path) -> dict[str, Any]:
    """Parse the UTF-8 JSON file at ``path`` and return its top-level object.

    A document whose top-level value is not a JSON object yields an empty
    dict. Read and parse errors are not handled here and propagate.
    """
    document = json.loads(path.read_text(encoding="utf-8"))
    if not isinstance(document, dict):
        return {}
    return document
def write_json(path: Path, payload: Any) -> None:
    """Serialize ``payload`` as indented JSON and write it to ``path``.

    Missing parent directories are created. The file is UTF-8, keeps
    non-ASCII characters unescaped, uses "\\n" line endings on every
    platform and ends with a trailing newline.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(payload, ensure_ascii=False, indent=2)
    with path.open("w", encoding="utf-8", newline="\n") as handle:
        handle.write(serialized + "\n")
def git_sha(repo_root: Path = REPO_ROOT) -> str:
    """Return the commit hash of ``HEAD`` for the repository at ``repo_root``.

    Runs ``git rev-parse HEAD`` with a 10 second timeout. Returns the string
    "unknown" when git cannot be started, times out, exits with a non-zero
    status, or prints nothing.
    """
    command = ["git", "rev-parse", "HEAD"]
    try:
        completed = subprocess.run(
            command,
            cwd=str(repo_root),
            text=True,
            encoding="utf-8",
            errors="replace",
            capture_output=True,
            check=False,
            timeout=10,
        )
    except (OSError, subprocess.SubprocessError):
        return "unknown"
    # A failed invocation is treated the same as empty output.
    head = completed.stdout.strip() if completed.returncode == 0 else ""
    return head or "unknown"
def read_default_prompt_version(repo_root: Path = REPO_ROOT) -> str | None:
    """Extract the fallback ``DEFAULT_PROMPT_VERSION`` literal from config.ts.

    Looks in ``llm_normalizer/backend/src/config.ts`` under ``repo_root`` for
    ``DEFAULT_PROMPT_VERSION = process.env.DEFAULT_PROMPT_VERSION ?? "<value>"``
    and returns ``<value>``. Returns ``None`` when the file is missing or the
    assignment is not found. Only a double-quoted literal is recognised.
    """
    config_path = repo_root / "llm_normalizer" / "backend" / "src" / "config.ts"
    if not config_path.exists():
        return None
    source = config_path.read_text(encoding="utf-8", errors="replace")
    found = re.search(
        r"DEFAULT_PROMPT_VERSION\s*=\s*process\.env\.DEFAULT_PROMPT_VERSION\s*\?\?\s*\"([^\"]+)\"",
        source,
    )
    if found is None:
        return None
    return found.group(1)
def load_shared_llm_connection(repo_root: Path = REPO_ROOT) -> dict[str, Any]:
    """Best-effort load of the shared LLM connection settings.

    Reads ``llm_normalizer/data/shared_llm_connection.json`` under
    ``repo_root`` and returns a shallow copy of its ``"connection"`` object.

    Returns an empty dict when the file is missing, cannot be read, is not
    valid UTF-8, is not valid JSON, or has no object under ``"connection"``.
    """
    config_path = repo_root / "llm_normalizer" / "data" / "shared_llm_connection.json"
    if not config_path.exists():
        return {}
    try:
        raw = read_json_object(config_path)
    except (OSError, UnicodeDecodeError, json.JSONDecodeError):
        # UnicodeDecodeError: the file is decoded strictly as UTF-8, so a
        # badly encoded file must degrade to "no settings" like any other
        # unreadable file instead of crashing the caller.
        return {}
    connection = raw.get("connection")
    return dict(connection) if isinstance(connection, dict) else {}
def _env_bool(value: str | None, default_value: bool) -> bool:
    """Interpret an environment-variable string as a boolean.

    ``None`` and blank strings yield ``default_value``. Otherwise the value is
    false only for "0", "false", "off" or "no" (case-insensitive, surrounding
    whitespace ignored); every other non-blank value is true.
    """
    if value is None:
        return default_value
    normalized = value.strip().lower()
    if normalized == "":
        return default_value
    falsy_spellings = {"0", "false", "off", "no"}
    return normalized not in falsy_spellings
def collect_feature_flags(repo_root: Path = REPO_ROOT) -> dict[str, Any]:
    """Collect the effective ``FEATURE_*`` flags declared in config.ts.

    Scans ``llm_normalizer/backend/src/config.ts`` under ``repo_root`` for
    declarations of the form
    ``export const FEATURE_X = toBooleanFlag(process.env.FEATURE_X, true|false)``
    and, for each flag, reports:

    * ``"value"``   - the effective value after applying this process's
      environment variable of the same name;
    * ``"source"``  - ``"env"`` when a non-blank environment value determined
      the result, ``"default"`` otherwise;
    * ``"default"`` - the default declared in config.ts.

    Returns an empty dict when config.ts does not exist.
    """
    config_path = repo_root / "llm_normalizer" / "backend" / "src" / "config.ts"
    if not config_path.exists():
        return {}
    text = config_path.read_text(encoding="utf-8", errors="replace")
    # The back-reference \1 requires the env variable name to equal the
    # exported constant name.
    pattern = re.compile(
        r"export\s+const\s+(FEATURE_[A-Z0-9_]+)\s*=\s*toBooleanFlag\(\s*"
        r"process\.env\.\1\s*,\s*(true|false)\s*\)",
        re.DOTALL,
    )
    flags: dict[str, Any] = {}
    for name, default_raw in pattern.findall(text):
        default_value = default_raw == "true"
        raw_value = os.environ.get(name)
        # A variable that is set but blank makes _env_bool fall back to the
        # default, so it must not be reported as an environment override.
        overridden = raw_value is not None and raw_value.strip() != ""
        flags[name] = {
            "value": _env_bool(raw_value, default_value),
            "source": "env" if overridden else "default",
            "default": default_value,
        }
    return flags
def _hash_prompt_files(files: list[dict[str, Any]], repo_root: Path) -> str | None:
    """Compute one SHA-256 digest over every prompt file marked as existing.

    Entries whose ``"exists"`` value is not exactly ``True`` are skipped. The
    rest are processed in ``"relative_path"`` order; for each one the
    forward-slash path and the file bytes are fed to the hash, each followed
    by a NUL byte. Returns ``None`` when no entry is marked as existing.
    """
    existing = sorted(
        (entry for entry in files if entry.get("exists") is True),
        key=lambda entry: str(entry.get("relative_path") or ""),
    )
    if not existing:
        return None
    hasher = hashlib.sha256()
    for entry in existing:
        relative = str(entry["relative_path"])
        # NUL separators keep the path/content boundaries unambiguous.
        hasher.update(relative.replace("\\", "/").encode("utf-8") + b"\0")
        hasher.update((repo_root / relative).read_bytes() + b"\0")
    return hasher.hexdigest()
def _prompt_files_for_version(repo_root: Path, prompt_version: str) -> list[dict[str, Any]]:
    """Describe the prompt files registered for ``prompt_version``.

    Returns one entry per slot in ``BUILTIN_PROMPT_FILES[prompt_version]``
    with the slot name, the registry path, the repository-relative path,
    whether the file exists under ``repo_root`` and its size in bytes
    (``None`` when it does not exist). An unregistered version yields an
    empty list.
    """
    slots = BUILTIN_PROMPT_FILES.get(prompt_version)
    if not slots:
        return []
    prompts_root = Path("llm_normalizer") / "prompts"
    described: list[dict[str, Any]] = []
    for slot, relative_prompt_path in slots.items():
        relative_path = prompts_root / Path(relative_prompt_path)
        candidate = repo_root / relative_path
        present = candidate.exists()
        described.append(
            {
                "slot": slot,
                "prompt_path": relative_prompt_path.replace("\\", "/"),
                "relative_path": relative_path.as_posix(),
                "exists": present,
                "size_bytes": candidate.stat().st_size if present else None,
            }
        )
    return described
def _preset_prompt_versions(repo_root: Path) -> list[dict[str, Any]]:
    """List the ``prompt_version`` declared by every preset JSON file.

    Scans ``llm_normalizer/data/presets/*.json`` under ``repo_root`` in sorted
    order. Each entry carries the repository-relative ``"path"``, the
    stripped ``"prompt_version"`` (``None`` when absent or blank) and a
    ``"status"`` of ``"ok"``. A file that cannot be read, is not valid UTF-8
    or is not valid JSON is reported with status ``"invalid_json"`` and a
    ``None`` version instead of aborting the scan. Returns an empty list when
    the presets directory does not exist.
    """
    presets_dir = repo_root / "llm_normalizer" / "data" / "presets"
    if not presets_dir.exists():
        return []
    presets: list[dict[str, Any]] = []
    for path in sorted(presets_dir.glob("*.json")):
        relative = repo_relative(path, repo_root)
        try:
            payload = read_json_object(path)
        except (OSError, UnicodeDecodeError, json.JSONDecodeError):
            # UnicodeDecodeError: presets are decoded strictly as UTF-8; one
            # badly encoded file must be reported, not crash the whole check.
            presets.append(
                {
                    "path": relative,
                    "prompt_version": None,
                    "status": "invalid_json",
                }
            )
            continue
        presets.append(
            {
                "path": relative,
                "prompt_version": str(payload.get("prompt_version") or "").strip() or None,
                "status": "ok",
            }
        )
    return presets
def build_prompt_registry_health(
    repo_root: Path = REPO_ROOT,
    *,
    prompt_version: str | None = None,
    strict_preset_match: bool = True,
) -> dict[str, Any]:
    """Build a health report for the prompt registry.

    Args:
        repo_root: Repository root to inspect.
        prompt_version: Prompt version to check. When empty, the default
            version from config.ts is used, or "unknown" if that cannot be
            determined.
        strict_preset_match: When true, presets declaring a prompt version
            different from the default are recorded as failures; otherwise
            they are recorded as warnings.

    Returns:
        A dict with the schema version, a ``"status"`` of "pass" or "fail",
        the default and active prompt versions, the prompt source, the
        combined prompt hash, per-file details, preset versions, the
        failure and warning lists and a ``"checked_at"`` timestamp.
    """
    # Read config.ts once; the value serves both as the fallback for the
    # active version and as the reference for the preset comparison.
    default_prompt_version = read_default_prompt_version(repo_root)
    active_prompt_version = prompt_version or default_prompt_version or "unknown"
    files = _prompt_files_for_version(repo_root, active_prompt_version)
    failures: list[str] = []
    warnings: list[str] = []
    if active_prompt_version not in BUILTIN_PROMPT_FILES:
        failures.append(f"unknown_prompt_version:{active_prompt_version}")
    missing_files = [
        str(item.get("relative_path"))
        for item in files
        if item.get("exists") is not True
    ]
    if missing_files:
        failures.append("prompt_files_missing:" + ",".join(missing_files))
    prompt_hash = _hash_prompt_files(files, repo_root)
    if not prompt_hash:
        failures.append("prompt_hash_unavailable")
    preset_versions = _preset_prompt_versions(repo_root)
    # Presets are compared against the default version, not the active one.
    mismatched_presets = [
        item
        for item in preset_versions
        if item.get("status") == "ok"
        and item.get("prompt_version")
        and default_prompt_version
        and item.get("prompt_version") != default_prompt_version
    ]
    if mismatched_presets:
        message = "preset_version_mismatch:" + ",".join(
            f"{item['path']}={item['prompt_version']}" for item in mismatched_presets
        )
        if strict_preset_match:
            failures.append(message)
        else:
            warnings.append(message)
    invalid_presets = [item for item in preset_versions if item.get("status") != "ok"]
    if invalid_presets:
        failures.append("preset_json_invalid:" + ",".join(str(item.get("path")) for item in invalid_presets))
    # "file": every registered file exists; "partial_file": some are missing;
    # "unknown": no files are registered for the active version.
    source = "file" if files and not missing_files else ("unknown" if not files else "partial_file")
    status = "pass" if not failures else "fail"
    return {
        "schema_version": PROMPT_REGISTRY_HEALTH_SCHEMA_VERSION,
        "status": status,
        "default_prompt_version": default_prompt_version,
        "active_prompt_version": active_prompt_version,
        "prompt_source": source,
        "prompt_hash": prompt_hash,
        "prompt_files": files,
        "prompt_builder": repo_relative(PROMPT_BUILDER_TS, repo_root),
        "config": repo_relative(CONFIG_TS, repo_root),
        "preset_versions": preset_versions,
        "failures": failures,
        "warnings": warnings,
        "checked_at": now_utc_iso(),
    }
def resolve_effective_prompt_version(repo_root: Path, requested_prompt_version: str | None) -> tuple[str, dict[str, Any]]:
    """Resolve the normalizer prompt version that will actually be used.

    Returns ``(resolved_version, details)``. ``details["mode"]`` records how
    the decision was made:

    * ``"default_prompt_version"`` - nothing was requested, so the default
      from config.ts (or "unknown") is used;
    * ``"requested_prompt_version"`` - the request names a registered version;
    * ``"assistant_runtime_schema_uses_default_normalizer_prompt"`` - the
      request names an assistant runtime version, so the default is used and
      the request is kept under ``"assistant_runtime_prompt_version"``;
    * ``"unknown_prompt_version"`` - the request is passed through unchanged.
    """
    wanted = str(requested_prompt_version or "").strip()
    fallback = read_default_prompt_version(repo_root) or "unknown"
    assistant_mode = "assistant_runtime_schema_uses_default_normalizer_prompt"

    if not wanted:
        mode, resolved = "default_prompt_version", fallback
    elif wanted in BUILTIN_PROMPT_FILES:
        mode, resolved = "requested_prompt_version", wanted
    elif wanted in ASSISTANT_RUNTIME_PROMPT_VERSIONS:
        mode, resolved = assistant_mode, fallback
    else:
        mode, resolved = "unknown_prompt_version", wanted

    details: dict[str, Any] = {
        "mode": mode,
        "requested_prompt_version": wanted or None,
        "resolved_prompt_version": resolved,
    }
    if mode == assistant_mode:
        details["assistant_runtime_prompt_version"] = wanted
    return resolved, details
def _get_arg(args: argparse.Namespace | None, name: str, default: Any = None) -> Any:
    """Read attribute ``name`` from ``args``, falling back to ``default``.

    ``default`` is returned when ``args`` is ``None``, when the attribute is
    missing, or when the attribute is ``None``. argparse stores ``None`` for
    options that were not passed, so an attribute that merely exists must not
    shadow the fallback. Other falsy values (``0``, ``False``, ``""``) are
    real values and are returned unchanged.
    """
    if args is None:
        return default
    value = getattr(args, name, None)
    return default if value is None else value
def build_effective_runtime_manifest(
    *,
    runner: str,
    args: argparse.Namespace | None = None,
    repo_root: Path = REPO_ROOT,
    spec_path: Path | None = None,
    output_dir: Path | None = None,
    run_id: str | None = None,
    extra: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Assemble the effective-runtime manifest describing one run.

    Combines the runner arguments, the shared LLM connection settings, the
    resolved prompt version with its registry health, the feature flags and
    the current git commit into a single dict.

    Args:
        runner: Name of the runner producing the manifest.
        args: Parsed command-line arguments; may be ``None``.
        repo_root: Repository root used for every lookup.
        spec_path: Optional spec file, stored repository-relative.
        output_dir: Optional output directory, stored repository-relative.
        run_id: Optional run identifier.
        extra: Optional payload stored under ``"extra"`` when non-empty.

    Returns:
        The manifest dict. Nothing is written to disk.
    """
    requested = str(_get_arg(args, "prompt_version", "") or "").strip() or None
    effective_version, resolution = resolve_effective_prompt_version(repo_root, requested)
    # Preset mismatches only produce warnings here, never failures.
    health = build_prompt_registry_health(
        repo_root,
        prompt_version=effective_version,
        strict_preset_match=False,
    )
    shared = load_shared_llm_connection(repo_root)

    # Explicit arguments win over the shared connection settings.
    provider = str(_get_arg(args, "llm_provider", "") or shared.get("llmProvider") or "unknown")
    model = str(_get_arg(args, "llm_model", "") or shared.get("model") or "unknown")
    base_url = str(_get_arg(args, "llm_base_url", "") or shared.get("baseUrl") or "")
    sampling_temperature = _get_arg(args, "temperature", shared.get("temperature"))
    output_token_limit = _get_arg(args, "max_output_tokens", shared.get("maxOutputTokens"))

    shared_config_path = repo_root / "llm_normalizer" / "data" / "shared_llm_connection.json"
    proxy_fallback = os.environ.get("MCP_PROXY_URL") or DEFAULT_MCP_PROXY_URL

    manifest: dict[str, Any] = {
        "schema_version": EFFECTIVE_RUNTIME_SCHEMA_VERSION,
        "git_sha": git_sha(repo_root),
        "runner": runner,
        "run_id": run_id,
        "spec_path": repo_relative(spec_path, repo_root) if spec_path else None,
        "output_dir": repo_relative(output_dir, repo_root) if output_dir else None,
        "backend_url": _get_arg(args, "backend_url"),
        "mcp_proxy_url": _get_arg(args, "mcp_proxy_url", proxy_fallback),
        "mcp_channel": _get_arg(args, "mcp_channel", os.environ.get("MCP_CHANNEL")),
        "llm_provider": provider,
        "llm_model": model,
        "llm_base_url": base_url or None,
        "temperature": sampling_temperature,
        "max_output_tokens": output_token_limit,
        "requested_prompt_version": resolution.get("requested_prompt_version"),
        "prompt_version": effective_version,
        "prompt_resolution": resolution,
        "assistant_runtime_prompt_version": resolution.get("assistant_runtime_prompt_version"),
        "prompt_source": health.get("prompt_source"),
        "prompt_hash": health.get("prompt_hash"),
        "prompt_registry_status": health.get("status"),
        "prompt_registry_failures": health.get("failures") or [],
        "prompt_registry_warnings": health.get("warnings") or [],
        "prompt_files": health.get("prompt_files") or [],
        "feature_flags": collect_feature_flags(repo_root),
        "shared_llm_connection": {
            "path": repo_relative(shared_config_path, repo_root),
            "exists": shared_config_path.exists(),
            "connection": shared,
        },
        "use_mock": bool(_get_arg(args, "use_mock", False)),
        "created_at": now_utc_iso(),
    }
    if extra:
        manifest["extra"] = extra
    return manifest
def write_effective_runtime_manifest(output_dir: Path, manifest: dict[str, Any]) -> Path:
    """Write ``manifest`` as the effective-runtime file inside ``output_dir``.

    Returns the path of the written file.
    """
    target = output_dir / EFFECTIVE_RUNTIME_FILE_NAME
    write_json(target, manifest)
    return target
def write_effective_runtime(
    output_dir: Path,
    *,
    runner: str,
    args: argparse.Namespace | None = None,
    repo_root: Path = REPO_ROOT,
    spec_path: Path | None = None,
    run_id: str | None = None,
    extra: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Build the effective-runtime manifest and store it in ``output_dir``.

    Convenience wrapper: assembles the manifest from the given parameters,
    writes it into ``output_dir`` and returns the manifest dict.
    """
    built = build_effective_runtime_manifest(
        output_dir=output_dir,
        runner=runner,
        args=args,
        repo_root=repo_root,
        spec_path=spec_path,
        run_id=run_id,
        extra=extra,
    )
    write_effective_runtime_manifest(output_dir, built)
    return built
def validate_effective_runtime_manifest(manifest: dict[str, Any], *, manifest_path: Path | None = None) -> None:
    """Check that ``manifest`` is complete and its prompt registry passed.

    Args:
        manifest: Manifest dict to check.
        manifest_path: Optional file location, included in error messages.

    Raises:
        RuntimeError: When a required field is ``None`` or blank, or when
            ``"prompt_registry_status"`` is anything other than "pass".
    """
    location = f": {manifest_path}" if manifest_path else ""
    required_fields = (
        "git_sha",
        "runner",
        "llm_model",
        "temperature",
        "max_output_tokens",
        "prompt_version",
        "prompt_source",
        "prompt_hash",
    )
    absent: list[str] = []
    for field_name in required_fields:
        value = manifest.get(field_name)
        # Compare the string form so that 0 and False count as present.
        if value is None or str(value).strip() == "":
            absent.append(field_name)
    if absent:
        raise RuntimeError(
            f"{EFFECTIVE_RUNTIME_FILE_NAME} is incomplete{location}: missing "
            + ", ".join(absent)
        )

    registry_status = manifest.get("prompt_registry_status")
    if registry_status == "pass":
        return
    failures = manifest.get("prompt_registry_failures")
    if isinstance(failures, list):
        failure_text = ",".join(str(item) for item in failures)
    else:
        failure_text = str(failures or "")
    raise RuntimeError(
        f"{EFFECTIVE_RUNTIME_FILE_NAME} has failing prompt registry status{location}: "
        f"{registry_status}; {failure_text}"
    )
def load_effective_runtime_manifest(run_dir: Path) -> dict[str, Any]:
    """Load and validate the effective-runtime manifest stored in ``run_dir``.

    Returns:
        The parsed manifest dict.

    Raises:
        RuntimeError: When the manifest file is missing, is not valid UTF-8
            or JSON, carries an unsupported ``schema_version``, or fails
            ``validate_effective_runtime_manifest``.
    """
    manifest_path = run_dir / EFFECTIVE_RUNTIME_FILE_NAME
    if not manifest_path.exists():
        raise RuntimeError(f"{EFFECTIVE_RUNTIME_FILE_NAME} not found: {manifest_path}")
    try:
        manifest = read_json_object(manifest_path)
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        # The file is decoded strictly as UTF-8; undecodable bytes are
        # reported like any other malformed manifest so that callers only
        # have to handle RuntimeError.
        raise RuntimeError(f"{EFFECTIVE_RUNTIME_FILE_NAME} is invalid JSON: {manifest_path}") from exc
    # A non-object JSON document is read as {} and is rejected here because
    # it has no schema_version.
    if manifest.get("schema_version") != EFFECTIVE_RUNTIME_SCHEMA_VERSION:
        raise RuntimeError(
            f"{EFFECTIVE_RUNTIME_FILE_NAME} has unsupported schema_version={manifest.get('schema_version')!r}"
        )
    validate_effective_runtime_manifest(manifest, manifest_path=manifest_path)
    return manifest