"""Validate Agent Reliability Uplift machine-readable contracts.

The script checks three kinds of artifacts under ``docs/orchestration``:

* the JSON schema files listed in ``EXPECTED_SCHEMA_FILES``,
* the issue catalog (``issue_catalog.json``),
* the detector registry (``detector_registry.json``), cross-checked against
  the issue catalog and the ``detectors`` lists of the contract files.

Every problem is reported as a ``failures`` / ``warnings`` string; the
process exit code is 0 only when there are no failures.
"""

from __future__ import annotations

import argparse
import json
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

REPO_ROOT = Path(__file__).resolve().parent.parent
SCHEMA_DIR = REPO_ROOT / "docs" / "orchestration" / "schemas"
ISSUE_CATALOG_PATH = REPO_ROOT / "docs" / "orchestration" / "issue_catalog.json"
DETECTOR_REGISTRY_PATH = REPO_ROOT / "docs" / "orchestration" / "detector_registry.json"
CONTRACTS_DIR = REPO_ROOT / "docs" / "orchestration" / "contracts"

# Schema file name -> the exact ``title`` the schema must declare.
EXPECTED_SCHEMA_FILES = {
    "agent_issue_catalog.schema.json": "Agent Issue Catalog",
    "agent_detector_registry.schema.json": "Agent Detector Registry",
    "auto_coder_gate.schema.json": "Auto-Coder Gate",
    "business_audit_contract.schema.json": "Business Audit Contract",
    "domain_loop_lead_coder_handoff.schema.json": "Domain Loop Lead Coder Handoff",
}

# Issue codes that get the extra ``auto_coder_issue_*`` checks and must be
# present in the catalog.
AUTO_CODER_ALLOWED_ISSUE_CODES = {
    "business_direct_answer_missing",
    "business_next_step_missing",
    "technical_garbage_in_answer",
}

REQUIRED_ISSUE_FIELDS = {
    "severity",
    "business_meaning",
    "root_layers",
    "detectors",
    "allowed_patch_targets",
    "forbidden_patch_targets",
    "rerun_matrix",
}

VALID_SEVERITIES = {"P0", "P1", "P2", "P3", "WARNING"}

# Everything that can go wrong while reading a file as UTF-8 JSON: the file
# is unreadable (OSError), is not valid UTF-8 (UnicodeDecodeError), or is not
# valid JSON (JSONDecodeError).
_JSON_LOAD_ERRORS = (json.JSONDecodeError, UnicodeDecodeError, OSError)


def _load_error_detail(error: Exception) -> str:
    """Return a short description of a JSON load failure for a report line.

    ``JSONDecodeError`` keeps its parser message; any other error is
    described by its exception class name.
    """
    if isinstance(error, json.JSONDecodeError):
        return error.msg
    return type(error).__name__


def read_json(path: Path) -> Any:
    """Parse *path* as UTF-8 JSON and return the decoded value.

    Raises:
        OSError: the file cannot be read.
        UnicodeDecodeError: the file is not valid UTF-8.
        json.JSONDecodeError: the text is not valid JSON.
    """
    return json.loads(path.read_text(encoding="utf-8"))


def display_path(path: Path) -> str:
    """Return *path* relative to ``REPO_ROOT`` when possible, else as given."""
    try:
        return str(path.resolve().relative_to(REPO_ROOT))
    except ValueError:
        # The path lives outside the repository.
        return str(path)


def normalize_string_list(value: Any) -> list[str]:
    """Coerce *value* into a list of non-empty, stripped strings.

    A single string becomes a one-element list (or an empty list when it is
    blank). A list has each item stringified and stripped; falsy items and
    blank strings are dropped. Any other type yields an empty list.
    """
    if isinstance(value, str):
        stripped = value.strip()
        return [stripped] if stripped else []
    if not isinstance(value, list):
        return []
    candidates = (str(item or "").strip() for item in value)
    return [text for text in candidates if text]


def has_answer_contract(issue: dict[str, Any]) -> bool:
    """Tell whether *issue* declares what an acceptable answer looks like.

    True when ``expected_answer_contract`` is a non-blank value, or when
    ``acceptance`` is an object with a non-empty ``must_have`` or
    ``must_not_have`` list.
    """
    if str(issue.get("expected_answer_contract") or "").strip():
        return True
    acceptance = issue.get("acceptance")
    if not isinstance(acceptance, dict):
        return False
    return bool(
        normalize_string_list(acceptance.get("must_have"))
        or normalize_string_list(acceptance.get("must_not_have"))
    )


def read_json_object_or_empty(path: Path) -> dict[str, Any]:
    """Load *path* as a JSON object, returning ``{}`` on any load problem.

    Unreadable files, invalid UTF-8, invalid JSON and non-object roots all
    produce an empty dict.
    """
    try:
        payload = read_json(path)
    except _JSON_LOAD_ERRORS:
        return {}
    return payload if isinstance(payload, dict) else {}


def collect_issue_detector_links(issue_catalog: dict[str, Any]) -> dict[str, set[str]]:
    """Map each detector name to the issue codes that reference it.

    Only entries of ``issue_catalog["issues"]`` that are objects are
    considered; a missing or non-object ``issues`` value yields ``{}``.
    """
    issues = issue_catalog.get("issues")
    if not isinstance(issues, dict):
        issues = {}
    links: dict[str, set[str]] = {}
    for issue_code, issue in issues.items():
        if not isinstance(issue, dict):
            continue
        for detector in normalize_string_list(issue.get("detectors")):
            links.setdefault(detector, set()).add(str(issue_code))
    return links


def collect_contract_detector_refs(contracts_dir: Path) -> tuple[dict[str, list[str]], list[str]]:
    """Scan ``*.json`` files in *contracts_dir* for top-level ``detectors``.

    Returns:
        ``(refs, warnings)`` where ``refs`` maps a detector name to the
        display paths of the contract files naming it, and ``warnings``
        lists the files that could not be loaded. Files whose root is not an
        object are skipped silently.
    """
    refs: dict[str, list[str]] = {}
    warnings: list[str] = []
    if not contracts_dir.exists():
        return refs, warnings
    for path in sorted(contracts_dir.glob("*.json")):
        try:
            payload = read_json(path)
        except _JSON_LOAD_ERRORS as error:
            warnings.append(
                f"contract_detector_scan_invalid_json:{display_path(path)}:{_load_error_detail(error)}"
            )
            continue
        if not isinstance(payload, dict):
            continue
        for detector in normalize_string_list(payload.get("detectors")):
            refs.setdefault(detector, []).append(display_path(path))
    return refs, warnings


def is_broad_patch_target(value: str) -> bool:
    """Tell whether a patch target is too broad (or forbidden) for auto-coding.

    The value is stripped, lower-cased and has backslashes turned into ``/``
    before matching. It is considered broad when it is empty, is one of the
    known top-level directories, ends with ``/**``, is a shallow (fewer than
    two ``/``) wildcard or directory path, or contains a forbidden marker.
    """
    normalized = value.strip().replace("\\", "/").lower()
    broad_targets = {
        ".",
        "./",
        "*",
        "**",
        "llm_normalizer",
        "llm_normalizer/",
        "llm_normalizer/backend",
        "llm_normalizer/backend/",
        "llm_normalizer/backend/src",
        "llm_normalizer/backend/src/",
        "llm_normalizer/backend/src/services",
        "llm_normalizer/backend/src/services/",
        "scripts",
        "scripts/",
        "docs",
        "docs/",
        "docs/orchestration",
        "docs/orchestration/",
    }
    forbidden_markers = (
        "active_domain_contract",
        "shared_llm_connection",
        "promptbuilder",
        "prompt_registry",
        "mcp protocol",
        "mcp runtime",
        "fake data",
        "fake fixtures",
        "heuristic masking",
        "global orchestration",
    )
    if not normalized or normalized in broad_targets:
        return True
    if normalized.endswith("/**"):
        return True
    if normalized.count("/") < 2 and ("*" in normalized or normalized.endswith("/")):
        return True
    return any(marker in normalized for marker in forbidden_markers)


def check_schema_files(schema_dir: Path) -> tuple[list[dict[str, Any]], list[str]]:
    """Check every file named in ``EXPECTED_SCHEMA_FILES`` inside *schema_dir*.

    Returns:
        ``(checked, failures)``. ``checked`` has one summary dict per
        expected file (path, existence and, when loadable, ``title`` /
        ``schema`` / ``type``); ``failures`` lists the problems found.
    """
    checked: list[dict[str, Any]] = []
    failures: list[str] = []
    for filename, expected_title in EXPECTED_SCHEMA_FILES.items():
        path = schema_dir / filename
        exists = path.exists()
        item: dict[str, Any] = {"path": display_path(path), "exists": exists}
        checked.append(item)
        if not exists:
            failures.append(f"missing_schema_file:{filename}")
            continue
        try:
            payload = read_json(path)
        except _JSON_LOAD_ERRORS as error:
            failures.append(f"invalid_schema_json:{filename}:{_load_error_detail(error)}")
            continue
        if not isinstance(payload, dict):
            # Valid JSON, but a list/scalar root has no title/type to inspect.
            failures.append(f"schema_root_not_object:{filename}")
            continue
        title = payload.get("title")
        draft_ref = payload.get("$schema")
        root_type = payload.get("type")
        item["title"] = title
        item["schema"] = draft_ref
        item["type"] = root_type
        if title != expected_title:
            failures.append(f"schema_title_mismatch:{filename}")
        if root_type != "object":
            failures.append(f"schema_not_object:{filename}")
        if not str(draft_ref or "").strip():
            failures.append(f"schema_missing_draft_ref:{filename}")
    return checked, failures


def check_issue_catalog(path: Path) -> tuple[dict[str, Any], list[str], list[str]]:
    """Validate the issue catalog stored at *path*.

    Returns:
        ``(summary, failures, warnings)``. When the file is missing, cannot
        be loaded, or its root is not an object, the summary only carries
        ``path`` and ``exists`` and a single failure is reported.
    """
    failures: list[str] = []
    warnings: list[str] = []
    if not path.exists():
        return {"path": display_path(path), "exists": False}, ["missing_issue_catalog"], warnings
    try:
        payload = read_json(path)
    except _JSON_LOAD_ERRORS as error:
        return (
            {"path": display_path(path), "exists": True},
            [f"invalid_issue_catalog_json:{_load_error_detail(error)}"],
            warnings,
        )
    if not isinstance(payload, dict):
        return {"path": display_path(path), "exists": True}, ["issue_catalog_root_not_object"], warnings

    issues = payload.get("issues")
    if not isinstance(issues, dict):
        issues = {}
    summary = {
        "path": display_path(path),
        "exists": True,
        "schema_version": payload.get("schema_version"),
        "issue_count": len(issues),
        "auto_coder_allowed_issue_codes": sorted(AUTO_CODER_ALLOWED_ISSUE_CODES),
    }
    if payload.get("schema_version") != "agent_issue_catalog_v1":
        failures.append("issue_catalog_schema_version_mismatch")
    if not issues:
        failures.append("issue_catalog_empty")

    list_fields = ("root_layers", "detectors", "allowed_patch_targets", "forbidden_patch_targets", "rerun_matrix")
    for issue_code, issue in sorted(issues.items()):
        if not isinstance(issue, dict):
            failures.append(f"issue_not_object:{issue_code}")
            continue
        failures.extend(
            f"issue_missing_field:{issue_code}:{field}"
            for field in sorted(REQUIRED_ISSUE_FIELDS.difference(issue))
        )
        severity = str(issue.get("severity") or "").strip().upper()
        if severity not in VALID_SEVERITIES:
            failures.append(f"issue_invalid_severity:{issue_code}:{severity or 'empty'}")
        failures.extend(
            f"issue_empty_list:{issue_code}:{field}"
            for field in list_fields
            if not normalize_string_list(issue.get(field))
        )
        if issue_code not in AUTO_CODER_ALLOWED_ISSUE_CODES:
            continue
        # Issues the auto-coder may act on carry stricter requirements.
        if not has_answer_contract(issue):
            failures.append(f"auto_coder_issue_missing_answer_contract:{issue_code}")
        if "accepted_smoke_pack" not in normalize_string_list(issue.get("rerun_matrix")):
            failures.append(f"auto_coder_issue_missing_accepted_smoke_pack:{issue_code}")
        failures.extend(
            f"auto_coder_issue_broad_allowed_patch_target:{issue_code}:{target}"
            for target in normalize_string_list(issue.get("allowed_patch_targets"))
            if is_broad_patch_target(target)
        )

    failures.extend(
        f"auto_coder_allowlisted_issue_missing_from_catalog:{issue_code}"
        for issue_code in sorted(AUTO_CODER_ALLOWED_ISSUE_CODES.difference(issues.keys()))
    )
    return summary, failures, warnings


def check_detector_registry(
    path: Path,
    issue_catalog: dict[str, Any] | None = None,
    *,
    include_contracts: bool = True,
) -> tuple[dict[str, Any], list[str], list[str]]:
    """Validate the detector registry at *path* against catalog and contracts.

    Args:
        path: Location of the detector registry JSON file.
        issue_catalog: Already-loaded issue catalog payload; anything that is
            not a dict is treated as an empty catalog.
        include_contracts: When true, detectors referenced by the JSON files
            in ``CONTRACTS_DIR`` must also be registered.

    Returns:
        ``(summary, failures, warnings)``. When the file is missing, cannot
        be loaded, or its root is not an object, the summary only carries
        ``path`` and ``exists`` and a single failure is reported.
    """
    failures: list[str] = []
    warnings: list[str] = []
    if not path.exists():
        return {"path": display_path(path), "exists": False}, ["missing_detector_registry"], warnings
    try:
        payload = read_json(path)
    except _JSON_LOAD_ERRORS as error:
        return (
            {"path": display_path(path), "exists": True},
            [f"invalid_detector_registry_json:{_load_error_detail(error)}"],
            warnings,
        )
    if not isinstance(payload, dict):
        return {"path": display_path(path), "exists": True}, ["detector_registry_root_not_object"], warnings

    detectors = payload.get("detectors")
    if not isinstance(detectors, dict):
        detectors = {}
    catalog = issue_catalog if isinstance(issue_catalog, dict) else {}
    issues = catalog.get("issues")
    if not isinstance(issues, dict):
        issues = {}
    known_issue_codes = {str(issue_code) for issue_code in issues}
    detector_links = collect_issue_detector_links(catalog)
    if include_contracts:
        contract_refs, contract_warnings = collect_contract_detector_refs(CONTRACTS_DIR)
    else:
        contract_refs, contract_warnings = {}, []
    warnings.extend(contract_warnings)

    summary = {
        "path": display_path(path),
        "exists": True,
        "schema_version": payload.get("schema_version"),
        "detector_count": len(detectors),
        "catalog_referenced_detector_count": len(detector_links),
        "contract_referenced_detector_count": len(contract_refs),
    }
    if payload.get("schema_version") != "agent_detector_registry_v1":
        failures.append("detector_registry_schema_version_mismatch")
    if not detectors:
        failures.append("detector_registry_empty")

    # Detectors named by the catalog or by a contract must be registered.
    for detector_name, linked_codes in sorted(detector_links.items()):
        if detector_name in detectors:
            continue
        failures.extend(
            f"detector_registry_missing_catalog_detector:{issue_code}:{detector_name}"
            for issue_code in sorted(linked_codes)
        )
    for detector_name, contract_paths in sorted(contract_refs.items()):
        if detector_name in detectors:
            continue
        failures.extend(
            f"detector_registry_missing_contract_detector:{contract_path}:{detector_name}"
            for contract_path in contract_paths
        )

    for detector_name, detector in sorted(detectors.items()):
        if not isinstance(detector, dict):
            failures.append(f"detector_registry_detector_not_object:{detector_name}")
            continue
        failures.extend(
            f"detector_registry_missing_field:{detector_name}:{field_name}"
            for field_name in ("kind", "automation_level", "description")
            if not str(detector.get(field_name) or "").strip()
        )
        issue_codes = normalize_string_list(detector.get("issue_codes"))
        inputs = normalize_string_list(detector.get("inputs"))
        check = detector.get("check")
        if not issue_codes:
            failures.append(f"detector_registry_empty_issue_codes:{detector_name}")
        if not inputs:
            failures.append(f"detector_registry_empty_inputs:{detector_name}")
        if not isinstance(check, dict) or not check:
            failures.append(f"detector_registry_empty_check:{detector_name}")
        # Unknown codes can only be judged when a catalog is available.
        if known_issue_codes:
            failures.extend(
                f"detector_registry_unknown_issue_code:{detector_name}:{issue_code}"
                for issue_code in issue_codes
                if issue_code not in known_issue_codes
            )
        # Every catalog issue pointing at this detector must be listed back.
        failures.extend(
            f"detector_registry_missing_issue_link:{detector_name}:{issue_code}"
            for issue_code in sorted(detector_links.get(detector_name, set()))
            if issue_code not in issue_codes
        )

    if not issues:
        warnings.append("detector_registry_issue_catalog_unavailable")
    return summary, failures, warnings


def build_healthcheck() -> dict[str, Any]:
    """Run all checks against the repository paths and assemble the report.

    ``status`` is ``"pass"`` only when no check produced a failure.
    """
    schema_files, schema_failures = check_schema_files(SCHEMA_DIR)
    issue_catalog, catalog_failures, catalog_warnings = check_issue_catalog(ISSUE_CATALOG_PATH)
    issue_catalog_payload = read_json_object_or_empty(ISSUE_CATALOG_PATH)
    detector_registry, detector_failures, detector_warnings = check_detector_registry(
        DETECTOR_REGISTRY_PATH,
        issue_catalog_payload,
    )
    failures = schema_failures + catalog_failures + detector_failures
    warnings = catalog_warnings + detector_warnings
    return {
        "schema_version": "agent_reliability_contract_health_v1",
        "status": "pass" if not failures else "fail",
        "checked_at": datetime.now(timezone.utc).replace(microsecond=0).isoformat(),
        "schema_files": schema_files,
        "issue_catalog": issue_catalog,
        "detector_registry": detector_registry,
        "failures": failures,
        "warnings": warnings,
    }


def main() -> int:
    """CLI entry point; returns 0 when the healthcheck passes, 1 otherwise."""
    parser = argparse.ArgumentParser(description="Validate Agent Reliability Uplift machine-readable contracts.")
    parser.add_argument("--json", action="store_true", help="Print machine-readable healthcheck JSON.")
    args = parser.parse_args()
    result = build_healthcheck()
    if args.json:
        print(json.dumps(result, ensure_ascii=False, indent=2))
    else:
        print(f"status: {result['status']}")
        for failure in result["failures"]:
            print(f"FAIL {failure}")
        for warning in result["warnings"]:
            print(f"WARN {warning}")
    return 0 if result["status"] == "pass" else 1


if __name__ == "__main__":
    raise SystemExit(main())