Добавить healthcheck контрактов Agent Reliability

This commit is contained in:
dctouch 2026-05-24 13:00:14 +03:00
parent 06e035eadf
commit 0506def909
6 changed files with 831 additions and 0 deletions

View File

@ -0,0 +1,102 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Agent Issue Catalog",
"type": "object",
"additionalProperties": true,
"required": ["schema_version", "issues"],
"properties": {
"schema_version": {
"const": "agent_issue_catalog_v1"
},
"updated_at": {
"type": "string"
},
"principles": {
"type": "array",
"items": {
"type": "string"
}
},
"issues": {
"type": "object",
"additionalProperties": {
"type": "object",
"additionalProperties": true,
"required": [
"severity",
"business_meaning",
"root_layers",
"detectors",
"allowed_patch_targets",
"forbidden_patch_targets",
"rerun_matrix"
],
"properties": {
"severity": {
"type": "string",
"enum": ["P0", "P1", "P2", "P3", "WARNING"]
},
"business_meaning": {
"type": "string"
},
"root_layers": {
"type": "array",
"items": {
"type": "string"
},
"minItems": 1
},
"expected_answer_contract": {
"type": "string"
},
"detectors": {
"type": "array",
"items": {
"type": "string"
},
"minItems": 1
},
"allowed_patch_targets": {
"type": "array",
"items": {
"type": "string"
},
"minItems": 1
},
"forbidden_patch_targets": {
"type": "array",
"items": {
"type": "string"
},
"minItems": 1
},
"rerun_matrix": {
"type": "array",
"items": {
"type": "string"
},
"minItems": 1
},
"acceptance": {
"type": "object",
"additionalProperties": true,
"properties": {
"must_have": {
"type": "array",
"items": {
"type": "string"
}
},
"must_not_have": {
"type": "array",
"items": {
"type": "string"
}
}
}
}
}
}
}
}
}

View File

@ -0,0 +1,107 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Auto-Coder Gate",
"type": "object",
"additionalProperties": true,
"required": [
"schema_version",
"allowed",
"mode",
"issue_codes",
"root_layers",
"allowed_patch_targets",
"forbidden_patch_targets",
"rerun_matrix",
"allowlisted_issue_codes",
"issue_catalog_contracts",
"blocking_reasons",
"reason",
"policy"
],
"properties": {
"schema_version": {
"const": "auto_coder_gate_v1"
},
"allowed": {
"type": "boolean"
},
"mode": {
"const": "auto-coder"
},
"focus_id": {
"type": ["string", "null"]
},
"issue_codes": {
"type": "array",
"items": {
"type": "string"
}
},
"root_layers": {
"type": "array",
"items": {
"type": "string"
}
},
"allowed_patch_targets": {
"type": "array",
"items": {
"type": "string"
}
},
"forbidden_patch_targets": {
"type": "array",
"items": {
"type": "string"
}
},
"rerun_matrix": {
"type": "array",
"items": {
"type": "string"
}
},
"allowlisted_issue_codes": {
"type": "array",
"items": {
"type": "string"
}
},
"issue_catalog_contracts": {
"type": "object",
"additionalProperties": {
"type": "object",
"additionalProperties": true,
"required": [
"root_layers",
"expected_answer_contract",
"allowed_patch_targets",
"forbidden_patch_targets",
"rerun_matrix"
]
}
},
"blocking_reasons": {
"type": "array",
"items": {
"type": "string"
}
},
"reason": {
"type": "string"
},
"policy": {
"type": "object",
"additionalProperties": true,
"required": [
"auto_coder_default",
"requires_issue_catalog_contract",
"requires_expected_answer_contract",
"requires_target_evidence_paths",
"requires_accepted_smoke_pack",
"requires_catalog_limited_patch_scope",
"lead_owns_merge_and_acceptance"
]
}
}
}

View File

@ -0,0 +1,119 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Business Audit Contract",
"type": "object",
"additionalProperties": true,
"required": [
"schema_version",
"created_at",
"overall_status",
"quality_score",
"target_score",
"loop_decision",
"analyst_accepted_gate",
"accepted_gate",
"deterministic_gate_ok",
"deterministic_gate_reason",
"human_meaning",
"quality_flags",
"root_layers",
"violated_invariants",
"blocking_issues",
"repair_targets_summary",
"rerun_matrix",
"artifact_refs"
],
"properties": {
"schema_version": {
"const": "business_audit_contract_v1"
},
"created_at": {
"type": "string"
},
"overall_status": {
"type": "string",
"enum": ["accepted", "partial", "blocked", "needs_exact_capability"]
},
"quality_score": {
"type": "integer",
"minimum": 0,
"maximum": 100
},
"target_score": {
"type": "integer",
"minimum": 0,
"maximum": 100
},
"loop_decision": {
"type": "string"
},
"analyst_accepted_gate": {
"type": "boolean"
},
"accepted_gate": {
"type": "boolean"
},
"deterministic_gate_ok": {
"type": "boolean"
},
"deterministic_gate_reason": {
"type": "string"
},
"human_meaning": {
"type": "object",
"additionalProperties": true
},
"quality_flags": {
"type": "object",
"additionalProperties": {
"type": "boolean"
}
},
"root_layers": {
"type": "array",
"items": {
"type": "string"
}
},
"violated_invariants": {
"type": "array",
"items": {
"type": "string"
}
},
"blocking_issues": {
"type": "array",
"items": {
"type": "object",
"additionalProperties": true,
"required": [
"issue_code",
"severity",
"expected_business_answer_contract",
"root_layers",
"evidence_paths",
"minimal_patch_direction",
"allowed_patch_targets",
"forbidden_patch_targets",
"rerun_matrix"
]
}
},
"repair_targets_summary": {
"type": "object",
"additionalProperties": true
},
"rerun_matrix": {
"type": "array",
"items": {
"type": "string"
}
},
"artifact_refs": {
"type": "object",
"additionalProperties": {
"type": "string"
}
}
}
}

View File

@ -0,0 +1,140 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Domain Loop Lead Coder Handoff",
"type": "object",
"additionalProperties": true,
"required": [
"schema_version",
"repair_mode",
"created_at",
"loop_id",
"iteration_id",
"status",
"reason",
"quality_score",
"target_score",
"loop_decision",
"analyst_accepted_gate",
"accepted_gate",
"deterministic_gate_ok",
"deterministic_gate_reason",
"artifact_refs",
"issue_codes",
"rerun_matrix",
"human_meaning",
"root_cause_layers",
"violated_invariants",
"assigned_primary_focus",
"top_repair_targets",
"candidate_files",
"lead_instructions"
],
"properties": {
"schema_version": {
"const": "domain_loop_lead_coder_handoff_v1"
},
"repair_mode": {
"const": "lead-handoff"
},
"created_at": {
"type": "string"
},
"loop_id": {
"type": ["string", "null"]
},
"iteration_id": {
"type": "string"
},
"status": {
"type": "string"
},
"reason": {
"type": "string"
},
"quality_score": {
"type": "integer",
"minimum": 0,
"maximum": 100
},
"target_score": {
"type": "integer",
"minimum": 0,
"maximum": 100
},
"loop_decision": {
"type": "string"
},
"analyst_accepted_gate": {
"type": "boolean"
},
"accepted_gate": {
"type": "boolean"
},
"deterministic_gate_ok": {
"type": "boolean"
},
"deterministic_gate_reason": {
"type": "string"
},
"artifact_refs": {
"type": "object",
"additionalProperties": {
"type": "string"
}
},
"issue_codes": {
"type": "array",
"items": {
"type": "string"
}
},
"rerun_matrix": {
"type": "array",
"items": {
"type": "string"
}
},
"human_meaning": {
"type": "object",
"additionalProperties": true
},
"root_cause_layers": {
"type": "array",
"items": {
"type": "string"
}
},
"violated_invariants": {
"type": "array",
"items": {
"type": "string"
}
},
"assigned_primary_focus": {
"type": ["object", "null"]
},
"top_repair_targets": {
"type": "array",
"items": {
"type": "object",
"additionalProperties": true
}
},
"candidate_files": {
"type": "array",
"items": {
"type": "string"
}
},
"lead_instructions": {
"type": "array",
"items": {
"type": "string"
}
},
"auto_coder_gate": {
"type": "object",
"additionalProperties": true
}
}
}

View File

@ -0,0 +1,228 @@
from __future__ import annotations
import argparse
import json
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
REPO_ROOT = Path(__file__).resolve().parent.parent
SCHEMA_DIR = REPO_ROOT / "docs" / "orchestration" / "schemas"
ISSUE_CATALOG_PATH = REPO_ROOT / "docs" / "orchestration" / "issue_catalog.json"
EXPECTED_SCHEMA_FILES = {
"agent_issue_catalog.schema.json": "Agent Issue Catalog",
"auto_coder_gate.schema.json": "Auto-Coder Gate",
"business_audit_contract.schema.json": "Business Audit Contract",
"domain_loop_lead_coder_handoff.schema.json": "Domain Loop Lead Coder Handoff",
}
AUTO_CODER_ALLOWED_ISSUE_CODES = {
"business_direct_answer_missing",
"business_next_step_missing",
"technical_garbage_in_answer",
}
REQUIRED_ISSUE_FIELDS = {
"severity",
"business_meaning",
"root_layers",
"detectors",
"allowed_patch_targets",
"forbidden_patch_targets",
"rerun_matrix",
}
VALID_SEVERITIES = {"P0", "P1", "P2", "P3", "WARNING"}
def read_json(path: Path) -> Any:
return json.loads(path.read_text(encoding="utf-8"))
def display_path(path: Path) -> str:
try:
return str(path.resolve().relative_to(REPO_ROOT))
except ValueError:
return str(path)
def normalize_string_list(value: Any) -> list[str]:
if isinstance(value, str):
stripped = value.strip()
return [stripped] if stripped else []
if not isinstance(value, list):
return []
result: list[str] = []
for item in value:
stripped = str(item or "").strip()
if stripped:
result.append(stripped)
return result
def has_answer_contract(issue: dict[str, Any]) -> bool:
if str(issue.get("expected_answer_contract") or "").strip():
return True
acceptance = issue.get("acceptance")
if not isinstance(acceptance, dict):
return False
return bool(normalize_string_list(acceptance.get("must_have")) or normalize_string_list(acceptance.get("must_not_have")))
def is_broad_patch_target(value: str) -> bool:
normalized = value.strip().replace("\\", "/").lower()
broad_targets = {
".",
"./",
"*",
"**",
"llm_normalizer",
"llm_normalizer/",
"llm_normalizer/backend",
"llm_normalizer/backend/",
"llm_normalizer/backend/src",
"llm_normalizer/backend/src/",
"llm_normalizer/backend/src/services",
"llm_normalizer/backend/src/services/",
"scripts",
"scripts/",
"docs",
"docs/",
"docs/orchestration",
"docs/orchestration/",
}
forbidden_markers = (
"active_domain_contract",
"shared_llm_connection",
"promptbuilder",
"prompt_registry",
"mcp protocol",
"mcp runtime",
"fake data",
"fake fixtures",
"heuristic masking",
"global orchestration",
)
if not normalized or normalized in broad_targets:
return True
if normalized.endswith("/**"):
return True
if normalized.count("/") < 2 and ("*" in normalized or normalized.endswith("/")):
return True
return any(marker in normalized for marker in forbidden_markers)
def check_schema_files(schema_dir: Path) -> tuple[list[dict[str, Any]], list[str]]:
checked: list[dict[str, Any]] = []
failures: list[str] = []
for filename, expected_title in EXPECTED_SCHEMA_FILES.items():
path = schema_dir / filename
item = {"path": display_path(path), "exists": path.exists()}
if not path.exists():
failures.append(f"missing_schema_file:{filename}")
checked.append(item)
continue
try:
payload = read_json(path)
except json.JSONDecodeError as error:
failures.append(f"invalid_schema_json:{filename}:{error.msg}")
checked.append(item)
continue
item["title"] = payload.get("title")
item["schema"] = payload.get("$schema")
item["type"] = payload.get("type")
checked.append(item)
if payload.get("title") != expected_title:
failures.append(f"schema_title_mismatch:{filename}")
if payload.get("type") != "object":
failures.append(f"schema_not_object:{filename}")
if not str(payload.get("$schema") or "").strip():
failures.append(f"schema_missing_draft_ref:{filename}")
return checked, failures
def check_issue_catalog(path: Path) -> tuple[dict[str, Any], list[str], list[str]]:
failures: list[str] = []
warnings: list[str] = []
if not path.exists():
return {"path": display_path(path), "exists": False}, ["missing_issue_catalog"], warnings
try:
payload = read_json(path)
except json.JSONDecodeError as error:
return {"path": display_path(path), "exists": True}, [f"invalid_issue_catalog_json:{error.msg}"], warnings
issues = payload.get("issues") if isinstance(payload.get("issues"), dict) else {}
summary = {
"path": display_path(path),
"exists": True,
"schema_version": payload.get("schema_version"),
"issue_count": len(issues),
"auto_coder_allowed_issue_codes": sorted(AUTO_CODER_ALLOWED_ISSUE_CODES),
}
if payload.get("schema_version") != "agent_issue_catalog_v1":
failures.append("issue_catalog_schema_version_mismatch")
if not issues:
failures.append("issue_catalog_empty")
for issue_code, issue in sorted(issues.items()):
if not isinstance(issue, dict):
failures.append(f"issue_not_object:{issue_code}")
continue
missing_fields = sorted(field for field in REQUIRED_ISSUE_FIELDS if field not in issue)
for field in missing_fields:
failures.append(f"issue_missing_field:{issue_code}:{field}")
severity = str(issue.get("severity") or "").strip().upper()
if severity not in VALID_SEVERITIES:
failures.append(f"issue_invalid_severity:{issue_code}:{severity or 'empty'}")
for field in ("root_layers", "detectors", "allowed_patch_targets", "forbidden_patch_targets", "rerun_matrix"):
if not normalize_string_list(issue.get(field)):
failures.append(f"issue_empty_list:{issue_code}:{field}")
if issue_code in AUTO_CODER_ALLOWED_ISSUE_CODES:
if not has_answer_contract(issue):
failures.append(f"auto_coder_issue_missing_answer_contract:{issue_code}")
if "accepted_smoke_pack" not in normalize_string_list(issue.get("rerun_matrix")):
failures.append(f"auto_coder_issue_missing_accepted_smoke_pack:{issue_code}")
for target in normalize_string_list(issue.get("allowed_patch_targets")):
if is_broad_patch_target(target):
failures.append(f"auto_coder_issue_broad_allowed_patch_target:{issue_code}:{target}")
missing_allowed = sorted(AUTO_CODER_ALLOWED_ISSUE_CODES.difference(issues.keys()))
for issue_code in missing_allowed:
failures.append(f"auto_coder_allowlisted_issue_missing_from_catalog:{issue_code}")
return summary, failures, warnings
def build_healthcheck() -> dict[str, Any]:
schema_files, schema_failures = check_schema_files(SCHEMA_DIR)
issue_catalog, catalog_failures, catalog_warnings = check_issue_catalog(ISSUE_CATALOG_PATH)
failures = schema_failures + catalog_failures
warnings = catalog_warnings
return {
"schema_version": "agent_reliability_contract_health_v1",
"status": "pass" if not failures else "fail",
"checked_at": datetime.now(timezone.utc).replace(microsecond=0).isoformat(),
"schema_files": schema_files,
"issue_catalog": issue_catalog,
"failures": failures,
"warnings": warnings,
}
def main() -> int:
parser = argparse.ArgumentParser(description="Validate Agent Reliability Uplift machine-readable contracts.")
parser.add_argument("--json", action="store_true", help="Print machine-readable healthcheck JSON.")
args = parser.parse_args()
result = build_healthcheck()
if args.json:
print(json.dumps(result, ensure_ascii=False, indent=2))
else:
print(f"status: {result['status']}")
for failure in result["failures"]:
print(f"FAIL {failure}")
for warning in result["warnings"]:
print(f"WARN {warning}")
return 0 if result["status"] == "pass" else 1
if __name__ == "__main__":
raise SystemExit(main())

View File

@ -0,0 +1,135 @@
from __future__ import annotations
import json
import sys
import tempfile
import unittest
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent))
import agent_reliability_contract_healthcheck as health
class AgentReliabilityContractHealthcheckTests(unittest.TestCase):
def test_repo_contract_healthcheck_passes(self) -> None:
result = health.build_healthcheck()
self.assertEqual(result["status"], "pass")
self.assertEqual(result["failures"], [])
def test_issue_catalog_healthcheck_blocks_auto_coder_issue_without_contract(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
catalog_path = Path(tmp) / "issue_catalog.json"
catalog_path.write_text(
json.dumps(
{
"schema_version": "agent_issue_catalog_v1",
"issues": {
"business_direct_answer_missing": {
"severity": "P0",
"business_meaning": "Direct answer is missing.",
"root_layers": ["answer_surface"],
"detectors": ["first_line_not_direct_answer"],
"allowed_patch_targets": [
"llm_normalizer/backend/src/services/address_runtime/composeStage.ts"
],
"forbidden_patch_targets": ["routing rewrites"],
"rerun_matrix": ["failed_scenario", "accepted_smoke_pack"],
},
"business_next_step_missing": {
"severity": "P2",
"business_meaning": "Next step is missing.",
"root_layers": ["answer_surface"],
"expected_answer_contract": "limited_answer_next_action_v1",
"detectors": ["limited_answer_without_next_action"],
"allowed_patch_targets": [
"llm_normalizer/backend/src/services/address_runtime/composeStage.ts"
],
"forbidden_patch_targets": ["route masking"],
"rerun_matrix": ["failed_scenario", "accepted_smoke_pack"],
},
"technical_garbage_in_answer": {
"severity": "P0",
"business_meaning": "Debug text leaked.",
"root_layers": ["answer_surface"],
"expected_answer_contract": "technical_garbage_free_answer_v1",
"detectors": ["runtime_tokens_in_user_answer"],
"allowed_patch_targets": [
"llm_normalizer/backend/src/services/address_runtime/composeStage.ts"
],
"forbidden_patch_targets": ["route masking"],
"rerun_matrix": ["failed_scenario", "accepted_smoke_pack"],
},
},
}
),
encoding="utf-8",
)
_, failures, _ = health.check_issue_catalog(catalog_path)
self.assertIn(
"auto_coder_issue_missing_answer_contract:business_direct_answer_missing",
failures,
)
def test_issue_catalog_healthcheck_blocks_broad_auto_coder_patch_scope(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
catalog_path = Path(tmp) / "issue_catalog.json"
catalog_path.write_text(
json.dumps(
{
"schema_version": "agent_issue_catalog_v1",
"issues": {
"business_direct_answer_missing": {
"severity": "P0",
"business_meaning": "Direct answer is missing.",
"root_layers": ["answer_surface"],
"expected_answer_contract": "direct_answer_surface_v1",
"detectors": ["first_line_not_direct_answer"],
"allowed_patch_targets": ["llm_normalizer/backend/src/services/"],
"forbidden_patch_targets": ["routing rewrites"],
"rerun_matrix": ["failed_scenario", "accepted_smoke_pack"],
},
"business_next_step_missing": {
"severity": "P2",
"business_meaning": "Next step is missing.",
"root_layers": ["answer_surface"],
"expected_answer_contract": "limited_answer_next_action_v1",
"detectors": ["limited_answer_without_next_action"],
"allowed_patch_targets": [
"llm_normalizer/backend/src/services/address_runtime/composeStage.ts"
],
"forbidden_patch_targets": ["route masking"],
"rerun_matrix": ["failed_scenario", "accepted_smoke_pack"],
},
"technical_garbage_in_answer": {
"severity": "P0",
"business_meaning": "Debug text leaked.",
"root_layers": ["answer_surface"],
"expected_answer_contract": "technical_garbage_free_answer_v1",
"detectors": ["runtime_tokens_in_user_answer"],
"allowed_patch_targets": [
"llm_normalizer/backend/src/services/address_runtime/composeStage.ts"
],
"forbidden_patch_targets": ["route masking"],
"rerun_matrix": ["failed_scenario", "accepted_smoke_pack"],
},
},
}
),
encoding="utf-8",
)
_, failures, _ = health.check_issue_catalog(catalog_path)
self.assertIn(
"auto_coder_issue_broad_allowed_patch_target:business_direct_answer_missing:llm_normalizer/backend/src/services/",
failures,
)
if __name__ == "__main__":
unittest.main()