diff --git a/docs/orchestration/schemas/agent_detector_results.schema.json b/docs/orchestration/schemas/agent_detector_results.schema.json new file mode 100644 index 0000000..cdbc63f --- /dev/null +++ b/docs/orchestration/schemas/agent_detector_results.schema.json @@ -0,0 +1,96 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Agent Detector Results", + "type": "object", + "additionalProperties": true, + "required": [ + "schema_version", + "created_at", + "artifact_dir", + "registry_path", + "selected_detectors", + "artifact_counts", + "summary", + "results" + ], + "properties": { + "schema_version": { + "const": "agent_detector_results_v1" + }, + "created_at": { + "type": "string" + }, + "artifact_dir": { + "type": "string" + }, + "registry_path": { + "type": "string" + }, + "issue_catalog_path": { + "type": "string" + }, + "detector_candidates_path": { + "type": ["string", "null"] + }, + "selected_detectors": { + "type": "array", + "items": { + "type": "string" + } + }, + "artifact_counts": { + "type": "object", + "additionalProperties": true + }, + "summary": { + "type": "object", + "required": ["status", "detector_count"], + "additionalProperties": true, + "properties": { + "status": { + "type": "string", + "enum": ["pass", "fail", "skipped", "review"] + }, + "detector_count": { + "type": "integer", + "minimum": 0 + } + } + }, + "results": { + "type": "array", + "items": { + "type": "object", + "additionalProperties": true, + "required": ["detector", "kind", "automation_level", "status", "issue_codes", "message", "evidence"], + "properties": { + "detector": { + "type": "string" + }, + "kind": { + "type": "string" + }, + "automation_level": { + "type": "string" + }, + "status": { + "type": "string", + "enum": ["pass", "fail", "skipped", "review"] + }, + "issue_codes": { + "type": "array", + "items": { + "type": "string" + } + }, + "message": { + "type": "string" + }, + "evidence": { + "type": "array" + } + } + } + } + } +} diff --git a/scripts/agent_detector_runner.py b/scripts/agent_detector_runner.py new file mode 100644 index 0000000..025b099 --- /dev/null +++ b/scripts/agent_detector_runner.py @@ -0,0 +1,639 @@ +from __future__ import annotations + +import argparse +import json +import re +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + + +REPO_ROOT = Path(__file__).resolve().parent.parent +DETECTOR_REGISTRY_PATH = REPO_ROOT / "docs" / "orchestration" / "detector_registry.json" +ISSUE_CATALOG_PATH = REPO_ROOT / "docs" / "orchestration" / "issue_catalog.json" +DEFAULT_GLOBAL_DETECTORS = ["missing_effective_runtime_json"] +DETECTOR_RESULTS_SCHEMA_VERSION = "agent_detector_results_v1" + + +def read_json(path: Path) -> Any: + return json.loads(path.read_text(encoding="utf-8")) + + +def read_json_object(path: Path) -> dict[str, Any]: + try: + payload = read_json(path) + except (OSError, json.JSONDecodeError): + return {} + return payload if isinstance(payload, dict) else {} + + +def write_json(path: Path, payload: Any) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8") + + +def utc_now() -> str: + return datetime.now(timezone.utc).replace(microsecond=0).isoformat() + + +def repo_relative(path: Path) -> str: + try: + return str(path.resolve().relative_to(REPO_ROOT)) + except ValueError: + return str(path) + + +def normalize_string_list(raw_value: Any) -> list[str]: + if isinstance(raw_value, str): + value = raw_value.strip() + return [value] if value else [] + if not isinstance(raw_value, list): + return [] + result: list[str] = [] + for item in raw_value: + value = str(item or "").strip() + if value: + result.append(value) + return result + + +def normalize_path_key(value: str | Path) -> str: + return str(value).replace("\\", "/").strip().lower() + + +def load_registry(path: Path = DETECTOR_REGISTRY_PATH) -> dict[str, Any]: + payload = read_json_object(path) + detectors = payload.get("detectors") if isinstance(payload.get("detectors"), dict) else {} + return detectors + + +def load_issue_catalog(path: Path = ISSUE_CATALOG_PATH) -> dict[str, Any]: + payload = read_json_object(path) + issues = payload.get("issues") if isinstance(payload.get("issues"), dict) else {} + return issues + + +def detector_names_for_issue_codes(issue_codes: list[str], issue_catalog: dict[str, Any]) -> list[str]: + names: list[str] = [] + for issue_code in issue_codes: + issue = issue_catalog.get(issue_code) + if not isinstance(issue, dict): + continue + for detector_name in normalize_string_list(issue.get("detectors")): + if detector_name not in names: + names.append(detector_name) + return names + + +def load_detector_candidates(path: Path | None) -> tuple[list[str], dict[str, list[str]], dict[str, list[str]]]: + if path is None or not path.exists(): + return [], {}, {} + payload = read_json_object(path) + detectors: list[str] = [] + evidence_by_detector: dict[str, list[str]] = {} + issue_codes_by_detector: dict[str, list[str]] = {} + candidates = payload.get("candidates") if isinstance(payload.get("candidates"), list) else [] + for item in candidates: + if not isinstance(item, dict): + continue + detector_name = str(item.get("detector") or "").strip() + if not detector_name: + continue + if detector_name not in detectors: + detectors.append(detector_name) + evidence_by_detector.setdefault(detector_name, []) + for evidence_path in normalize_string_list(item.get("evidence_paths")): + if evidence_path not in evidence_by_detector[detector_name]: + evidence_by_detector[detector_name].append(evidence_path) + issue_code = str(item.get("issue_code") or "").strip() + if issue_code: + issue_codes_by_detector.setdefault(detector_name, []) + if issue_code not in issue_codes_by_detector[detector_name]: + issue_codes_by_detector[detector_name].append(issue_code) + return detectors, evidence_by_detector, issue_codes_by_detector + + +def expand_detector_dependencies(detector_names: list[str], registry: dict[str, Any]) -> list[str]: + expanded: list[str] = [] + + def visit(detector_name: str) -> None: + if detector_name in expanded: + return + detector = registry.get(detector_name) + if isinstance(detector, dict): + check = detector.get("check") if isinstance(detector.get("check"), dict) else {} + for child_name in normalize_string_list(check.get("uses_detectors")): + visit(child_name) + if detector_name in expanded: + return + expanded.append(detector_name) + + for name in detector_names: + visit(name) + return expanded + + +def select_detectors( + *, + registry: dict[str, Any], + issue_catalog: dict[str, Any], + detector_names: list[str] | None = None, + issue_codes: list[str] | None = None, + detector_candidates_path: Path | None = None, + include_default_global: bool = True, +) -> tuple[list[str], dict[str, list[str]], dict[str, list[str]]]: + selected: list[str] = [] + candidate_names, evidence_by_detector, issue_codes_by_detector = load_detector_candidates(detector_candidates_path) + for source_name in detector_names or []: + if source_name not in selected: + selected.append(source_name) + for source_name in detector_names_for_issue_codes(issue_codes or [], issue_catalog): + if source_name not in selected: + selected.append(source_name) + for source_name in candidate_names: + if source_name not in selected: + selected.append(source_name) + if not selected and include_default_global: + selected.extend(DEFAULT_GLOBAL_DETECTORS) + selected = [name for name in expand_detector_dependencies(selected, registry) if name in registry] + return selected, evidence_by_detector, issue_codes_by_detector + + +def read_text_or_empty(path: Path) -> str: + try: + return path.read_text(encoding="utf-8") + except OSError: + return "" + + +def output_turn_path(output_path: Path) -> Path | None: + name = output_path.name + if name == "output.md": + candidate = output_path.with_name("turn.json") + return candidate if candidate.exists() else None + if name.endswith("_output.md"): + prefix = name[: -len("_output.md")] + candidate = output_path.with_name(f"{prefix}_turn.json") + return candidate if candidate.exists() else None + return None + + +def collect_output_artifacts(artifact_dir: Path) -> list[dict[str, Any]]: + outputs: list[dict[str, Any]] = [] + seen: set[Path] = set() + for path in sorted(artifact_dir.rglob("*.md")): + if path in seen: + continue + if path.name == "output.md" or path.name in {"scenario_output.md"} or path.name.endswith("_output.md"): + seen.add(path) + turn_path = output_turn_path(path) + outputs.append( + { + "path": path, + "repo_path": repo_relative(path), + "artifact_path": str(path.relative_to(artifact_dir)), + "text": read_text_or_empty(path), + "turn_path": turn_path, + } + ) + return outputs + + +def collect_turn_artifacts(artifact_dir: Path) -> list[dict[str, Any]]: + turns: list[dict[str, Any]] = [] + for path in sorted(artifact_dir.rglob("*.json")): + if path.name == "turn.json" or path.name.endswith("_turn.json"): + turns.append( + { + "path": path, + "repo_path": repo_relative(path), + "artifact_path": str(path.relative_to(artifact_dir)), + "text": read_text_or_empty(path), + } + ) + return turns + + +def path_matches_evidence(path: Path, artifact_dir: Path, evidence_paths: list[str]) -> bool: + if not evidence_paths: + return True + repo_key = normalize_path_key(repo_relative(path)) + artifact_key = normalize_path_key(path.relative_to(artifact_dir)) + name_key = normalize_path_key(path.name) + for evidence_path in evidence_paths: + evidence_key = normalize_path_key(evidence_path) + if evidence_key in {repo_key, artifact_key, name_key}: + return True + if evidence_key.endswith(artifact_key) or repo_key.endswith(evidence_key): + return True + return False + + +def filter_outputs(outputs: list[dict[str, Any]], artifact_dir: Path, evidence_paths: list[str]) -> list[dict[str, Any]]: + return [item for item in outputs if path_matches_evidence(item["path"], artifact_dir, evidence_paths)] + + +def filter_turns(turns: list[dict[str, Any]], artifact_dir: Path, evidence_paths: list[str]) -> list[dict[str, Any]]: + return [item for item in turns if path_matches_evidence(item["path"], artifact_dir, evidence_paths)] + + +def compile_patterns(patterns: list[str]) -> list[re.Pattern[str]]: + compiled: list[re.Pattern[str]] = [] + for pattern in patterns: + try: + compiled.append(re.compile(pattern)) + except re.error: + continue + return compiled + + +def build_result( + detector_name: str, + detector: dict[str, Any], + status: str, + message: str, + *, + evidence: list[dict[str, Any]] | None = None, + issue_codes: list[str] | None = None, +) -> dict[str, Any]: + return { + "detector": detector_name, + "kind": detector.get("kind"), + "automation_level": detector.get("automation_level"), + "status": status, + "issue_codes": issue_codes or normalize_string_list(detector.get("issue_codes")), + "message": message, + "evidence": evidence or [], + } + + +def evaluate_artifact_presence( + detector_name: str, + detector: dict[str, Any], + artifact_dir: Path, +) -> dict[str, Any]: + check = detector.get("check") if isinstance(detector.get("check"), dict) else {} + missing: list[str] = [] + present: list[str] = [] + for file_name in normalize_string_list(check.get("required_files")): + candidate = artifact_dir / file_name + if candidate.exists(): + present.append(file_name) + else: + missing.append(file_name) + status = "fail" if missing else "pass" + message = f"missing required files: {', '.join(missing)}" if missing else "required files are present" + return build_result( + detector_name, + detector, + status, + message, + evidence=[{"present": present, "missing": missing}], + ) + + +def evaluate_forbidden_regex( + detector_name: str, + detector: dict[str, Any], + outputs: list[dict[str, Any]], + *, + prefix_line_count: int | None = None, +) -> dict[str, Any]: + check = detector.get("check") if isinstance(detector.get("check"), dict) else {} + patterns = compile_patterns(normalize_string_list(check.get("forbidden_patterns"))) + if not outputs: + return build_result(detector_name, detector, "skipped", "no output.md-style artifacts matched detector scope") + evidence: list[dict[str, Any]] = [] + for output in outputs: + text = str(output.get("text") or "") + if prefix_line_count is not None: + lines = [line for line in text.splitlines() if line.strip()] + text = "\n".join(lines[:prefix_line_count]) + for pattern in patterns: + match = pattern.search(text) + if match: + evidence.append( + { + "path": output["repo_path"], + "pattern": pattern.pattern, + "match": match.group(0)[:240], + } + ) + status = "fail" if evidence else "pass" + message = "forbidden patterns found" if evidence else "no forbidden patterns found" + return build_result(detector_name, detector, status, message, evidence=evidence) + + +def evaluate_required_any( + detector_name: str, + detector: dict[str, Any], + outputs: list[dict[str, Any]], +) -> dict[str, Any]: + check = detector.get("check") if isinstance(detector.get("check"), dict) else {} + patterns = compile_patterns(normalize_string_list(check.get("required_patterns_any"))) + if not outputs: + return build_result(detector_name, detector, "skipped", "no output.md-style artifacts matched detector scope") + missing: list[dict[str, Any]] = [] + matched: list[dict[str, Any]] = [] + for output in outputs: + text = str(output.get("text") or "") + match = next((pattern.search(text) for pattern in patterns if pattern.search(text)), None) + if match: + matched.append({"path": output["repo_path"], "match": match.group(0)[:240]}) + else: + missing.append({"path": output["repo_path"]}) + status = "fail" if missing else "pass" + message = "required answer signals missing" if missing else "required answer signals found" + return build_result(detector_name, detector, status, message, evidence=[*matched, *missing]) + + +def evaluate_limited_next_action( + detector_name: str, + detector: dict[str, Any], + outputs: list[dict[str, Any]], +) -> dict[str, Any]: + check = detector.get("check") if isinstance(detector.get("check"), dict) else {} + limited_patterns = compile_patterns(normalize_string_list(check.get("limited_patterns"))) + next_action_patterns = compile_patterns(normalize_string_list(check.get("required_next_action_patterns_any"))) + if not outputs: + return build_result(detector_name, detector, "skipped", "no output.md-style artifacts matched detector scope") + failures: list[dict[str, Any]] = [] + limited_outputs = 0 + for output in outputs: + text = str(output.get("text") or "") + limited_match = next((pattern.search(text) for pattern in limited_patterns if pattern.search(text)), None) + if not limited_match: + continue + limited_outputs += 1 + if not any(pattern.search(text) for pattern in next_action_patterns): + failures.append({"path": output["repo_path"], "limited_match": limited_match.group(0)[:240]}) + status = "fail" if failures else "pass" + if failures: + message = "limited answer has no next action" + elif limited_outputs: + message = "limited answer includes a next action" + else: + message = "answer is not limited; next action requirement not triggered" + return build_result(detector_name, detector, status, message, evidence=failures) + + +def evaluate_trace_guard( + detector_name: str, + detector: dict[str, Any], + turns: list[dict[str, Any]], +) -> dict[str, Any]: + check = detector.get("check") if isinstance(detector.get("check"), dict) else {} + markers = [marker.lower() for marker in normalize_string_list(check.get("forbidden_trace_markers"))] + if not turns: + return build_result(detector_name, detector, "skipped", "no turn.json-style artifacts matched detector scope") + evidence: list[dict[str, Any]] = [] + for turn in turns: + text = str(turn.get("text") or "").lower() + for marker in markers: + if marker in text: + evidence.append({"path": turn["repo_path"], "marker": marker}) + status = "fail" if evidence else "pass" + message = "forbidden trace markers found" if evidence else "no forbidden trace markers found" + return build_result(detector_name, detector, status, message, evidence=evidence) + + +def evaluate_prompt_manifest( + detector_name: str, + detector: dict[str, Any], + artifact_dir: Path, +) -> dict[str, Any]: + manifest_path = artifact_dir / "effective_runtime.json" + if not manifest_path.exists(): + return build_result(detector_name, detector, "skipped", "effective_runtime.json is missing") + manifest = read_json_object(manifest_path) + check = detector.get("check") if isinstance(detector.get("check"), dict) else {} + evidence: list[dict[str, Any]] = [] + for field_name in normalize_string_list(check.get("manifest_fields")): + if not str(manifest.get(field_name) or "").strip(): + evidence.append({"path": repo_relative(manifest_path), "missing_field": field_name}) + prompt_source = str(manifest.get("prompt_source") or "").strip().lower() + forbidden_sources = [item.lower() for item in normalize_string_list(check.get("forbidden_prompt_sources"))] + if prompt_source and prompt_source in forbidden_sources: + evidence.append({"path": repo_relative(manifest_path), "forbidden_prompt_source": prompt_source}) + status = "fail" if evidence else "pass" + message = "prompt manifest metadata failed" if evidence else "prompt manifest metadata passed" + return build_result(detector_name, detector, status, message, evidence=evidence) + + +def evaluate_stage_review_signal( + detector_name: str, + detector: dict[str, Any], + artifact_dir: Path, +) -> dict[str, Any]: + check = detector.get("check") if isinstance(detector.get("check"), dict) else {} + target_status = str(check.get("target_status") or "").strip() + if not target_status: + return build_result(detector_name, detector, "skipped", "target_status is not configured") + evidence: list[dict[str, Any]] = [] + for path in sorted(artifact_dir.rglob("*.json")): + if path.name not in {"run_review.json", "repair_targets.json", "semantic_repair_targets.json"}: + continue + text = read_text_or_empty(path) + if target_status in text: + evidence.append({"path": repo_relative(path), "target_status": target_status}) + status = "fail" if evidence else "pass" + message = "stage review signal found" if evidence else "stage review signal not found" + return build_result(detector_name, detector, status, message, evidence=evidence) + + +def child_status(detector_name: str, results_by_name: dict[str, dict[str, Any]]) -> str: + child = results_by_name.get(detector_name) + return str(child.get("status") if child else "skipped") + + +def evaluate_composite( + detector_name: str, + detector: dict[str, Any], + results_by_name: dict[str, dict[str, Any]], +) -> dict[str, Any]: + check = detector.get("check") if isinstance(detector.get("check"), dict) else {} + children = normalize_string_list(check.get("uses_detectors")) + statuses = {child: child_status(child, results_by_name) for child in children} + if any(status == "fail" for status in statuses.values()): + status = "fail" + elif statuses and all(status == "pass" for status in statuses.values()): + status = "pass" + elif any(status == "review" for status in statuses.values()): + status = "review" + else: + status = "skipped" + return build_result( + detector_name, + detector, + status, + "composite detector evaluated from child detectors", + evidence=[{"child_statuses": statuses}], + ) + + +def evaluate_manual_review(detector_name: str, detector: dict[str, Any], message: str) -> dict[str, Any]: + return build_result(detector_name, detector, "review", message) + + +def evaluate_detector( + detector_name: str, + detector: dict[str, Any], + *, + artifact_dir: Path, + outputs: list[dict[str, Any]], + turns: list[dict[str, Any]], + evidence_paths: list[str], + results_by_name: dict[str, dict[str, Any]], + issue_codes: list[str] | None = None, +) -> dict[str, Any]: + kind = str(detector.get("kind") or "").strip() + check = detector.get("check") if isinstance(detector.get("check"), dict) else {} + scoped_outputs = filter_outputs(outputs, artifact_dir, evidence_paths) + scoped_turns = filter_turns(turns, artifact_dir, evidence_paths) + if kind == "artifact_presence": + return evaluate_artifact_presence(detector_name, detector, artifact_dir) + if kind == "answer_text_regex_forbidden": + return evaluate_forbidden_regex(detector_name, detector, scoped_outputs) + if kind == "answer_text_regex_forbidden_in_prefix": + prefix_line_count = int(check.get("prefix_line_count") or 3) + return evaluate_forbidden_regex(detector_name, detector, scoped_outputs, prefix_line_count=prefix_line_count) + if kind == "answer_text_required_any": + return evaluate_required_any(detector_name, detector, scoped_outputs) + if kind == "answer_text_required_when_limited": + return evaluate_limited_next_action(detector_name, detector, scoped_outputs) + if kind == "trace_value_guard": + if not scoped_turns and scoped_outputs: + scoped_turn_paths = [item.get("turn_path") for item in scoped_outputs if item.get("turn_path")] + scoped_turns = [ + {"path": path, "repo_path": repo_relative(path), "artifact_path": str(path.relative_to(artifact_dir)), "text": read_text_or_empty(path)} + for path in scoped_turn_paths + if isinstance(path, Path) + ] + return evaluate_trace_guard(detector_name, detector, scoped_turns) + if kind == "prompt_registry_healthcheck": + if "manifest_fields" in check or "forbidden_prompt_sources" in check: + return evaluate_prompt_manifest(detector_name, detector, artifact_dir) + return build_result(detector_name, detector, "skipped", "prompt healthcheck command is validated by prompt_registry_healthcheck.py") + if kind == "stage_review_signal": + return evaluate_stage_review_signal(detector_name, detector, artifact_dir) + if kind in {"composite_detector", "contract_field_detector", "limited_answer_next_action"} and normalize_string_list( + check.get("uses_detectors") + ): + return evaluate_composite(detector_name, detector, results_by_name) + if kind == "answer_text_shape": + return evaluate_manual_review(detector_name, detector, "direct-answer shape requires business review") + return build_result(detector_name, detector, "skipped", f"detector kind is not executable yet: {kind}") + + +def summarize_results(results: list[dict[str, Any]]) -> dict[str, Any]: + counts = {"pass": 0, "fail": 0, "skipped": 0, "review": 0} + for result in results: + status = str(result.get("status") or "skipped") + counts[status] = counts.get(status, 0) + 1 + if counts.get("fail", 0): + status = "fail" + elif counts.get("review", 0): + status = "review" + elif counts.get("pass", 0): + status = "pass" + else: + status = "skipped" + return {"status": status, "detector_count": len(results), **counts} + + +def build_detector_results( + artifact_dir: Path, + *, + detector_names: list[str] | None = None, + issue_codes: list[str] | None = None, + detector_candidates_path: Path | None = None, + registry_path: Path = DETECTOR_REGISTRY_PATH, + issue_catalog_path: Path = ISSUE_CATALOG_PATH, + include_default_global: bool = True, +) -> dict[str, Any]: + artifact_dir = artifact_dir.resolve() + registry = load_registry(registry_path) + issue_catalog = load_issue_catalog(issue_catalog_path) + selected, evidence_by_detector, issue_codes_by_detector = select_detectors( + registry=registry, + issue_catalog=issue_catalog, + detector_names=detector_names, + issue_codes=issue_codes, + detector_candidates_path=detector_candidates_path, + include_default_global=include_default_global, + ) + outputs = collect_output_artifacts(artifact_dir) + turns = collect_turn_artifacts(artifact_dir) + results: list[dict[str, Any]] = [] + results_by_name: dict[str, dict[str, Any]] = {} + for detector_name in selected: + detector = registry.get(detector_name) + if not isinstance(detector, dict): + continue + result = evaluate_detector( + detector_name, + detector, + artifact_dir=artifact_dir, + outputs=outputs, + turns=turns, + evidence_paths=evidence_by_detector.get(detector_name, []), + results_by_name=results_by_name, + issue_codes=issue_codes_by_detector.get(detector_name), + ) + if issue_codes_by_detector.get(detector_name): + result["issue_codes"] = issue_codes_by_detector[detector_name] + results.append(result) + results_by_name[detector_name] = result + return { + "schema_version": DETECTOR_RESULTS_SCHEMA_VERSION, + "created_at": utc_now(), + "artifact_dir": repo_relative(artifact_dir), + "registry_path": repo_relative(registry_path), + "issue_catalog_path": repo_relative(issue_catalog_path), + "detector_candidates_path": repo_relative(detector_candidates_path) if detector_candidates_path else None, + "selected_detectors": selected, + "artifact_counts": { + "output_artifacts": len(outputs), + "turn_artifacts": len(turns), + }, + "summary": summarize_results(results), + "results": results, + } + + +def main() -> int: + parser = argparse.ArgumentParser(description="Run Agent Reliability detector registry checks against replay artifacts.") + parser.add_argument("--artifact-dir", required=True, help="Replay artifact directory to inspect.") + parser.add_argument("--output", help="Path for detector_results.json. Defaults to /detector_results.json.") + parser.add_argument("--registry", default=str(DETECTOR_REGISTRY_PATH), help="Detector registry JSON path.") + parser.add_argument("--issue-catalog", default=str(ISSUE_CATALOG_PATH), help="Issue catalog JSON path.") + parser.add_argument("--detector", action="append", default=[], help="Detector name to run. Can be repeated.") + parser.add_argument("--issue-code", action="append", default=[], help="Issue code whose detectors should run. Can be repeated.") + parser.add_argument("--detector-candidates", help="detector_candidates.json path.") + parser.add_argument("--no-default-global", action="store_true", help="Do not run default global detectors when no filter is provided.") + parser.add_argument("--json", action="store_true", help="Print detector_results JSON to stdout.") + args = parser.parse_args() + artifact_dir = Path(args.artifact_dir).resolve() + output_path = Path(args.output).resolve() if args.output else artifact_dir / "detector_results.json" + detector_candidates_path = Path(args.detector_candidates).resolve() if args.detector_candidates else None + results = build_detector_results( + artifact_dir, + detector_names=args.detector, + issue_codes=args.issue_code, + detector_candidates_path=detector_candidates_path, + registry_path=Path(args.registry).resolve(), + issue_catalog_path=Path(args.issue_catalog).resolve(), + include_default_global=not args.no_default_global, + ) + write_json(output_path, results) + if args.json: + print(json.dumps(results, ensure_ascii=False, indent=2)) + else: + summary = results["summary"] + print(f"status: {summary['status']}") + print(f"detector_results: {output_path}") + return 1 if results["summary"]["status"] == "fail" else 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/agent_reliability_contract_healthcheck.py b/scripts/agent_reliability_contract_healthcheck.py index d432622..024f98f 100644 --- a/scripts/agent_reliability_contract_healthcheck.py +++ b/scripts/agent_reliability_contract_healthcheck.py @@ -16,6 +16,7 @@ CONTRACTS_DIR = REPO_ROOT / "docs" / "orchestration" / "contracts" EXPECTED_SCHEMA_FILES = { "agent_issue_catalog.schema.json": "Agent Issue Catalog", "agent_detector_registry.schema.json": "Agent Detector Registry", + "agent_detector_results.schema.json": "Agent Detector Results", "auto_coder_gate.schema.json": "Auto-Coder Gate", "business_audit_contract.schema.json": "Business Audit Contract", "domain_loop_lead_coder_handoff.schema.json": "Domain Loop Lead Coder Handoff", diff --git a/scripts/domain_case_loop.py b/scripts/domain_case_loop.py index 0f6f400..1bfe26e 100644 --- a/scripts/domain_case_loop.py +++ b/scripts/domain_case_loop.py @@ -15,6 +15,7 @@ from urllib.error import HTTPError, URLError from urllib.request import Request, urlopen import agent_runtime_manifest as runtime_manifest +import agent_detector_runner REPO_ROOT = Path(__file__).resolve().parent.parent DEFAULT_ARTIFACTS_ROOT = REPO_ROOT / "artifacts" / "domain_runs" @@ -5255,6 +5256,7 @@ def build_business_audit_contract( issue_catalog_snapshot_path: Path | None = None, rerun_matrix_path: Path | None = None, detector_candidates_path: Path | None = None, + detector_results_path: Path | None = None, ) -> dict[str, Any]: catalog = load_issue_catalog() targets = repair_targets.get("targets") if isinstance(repair_targets.get("targets"), list) else [] @@ -5312,6 +5314,8 @@ def build_business_audit_contract( artifact_refs["rerun_matrix_json"] = repo_relative(rerun_matrix_path) if detector_candidates_path is not None: artifact_refs["detector_candidates_json"] = repo_relative(detector_candidates_path) + if detector_results_path is not None: + artifact_refs["detector_results_json"] = repo_relative(detector_results_path) return result @@ -5405,6 +5409,7 @@ def build_lead_coder_handoff( issue_catalog_snapshot_path: Path | None = None, rerun_matrix_path: Path | None = None, detector_candidates_path: Path | None = None, + detector_results_path: Path | None = None, analyst_verdict: dict[str, Any], repair_targets: dict[str, Any], target_score: int, @@ -5456,6 +5461,8 @@ def build_lead_coder_handoff( artifact_refs["rerun_matrix"] = repo_relative(rerun_matrix_path) if detector_candidates_path is not None: artifact_refs["detector_candidates"] = repo_relative(detector_candidates_path) + if detector_results_path is not None: + artifact_refs["detector_results"] = repo_relative(detector_results_path) return { "schema_version": "domain_loop_lead_coder_handoff_v1", @@ -5528,6 +5535,7 @@ def build_lead_coder_handoff_markdown(handoff: dict[str, Any]) -> str: f"- issue_catalog_snapshot: `{artifact_refs.get('issue_catalog_snapshot') or 'n/a'}`", f"- rerun_matrix: `{artifact_refs.get('rerun_matrix') or 'n/a'}`", f"- detector_candidates: `{artifact_refs.get('detector_candidates') or 'n/a'}`", + f"- detector_results: `{artifact_refs.get('detector_results') or 'n/a'}`", f"- auto_coder_gate: `{artifact_refs.get('auto_coder_gate') or 'n/a'}`", f"- pack_dir: `{artifact_refs.get('pack_dir')}`", "", @@ -5812,6 +5820,8 @@ def build_loop_summary(loop_state: dict[str, Any]) -> str: f" repair_targets: `{item.get('repair_targets_path') or 'n/a'}`", f" rerun_matrix: `{item.get('rerun_matrix_path') or 'n/a'}`", f" detector_candidates: `{item.get('detector_candidates_path') or 'n/a'}`", + f" detector_results: `{item.get('detector_results_path') or 'n/a'}`", + f" detector_results_status: `{item.get('detector_results_status') or 'n/a'}`", f" auto_coder_gate: `{item.get('auto_coder_gate_path') or 'n/a'}`", f" lead_coder_handoff: `{item.get('lead_coder_handoff_path') or 'n/a'}`", f" repair_target_count: `{item.get('repair_target_count')}`", @@ -5834,6 +5844,7 @@ def build_loop_final_status(loop_state: dict[str, Any]) -> str: - last_analyst_decision: `{loop_state.get('last_analyst_decision') or 'n/a'}` - latest_business_audit_json: `{loop_state.get('latest_business_audit_json_path') or 'n/a'}` - latest_rerun_matrix: `{loop_state.get('latest_rerun_matrix_path') or 'n/a'}` + - latest_detector_results: `{loop_state.get('latest_detector_results_path') or 'n/a'}` - latest_lead_coder_handoff: `{loop_state.get('latest_lead_coder_handoff_path') or 'n/a'}` - stop_reason: {loop_state.get('stop_reason') or 'n/a'} """ @@ -5959,6 +5970,7 @@ def handle_run_pack_loop(args: argparse.Namespace) -> int: issue_catalog_snapshot_path = iteration_dir / "issue_catalog_snapshot.json" rerun_matrix_path = iteration_dir / "rerun_matrix.json" detector_candidates_path = iteration_dir / "detector_candidates.json" + detector_results_path = iteration_dir / "detector_results.json" write_text( business_audit_path, build_business_audit_markdown( @@ -5991,11 +6003,18 @@ def handle_run_pack_loop(args: argparse.Namespace) -> int: issue_catalog_snapshot_path=issue_catalog_snapshot_path, rerun_matrix_path=rerun_matrix_path, detector_candidates_path=detector_candidates_path, + detector_results_path=detector_results_path, ) write_json(business_audit_json_path, business_audit_contract) write_json(issue_catalog_snapshot_path, issue_catalog_snapshot) write_json(rerun_matrix_path, rerun_matrix_contract) write_json(detector_candidates_path, detector_candidates) + detector_results = agent_detector_runner.build_detector_results( + pack_dir, + detector_candidates_path=detector_candidates_path, + include_default_global=False, + ) + write_json(detector_results_path, detector_results) repair_target_count = int(repair_targets.get("target_count") or 0) if isinstance(repair_targets, dict) else 0 repair_target_severity_counts = ( repair_targets.get("severity_counts") @@ -6010,6 +6029,7 @@ def handle_run_pack_loop(args: argparse.Namespace) -> int: loop_state["latest_issue_catalog_snapshot_path"] = str(issue_catalog_snapshot_path) loop_state["latest_rerun_matrix_path"] = str(rerun_matrix_path) loop_state["latest_detector_candidates_path"] = str(detector_candidates_path) + loop_state["latest_detector_results_path"] = str(detector_results_path) iteration_record: dict[str, Any] = { "iteration_id": iteration_id, @@ -6030,6 +6050,8 @@ def handle_run_pack_loop(args: argparse.Namespace) -> int: "issue_catalog_snapshot_path": str(issue_catalog_snapshot_path), "rerun_matrix_path": str(rerun_matrix_path), "detector_candidates_path": str(detector_candidates_path), + "detector_results_path": str(detector_results_path), + "detector_results_status": detector_results.get("summary", {}).get("status"), "repair_target_count": repair_target_count, "repair_target_severity_counts": repair_target_severity_counts, "coder_status": None, @@ -6075,6 +6097,7 @@ def handle_run_pack_loop(args: argparse.Namespace) -> int: issue_catalog_snapshot_path=issue_catalog_snapshot_path, rerun_matrix_path=rerun_matrix_path, detector_candidates_path=detector_candidates_path, + detector_results_path=detector_results_path, analyst_verdict=analyst_verdict, repair_targets=repair_targets, target_score=target_score, @@ -6129,6 +6152,7 @@ def handle_run_pack_loop(args: argparse.Namespace) -> int: issue_catalog_snapshot_path=issue_catalog_snapshot_path, rerun_matrix_path=rerun_matrix_path, detector_candidates_path=detector_candidates_path, + detector_results_path=detector_results_path, analyst_verdict=analyst_verdict, repair_targets=repair_targets, target_score=target_score, diff --git a/scripts/test_agent_detector_runner.py b/scripts/test_agent_detector_runner.py new file mode 100644 index 0000000..d66d962 --- /dev/null +++ b/scripts/test_agent_detector_runner.py @@ -0,0 +1,226 @@ +from __future__ import annotations + +import json +import sys +import tempfile +import unittest +from pathlib import Path + + +sys.path.insert(0, str(Path(__file__).resolve().parent)) + +import agent_detector_runner as runner + + +def write_json(path: Path, payload: object) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") + + +def write_text(path: Path, text: str) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(text, encoding="utf-8") + + +class AgentDetectorRunnerTests(unittest.TestCase): + def test_default_runner_fails_missing_effective_runtime(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + artifact_dir = root / "run" + artifact_dir.mkdir() + registry_path = root / "detector_registry.json" + issue_catalog_path = root / "issue_catalog.json" + write_json( + registry_path, + { + "schema_version": "agent_detector_registry_v1", + "detectors": { + "missing_effective_runtime_json": { + "kind": "artifact_presence", + "automation_level": "automatic", + "description": "Manifest is required.", + "issue_codes": ["runtime_manifest_missing"], + "inputs": ["effective_runtime.json"], + "check": {"required_files": ["effective_runtime.json"]}, + } + }, + }, + ) + write_json( + issue_catalog_path, + { + "schema_version": "agent_issue_catalog_v1", + "issues": { + "runtime_manifest_missing": { + "detectors": ["missing_effective_runtime_json"], + } + }, + }, + ) + + results = runner.build_detector_results( + artifact_dir, + registry_path=registry_path, + issue_catalog_path=issue_catalog_path, + ) + + self.assertEqual(results["summary"]["status"], "fail") + self.assertEqual(results["results"][0]["detector"], "missing_effective_runtime_json") + self.assertEqual(results["results"][0]["status"], "fail") + + def test_default_runner_passes_when_effective_runtime_exists(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + artifact_dir = root / "run" + write_json(artifact_dir / "effective_runtime.json", {"runner": "test"}) + registry_path = root / "detector_registry.json" + issue_catalog_path = root / "issue_catalog.json" + write_json( + registry_path, + { + "schema_version": "agent_detector_registry_v1", + "detectors": { + "missing_effective_runtime_json": { + "kind": "artifact_presence", + "automation_level": "automatic", + "description": "Manifest is required.", + "issue_codes": ["runtime_manifest_missing"], + "inputs": ["effective_runtime.json"], + "check": {"required_files": ["effective_runtime.json"]}, + } + }, + }, + ) + write_json(issue_catalog_path, {"schema_version": "agent_issue_catalog_v1", "issues": {}}) + + results = runner.build_detector_results( + artifact_dir, + registry_path=registry_path, + issue_catalog_path=issue_catalog_path, + ) + + self.assertEqual(results["summary"]["status"], "pass") + self.assertEqual(results["results"][0]["status"], "pass") + + def test_candidate_forbidden_regex_is_limited_to_evidence_path(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + artifact_dir = root / "run" + write_text(artifact_dir / "scenarios" / "ok" / "steps" / "s01" / "output.md", "Маржа не подтверждена.") + write_text( + artifact_dir / "scenarios" / "bad" / "steps" / "s02" / "output.md", + "Это амортизация объекта ОС, но этот шаг не в evidence scope.", + ) + registry_path = root / "detector_registry.json" + issue_catalog_path = root / "issue_catalog.json" + candidates_path = root / "detector_candidates.json" + write_json( + registry_path, + { + "schema_version": "agent_detector_registry_v1", + "detectors": { + "forbidden_margin_terms": { + "kind": "answer_text_regex_forbidden", + "automation_level": "automatic", + "description": "No wrong-domain words.", + "issue_codes": ["margin_domain_leak_accounting_route"], + "inputs": ["output.md"], + "check": {"forbidden_patterns": ["(?i)(амортизац|объект\\s+ОС)"]}, + } + }, + }, + ) + write_json( + issue_catalog_path, + { + "schema_version": "agent_issue_catalog_v1", + "issues": { + "margin_domain_leak_accounting_route": { + "detectors": ["forbidden_margin_terms"], + } + }, + }, + ) + write_json( + candidates_path, + { + "schema_version": "detector_candidates_v1", + "candidates": [ + { + "issue_code": "margin_domain_leak_accounting_route", + "detector": "forbidden_margin_terms", + "evidence_paths": ["scenarios/ok/steps/s01/output.md"], + } + ], + }, + ) + + results = runner.build_detector_results( + artifact_dir, + detector_candidates_path=candidates_path, + registry_path=registry_path, + issue_catalog_path=issue_catalog_path, + include_default_global=False, + ) + + self.assertEqual(results["summary"]["status"], "pass") + self.assertEqual(results["results"][0]["status"], "pass") + + def test_composite_detector_fails_after_child_detector_fails(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + artifact_dir = root / "run" + write_text(artifact_dir / "steps" / "s01" / "output.md", "Маржа почему-то посчитана через амортизацию.") + registry_path = root / "detector_registry.json" + issue_catalog_path = root / "issue_catalog.json" + write_json( + registry_path, + { + "schema_version": "agent_detector_registry_v1", + "detectors": { + "forbidden_margin_terms": { + "kind": "answer_text_regex_forbidden", + "automation_level": "automatic", + "description": "No wrong-domain words.", + "issue_codes": ["margin_domain_leak_accounting_route"], + "inputs": ["output.md"], + "check": {"forbidden_patterns": ["(?i)амортизац"]}, + }, + "margin_domain_leak_accounting_route": { + "kind": "composite_detector", + "automation_level": "semi_automatic", + "description": "Composite leak detector.", + "issue_codes": ["margin_domain_leak_accounting_route"], + "inputs": ["output.md"], + "check": {"uses_detectors": ["forbidden_margin_terms"]}, + }, + }, + }, + ) + write_json( + issue_catalog_path, + { + "schema_version": "agent_issue_catalog_v1", + "issues": { + "margin_domain_leak_accounting_route": { + "detectors": ["margin_domain_leak_accounting_route"], + } + }, + }, + ) + + results = runner.build_detector_results( + artifact_dir, + detector_names=["margin_domain_leak_accounting_route"], + registry_path=registry_path, + issue_catalog_path=issue_catalog_path, + include_default_global=False, + ) + + statuses = {item["detector"]: item["status"] for item in results["results"]} + self.assertEqual(statuses["forbidden_margin_terms"], "fail") + self.assertEqual(statuses["margin_domain_leak_accounting_route"], "fail") + + +if __name__ == "__main__": + unittest.main()