NODEDC_1C/scripts/validate_address_run_pack.py

219 lines
7.7 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any
# Files that every run-pack directory is expected to contain.
REQUIRED_FILES = (
    "run_summary.json",
    "full_live_results.json",
    "failures_only.json",
    "README.md",
)

# Keys that must be present in the "totals" object of run_summary.json.
REQUIRED_TOTAL_KEYS = (
    "questions_total",
    "semantic_pass_count",
    "route_pass_count",
    "strict_pass_count",
    "factual_count",
    "partial_coverage_count",
    "http_error_count",
)
@dataclass
class RunValidationResult:
    """Outcome of validating one run-pack directory."""

    # Path of the validated directory, stored as a string.
    run_dir: str
    # True when validation produced no errors.
    valid: bool
    # Problems that make the run pack invalid.
    errors: list[str] = field(default_factory=list)
    # Non-fatal observations; they do not affect ``valid``.
    warnings: list[str] = field(default_factory=list)
    # Values collected during validation (run id, counts, rates).
    metrics: dict[str, Any] = field(default_factory=dict)
def parse_args() -> argparse.Namespace:
    """Parse the validator's command-line arguments.

    Returns:
        Namespace with ``run_dirs`` (one or more paths), ``allow_legacy_summary``
        (boolean flag) and ``report_json`` (output path, empty string when unset).
    """
    cli = argparse.ArgumentParser(
        description="Validate ADDRESS run-pack structure and summary consistency."
    )
    # (flags, keyword settings) for every supported argument, in registration order.
    argument_specs = (
        (
            ("run_dirs",),
            {
                "nargs": "+",
                "help": "One or more run directories (for example docs/ADDRESS/runs/<run_id>).",
            },
        ),
        (
            ("--allow-legacy-summary",),
            {
                "action": "store_true",
                "help": "Allow minimal/legacy run_summary format (without totals).",
            },
        ),
        (
            ("--report-json",),
            {
                "default": "",
                "help": "Optional path to write full validation report JSON.",
            },
        ),
    )
    for flags, settings in argument_specs:
        cli.add_argument(*flags, **settings)
    return cli.parse_args()
def load_json(path: Path) -> tuple[dict[str, Any] | list[Any] | None, str | None]:
    """Read *path* as UTF-8 JSON (a leading BOM is tolerated).

    Read, decode and parse failures are returned as a message instead of
    being raised, so one broken file cannot abort the validation of a run pack.

    Args:
        path: File to load.

    Returns:
        ``(payload, None)`` on success, ``(None, message)`` on failure. The
        payload is whatever ``json.loads`` produces, so a file containing JSON
        ``null`` yields ``(None, None)``.
    """
    try:
        text = path.read_text(encoding="utf-8-sig")
    except FileNotFoundError:
        return None, f"missing file: {path.name}"
    except (OSError, UnicodeDecodeError) as exc:
        # Covers directories, permission problems and non-UTF-8 content.
        return None, f"cannot read {path.name}: {exc}"

    try:
        payload = json.loads(text)
    except json.JSONDecodeError as exc:
        return None, f"invalid json in {path.name}: {exc}"
    return payload, None
def _coerce_total(
    totals: dict[str, Any],
    key: str,
    cast: type,
    default: Any,
    errors: list[str],
) -> Any:
    """Convert ``totals[key]`` with *cast*; record an error and return None if it is not numeric."""
    # Missing or falsy values (None, "", 0, ...) fall back to the default.
    raw = totals.get(key, default) or default
    try:
        return cast(raw)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int() of an infinite float (json accepts "Infinity").
        errors.append(f"run_summary.totals.{key} must be a number, got {raw!r}")
        return None


def validate_totals(totals: dict[str, Any], errors: list[str]) -> dict[str, Any]:
    """Check the ``totals`` object of run_summary.json and extract key metrics.

    Args:
        totals: The ``totals`` object taken from run_summary.json.
        errors: List that error messages are appended to (mutated in place).

    Returns:
        Dict with ``questions_total``, ``route_pass_rate``, ``strict_pass_rate``
        and ``http_error_count``. It is empty when required keys are missing,
        and a metric whose value cannot be converted to a number is omitted
        (an error is recorded for it instead of raising).
    """
    metrics: dict[str, Any] = {}

    missing = [key for key in REQUIRED_TOTAL_KEYS if key not in totals]
    if missing:
        errors.append(f"run_summary.totals missing keys: {', '.join(missing)}")
        return metrics

    # (key, conversion, default) in the order the metrics are reported.
    metric_specs = (
        ("questions_total", int, 0),
        ("route_pass_rate", float, 0.0),
        ("strict_pass_rate", float, 0.0),
        ("http_error_count", int, 0),
    )
    for key, cast, default in metric_specs:
        value = _coerce_total(totals, key, cast, default, errors)
        if value is not None:
            metrics[key] = value

    # Skip the positivity check when the value was already rejected as non-numeric.
    questions_total = metrics.get("questions_total")
    if questions_total is not None and questions_total <= 0:
        errors.append("run_summary.totals.questions_total must be > 0")
    return metrics
def _normalized_run_id(value: Any) -> str:
    """Return *value* as a stripped string; None (JSON null or missing key) becomes ''."""
    return "" if value is None else str(value).strip()


def validate_single_run(run_dir: Path, allow_legacy_summary: bool) -> RunValidationResult:
    """Validate the structure and internal consistency of one run-pack directory.

    Checks that the required files exist, that the three JSON files parse and
    have the expected top-level shape, that README.md is not empty, and that
    the summary totals agree with the row and failure counts.

    Args:
        run_dir: Directory of the run pack.
        allow_legacy_summary: When True, a run_summary.json without a
            ``totals`` object yields a warning instead of an error.

    Returns:
        RunValidationResult whose ``valid`` flag is True only when no errors
        were collected.
    """
    errors: list[str] = []
    warnings: list[str] = []
    metrics: dict[str, Any] = {}

    # is_dir() is already False for a path that does not exist.
    if not run_dir.is_dir():
        return RunValidationResult(run_dir=str(run_dir), valid=False, errors=["run directory does not exist"])

    missing_files = [name for name in REQUIRED_FILES if not (run_dir / name).exists()]
    if missing_files:
        errors.append(f"missing required files: {', '.join(missing_files)}")

    summary_obj, summary_err = load_json(run_dir / "run_summary.json")
    full_obj, full_err = load_json(run_dir / "full_live_results.json")
    failures_obj, failures_err = load_json(run_dir / "failures_only.json")
    errors.extend(err for err in (summary_err, full_err, failures_err) if err)

    readme_path = run_dir / "README.md"
    if readme_path.exists():
        try:
            readme_text = readme_path.read_text(encoding="utf-8-sig")
        except (OSError, UnicodeDecodeError) as exc:
            # Report an unreadable README instead of aborting the whole validation.
            errors.append(f"cannot read README.md: {exc}")
        else:
            if not readme_text.strip():
                errors.append("README.md is empty")

    summary = summary_obj if isinstance(summary_obj, dict) else None
    full = full_obj if isinstance(full_obj, dict) else None
    failures = failures_obj if isinstance(failures_obj, list) else None

    # A load failure has already been reported above. Anything else that is not
    # the expected container is the wrong shape, including JSON null, which
    # load_json returns as None without an error message.
    if summary is None and not summary_err:
        errors.append("run_summary.json must contain object")
    if full is None and not full_err:
        errors.append("full_live_results.json must contain object")
    if failures is None and not failures_err:
        errors.append("failures_only.json must contain array")

    summary_run_id = ""
    # Compare against None: an empty object {} is falsy but must still be validated.
    if summary is not None:
        summary_run_id = _normalized_run_id(summary.get("run_id"))
        if not summary_run_id:
            errors.append("run_summary.run_id is required")
        else:
            metrics["run_id"] = summary_run_id
            if summary_run_id != run_dir.name:
                warnings.append(f"run_id ({summary_run_id}) differs from directory name ({run_dir.name})")

        if "generated_at" not in summary and "date" not in summary:
            errors.append("run_summary must contain generated_at or date")

        totals = summary.get("totals")
        if isinstance(totals, dict):
            metrics.update(validate_totals(totals, errors))
        elif not allow_legacy_summary:
            errors.append("run_summary.totals is required")
        else:
            warnings.append("legacy run_summary format (without totals) accepted")

    if full is not None:
        full_run_id = _normalized_run_id(full.get("run_id"))
        if full_run_id and summary_run_id and full_run_id != summary_run_id:
            errors.append("run_id mismatch between run_summary.json and full_live_results.json")

        rows_obj = full.get("rows")
        if isinstance(rows_obj, list):
            metrics["rows_count"] = len(rows_obj)
        else:
            errors.append("full_live_results.rows must be array")

    if failures is not None:
        metrics["failures_count"] = len(failures)

    questions_total = metrics.get("questions_total")
    if isinstance(questions_total, int):
        # Compare whenever rows were loaded, so an empty rows array is also
        # checked against a positive questions_total.
        rows_count = metrics.get("rows_count")
        if rows_count is not None and questions_total != rows_count:
            errors.append(
                f"questions_total mismatch: run_summary.totals.questions_total={questions_total}, full_live_results.rows={rows_count}"
            )
        failures_count = metrics.get("failures_count")
        if failures_count is not None and failures_count > questions_total:
            errors.append("failures_only count exceeds questions_total")

    return RunValidationResult(run_dir=str(run_dir), valid=not errors, errors=errors, warnings=warnings, metrics=metrics)
def main() -> None:
    """Validate every run directory from the command line and report the outcome.

    Prints a PASS/FAIL line per run pack followed by its warnings and errors,
    writes the full report as JSON when ``--report-json`` is given, and exits
    with status 1 if any run pack is invalid.
    """
    options = parse_args()
    legacy_ok = bool(options.allow_legacy_summary)

    targets = [Path(raw).resolve() for raw in options.run_dirs]
    outcomes = []
    for target in targets:
        outcomes.append(validate_single_run(target, allow_legacy_summary=legacy_ok))

    total = len(outcomes)
    passed = len([outcome for outcome in outcomes if outcome.valid])
    failed = total - passed

    stamp = datetime.now().isoformat(timespec="seconds")
    entries = []
    for outcome in outcomes:
        entries.append(
            {
                "run_dir": outcome.run_dir,
                "valid": outcome.valid,
                "errors": outcome.errors,
                "warnings": outcome.warnings,
                "metrics": outcome.metrics,
            }
        )
    report = {
        "generated_at": stamp,
        "total": total,
        "passed": passed,
        "failed": failed,
        "results": entries,
    }

    if options.report_json:
        destination = Path(options.report_json).resolve()
        destination.parent.mkdir(parents=True, exist_ok=True)
        rendered = json.dumps(report, ensure_ascii=False, indent=2)
        destination.write_text(rendered + "\n", encoding="utf-8")

    for outcome in outcomes:
        label = "FAIL"
        if outcome.valid:
            label = "PASS"
        print(f"[{label}] {outcome.run_dir}")
        for text in outcome.warnings:
            print(f" warning: {text}")
        for text in outcome.errors:
            print(f" error: {text}")

    print(f"\nValidated run packs: {total}, passed: {passed}, failed: {failed}")
    # A non-zero exit status signals that at least one run pack failed.
    if failed:
        raise SystemExit(1)


if __name__ == "__main__":
    main()