#!/usr/bin/env python3 from __future__ import annotations import argparse import json import re import secrets from datetime import datetime, timezone from pathlib import Path from typing import Any REPO_ROOT = Path(__file__).resolve().parents[1] HISTORY_FILE = REPO_ROOT / "llm_normalizer" / "data" / "autorun_generators" / "history.json" SAVED_SESSIONS_DIR = REPO_ROOT / "llm_normalizer" / "data" / "autorun_generators" / "saved_sessions" EVAL_CASES_DIR = REPO_ROOT / "llm_normalizer" / "data" / "eval_cases" def now_utc() -> datetime: return datetime.now(timezone.utc).replace(microsecond=0) def utc_stamp(dt: datetime) -> str: return ( f"{dt.year:04d}{dt.month:02d}{dt.day:02d}" f"{dt.hour:02d}{dt.minute:02d}{dt.second:02d}" ) def generate_id(dt: datetime) -> str: return f"gen-ag{dt.strftime('%m%d%H%M')}-{secrets.token_hex(3)}" def sanitize_question(value: Any) -> str: text = str(value or "").replace("\r\n", "\n").replace("\r", "\n") text = "\n".join(line.strip() for line in text.split("\n")) text = re.sub(r"[ \t]+", " ", text).strip() return text def ensure_agent_title(title: str) -> str: normalized = title.strip() if not normalized: raise RuntimeError("Agent semantic run title must not be empty") return normalized if normalized.upper().startswith("AGENT") else f"AGENT | {normalized}" def load_json(path: Path) -> Any: return json.loads(path.read_text(encoding="utf-8")) def write_json(path: Path, payload: Any) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8") def normalize_questions(raw_questions: list[Any]) -> list[str]: result: list[str] = [] seen: set[str] = set() for item in raw_questions: question = sanitize_question(item) if not question or question in seen: continue seen.add(question) result.append(question) return result def extract_semantic_tags(spec: dict[str, Any]) -> list[str]: steps = spec.get("steps") if not isinstance(steps, list): return [] tags: set[str] = set() for step in steps: if not isinstance(step, dict): continue raw_tags = step.get("semantic_tags") if not isinstance(raw_tags, list): continue for raw_tag in raw_tags: tag = str(raw_tag or "").strip() if tag: tags.add(tag) return sorted(tags) def extract_questions_from_spec(spec: dict[str, Any]) -> list[str]: if isinstance(spec.get("questions"), list): return normalize_questions(list(spec["questions"])) steps = spec.get("steps") if isinstance(steps, list): return normalize_questions( [step.get("question") for step in steps if isinstance(step, dict) and step.get("question")] ) raise RuntimeError("Spec must define either `questions[]` or `steps[].question`") def build_case_set_payload( generation_id: str, title: str, questions: list[str], domain: str | None, scenario_tag: str, ) -> dict[str, Any]: turns = [{"user_message": question} for question in questions] case_id = "SAVED-001" return { "suite_id": f"assistant_saved_session_{generation_id}", "suite_version": "0.1.0", "schema_version": "assistant_saved_session_suite_v0_1", "generated_at": now_utc().isoformat(), "generation_id": generation_id, "mode": "saved_user_sessions", "title": title, "domain": domain, "scenario_count": 1 if turns else 0, "case_ids": [case_id] if turns else [], "cases": [ { "case_id": case_id, "scenario_tag": scenario_tag, "title": title, "question_type": "followup" if len(turns) > 1 else "direct", "broadness_level": "medium", "turns": turns, } ] if turns else [], } def build_snapshot_payload( generation_id: str, title: str, questions: list[str], metadata: dict[str, Any], ) -> dict[str, Any]: created_at = now_utc().isoformat() items: list[dict[str, Any]] = [] for index, question in enumerate(questions, start=1): items.append( { "message_id": f"agent-user-{index:03d}", "role": "user", "text": question, "created_at": created_at, "reply_type": None, "trace_id": None, "debug": None, } ) return { "saved_at": created_at, "generation_id": generation_id, "mode": "saved_user_sessions", "title": title, "agent_run": True, "questions": questions, "metadata": metadata, "source_session_id": None, "session": { "session_id": None, "mode": "agent_semantic_run", "items": items, "agent_run": True, "metadata": metadata, }, } def read_history() -> list[dict[str, Any]]: if not HISTORY_FILE.exists(): return [] parsed = load_json(HISTORY_FILE) return parsed if isinstance(parsed, list) else [] def build_history_record( generation_id: str, title: str, questions: list[str], case_set_file: str, saved_session_file: str, domain: str | None, generated_by: str, metadata: dict[str, Any], ) -> dict[str, Any]: context = { "llm_provider": None, "model": None, "assistant_prompt_version": metadata.get("assistant_prompt_version"), "decomposition_prompt_version": metadata.get("decomposition_prompt_version"), "prompt_fingerprint": metadata.get("prompt_fingerprint"), "autogen_personality_id": None, "autogen_personality_prompt": None, "source_session_id": None, "saved_session_file": saved_session_file, "saved_case_set_kind": "agent_semantic_scenario", "agent_run": True, "agent_focus": metadata.get("agent_focus"), "architecture_phase": metadata.get("architecture_phase"), "source_spec_file": metadata.get("source_spec_file"), "scenario_id": metadata.get("scenario_id"), "semantic_tags": metadata.get("semantic_tags"), } return { "generation_id": generation_id, "created_at": now_utc().isoformat(), "mode": "saved_user_sessions", "title": title, "count": len(questions), "domain": domain, "questions": questions, "generated_by": generated_by, "saved_case_set_file": case_set_file, "context": context, } def build_metadata(args: argparse.Namespace, spec: dict[str, Any], spec_path: Path | None) -> dict[str, Any]: semantic_tags = extract_semantic_tags(spec) return { "assistant_prompt_version": args.assistant_prompt_version, "decomposition_prompt_version": args.decomposition_prompt_version, "prompt_fingerprint": args.prompt_fingerprint, "agent_focus": args.agent_focus or spec.get("description") or spec.get("title"), "architecture_phase": args.architecture_phase, "source_spec_file": str(spec_path.resolve()) if spec_path else None, "scenario_id": str(spec.get("scenario_id") or "").strip() or None, "semantic_tags": semantic_tags, } def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Save a targeted AGENT semantic run into autoruns user sessions.") parser.add_argument("--spec", required=True, help="Path to a truth-harness spec or simple questions spec JSON.") parser.add_argument("--title", help="Override title for the AGENT run.") parser.add_argument("--generated-by", default="codex_agent", help="Author label for the generated run.") parser.add_argument("--architecture-phase", default="turnaround_11", help="Architecture phase / slice being validated.") parser.add_argument("--agent-focus", help="Short focus label for the targeted fix.") parser.add_argument("--assistant-prompt-version", help="Optional assistant prompt version metadata.") parser.add_argument("--decomposition-prompt-version", help="Optional decomposition prompt version metadata.") parser.add_argument("--prompt-fingerprint", help="Optional prompt fingerprint metadata.") parser.add_argument("--dry-run", action="store_true", help="Print resulting record metadata without writing files.") return parser.parse_args() def main() -> int: args = parse_args() spec_path = Path(args.spec) if not spec_path.is_absolute(): spec_path = (REPO_ROOT / spec_path).resolve() if not spec_path.exists(): raise RuntimeError(f"Spec file not found: {spec_path}") spec_raw = load_json(spec_path) if not isinstance(spec_raw, dict): raise RuntimeError("Spec JSON must be an object") questions = extract_questions_from_spec(spec_raw) if not questions: raise RuntimeError("Agent semantic run must contain at least one question") domain = str(spec_raw.get("domain") or "").strip() or None source_title = str(args.title or spec_raw.get("title") or spec_path.stem).strip() title = ensure_agent_title(source_title) metadata = build_metadata(args, spec_raw, spec_path) timestamp = now_utc() generation_id = generate_id(timestamp) case_set_file = f"assistant_autogen_saved_user_sessions_{utc_stamp(timestamp)}_{generation_id}.json" saved_session_file = f"assistant_saved_session_{utc_stamp(timestamp)}_{generation_id}.json" case_set_payload = build_case_set_payload( generation_id=generation_id, title=title, questions=questions, domain=domain, scenario_tag="agent_saved_user_sessions", ) snapshot_payload = build_snapshot_payload( generation_id=generation_id, title=title, questions=questions, metadata=metadata, ) record = build_history_record( generation_id=generation_id, title=title, questions=questions, case_set_file=case_set_file, saved_session_file=saved_session_file, domain=domain, generated_by=str(args.generated_by or "codex_agent").strip() or "codex_agent", metadata=metadata, ) if args.dry_run: print( json.dumps( { "ok": True, "dry_run": True, "generation_id": generation_id, "title": title, "questions_total": len(questions), "case_set_file": case_set_file, "saved_session_file": saved_session_file, "domain": domain, }, ensure_ascii=False, indent=2, ) ) return 0 write_json(EVAL_CASES_DIR / case_set_file, case_set_payload) write_json(SAVED_SESSIONS_DIR / saved_session_file, snapshot_payload) history = read_history() history = [record, *[item for item in history if item.get("generation_id") != generation_id]] write_json(HISTORY_FILE, history[:500]) print( json.dumps( { "ok": True, "generation_id": generation_id, "title": title, "questions_total": len(questions), "case_set_file": case_set_file, "saved_session_file": saved_session_file, }, ensure_ascii=False, indent=2, ) ) return 0 if __name__ == "__main__": raise SystemExit(main())