577 lines
23 KiB
Python
577 lines
23 KiB
Python
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import secrets
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
# Repository root: one directory above the directory that contains this script.
REPO_ROOT = Path(__file__).resolve().parents[1]

# Data locations, all anchored under <repo>/llm_normalizer/data.
_DATA_DIR = REPO_ROOT / "llm_normalizer" / "data"
_AUTORUN_GENERATORS_DIR = _DATA_DIR / "autorun_generators"
HISTORY_FILE = _AUTORUN_GENERATORS_DIR / "history.json"
SAVED_SESSIONS_DIR = _AUTORUN_GENERATORS_DIR / "saved_sessions"
EVAL_CASES_DIR = _DATA_DIR / "eval_cases"

# Schema tag stamped into every save-gate metadata record built in this file.
VALIDATED_AGENT_SAVE_SCHEMA_VERSION = "agent_semantic_save_gate_v1"
|
|
|
|
|
|
def now_utc() -> datetime:
    """Return the current timezone-aware UTC time, truncated to whole seconds."""
    current = datetime.now(tz=timezone.utc)
    return current.replace(microsecond=0)
|
|
|
|
|
|
def utc_stamp(dt: datetime) -> str:
    """Format ``dt`` as a compact ``YYYYMMDDHHMMSS`` digit string.

    The fields are taken from ``dt`` as-is; no timezone conversion happens here.
    """
    two_digit_fields = (dt.month, dt.day, dt.hour, dt.minute, dt.second)
    tail = "".join(f"{part:02d}" for part in two_digit_fields)
    return f"{dt.year:04d}" + tail
|
|
|
|
|
|
def generate_id(dt: datetime) -> str:
    """Build a generation id of the form ``gen-ag<MMDDHHMM>-<6 hex chars>``.

    The time part comes from ``dt``; the suffix is three random bytes in hex.
    """
    time_part = dt.strftime("%m%d%H%M")
    random_part = secrets.token_hex(3)
    return "-".join((f"gen-ag{time_part}", random_part))
|
|
|
|
|
|
def sanitize_question(value: Any) -> str:
    """Normalize a raw question value into clean text.

    Falsy values become an empty string. Otherwise the value is stringified,
    line endings are unified to ``\\n``, every line is stripped, runs of spaces
    and tabs collapse to a single space, and the result is stripped as a whole.
    """
    raw = str(value or "")
    unified = raw.replace("\r\n", "\n").replace("\r", "\n")
    trimmed_lines = [segment.strip() for segment in unified.split("\n")]
    collapsed = re.sub(r"[ \t]+", " ", "\n".join(trimmed_lines))
    return collapsed.strip()
|
|
|
|
|
|
def ensure_agent_title(title: str) -> str:
    """Return ``title`` stripped and guaranteed to carry an AGENT prefix.

    A title that already starts with "AGENT" (case-insensitive) is returned
    stripped but otherwise unchanged; any other title gets "AGENT | " prepended.

    Raises:
        RuntimeError: If the title is empty after stripping.
    """
    cleaned = title.strip()
    if cleaned == "":
        raise RuntimeError("Agent semantic run title must not be empty")
    if cleaned.upper().startswith("AGENT"):
        return cleaned
    return f"AGENT | {cleaned}"
|
|
|
|
|
|
def load_json(path: Path) -> Any:
    """Read ``path`` as UTF-8 text and return the parsed JSON value."""
    raw_text = path.read_text(encoding="utf-8")
    return json.loads(raw_text)
|
|
|
|
|
|
def write_json(path: Path, payload: Any) -> None:
    """Serialize ``payload`` as indented UTF-8 JSON into ``path``.

    Missing parent directories are created first. Non-ASCII characters are
    written verbatim and the file ends with a trailing newline.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(payload, ensure_ascii=False, indent=2)
    path.write_text(serialized + "\n", encoding="utf-8")
|
|
|
|
|
|
def resolve_repo_path(raw_path: str | Path) -> Path:
|
|
path = Path(raw_path)
|
|
return path if path.is_absolute() else (REPO_ROOT / path).resolve()
|
|
|
|
|
|
def repo_relative(path: Path) -> str:
    """Render ``path`` relative to REPO_ROOT when it lies inside the repository.

    Paths outside REPO_ROOT are rendered as their resolved absolute form.
    """
    resolved = path.resolve()
    try:
        inside_repo = resolved.relative_to(REPO_ROOT)
    except ValueError:
        # relative_to() raises ValueError when the path is not under REPO_ROOT.
        return str(resolved)
    return str(inside_repo)
|
|
|
|
|
|
def load_json_object(path: Path, label: str) -> dict[str, Any]:
    """Load ``path`` and require its top-level JSON value to be an object.

    Args:
        path: File to read.
        label: Human-readable artifact name used in error messages.

    Returns:
        The parsed JSON object.

    Raises:
        RuntimeError: If ``path`` is not an existing regular file, or if the
            parsed JSON value is not an object.
    """
    # is_file() rather than exists(): a directory at this location must be
    # reported as a missing artifact instead of failing later inside the read.
    if not path.is_file():
        raise RuntimeError(f"{label} not found: {path}")
    parsed = load_json(path)
    if not isinstance(parsed, dict):
        raise RuntimeError(f"{label} must be a JSON object: {path}")
    return parsed
|
|
|
|
|
|
def assert_status(value: Any, expected: str, label: str, problems: list[str]) -> None:
    """Record a problem when ``value`` does not normalize to ``expected``.

    ``value`` is stringified, stripped and lower-cased before the comparison.
    On mismatch ``"<label>=<normalized value>"`` is appended to ``problems``,
    with ``missing`` standing in for an empty value.
    """
    observed = str(value or "").strip().lower()
    if observed == expected:
        return
    shown = observed if observed else "missing"
    problems.append(f"{label}={shown}")
|
|
|
|
|
|
def validate_truth_harness_run_dir(run_dir: Path) -> dict[str, Any]:
    """Check that a truth-harness run directory is fully accepted and summarize it.

    Reads ``pack_state.json``, ``truth_review.json`` and ``business_review.json``
    from ``run_dir`` and verifies every acceptance signal they carry.

    Returns:
        Save-gate metadata describing the accepted run.

    Raises:
        RuntimeError: If an artifact is missing or is not a JSON object, or if
            any acceptance check fails (every failed check is listed).
    """
    run_dir = run_dir.resolve()
    pack_state = load_json_object(run_dir / "pack_state.json", "Validated run pack_state.json")
    truth_review = load_json_object(run_dir / "truth_review.json", "Validated run truth_review.json")
    business_review = load_json_object(run_dir / "business_review.json", "Validated run business_review.json")

    # A summary that is absent or not an object is treated as empty.
    truth_summary = truth_review.get("summary")
    if not isinstance(truth_summary, dict):
        truth_summary = {}

    problems: list[str] = []

    # (observed value, required value, label used in the problem report)
    status_checks = (
        (pack_state.get("final_status"), "accepted", "pack_state.final_status"),
        (pack_state.get("review_overall_status"), "pass", "pack_state.review_overall_status"),
        (truth_summary.get("overall_status"), "pass", "truth_review.summary.overall_status"),
        (business_review.get("overall_business_status"), "pass", "business_review.overall_business_status"),
    )
    for observed, required, label in status_checks:
        assert_status(observed, required, label, problems)

    # These flags must be the literal boolean True, not merely truthy.
    required_flags = (
        ("acceptance_gate_passed", "pack_state.acceptance_gate_passed=false"),
        ("no_unresolved_p0", "pack_state.no_unresolved_p0=false"),
    )
    for flag_name, complaint in required_flags:
        if pack_state.get(flag_name) is not True:
            problems.append(complaint)

    unresolved_p0 = int(pack_state.get("unresolved_p0_count") or 0)
    if unresolved_p0 != 0:
        problems.append(f"pack_state.unresolved_p0_count={pack_state.get('unresolved_p0_count')}")

    business_failures = int(business_review.get("steps_with_business_failures") or 0)
    if business_failures != 0:
        problems.append(f"business_review.steps_with_business_failures={business_review.get('steps_with_business_failures')}")

    if problems:
        reason = ", ".join(problems)
        raise RuntimeError(
            "Refusing to save AGENT autorun because the validated run is not clean: " + reason
        )

    summary: dict[str, Any] = {
        "schema_version": VALIDATED_AGENT_SAVE_SCHEMA_VERSION,
        "validation_status": "accepted_live_replay",
        "validated_run_dir": repo_relative(run_dir),
        "final_status": pack_state.get("final_status"),
        "review_overall_status": pack_state.get("review_overall_status"),
        "business_overall_status": business_review.get("overall_business_status"),
        "steps_total": pack_state.get("steps_total"),
        "steps_passed": pack_state.get("steps_passed"),
        "steps_failed": pack_state.get("steps_failed"),
        "steps_with_business_failures": business_review.get("steps_with_business_failures"),
        "steps_with_business_warnings": business_review.get("steps_with_business_warnings"),
        "acceptance_gate_passed": pack_state.get("acceptance_gate_passed"),
        "saved_after_validated_replay": True,
    }
    return summary
|
|
|
|
|
|
def validate_domain_pack_loop_dir(loop_dir: Path) -> dict[str, Any]:
    """Verify that a stage/domain pack loop directory ended in an accepted state.

    Reads ``loop_state.json`` from ``loop_dir``, selects the last iteration whose
    ``accepted_gate`` is truthy (or the final iteration when none is), loads the
    analyst verdict and repair targets files that iteration points to, and
    checks every acceptance signal.

    Args:
        loop_dir: Directory containing ``loop_state.json``.

    Returns:
        Save-gate metadata describing the accepted loop.

    Raises:
        RuntimeError: If an artifact is missing or malformed, if the selected
            iteration does not name its analyst verdict / repair targets files,
            or if any acceptance check fails (every failed check is listed).
    """
    loop_dir = loop_dir.resolve()
    loop_state = load_json_object(loop_dir / "loop_state.json", "Validated loop_state.json")
    iterations = loop_state.get("iterations")
    if not isinstance(iterations, list) or not iterations:
        raise RuntimeError("Refusing to save AGENT autorun because the validated loop has no iterations")
    accepted_iterations = [
        item for item in iterations if isinstance(item, dict) and bool(item.get("accepted_gate"))
    ]
    last_iteration = accepted_iterations[-1] if accepted_iterations else iterations[-1]
    if not isinstance(last_iteration, dict):
        raise RuntimeError("Refusing to save AGENT autorun because the validated loop iteration is invalid")

    analyst_path_raw = str(last_iteration.get("analyst_verdict_path") or "").strip()
    repair_targets_path_raw = str(last_iteration.get("repair_targets_path") or "").strip()
    # An empty path would resolve to the repository root itself, and loading it
    # would fail with a low-level read error instead of a clear refusal.
    missing_path_fields = [
        field_name
        for field_name, raw in (
            ("analyst_verdict_path", analyst_path_raw),
            ("repair_targets_path", repair_targets_path_raw),
        )
        if not raw
    ]
    if missing_path_fields:
        raise RuntimeError(
            "Refusing to save AGENT autorun because the validated loop iteration is missing: "
            + ", ".join(missing_path_fields)
        )
    analyst_verdict = load_json_object(resolve_repo_path(analyst_path_raw), "Validated loop analyst_verdict.json")
    repair_targets = load_json_object(resolve_repo_path(repair_targets_path_raw), "Validated loop repair_targets.json")
    severity_counts = repair_targets.get("severity_counts") if isinstance(repair_targets.get("severity_counts"), dict) else {}

    problems: list[str] = []
    assert_status(loop_state.get("final_status"), "accepted", "loop_state.final_status", problems)
    # Gate flags must be the literal boolean True, not merely truthy.
    if last_iteration.get("accepted_gate") is not True:
        problems.append("last_iteration.accepted_gate=false")
    if last_iteration.get("analyst_accepted_gate") is not True:
        problems.append("last_iteration.analyst_accepted_gate=false")
    if last_iteration.get("deterministic_gate_ok") is not True:
        problems.append("last_iteration.deterministic_gate_ok=false")
    # A missing (or zero) target_score falls back to 80.
    if int(last_iteration.get("quality_score") or 0) < int(loop_state.get("target_score") or 80):
        problems.append(
            f"last_iteration.quality_score={last_iteration.get('quality_score')}<target_score={loop_state.get('target_score')}"
        )
    assert_status(analyst_verdict.get("loop_decision"), "accepted", "analyst_verdict.loop_decision", problems)
    if int(analyst_verdict.get("unresolved_p0_count") or 0) != 0:
        problems.append(f"analyst_verdict.unresolved_p0_count={analyst_verdict.get('unresolved_p0_count')}")
    if bool(analyst_verdict.get("regression_detected")):
        problems.append("analyst_verdict.regression_detected=true")
    for field_name in (
        "direct_answer_ok",
        "business_usefulness_ok",
        "temporal_honesty_ok",
        "field_truth_ok",
        "answer_layering_ok",
    ):
        if analyst_verdict.get(field_name) is not True:
            problems.append(f"analyst_verdict.{field_name}=false")
    # Any remaining P0 or P1 repair target blocks the save.
    if int(severity_counts.get("P0") or 0) != 0 or int(severity_counts.get("P1") or 0) != 0:
        problems.append(
            f"repair_targets.severity_counts=P0:{severity_counts.get('P0') or 0},P1:{severity_counts.get('P1') or 0}"
        )

    if problems:
        raise RuntimeError(
            "Refusing to save AGENT autorun because the validated stage/domain loop is not clean: "
            + ", ".join(problems)
        )

    return {
        "schema_version": VALIDATED_AGENT_SAVE_SCHEMA_VERSION,
        "validation_status": "accepted_domain_pack_loop",
        "validated_run_dir": repo_relative(loop_dir),
        "final_status": loop_state.get("final_status"),
        "loop_id": loop_state.get("loop_id"),
        "target_score": loop_state.get("target_score"),
        "iterations_ran": len(iterations),
        "quality_score": last_iteration.get("quality_score"),
        "repair_target_count": last_iteration.get("repair_target_count"),
        "repair_target_severity_counts": last_iteration.get("repair_target_severity_counts"),
        "accepted_gate": last_iteration.get("accepted_gate"),
        "saved_after_validated_replay": True,
    }
|
|
|
|
|
|
def validate_accepted_run_dir(run_dir: Path) -> dict[str, Any]:
    """Validate ``run_dir`` with the validator matching its artifact layout.

    A directory containing ``loop_state.json`` is checked as a domain-pack
    loop; anything else is checked as a truth-harness run.
    """
    run_dir = run_dir.resolve()
    is_loop_dir = (run_dir / "loop_state.json").exists()
    validator = validate_domain_pack_loop_dir if is_loop_dir else validate_truth_harness_run_dir
    return validator(run_dir)
|
|
|
|
|
|
def build_save_gate_metadata(args: argparse.Namespace, spec: dict[str, Any], spec_path: Path) -> dict[str, Any]:
    """Decide whether the run may be saved and describe why.

    Resolution order:
      1. A validated run directory (CLI flag, then ``validated_run_dir`` or
         ``validated_artifact_dir`` in the spec) is validated and summarized.
      2. ``--dry-run`` yields a ``dry_run_unvalidated`` record.
      3. ``--allow-unvalidated`` yields an ``explicitly_unvalidated`` record and
         requires a non-blank ``--unvalidated-reason``.

    Raises:
        RuntimeError: If none of the above applies, if the reason is missing,
            or if validating the run directory fails.
    """
    run_dir_hint = args.validated_run_dir or spec.get("validated_run_dir") or spec.get("validated_artifact_dir")
    if run_dir_hint:
        return validate_accepted_run_dir(resolve_repo_path(str(run_dir_hint)))

    if not args.dry_run and not args.allow_unvalidated:
        raise RuntimeError(
            "Refusing to save AGENT autorun before a reviewed live replay. "
            "Pass --validated-run-dir artifacts/domain_runs/<run_id> after run-live/review-export is accepted, "
            "or use --allow-unvalidated --unvalidated-reason only for an explicit draft."
        )

    extra: dict[str, Any] = {}
    if args.dry_run:
        # Dry-run takes precedence over --allow-unvalidated.
        status = "dry_run_unvalidated"
    else:
        reason = str(args.unvalidated_reason or "").strip()
        if not reason:
            raise RuntimeError("--unvalidated-reason is required when --allow-unvalidated is used")
        status = "explicitly_unvalidated"
        extra["unvalidated_reason"] = reason

    return {
        "schema_version": VALIDATED_AGENT_SAVE_SCHEMA_VERSION,
        "validation_status": status,
        "source_spec_file": repo_relative(spec_path),
        **extra,
        "saved_after_validated_replay": False,
    }
|
|
|
|
|
|
def normalize_questions(raw_questions: list[Any]) -> list[str]:
    """Sanitize questions, dropping empty results and later duplicates.

    The order of first occurrence is preserved.
    """
    sanitized = (sanitize_question(entry) for entry in raw_questions)
    # dict.fromkeys keeps the first occurrence of each key in insertion order.
    return list(dict.fromkeys(text for text in sanitized if text))
|
|
|
|
|
|
def extract_semantic_tags(spec: dict[str, Any]) -> list[str]:
    """Collect the distinct ``semantic_tags`` of all spec steps, sorted.

    Steps that are not dicts, or whose ``semantic_tags`` is not a list, are
    ignored. Each tag is stringified and stripped; empty results are dropped.
    Returns an empty list when ``steps`` is not a list.
    """
    steps = spec.get("steps")
    if not isinstance(steps, list):
        return []
    unique_tags: set[str] = set()
    for step in steps:
        candidates = step.get("semantic_tags") if isinstance(step, dict) else None
        if not isinstance(candidates, list):
            continue
        cleaned = (str(candidate or "").strip() for candidate in candidates)
        unique_tags.update(label for label in cleaned if label)
    return sorted(unique_tags)
|
|
|
|
|
|
def _step_question_texts(steps: list[Any]) -> list[Any]:
    """Return ``question`` (else ``question_template``) of each dict step that has one."""
    picked: list[Any] = []
    for step in steps:
        if not isinstance(step, dict):
            continue
        text = step.get("question") or step.get("question_template")
        if text:
            picked.append(text)
    return picked


def extract_questions_from_spec(spec: dict[str, Any]) -> list[str]:
    """Extract the normalized question list from a spec.

    The first matching source wins, in this order: a ``questions`` list, a
    top-level ``steps`` list, then the ``steps`` of every entry in ``scenarios``.

    Raises:
        RuntimeError: If the spec has none of these sources as a list.
    """
    direct_questions = spec.get("questions")
    if isinstance(direct_questions, list):
        return normalize_questions(list(direct_questions))

    steps = spec.get("steps")
    if isinstance(steps, list):
        return normalize_questions(_step_question_texts(steps))

    scenarios = spec.get("scenarios")
    if isinstance(scenarios, list):
        gathered: list[Any] = []
        for scenario in scenarios:
            scenario_steps = scenario.get("steps") if isinstance(scenario, dict) else None
            if isinstance(scenario_steps, list):
                gathered.extend(_step_question_texts(scenario_steps))
        return normalize_questions(gathered)

    raise RuntimeError(
        "Spec must define `questions[]`, `steps[].question`, `steps[].question_template`, "
        "or `scenarios[].steps[]` questions"
    )
|
|
|
|
|
|
def build_case_set_payload(
    generation_id: str,
    title: str,
    questions: list[str],
    domain: str | None,
    scenario_tag: str,
) -> dict[str, Any]:
    """Build the saved-session eval suite payload for one generation.

    All questions become the turns of a single case (``SAVED-001``). With no
    questions the suite has zero scenarios and empty case lists.
    """
    case_id = "SAVED-001"
    turns = [{"user_message": text} for text in questions]

    cases: list[dict[str, Any]] = []
    if turns:
        # More than one turn means the later turns are follow-ups.
        question_type = "followup" if len(turns) > 1 else "direct"
        cases.append(
            {
                "case_id": case_id,
                "scenario_tag": scenario_tag,
                "title": title,
                "question_type": question_type,
                "broadness_level": "medium",
                "turns": turns,
            }
        )
    case_ids = [case["case_id"] for case in cases]

    return {
        "suite_id": f"assistant_saved_session_{generation_id}",
        "suite_version": "0.1.0",
        "schema_version": "assistant_saved_session_suite_v0_1",
        "generated_at": now_utc().isoformat(),
        "generation_id": generation_id,
        "mode": "saved_user_sessions",
        "title": title,
        "domain": domain,
        "scenario_count": len(cases),
        "case_ids": case_ids,
        "cases": cases,
    }
|
|
|
|
|
|
def build_snapshot_payload(
    generation_id: str,
    title: str,
    questions: list[str],
    metadata: dict[str, Any],
) -> dict[str, Any]:
    """Build the saved-session snapshot: one user message per question.

    Every message and the snapshot itself share one creation timestamp. The
    same ``metadata`` object is attached at the top level and inside the
    session.
    """
    created_at = now_utc().isoformat()
    items: list[dict[str, Any]] = [
        {
            "message_id": f"agent-user-{position:03d}",
            "role": "user",
            "text": text,
            "created_at": created_at,
            "reply_type": None,
            "trace_id": None,
            "debug": None,
        }
        for position, text in enumerate(questions, start=1)
    ]
    session = {
        "session_id": None,
        "mode": "agent_semantic_run",
        "items": items,
        "agent_run": True,
        "metadata": metadata,
    }
    return {
        "saved_at": created_at,
        "generation_id": generation_id,
        "mode": "saved_user_sessions",
        "title": title,
        "agent_run": True,
        "questions": questions,
        "metadata": metadata,
        "source_session_id": None,
        "session": session,
    }
|
|
|
|
|
|
def read_history() -> list[dict[str, Any]]:
    """Load the existing history records from HISTORY_FILE.

    Returns:
        The JSON-object entries of the history file, in file order. A missing
        file, or a top-level value that is not a list, yields an empty list.
        List entries that are not JSON objects are skipped, so callers can
        rely on the declared element type (e.g. call ``.get`` on each record).
    """
    if not HISTORY_FILE.exists():
        return []
    parsed = load_json(HISTORY_FILE)
    if not isinstance(parsed, list):
        return []
    return [entry for entry in parsed if isinstance(entry, dict)]
|
|
|
|
|
|
def build_history_record(
    generation_id: str,
    title: str,
    questions: list[str],
    case_set_file: str,
    saved_session_file: str,
    domain: str | None,
    generated_by: str,
    metadata: dict[str, Any],
) -> dict[str, Any]:
    """Build the history entry that indexes one saved AGENT run.

    The ``context`` sub-object mixes fixed values with fields copied from
    ``metadata``; fields absent from ``metadata`` come through as ``None``.
    """
    prompt_fields = (
        "assistant_prompt_version",
        "decomposition_prompt_version",
        "prompt_fingerprint",
    )
    run_fields = (
        "agent_focus",
        "architecture_phase",
        "source_spec_file",
        "scenario_id",
        "semantic_tags",
        "validation_status",
        "validated_run_dir",
        "saved_after_validated_replay",
    )

    # Assembled in segments so the key order matches the stored record layout.
    context: dict[str, Any] = {"llm_provider": None, "model": None}
    context.update({name: metadata.get(name) for name in prompt_fields})
    context.update(
        {
            "autogen_personality_id": None,
            "autogen_personality_prompt": None,
            "source_session_id": None,
            "saved_session_file": saved_session_file,
            "saved_case_set_kind": "agent_semantic_scenario",
            "agent_run": True,
        }
    )
    context.update({name: metadata.get(name) for name in run_fields})

    return {
        "generation_id": generation_id,
        "created_at": now_utc().isoformat(),
        "mode": "saved_user_sessions",
        "title": title,
        "count": len(questions),
        "domain": domain,
        "questions": questions,
        "generated_by": generated_by,
        "saved_case_set_file": case_set_file,
        "context": context,
    }
|
|
|
|
|
|
def build_metadata(
    args: argparse.Namespace,
    spec: dict[str, Any],
    spec_path: Path | None,
    save_gate: dict[str, Any],
) -> dict[str, Any]:
    """Assemble the metadata block attached to the saved run.

    Combines CLI arguments, spec fields and the save-gate result. The full
    ``save_gate`` dict is embedded, and three of its fields are also copied to
    the top level.
    """
    # Focus label: CLI flag first, then the spec description, then its title.
    focus = args.agent_focus or spec.get("description") or spec.get("title")

    if spec_path:
        source_spec_file = str(spec_path.resolve())
    else:
        source_spec_file = None

    # A blank scenario id is stored as None.
    scenario_id = str(spec.get("scenario_id") or "").strip() or None

    return {
        "assistant_prompt_version": args.assistant_prompt_version,
        "decomposition_prompt_version": args.decomposition_prompt_version,
        "prompt_fingerprint": args.prompt_fingerprint,
        "agent_focus": focus,
        "architecture_phase": args.architecture_phase,
        "source_spec_file": source_spec_file,
        "scenario_id": scenario_id,
        "semantic_tags": extract_semantic_tags(spec),
        "validation_status": save_gate.get("validation_status"),
        "validated_run_dir": save_gate.get("validated_run_dir"),
        "saved_after_validated_replay": save_gate.get("saved_after_validated_replay"),
        "save_gate": save_gate,
    }
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(description="Save a targeted AGENT semantic run into autoruns user sessions.")
|
|
parser.add_argument("--spec", required=True, help="Path to a truth-harness spec or simple questions spec JSON.")
|
|
parser.add_argument("--title", help="Override title for the AGENT run.")
|
|
parser.add_argument("--generated-by", default="codex_agent", help="Author label for the generated run.")
|
|
parser.add_argument("--architecture-phase", default="turnaround_11", help="Architecture phase / slice being validated.")
|
|
parser.add_argument("--agent-focus", help="Short focus label for the targeted fix.")
|
|
parser.add_argument("--assistant-prompt-version", help="Optional assistant prompt version metadata.")
|
|
parser.add_argument("--decomposition-prompt-version", help="Optional decomposition prompt version metadata.")
|
|
parser.add_argument("--prompt-fingerprint", help="Optional prompt fingerprint metadata.")
|
|
parser.add_argument(
|
|
"--validated-run-dir",
|
|
help="Accepted truth-harness artifact directory containing pack_state.json, truth_review.json, and business_review.json.",
|
|
)
|
|
parser.add_argument(
|
|
"--allow-unvalidated",
|
|
action="store_true",
|
|
help="Explicitly save a draft AGENT run without accepted replay artifacts. This is not an acceptance proof.",
|
|
)
|
|
parser.add_argument(
|
|
"--unvalidated-reason",
|
|
help="Required explanation when --allow-unvalidated is used.",
|
|
)
|
|
parser.add_argument("--dry-run", action="store_true", help="Print resulting record metadata without writing files.")
|
|
return parser.parse_args()
|
|
|
|
|
|
def main() -> int:
    """Run the save flow: load the spec, pass the save gate, write the artifacts.

    Steps, in order: parse arguments, load and check the spec, extract its
    questions, build the save-gate metadata, derive title and metadata, build
    the three payloads, then either print a dry-run report or write the case
    set, the saved session and the updated history (capped at 500 records)
    and print a report.

    Returns:
        0 on success. Failures surface as RuntimeError.
    """
    args = parse_args()

    # Relative spec paths are anchored at the repository root.
    spec_path = resolve_repo_path(args.spec)
    if not spec_path.exists():
        raise RuntimeError(f"Spec file not found: {spec_path}")

    spec = load_json(spec_path)
    if not isinstance(spec, dict):
        raise RuntimeError("Spec JSON must be an object")

    questions = extract_questions_from_spec(spec)
    if not questions:
        raise RuntimeError("Agent semantic run must contain at least one question")

    save_gate = build_save_gate_metadata(args, spec, spec_path)
    domain = str(spec.get("domain") or "").strip() or None
    # Title precedence: CLI override, then the spec title, then the file stem.
    raw_title = str(args.title or spec.get("title") or spec_path.stem).strip()
    title = ensure_agent_title(raw_title)
    metadata = build_metadata(args, spec, spec_path, save_gate)

    timestamp = now_utc()
    generation_id = generate_id(timestamp)
    stamp = utc_stamp(timestamp)
    case_set_file = f"assistant_autogen_saved_user_sessions_{stamp}_{generation_id}.json"
    saved_session_file = f"assistant_saved_session_{stamp}_{generation_id}.json"
    author = str(args.generated_by or "codex_agent").strip() or "codex_agent"

    case_set_payload = build_case_set_payload(
        generation_id=generation_id,
        title=title,
        questions=questions,
        domain=domain,
        scenario_tag="agent_saved_user_sessions",
    )
    snapshot_payload = build_snapshot_payload(
        generation_id=generation_id,
        title=title,
        questions=questions,
        metadata=metadata,
    )
    record = build_history_record(
        generation_id=generation_id,
        title=title,
        questions=questions,
        case_set_file=case_set_file,
        saved_session_file=saved_session_file,
        domain=domain,
        generated_by=author,
        metadata=metadata,
    )

    gate_status = save_gate.get("validation_status")
    gate_run_dir = save_gate.get("validated_run_dir")

    if args.dry_run:
        # Dry run: report what would be saved, write nothing.
        dry_report = {
            "ok": True,
            "dry_run": True,
            "generation_id": generation_id,
            "title": title,
            "questions_total": len(questions),
            "case_set_file": case_set_file,
            "saved_session_file": saved_session_file,
            "domain": domain,
            "validation_status": gate_status,
            "validated_run_dir": gate_run_dir,
        }
        print(json.dumps(dry_report, ensure_ascii=False, indent=2))
        return 0

    write_json(EVAL_CASES_DIR / case_set_file, case_set_payload)
    write_json(SAVED_SESSIONS_DIR / saved_session_file, snapshot_payload)

    # The new record goes first; any older record with the same id is dropped.
    earlier_records = [item for item in read_history() if item.get("generation_id") != generation_id]
    updated_history = [record, *earlier_records]
    write_json(HISTORY_FILE, updated_history[:500])

    saved_report = {
        "ok": True,
        "generation_id": generation_id,
        "title": title,
        "questions_total": len(questions),
        "case_set_file": case_set_file,
        "saved_session_file": saved_session_file,
        "validation_status": gate_status,
        "validated_run_dir": gate_run_dir,
    }
    print(json.dumps(saved_report, ensure_ascii=False, indent=2))
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Hand main()'s return value to the interpreter as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)
|