319 lines
11 KiB
Python
319 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import secrets
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
# Parent of the directory that contains this script (symlinks resolved).
REPO_ROOT = Path(__file__).resolve().parents[1]
# JSON history index: read by read_history() and rewritten by main().
HISTORY_FILE = REPO_ROOT.joinpath("llm_normalizer", "data", "autorun_generators", "history.json")
# Directory that receives the saved-session snapshot files written by main().
SAVED_SESSIONS_DIR = REPO_ROOT.joinpath("llm_normalizer", "data", "autorun_generators", "saved_sessions")
# Directory that receives the eval case-set files written by main().
EVAL_CASES_DIR = REPO_ROOT.joinpath("llm_normalizer", "data", "eval_cases")
|
|
|
|
|
|
def now_utc() -> datetime:
    """Return the current time as a UTC-aware datetime with microseconds zeroed."""
    current = datetime.now(tz=timezone.utc)
    return current.replace(microsecond=0)
|
|
|
|
|
|
def utc_stamp(dt: datetime) -> str:
    """Format ``dt`` as a compact 14-digit ``YYYYMMDDHHMMSS`` string.

    The calendar and clock fields are used exactly as stored on ``dt``; no
    timezone conversion is performed here.
    """
    date_part = "{:04d}{:02d}{:02d}".format(dt.year, dt.month, dt.day)
    time_part = "{:02d}{:02d}{:02d}".format(dt.hour, dt.minute, dt.second)
    return date_part + time_part
|
|
|
|
|
|
def generate_id(dt: datetime) -> str:
    """Build a generation id of the form ``gen-ag<MMDDHHMM>-<6 hex chars>``.

    The time part comes from ``dt`` (month, day, hour, minute); the suffix is
    three random bytes from ``secrets`` rendered as hexadecimal, so two calls
    with the same ``dt`` normally yield different ids.
    """
    time_part = dt.strftime("%m%d%H%M")
    random_part = secrets.token_hex(3)
    return "gen-ag" + time_part + "-" + random_part
|
|
|
|
|
|
def sanitize_question(value: Any) -> str:
    """Normalize a raw question value into clean text.

    Falsy values (``None``, ``""``, ``0``, ...) become the empty string; any
    other value is converted with ``str``. Line endings are unified to ``\\n``,
    every line is stripped, runs of spaces/tabs collapse to a single space,
    and the final result is stripped as a whole.
    """
    raw = str(value) if value else ""
    unified = raw.replace("\r\n", "\n").replace("\r", "\n")
    stripped_lines = [segment.strip() for segment in unified.split("\n")]
    collapsed = re.sub(r"[ \t]+", " ", "\n".join(stripped_lines))
    return collapsed.strip()
|
|
|
|
|
|
def ensure_agent_title(title: str) -> str:
    """Return the stripped title, prefixed with ``AGENT | `` when needed.

    A title counts as already marked when its upper-cased form starts with
    ``AGENT``. This is a plain prefix test, so e.g. ``"agentic flow"`` is
    returned unchanged.

    Raises:
        RuntimeError: if the title is empty after stripping.
    """
    cleaned = title.strip()
    if cleaned == "":
        raise RuntimeError("Agent semantic run title must not be empty")
    if cleaned.upper().startswith("AGENT"):
        return cleaned
    return f"AGENT | {cleaned}"
|
|
|
|
|
|
def load_json(path: Path) -> Any:
    """Read ``path`` as UTF-8 text and return the parsed JSON value."""
    raw_text = path.read_text(encoding="utf-8")
    return json.loads(raw_text)
|
|
|
|
|
|
def write_json(path: Path, payload: Any) -> None:
    """Write ``payload`` to ``path`` as indented UTF-8 JSON with a trailing newline.

    Missing parent directories are created. Non-ASCII characters are written
    as-is (``ensure_ascii=False``).

    The text is first written to a sibling ``<name>.tmp`` file and then renamed
    over ``path``, so an interrupted write cannot leave a truncated JSON file
    at the target location.

    Args:
        path: Destination file.
        payload: Any JSON-serializable value.

    Raises:
        TypeError: if ``payload`` is not JSON-serializable (raised before any
            file is touched).
        OSError: if the directory or file cannot be written.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(payload, ensure_ascii=False, indent=2) + "\n"
    temp_path = path.with_name(f"{path.name}.tmp")
    try:
        temp_path.write_text(serialized, encoding="utf-8")
        # Same-directory rename: the target switches from old to new content
        # in one step instead of being rewritten in place.
        temp_path.replace(path)
    finally:
        # After a successful rename the temp name is gone; this only removes
        # a leftover from a failed write.
        if temp_path.exists():
            temp_path.unlink()
|
|
|
|
|
|
def normalize_questions(raw_questions: list[Any]) -> list[str]:
    """Sanitize every raw question and drop empties and duplicates.

    Each item goes through ``sanitize_question``. Results that are empty or
    that repeat an earlier sanitized question are skipped; the order of first
    appearance is kept.
    """
    # An insertion-ordered dict doubles as the "seen" set and the result list.
    unique: dict[str, None] = {}
    for raw in raw_questions:
        cleaned = sanitize_question(raw)
        if cleaned:
            unique.setdefault(cleaned, None)
    return list(unique)
|
|
|
|
|
|
def extract_questions_from_spec(spec: dict[str, Any]) -> list[str]:
    """Pull the normalized question list out of a spec object.

    Two layouts are accepted, checked in this order:

    * ``spec["questions"]`` is a list: all of its items are normalized.
    * ``spec["steps"]`` is a list: the ``question`` value of every dict step
      that has a truthy ``question`` is normalized.

    A list-valued ``questions`` key wins even when it is empty; ``steps`` is
    then not consulted.

    Raises:
        RuntimeError: if neither key holds a list.
    """
    direct = spec.get("questions")
    if isinstance(direct, list):
        return normalize_questions(list(direct))

    steps = spec.get("steps")
    if not isinstance(steps, list):
        raise RuntimeError("Spec must define either `questions[]` or `steps[].question`")

    collected = []
    for step in steps:
        if isinstance(step, dict) and step.get("question"):
            collected.append(step.get("question"))
    return normalize_questions(collected)
|
|
|
|
|
|
def build_case_set_payload(
    generation_id: str,
    title: str,
    questions: list[str],
    domain: str | None,
    scenario_tag: str,
) -> dict[str, Any]:
    """Build the saved-session suite payload holding at most one case.

    Every question becomes one turn ``{"user_message": question}``. With at
    least one question the suite contains a single case with id ``SAVED-001``
    whose ``question_type`` is ``"followup"`` for multiple turns and
    ``"direct"`` for exactly one. With no questions, ``scenario_count`` is 0
    and ``case_ids`` / ``cases`` are empty lists.

    Args:
        generation_id: Id embedded in ``suite_id`` and ``generation_id``.
        title: Title stored on both the suite and the case.
        questions: Ordered user questions.
        domain: Optional domain label, stored as given.
        scenario_tag: Tag stored on the case.

    Returns:
        The suite payload as a plain dict; ``generated_at`` is the ISO form of
        ``now_utc()`` at call time.
    """
    turns = [{"user_message": text} for text in questions]

    cases: list[dict[str, Any]] = []
    if turns:
        cases.append(
            {
                "case_id": "SAVED-001",
                "scenario_tag": scenario_tag,
                "title": title,
                "question_type": "direct" if len(turns) <= 1 else "followup",
                "broadness_level": "medium",
                "turns": turns,
            }
        )

    return {
        "suite_id": f"assistant_saved_session_{generation_id}",
        "suite_version": "0.1.0",
        "schema_version": "assistant_saved_session_suite_v0_1",
        "generated_at": now_utc().isoformat(),
        "generation_id": generation_id,
        "mode": "saved_user_sessions",
        "title": title,
        "domain": domain,
        "scenario_count": len(cases),
        "case_ids": [case["case_id"] for case in cases],
        "cases": cases,
    }
|
|
|
|
|
|
def build_snapshot_payload(
    generation_id: str,
    title: str,
    questions: list[str],
    metadata: dict[str, Any],
) -> dict[str, Any]:
    """Build the saved-session snapshot payload for an agent run.

    Each question becomes a user message item with id ``agent-user-NNN``
    (1-based, zero-padded to three digits). A single timestamp taken from
    ``now_utc()`` is reused for ``saved_at`` and for every item's
    ``created_at``.

    Args:
        generation_id: Id stored on the snapshot.
        title: Title stored on the snapshot.
        questions: Ordered user questions; stored verbatim under ``questions``.
        metadata: Stored (same object, not copied) at the top level and inside
            ``session``.

    Returns:
        The snapshot payload as a plain dict.
    """
    stamp = now_utc().isoformat()

    messages: list[dict[str, Any]] = [
        {
            "message_id": f"agent-user-{position:03d}",
            "role": "user",
            "text": text,
            "created_at": stamp,
            "reply_type": None,
            "trace_id": None,
            "debug": None,
        }
        for position, text in enumerate(questions, start=1)
    ]

    session = {
        "session_id": None,
        "mode": "agent_semantic_run",
        "items": messages,
        "agent_run": True,
        "metadata": metadata,
    }

    return {
        "saved_at": stamp,
        "generation_id": generation_id,
        "mode": "saved_user_sessions",
        "title": title,
        "agent_run": True,
        "questions": questions,
        "metadata": metadata,
        "source_session_id": None,
        "session": session,
    }
|
|
|
|
|
|
def read_history() -> list[dict[str, Any]]:
    """Load the history records stored in ``HISTORY_FILE``.

    Returns:
        The dict entries of the stored JSON list, in file order. An empty list
        is returned when the file does not exist or when its top-level JSON
        value is not a list. Entries that are not dicts are skipped so callers
        can rely on the annotated element type (e.g. call ``.get`` on each).

    Raises:
        json.JSONDecodeError: if the file exists but is not valid JSON. This
            is deliberately not swallowed, so a corrupt history is never
            silently replaced.
    """
    try:
        parsed = load_json(HISTORY_FILE)
    except FileNotFoundError:
        # No history yet: start from an empty list.
        return []
    if not isinstance(parsed, list):
        return []
    return [entry for entry in parsed if isinstance(entry, dict)]
|
|
|
|
|
|
def build_history_record(
    generation_id: str,
    title: str,
    questions: list[str],
    case_set_file: str,
    saved_session_file: str,
    domain: str | None,
    generated_by: str,
    metadata: dict[str, Any],
) -> dict[str, Any]:
    """Build the history entry describing one saved agent run.

    Args:
        generation_id: Id of the run.
        title: Run title.
        questions: Questions of the run; their number is stored as ``count``.
        case_set_file: Stored as ``saved_case_set_file``.
        saved_session_file: Stored inside ``context``.
        domain: Optional domain label.
        generated_by: Author label.
        metadata: Source of the prompt-version, fingerprint, focus, phase and
            spec-file values copied into ``context``; missing keys yield
            ``None``.

    Returns:
        The history record as a plain dict; ``created_at`` is the ISO form of
        ``now_utc()`` at call time.
    """
    lookup = metadata.get

    run_context = {
        "llm_provider": None,
        "model": None,
        "assistant_prompt_version": lookup("assistant_prompt_version"),
        "decomposition_prompt_version": lookup("decomposition_prompt_version"),
        "prompt_fingerprint": lookup("prompt_fingerprint"),
        "autogen_personality_id": None,
        "autogen_personality_prompt": None,
        "source_session_id": None,
        "saved_session_file": saved_session_file,
        "saved_case_set_kind": "agent_semantic_scenario",
        "agent_run": True,
        "agent_focus": lookup("agent_focus"),
        "architecture_phase": lookup("architecture_phase"),
        "source_spec_file": lookup("source_spec_file"),
    }

    record: dict[str, Any] = {
        "generation_id": generation_id,
        "created_at": now_utc().isoformat(),
        "mode": "saved_user_sessions",
        "title": title,
        "count": len(questions),
        "domain": domain,
        "questions": questions,
        "generated_by": generated_by,
        "saved_case_set_file": case_set_file,
        "context": run_context,
    }
    return record
|
|
|
|
|
|
def build_metadata(args: argparse.Namespace, spec: dict[str, Any], spec_path: Path | None) -> dict[str, Any]:
    """Collect run metadata from CLI arguments and the spec.

    ``agent_focus`` is the first truthy value among ``args.agent_focus``,
    ``spec["description"]`` and ``spec["title"]``. ``source_spec_file`` is the
    resolved spec path as a string, or ``None`` when no path is given. The
    remaining values are copied from ``args`` unchanged.
    """
    focus = args.agent_focus or spec.get("description") or spec.get("title")

    if spec_path:
        source_file = str(spec_path.resolve())
    else:
        source_file = None

    collected: dict[str, Any] = {}
    collected["assistant_prompt_version"] = args.assistant_prompt_version
    collected["decomposition_prompt_version"] = args.decomposition_prompt_version
    collected["prompt_fingerprint"] = args.prompt_fingerprint
    collected["agent_focus"] = focus
    collected["architecture_phase"] = args.architecture_phase
    collected["source_spec_file"] = source_file
    return collected
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse the command line (``sys.argv``) for the save-agent-run script.

    ``--spec`` is required. ``--generated-by`` defaults to ``codex_agent`` and
    ``--architecture-phase`` to ``turnaround_11``; ``--dry-run`` is a boolean
    flag; all other options default to ``None``.
    """
    cli = argparse.ArgumentParser(description="Save a targeted AGENT semantic run into autoruns user sessions.")

    # One row per option: (flag, keyword arguments for add_argument).
    option_table: tuple[tuple[str, dict[str, Any]], ...] = (
        ("--spec", {"required": True, "help": "Path to a truth-harness spec or simple questions spec JSON."}),
        ("--title", {"help": "Override title for the AGENT run."}),
        ("--generated-by", {"default": "codex_agent", "help": "Author label for the generated run."}),
        ("--architecture-phase", {"default": "turnaround_11", "help": "Architecture phase / slice being validated."}),
        ("--agent-focus", {"help": "Short focus label for the targeted fix."}),
        ("--assistant-prompt-version", {"help": "Optional assistant prompt version metadata."}),
        ("--decomposition-prompt-version", {"help": "Optional decomposition prompt version metadata."}),
        ("--prompt-fingerprint", {"help": "Optional prompt fingerprint metadata."}),
        ("--dry-run", {"action": "store_true", "help": "Print resulting record metadata without writing files."}),
    )
    for flag, options in option_table:
        cli.add_argument(flag, **options)

    return cli.parse_args()
|
|
|
|
|
|
def _resolve_spec_path(raw_path: str) -> Path:
    """Return the spec path, resolving relative paths against ``REPO_ROOT``.

    Raises:
        RuntimeError: if the resulting path does not exist.
    """
    spec_path = Path(raw_path)
    if not spec_path.is_absolute():
        spec_path = (REPO_ROOT / spec_path).resolve()
    if not spec_path.exists():
        raise RuntimeError(f"Spec file not found: {spec_path}")
    return spec_path


def _print_summary(summary: dict[str, Any]) -> None:
    """Print ``summary`` to stdout as indented JSON, keeping non-ASCII text."""
    print(json.dumps(summary, ensure_ascii=False, indent=2))


def main() -> int:
    """Save an agent semantic run described by a spec file.

    Steps: parse the CLI, load the spec, extract its questions, build the
    case-set, snapshot and history payloads, then either print a dry-run
    summary or write the two payload files and the updated history and print
    a summary.

    The existing history is loaded *before* anything is written, so an
    unreadable ``history.json`` aborts the run without leaving case-set or
    saved-session files that no history record points to.

    Returns:
        ``0`` on success (both for dry runs and real runs).

    Raises:
        RuntimeError: if the spec file is missing, the spec is not a JSON
            object, it yields no questions, or the title is empty.
    """
    args = parse_args()
    spec_path = _resolve_spec_path(args.spec)

    spec_raw = load_json(spec_path)
    if not isinstance(spec_raw, dict):
        raise RuntimeError("Spec JSON must be an object")

    questions = extract_questions_from_spec(spec_raw)
    if not questions:
        raise RuntimeError("Agent semantic run must contain at least one question")

    domain = str(spec_raw.get("domain") or "").strip() or None
    # Title precedence: CLI override, then the spec's title, then the file stem.
    source_title = str(args.title or spec_raw.get("title") or spec_path.stem).strip()
    title = ensure_agent_title(source_title)
    metadata = build_metadata(args, spec_raw, spec_path)

    timestamp = now_utc()
    generation_id = generate_id(timestamp)
    stamp = utc_stamp(timestamp)  # shared by both output file names
    case_set_file = f"assistant_autogen_saved_user_sessions_{stamp}_{generation_id}.json"
    saved_session_file = f"assistant_saved_session_{stamp}_{generation_id}.json"

    case_set_payload = build_case_set_payload(
        generation_id=generation_id,
        title=title,
        questions=questions,
        domain=domain,
        scenario_tag="agent_saved_user_sessions",
    )
    snapshot_payload = build_snapshot_payload(
        generation_id=generation_id,
        title=title,
        questions=questions,
        metadata=metadata,
    )
    record = build_history_record(
        generation_id=generation_id,
        title=title,
        questions=questions,
        case_set_file=case_set_file,
        saved_session_file=saved_session_file,
        domain=domain,
        generated_by=str(args.generated_by or "codex_agent").strip() or "codex_agent",
        metadata=metadata,
    )

    # Fields common to the dry-run and the real-run summaries.
    summary_core = {
        "generation_id": generation_id,
        "title": title,
        "questions_total": len(questions),
        "case_set_file": case_set_file,
        "saved_session_file": saved_session_file,
    }

    if args.dry_run:
        _print_summary({"ok": True, "dry_run": True, **summary_core, "domain": domain})
        return 0

    # Read (and thereby validate) the history before writing any artifact.
    history = read_history()
    # New record first; drop any older entry with the same generation id.
    history = [record, *[item for item in history if item.get("generation_id") != generation_id]]

    write_json(EVAL_CASES_DIR / case_set_file, case_set_payload)
    write_json(SAVED_SESSIONS_DIR / saved_session_file, snapshot_payload)
    # Only the first 500 entries are kept, with the new record at the front.
    write_json(HISTORY_FILE, history[:500])

    _print_summary({"ok": True, **summary_core})
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s integer return value as the process exit status.
    exit_code = main()
    raise SystemExit(exit_code)
|