NODEDC_1C/scripts/agent_detector_runner.py

640 lines
26 KiB
Python

from __future__ import annotations
import argparse
import json
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
REPO_ROOT = Path(__file__).resolve().parent.parent
DETECTOR_REGISTRY_PATH = REPO_ROOT / "docs" / "orchestration" / "detector_registry.json"
ISSUE_CATALOG_PATH = REPO_ROOT / "docs" / "orchestration" / "issue_catalog.json"
DEFAULT_GLOBAL_DETECTORS = ["missing_effective_runtime_json"]
DETECTOR_RESULTS_SCHEMA_VERSION = "agent_detector_results_v1"
def read_json(path: Path) -> Any:
return json.loads(path.read_text(encoding="utf-8"))
def read_json_object(path: Path) -> dict[str, Any]:
try:
payload = read_json(path)
except (OSError, json.JSONDecodeError):
return {}
return payload if isinstance(payload, dict) else {}
def write_json(path: Path, payload: Any) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
def utc_now() -> str:
return datetime.now(timezone.utc).replace(microsecond=0).isoformat()
def repo_relative(path: Path) -> str:
try:
return str(path.resolve().relative_to(REPO_ROOT))
except ValueError:
return str(path)
def normalize_string_list(raw_value: Any) -> list[str]:
if isinstance(raw_value, str):
value = raw_value.strip()
return [value] if value else []
if not isinstance(raw_value, list):
return []
result: list[str] = []
for item in raw_value:
value = str(item or "").strip()
if value:
result.append(value)
return result
def normalize_path_key(value: str | Path) -> str:
return str(value).replace("\\", "/").strip().lower()
def load_registry(path: Path = DETECTOR_REGISTRY_PATH) -> dict[str, Any]:
payload = read_json_object(path)
detectors = payload.get("detectors") if isinstance(payload.get("detectors"), dict) else {}
return detectors
def load_issue_catalog(path: Path = ISSUE_CATALOG_PATH) -> dict[str, Any]:
payload = read_json_object(path)
issues = payload.get("issues") if isinstance(payload.get("issues"), dict) else {}
return issues
def detector_names_for_issue_codes(issue_codes: list[str], issue_catalog: dict[str, Any]) -> list[str]:
names: list[str] = []
for issue_code in issue_codes:
issue = issue_catalog.get(issue_code)
if not isinstance(issue, dict):
continue
for detector_name in normalize_string_list(issue.get("detectors")):
if detector_name not in names:
names.append(detector_name)
return names
def load_detector_candidates(path: Path | None) -> tuple[list[str], dict[str, list[str]], dict[str, list[str]]]:
if path is None or not path.exists():
return [], {}, {}
payload = read_json_object(path)
detectors: list[str] = []
evidence_by_detector: dict[str, list[str]] = {}
issue_codes_by_detector: dict[str, list[str]] = {}
candidates = payload.get("candidates") if isinstance(payload.get("candidates"), list) else []
for item in candidates:
if not isinstance(item, dict):
continue
detector_name = str(item.get("detector") or "").strip()
if not detector_name:
continue
if detector_name not in detectors:
detectors.append(detector_name)
evidence_by_detector.setdefault(detector_name, [])
for evidence_path in normalize_string_list(item.get("evidence_paths")):
if evidence_path not in evidence_by_detector[detector_name]:
evidence_by_detector[detector_name].append(evidence_path)
issue_code = str(item.get("issue_code") or "").strip()
if issue_code:
issue_codes_by_detector.setdefault(detector_name, [])
if issue_code not in issue_codes_by_detector[detector_name]:
issue_codes_by_detector[detector_name].append(issue_code)
return detectors, evidence_by_detector, issue_codes_by_detector
def expand_detector_dependencies(detector_names: list[str], registry: dict[str, Any]) -> list[str]:
expanded: list[str] = []
def visit(detector_name: str) -> None:
if detector_name in expanded:
return
detector = registry.get(detector_name)
if isinstance(detector, dict):
check = detector.get("check") if isinstance(detector.get("check"), dict) else {}
for child_name in normalize_string_list(check.get("uses_detectors")):
visit(child_name)
if detector_name in expanded:
return
expanded.append(detector_name)
for name in detector_names:
visit(name)
return expanded
def select_detectors(
*,
registry: dict[str, Any],
issue_catalog: dict[str, Any],
detector_names: list[str] | None = None,
issue_codes: list[str] | None = None,
detector_candidates_path: Path | None = None,
include_default_global: bool = True,
) -> tuple[list[str], dict[str, list[str]], dict[str, list[str]]]:
selected: list[str] = []
candidate_names, evidence_by_detector, issue_codes_by_detector = load_detector_candidates(detector_candidates_path)
for source_name in detector_names or []:
if source_name not in selected:
selected.append(source_name)
for source_name in detector_names_for_issue_codes(issue_codes or [], issue_catalog):
if source_name not in selected:
selected.append(source_name)
for source_name in candidate_names:
if source_name not in selected:
selected.append(source_name)
if not selected and include_default_global:
selected.extend(DEFAULT_GLOBAL_DETECTORS)
selected = [name for name in expand_detector_dependencies(selected, registry) if name in registry]
return selected, evidence_by_detector, issue_codes_by_detector
def read_text_or_empty(path: Path) -> str:
try:
return path.read_text(encoding="utf-8")
except OSError:
return ""
def output_turn_path(output_path: Path) -> Path | None:
name = output_path.name
if name == "output.md":
candidate = output_path.with_name("turn.json")
return candidate if candidate.exists() else None
if name.endswith("_output.md"):
prefix = name[: -len("_output.md")]
candidate = output_path.with_name(f"{prefix}_turn.json")
return candidate if candidate.exists() else None
return None
def collect_output_artifacts(artifact_dir: Path) -> list[dict[str, Any]]:
outputs: list[dict[str, Any]] = []
seen: set[Path] = set()
for path in sorted(artifact_dir.rglob("*.md")):
if path in seen:
continue
if path.name == "output.md" or path.name in {"scenario_output.md"} or path.name.endswith("_output.md"):
seen.add(path)
turn_path = output_turn_path(path)
outputs.append(
{
"path": path,
"repo_path": repo_relative(path),
"artifact_path": str(path.relative_to(artifact_dir)),
"text": read_text_or_empty(path),
"turn_path": turn_path,
}
)
return outputs
def collect_turn_artifacts(artifact_dir: Path) -> list[dict[str, Any]]:
turns: list[dict[str, Any]] = []
for path in sorted(artifact_dir.rglob("*.json")):
if path.name == "turn.json" or path.name.endswith("_turn.json"):
turns.append(
{
"path": path,
"repo_path": repo_relative(path),
"artifact_path": str(path.relative_to(artifact_dir)),
"text": read_text_or_empty(path),
}
)
return turns
def path_matches_evidence(path: Path, artifact_dir: Path, evidence_paths: list[str]) -> bool:
if not evidence_paths:
return True
repo_key = normalize_path_key(repo_relative(path))
artifact_key = normalize_path_key(path.relative_to(artifact_dir))
name_key = normalize_path_key(path.name)
for evidence_path in evidence_paths:
evidence_key = normalize_path_key(evidence_path)
if evidence_key in {repo_key, artifact_key, name_key}:
return True
if evidence_key.endswith(artifact_key) or repo_key.endswith(evidence_key):
return True
return False
def filter_outputs(outputs: list[dict[str, Any]], artifact_dir: Path, evidence_paths: list[str]) -> list[dict[str, Any]]:
return [item for item in outputs if path_matches_evidence(item["path"], artifact_dir, evidence_paths)]
def filter_turns(turns: list[dict[str, Any]], artifact_dir: Path, evidence_paths: list[str]) -> list[dict[str, Any]]:
return [item for item in turns if path_matches_evidence(item["path"], artifact_dir, evidence_paths)]
def compile_patterns(patterns: list[str]) -> list[re.Pattern[str]]:
compiled: list[re.Pattern[str]] = []
for pattern in patterns:
try:
compiled.append(re.compile(pattern))
except re.error:
continue
return compiled
def build_result(
detector_name: str,
detector: dict[str, Any],
status: str,
message: str,
*,
evidence: list[dict[str, Any]] | None = None,
issue_codes: list[str] | None = None,
) -> dict[str, Any]:
return {
"detector": detector_name,
"kind": detector.get("kind"),
"automation_level": detector.get("automation_level"),
"status": status,
"issue_codes": issue_codes or normalize_string_list(detector.get("issue_codes")),
"message": message,
"evidence": evidence or [],
}
def evaluate_artifact_presence(
detector_name: str,
detector: dict[str, Any],
artifact_dir: Path,
) -> dict[str, Any]:
check = detector.get("check") if isinstance(detector.get("check"), dict) else {}
missing: list[str] = []
present: list[str] = []
for file_name in normalize_string_list(check.get("required_files")):
candidate = artifact_dir / file_name
if candidate.exists():
present.append(file_name)
else:
missing.append(file_name)
status = "fail" if missing else "pass"
message = f"missing required files: {', '.join(missing)}" if missing else "required files are present"
return build_result(
detector_name,
detector,
status,
message,
evidence=[{"present": present, "missing": missing}],
)
def evaluate_forbidden_regex(
detector_name: str,
detector: dict[str, Any],
outputs: list[dict[str, Any]],
*,
prefix_line_count: int | None = None,
) -> dict[str, Any]:
check = detector.get("check") if isinstance(detector.get("check"), dict) else {}
patterns = compile_patterns(normalize_string_list(check.get("forbidden_patterns")))
if not outputs:
return build_result(detector_name, detector, "skipped", "no output.md-style artifacts matched detector scope")
evidence: list[dict[str, Any]] = []
for output in outputs:
text = str(output.get("text") or "")
if prefix_line_count is not None:
lines = [line for line in text.splitlines() if line.strip()]
text = "\n".join(lines[:prefix_line_count])
for pattern in patterns:
match = pattern.search(text)
if match:
evidence.append(
{
"path": output["repo_path"],
"pattern": pattern.pattern,
"match": match.group(0)[:240],
}
)
status = "fail" if evidence else "pass"
message = "forbidden patterns found" if evidence else "no forbidden patterns found"
return build_result(detector_name, detector, status, message, evidence=evidence)
def evaluate_required_any(
detector_name: str,
detector: dict[str, Any],
outputs: list[dict[str, Any]],
) -> dict[str, Any]:
check = detector.get("check") if isinstance(detector.get("check"), dict) else {}
patterns = compile_patterns(normalize_string_list(check.get("required_patterns_any")))
if not outputs:
return build_result(detector_name, detector, "skipped", "no output.md-style artifacts matched detector scope")
missing: list[dict[str, Any]] = []
matched: list[dict[str, Any]] = []
for output in outputs:
text = str(output.get("text") or "")
match = next((pattern.search(text) for pattern in patterns if pattern.search(text)), None)
if match:
matched.append({"path": output["repo_path"], "match": match.group(0)[:240]})
else:
missing.append({"path": output["repo_path"]})
status = "fail" if missing else "pass"
message = "required answer signals missing" if missing else "required answer signals found"
return build_result(detector_name, detector, status, message, evidence=[*matched, *missing])
def evaluate_limited_next_action(
detector_name: str,
detector: dict[str, Any],
outputs: list[dict[str, Any]],
) -> dict[str, Any]:
check = detector.get("check") if isinstance(detector.get("check"), dict) else {}
limited_patterns = compile_patterns(normalize_string_list(check.get("limited_patterns")))
next_action_patterns = compile_patterns(normalize_string_list(check.get("required_next_action_patterns_any")))
if not outputs:
return build_result(detector_name, detector, "skipped", "no output.md-style artifacts matched detector scope")
failures: list[dict[str, Any]] = []
limited_outputs = 0
for output in outputs:
text = str(output.get("text") or "")
limited_match = next((pattern.search(text) for pattern in limited_patterns if pattern.search(text)), None)
if not limited_match:
continue
limited_outputs += 1
if not any(pattern.search(text) for pattern in next_action_patterns):
failures.append({"path": output["repo_path"], "limited_match": limited_match.group(0)[:240]})
status = "fail" if failures else "pass"
if failures:
message = "limited answer has no next action"
elif limited_outputs:
message = "limited answer includes a next action"
else:
message = "answer is not limited; next action requirement not triggered"
return build_result(detector_name, detector, status, message, evidence=failures)
def evaluate_trace_guard(
detector_name: str,
detector: dict[str, Any],
turns: list[dict[str, Any]],
) -> dict[str, Any]:
check = detector.get("check") if isinstance(detector.get("check"), dict) else {}
markers = [marker.lower() for marker in normalize_string_list(check.get("forbidden_trace_markers"))]
if not turns:
return build_result(detector_name, detector, "skipped", "no turn.json-style artifacts matched detector scope")
evidence: list[dict[str, Any]] = []
for turn in turns:
text = str(turn.get("text") or "").lower()
for marker in markers:
if marker in text:
evidence.append({"path": turn["repo_path"], "marker": marker})
status = "fail" if evidence else "pass"
message = "forbidden trace markers found" if evidence else "no forbidden trace markers found"
return build_result(detector_name, detector, status, message, evidence=evidence)
def evaluate_prompt_manifest(
detector_name: str,
detector: dict[str, Any],
artifact_dir: Path,
) -> dict[str, Any]:
manifest_path = artifact_dir / "effective_runtime.json"
if not manifest_path.exists():
return build_result(detector_name, detector, "skipped", "effective_runtime.json is missing")
manifest = read_json_object(manifest_path)
check = detector.get("check") if isinstance(detector.get("check"), dict) else {}
evidence: list[dict[str, Any]] = []
for field_name in normalize_string_list(check.get("manifest_fields")):
if not str(manifest.get(field_name) or "").strip():
evidence.append({"path": repo_relative(manifest_path), "missing_field": field_name})
prompt_source = str(manifest.get("prompt_source") or "").strip().lower()
forbidden_sources = [item.lower() for item in normalize_string_list(check.get("forbidden_prompt_sources"))]
if prompt_source and prompt_source in forbidden_sources:
evidence.append({"path": repo_relative(manifest_path), "forbidden_prompt_source": prompt_source})
status = "fail" if evidence else "pass"
message = "prompt manifest metadata failed" if evidence else "prompt manifest metadata passed"
return build_result(detector_name, detector, status, message, evidence=evidence)
def evaluate_stage_review_signal(
detector_name: str,
detector: dict[str, Any],
artifact_dir: Path,
) -> dict[str, Any]:
check = detector.get("check") if isinstance(detector.get("check"), dict) else {}
target_status = str(check.get("target_status") or "").strip()
if not target_status:
return build_result(detector_name, detector, "skipped", "target_status is not configured")
evidence: list[dict[str, Any]] = []
for path in sorted(artifact_dir.rglob("*.json")):
if path.name not in {"run_review.json", "repair_targets.json", "semantic_repair_targets.json"}:
continue
text = read_text_or_empty(path)
if target_status in text:
evidence.append({"path": repo_relative(path), "target_status": target_status})
status = "fail" if evidence else "pass"
message = "stage review signal found" if evidence else "stage review signal not found"
return build_result(detector_name, detector, status, message, evidence=evidence)
def child_status(detector_name: str, results_by_name: dict[str, dict[str, Any]]) -> str:
child = results_by_name.get(detector_name)
return str(child.get("status") if child else "skipped")
def evaluate_composite(
detector_name: str,
detector: dict[str, Any],
results_by_name: dict[str, dict[str, Any]],
) -> dict[str, Any]:
check = detector.get("check") if isinstance(detector.get("check"), dict) else {}
children = normalize_string_list(check.get("uses_detectors"))
statuses = {child: child_status(child, results_by_name) for child in children}
if any(status == "fail" for status in statuses.values()):
status = "fail"
elif statuses and all(status == "pass" for status in statuses.values()):
status = "pass"
elif any(status == "review" for status in statuses.values()):
status = "review"
else:
status = "skipped"
return build_result(
detector_name,
detector,
status,
"composite detector evaluated from child detectors",
evidence=[{"child_statuses": statuses}],
)
def evaluate_manual_review(detector_name: str, detector: dict[str, Any], message: str) -> dict[str, Any]:
return build_result(detector_name, detector, "review", message)
def evaluate_detector(
detector_name: str,
detector: dict[str, Any],
*,
artifact_dir: Path,
outputs: list[dict[str, Any]],
turns: list[dict[str, Any]],
evidence_paths: list[str],
results_by_name: dict[str, dict[str, Any]],
issue_codes: list[str] | None = None,
) -> dict[str, Any]:
kind = str(detector.get("kind") or "").strip()
check = detector.get("check") if isinstance(detector.get("check"), dict) else {}
scoped_outputs = filter_outputs(outputs, artifact_dir, evidence_paths)
scoped_turns = filter_turns(turns, artifact_dir, evidence_paths)
if kind == "artifact_presence":
return evaluate_artifact_presence(detector_name, detector, artifact_dir)
if kind == "answer_text_regex_forbidden":
return evaluate_forbidden_regex(detector_name, detector, scoped_outputs)
if kind == "answer_text_regex_forbidden_in_prefix":
prefix_line_count = int(check.get("prefix_line_count") or 3)
return evaluate_forbidden_regex(detector_name, detector, scoped_outputs, prefix_line_count=prefix_line_count)
if kind == "answer_text_required_any":
return evaluate_required_any(detector_name, detector, scoped_outputs)
if kind == "answer_text_required_when_limited":
return evaluate_limited_next_action(detector_name, detector, scoped_outputs)
if kind == "trace_value_guard":
if not scoped_turns and scoped_outputs:
scoped_turn_paths = [item.get("turn_path") for item in scoped_outputs if item.get("turn_path")]
scoped_turns = [
{"path": path, "repo_path": repo_relative(path), "artifact_path": str(path.relative_to(artifact_dir)), "text": read_text_or_empty(path)}
for path in scoped_turn_paths
if isinstance(path, Path)
]
return evaluate_trace_guard(detector_name, detector, scoped_turns)
if kind == "prompt_registry_healthcheck":
if "manifest_fields" in check or "forbidden_prompt_sources" in check:
return evaluate_prompt_manifest(detector_name, detector, artifact_dir)
return build_result(detector_name, detector, "skipped", "prompt healthcheck command is validated by prompt_registry_healthcheck.py")
if kind == "stage_review_signal":
return evaluate_stage_review_signal(detector_name, detector, artifact_dir)
if kind in {"composite_detector", "contract_field_detector", "limited_answer_next_action"} and normalize_string_list(
check.get("uses_detectors")
):
return evaluate_composite(detector_name, detector, results_by_name)
if kind == "answer_text_shape":
return evaluate_manual_review(detector_name, detector, "direct-answer shape requires business review")
return build_result(detector_name, detector, "skipped", f"detector kind is not executable yet: {kind}")
def summarize_results(results: list[dict[str, Any]]) -> dict[str, Any]:
counts = {"pass": 0, "fail": 0, "skipped": 0, "review": 0}
for result in results:
status = str(result.get("status") or "skipped")
counts[status] = counts.get(status, 0) + 1
if counts.get("fail", 0):
status = "fail"
elif counts.get("review", 0):
status = "review"
elif counts.get("pass", 0):
status = "pass"
else:
status = "skipped"
return {"status": status, "detector_count": len(results), **counts}
def build_detector_results(
artifact_dir: Path,
*,
detector_names: list[str] | None = None,
issue_codes: list[str] | None = None,
detector_candidates_path: Path | None = None,
registry_path: Path = DETECTOR_REGISTRY_PATH,
issue_catalog_path: Path = ISSUE_CATALOG_PATH,
include_default_global: bool = True,
) -> dict[str, Any]:
artifact_dir = artifact_dir.resolve()
registry = load_registry(registry_path)
issue_catalog = load_issue_catalog(issue_catalog_path)
selected, evidence_by_detector, issue_codes_by_detector = select_detectors(
registry=registry,
issue_catalog=issue_catalog,
detector_names=detector_names,
issue_codes=issue_codes,
detector_candidates_path=detector_candidates_path,
include_default_global=include_default_global,
)
outputs = collect_output_artifacts(artifact_dir)
turns = collect_turn_artifacts(artifact_dir)
results: list[dict[str, Any]] = []
results_by_name: dict[str, dict[str, Any]] = {}
for detector_name in selected:
detector = registry.get(detector_name)
if not isinstance(detector, dict):
continue
result = evaluate_detector(
detector_name,
detector,
artifact_dir=artifact_dir,
outputs=outputs,
turns=turns,
evidence_paths=evidence_by_detector.get(detector_name, []),
results_by_name=results_by_name,
issue_codes=issue_codes_by_detector.get(detector_name),
)
if issue_codes_by_detector.get(detector_name):
result["issue_codes"] = issue_codes_by_detector[detector_name]
results.append(result)
results_by_name[detector_name] = result
return {
"schema_version": DETECTOR_RESULTS_SCHEMA_VERSION,
"created_at": utc_now(),
"artifact_dir": repo_relative(artifact_dir),
"registry_path": repo_relative(registry_path),
"issue_catalog_path": repo_relative(issue_catalog_path),
"detector_candidates_path": repo_relative(detector_candidates_path) if detector_candidates_path else None,
"selected_detectors": selected,
"artifact_counts": {
"output_artifacts": len(outputs),
"turn_artifacts": len(turns),
},
"summary": summarize_results(results),
"results": results,
}
def main() -> int:
parser = argparse.ArgumentParser(description="Run Agent Reliability detector registry checks against replay artifacts.")
parser.add_argument("--artifact-dir", required=True, help="Replay artifact directory to inspect.")
parser.add_argument("--output", help="Path for detector_results.json. Defaults to <artifact-dir>/detector_results.json.")
parser.add_argument("--registry", default=str(DETECTOR_REGISTRY_PATH), help="Detector registry JSON path.")
parser.add_argument("--issue-catalog", default=str(ISSUE_CATALOG_PATH), help="Issue catalog JSON path.")
parser.add_argument("--detector", action="append", default=[], help="Detector name to run. Can be repeated.")
parser.add_argument("--issue-code", action="append", default=[], help="Issue code whose detectors should run. Can be repeated.")
parser.add_argument("--detector-candidates", help="detector_candidates.json path.")
parser.add_argument("--no-default-global", action="store_true", help="Do not run default global detectors when no filter is provided.")
parser.add_argument("--json", action="store_true", help="Print detector_results JSON to stdout.")
args = parser.parse_args()
artifact_dir = Path(args.artifact_dir).resolve()
output_path = Path(args.output).resolve() if args.output else artifact_dir / "detector_results.json"
detector_candidates_path = Path(args.detector_candidates).resolve() if args.detector_candidates else None
results = build_detector_results(
artifact_dir,
detector_names=args.detector,
issue_codes=args.issue_code,
detector_candidates_path=detector_candidates_path,
registry_path=Path(args.registry).resolve(),
issue_catalog_path=Path(args.issue_catalog).resolve(),
include_default_global=not args.no_default_global,
)
write_json(output_path, results)
if args.json:
print(json.dumps(results, ensure_ascii=False, indent=2))
else:
summary = results["summary"]
print(f"status: {summary['status']}")
print(f"detector_results: {output_path}")
return 1 if results["summary"]["status"] == "fail" else 0
if __name__ == "__main__":
raise SystemExit(main())