"""Run detector registry checks against replay artifacts.

The module loads a detector registry and an issue catalog (both JSON),
selects the detectors to run, evaluates each one against the ``*.md`` output
and ``*.json`` turn artifacts found under an artifact directory, and writes a
``detector_results.json`` summary.
"""

from __future__ import annotations

import argparse
import json
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

REPO_ROOT = Path(__file__).resolve().parent.parent
DETECTOR_REGISTRY_PATH = REPO_ROOT / "docs" / "orchestration" / "detector_registry.json"
ISSUE_CATALOG_PATH = REPO_ROOT / "docs" / "orchestration" / "issue_catalog.json"
DEFAULT_GLOBAL_DETECTORS = ["missing_effective_runtime_json"]
DETECTOR_RESULTS_SCHEMA_VERSION = "agent_detector_results_v1"


def read_json(path: Path) -> Any:
    """Parse *path* as UTF-8 JSON and return the decoded value.

    Raises whatever ``Path.read_text`` or ``json.loads`` raise; use
    :func:`read_json_object` for a tolerant variant.
    """
    return json.loads(path.read_text(encoding="utf-8"))


def read_json_object(path: Path) -> dict[str, Any]:
    """Return the JSON object stored at *path*, or ``{}`` when unavailable.

    An unreadable file, bytes that are not valid UTF-8, malformed JSON, or a
    top-level value that is not an object all yield an empty dict.
    """
    try:
        payload = read_json(path)
    except (OSError, UnicodeDecodeError, json.JSONDecodeError):
        # UnicodeDecodeError is neither an OSError nor a JSONDecodeError, so
        # it has to be listed explicitly for this reader to stay best-effort.
        return {}
    return payload if isinstance(payload, dict) else {}


def write_json(path: Path, payload: Any) -> None:
    """Write *payload* to *path* as indented UTF-8 JSON, creating parent dirs."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")


def utc_now() -> str:
    """Return the current UTC time as an ISO-8601 string without microseconds."""
    return datetime.now(timezone.utc).replace(microsecond=0).isoformat()


def repo_relative(path: Path) -> str:
    """Return *path* relative to ``REPO_ROOT``, or ``str(path)`` if outside it."""
    try:
        return str(path.resolve().relative_to(REPO_ROOT))
    except ValueError:
        return str(path)


def normalize_string_list(raw_value: Any) -> list[str]:
    """Coerce *raw_value* into a list of non-empty, stripped strings.

    A string becomes a one-element list (or empty if blank); a list has each
    item stringified, stripped, and dropped when empty; anything else yields
    an empty list.
    """
    if isinstance(raw_value, str):
        value = raw_value.strip()
        return [value] if value else []
    if not isinstance(raw_value, list):
        return []
    stripped = (str(item or "").strip() for item in raw_value)
    return [value for value in stripped if value]


def normalize_path_key(value: str | Path) -> str:
    """Return a comparison key: forward slashes, stripped, lower-cased."""
    return str(value).replace("\\", "/").strip().lower()


def load_registry(path: Path = DETECTOR_REGISTRY_PATH) -> dict[str, Any]:
    """Return the ``detectors`` mapping from the registry file, or ``{}``."""
    detectors = read_json_object(path).get("detectors")
    return detectors if isinstance(detectors, dict) else {}


def load_issue_catalog(path: Path = ISSUE_CATALOG_PATH) -> dict[str, Any]:
    """Return the ``issues`` mapping from the issue catalog file, or ``{}``."""
    issues = read_json_object(path).get("issues")
    return issues if isinstance(issues, dict) else {}


def detector_names_for_issue_codes(issue_codes: list[str], issue_catalog: dict[str, Any]) -> list[str]:
    """Return the de-duplicated detector names listed for *issue_codes*.

    Order follows first appearance. Issue codes missing from the catalog, or
    whose entry is not a dict, are ignored.
    """
    names: list[str] = []
    for issue_code in issue_codes:
        issue = issue_catalog.get(issue_code)
        if not isinstance(issue, dict):
            continue
        for detector_name in normalize_string_list(issue.get("detectors")):
            if detector_name not in names:
                names.append(detector_name)
    return names


def load_detector_candidates(path: Path | None) -> tuple[list[str], dict[str, list[str]], dict[str, list[str]]]:
    """Read a detector-candidates file.

    Returns a tuple ``(detectors, evidence_by_detector, issue_codes_by_detector)``:
    the detector names in first-seen order, and per-detector de-duplicated
    lists of ``evidence_paths`` and ``issue_code`` values gathered from the
    ``candidates`` list. A missing path yields three empty containers.
    """
    if path is None or not path.exists():
        return [], {}, {}
    payload = read_json_object(path)
    detectors: list[str] = []
    evidence_by_detector: dict[str, list[str]] = {}
    issue_codes_by_detector: dict[str, list[str]] = {}
    candidates = payload.get("candidates") if isinstance(payload.get("candidates"), list) else []
    for item in candidates:
        if not isinstance(item, dict):
            continue
        detector_name = str(item.get("detector") or "").strip()
        if not detector_name:
            continue
        if detector_name not in detectors:
            detectors.append(detector_name)
        evidence_paths = evidence_by_detector.setdefault(detector_name, [])
        for evidence_path in normalize_string_list(item.get("evidence_paths")):
            if evidence_path not in evidence_paths:
                evidence_paths.append(evidence_path)
        issue_code = str(item.get("issue_code") or "").strip()
        if issue_code:
            known_codes = issue_codes_by_detector.setdefault(detector_name, [])
            if issue_code not in known_codes:
                known_codes.append(issue_code)
    return detectors, evidence_by_detector, issue_codes_by_detector


def _detector_check(detector: dict[str, Any]) -> dict[str, Any]:
    """Return the detector's ``check`` mapping, or ``{}`` when it is not a dict."""
    check = detector.get("check")
    return check if isinstance(check, dict) else {}


def expand_detector_dependencies(detector_names: list[str], registry: dict[str, Any]) -> list[str]:
    """Return *detector_names* plus their ``uses_detectors`` dependencies.

    Dependencies are emitted before the detector that uses them, and every
    name appears once. Names absent from *registry* are kept as-is (they have
    no children). Cyclic or self-referencing ``uses_detectors`` entries are
    tolerated: a detector that is already being expanded is not re-entered.
    """
    expanded: list[str] = []
    done: set[str] = set()
    in_progress: set[str] = set()

    def visit(detector_name: str) -> None:
        # ``in_progress`` breaks dependency cycles, which would otherwise
        # recurse until RecursionError because a name only lands in
        # ``expanded`` after all of its children have been visited.
        if detector_name in done or detector_name in in_progress:
            return
        in_progress.add(detector_name)
        detector = registry.get(detector_name)
        if isinstance(detector, dict):
            for child_name in normalize_string_list(_detector_check(detector).get("uses_detectors")):
                visit(child_name)
        in_progress.discard(detector_name)
        done.add(detector_name)
        expanded.append(detector_name)

    for name in detector_names:
        visit(name)
    return expanded


def select_detectors(
    *,
    registry: dict[str, Any],
    issue_catalog: dict[str, Any],
    detector_names: list[str] | None = None,
    issue_codes: list[str] | None = None,
    detector_candidates_path: Path | None = None,
    include_default_global: bool = True,
) -> tuple[list[str], dict[str, list[str]], dict[str, list[str]]]:
    """Decide which detectors to run.

    Names are merged, in order and without duplicates, from the explicit
    *detector_names*, the detectors mapped from *issue_codes*, and the
    candidates file. If nothing was selected and *include_default_global* is
    true, ``DEFAULT_GLOBAL_DETECTORS`` is used. The list is then expanded with
    dependencies and restricted to names present in *registry*.

    Returns ``(selected, evidence_by_detector, issue_codes_by_detector)``; the
    two mappings come from the candidates file.
    """
    candidate_names, evidence_by_detector, issue_codes_by_detector = load_detector_candidates(
        detector_candidates_path
    )
    sources = (
        *(detector_names or []),
        *detector_names_for_issue_codes(issue_codes or [], issue_catalog),
        *candidate_names,
    )
    selected: list[str] = []
    for source_name in sources:
        if source_name not in selected:
            selected.append(source_name)
    if not selected and include_default_global:
        selected.extend(DEFAULT_GLOBAL_DETECTORS)
    selected = [name for name in expand_detector_dependencies(selected, registry) if name in registry]
    return selected, evidence_by_detector, issue_codes_by_detector


def read_text_or_empty(path: Path) -> str:
    """Return the UTF-8 text of *path*, or ``""`` if it cannot be read or decoded."""
    try:
        return path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        # A single undecodable artifact should not abort the whole run.
        return ""


def output_turn_path(output_path: Path) -> Path | None:
    """Return the existing turn file that sits next to an output file.

    ``output.md`` pairs with ``turn.json`` and ``<prefix>_output.md`` pairs
    with ``<prefix>_turn.json``. Returns ``None`` when the name does not match
    either form or the sibling does not exist.
    """
    name = output_path.name
    if name == "output.md":
        candidate = output_path.with_name("turn.json")
    elif name.endswith("_output.md"):
        prefix = name[: -len("_output.md")]
        candidate = output_path.with_name(f"{prefix}_turn.json")
    else:
        return None
    return candidate if candidate.exists() else None


def collect_output_artifacts(artifact_dir: Path) -> list[dict[str, Any]]:
    """Collect ``output.md`` / ``*_output.md`` files under *artifact_dir*.

    Each entry holds the file's ``path``, ``repo_path``, ``artifact_path``
    (relative to *artifact_dir*), its ``text`` and the paired ``turn_path``
    (or ``None``). Entries are ordered by sorted path.
    """
    outputs: list[dict[str, Any]] = []
    for path in sorted(artifact_dir.rglob("*.md")):
        # "scenario_output.md" is already covered by the "_output.md" suffix.
        if path.name == "output.md" or path.name.endswith("_output.md"):
            outputs.append(
                {
                    "path": path,
                    "repo_path": repo_relative(path),
                    "artifact_path": str(path.relative_to(artifact_dir)),
                    "text": read_text_or_empty(path),
                    "turn_path": output_turn_path(path),
                }
            )
    return outputs


def collect_turn_artifacts(artifact_dir: Path) -> list[dict[str, Any]]:
    """Collect ``turn.json`` / ``*_turn.json`` files under *artifact_dir*.

    Each entry holds ``path``, ``repo_path``, ``artifact_path`` and the raw
    file ``text``. Entries are ordered by sorted path.
    """
    turns: list[dict[str, Any]] = []
    for path in sorted(artifact_dir.rglob("*.json")):
        if path.name == "turn.json" or path.name.endswith("_turn.json"):
            turns.append(
                {
                    "path": path,
                    "repo_path": repo_relative(path),
                    "artifact_path": str(path.relative_to(artifact_dir)),
                    "text": read_text_or_empty(path),
                }
            )
    return turns


def path_matches_evidence(path: Path, artifact_dir: Path, evidence_paths: list[str]) -> bool:
    """Return whether *path* is in scope for the given *evidence_paths*.

    An empty *evidence_paths* list matches everything. Otherwise a normalized
    evidence path matches when it equals the repo-relative path, the
    artifact-relative path or the bare file name, or when one is a string
    suffix of the other as coded below.
    """
    if not evidence_paths:
        return True
    repo_key = normalize_path_key(repo_relative(path))
    artifact_key = normalize_path_key(path.relative_to(artifact_dir))
    name_key = normalize_path_key(path.name)
    for evidence_path in evidence_paths:
        evidence_key = normalize_path_key(evidence_path)
        if evidence_key in {repo_key, artifact_key, name_key}:
            return True
        # NOTE(review): these are plain string suffix tests, not path-segment
        # aware ("mycase/output.md" ends with "case/output.md") - confirm this
        # looseness is intended before tightening it.
        if evidence_key.endswith(artifact_key) or repo_key.endswith(evidence_key):
            return True
    return False


def filter_outputs(outputs: list[dict[str, Any]], artifact_dir: Path, evidence_paths: list[str]) -> list[dict[str, Any]]:
    """Return the output artifacts whose path matches *evidence_paths*."""
    return [item for item in outputs if path_matches_evidence(item["path"], artifact_dir, evidence_paths)]


def filter_turns(turns: list[dict[str, Any]], artifact_dir: Path, evidence_paths: list[str]) -> list[dict[str, Any]]:
    """Return the turn artifacts whose path matches *evidence_paths*."""
    return [item for item in turns if path_matches_evidence(item["path"], artifact_dir, evidence_paths)]


def compile_patterns(patterns: list[str]) -> list[re.Pattern[str]]:
    """Compile *patterns*, silently skipping any that are not valid regexes."""
    compiled: list[re.Pattern[str]] = []
    for pattern in patterns:
        try:
            compiled.append(re.compile(pattern))
        except re.error:
            continue
    return compiled


def _first_match(patterns: list[re.Pattern[str]], text: str) -> re.Match[str] | None:
    """Return the match of the first pattern that matches *text*, else ``None``."""
    for pattern in patterns:
        match = pattern.search(text)
        if match:
            return match
    return None


def build_result(
    detector_name: str,
    detector: dict[str, Any],
    status: str,
    message: str,
    *,
    evidence: list[dict[str, Any]] | None = None,
    issue_codes: list[str] | None = None,
) -> dict[str, Any]:
    """Assemble one detector result record.

    ``issue_codes`` falls back to the detector's own ``issue_codes`` when the
    argument is empty or ``None``; ``evidence`` defaults to an empty list.
    """
    return {
        "detector": detector_name,
        "kind": detector.get("kind"),
        "automation_level": detector.get("automation_level"),
        "status": status,
        "issue_codes": issue_codes or normalize_string_list(detector.get("issue_codes")),
        "message": message,
        "evidence": evidence or [],
    }


def evaluate_artifact_presence(
    detector_name: str,
    detector: dict[str, Any],
    artifact_dir: Path,
) -> dict[str, Any]:
    """Fail when any of the check's ``required_files`` is missing from *artifact_dir*."""
    check = _detector_check(detector)
    missing: list[str] = []
    present: list[str] = []
    for file_name in normalize_string_list(check.get("required_files")):
        bucket = present if (artifact_dir / file_name).exists() else missing
        bucket.append(file_name)
    status = "fail" if missing else "pass"
    message = f"missing required files: {', '.join(missing)}" if missing else "required files are present"
    return build_result(
        detector_name,
        detector,
        status,
        message,
        evidence=[{"present": present, "missing": missing}],
    )


def evaluate_forbidden_regex(
    detector_name: str,
    detector: dict[str, Any],
    outputs: list[dict[str, Any]],
    *,
    prefix_line_count: int | None = None,
) -> dict[str, Any]:
    """Fail when any ``forbidden_patterns`` regex matches an output's text.

    With *prefix_line_count* set, only the first that many non-blank lines of
    each output are searched. Returns ``skipped`` when *outputs* is empty.
    """
    check = _detector_check(detector)
    patterns = compile_patterns(normalize_string_list(check.get("forbidden_patterns")))
    if not outputs:
        return build_result(detector_name, detector, "skipped", "no output.md-style artifacts matched detector scope")
    evidence: list[dict[str, Any]] = []
    for output in outputs:
        text = str(output.get("text") or "")
        if prefix_line_count is not None:
            lines = [line for line in text.splitlines() if line.strip()]
            text = "\n".join(lines[:prefix_line_count])
        for pattern in patterns:
            match = pattern.search(text)
            if match:
                evidence.append(
                    {
                        "path": output["repo_path"],
                        "pattern": pattern.pattern,
                        "match": match.group(0)[:240],
                    }
                )
    status = "fail" if evidence else "pass"
    message = "forbidden patterns found" if evidence else "no forbidden patterns found"
    return build_result(detector_name, detector, status, message, evidence=evidence)


def evaluate_required_any(
    detector_name: str,
    detector: dict[str, Any],
    outputs: list[dict[str, Any]],
) -> dict[str, Any]:
    """Fail when some output matches none of ``required_patterns_any``.

    Evidence lists the matching outputs (with the matched text) followed by
    the outputs with no match. Returns ``skipped`` when *outputs* is empty.
    """
    check = _detector_check(detector)
    patterns = compile_patterns(normalize_string_list(check.get("required_patterns_any")))
    if not outputs:
        return build_result(detector_name, detector, "skipped", "no output.md-style artifacts matched detector scope")
    missing: list[dict[str, Any]] = []
    matched: list[dict[str, Any]] = []
    for output in outputs:
        text = str(output.get("text") or "")
        match = _first_match(patterns, text)
        if match:
            matched.append({"path": output["repo_path"], "match": match.group(0)[:240]})
        else:
            missing.append({"path": output["repo_path"]})
    status = "fail" if missing else "pass"
    message = "required answer signals missing" if missing else "required answer signals found"
    return build_result(detector_name, detector, status, message, evidence=[*matched, *missing])


def evaluate_limited_next_action(
    detector_name: str,
    detector: dict[str, Any],
    outputs: list[dict[str, Any]],
) -> dict[str, Any]:
    """Fail when an output matching ``limited_patterns`` lacks a next action.

    An output counts as limited when any ``limited_patterns`` regex matches;
    such an output must also match one of
    ``required_next_action_patterns_any``. Returns ``skipped`` when *outputs*
    is empty.
    """
    check = _detector_check(detector)
    limited_patterns = compile_patterns(normalize_string_list(check.get("limited_patterns")))
    next_action_patterns = compile_patterns(
        normalize_string_list(check.get("required_next_action_patterns_any"))
    )
    if not outputs:
        return build_result(detector_name, detector, "skipped", "no output.md-style artifacts matched detector scope")
    failures: list[dict[str, Any]] = []
    limited_outputs = 0
    for output in outputs:
        text = str(output.get("text") or "")
        limited_match = _first_match(limited_patterns, text)
        if not limited_match:
            continue
        limited_outputs += 1
        if _first_match(next_action_patterns, text) is None:
            failures.append({"path": output["repo_path"], "limited_match": limited_match.group(0)[:240]})
    status = "fail" if failures else "pass"
    if failures:
        message = "limited answer has no next action"
    elif limited_outputs:
        message = "limited answer includes a next action"
    else:
        message = "answer is not limited; next action requirement not triggered"
    return build_result(detector_name, detector, status, message, evidence=failures)


def evaluate_trace_guard(
    detector_name: str,
    detector: dict[str, Any],
    turns: list[dict[str, Any]],
) -> dict[str, Any]:
    """Fail when a turn's raw text contains a ``forbidden_trace_markers`` entry.

    The comparison is a case-insensitive substring test. Returns ``skipped``
    when *turns* is empty.
    """
    check = _detector_check(detector)
    markers = [marker.lower() for marker in normalize_string_list(check.get("forbidden_trace_markers"))]
    if not turns:
        return build_result(detector_name, detector, "skipped", "no turn.json-style artifacts matched detector scope")
    evidence: list[dict[str, Any]] = []
    for turn in turns:
        text = str(turn.get("text") or "").lower()
        evidence.extend({"path": turn["repo_path"], "marker": marker} for marker in markers if marker in text)
    status = "fail" if evidence else "pass"
    message = "forbidden trace markers found" if evidence else "no forbidden trace markers found"
    return build_result(detector_name, detector, status, message, evidence=evidence)


def evaluate_prompt_manifest(
    detector_name: str,
    detector: dict[str, Any],
    artifact_dir: Path,
) -> dict[str, Any]:
    """Validate ``effective_runtime.json`` in *artifact_dir*.

    Fails when a field named in ``manifest_fields`` is empty or falsy, or when
    the manifest's ``prompt_source`` is listed (case-insensitively) in
    ``forbidden_prompt_sources``. Returns ``skipped`` when the manifest file
    does not exist.
    """
    manifest_path = artifact_dir / "effective_runtime.json"
    if not manifest_path.exists():
        return build_result(detector_name, detector, "skipped", "effective_runtime.json is missing")
    manifest = read_json_object(manifest_path)
    check = _detector_check(detector)
    evidence: list[dict[str, Any]] = []
    for field_name in normalize_string_list(check.get("manifest_fields")):
        if not str(manifest.get(field_name) or "").strip():
            evidence.append({"path": repo_relative(manifest_path), "missing_field": field_name})
    prompt_source = str(manifest.get("prompt_source") or "").strip().lower()
    forbidden_sources = [item.lower() for item in normalize_string_list(check.get("forbidden_prompt_sources"))]
    if prompt_source and prompt_source in forbidden_sources:
        evidence.append({"path": repo_relative(manifest_path), "forbidden_prompt_source": prompt_source})
    status = "fail" if evidence else "pass"
    message = "prompt manifest metadata failed" if evidence else "prompt manifest metadata passed"
    return build_result(detector_name, detector, status, message, evidence=evidence)


def evaluate_stage_review_signal(
    detector_name: str,
    detector: dict[str, Any],
    artifact_dir: Path,
) -> dict[str, Any]:
    """Fail when the check's ``target_status`` string occurs in a review file.

    The files searched are any ``run_review.json``, ``repair_targets.json`` or
    ``semantic_repair_targets.json`` under *artifact_dir*; the test is a raw
    substring match on the file text. Returns ``skipped`` when
    ``target_status`` is not configured.
    """
    check = _detector_check(detector)
    target_status = str(check.get("target_status") or "").strip()
    if not target_status:
        return build_result(detector_name, detector, "skipped", "target_status is not configured")
    review_file_names = {"run_review.json", "repair_targets.json", "semantic_repair_targets.json"}
    evidence: list[dict[str, Any]] = []
    for path in sorted(artifact_dir.rglob("*.json")):
        if path.name not in review_file_names:
            continue
        if target_status in read_text_or_empty(path):
            evidence.append({"path": repo_relative(path), "target_status": target_status})
    status = "fail" if evidence else "pass"
    message = "stage review signal found" if evidence else "stage review signal not found"
    return build_result(detector_name, detector, status, message, evidence=evidence)


def child_status(detector_name: str, results_by_name: dict[str, dict[str, Any]]) -> str:
    """Return the recorded status of *detector_name*, or ``"skipped"`` if it has no result."""
    child = results_by_name.get(detector_name)
    return str(child.get("status") if child else "skipped")


def evaluate_composite(
    detector_name: str,
    detector: dict[str, Any],
    results_by_name: dict[str, dict[str, Any]],
) -> dict[str, Any]:
    """Derive a status from the results of the ``uses_detectors`` children.

    Precedence: any child ``fail`` gives ``fail``; all children ``pass`` gives
    ``pass``; otherwise any ``review`` gives ``review``; else ``skipped``.
    """
    check = _detector_check(detector)
    children = normalize_string_list(check.get("uses_detectors"))
    statuses = {child: child_status(child, results_by_name) for child in children}
    observed = list(statuses.values())
    if "fail" in observed:
        status = "fail"
    elif observed and all(value == "pass" for value in observed):
        status = "pass"
    elif "review" in observed:
        status = "review"
    else:
        status = "skipped"
    return build_result(
        detector_name,
        detector,
        status,
        "composite detector evaluated from child detectors",
        evidence=[{"child_statuses": statuses}],
    )


def evaluate_manual_review(detector_name: str, detector: dict[str, Any], message: str) -> dict[str, Any]:
    """Return a ``review`` result carrying *message*."""
    return build_result(detector_name, detector, "review", message)


def evaluate_detector(
    detector_name: str,
    detector: dict[str, Any],
    *,
    artifact_dir: Path,
    outputs: list[dict[str, Any]],
    turns: list[dict[str, Any]],
    evidence_paths: list[str],
    results_by_name: dict[str, dict[str, Any]],
    issue_codes: list[str] | None = None,
) -> dict[str, Any]:
    """Dispatch a detector to the evaluator for its ``kind``.

    *outputs* and *turns* are first narrowed to *evidence_paths*. Kinds with
    no evaluator yield a ``skipped`` result.

    NOTE(review): *issue_codes* is accepted but not used in this function;
    ``build_detector_results`` overrides the result's ``issue_codes`` itself.
    """
    kind = str(detector.get("kind") or "").strip()
    check = _detector_check(detector)
    scoped_outputs = filter_outputs(outputs, artifact_dir, evidence_paths)
    scoped_turns = filter_turns(turns, artifact_dir, evidence_paths)

    if kind == "artifact_presence":
        return evaluate_artifact_presence(detector_name, detector, artifact_dir)
    if kind == "answer_text_regex_forbidden":
        return evaluate_forbidden_regex(detector_name, detector, scoped_outputs)
    if kind == "answer_text_regex_forbidden_in_prefix":
        try:
            prefix_line_count = int(check.get("prefix_line_count") or 3)
        except (TypeError, ValueError):
            # A malformed registry value falls back to the default instead of
            # aborting the whole detector run.
            prefix_line_count = 3
        return evaluate_forbidden_regex(detector_name, detector, scoped_outputs, prefix_line_count=prefix_line_count)
    if kind == "answer_text_required_any":
        return evaluate_required_any(detector_name, detector, scoped_outputs)
    if kind == "answer_text_required_when_limited":
        return evaluate_limited_next_action(detector_name, detector, scoped_outputs)
    if kind == "trace_value_guard":
        if not scoped_turns and scoped_outputs:
            # No turn file matched the evidence scope directly: fall back to
            # the turn files paired with the in-scope outputs.
            scoped_turn_paths = [item.get("turn_path") for item in scoped_outputs if item.get("turn_path")]
            scoped_turns = [
                {
                    "path": path,
                    "repo_path": repo_relative(path),
                    "artifact_path": str(path.relative_to(artifact_dir)),
                    "text": read_text_or_empty(path),
                }
                for path in scoped_turn_paths
                if isinstance(path, Path)
            ]
        return evaluate_trace_guard(detector_name, detector, scoped_turns)
    if kind == "prompt_registry_healthcheck":
        if "manifest_fields" in check or "forbidden_prompt_sources" in check:
            return evaluate_prompt_manifest(detector_name, detector, artifact_dir)
        return build_result(
            detector_name,
            detector,
            "skipped",
            "prompt healthcheck command is validated by prompt_registry_healthcheck.py",
        )
    if kind == "stage_review_signal":
        return evaluate_stage_review_signal(detector_name, detector, artifact_dir)
    if kind in {"composite_detector", "contract_field_detector", "limited_answer_next_action"} and normalize_string_list(
        check.get("uses_detectors")
    ):
        return evaluate_composite(detector_name, detector, results_by_name)
    if kind == "answer_text_shape":
        return evaluate_manual_review(detector_name, detector, "direct-answer shape requires business review")
    return build_result(detector_name, detector, "skipped", f"detector kind is not executable yet: {kind}")


def summarize_results(results: list[dict[str, Any]]) -> dict[str, Any]:
    """Count results per status and compute the overall status.

    Overall precedence is ``fail`` > ``review`` > ``pass`` > ``skipped``. The
    returned dict holds ``status``, ``detector_count`` and the per-status
    counts.
    """
    counts = {"pass": 0, "fail": 0, "skipped": 0, "review": 0}
    for result in results:
        result_status = str(result.get("status") or "skipped")
        counts[result_status] = counts.get(result_status, 0) + 1
    if counts.get("fail", 0):
        status = "fail"
    elif counts.get("review", 0):
        status = "review"
    elif counts.get("pass", 0):
        status = "pass"
    else:
        status = "skipped"
    return {"status": status, "detector_count": len(results), **counts}


def build_detector_results(
    artifact_dir: Path,
    *,
    detector_names: list[str] | None = None,
    issue_codes: list[str] | None = None,
    detector_candidates_path: Path | None = None,
    registry_path: Path = DETECTOR_REGISTRY_PATH,
    issue_catalog_path: Path = ISSUE_CATALOG_PATH,
    include_default_global: bool = True,
) -> dict[str, Any]:
    """Run the selected detectors against *artifact_dir* and return the report.

    Detectors are evaluated in dependency order, so composite detectors can
    read their children's results. Issue codes supplied by the candidates
    file replace a result's ``issue_codes``. The report holds the schema
    version, creation time, input paths, selected detectors, artifact counts,
    a summary and the per-detector results.
    """
    artifact_dir = artifact_dir.resolve()
    registry = load_registry(registry_path)
    issue_catalog = load_issue_catalog(issue_catalog_path)
    selected, evidence_by_detector, issue_codes_by_detector = select_detectors(
        registry=registry,
        issue_catalog=issue_catalog,
        detector_names=detector_names,
        issue_codes=issue_codes,
        detector_candidates_path=detector_candidates_path,
        include_default_global=include_default_global,
    )
    outputs = collect_output_artifacts(artifact_dir)
    turns = collect_turn_artifacts(artifact_dir)
    results: list[dict[str, Any]] = []
    results_by_name: dict[str, dict[str, Any]] = {}
    for detector_name in selected:
        detector = registry.get(detector_name)
        if not isinstance(detector, dict):
            continue
        result = evaluate_detector(
            detector_name,
            detector,
            artifact_dir=artifact_dir,
            outputs=outputs,
            turns=turns,
            evidence_paths=evidence_by_detector.get(detector_name, []),
            results_by_name=results_by_name,
            issue_codes=issue_codes_by_detector.get(detector_name),
        )
        if issue_codes_by_detector.get(detector_name):
            result["issue_codes"] = issue_codes_by_detector[detector_name]
        results.append(result)
        results_by_name[detector_name] = result
    return {
        "schema_version": DETECTOR_RESULTS_SCHEMA_VERSION,
        "created_at": utc_now(),
        "artifact_dir": repo_relative(artifact_dir),
        "registry_path": repo_relative(registry_path),
        "issue_catalog_path": repo_relative(issue_catalog_path),
        "detector_candidates_path": repo_relative(detector_candidates_path) if detector_candidates_path else None,
        "selected_detectors": selected,
        "artifact_counts": {
            "output_artifacts": len(outputs),
            "turn_artifacts": len(turns),
        },
        "summary": summarize_results(results),
        "results": results,
    }


def main() -> int:
    """Command-line entry point.

    Writes the report to ``--output`` (default: ``detector_results.json``
    inside the artifact directory) and prints either the full JSON or a short
    status. Returns 1 when the overall status is ``fail``, otherwise 0.
    """
    parser = argparse.ArgumentParser(
        description="Run Agent Reliability detector registry checks against replay artifacts."
    )
    parser.add_argument("--artifact-dir", required=True, help="Replay artifact directory to inspect.")
    parser.add_argument(
        "--output",
        help="Path for detector_results.json. Defaults to <artifact-dir>/detector_results.json.",
    )
    parser.add_argument("--registry", default=str(DETECTOR_REGISTRY_PATH), help="Detector registry JSON path.")
    parser.add_argument("--issue-catalog", default=str(ISSUE_CATALOG_PATH), help="Issue catalog JSON path.")
    parser.add_argument("--detector", action="append", default=[], help="Detector name to run. Can be repeated.")
    parser.add_argument(
        "--issue-code",
        action="append",
        default=[],
        help="Issue code whose detectors should run. Can be repeated.",
    )
    parser.add_argument("--detector-candidates", help="detector_candidates.json path.")
    parser.add_argument(
        "--no-default-global",
        action="store_true",
        help="Do not run default global detectors when no filter is provided.",
    )
    parser.add_argument("--json", action="store_true", help="Print detector_results JSON to stdout.")
    args = parser.parse_args()

    artifact_dir = Path(args.artifact_dir).resolve()
    output_path = Path(args.output).resolve() if args.output else artifact_dir / "detector_results.json"
    detector_candidates_path = Path(args.detector_candidates).resolve() if args.detector_candidates else None
    results = build_detector_results(
        artifact_dir,
        detector_names=args.detector,
        issue_codes=args.issue_code,
        detector_candidates_path=detector_candidates_path,
        registry_path=Path(args.registry).resolve(),
        issue_catalog_path=Path(args.issue_catalog).resolve(),
        include_default_global=not args.no_default_global,
    )
    write_json(output_path, results)
    if args.json:
        print(json.dumps(results, ensure_ascii=False, indent=2))
    else:
        summary = results["summary"]
        print(f"status: {summary['status']}")
        print(f"detector_results: {output_path}")
    return 1 if results["summary"]["status"] == "fail" else 0


if __name__ == "__main__":
    raise SystemExit(main())