219 lines
7.7 KiB
Python
219 lines
7.7 KiB
Python
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
# Files whose presence is checked in every run directory.
REQUIRED_FILES = (
    "run_summary.json",
    "full_live_results.json",
    "failures_only.json",
    "README.md",
)

# Keys that must all be present in the "totals" object of run_summary.json.
REQUIRED_TOTAL_KEYS = (
    "questions_total", "semantic_pass_count", "route_pass_count",
    "strict_pass_count", "factual_count", "partial_coverage_count",
    "http_error_count",
)
|
|
|
|
|
|
@dataclass
class RunValidationResult:
    """Outcome of validating one run directory.

    Only ``run_dir`` and ``valid`` are mandatory; the collection fields
    default to fresh empty containers so instances never share state.
    """

    # Path of the validated directory, stored as a string.
    run_dir: str
    # True when validation produced no errors.
    valid: bool
    # Problems that make the run pack invalid.
    errors: list[str] = field(default_factory=list)
    # Non-fatal observations that do not affect ``valid``.
    warnings: list[str] = field(default_factory=list)
    # Values collected during validation (run id, counts, rates, ...).
    metrics: dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse the command line of the run-pack validator.

    Returns:
        Namespace with ``run_dirs`` (one or more paths),
        ``allow_legacy_summary`` (bool flag) and ``report_json`` (path string,
        empty when not given).
    """
    cli = argparse.ArgumentParser(description="Validate ADDRESS run-pack structure and summary consistency.")

    # Positional: at least one run directory is mandatory.
    cli.add_argument("run_dirs", nargs="+", help="One or more run directories (for example docs/ADDRESS/runs/<run_id>).")
    # Opt-in leniency for summaries that carry no "totals" object.
    cli.add_argument("--allow-legacy-summary", action="store_true", help="Allow minimal/legacy run_summary format (without totals).")
    # Empty default means "do not write a report file".
    cli.add_argument("--report-json", default="", help="Optional path to write full validation report JSON.")

    return cli.parse_args()
|
|
|
|
|
|
def load_json(path: Path) -> tuple[dict[str, Any] | list[Any] | None, str | None]:
    """Read and parse a JSON file, reporting failures as text instead of raising.

    The file is decoded as ``utf-8-sig`` so a leading BOM is tolerated.

    Args:
        path: File to load.

    Returns:
        ``(payload, None)`` when the file was read and parsed, otherwise
        ``(None, message)`` where ``message`` names the file and the problem
        (missing, unreadable, wrongly encoded, or not valid JSON).
    """
    try:
        text = path.read_text(encoding="utf-8-sig")
    except FileNotFoundError:
        return None, f"missing file: {path.name}"
    except UnicodeDecodeError as exc:
        # Bytes that are not valid UTF-8 must fail validation, not crash it.
        return None, f"invalid encoding in {path.name}: {exc}"
    except OSError as exc:
        # E.g. the path is a directory or cannot be opened due to permissions.
        return None, f"cannot read {path.name}: {exc}"

    try:
        payload = json.loads(text)
    except json.JSONDecodeError as exc:
        return None, f"invalid json in {path.name}: {exc}"
    return payload, None
|
|
|
|
|
|
def _coerce_total(
    totals: dict[str, Any],
    key: str,
    caster: type[int] | type[float],
    errors: list[str],
) -> int | float | None:
    """Convert ``totals[key]`` with ``caster``; record an error and return None on failure."""
    raw = totals.get(key)
    try:
        # Missing, null and other falsy values count as zero.
        return caster(raw or 0)
    except (TypeError, ValueError, OverflowError):
        errors.append(f"run_summary.totals.{key} must be numeric, got {raw!r}")
        return None


def validate_totals(totals: dict[str, Any], errors: list[str]) -> dict[str, Any]:
    """Check the ``totals`` object of a run summary and extract headline metrics.

    Args:
        totals: The ``totals`` mapping taken from run_summary.json.
        errors: List that validation problems are appended to (mutated in place).

    Returns:
        Metrics dict that may contain ``questions_total``, ``route_pass_rate``,
        ``strict_pass_rate`` and ``http_error_count``. It is empty when any key
        from ``REQUIRED_TOTAL_KEYS`` is missing, and a metric whose value
        cannot be converted to a number is left out (an error is recorded
        instead of raising).
    """
    metrics: dict[str, Any] = {}

    missing = [key for key in REQUIRED_TOTAL_KEYS if key not in totals]
    if missing:
        errors.append(f"run_summary.totals missing keys: {', '.join(missing)}")
        return metrics

    numeric_fields = (
        ("questions_total", int),
        ("route_pass_rate", float),
        ("strict_pass_rate", float),
        ("http_error_count", int),
    )
    for key, caster in numeric_fields:
        value = _coerce_total(totals, key, caster, errors)
        if value is not None:
            metrics[key] = value

    # Only judge the sign when the value was numeric; a non-numeric value has
    # already been reported by _coerce_total.
    questions_total = metrics.get("questions_total")
    if questions_total is not None and questions_total <= 0:
        errors.append("run_summary.totals.questions_total must be > 0")

    return metrics
|
|
|
|
|
|
def _normalized_run_id(payload: dict[str, Any]) -> str:
    """Return ``payload["run_id"]`` as a stripped string; missing or null gives ""."""
    raw = payload.get("run_id")
    return "" if raw is None else str(raw).strip()


def validate_single_run(run_dir: Path, allow_legacy_summary: bool) -> RunValidationResult:
    """Validate the structure and internal consistency of one run directory.

    Checks that the required files exist, that the three JSON files parse and
    have the expected top-level type, that README.md is readable and not
    empty, that the summary carries a run id, a timestamp and (unless legacy
    summaries are allowed) totals, and that the counts in the summary agree
    with the row and failure lists.

    Args:
        run_dir: Directory holding the run pack.
        allow_legacy_summary: When true, a summary without a ``totals`` object
            produces a warning instead of an error.

    Returns:
        A ``RunValidationResult`` whose ``valid`` flag is true exactly when no
        errors were collected.
    """
    errors: list[str] = []
    warnings: list[str] = []
    metrics: dict[str, Any] = {}

    # is_dir() is already False for a path that does not exist.
    if not run_dir.is_dir():
        return RunValidationResult(run_dir=str(run_dir), valid=False, errors=["run directory does not exist"])

    missing_files = [name for name in REQUIRED_FILES if not (run_dir / name).exists()]
    if missing_files:
        errors.append(f"missing required files: {', '.join(missing_files)}")

    summary_obj, summary_err = load_json(run_dir / "run_summary.json")
    full_obj, full_err = load_json(run_dir / "full_live_results.json")
    failures_obj, failures_err = load_json(run_dir / "failures_only.json")
    errors.extend(err for err in (summary_err, full_err, failures_err) if err)

    readme_path = run_dir / "README.md"
    if readme_path.exists():
        try:
            content = readme_path.read_text(encoding="utf-8-sig").strip()
        except (OSError, UnicodeDecodeError) as exc:
            # An undecodable or unreadable README is a validation failure,
            # not a reason to abort the whole run.
            errors.append(f"README.md is unreadable: {exc}")
        else:
            if not content:
                errors.append("README.md is empty")

    summary = summary_obj if isinstance(summary_obj, dict) else None
    full = full_obj if isinstance(full_obj, dict) else None
    failures = failures_obj if isinstance(failures_obj, list) else None

    # "Loaded without error but wrong type" also covers a file holding JSON
    # null, which parses to None and would otherwise slip through unreported.
    if summary is None and summary_err is None:
        errors.append("run_summary.json must contain object")
    if full is None and full_err is None:
        errors.append("full_live_results.json must contain object")
    if failures is None and failures_err is None:
        errors.append("failures_only.json must contain array")

    summary_run_id = ""
    # Compare against None: an empty {} summary must still fail the
    # required-field checks below instead of skipping them.
    if summary is not None:
        summary_run_id = _normalized_run_id(summary)
        if not summary_run_id:
            errors.append("run_summary.run_id is required")
        else:
            metrics["run_id"] = summary_run_id
            if summary_run_id != run_dir.name:
                warnings.append(f"run_id ({summary_run_id}) differs from directory name ({run_dir.name})")

        if "generated_at" not in summary and "date" not in summary:
            errors.append("run_summary must contain generated_at or date")

        totals = summary.get("totals")
        if isinstance(totals, dict):
            metrics.update(validate_totals(totals, errors))
        elif not allow_legacy_summary:
            errors.append("run_summary.totals is required")
        else:
            warnings.append("legacy run_summary format (without totals) accepted")

    # Same reasoning as above: an empty {} must be reported as lacking "rows".
    if full is not None:
        full_run_id = _normalized_run_id(full)
        # Only compare when both sides actually provide a run id.
        if full_run_id and summary_run_id and full_run_id != summary_run_id:
            errors.append("run_id mismatch between run_summary.json and full_live_results.json")
        rows_obj = full.get("rows")
        if not isinstance(rows_obj, list):
            errors.append("full_live_results.rows must be array")
        else:
            metrics["rows_count"] = len(rows_obj)

    if failures is not None:
        metrics["failures_count"] = len(failures)

    questions_total = metrics.get("questions_total")
    rows_count = metrics.get("rows_count")
    failures_count = metrics.get("failures_count")

    # Use the recorded count rather than list truthiness so that an empty
    # rows array is still compared with questions_total.
    if isinstance(questions_total, int) and isinstance(rows_count, int):
        if questions_total != rows_count:
            errors.append(
                f"questions_total mismatch: run_summary.totals.questions_total={questions_total}, full_live_results.rows={rows_count}"
            )

    if isinstance(questions_total, int) and isinstance(failures_count, int):
        if failures_count > questions_total:
            errors.append("failures_only count exceeds questions_total")

    return RunValidationResult(run_dir=str(run_dir), valid=not errors, errors=errors, warnings=warnings, metrics=metrics)
|
|
|
|
|
|
def main() -> None:
    """Validate every run directory named on the command line and report.

    Optionally writes the full report as JSON, prints a PASS/FAIL line with
    warnings and errors for each directory, prints an overall summary, and
    exits with status 1 when at least one run pack failed.
    """
    cli = parse_args()
    legacy_ok = bool(cli.allow_legacy_summary)

    targets = [Path(raw).resolve() for raw in cli.run_dirs]
    outcomes = []
    for target in targets:
        outcomes.append(validate_single_run(target, allow_legacy_summary=legacy_ok))

    total = len(outcomes)
    passed = len([outcome for outcome in outcomes if outcome.valid])
    failed = total - passed

    # Flatten each result into plain JSON-serialisable fields.
    serialised = []
    for outcome in outcomes:
        serialised.append(
            {
                "run_dir": outcome.run_dir,
                "valid": outcome.valid,
                "errors": outcome.errors,
                "warnings": outcome.warnings,
                "metrics": outcome.metrics,
            }
        )

    report = {
        "generated_at": datetime.now().isoformat(timespec="seconds"),
        "total": total,
        "passed": passed,
        "failed": failed,
        "results": serialised,
    }

    # The report file is written before anything is printed.
    if args_report := cli.report_json:
        destination = Path(args_report).resolve()
        destination.parent.mkdir(parents=True, exist_ok=True)
        rendered = json.dumps(report, ensure_ascii=False, indent=2) + "\n"
        destination.write_text(rendered, encoding="utf-8")

    for outcome in outcomes:
        if outcome.valid:
            status = "PASS"
        else:
            status = "FAIL"
        print(f"[{status}] {outcome.run_dir}")
        for warning in outcome.warnings:
            print(f"  warning: {warning}")
        for error in outcome.errors:
            print(f"  error: {error}")

    print(f"\nValidated run packs: {total}, passed: {passed}, failed: {failed}")
    if failed > 0:
        raise SystemExit(1)
|
|
|
|
|
|
# Run the validator only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|