227 lines
9.4 KiB
Python
227 lines
9.4 KiB
Python
from __future__ import annotations
|
||
|
||
import json
|
||
import sys
|
||
import tempfile
|
||
import unittest
|
||
from pathlib import Path
|
||
|
||
|
||
sys.path.insert(0, str(Path(__file__).resolve().parent))
|
||
|
||
import agent_detector_runner as runner
|
||
|
||
|
||
def write_json(path: Path, payload: object) -> None:
|
||
path.parent.mkdir(parents=True, exist_ok=True)
|
||
path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
|
||
|
||
|
||
def write_text(path: Path, text: str) -> None:
|
||
path.parent.mkdir(parents=True, exist_ok=True)
|
||
path.write_text(text, encoding="utf-8")
|
||
|
||
|
||
class AgentDetectorRunnerTests(unittest.TestCase):
|
||
def test_default_runner_fails_missing_effective_runtime(self) -> None:
|
||
with tempfile.TemporaryDirectory() as tmp:
|
||
root = Path(tmp)
|
||
artifact_dir = root / "run"
|
||
artifact_dir.mkdir()
|
||
registry_path = root / "detector_registry.json"
|
||
issue_catalog_path = root / "issue_catalog.json"
|
||
write_json(
|
||
registry_path,
|
||
{
|
||
"schema_version": "agent_detector_registry_v1",
|
||
"detectors": {
|
||
"missing_effective_runtime_json": {
|
||
"kind": "artifact_presence",
|
||
"automation_level": "automatic",
|
||
"description": "Manifest is required.",
|
||
"issue_codes": ["runtime_manifest_missing"],
|
||
"inputs": ["effective_runtime.json"],
|
||
"check": {"required_files": ["effective_runtime.json"]},
|
||
}
|
||
},
|
||
},
|
||
)
|
||
write_json(
|
||
issue_catalog_path,
|
||
{
|
||
"schema_version": "agent_issue_catalog_v1",
|
||
"issues": {
|
||
"runtime_manifest_missing": {
|
||
"detectors": ["missing_effective_runtime_json"],
|
||
}
|
||
},
|
||
},
|
||
)
|
||
|
||
results = runner.build_detector_results(
|
||
artifact_dir,
|
||
registry_path=registry_path,
|
||
issue_catalog_path=issue_catalog_path,
|
||
)
|
||
|
||
self.assertEqual(results["summary"]["status"], "fail")
|
||
self.assertEqual(results["results"][0]["detector"], "missing_effective_runtime_json")
|
||
self.assertEqual(results["results"][0]["status"], "fail")
|
||
|
||
def test_default_runner_passes_when_effective_runtime_exists(self) -> None:
|
||
with tempfile.TemporaryDirectory() as tmp:
|
||
root = Path(tmp)
|
||
artifact_dir = root / "run"
|
||
write_json(artifact_dir / "effective_runtime.json", {"runner": "test"})
|
||
registry_path = root / "detector_registry.json"
|
||
issue_catalog_path = root / "issue_catalog.json"
|
||
write_json(
|
||
registry_path,
|
||
{
|
||
"schema_version": "agent_detector_registry_v1",
|
||
"detectors": {
|
||
"missing_effective_runtime_json": {
|
||
"kind": "artifact_presence",
|
||
"automation_level": "automatic",
|
||
"description": "Manifest is required.",
|
||
"issue_codes": ["runtime_manifest_missing"],
|
||
"inputs": ["effective_runtime.json"],
|
||
"check": {"required_files": ["effective_runtime.json"]},
|
||
}
|
||
},
|
||
},
|
||
)
|
||
write_json(issue_catalog_path, {"schema_version": "agent_issue_catalog_v1", "issues": {}})
|
||
|
||
results = runner.build_detector_results(
|
||
artifact_dir,
|
||
registry_path=registry_path,
|
||
issue_catalog_path=issue_catalog_path,
|
||
)
|
||
|
||
self.assertEqual(results["summary"]["status"], "pass")
|
||
self.assertEqual(results["results"][0]["status"], "pass")
|
||
|
||
def test_candidate_forbidden_regex_is_limited_to_evidence_path(self) -> None:
|
||
with tempfile.TemporaryDirectory() as tmp:
|
||
root = Path(tmp)
|
||
artifact_dir = root / "run"
|
||
write_text(artifact_dir / "scenarios" / "ok" / "steps" / "s01" / "output.md", "Маржа не подтверждена.")
|
||
write_text(
|
||
artifact_dir / "scenarios" / "bad" / "steps" / "s02" / "output.md",
|
||
"Это амортизация объекта ОС, но этот шаг не в evidence scope.",
|
||
)
|
||
registry_path = root / "detector_registry.json"
|
||
issue_catalog_path = root / "issue_catalog.json"
|
||
candidates_path = root / "detector_candidates.json"
|
||
write_json(
|
||
registry_path,
|
||
{
|
||
"schema_version": "agent_detector_registry_v1",
|
||
"detectors": {
|
||
"forbidden_margin_terms": {
|
||
"kind": "answer_text_regex_forbidden",
|
||
"automation_level": "automatic",
|
||
"description": "No wrong-domain words.",
|
||
"issue_codes": ["margin_domain_leak_accounting_route"],
|
||
"inputs": ["output.md"],
|
||
"check": {"forbidden_patterns": ["(?i)(амортизац|объект\\s+ОС)"]},
|
||
}
|
||
},
|
||
},
|
||
)
|
||
write_json(
|
||
issue_catalog_path,
|
||
{
|
||
"schema_version": "agent_issue_catalog_v1",
|
||
"issues": {
|
||
"margin_domain_leak_accounting_route": {
|
||
"detectors": ["forbidden_margin_terms"],
|
||
}
|
||
},
|
||
},
|
||
)
|
||
write_json(
|
||
candidates_path,
|
||
{
|
||
"schema_version": "detector_candidates_v1",
|
||
"candidates": [
|
||
{
|
||
"issue_code": "margin_domain_leak_accounting_route",
|
||
"detector": "forbidden_margin_terms",
|
||
"evidence_paths": ["scenarios/ok/steps/s01/output.md"],
|
||
}
|
||
],
|
||
},
|
||
)
|
||
|
||
results = runner.build_detector_results(
|
||
artifact_dir,
|
||
detector_candidates_path=candidates_path,
|
||
registry_path=registry_path,
|
||
issue_catalog_path=issue_catalog_path,
|
||
include_default_global=False,
|
||
)
|
||
|
||
self.assertEqual(results["summary"]["status"], "pass")
|
||
self.assertEqual(results["results"][0]["status"], "pass")
|
||
|
||
def test_composite_detector_fails_after_child_detector_fails(self) -> None:
|
||
with tempfile.TemporaryDirectory() as tmp:
|
||
root = Path(tmp)
|
||
artifact_dir = root / "run"
|
||
write_text(artifact_dir / "steps" / "s01" / "output.md", "Маржа почему-то посчитана через амортизацию.")
|
||
registry_path = root / "detector_registry.json"
|
||
issue_catalog_path = root / "issue_catalog.json"
|
||
write_json(
|
||
registry_path,
|
||
{
|
||
"schema_version": "agent_detector_registry_v1",
|
||
"detectors": {
|
||
"forbidden_margin_terms": {
|
||
"kind": "answer_text_regex_forbidden",
|
||
"automation_level": "automatic",
|
||
"description": "No wrong-domain words.",
|
||
"issue_codes": ["margin_domain_leak_accounting_route"],
|
||
"inputs": ["output.md"],
|
||
"check": {"forbidden_patterns": ["(?i)амортизац"]},
|
||
},
|
||
"margin_domain_leak_accounting_route": {
|
||
"kind": "composite_detector",
|
||
"automation_level": "semi_automatic",
|
||
"description": "Composite leak detector.",
|
||
"issue_codes": ["margin_domain_leak_accounting_route"],
|
||
"inputs": ["output.md"],
|
||
"check": {"uses_detectors": ["forbidden_margin_terms"]},
|
||
},
|
||
},
|
||
},
|
||
)
|
||
write_json(
|
||
issue_catalog_path,
|
||
{
|
||
"schema_version": "agent_issue_catalog_v1",
|
||
"issues": {
|
||
"margin_domain_leak_accounting_route": {
|
||
"detectors": ["margin_domain_leak_accounting_route"],
|
||
}
|
||
},
|
||
},
|
||
)
|
||
|
||
results = runner.build_detector_results(
|
||
artifact_dir,
|
||
detector_names=["margin_domain_leak_accounting_route"],
|
||
registry_path=registry_path,
|
||
issue_catalog_path=issue_catalog_path,
|
||
include_default_global=False,
|
||
)
|
||
|
||
statuses = {item["detector"]: item["status"] for item in results["results"]}
|
||
self.assertEqual(statuses["forbidden_margin_terms"], "fail")
|
||
self.assertEqual(statuses["margin_domain_leak_accounting_route"], "fail")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
unittest.main()
|