#!/usr/bin/env python3
"""Validate ADDRESS run-pack structure and summary consistency.

A run pack is a directory that must contain the files listed in
``REQUIRED_FILES``.  For every directory given on the command line the
script checks that the files exist and parse, that ``run_summary.json``
carries the expected fields, and that its totals agree with the contents
of ``full_live_results.json`` and ``failures_only.json``.

The process exits with status 1 when at least one run pack is invalid.
"""
from __future__ import annotations

import argparse
import json
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any

# Files that must be present in every run directory.
REQUIRED_FILES = ("run_summary.json", "full_live_results.json", "failures_only.json", "README.md")

# Keys that must be present in the ``totals`` object of run_summary.json.
REQUIRED_TOTAL_KEYS = (
    "questions_total",
    "semantic_pass_count",
    "route_pass_count",
    "strict_pass_count",
    "factual_count",
    "partial_coverage_count",
    "http_error_count",
)


@dataclass
class RunValidationResult:
    """Outcome of validating a single run directory."""

    # Path of the validated directory, as a string.
    run_dir: str
    # True when no errors were collected.
    valid: bool
    # Problems that make the run pack invalid.
    errors: list[str] = field(default_factory=list)
    # Non-fatal observations.
    warnings: list[str] = field(default_factory=list)
    # Values extracted while validating (run_id, counts, rates).
    metrics: dict[str, Any] = field(default_factory=dict)


def parse_args() -> argparse.Namespace:
    """Parse the command-line arguments of the validator."""
    parser = argparse.ArgumentParser(
        description="Validate ADDRESS run-pack structure and summary consistency."
    )
    parser.add_argument(
        "run_dirs",
        nargs="+",
        help="One or more run directories (for example docs/ADDRESS/runs/).",
    )
    parser.add_argument(
        "--allow-legacy-summary",
        action="store_true",
        help="Allow minimal/legacy run_summary format (without totals).",
    )
    parser.add_argument(
        "--report-json",
        default="",
        help="Optional path to write full validation report JSON.",
    )
    return parser.parse_args()


def load_json(path: Path) -> tuple[dict[str, Any] | list[Any] | None, str | None]:
    """Read and parse a JSON file without raising on bad input.

    Args:
        path: File to read; decoded as UTF-8 with an optional BOM.

    Returns:
        ``(payload, None)`` on success, or ``(None, message)`` when the file
        is missing, unreadable, not valid UTF-8, or not valid JSON.
    """
    try:
        payload = json.loads(path.read_text(encoding="utf-8-sig"))
    except FileNotFoundError:
        return None, f"missing file: {path.name}"
    except OSError as exc:
        # E.g. a directory in place of the file, or a permission problem.
        return None, f"unreadable file: {path.name}: {exc}"
    except json.JSONDecodeError as exc:
        return None, f"invalid json in {path.name}: {exc}"
    except UnicodeDecodeError as exc:
        return None, f"invalid encoding in {path.name}: {exc}"
    return payload, None


def _coerce_number(totals: dict[str, Any], key: str, cast: type, errors: list[str]) -> Any:
    """Convert ``totals[key]`` with *cast*, recording an error on failure.

    Missing or falsy values are treated as zero.  Returns ``None`` (after
    appending to *errors*) when the value cannot be converted.
    """
    raw = totals.get(key)
    if not raw:
        return cast(0)
    try:
        return cast(raw)
    except (TypeError, ValueError, OverflowError):
        errors.append(f"run_summary.totals.{key} must be numeric, got {raw!r}")
        return None


def validate_totals(totals: dict[str, Any], errors: list[str]) -> dict[str, Any]:
    """Validate the ``totals`` object of a run summary.

    Args:
        totals: The ``totals`` mapping taken from run_summary.json.
        errors: List that validation problems are appended to (mutated).

    Returns:
        Metrics extracted from *totals*.  Empty when required keys are
        missing; a metric whose value is not numeric is left out.
    """
    metrics: dict[str, Any] = {}
    missing = [key for key in REQUIRED_TOTAL_KEYS if key not in totals]
    if missing:
        errors.append(f"run_summary.totals missing keys: {', '.join(missing)}")
        return metrics

    numeric_fields = (
        ("questions_total", int),
        ("route_pass_rate", float),
        ("strict_pass_rate", float),
        ("http_error_count", int),
    )
    for key, cast in numeric_fields:
        value = _coerce_number(totals, key, cast, errors)
        if value is not None:
            metrics[key] = value

    questions_total = metrics.get("questions_total")
    if questions_total is not None and questions_total <= 0:
        errors.append("run_summary.totals.questions_total must be > 0")
    return metrics


def _clean_run_id(container: dict[str, Any]) -> str:
    """Return the stripped ``run_id`` of *container*, or "" when absent or null."""
    raw = container.get("run_id")
    return "" if raw is None else str(raw).strip()


def validate_single_run(run_dir: Path, allow_legacy_summary: bool) -> RunValidationResult:
    """Validate one run-pack directory.

    Args:
        run_dir: Directory expected to contain the ``REQUIRED_FILES``.
        allow_legacy_summary: When true, a run_summary.json without a
            ``totals`` object yields a warning instead of an error.

    Returns:
        A ``RunValidationResult`` whose ``valid`` flag is true only when no
        errors were collected.
    """
    errors: list[str] = []
    warnings: list[str] = []
    metrics: dict[str, Any] = {}

    # is_dir() is also False for a path that does not exist.
    if not run_dir.is_dir():
        return RunValidationResult(
            run_dir=str(run_dir), valid=False, errors=["run directory does not exist"]
        )

    missing_files = [name for name in REQUIRED_FILES if not (run_dir / name).exists()]
    if missing_files:
        errors.append(f"missing required files: {', '.join(missing_files)}")

    summary_obj, summary_err = load_json(run_dir / "run_summary.json")
    full_obj, full_err = load_json(run_dir / "full_live_results.json")
    failures_obj, failures_err = load_json(run_dir / "failures_only.json")
    errors.extend(err for err in (summary_err, full_err, failures_err) if err)

    readme_path = run_dir / "README.md"
    if readme_path.exists():
        try:
            content = readme_path.read_text(encoding="utf-8-sig").strip()
        except (OSError, UnicodeDecodeError) as exc:
            errors.append(f"unreadable README.md: {exc}")
        else:
            if not content:
                errors.append("README.md is empty")

    summary = summary_obj if isinstance(summary_obj, dict) else None
    full = full_obj if isinstance(full_obj, dict) else None
    failures = failures_obj if isinstance(failures_obj, list) else None

    if summary is None and summary_obj is not None:
        errors.append("run_summary.json must contain object")
    if full is None and full_obj is not None:
        errors.append("full_live_results.json must contain object")
    if failures is None and failures_obj is not None:
        errors.append("failures_only.json must contain array")

    summary_run_id = ""
    # Compare against None: an empty object must still be validated,
    # otherwise "{}" would slip through every check below.
    if summary is not None:
        summary_run_id = _clean_run_id(summary)
        if not summary_run_id:
            errors.append("run_summary.run_id is required")
        else:
            metrics["run_id"] = summary_run_id
            if summary_run_id != run_dir.name:
                warnings.append(
                    f"run_id ({summary_run_id}) differs from directory name ({run_dir.name})"
                )

        if "generated_at" not in summary and "date" not in summary:
            errors.append("run_summary must contain generated_at or date")

        totals = summary.get("totals")
        if isinstance(totals, dict):
            metrics.update(validate_totals(totals, errors))
        elif not allow_legacy_summary:
            errors.append("run_summary.totals is required")
        else:
            warnings.append("legacy run_summary format (without totals) accepted")

    if full is not None:
        full_run_id = _clean_run_id(full)
        if full_run_id and summary_run_id and full_run_id != summary_run_id:
            errors.append("run_id mismatch between run_summary.json and full_live_results.json")

        rows_obj = full.get("rows")
        if isinstance(rows_obj, list):
            metrics["rows_count"] = len(rows_obj)
        else:
            errors.append("full_live_results.rows must be array")

    if failures is not None:
        metrics["failures_count"] = len(failures)

    questions_total = metrics.get("questions_total")
    rows_count = metrics.get("rows_count")
    failures_count = metrics.get("failures_count")

    if isinstance(questions_total, int):
        # rows_count is only set when "rows" was a list, so an empty list
        # is still compared against questions_total.
        if isinstance(rows_count, int) and questions_total != rows_count:
            errors.append(
                f"questions_total mismatch: run_summary.totals.questions_total={questions_total}, "
                f"full_live_results.rows={rows_count}"
            )
        if isinstance(failures_count, int) and failures_count > questions_total:
            errors.append("failures_only count exceeds questions_total")

    return RunValidationResult(
        run_dir=str(run_dir),
        valid=not errors,
        errors=errors,
        warnings=warnings,
        metrics=metrics,
    )


def main() -> None:
    """Validate every run directory given on the command line.

    Prints one status line per directory plus its warnings and errors,
    optionally writes a JSON report, and raises ``SystemExit(1)`` when at
    least one run pack failed validation.
    """
    args = parse_args()
    run_dirs = [Path(p).resolve() for p in args.run_dirs]
    results = [
        validate_single_run(path, allow_legacy_summary=bool(args.allow_legacy_summary))
        for path in run_dirs
    ]

    total = len(results)
    passed = sum(1 for item in results if item.valid)
    failed = total - passed

    report = {
        "generated_at": datetime.now().isoformat(timespec="seconds"),
        "total": total,
        "passed": passed,
        "failed": failed,
        "results": [
            {
                "run_dir": item.run_dir,
                "valid": item.valid,
                "errors": item.errors,
                "warnings": item.warnings,
                "metrics": item.metrics,
            }
            for item in results
        ],
    }

    if args.report_json:
        report_path = Path(args.report_json).resolve()
        report_path.parent.mkdir(parents=True, exist_ok=True)
        report_path.write_text(
            json.dumps(report, ensure_ascii=False, indent=2) + "\n", encoding="utf-8"
        )

    for item in results:
        status = "PASS" if item.valid else "FAIL"
        print(f"[{status}] {item.run_dir}")
        for warning in item.warnings:
            print(f" warning: {warning}")
        for error in item.errors:
            print(f" error: {error}")

    print(f"\nValidated run packs: {total}, passed: {passed}, failed: {failed}")
    if failed:
        raise SystemExit(1)


if __name__ == "__main__":
    main()