NODEDC_1C/scripts/test_agent_detector_runner.py

227 lines
9.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import json
import sys
import tempfile
import unittest
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent))
import agent_detector_runner as runner
def write_json(path: Path, payload: object) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
def write_text(path: Path, text: str) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(text, encoding="utf-8")
class AgentDetectorRunnerTests(unittest.TestCase):
def test_default_runner_fails_missing_effective_runtime(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
artifact_dir = root / "run"
artifact_dir.mkdir()
registry_path = root / "detector_registry.json"
issue_catalog_path = root / "issue_catalog.json"
write_json(
registry_path,
{
"schema_version": "agent_detector_registry_v1",
"detectors": {
"missing_effective_runtime_json": {
"kind": "artifact_presence",
"automation_level": "automatic",
"description": "Manifest is required.",
"issue_codes": ["runtime_manifest_missing"],
"inputs": ["effective_runtime.json"],
"check": {"required_files": ["effective_runtime.json"]},
}
},
},
)
write_json(
issue_catalog_path,
{
"schema_version": "agent_issue_catalog_v1",
"issues": {
"runtime_manifest_missing": {
"detectors": ["missing_effective_runtime_json"],
}
},
},
)
results = runner.build_detector_results(
artifact_dir,
registry_path=registry_path,
issue_catalog_path=issue_catalog_path,
)
self.assertEqual(results["summary"]["status"], "fail")
self.assertEqual(results["results"][0]["detector"], "missing_effective_runtime_json")
self.assertEqual(results["results"][0]["status"], "fail")
def test_default_runner_passes_when_effective_runtime_exists(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
artifact_dir = root / "run"
write_json(artifact_dir / "effective_runtime.json", {"runner": "test"})
registry_path = root / "detector_registry.json"
issue_catalog_path = root / "issue_catalog.json"
write_json(
registry_path,
{
"schema_version": "agent_detector_registry_v1",
"detectors": {
"missing_effective_runtime_json": {
"kind": "artifact_presence",
"automation_level": "automatic",
"description": "Manifest is required.",
"issue_codes": ["runtime_manifest_missing"],
"inputs": ["effective_runtime.json"],
"check": {"required_files": ["effective_runtime.json"]},
}
},
},
)
write_json(issue_catalog_path, {"schema_version": "agent_issue_catalog_v1", "issues": {}})
results = runner.build_detector_results(
artifact_dir,
registry_path=registry_path,
issue_catalog_path=issue_catalog_path,
)
self.assertEqual(results["summary"]["status"], "pass")
self.assertEqual(results["results"][0]["status"], "pass")
def test_candidate_forbidden_regex_is_limited_to_evidence_path(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
artifact_dir = root / "run"
write_text(artifact_dir / "scenarios" / "ok" / "steps" / "s01" / "output.md", "Маржа не подтверждена.")
write_text(
artifact_dir / "scenarios" / "bad" / "steps" / "s02" / "output.md",
"Это амортизация объекта ОС, но этот шаг не в evidence scope.",
)
registry_path = root / "detector_registry.json"
issue_catalog_path = root / "issue_catalog.json"
candidates_path = root / "detector_candidates.json"
write_json(
registry_path,
{
"schema_version": "agent_detector_registry_v1",
"detectors": {
"forbidden_margin_terms": {
"kind": "answer_text_regex_forbidden",
"automation_level": "automatic",
"description": "No wrong-domain words.",
"issue_codes": ["margin_domain_leak_accounting_route"],
"inputs": ["output.md"],
"check": {"forbidden_patterns": ["(?i)(амортизац|объект\\s+ОС)"]},
}
},
},
)
write_json(
issue_catalog_path,
{
"schema_version": "agent_issue_catalog_v1",
"issues": {
"margin_domain_leak_accounting_route": {
"detectors": ["forbidden_margin_terms"],
}
},
},
)
write_json(
candidates_path,
{
"schema_version": "detector_candidates_v1",
"candidates": [
{
"issue_code": "margin_domain_leak_accounting_route",
"detector": "forbidden_margin_terms",
"evidence_paths": ["scenarios/ok/steps/s01/output.md"],
}
],
},
)
results = runner.build_detector_results(
artifact_dir,
detector_candidates_path=candidates_path,
registry_path=registry_path,
issue_catalog_path=issue_catalog_path,
include_default_global=False,
)
self.assertEqual(results["summary"]["status"], "pass")
self.assertEqual(results["results"][0]["status"], "pass")
def test_composite_detector_fails_after_child_detector_fails(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
artifact_dir = root / "run"
write_text(artifact_dir / "steps" / "s01" / "output.md", "Маржа почему-то посчитана через амортизацию.")
registry_path = root / "detector_registry.json"
issue_catalog_path = root / "issue_catalog.json"
write_json(
registry_path,
{
"schema_version": "agent_detector_registry_v1",
"detectors": {
"forbidden_margin_terms": {
"kind": "answer_text_regex_forbidden",
"automation_level": "automatic",
"description": "No wrong-domain words.",
"issue_codes": ["margin_domain_leak_accounting_route"],
"inputs": ["output.md"],
"check": {"forbidden_patterns": ["(?i)амортизац"]},
},
"margin_domain_leak_accounting_route": {
"kind": "composite_detector",
"automation_level": "semi_automatic",
"description": "Composite leak detector.",
"issue_codes": ["margin_domain_leak_accounting_route"],
"inputs": ["output.md"],
"check": {"uses_detectors": ["forbidden_margin_terms"]},
},
},
},
)
write_json(
issue_catalog_path,
{
"schema_version": "agent_issue_catalog_v1",
"issues": {
"margin_domain_leak_accounting_route": {
"detectors": ["margin_domain_leak_accounting_route"],
}
},
},
)
results = runner.build_detector_results(
artifact_dir,
detector_names=["margin_domain_leak_accounting_route"],
registry_path=registry_path,
issue_catalog_path=issue_catalog_path,
include_default_global=False,
)
statuses = {item["detector"]: item["status"] for item in results["results"]}
self.assertEqual(statuses["forbidden_margin_terms"], "fail")
self.assertEqual(statuses["margin_domain_leak_accounting_route"], "fail")
if __name__ == "__main__":
unittest.main()