559 lines
25 KiB
Python
559 lines
25 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
# Repository root: two directory levels above this file.
REPO_ROOT = Path(__file__).resolve().parent.parent

# Locations of the orchestration artifacts validated by this module.
ORCHESTRATION_DIR = REPO_ROOT / "docs" / "orchestration"
SCHEMA_DIR = ORCHESTRATION_DIR / "schemas"
ISSUE_CATALOG_PATH = ORCHESTRATION_DIR / "issue_catalog.json"
DETECTOR_REGISTRY_PATH = ORCHESTRATION_DIR / "detector_registry.json"
CONTRACTS_DIR = ORCHESTRATION_DIR / "contracts"

# ``schema_version`` values that answer contracts and scenario packs must carry.
BUSINESS_ANSWER_CONTRACT_SCHEMA_VERSION = "business_answer_contract_v1"
DOMAIN_SCENARIO_PACK_SCHEMA_VERSION = "domain_scenario_pack_v1"

# Schema file name -> the ``title`` that file is required to declare.
EXPECTED_SCHEMA_FILES = {
    "agent_issue_catalog.schema.json": "Agent Issue Catalog",
    "agent_detector_registry.schema.json": "Agent Detector Registry",
    "agent_detector_results.schema.json": "Agent Detector Results",
    "auto_coder_gate.schema.json": "Auto-Coder Gate",
    "business_answer_contract.schema.json": "Business Answer Contract",
    "business_audit_contract.schema.json": "Business Audit Contract",
    "domain_scenario_pack.schema.json": "Domain Scenario Pack",
    "domain_loop_lead_coder_handoff.schema.json": "Domain Loop Lead Coder Handoff",
}

# Issue codes that receive the stricter auto-coder checks in the issue catalog
# (answer contract present, accepted smoke pack in rerun matrix, narrow patch targets).
AUTO_CODER_ALLOWED_ISSUE_CODES = {
    "business_direct_answer_missing",
    "business_next_step_missing",
    "technical_garbage_in_answer",
}

# Keys every issue catalog entry must define.
REQUIRED_ISSUE_FIELDS = {
    "severity",
    "business_meaning",
    "root_layers",
    "detectors",
    "allowed_patch_targets",
    "forbidden_patch_targets",
    "rerun_matrix",
}

# Accepted severity labels (compared after upper-casing).
VALID_SEVERITIES = {
    "P0",
    "P1",
    "P2",
    "P3",
    "WARNING",
}
|
|
|
|
|
|
def read_json(path: Path) -> Any:
    """Decode the UTF-8 text file at *path* as JSON and return the parsed value.

    Decoding, I/O and JSON syntax errors are not handled here; they propagate
    to the caller.
    """
    raw_text = path.read_text(encoding="utf-8")
    parsed = json.loads(raw_text)
    return parsed
|
|
|
|
|
|
def display_path(path: Path) -> str:
    """Render *path* for reports, relative to ``REPO_ROOT`` when possible.

    When the resolved path is not located under ``REPO_ROOT`` the path is
    returned unchanged (as given, not resolved).
    """
    try:
        inside_repo = path.resolve().relative_to(REPO_ROOT)
    except ValueError:
        # relative_to() rejects paths outside the repository root.
        return str(path)
    return str(inside_repo)
|
|
|
|
|
|
def normalize_string_list(value: Any) -> list[str]:
    """Coerce *value* into a list of non-empty, whitespace-stripped strings.

    A string is treated as a one-element list. A list has each item
    stringified (falsy items count as empty) and stripped, and empty results
    are dropped. Any other type yields an empty list.
    """
    if isinstance(value, str):
        candidates = [value]
    elif isinstance(value, list):
        candidates = value
    else:
        return []
    cleaned = (str(entry or "").strip() for entry in candidates)
    return [text for text in cleaned if text]
|
|
|
|
|
|
def has_answer_contract(issue: dict[str, Any]) -> bool:
    """Tell whether *issue* declares what an acceptable answer looks like.

    True when ``expected_answer_contract`` is a non-blank value, or when
    ``acceptance`` is a dict whose ``must_have`` or ``must_not_have`` holds at
    least one non-empty entry.
    """
    explicit_contract = str(issue.get("expected_answer_contract") or "").strip()
    if explicit_contract:
        return True
    acceptance = issue.get("acceptance")
    if isinstance(acceptance, dict):
        return any(
            normalize_string_list(acceptance.get(key))
            for key in ("must_have", "must_not_have")
        )
    return False
|
|
|
|
|
|
def read_json_object_or_empty(path: Path) -> dict[str, Any]:
    """Best-effort load of a JSON object from *path*.

    Returns:
        The parsed payload when the file can be read and its root is a JSON
        object; otherwise an empty dict (unreadable file, undecodable bytes,
        malformed JSON, or a non-object root).
    """
    try:
        payload = read_json(path)
    except (ValueError, OSError):
        # ValueError covers both json.JSONDecodeError (malformed JSON) and
        # UnicodeDecodeError (file is not valid UTF-8); OSError covers a
        # missing or unreadable file.
        return {}
    return payload if isinstance(payload, dict) else {}
|
|
|
|
|
|
def collect_issue_detector_links(issue_catalog: dict[str, Any]) -> dict[str, set[str]]:
    """Invert the issue catalog into a detector -> issue codes mapping.

    Only dict-valued entries of ``issue_catalog["issues"]`` are considered;
    each name in an entry's ``detectors`` is mapped to the set of issue codes
    that reference it. A missing or non-dict ``issues`` yields an empty map.
    """
    raw_issues = issue_catalog.get("issues")
    if not isinstance(raw_issues, dict):
        return {}
    detector_to_issues: dict[str, set[str]] = {}
    for code, entry in raw_issues.items():
        if isinstance(entry, dict):
            for detector_name in normalize_string_list(entry.get("detectors")):
                detector_to_issues.setdefault(detector_name, set()).add(str(code))
    return detector_to_issues
|
|
|
|
|
|
def collect_contract_detector_refs(contracts_dir: Path) -> tuple[dict[str, list[str]], list[str]]:
    """Scan ``*.json`` files in *contracts_dir* for the detectors they reference.

    Returns:
        ``(refs, warnings)``. ``refs`` maps each detector name to the display
        paths of the contract files listing it under ``detectors``;
        ``warnings`` holds one entry per file containing invalid JSON. Files
        whose root is not an object are skipped silently, and a missing
        directory yields two empty containers.
    """
    detector_refs: dict[str, list[str]] = {}
    scan_warnings: list[str] = []
    if contracts_dir.exists():
        for contract_file in sorted(contracts_dir.glob("*.json")):
            try:
                document = read_json(contract_file)
            except json.JSONDecodeError as error:
                scan_warnings.append(
                    f"contract_detector_scan_invalid_json:{display_path(contract_file)}:{error.msg}"
                )
            else:
                if isinstance(document, dict):
                    for detector_name in normalize_string_list(document.get("detectors")):
                        detector_refs.setdefault(detector_name, []).append(display_path(contract_file))
    return detector_refs, scan_warnings
|
|
|
|
|
|
def _answer_contract_required_field_failures(display: str, required_fields: Any) -> list[str]:
    """Return the failures for one answer surface's ``required_fields`` value."""
    if not isinstance(required_fields, list) or not required_fields:
        return [f"answer_contract_missing_required_fields:{display}"]
    failures: list[str] = []
    for index, field in enumerate(required_fields):
        if not isinstance(field, dict):
            failures.append(f"answer_contract_required_field_not_object:{display}:{index}")
            continue
        if not str(field.get("field") or "").strip():
            failures.append(f"answer_contract_required_field_missing_name:{display}:{index}")
        if not str(field.get("meaning") or "").strip():
            failures.append(f"answer_contract_required_field_missing_meaning:{display}:{index}")
    return failures


def check_answer_contracts(contracts_dir: Path) -> tuple[dict[str, Any], list[str], list[str], set[str]]:
    """Validate every ``*.json`` business answer contract in *contracts_dir*.

    Args:
        contracts_dir: Directory holding the contract files.

    Returns:
        ``(summary, failures, warnings, contract_ids)`` where ``summary``
        describes the directory and the contracts found, ``failures`` and
        ``warnings`` are lists of colon-separated diagnostic codes, and
        ``contract_ids`` is the set of non-empty ``contract_id`` values seen.
        A missing directory returns immediately with a single
        ``missing_answer_contracts_dir`` failure.
    """
    failures: list[str] = []
    warnings: list[str] = []
    contract_ids: set[str] = set()
    contract_paths: list[str] = []
    if not contracts_dir.exists():
        return {"path": display_path(contracts_dir), "exists": False}, ["missing_answer_contracts_dir"], warnings, contract_ids

    for path in sorted(contracts_dir.glob("*.json")):
        display = display_path(path)
        contract_paths.append(display)
        try:
            payload = read_json(path)
        except json.JSONDecodeError as error:
            failures.append(f"invalid_answer_contract_json:{display}:{error.msg}")
            continue
        if not isinstance(payload, dict):
            failures.append(f"answer_contract_not_object:{display}")
            continue

        schema_version = str(payload.get("schema_version") or "").strip()
        contract_id = str(payload.get("contract_id") or "").strip()
        if schema_version != BUSINESS_ANSWER_CONTRACT_SCHEMA_VERSION:
            failures.append(f"answer_contract_schema_version_mismatch:{display}:{schema_version or 'empty'}")
        if not contract_id:
            failures.append(f"answer_contract_missing_contract_id:{display}")
        else:
            if contract_id in contract_ids:
                failures.append(f"answer_contract_duplicate_contract_id:{contract_id}")
            contract_ids.add(contract_id)
            # A file name that differs from its contract_id is only a warning.
            if path.stem != contract_id:
                warnings.append(f"answer_contract_filename_mismatch:{display}:{contract_id}")
        if not str(payload.get("domain") or "").strip():
            failures.append(f"answer_contract_missing_domain:{display}")

        answer_surface = payload.get("answer_surface")
        if isinstance(answer_surface, dict) and answer_surface:
            failures.extend(
                _answer_contract_required_field_failures(display, answer_surface.get("required_fields"))
            )
        else:
            failures.append(f"answer_contract_missing_answer_surface:{display}")

        # The detectors check does not depend on the answer surface, so it runs
        # even when the surface is missing; otherwise a contract lacking both
        # would only report the first problem.
        if not normalize_string_list(payload.get("detectors")):
            failures.append(f"answer_contract_missing_detectors:{display}")

    summary = {
        "path": display_path(contracts_dir),
        "exists": True,
        "contract_count": len(contract_ids),
        "contract_ids": sorted(contract_ids),
        "contract_paths": contract_paths,
    }
    if not contract_ids:
        failures.append("answer_contracts_empty")
    return summary, failures, warnings, contract_ids
|
|
|
|
|
|
# Exact (normalized) targets that cover whole trees and are never acceptable.
_BROAD_PATCH_TARGETS = frozenset(
    {
        ".",
        "./",
        "*",
        "**",
        "llm_normalizer",
        "llm_normalizer/",
        "llm_normalizer/backend",
        "llm_normalizer/backend/",
        "llm_normalizer/backend/src",
        "llm_normalizer/backend/src/",
        "llm_normalizer/backend/src/services",
        "llm_normalizer/backend/src/services/",
        "scripts",
        "scripts/",
        "docs",
        "docs/",
        "docs/orchestration",
        "docs/orchestration/",
    }
)

# Substrings that make a target unacceptable wherever they appear in it.
_FORBIDDEN_PATCH_TARGET_MARKERS = (
    "active_domain_contract",
    "shared_llm_connection",
    "promptbuilder",
    "prompt_registry",
    "mcp protocol",
    "mcp runtime",
    "fake data",
    "fake fixtures",
    "heuristic masking",
    "global orchestration",
)


def is_broad_patch_target(value: str) -> bool:
    """Return True when *value* is too broad (or forbidden) as a patch target.

    The value is stripped, lower-cased, and has backslashes converted to
    forward slashes. Redundant path syntax is then removed (``.`` segments and
    repeated slashes) so that equivalent spellings such as ``./scripts`` or
    ``scripts//`` are judged like ``scripts``. A target is reported as broad
    when it:

    * contains a ``..`` segment (its real scope cannot be judged textually),
    * is empty or one of the known whole-tree targets,
    * ends with ``/**``,
    * has fewer than two ``/`` separators and contains ``*`` or ends with ``/``,
    * contains one of the forbidden marker substrings.
    """
    normalized = value.strip().replace("\\", "/").lower()

    segments = normalized.split("/")
    if ".." in segments:
        return True
    last = len(segments) - 1
    # Drop "." segments and empty segments produced by repeated slashes, but
    # keep a leading empty segment (absolute path) and a trailing one
    # (directory-style trailing slash), because later rules depend on them.
    normalized = "/".join(
        segment
        for position, segment in enumerate(segments)
        if segment != "." and (segment or position in (0, last))
    )

    if not normalized or normalized in _BROAD_PATCH_TARGETS:
        return True
    if normalized.endswith("/**"):
        return True
    if normalized.count("/") < 2 and ("*" in normalized or normalized.endswith("/")):
        return True
    return any(marker in normalized for marker in _FORBIDDEN_PATCH_TARGET_MARKERS)
|
|
|
|
|
|
def check_schema_files(schema_dir: Path) -> tuple[list[dict[str, Any]], list[str]]:
    """Validate each schema file named in ``EXPECTED_SCHEMA_FILES``.

    Args:
        schema_dir: Directory expected to contain the schema files.

    Returns:
        ``(checked, failures)``. ``checked`` has one entry per expected file
        with its display path and existence flag, plus ``title``, ``schema``
        and ``type`` when the file parsed to a JSON object. ``failures`` lists
        diagnostic codes for missing files, invalid JSON, a non-object root,
        a wrong title, a ``type`` other than ``"object"``, or a blank
        ``$schema``.
    """
    checked: list[dict[str, Any]] = []
    failures: list[str] = []
    for filename, expected_title in EXPECTED_SCHEMA_FILES.items():
        path = schema_dir / filename
        exists = path.exists()
        item: dict[str, Any] = {"path": display_path(path), "exists": exists}
        # Registered up front; the same dict is enriched below on success.
        checked.append(item)
        if not exists:
            failures.append(f"missing_schema_file:{filename}")
            continue
        try:
            payload = read_json(path)
        except json.JSONDecodeError as error:
            failures.append(f"invalid_schema_json:{filename}:{error.msg}")
            continue
        if not isinstance(payload, dict):
            # Valid JSON whose root is a list/scalar has no keys to inspect;
            # report it instead of failing on payload.get().
            failures.append(f"schema_payload_not_object:{filename}")
            continue

        item["title"] = payload.get("title")
        item["schema"] = payload.get("$schema")
        item["type"] = payload.get("type")
        if payload.get("title") != expected_title:
            failures.append(f"schema_title_mismatch:{filename}")
        if payload.get("type") != "object":
            failures.append(f"schema_not_object:{filename}")
        if not str(payload.get("$schema") or "").strip():
            failures.append(f"schema_missing_draft_ref:{filename}")
    return checked, failures
|
|
|
|
|
|
def _issue_entry_failures(issue_code: str, issue: Any) -> list[str]:
    """Return the failures for a single issue catalog entry."""
    if not isinstance(issue, dict):
        return [f"issue_not_object:{issue_code}"]

    failures = [
        f"issue_missing_field:{issue_code}:{field}"
        for field in sorted(REQUIRED_ISSUE_FIELDS)
        if field not in issue
    ]
    # Severity is compared case-insensitively.
    severity = str(issue.get("severity") or "").strip().upper()
    if severity not in VALID_SEVERITIES:
        failures.append(f"issue_invalid_severity:{issue_code}:{severity or 'empty'}")
    for field in ("root_layers", "detectors", "allowed_patch_targets", "forbidden_patch_targets", "rerun_matrix"):
        if not normalize_string_list(issue.get(field)):
            failures.append(f"issue_empty_list:{issue_code}:{field}")

    # Allowlisted auto-coder issues must additionally be tightly specified.
    if issue_code in AUTO_CODER_ALLOWED_ISSUE_CODES:
        if not has_answer_contract(issue):
            failures.append(f"auto_coder_issue_missing_answer_contract:{issue_code}")
        if "accepted_smoke_pack" not in normalize_string_list(issue.get("rerun_matrix")):
            failures.append(f"auto_coder_issue_missing_accepted_smoke_pack:{issue_code}")
        for target in normalize_string_list(issue.get("allowed_patch_targets")):
            if is_broad_patch_target(target):
                failures.append(f"auto_coder_issue_broad_allowed_patch_target:{issue_code}:{target}")
    return failures


def check_issue_catalog(path: Path) -> tuple[dict[str, Any], list[str], list[str]]:
    """Validate the issue catalog stored at *path*.

    Args:
        path: Location of the issue catalog JSON file.

    Returns:
        ``(summary, failures, warnings)``. A missing file, invalid JSON, or a
        JSON root that is not an object returns early with a minimal summary
        and a single failure. Otherwise the summary carries the schema
        version, issue count and the auto-coder allowlist, and ``failures``
        lists every diagnostic found. ``warnings`` is currently always empty.
    """
    failures: list[str] = []
    warnings: list[str] = []
    if not path.exists():
        return {"path": display_path(path), "exists": False}, ["missing_issue_catalog"], warnings
    try:
        payload = read_json(path)
    except json.JSONDecodeError as error:
        return {"path": display_path(path), "exists": True}, [f"invalid_issue_catalog_json:{error.msg}"], warnings
    if not isinstance(payload, dict):
        # Valid JSON with a list/scalar root: report it rather than failing
        # on payload.get() below.
        return {"path": display_path(path), "exists": True}, ["issue_catalog_not_object"], warnings

    issues = payload.get("issues") if isinstance(payload.get("issues"), dict) else {}
    summary = {
        "path": display_path(path),
        "exists": True,
        "schema_version": payload.get("schema_version"),
        "issue_count": len(issues),
        "auto_coder_allowed_issue_codes": sorted(AUTO_CODER_ALLOWED_ISSUE_CODES),
    }
    if payload.get("schema_version") != "agent_issue_catalog_v1":
        failures.append("issue_catalog_schema_version_mismatch")
    if not issues:
        failures.append("issue_catalog_empty")

    for issue_code, issue in sorted(issues.items()):
        failures.extend(_issue_entry_failures(issue_code, issue))

    # Every allowlisted code must actually exist in the catalog.
    for issue_code in sorted(AUTO_CODER_ALLOWED_ISSUE_CODES.difference(issues.keys())):
        failures.append(f"auto_coder_allowlisted_issue_missing_from_catalog:{issue_code}")
    return summary, failures, warnings
|
|
|
|
|
|
def check_detector_registry(
    path: Path,
    issue_catalog: dict[str, Any] | None = None,
    *,
    include_contracts: bool = True,
) -> tuple[dict[str, Any], list[str], list[str]]:
    """Validate the detector registry and its links to issues and contracts.

    Args:
        path: Location of the detector registry JSON file.
        issue_catalog: Parsed issue catalog used for cross-checks; anything
            that is not a dict is treated as an empty catalog.
        include_contracts: When True, detectors referenced by the contract
            files under ``CONTRACTS_DIR`` must also exist in the registry.

    Returns:
        ``(summary, failures, warnings)``. A missing file, invalid JSON, or a
        JSON root that is not an object returns early with a minimal summary
        and a single failure.
    """
    failures: list[str] = []
    warnings: list[str] = []
    if not path.exists():
        return {"path": display_path(path), "exists": False}, ["missing_detector_registry"], warnings
    try:
        payload = read_json(path)
    except json.JSONDecodeError as error:
        return {"path": display_path(path), "exists": True}, [f"invalid_detector_registry_json:{error.msg}"], warnings
    if not isinstance(payload, dict):
        # Valid JSON with a list/scalar root: report it rather than failing
        # on payload.get() below.
        return {"path": display_path(path), "exists": True}, ["detector_registry_not_object"], warnings

    detectors = payload.get("detectors") if isinstance(payload.get("detectors"), dict) else {}
    catalog = issue_catalog if isinstance(issue_catalog, dict) else {}
    issues = catalog.get("issues") if isinstance(catalog.get("issues"), dict) else {}
    known_issue_codes = {str(issue_code) for issue_code in issues}
    detector_links = collect_issue_detector_links(catalog)
    if include_contracts:
        contract_refs, contract_warnings = collect_contract_detector_refs(CONTRACTS_DIR)
    else:
        contract_refs, contract_warnings = {}, []
    warnings.extend(contract_warnings)

    summary = {
        "path": display_path(path),
        "exists": True,
        "schema_version": payload.get("schema_version"),
        "detector_count": len(detectors),
        "catalog_referenced_detector_count": len(detector_links),
        "contract_referenced_detector_count": len(contract_refs),
    }
    if payload.get("schema_version") != "agent_detector_registry_v1":
        failures.append("detector_registry_schema_version_mismatch")
    if not detectors:
        failures.append("detector_registry_empty")

    # Detectors named by the issue catalog must be registered.
    for detector_name, issue_codes in sorted(detector_links.items()):
        if detector_name not in detectors:
            for issue_code in sorted(issue_codes):
                failures.append(f"detector_registry_missing_catalog_detector:{issue_code}:{detector_name}")

    # Detectors named by answer contracts must be registered.
    for detector_name, paths in sorted(contract_refs.items()):
        if detector_name not in detectors:
            for contract_path in paths:
                failures.append(f"detector_registry_missing_contract_detector:{contract_path}:{detector_name}")

    for detector_name, detector in sorted(detectors.items()):
        if not isinstance(detector, dict):
            failures.append(f"detector_registry_detector_not_object:{detector_name}")
            continue
        for field_name in ("kind", "automation_level", "description"):
            if not str(detector.get(field_name) or "").strip():
                failures.append(f"detector_registry_missing_field:{detector_name}:{field_name}")
        issue_codes = normalize_string_list(detector.get("issue_codes"))
        inputs = normalize_string_list(detector.get("inputs"))
        check = detector.get("check")
        if not issue_codes:
            failures.append(f"detector_registry_empty_issue_codes:{detector_name}")
        if not inputs:
            failures.append(f"detector_registry_empty_inputs:{detector_name}")
        if not isinstance(check, dict) or not check:
            failures.append(f"detector_registry_empty_check:{detector_name}")
        # Unknown-code checks only make sense when a catalog was supplied.
        if known_issue_codes:
            for issue_code in issue_codes:
                if issue_code not in known_issue_codes:
                    failures.append(f"detector_registry_unknown_issue_code:{detector_name}:{issue_code}")
        # Links must be symmetric: an issue that names this detector has to be
        # listed in the detector's own issue_codes.
        for issue_code in sorted(detector_links.get(detector_name, set())):
            if issue_code not in issue_codes:
                failures.append(f"detector_registry_missing_issue_link:{detector_name}:{issue_code}")

    # ``catalog`` is always a dict at this point, so only emptiness matters.
    if not issues:
        warnings.append("detector_registry_issue_catalog_unavailable")
    return summary, failures, warnings
|
|
|
|
|
|
def scenario_pack_paths(orchestration_dir: Path) -> tuple[list[tuple[Path, dict[str, Any]]], list[str]]:
    """Find the domain scenario packs among ``*.json`` files in *orchestration_dir*.

    A file counts as a pack when its JSON root is an object whose
    ``schema_version`` equals ``DOMAIN_SCENARIO_PACK_SCHEMA_VERSION``.

    Returns:
        ``(packs, warnings)``: ``packs`` is a path-sorted list of
        ``(path, payload)`` pairs, and ``warnings`` has one entry per file with
        invalid JSON, or a single ``domain_scenario_pack_dir_missing`` entry
        when the directory does not exist.
    """
    if not orchestration_dir.exists():
        return [], ["domain_scenario_pack_dir_missing"]

    found: list[tuple[Path, dict[str, Any]]] = []
    scan_warnings: list[str] = []
    for candidate in sorted(orchestration_dir.glob("*.json")):
        try:
            document = read_json(candidate)
        except json.JSONDecodeError as error:
            scan_warnings.append(
                f"domain_scenario_pack_scan_invalid_json:{display_path(candidate)}:{error.msg}"
            )
        else:
            is_pack = (
                isinstance(document, dict)
                and document.get("schema_version") == DOMAIN_SCENARIO_PACK_SCHEMA_VERSION
            )
            if is_pack:
                found.append((candidate, document))
    return found, scan_warnings
|
|
|
|
|
|
def check_domain_scenario_packs(
    orchestration_dir: Path,
    known_contract_ids: set[str],
    detector_registry: dict[str, Any] | None = None,
) -> tuple[dict[str, Any], list[str], list[str]]:
    """Validate every domain scenario pack found in *orchestration_dir*.

    Args:
        orchestration_dir: Directory scanned for scenario pack JSON files.
        known_contract_ids: Contract ids that packs and steps may reference.
        detector_registry: Parsed detector registry; when it provides a
            non-empty ``detectors`` dict, ``detectors_under_test`` entries
            must be registered there.

    Returns:
        ``(summary, failures, warnings)`` where ``summary`` holds the pack ids
        and pack/step counters, and the two lists hold diagnostic codes.
    """
    problems: list[str] = []
    notes: list[str] = []
    discovered, scan_notes = scenario_pack_paths(orchestration_dir)
    notes.extend(scan_notes)

    registered: set[str] = set()
    if isinstance(detector_registry, dict):
        registry_entries = detector_registry.get("detectors")
        if isinstance(registry_entries, dict):
            registered = {str(name) for name in registry_entries}

    seen_pack_ids: list[str] = []
    bound_packs = 0
    total_steps = 0
    bound_steps = 0
    trap_steps = 0

    for pack_path, pack in discovered:
        shown = display_path(pack_path)

        pack_id = str(pack.get("pack_id") or "").strip()
        if pack_id:
            seen_pack_ids.append(pack_id)
        else:
            problems.append(f"domain_scenario_pack_missing_pack_id:{shown}")
        if not str(pack.get("domain") or "").strip():
            problems.append(f"domain_scenario_pack_missing_domain:{shown}")

        scenarios = pack.get("scenarios")
        if not (isinstance(scenarios, list) and scenarios):
            # Nothing further can be checked for a pack without scenarios.
            problems.append(f"domain_scenario_pack_missing_scenarios:{shown}")
            continue

        detectors_under_test = normalize_string_list(pack.get("detectors_under_test"))

        # Packs bound to a source contract carry extra obligations.
        source_contract_id = str(pack.get("source_contract_id") or "").strip()
        if source_contract_id:
            bound_packs += 1
            if source_contract_id not in known_contract_ids:
                problems.append(f"domain_scenario_pack_unknown_source_contract:{shown}:{source_contract_id}")
            acceptance = pack.get("acceptance")
            if not (isinstance(acceptance, dict) and acceptance):
                problems.append(f"domain_scenario_pack_missing_acceptance:{shown}:{source_contract_id}")
            if not detectors_under_test:
                problems.append(f"domain_scenario_pack_missing_detectors_under_test:{shown}:{source_contract_id}")

        analysis_context = pack.get("analysis_context")
        expected_contract = ""
        if isinstance(analysis_context, dict):
            expected_contract = str(analysis_context.get("expected_business_answer_contract") or "").strip()
        if expected_contract and expected_contract not in known_contract_ids:
            problems.append(f"domain_scenario_pack_unknown_analysis_contract:{shown}:{expected_contract}")

        # Detector names are only verified when a registry listing is available.
        if registered:
            for detector_name in detectors_under_test:
                if detector_name not in registered:
                    problems.append(f"domain_scenario_pack_unknown_detector:{shown}:{detector_name}")

        for scenario in scenarios:
            if not isinstance(scenario, dict):
                problems.append(f"domain_scenario_pack_scenario_not_object:{shown}")
                continue
            scenario_id = str(scenario.get("scenario_id") or "").strip()
            if not scenario_id:
                problems.append(f"domain_scenario_pack_scenario_missing_id:{shown}")
            scenario_label = scenario_id or "unknown"

            steps = scenario.get("steps")
            if not (isinstance(steps, list) and steps):
                problems.append(f"domain_scenario_pack_scenario_missing_steps:{shown}:{scenario_label}")
                continue

            for step in steps:
                # Non-object steps still count toward the step total.
                total_steps += 1
                if not isinstance(step, dict):
                    problems.append(f"domain_scenario_pack_step_not_object:{shown}:{scenario_label}")
                    continue

                step_id = str(step.get("step_id") or "").strip()
                if not step_id:
                    problems.append(f"domain_scenario_pack_step_missing_id:{shown}:{scenario_label}")
                step_label = step_id or "unknown"
                if not str(step.get("question") or "").strip():
                    problems.append(
                        f"domain_scenario_pack_step_missing_question:{shown}:{scenario_label}:{step_label}"
                    )

                raw_contract = step.get("expected_business_answer_contract") or step.get("required_answer_contract") or ""
                step_contract = str(raw_contract).strip()
                if source_contract_id and step_contract:
                    bound_steps += 1
                elif source_contract_id:
                    problems.append(
                        f"domain_scenario_pack_step_missing_expected_contract:{shown}:{scenario_label}:{step_label}"
                    )
                if step_contract and step_contract not in known_contract_ids:
                    problems.append(
                        f"domain_scenario_pack_step_unknown_contract:{shown}:{scenario_label}:{step_label}:{step_contract}"
                    )

                # Trap steps must spell out what a wrong answer looks like.
                if "wrong_domain_trap" in normalize_string_list(step.get("semantic_tags")):
                    trap_steps += 1
                    if not normalize_string_list(step.get("forbidden_answer_patterns")):
                        problems.append(
                            f"domain_scenario_pack_wrong_domain_trap_missing_forbidden_patterns:{shown}:{scenario_label}:{step_label}"
                        )

    summary = {
        "path": display_path(orchestration_dir),
        "exists": orchestration_dir.exists(),
        "pack_count": len(discovered),
        "pack_ids": sorted(seen_pack_ids),
        "contract_bound_pack_count": bound_packs,
        "step_count": total_steps,
        "contract_bound_step_count": bound_steps,
        "wrong_domain_trap_step_count": trap_steps,
    }
    return summary, problems, notes
|
|
|
|
|
|
def build_healthcheck() -> dict[str, Any]:
    """Run every contract check and assemble the combined health report.

    Returns:
        A dict with a fixed ``schema_version``, an overall ``status``
        (``"pass"`` only when no check produced a failure), a UTC
        ``checked_at`` timestamp truncated to seconds, one summary per check,
        and the concatenated ``failures`` and ``warnings`` lists.
    """
    schema_files, schema_failures = check_schema_files(SCHEMA_DIR)
    answer_contracts, contract_failures, contract_warnings, contract_ids = check_answer_contracts(CONTRACTS_DIR)
    issue_catalog, catalog_failures, catalog_warnings = check_issue_catalog(ISSUE_CATALOG_PATH)

    # The cross-checks below need the raw payloads, not the summaries above.
    issue_catalog_payload = read_json_object_or_empty(ISSUE_CATALOG_PATH)
    detector_registry_payload = read_json_object_or_empty(DETECTOR_REGISTRY_PATH)

    detector_registry, detector_failures, detector_warnings = check_detector_registry(
        DETECTOR_REGISTRY_PATH, issue_catalog_payload
    )
    domain_packs, domain_pack_failures, domain_pack_warnings = check_domain_scenario_packs(
        ORCHESTRATION_DIR, contract_ids, detector_registry_payload
    )

    all_failures = [
        *schema_failures,
        *contract_failures,
        *catalog_failures,
        *detector_failures,
        *domain_pack_failures,
    ]
    all_warnings = [
        *contract_warnings,
        *catalog_warnings,
        *detector_warnings,
        *domain_pack_warnings,
    ]
    overall_status = "fail" if all_failures else "pass"
    timestamp = datetime.now(timezone.utc).replace(microsecond=0).isoformat()

    return {
        "schema_version": "agent_reliability_contract_health_v1",
        "status": overall_status,
        "checked_at": timestamp,
        "schema_files": schema_files,
        "answer_contracts": answer_contracts,
        "issue_catalog": issue_catalog,
        "detector_registry": detector_registry,
        "domain_scenario_packs": domain_packs,
        "failures": all_failures,
        "warnings": all_warnings,
    }
|
|
|
|
|
|
def main() -> int:
    """Command-line entry point: run the healthcheck and print the outcome.

    With ``--json`` the full report is printed as indented JSON; otherwise a
    status line is printed, followed by one ``FAIL`` line per failure and one
    ``WARN`` line per warning.

    Returns:
        ``0`` when the report status is ``"pass"``, ``1`` otherwise.
    """
    parser = argparse.ArgumentParser(description="Validate Agent Reliability Uplift machine-readable contracts.")
    parser.add_argument("--json", action="store_true", help="Print machine-readable healthcheck JSON.")
    options = parser.parse_args()

    report = build_healthcheck()
    if not options.json:
        output_lines = [f"status: {report['status']}"]
        output_lines.extend(f"FAIL {failure}" for failure in report["failures"])
        output_lines.extend(f"WARN {warning}" for warning in report["warnings"])
        for line in output_lines:
            print(line)
    else:
        print(json.dumps(report, ensure_ascii=False, indent=2))
    return 1 if report["status"] != "pass" else 0
|
|
|
|
|
|
# Script entry point: exit the process with main()'s return code.
if __name__ == "__main__":
    sys.exit(main())
|