from __future__ import annotations import argparse import json import re import subprocess import sys import textwrap import time from datetime import datetime, timezone from pathlib import Path from typing import Any from urllib.error import HTTPError, URLError from urllib.request import Request, urlopen REPO_ROOT = Path(__file__).resolve().parent.parent DEFAULT_ARTIFACTS_ROOT = REPO_ROOT / "artifacts" / "domain_runs" DEFAULT_SESSIONS_DIR = REPO_ROOT / "llm_normalizer" / "data" / "assistant_sessions" DEFAULT_REPORTS_DIR = REPO_ROOT / "llm_normalizer" / "reports" DEFAULT_LOOP_SCHEMA_DIR = REPO_ROOT / "docs" / "orchestration" / "schemas" DEFAULT_BACKEND_URL = "http://127.0.0.1:8787" DEFAULT_PROMPT_VERSION = "address_query_runtime_v1" DEFAULT_LLM_PROVIDER = "local" DEFAULT_LLM_MODEL = "qwen2.5-14b-instruct-1m" DEFAULT_LLM_BASE_URL = "http://127.0.0.1:1234/v1" DEFAULT_LLM_API_KEY = "" DEFAULT_TEMPERATURE = 0.0 DEFAULT_MAX_OUTPUT_TOKENS = 900 TECH_SECTION_HEADER = "### technical_debug_payload_json" SCENARIO_MANIFEST_SCHEMA_VERSION = "domain_scenario_manifest_v1" SCENARIO_STATE_SCHEMA_VERSION = "domain_scenario_state_v1" SCENARIO_STEP_STATE_SCHEMA_VERSION = "domain_scenario_step_state_v1" SCENARIO_PACK_SCHEMA_VERSION = "domain_scenario_pack_v1" AUTONOMOUS_LOOP_SCHEMA_VERSION = "domain_autonomous_loop_v1" def dump_json(payload: Any) -> str: return json.dumps(payload, ensure_ascii=False, indent=2) def write_text(file_path: Path, text: str) -> None: file_path.parent.mkdir(parents=True, exist_ok=True) file_path.write_text(text, encoding="utf-8", newline="\n") def write_json(file_path: Path, payload: Any) -> None: write_text(file_path, dump_json(payload) + "\n") def read_text_file(file_path: Path) -> str: return file_path.read_text(encoding="utf-8-sig") def sanitize_export_text(value: str) -> str: raw = str(value or "") debug_heading = re.search( r"(?:^|\n)\s*#{0,6}\s*(?:debug_payload_json|technical_breakdown_json|route_summary_json|debug_payload|technical_breakdown)\b", raw, flags=re.IGNORECASE, ) pre_cut = raw[: debug_heading.start()] if debug_heading else raw without_debug = re.sub( r"###\s*(?:debug_payload_json|technical_breakdown_json|route_summary_json)[\s\S]*?(?:```[\s\S]*?```|$)", "", pre_cut, flags=re.IGNORECASE, ) without_debug = re.sub( r"(?:^|\n)\s*#{0,6}\s*(?:debug_payload_json|technical_breakdown_json|route_summary_json)\b[\s\S]*$", "", without_debug, flags=re.IGNORECASE, ) inline_patterns = [ re.compile(r"\b(?:debug_payload_json|technical_breakdown_json)\b", re.IGNORECASE), re.compile(r"\b(?:route_summary|semantic_profile|domain_scope|relation_patterns|account_scope)\b", re.IGNORECASE), re.compile(r"\b(?:coverage_report|retrieval_status|problem_unit_state|candidate_evidence)\b", re.IGNORECASE), re.compile(r"\b(?:graph_domain_scope|graph_runtime|selection_reason|why_included)\b", re.IGNORECASE), ] output_lines: list[str] = [] for line in without_debug.splitlines(): cleaned = line.rstrip() if not cleaned.strip(): continue if any(pattern.search(cleaned) for pattern in inline_patterns): continue output_lines.append(cleaned) return "\n".join(output_lines).strip() def build_conversation_export(session_id: str, conversation: list[dict[str, Any]], mode: str = "technical") -> str: include_debug = mode == "technical" lines = [ "# Assistant conversation export", f"session_id: {session_id or 'n/a'}", f"export_mode: {mode}", f"exported_at: {datetime.now(timezone.utc).replace(microsecond=0).isoformat()}", "", ] for index, item in enumerate(conversation, start=1): safe_text = sanitize_export_text(str(item.get("text") or "")) lines.append(f"## {index}. {item.get('role', 'unknown')}") lines.append(f"message_id: {item.get('message_id') or 'n/a'}") lines.append(f"created_at: {item.get('created_at') or 'n/a'}") lines.append(f"reply_type: {item.get('reply_type') or 'n/a'}") trace_id = item.get("trace_id") if trace_id: lines.append(f"trace_id: {trace_id}") lines.extend(["", safe_text or "(empty)", ""]) if include_debug and item.get("role") == "assistant" and item.get("debug") is not None: lines.extend([TECH_SECTION_HEADER, "```json", dump_json(item["debug"]), "```", ""]) return "\n".join(lines) def slugify_token(value: str | None, fallback: str) -> str: cleaned = re.sub(r"[^\w-]+", "_", str(value or "").strip(), flags=re.UNICODE).strip("_") return cleaned or fallback def slugify_case_id(domain: str, explicit_case_id: str | None) -> str: if explicit_case_id: normalized = explicit_case_id.strip() if normalized: return normalized timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S") return f"{slugify_token(domain, 'domain_case')}_{timestamp}" def normalize_iso_date(value: Any) -> str | None: if not isinstance(value, str): return None candidate = value.strip() if not re.fullmatch(r"\d{4}-\d{2}-\d{2}", candidate): return None try: datetime.strptime(candidate, "%Y-%m-%d") except ValueError: return None return candidate def normalize_analysis_context(raw_context: Any) -> dict[str, Any]: source = raw_context if isinstance(raw_context, dict) else {} output: dict[str, Any] = {} as_of_date = normalize_iso_date(source.get("as_of_date")) period_from = normalize_iso_date(source.get("period_from")) period_to = normalize_iso_date(source.get("period_to")) snapshot_mode_raw = str(source.get("snapshot_mode") or "").strip() source_raw = str(source.get("source") or "").strip() if as_of_date: output["as_of_date"] = as_of_date if period_from: output["period_from"] = period_from if period_to: output["period_to"] = period_to if snapshot_mode_raw in {"auto", "force_snapshot", "force_live"}: output["snapshot_mode"] = snapshot_mode_raw if source_raw: output["source"] = source_raw return output def merge_analysis_context(base_context: Any, override_context: Any) -> dict[str, Any]: merged = normalize_analysis_context(base_context) merged.update(normalize_analysis_context(override_context)) return merged def carry_forward_analysis_context( scenario_state: dict[str, Any], analysis_context: dict[str, Any], ) -> dict[str, Any]: carried = dict(analysis_context) semantic_memory = scenario_state.get("semantic_memory") if isinstance(semantic_memory, dict): date_scope = semantic_memory.get("date_scope") if isinstance(date_scope, dict): carried_as_of_date = normalize_iso_date(date_scope.get("as_of_date")) if carried_as_of_date and not carried.get("as_of_date"): carried["as_of_date"] = carried_as_of_date if not carried.get("source"): carried["source"] = "scenario_state_carryover" return carried def merge_scenario_date_scope( previous_date_scope: Any, current_date_scope: Any, *, depends_on: list[str], ) -> Any: previous = previous_date_scope if isinstance(previous_date_scope, dict) else None current = current_date_scope if isinstance(current_date_scope, dict) else None if not current: return previous or current_date_scope if not depends_on or not previous: return current previous_as_of_date = normalize_iso_date(previous.get("as_of_date")) current_as_of_date = normalize_iso_date(current.get("as_of_date")) if previous_as_of_date and current_as_of_date and current_as_of_date != previous_as_of_date: merged = dict(current) merged["as_of_date"] = previous_as_of_date if not merged.get("source"): merged["source"] = "scenario_state_carryover" return merged return current def repair_text_mojibake(value: str) -> str: if not value: return value for encoding in ("utf-8", "cp1251", "cp866"): try: repaired = value.encode("latin1").decode(encoding) except (UnicodeEncodeError, UnicodeDecodeError): continue if repaired != value: return repaired return value def normalize_binding_value(value: Any) -> Any: if isinstance(value, str): return repair_text_mojibake(value) if isinstance(value, list): return [normalize_binding_value(item) for item in value] if isinstance(value, dict): return {normalize_binding_value(key) if isinstance(key, str) else key: normalize_binding_value(item) for key, item in value.items()} return value def normalize_bindings(raw_bindings: Any) -> dict[str, Any]: if not isinstance(raw_bindings, dict): return {} return {str(key): normalize_binding_value(value) for key, value in raw_bindings.items()} def drop_none_values(payload: dict[str, Any]) -> dict[str, Any]: return {key: value for key, value in payload.items() if value is not None} def http_json(url: str, *, method: str = "GET", payload: dict[str, Any] | None = None, timeout: int = 30) -> dict[str, Any]: data = None headers = {"Accept": "application/json"} if payload is not None: data = json.dumps(payload).encode("utf-8") headers["Content-Type"] = "application/json; charset=utf-8" request = Request(url, data=data, method=method, headers=headers) try: with urlopen(request, timeout=timeout) as response: body = response.read().decode("utf-8") except HTTPError as error: detail = error.read().decode("utf-8", errors="replace") raise RuntimeError(f"HTTP {error.code} for {url}: {detail}") from error except URLError as error: raise RuntimeError(f"Failed to reach backend at {url}: {error}") from error try: return json.loads(body) except json.JSONDecodeError as error: raise RuntimeError(f"Backend returned non-JSON payload for {url}") from error def ensure_backend_health(backend_url: str, timeout_seconds: int) -> dict[str, Any]: health = http_json(f"{backend_url}/api/health", timeout=max(5, min(timeout_seconds, 30))) if not isinstance(health, dict): raise RuntimeError(f"Backend health endpoint returned invalid payload at {backend_url}/api/health") if health.get("ok") is False: raise RuntimeError(f"Backend health check failed for {backend_url}: {dump_json(health)}") return health def wait_for_job(backend_url: str, job_id: str, timeout_seconds: int, poll_interval_seconds: float) -> dict[str, Any]: deadline = time.time() + timeout_seconds last_status = None while time.time() < deadline: response = http_json(f"{backend_url}/api/eval/run-async/{job_id}") job = response.get("job") if not isinstance(job, dict): raise RuntimeError("Async job response does not contain `job` object") status = str(job.get("status") or "unknown") if status != last_status: print(f"[domain-case-loop] job {job_id}: {status}") last_status = status if status in {"completed", "failed"}: return job time.sleep(poll_interval_seconds) raise TimeoutError(f"Async job {job_id} did not finish within {timeout_seconds} seconds") def wait_for_file(file_path: Path, timeout_seconds: int = 30) -> None: deadline = time.time() + timeout_seconds while time.time() < deadline: if file_path.exists(): return time.sleep(0.5) raise FileNotFoundError(f"Timed out waiting for file: {file_path}") def read_json_file(file_path: Path) -> dict[str, Any]: return json.loads(read_text_file(file_path)) def extract_conversation_from_session(session_record: dict[str, Any]) -> list[dict[str, Any]]: items = session_record.get("items") if isinstance(items, list) and items: output: list[dict[str, Any]] = [] for item in items: if not isinstance(item, dict): continue output.append( { "message_id": item.get("message_id"), "role": item.get("role"), "text": item.get("text") or "", "reply_type": item.get("reply_type"), "created_at": item.get("created_at"), "trace_id": item.get("trace_id"), "debug": item.get("debug"), } ) if output: return output conversation = session_record.get("conversation") if isinstance(conversation, list) and conversation: output = [] for item in conversation: if not isinstance(item, dict): continue output.append( { "message_id": item.get("message_id"), "role": item.get("role"), "text": item.get("text") or "", "reply_type": item.get("reply_type"), "created_at": item.get("created_at"), "trace_id": item.get("trace_id"), "debug": item.get("debug"), } ) if output: return output turns = session_record.get("turns") if not isinstance(turns, list): return [] output = [] for turn in turns: if not isinstance(turn, dict): continue technical_json = turn.get("technical_json") if not isinstance(technical_json, dict): continue for item in (technical_json.get("user_message"), technical_json.get("assistant_message")): if not isinstance(item, dict): continue output.append( { "message_id": item.get("message_id"), "role": item.get("role"), "text": item.get("text") or "", "reply_type": item.get("reply_type"), "created_at": item.get("created_at"), "trace_id": item.get("trace_id"), "debug": item.get("debug"), } ) return output def find_last_assistant(conversation: list[dict[str, Any]]) -> dict[str, Any]: for item in reversed(conversation): if item.get("role") == "assistant": return item raise RuntimeError("Conversation does not contain assistant message") def find_last_user_before(conversation: list[dict[str, Any]], assistant_message_id: Any) -> dict[str, Any] | None: before_assistant: list[dict[str, Any]] = [] for item in conversation: if item.get("message_id") == assistant_message_id: break before_assistant.append(item) for item in reversed(before_assistant): if item.get("role") == "user": return item return None def extract_last_turn(session_record: dict[str, Any]) -> dict[str, Any] | None: turns = session_record.get("turns") if not isinstance(turns, list) or not turns: return None last_turn = turns[-1] return last_turn if isinstance(last_turn, dict) else None def extract_report_case(report_record: dict[str, Any], case_id: str) -> dict[str, Any] | None: results = report_record.get("results") if not isinstance(results, list): return None for item in results: if isinstance(item, dict) and str(item.get("case_id") or "") == case_id: return item return None def build_turn_artifact( *, slot: str, domain: str, case_id: str, question: str | None, session_id: str, conversation: list[dict[str, Any]], session_record: dict[str, Any] | None, job_record: dict[str, Any] | None, report_case: dict[str, Any] | None, export_file_name: str, ) -> dict[str, Any]: last_assistant = find_last_assistant(conversation) last_user = find_last_user_before(conversation, last_assistant.get("message_id")) final_question = question or (last_user.get("text") if isinstance(last_user, dict) else None) last_turn = extract_last_turn(session_record or {}) session_record_safe = session_record or {} return { "schema_version": "domain_case_turn_artifact_v1", "artifact_slot": slot, "domain": domain, "case_id": case_id, "question": final_question, "session_id": session_id, "run": { "job_id": job_record.get("job_id") if isinstance(job_record, dict) else None, "run_id": job_record.get("run_id") if isinstance(job_record, dict) else None, "analysis_date": job_record.get("analysis_date") if isinstance(job_record, dict) else None, "report_case_available": report_case is not None, }, "user_message": last_user, "assistant_message": last_assistant, "technical_debug_payload": last_assistant.get("debug"), "session_summary": { "schema_version": session_record_safe.get("schema_version"), "updated_at": session_record_safe.get("updated_at"), "trace_ids": session_record_safe.get("trace_ids"), "reply_types": session_record_safe.get("reply_types"), "investigation_state": session_record_safe.get("investigation_state"), "address_navigation_state": session_record_safe.get("address_navigation_state"), "items_count": len(session_record_safe.get("items", [])) if isinstance(session_record_safe.get("items"), list) else None, "last_turn": last_turn, }, "report_case": report_case, "export_markdown_file": export_file_name, } def ensure_case_brief( case_dir: Path, *, domain: str, question: str | None, expected_capability: str | None, expected_result_mode: str | None, ) -> None: file_path = case_dir / "case_brief.md" if file_path.exists(): return content = textwrap.dedent( f"""\ # Case brief ## Domain `{domain}` ## Raw user question `{question or ""}` ## Expected business meaning - ## Expected capability - {expected_capability or ""} ## Expected result mode - {expected_result_mode or ""} ## Constraints - no architecture changes - 1C/MCP first - no fabricated values - heuristic is not product success - accepted requires analyst quality score >= 80 and zero unresolved P0 ## Known current behavior - ## Draft acceptance criteria - """ ) write_text(file_path, content) def save_capture_bundle( *, case_dir: Path, slot: str, export_markdown: str, debug_payload: Any, turn_artifact: dict[str, Any], session_record: dict[str, Any] | None, job_record: dict[str, Any] | None, report_case: dict[str, Any] | None, ) -> None: write_text(case_dir / f"{slot}_output.md", export_markdown) write_json(case_dir / f"{slot}_debug.json", debug_payload if debug_payload is not None else {}) write_json(case_dir / f"{slot}_turn.json", turn_artifact) if session_record is not None: write_json(case_dir / f"{slot}_session.json", session_record) if job_record is not None: write_json(case_dir / f"{slot}_job.json", job_record) if report_case is not None: write_json(case_dir / f"{slot}_report_case.json", report_case) def parse_metadata_line(line: str) -> tuple[str, str] | None: if ":" not in line: return None key, value = line.split(":", 1) return key.strip(), value.strip() def parse_export_markdown(text: str) -> tuple[str, list[dict[str, Any]]]: session_id = "n/a" session_match = re.search(r"^session_id:\s*(.+?)\s*$", text, flags=re.MULTILINE) if session_match: session_id = session_match.group(1).strip() section_pattern = re.compile(r"^##\s+\d+\.\s+(user|assistant)\s*$", flags=re.MULTILINE) sections = list(section_pattern.finditer(text)) conversation: list[dict[str, Any]] = [] for index, match in enumerate(sections): role = match.group(1) start = match.end() end = sections[index + 1].start() if index + 1 < len(sections) else len(text) block = text[start:end].lstrip("\r\n") lines = block.splitlines() metadata: dict[str, Any] = {"role": role} cursor = 0 while cursor < len(lines): line = lines[cursor] if not line.strip(): cursor += 1 break meta = parse_metadata_line(line) if not meta: break key, value = meta metadata[key] = value cursor += 1 body_lines = lines[cursor:] debug_payload = None debug_start = None for body_index, line in enumerate(body_lines): if line.strip().lower() == TECH_SECTION_HEADER.lower(): debug_start = body_index break if debug_start is not None: body_text_lines = body_lines[:debug_start] debug_lines = body_lines[debug_start + 1 :] debug_text = "\n".join(debug_lines).strip() fenced = re.search(r"```json\s*(.*?)\s*```", debug_text, flags=re.DOTALL | re.IGNORECASE) if fenced: debug_text = fenced.group(1).strip() if debug_text: debug_payload = json.loads(debug_text) else: body_text_lines = body_lines conversation.append( { "message_id": metadata.get("message_id"), "role": role, "text": "\n".join(body_text_lines).strip(), "reply_type": metadata.get("reply_type"), "created_at": metadata.get("created_at"), "trace_id": metadata.get("trace_id"), "debug": debug_payload, } ) if not conversation: raise RuntimeError("Could not parse conversation sections from export markdown") return session_id, conversation def build_normalize_config(args: argparse.Namespace) -> dict[str, Any]: return { "llmProvider": args.llm_provider, "apiKey": args.llm_api_key, "model": args.llm_model, "baseUrl": args.llm_base_url, "temperature": args.temperature, "maxOutputTokens": args.max_output_tokens, "promptVersion": args.prompt_version, } def fetch_session_snapshot(backend_url: str, session_id: str, timeout_seconds: int) -> dict[str, Any]: deadline = time.time() + timeout_seconds last_error: Exception | None = None timeout_per_call = max(10, min(30, timeout_seconds)) while time.time() < deadline: try: response = http_json(f"{backend_url}/api/assistant/session/{session_id}", timeout=timeout_per_call) session = response.get("session") if isinstance(session, dict): return session except Exception as error: # noqa: BLE001 last_error = error time.sleep(0.5) if last_error is not None: raise RuntimeError(f"Failed to fetch assistant session `{session_id}`: {last_error}") from last_error raise RuntimeError(f"Assistant session `{session_id}` was not available within {timeout_seconds} seconds") def build_assistant_message_payload( args: argparse.Namespace, *, question: str, session_id: str | None, analysis_context: dict[str, Any], ) -> dict[str, Any]: context_payload: dict[str, Any] = {} normalized_analysis_context = normalize_analysis_context(analysis_context) if normalized_analysis_context: context_payload["analysis_context"] = normalized_analysis_context as_of_date = normalized_analysis_context.get("as_of_date") if isinstance(as_of_date, str): context_payload["period_hint"] = as_of_date payload = drop_none_values( { "session_id": session_id, "user_message": question, "message": question, "mode": "assistant", "llmProvider": args.llm_provider, "apiKey": args.llm_api_key, "model": args.llm_model, "baseUrl": args.llm_base_url, "temperature": args.temperature, "maxOutputTokens": args.max_output_tokens, "promptVersion": args.prompt_version, "context": context_payload or None, "useMock": bool(args.use_mock), } ) return payload def parse_path_tokens(path_expression: str) -> list[str | int]: tokens: list[str | int] = [] cursor = 0 path = path_expression.strip() while cursor < len(path): current = path[cursor] if current == ".": cursor += 1 continue if current == "[": closing = path.find("]", cursor) if closing < 0: raise RuntimeError(f"Invalid placeholder path: {path_expression}") raw_index = path[cursor + 1 : closing].strip() if not raw_index.isdigit(): raise RuntimeError(f"Only numeric list indexes are supported in placeholders: {path_expression}") tokens.append(int(raw_index)) cursor = closing + 1 continue end = cursor while end < len(path) and path[end] not in ".[": end += 1 tokens.append(path[cursor:end]) cursor = end return tokens def lookup_placeholder_value(path_expression: str, scenario_state: dict[str, Any]) -> Any: root: dict[str, Any] = { "scenario_state": scenario_state, "step_outputs": scenario_state.get("step_outputs", {}), "semantic_memory": scenario_state.get("semantic_memory", {}), "bindings": scenario_state.get("bindings", {}), } if isinstance(scenario_state.get("step_outputs"), dict): root.update(scenario_state["step_outputs"]) current: Any = root for token in parse_path_tokens(path_expression): if isinstance(token, int): if not isinstance(current, list): raise RuntimeError(f"Placeholder `{path_expression}` does not point to a list before index access") if token >= len(current): raise RuntimeError(f"Placeholder `{path_expression}` index {token} is out of range") current = current[token] continue if not isinstance(current, dict) or token not in current: raise RuntimeError(f"Placeholder `{path_expression}` could not be resolved at `{token}`") current = current[token] return current def resolve_question_template(question_template: str, scenario_state: dict[str, Any]) -> str: pattern = re.compile(r"{{\s*([^{}]+?)\s*}}") def replace(match: re.Match[str]) -> str: value = lookup_placeholder_value(match.group(1), scenario_state) if isinstance(value, (dict, list)): return dump_json(value) return str(value) return pattern.sub(replace, question_template).strip() def build_failed_step_state( *, scenario_id: str, domain: str, step: dict[str, Any], step_index: int, question_resolved: str, status: str, failure_type: str, error_message: str, ) -> dict[str, Any]: return { "schema_version": SCENARIO_STEP_STATE_SCHEMA_VERSION, "scenario_id": scenario_id, "domain": domain, "step_id": step["step_id"], "step_index": step_index, "title": step["title"], "depends_on": step["depends_on"], "question_template": step["question_template"], "question_resolved": question_resolved, "expected_capability": step.get("expected_capability"), "expected_result_mode": step.get("expected_result_mode"), "reply_type": "backend_error" if status == "blocked" else "unresolved_followup", "assistant_message_id": None, "trace_id": None, "detected_mode": None, "detected_intent": None, "selected_recipe": None, "capability_id": None, "capability_route_mode": None, "route_expectation_status": None, "result_mode": None, "response_type": None, "fallback_type": failure_type, "mcp_call_status": None, "balance_confirmed": None, "active_result_set_id": None, "last_confirmed_route": None, "date_scope": None, "organization_scope": None, "entries": [], "status": status, "failure_type": failure_type, "error_message": error_message, } def normalize_step_definition(index: int, raw_step: Any) -> dict[str, Any]: if isinstance(raw_step, str): step_id = f"step_{index:02d}" question_template = raw_step.strip() if not question_template: raise RuntimeError(f"Scenario step {index} must not be empty") return { "step_id": step_id, "title": step_id, "question_template": question_template, "depends_on": [], "analysis_context": {}, "expected_capability": None, "expected_result_mode": None, } if not isinstance(raw_step, dict): raise RuntimeError(f"Scenario step {index} must be a string or object") raw_step_id = str(raw_step.get("step_id") or "").strip() step_id = slugify_token(raw_step_id, f"step_{index:02d}") question_template = str(raw_step.get("question") or raw_step.get("question_template") or "").strip() if not question_template: raise RuntimeError(f"Scenario step `{step_id}` must define `question` or `question_template`") depends_on_raw = raw_step.get("depends_on") depends_on: list[str] = [] if isinstance(depends_on_raw, str) and depends_on_raw.strip(): depends_on = [depends_on_raw.strip()] elif isinstance(depends_on_raw, list): depends_on = [str(item).strip() for item in depends_on_raw if str(item).strip()] return { "step_id": step_id, "title": str(raw_step.get("title") or step_id).strip() or step_id, "question_template": question_template, "depends_on": depends_on, "analysis_context": normalize_analysis_context(raw_step.get("analysis_context")), "expected_capability": str(raw_step.get("expected_capability") or "").strip() or None, "expected_result_mode": str(raw_step.get("expected_result_mode") or "").strip() or None, } def normalize_scenario_manifest( raw_manifest: dict[str, Any], *, fallback_domain: str | None = None, fallback_analysis_context: Any = None, fallback_bindings: Any = None, default_scenario_id: str | None = None, ) -> dict[str, Any]: domain = str(raw_manifest.get("domain") or fallback_domain or "").strip() if not domain: raise RuntimeError("Scenario manifest must define `domain`") raw_steps = raw_manifest.get("steps") if not isinstance(raw_steps, list) or not raw_steps: raise RuntimeError("Scenario manifest must define non-empty `steps`") steps = [normalize_step_definition(index + 1, raw_step) for index, raw_step in enumerate(raw_steps)] scenario_id = str(raw_manifest.get("scenario_id") or "").strip() or default_scenario_id or slugify_case_id(domain, None) title = str(raw_manifest.get("title") or domain).strip() or domain description = str(raw_manifest.get("description") or "").strip() or None analysis_context = merge_analysis_context(fallback_analysis_context, raw_manifest.get("analysis_context")) if analysis_context and "source" not in analysis_context: analysis_context["source"] = "scenario_manifest" bindings = normalize_bindings(fallback_bindings) bindings.update(normalize_bindings(raw_manifest.get("bindings"))) return { "schema_version": str(raw_manifest.get("schema_version") or SCENARIO_MANIFEST_SCHEMA_VERSION), "scenario_id": scenario_id, "domain": domain, "title": title, "description": description, "analysis_context": analysis_context, "bindings": bindings, "steps": steps, } def load_scenario_manifest(file_path: Path) -> dict[str, Any]: raw_manifest = read_json_file(file_path) return normalize_scenario_manifest(raw_manifest) def load_scenario_pack(file_path: Path) -> dict[str, Any]: raw_pack = read_json_file(file_path) domain = str(raw_pack.get("domain") or "").strip() if not domain: raise RuntimeError("Scenario pack must define `domain`") raw_scenarios = raw_pack.get("scenarios") if not isinstance(raw_scenarios, list) or not raw_scenarios: raise RuntimeError("Scenario pack must define non-empty `scenarios`") pack_id = str(raw_pack.get("pack_id") or "").strip() or slugify_case_id(domain, None) title = str(raw_pack.get("title") or domain).strip() or domain description = str(raw_pack.get("description") or "").strip() or None analysis_context = normalize_analysis_context(raw_pack.get("analysis_context")) if analysis_context and "source" not in analysis_context: analysis_context["source"] = "scenario_pack" bindings = normalize_bindings(raw_pack.get("bindings")) scenarios = [ normalize_scenario_manifest( raw_scenario, fallback_domain=domain, fallback_analysis_context=analysis_context, fallback_bindings=bindings, default_scenario_id=f"{pack_id}_scenario_{index + 1:02d}", ) for index, raw_scenario in enumerate(raw_scenarios) ] return { "schema_version": str(raw_pack.get("schema_version") or SCENARIO_PACK_SCHEMA_VERSION), "pack_id": pack_id, "domain": domain, "title": title, "description": description, "analysis_context": analysis_context, "bindings": bindings, "scenarios": scenarios, } def ensure_scenario_brief(scenario_dir: Path, manifest: dict[str, Any]) -> None: file_path = scenario_dir / "scenario_brief.md" if file_path.exists(): return steps_lines = "\n".join( f"{index}. `{step['step_id']}` - {step['question_template']}" for index, step in enumerate(manifest["steps"], start=1) ) content = textwrap.dedent( f"""\ # Scenario brief ## Domain `{manifest["domain"]}` ## Scenario id `{manifest["scenario_id"]}` ## Title {manifest["title"]} ## Description {manifest.get("description") or ""} ## Shared analysis context ```json {dump_json(manifest.get("analysis_context") or {})} ``` ## Bindings ```json {dump_json(manifest.get("bindings") or {})} ``` ## Steps {steps_lines} ## Constraints - no architecture changes - reuse current assistant runtime and session state - 1C/MCP first - no fabricated values - missing route or capability is domain enablement work, not silent rejection """ ) write_text(file_path, content) def normalize_field_key(raw_key: str) -> str: normalized = re.sub(r"\s+", " ", raw_key.strip().lower().replace("ё", "е")).strip(" :") aliases = { "склад": "warehouse", "количество": "quantity", "стоимость": "amount", "сумма": "amount", "организация": "organization", "дата строки": "row_date", "дата": "date", "поставщик": "supplier", "договор": "contract", "документ": "document", "документы": "documents", "покупатель": "customer", } if normalized in aliases: return aliases[normalized] return slugify_token(normalized, "field").lower() def extract_structured_entries(answer_text: str) -> list[dict[str, Any]]: entries: list[dict[str, Any]] = [] for line in answer_text.splitlines(): match = re.match(r"^\s*(\d+)\.\s+(.*\S)\s*$", line) if not match: continue index = int(match.group(1)) payload = match.group(2).strip() segments = [segment.strip() for segment in payload.split(" | ") if segment.strip()] title = segments[0] if segments else payload fields: dict[str, str] = {} raw_fields: dict[str, str] = {} for segment in segments[1:]: if ":" not in segment: continue raw_key, raw_value = segment.split(":", 1) key = normalize_field_key(raw_key) value = raw_value.strip() fields[key] = value raw_fields[raw_key.strip()] = value entry: dict[str, Any] = { "index": index, "title": title, "item": title, "fields": fields, "raw_fields": raw_fields, "raw_line": line.strip(), } for key, value in fields.items(): entry[key] = value entries.append(entry) return entries def derive_step_status(reply_type: str | None, debug_payload: dict[str, Any]) -> str: if reply_type == "backend_error": return "blocked" capability_route_mode = str(debug_payload.get("capability_route_mode") or "").strip() fallback_type = str(debug_payload.get("fallback_type") or "").strip() selected_recipe = str(debug_payload.get("selected_recipe") or "").strip() detected_intent = str(debug_payload.get("detected_intent") or "").strip() detected_mode = str(debug_payload.get("detected_mode") or "").strip() if capability_route_mode == "exact" and fallback_type in {"", "none"} and reply_type in {"factual", "factual_with_explanation", "empty_but_valid"}: return "exact" if fallback_type in {"out_of_scope", "unknown"}: return "needs_exact_capability" if capability_route_mode == "exact" and detected_mode == "address_query": return "partial" if detected_mode == "address_query" and (selected_recipe or detected_intent): return "partial" if reply_type in {"partial_coverage", "clarification_required", "out_of_scope", "no_grounded_answer", "route_mismatch_blocked"}: return "needs_exact_capability" return "needs_exact_capability" def build_scenario_step_state( *, scenario_id: str, domain: str, step: dict[str, Any], step_index: int, question_resolved: str, turn_artifact: dict[str, Any], entries: list[dict[str, Any]], ) -> dict[str, Any]: debug_payload = turn_artifact.get("technical_debug_payload") debug = debug_payload if isinstance(debug_payload, dict) else {} session_summary = turn_artifact.get("session_summary") summary = session_summary if isinstance(session_summary, dict) else {} address_state = summary.get("address_navigation_state") navigation_state = address_state if isinstance(address_state, dict) else {} session_context = navigation_state.get("session_context") context = session_context if isinstance(session_context, dict) else {} assistant_message = turn_artifact.get("assistant_message") assistant_item = assistant_message if isinstance(assistant_message, dict) else {} reply_type = assistant_item.get("reply_type") step_state = { "schema_version": SCENARIO_STEP_STATE_SCHEMA_VERSION, "scenario_id": scenario_id, "domain": domain, "step_id": step["step_id"], "step_index": step_index, "title": step["title"], "depends_on": step["depends_on"], "question_template": step["question_template"], "question_resolved": question_resolved, "expected_capability": step.get("expected_capability"), "expected_result_mode": step.get("expected_result_mode"), "reply_type": reply_type, "assistant_message_id": assistant_item.get("message_id"), "trace_id": assistant_item.get("trace_id"), "detected_mode": debug.get("detected_mode"), "detected_intent": debug.get("detected_intent"), "selected_recipe": debug.get("selected_recipe"), "capability_id": debug.get("capability_id"), "capability_route_mode": debug.get("capability_route_mode"), "route_expectation_status": debug.get("route_expectation_status"), "result_mode": debug.get("result_mode"), "response_type": debug.get("response_type"), "fallback_type": debug.get("fallback_type"), "mcp_call_status": debug.get("mcp_call_status"), "balance_confirmed": debug.get("balance_confirmed"), "active_result_set_id": context.get("active_result_set_id"), "last_confirmed_route": context.get("last_confirmed_route"), "date_scope": context.get("date_scope"), "organization_scope": context.get("organization_scope"), "entries": entries, } step_state["status"] = derive_step_status(reply_type if isinstance(reply_type, str) else None, debug) return step_state def save_scenario_step_bundle( *, step_dir: Path, export_markdown: str, turn_artifact: dict[str, Any], session_record: dict[str, Any], response_payload: dict[str, Any], step_state: dict[str, Any], ) -> None: debug_payload = turn_artifact.get("technical_debug_payload") write_text(step_dir / "output.md", export_markdown) write_json(step_dir / "debug.json", debug_payload if debug_payload is not None else {}) write_json(step_dir / "turn.json", turn_artifact) write_json(step_dir / "session.json", session_record) write_json(step_dir / "assistant_response.json", response_payload) write_json(step_dir / "step_state.json", step_state) write_text(step_dir / "resolved_question.txt", f"{step_state['question_resolved']}\n") def derive_scenario_status(step_outputs: dict[str, dict[str, Any]]) -> str: statuses = [str(item.get("status") or "") for item in step_outputs.values()] if not statuses: return "blocked" if any(status == "blocked" for status in statuses): return "blocked" if any(status == "needs_exact_capability" for status in statuses): return "needs_exact_capability" if any(status == "partial" for status in statuses): return "partial" return "accepted" def build_scenario_summary(manifest: dict[str, Any], scenario_state: dict[str, Any], final_status: str) -> str: lines = [ "# Scenario summary", "", f"- scenario_id: `{manifest['scenario_id']}`", f"- domain: `{manifest['domain']}`", f"- title: {manifest['title']}", f"- session_id: `{scenario_state.get('session_id') or 'n/a'}`", f"- final_status: `{final_status}`", "", "## Steps", ] for index, step in enumerate(manifest["steps"], start=1): step_output = scenario_state.get("step_outputs", {}).get(step["step_id"], {}) lines.extend( [ f"{index}. `{step['step_id']}` - {step['question_template']}", f"status: `{step_output.get('status') or 'n/a'}`", f"question_resolved: {step_output.get('question_resolved') or 'n/a'}", f"intent: `{step_output.get('detected_intent') or 'n/a'}`", f"recipe: `{step_output.get('selected_recipe') or 'n/a'}`", f"capability: `{step_output.get('capability_id') or 'n/a'}`", f"result_mode: `{step_output.get('result_mode') or 'n/a'}`", f"result_set: `{step_output.get('active_result_set_id') or 'n/a'}`", "", ] ) return "\n".join(lines).strip() + "\n" def build_scenario_final_status(manifest: dict[str, Any], scenario_state: dict[str, Any], final_status: str) -> str: reason = { "accepted": "all scenario steps executed in one assistant session with no unresolved route or capability gaps", "partial": "scenario captured successfully, but at least one step still needs exact capability enablement or route hardening", "needs_exact_capability": "scenario is valid for the project, but at least one step still requires exact capability or route enablement", "blocked": "scenario run was interrupted by runtime or backend failure", }.get(final_status, "scenario status unknown") return textwrap.dedent( f"""\ # Final status - status: `{final_status}` - scenario_id: `{manifest['scenario_id']}` - session_id: `{scenario_state.get('session_id') or 'n/a'}` - reason: {reason} """ ) def run_assistant_step( *, args: argparse.Namespace, domain: str, scenario_id: str, step: dict[str, Any], step_index: int, session_id: str | None, question_resolved: str, analysis_context: dict[str, Any], ) -> dict[str, Any]: payload = build_assistant_message_payload( args, question=question_resolved, session_id=session_id, analysis_context=analysis_context, ) response_payload = http_json( f"{args.backend_url}/api/assistant/message", method="POST", payload=payload, timeout=max(30, int(args.timeout_seconds)), ) resolved_session_id = str(response_payload.get("session_id") or session_id or "").strip() if not resolved_session_id: raise RuntimeError(f"Assistant response for step `{step['step_id']}` does not contain session_id") session_record = fetch_session_snapshot(args.backend_url, resolved_session_id, args.timeout_seconds) conversation = extract_conversation_from_session(session_record) export_markdown = build_conversation_export(resolved_session_id, conversation, mode="technical") turn_artifact = build_turn_artifact( slot="step", domain=domain, case_id=scenario_id, question=question_resolved, session_id=resolved_session_id, conversation=conversation, session_record=session_record, job_record=None, report_case=None, export_file_name="output.md", ) turn_artifact["schema_version"] = "domain_scenario_turn_artifact_v1" turn_artifact["scenario"] = { "scenario_id": scenario_id, "step_id": step["step_id"], "step_index": step_index, "question_template": step["question_template"], "question_resolved": question_resolved, "depends_on": step["depends_on"], "analysis_context": analysis_context, } last_assistant = find_last_assistant(conversation) entries = extract_structured_entries(str(last_assistant.get("text") or "")) step_state = build_scenario_step_state( scenario_id=scenario_id, domain=domain, step=step, step_index=step_index, question_resolved=question_resolved, turn_artifact=turn_artifact, entries=entries, ) return { "session_id": resolved_session_id, "response_payload": response_payload, "session_record": session_record, "conversation": conversation, "export_markdown": export_markdown, "turn_artifact": turn_artifact, "step_state": step_state, } def execute_scenario_manifest( *, args: argparse.Namespace, manifest: dict[str, Any], scenario_dir: Path, manifest_source_label: str | None, ) -> tuple[dict[str, Any], str]: steps_dir = scenario_dir / "steps" steps_dir.mkdir(parents=True, exist_ok=True) write_json(scenario_dir / "scenario_manifest.json", manifest) if manifest_source_label: write_text(scenario_dir / "manifest_source.txt", f"{manifest_source_label}\n") ensure_scenario_brief(scenario_dir, manifest) scenario_state: dict[str, Any] = { "schema_version": SCENARIO_STATE_SCHEMA_VERSION, "scenario_id": manifest["scenario_id"], "domain": manifest["domain"], "title": manifest["title"], "session_id": None, "analysis_context": manifest.get("analysis_context") or {}, "bindings": manifest.get("bindings") or {}, "step_outputs": {}, "semantic_memory": {}, "updated_at": datetime.now(timezone.utc).replace(microsecond=0).isoformat(), } write_json(scenario_dir / "scenario_state.json", scenario_state) last_export_markdown = "" for step_index, step in enumerate(manifest["steps"], start=1): step_dir = steps_dir / step["step_id"] step_analysis_context = merge_analysis_context(manifest.get("analysis_context"), step.get("analysis_context")) step_analysis_context = carry_forward_analysis_context(scenario_state, step_analysis_context) try: resolved_question = resolve_question_template(step["question_template"], scenario_state) result = run_assistant_step( args=args, domain=manifest["domain"], scenario_id=manifest["scenario_id"], step=step, step_index=step_index, session_id=scenario_state.get("session_id"), question_resolved=resolved_question, analysis_context=step_analysis_context, ) except RuntimeError as exc: failure_type = "runtime_error" failure_status = "blocked" if "Placeholder `" in str(exc): failure_type = "placeholder_resolution_error" failure_status = "needs_exact_capability" step_state = build_failed_step_state( scenario_id=manifest["scenario_id"], domain=manifest["domain"], step=step, step_index=step_index, question_resolved=step["question_template"], status=failure_status, failure_type=failure_type, error_message=str(exc), ) scenario_state["step_outputs"][step["step_id"]] = step_state scenario_state["semantic_memory"] = { **(scenario_state.get("semantic_memory") or {}), "latest_step_id": step["step_id"], "latest_step_status": step_state["status"], } scenario_state["updated_at"] = datetime.now(timezone.utc).replace(microsecond=0).isoformat() save_scenario_step_bundle( step_dir=step_dir, export_markdown="", turn_artifact={}, session_record={}, response_payload={}, step_state=step_state, ) write_json(scenario_dir / "scenario_state.json", scenario_state) print( f"[domain-case-loop] scenario {manifest['scenario_id']} step {step_index}/{len(manifest['steps'])}: " f"{step['step_id']} -> {step_state['status']} ({failure_type})" ) if failure_status == "blocked": break continue scenario_state["session_id"] = result["session_id"] scenario_state["step_outputs"][step["step_id"]] = result["step_state"] previous_semantic_memory = scenario_state.get("semantic_memory") or {} previous_date_scope = previous_semantic_memory.get("date_scope") if isinstance(previous_semantic_memory, dict) else None current_date_scope = result["step_state"].get("date_scope") scenario_state["semantic_memory"] = { "latest_step_id": step["step_id"], "latest_step_status": result["step_state"].get("status"), "active_result_set_id": result["step_state"].get("active_result_set_id"), "last_confirmed_route": result["step_state"].get("last_confirmed_route"), "date_scope": merge_scenario_date_scope( previous_date_scope, current_date_scope, depends_on=step.get("depends_on") or [], ), "organization_scope": result["step_state"].get("organization_scope"), "entries": result["step_state"].get("entries"), } scenario_state["updated_at"] = datetime.now(timezone.utc).replace(microsecond=0).isoformat() save_scenario_step_bundle( step_dir=step_dir, export_markdown=result["export_markdown"], turn_artifact=result["turn_artifact"], session_record=result["session_record"], response_payload=result["response_payload"], step_state=result["step_state"], ) write_json(scenario_dir / "scenario_state.json", scenario_state) last_export_markdown = result["export_markdown"] print( f"[domain-case-loop] scenario {manifest['scenario_id']} step {step_index}/{len(manifest['steps'])}: " f"{step['step_id']} -> {result['step_state']['status']}" ) final_status = derive_scenario_status(scenario_state["step_outputs"]) write_text(scenario_dir / "scenario_output.md", last_export_markdown or "") write_text(scenario_dir / "scenario_summary.md", build_scenario_summary(manifest, scenario_state, final_status)) write_text(scenario_dir / "final_status.md", build_scenario_final_status(manifest, scenario_state, final_status)) if scenario_state.get("session_id"): write_text(scenario_dir / "session_id.txt", f"{scenario_state['session_id']}\n") print(f"[domain-case-loop] saved scenario artifacts to {scenario_dir}") print(f"[domain-case-loop] final_status={final_status}") return scenario_state, final_status def handle_run_case(args: argparse.Namespace) -> int: ensure_backend_health(args.backend_url, args.timeout_seconds) case_id = slugify_case_id(args.domain, args.case_id) case_dir = Path(args.output_root).resolve() / case_id case_dir.mkdir(parents=True, exist_ok=True) ensure_case_brief( case_dir, domain=args.domain, question=args.question, expected_capability=args.expected_capability, expected_result_mode=args.expected_result_mode, ) payload: dict[str, Any] = { "normalizeConfig": build_normalize_config(args), "eval_target": "assistant_stage1", "questions": [args.question], "useMock": bool(args.use_mock), "mode": "standard", } if args.analysis_date: payload["analysis_date"] = args.analysis_date start_response = http_json(f"{args.backend_url}/api/eval/run-async/start", method="POST", payload=payload) job = start_response.get("job") if not isinstance(job, dict): raise RuntimeError("Async start response does not contain `job` object") job_id = str(job.get("job_id") or "") if not job_id: raise RuntimeError("Async start response does not contain job_id") final_job = wait_for_job(args.backend_url, job_id, args.timeout_seconds, args.poll_interval_seconds) if str(final_job.get("status") or "") != "completed": raise RuntimeError(f"Async job did not complete successfully: {final_job.get('status')}") run_id = str(final_job.get("run_id") or "") report_case_id = "AUTO-001" session_id = f"{run_id}-{report_case_id}" session_file = Path(args.sessions_dir).resolve() / f"{session_id}.json" wait_for_file(session_file) session_record = read_json_file(session_file) conversation = extract_conversation_from_session(session_record) export_markdown = build_conversation_export(session_id, conversation, mode="technical") report_case = None report_file = Path(args.reports_dir).resolve() / f"{run_id}.json" if report_file.exists(): report_record = read_json_file(report_file) report_case = extract_report_case(report_record, report_case_id) turn_artifact = build_turn_artifact( slot=args.slot, domain=args.domain, case_id=case_id, question=args.question, session_id=session_id, conversation=conversation, session_record=session_record, job_record=final_job, report_case=report_case, export_file_name=f"{args.slot}_output.md", ) save_capture_bundle( case_dir=case_dir, slot=args.slot, export_markdown=export_markdown, debug_payload=turn_artifact.get("technical_debug_payload"), turn_artifact=turn_artifact, session_record=session_record, job_record=final_job, report_case=report_case, ) print(f"[domain-case-loop] saved {args.slot} artifacts to {case_dir}") print(f"[domain-case-loop] session_id={session_id}") return 0 def handle_import_export(args: argparse.Namespace) -> int: export_text = Path(args.input).read_text(encoding="utf-8-sig") session_id, conversation = parse_export_markdown(export_text) case_id = slugify_case_id(args.domain, args.case_id) case_dir = Path(args.output_root).resolve() / case_id case_dir.mkdir(parents=True, exist_ok=True) last_assistant = find_last_assistant(conversation) last_user = find_last_user_before(conversation, last_assistant.get("message_id")) question = args.question or (last_user.get("text") if isinstance(last_user, dict) else None) ensure_case_brief( case_dir, domain=args.domain, question=question, expected_capability=args.expected_capability, expected_result_mode=args.expected_result_mode, ) turn_artifact = build_turn_artifact( slot=args.slot, domain=args.domain, case_id=case_id, question=question, session_id=session_id, conversation=conversation, session_record=None, job_record=None, report_case=None, export_file_name=f"{args.slot}_output.md", ) save_capture_bundle( case_dir=case_dir, slot=args.slot, export_markdown=export_text, debug_payload=last_assistant.get("debug"), turn_artifact=turn_artifact, session_record=None, job_record=None, report_case=None, ) print(f"[domain-case-loop] imported {args.slot} artifacts to {case_dir}") return 0 def handle_run_scenario(args: argparse.Namespace) -> int: ensure_backend_health(args.backend_url, args.timeout_seconds) manifest_path = Path(args.manifest).resolve() manifest = load_scenario_manifest(manifest_path) if args.scenario_id: manifest["scenario_id"] = args.scenario_id.strip() if args.analysis_date: manifest["analysis_context"] = merge_analysis_context( manifest.get("analysis_context"), {"as_of_date": args.analysis_date, "source": "cli_override"}, ) scenario_dir = Path(args.output_root).resolve() / manifest["scenario_id"] execute_scenario_manifest( args=args, manifest=manifest, scenario_dir=scenario_dir, manifest_source_label=str(manifest_path), ) return 0 def build_pack_summary(pack: dict[str, Any], scenario_results: list[dict[str, Any]], final_status: str) -> str: lines = [ "# Pack summary", "", f"- pack_id: `{pack['pack_id']}`", f"- domain: `{pack['domain']}`", f"- title: {pack['title']}", f"- final_status: `{final_status}`", "", "## Scenarios", ] for index, item in enumerate(scenario_results, start=1): lines.extend( [ f"{index}. `{item['scenario_id']}` - {item['title']}", f"status: `{item['final_status']}`", f"session_id: `{item.get('session_id') or 'n/a'}`", f"artifact_dir: `{item['artifact_dir']}`", "", ] ) return "\n".join(lines).strip() + "\n" def build_pack_final_status(pack: dict[str, Any], scenario_results: list[dict[str, Any]], final_status: str) -> str: expected_scenarios = len(pack.get("scenarios") or []) executed_scenarios = len(scenario_results) has_missing_scenarios = executed_scenarios < expected_scenarios reason = { "accepted": "all declared scenarios in the pack executed without blocked or missing-capability states", "partial": "the pack executed, but at least one scenario still needs capability enablement or route hardening", "needs_exact_capability": "the pack is valid, but at least one scenario still requires exact capability or route enablement", "blocked": "the pack could not be completed because at least one scenario failed at runtime", }.get(final_status, "pack status unknown") if final_status == "accepted" and has_missing_scenarios: reason = ( f"only {executed_scenarios}/{expected_scenarios} declared scenarios were executed; " "missing scenarios keep the pack from being confirmed as accepted" ) return textwrap.dedent( f"""\ # Final status - status: `{final_status}` - pack_id: `{pack['pack_id']}` - domain: `{pack['domain']}` - reason: {reason} """ ) def run_subprocess_command( command: list[str], *, cwd: Path, timeout_seconds: int, input_text: str | None = None, stdout_path: Path | None = None, stderr_path: Path | None = None, ) -> subprocess.CompletedProcess[str]: result = subprocess.run( command, cwd=str(cwd), input=input_text, text=True, encoding="utf-8", errors="replace", capture_output=True, timeout=timeout_seconds, check=False, ) if stdout_path is not None: write_text(stdout_path, result.stdout) if stderr_path is not None: write_text(stderr_path, result.stderr) if result.returncode != 0: raise RuntimeError( f"Command failed with exit code {result.returncode}: {' '.join(command)}\n{result.stderr.strip()}" ) return result def build_run_pack_command( args: argparse.Namespace, *, manifest_path: Path, pack_id: str, output_root: Path, ) -> list[str]: command = [ sys.executable, str(Path(__file__).resolve()), "run-pack", "--manifest", str(manifest_path), "--pack-id", pack_id, "--output-root", str(output_root), "--backend-url", str(args.backend_url), "--prompt-version", str(args.prompt_version), "--llm-provider", str(args.llm_provider), "--llm-model", str(args.llm_model), "--llm-base-url", str(args.llm_base_url), "--llm-api-key", str(args.llm_api_key), "--temperature", str(args.temperature), "--max-output-tokens", str(args.max_output_tokens), "--timeout-seconds", str(args.timeout_seconds), ] if getattr(args, "analysis_date", None): command.extend(["--analysis-date", str(args.analysis_date)]) max_scenarios = getattr(args, "max_scenarios", None) if max_scenarios is not None: command.extend(["--max-scenarios", str(int(max_scenarios))]) if bool(getattr(args, "use_mock", False)): command.append("--use-mock") return command def build_codex_exec_command( args: argparse.Namespace, *, output_file: Path, schema_file: Path, sandbox_mode: str, model_override: str | None = None, reasoning_effort: str | None = None, ) -> list[str]: command = [ str(args.codex_binary), "exec", "-C", str(REPO_ROOT), "-s", sandbox_mode, "-c", 'approval_policy="never"', "--output-schema", str(schema_file), "-o", str(output_file), "--color", "never", ] if reasoning_effort: command.extend(["-c", f'model_reasoning_effort="{reasoning_effort}"']) if getattr(args, "codex_profile", None): command.extend(["-p", str(args.codex_profile)]) selected_model = model_override or getattr(args, "codex_model", None) if selected_model: command.extend(["-m", str(selected_model)]) return command def read_json_output(file_path: Path) -> dict[str, Any]: payload = json.loads(read_text_file(file_path)) if not isinstance(payload, dict): raise RuntimeError(f"Expected JSON object in {file_path}") return payload def compact_step_output_for_review(step_output: Any) -> dict[str, Any]: if not isinstance(step_output, dict): return {} entry_titles_sample: list[str] = [] entries = step_output.get("entries") if isinstance(entries, list): for item in entries[:5]: if not isinstance(item, dict): continue title = str(item.get("item") or item.get("title") or "").strip() if title: entry_titles_sample.append(title) return { "status": step_output.get("status"), "question_resolved": step_output.get("question_resolved"), "detected_intent": step_output.get("detected_intent"), "selected_recipe": step_output.get("selected_recipe"), "capability_id": step_output.get("capability_id"), "result_mode": step_output.get("result_mode"), "fallback_type": step_output.get("fallback_type"), "mcp_call_status": step_output.get("mcp_call_status"), "failure_type": step_output.get("failure_type"), "error_message": step_output.get("error_message"), "entry_titles_sample": entry_titles_sample, } def build_pack_review_bundle(pack_dir: Path) -> str: pack_state = read_json_file(pack_dir / "pack_state.json") if (pack_dir / "pack_state.json").exists() else {} scenarios_root = pack_dir / "scenarios" scenarios_bundle: list[dict[str, Any]] = [] if scenarios_root.exists(): for scenario_dir in sorted(path for path in scenarios_root.iterdir() if path.is_dir()): scenario_state = read_json_file(scenario_dir / "scenario_state.json") if (scenario_dir / "scenario_state.json").exists() else {} step_outputs_raw = scenario_state.get("step_outputs") compact_steps: dict[str, Any] = {} if isinstance(step_outputs_raw, dict): for step_id, step_output in step_outputs_raw.items(): compact_steps[str(step_id)] = compact_step_output_for_review(step_output) scenarios_bundle.append( { "scenario_id": scenario_state.get("scenario_id") or scenario_dir.name, "title": scenario_state.get("title"), "session_id": scenario_state.get("session_id"), "summary": read_text_file(scenario_dir / "scenario_summary.md") if (scenario_dir / "scenario_summary.md").exists() else "", "step_outputs": compact_steps, } ) bundle = { "pack_state": { "pack_id": pack_state.get("pack_id"), "domain": pack_state.get("domain"), "title": pack_state.get("title"), "final_status": pack_state.get("final_status"), "scenario_results": pack_state.get("scenario_results"), }, "pack_summary": read_text_file(pack_dir / "pack_summary.md") if (pack_dir / "pack_summary.md").exists() else "", "scenarios": scenarios_bundle, } return dump_json(bundle) def build_analyst_loop_prompt( *, loop_dir: Path, iteration_dir: Path, pack_dir: Path, previous_pack_dir: Path | None, previous_verdict_path: Path | None, target_score: int, review_bundle_json: str, previous_verdict_json: str | None, ) -> str: comparison_block = "" if previous_pack_dir is not None: comparison_block = textwrap.dedent( f"""\ Compare the current run with the previous run: - previous_pack_dir: `{previous_pack_dir}` """ ) if previous_verdict_path is not None and previous_verdict_path.exists(): comparison_block += f"- previous_analyst_verdict: `{previous_verdict_path}`\n" previous_verdict_block = "" if previous_verdict_json: previous_verdict_block = textwrap.dedent( f"""\ Previous analyst verdict JSON: ```json {previous_verdict_json} ``` """ ) return textwrap.dedent( f"""\ You are the strict `domain_analyst` for NDC_1C. Use the repo rules from: - `.codex/agents/domain_analyst.toml` - `.codex/skills/domain-case-loop/SKILL.md` - `.codex/skills/domain-case-loop/references/verdict_template.md` Current loop context: - loop_dir: `{loop_dir}` - iteration_dir: `{iteration_dir}` - current_pack_dir: `{pack_dir}` {comparison_block} Required artifacts to inspect: - `{pack_dir / 'pack_summary.md'}` - `{pack_dir / 'pack_state.json'}` - all `scenario_summary.md`, `scenario_state.json`, and problematic `steps/*/step_state.json` files inside `{pack_dir / 'scenarios'}` Goal: - evaluate current domain-pack correctness for business meaning, route/capability quality, evidence quality, and absence of silent heuristic masking; - determine whether the gate `quality_score >= {target_score}` is reached; - if not, provide the smallest high-value fix targets for the coder. Rules: - `accepted` is allowed only if quality_score >= {target_score}, unresolved_p0_count = 0, and regression_detected = false; - `partial` means the pack is usable but exactness, routing, or coverage is still insufficient; - `needs_exact_capability` means the primary blocker is a missing exact route or capability, but the loop should still continue autonomously unless a user decision is required; - `continue` means there is a clear next patch cycle; - `blocked` means the loop is stopped by a real hard blocker such as runtime/infrastructure failure, unavailable 1C data, or another condition that the repo cannot autonomously repair; - set `requires_user_decision = true` when the next step cannot be chosen safely without user input, for example: - an architecture fork or risky contour expansion; - a scope tradeoff or important business ambiguity; - a required observation anchor is missing and cannot be recovered safely from artifacts, 1C, or the current scenario state; - the only remaining implementation path would rely on a hack, brittle workaround, heuristic masking, or disproportionate complexity/risk; - if `requires_user_decision = true`, fill `user_decision_type` and `user_decision_prompt`; - if the pack is below {target_score} but there is still safe autonomous implementation work, keep `requires_user_decision = false`; - do not request user input merely because the score is still below {target_score}; request it only when the loop would otherwise guess, overfit, or risk architecture drift. Use this UTF-8 evidence bundle as the source of truth for artifact contents. Do not treat shell rendering artifacts as file corruption if the embedded bundle is readable. Current evidence bundle: ```json {review_bundle_json} ``` {previous_verdict_block} Return JSON only and follow the schema exactly. """ ).strip() def build_coder_loop_prompt( *, loop_dir: Path, iteration_dir: Path, pack_dir: Path, analyst_verdict_path: Path, analyst_verdict_json: str, ) -> str: return textwrap.dedent( f"""\ You are the `domain_coder` for NDC_1C. Use the repo rules from: - `.codex/agents/domain_coder.toml` - `.codex/skills/domain-case-loop/SKILL.md` Current loop context: - loop_dir: `{loop_dir}` - iteration_dir: `{iteration_dir}` - current_pack_dir: `{pack_dir}` - analyst_verdict_json: `{analyst_verdict_path}` Make the smallest domain-only patch in the working tree that improves the failing or partial scenarios named in the analyst verdict. Hard rules: - do not change the architecture; - do not fabricate data; - do not present heuristic answers as confirmed; - do not touch unrelated files; - preserve already successful baseline flows. Required outputs: - create `{iteration_dir / 'coder_plan.md'}` with a short plan; - create `{iteration_dir / 'patch_summary.md'}` with a short summary of the patch; Analyst verdict JSON: ```json {analyst_verdict_json} ``` - then return JSON only and follow the schema exactly. """ ).strip() def evaluate_analyst_gate( verdict: dict[str, Any], target_score: int ) -> tuple[bool, str, bool, str, str | None]: quality_score = int(verdict.get("quality_score") or 0) unresolved_p0_count = int(verdict.get("unresolved_p0_count") or 0) regression_detected = bool(verdict.get("regression_detected")) loop_decision = str(verdict.get("loop_decision") or "").strip() or "continue" requires_user_decision = bool(verdict.get("requires_user_decision")) user_decision_type = str(verdict.get("user_decision_type") or "").strip() or "none" user_decision_prompt_raw = verdict.get("user_decision_prompt") user_decision_prompt = str(user_decision_prompt_raw).strip() if user_decision_prompt_raw else None accepted = quality_score >= target_score and unresolved_p0_count == 0 and not regression_detected and loop_decision == "accepted" return accepted, loop_decision, requires_user_decision, user_decision_type, user_decision_prompt def handle_run_pack(args: argparse.Namespace) -> int: ensure_backend_health(args.backend_url, args.timeout_seconds) pack_path = Path(args.manifest).resolve() pack = load_scenario_pack(pack_path) if args.pack_id: pack["pack_id"] = args.pack_id.strip() if args.analysis_date: pack["analysis_context"] = merge_analysis_context( pack.get("analysis_context"), {"as_of_date": args.analysis_date, "source": "cli_override"}, ) pack_dir = Path(args.output_root).resolve() / pack["pack_id"] scenarios_dir = pack_dir / "scenarios" scenarios_dir.mkdir(parents=True, exist_ok=True) write_json(pack_dir / "pack_manifest.json", pack) write_text(pack_dir / "manifest_source.txt", f"{pack_path}\n") scenario_results: list[dict[str, Any]] = [] max_scenarios = max(0, int(args.max_scenarios)) if args.max_scenarios is not None else None scenarios_to_run = pack["scenarios"][:max_scenarios] if max_scenarios else pack["scenarios"] for scenario in scenarios_to_run: scenario_manifest = normalize_scenario_manifest( scenario, fallback_domain=pack["domain"], fallback_analysis_context=pack.get("analysis_context"), fallback_bindings=pack.get("bindings"), default_scenario_id=scenario["scenario_id"], ) scenario_dir = scenarios_dir / scenario_manifest["scenario_id"] scenario_state, scenario_final_status = execute_scenario_manifest( args=args, manifest=scenario_manifest, scenario_dir=scenario_dir, manifest_source_label=f"{pack_path}#{scenario_manifest['scenario_id']}", ) scenario_results.append( { "scenario_id": scenario_manifest["scenario_id"], "title": scenario_manifest["title"], "final_status": scenario_final_status, "session_id": scenario_state.get("session_id"), "artifact_dir": str(scenario_dir), } ) aggregate_statuses = [item["final_status"] for item in scenario_results] if not aggregate_statuses: final_status = "blocked" elif any(status == "blocked" for status in aggregate_statuses): final_status = "blocked" elif any(status == "needs_exact_capability" for status in aggregate_statuses): final_status = "needs_exact_capability" elif any(status == "partial" for status in aggregate_statuses): final_status = "partial" else: final_status = "accepted" if len(scenario_results) == len(pack.get("scenarios") or []) else "partial" pack_state = { "schema_version": SCENARIO_PACK_SCHEMA_VERSION, "pack_id": pack["pack_id"], "domain": pack["domain"], "title": pack["title"], "analysis_context": pack.get("analysis_context") or {}, "bindings": pack.get("bindings") or {}, "scenario_results": scenario_results, "final_status": final_status, "updated_at": datetime.now(timezone.utc).replace(microsecond=0).isoformat(), } write_json(pack_dir / "pack_state.json", pack_state) write_text(pack_dir / "pack_summary.md", build_pack_summary(pack, scenario_results, final_status)) write_text(pack_dir / "final_status.md", build_pack_final_status(pack, scenario_results, final_status)) print(f"[domain-case-loop] saved pack artifacts to {pack_dir}") print(f"[domain-case-loop] final_status={final_status}") return 0 def build_loop_summary(loop_state: dict[str, Any]) -> str: lines = [ "# Loop summary", "", f"- loop_id: `{loop_state['loop_id']}`", f"- target_score: `{loop_state['target_score']}`", f"- max_iterations: `{loop_state['max_iterations']}`", f"- final_status: `{loop_state['final_status']}`", f"- last_analyst_decision: `{loop_state.get('last_analyst_decision') or 'n/a'}`", "", "## Iterations", ] for item in loop_state.get("iterations", []): lines.extend( [ f"- `{item['iteration_id']}`", f" baseline_pack_dir: `{item['pack_dir']}`", f" analyst_score: `{item.get('quality_score')}`", f" analyst_decision: `{item.get('loop_decision')}`", f" accepted_gate: `{item.get('accepted_gate')}`", f" requires_user_decision: `{item.get('requires_user_decision')}`", f" user_decision_type: `{item.get('user_decision_type') or 'none'}`", f" coder_status: `{item.get('coder_status') or 'n/a'}`", f" analyst_verdict: `{item.get('analyst_verdict_path') or 'n/a'}`", ] ) return "\n".join(lines).strip() + "\n" def build_loop_final_status(loop_state: dict[str, Any]) -> str: return textwrap.dedent( f"""\ # Final status - status: `{loop_state['final_status']}` - loop_id: `{loop_state['loop_id']}` - target_score: `{loop_state['target_score']}` - iterations_ran: `{len(loop_state.get('iterations', []))}` - last_analyst_decision: `{loop_state.get('last_analyst_decision') or 'n/a'}` - stop_reason: {loop_state.get('stop_reason') or 'n/a'} """ ) def handle_run_pack_loop(args: argparse.Namespace) -> int: manifest_path = Path(args.manifest).resolve() loop_id = str(args.loop_id or slugify_case_id("domain_pack_loop", None)).strip() loop_dir = Path(args.output_root).resolve() / loop_id iterations_dir = loop_dir / "iterations" iterations_dir.mkdir(parents=True, exist_ok=True) write_text(loop_dir / "manifest_source.txt", f"{manifest_path}\n") target_score = int(args.target_score) max_iterations = int(args.max_iterations) if max_iterations < 1: raise RuntimeError("--max-iterations must be >= 1") loop_state: dict[str, Any] = { "schema_version": AUTONOMOUS_LOOP_SCHEMA_VERSION, "loop_id": loop_id, "manifest_path": str(manifest_path), "target_score": target_score, "max_iterations": max_iterations, "iterations": [], "final_status": "partial", "stop_reason": None, "last_analyst_decision": None, "last_user_decision_type": "none", "last_user_decision_prompt": None, "updated_at": datetime.now(timezone.utc).replace(microsecond=0).isoformat(), } write_json(loop_dir / "loop_state.json", loop_state) previous_pack_dir: Path | None = None previous_verdict_path: Path | None = None for iteration_index in range(max_iterations): iteration_id = f"iteration_{iteration_index:02d}" iteration_dir = iterations_dir / iteration_id iteration_dir.mkdir(parents=True, exist_ok=True) pack_output_root = iteration_dir / "pack_output" pack_id = "pack_run" pack_command = build_run_pack_command( args, manifest_path=manifest_path, pack_id=pack_id, output_root=pack_output_root, ) run_subprocess_command( pack_command, cwd=REPO_ROOT, timeout_seconds=max(600, int(args.timeout_seconds) * 20), stdout_path=iteration_dir / "pack_run.stdout.log", stderr_path=iteration_dir / "pack_run.stderr.log", ) pack_dir = pack_output_root / pack_id analyst_verdict_path = iteration_dir / "analyst_verdict.json" review_bundle_json = build_pack_review_bundle(pack_dir) previous_verdict_json = read_text_file(previous_verdict_path) if previous_verdict_path is not None and previous_verdict_path.exists() else None analyst_prompt = build_analyst_loop_prompt( loop_dir=loop_dir, iteration_dir=iteration_dir, pack_dir=pack_dir, previous_pack_dir=previous_pack_dir, previous_verdict_path=previous_verdict_path, target_score=target_score, review_bundle_json=review_bundle_json, previous_verdict_json=previous_verdict_json, ) write_text(iteration_dir / "analyst_prompt.md", analyst_prompt + "\n") analyst_command = build_codex_exec_command( args, output_file=analyst_verdict_path, schema_file=Path(args.analyst_schema).resolve(), sandbox_mode="read-only", model_override=getattr(args, "analyst_codex_model", None), reasoning_effort=getattr(args, "analyst_reasoning_effort", None), ) run_subprocess_command( analyst_command, cwd=REPO_ROOT, timeout_seconds=int(args.codex_timeout_seconds), input_text=analyst_prompt, stdout_path=iteration_dir / "analyst_exec.stdout.log", stderr_path=iteration_dir / "analyst_exec.stderr.log", ) analyst_verdict = read_json_output(analyst_verdict_path) accepted_gate, loop_decision, requires_user_decision, user_decision_type, user_decision_prompt = evaluate_analyst_gate( analyst_verdict, target_score ) loop_state["last_analyst_decision"] = loop_decision loop_state["last_user_decision_type"] = user_decision_type loop_state["last_user_decision_prompt"] = user_decision_prompt iteration_record: dict[str, Any] = { "iteration_id": iteration_id, "pack_dir": str(pack_dir), "quality_score": int(analyst_verdict.get("quality_score") or 0), "loop_decision": loop_decision, "accepted_gate": accepted_gate, "requires_user_decision": requires_user_decision, "user_decision_type": user_decision_type, "user_decision_prompt": user_decision_prompt, "analyst_verdict_path": str(analyst_verdict_path), "coder_status": None, } if accepted_gate: loop_state["iterations"].append(iteration_record) loop_state["final_status"] = "accepted" loop_state["stop_reason"] = f"analyst accepted at {iteration_id}" loop_state["updated_at"] = datetime.now(timezone.utc).replace(microsecond=0).isoformat() write_json(loop_dir / "loop_state.json", loop_state) break if requires_user_decision: loop_state["iterations"].append(iteration_record) if loop_decision in {"needs_exact_capability", "partial", "blocked"}: loop_state["final_status"] = loop_decision else: loop_state["final_status"] = "partial" prompt_suffix = f" | prompt: {user_decision_prompt}" if user_decision_prompt else "" loop_state["stop_reason"] = f"user_decision_required at {iteration_id}: {user_decision_type}{prompt_suffix}" loop_state["updated_at"] = datetime.now(timezone.utc).replace(microsecond=0).isoformat() write_json(loop_dir / "loop_state.json", loop_state) break if loop_decision == "blocked": loop_state["iterations"].append(iteration_record) loop_state["final_status"] = "blocked" loop_state["stop_reason"] = f"analyst blocked at {iteration_id}" loop_state["updated_at"] = datetime.now(timezone.utc).replace(microsecond=0).isoformat() write_json(loop_dir / "loop_state.json", loop_state) break coder_result_path = iteration_dir / "coder_result.json" coder_prompt = build_coder_loop_prompt( loop_dir=loop_dir, iteration_dir=iteration_dir, pack_dir=pack_dir, analyst_verdict_path=analyst_verdict_path, analyst_verdict_json=dump_json(analyst_verdict), ) write_text(iteration_dir / "coder_prompt.md", coder_prompt + "\n") coder_command = build_codex_exec_command( args, output_file=coder_result_path, schema_file=Path(args.coder_schema).resolve(), sandbox_mode="workspace-write", model_override=getattr(args, "coder_codex_model", None), reasoning_effort=getattr(args, "coder_reasoning_effort", None), ) run_subprocess_command( coder_command, cwd=REPO_ROOT, timeout_seconds=int(args.codex_timeout_seconds), input_text=coder_prompt, stdout_path=iteration_dir / "coder_exec.stdout.log", stderr_path=iteration_dir / "coder_exec.stderr.log", ) coder_result = read_json_output(coder_result_path) coder_status = str(coder_result.get("status") or "").strip() or "unknown" iteration_record["coder_status"] = coder_status iteration_record["coder_result_path"] = str(coder_result_path) loop_state["iterations"].append(iteration_record) loop_state["updated_at"] = datetime.now(timezone.utc).replace(microsecond=0).isoformat() write_json(loop_dir / "loop_state.json", loop_state) if coder_status == "blocked": loop_state["final_status"] = "blocked" loop_state["stop_reason"] = f"coder stopped progress at {iteration_id}: {coder_status}" loop_state["updated_at"] = datetime.now(timezone.utc).replace(microsecond=0).isoformat() write_json(loop_dir / "loop_state.json", loop_state) break previous_pack_dir = pack_dir previous_verdict_path = analyst_verdict_path else: if loop_state.get("last_analyst_decision") == "needs_exact_capability": loop_state["final_status"] = "needs_exact_capability" else: loop_state["final_status"] = "partial" loop_state["stop_reason"] = f"max_iterations_reached ({max_iterations})" loop_state["updated_at"] = datetime.now(timezone.utc).replace(microsecond=0).isoformat() write_json(loop_dir / "loop_state.json", loop_state) write_text(loop_dir / "loop_summary.md", build_loop_summary(loop_state)) write_text(loop_dir / "final_status.md", build_loop_final_status(loop_state)) print(f"[domain-case-loop] saved loop artifacts to {loop_dir}") print(f"[domain-case-loop] final_status={loop_state['final_status']}") return 0 def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="Repo-native helper for NDC_1C domain-case and scenario orchestration") subparsers = parser.add_subparsers(dest="command", required=True) run_case = subparsers.add_parser("run-case", help="Run one assistant_stage1 case through the existing backend and save artifacts") run_case.add_argument("--domain", required=True) run_case.add_argument("--question", required=True) run_case.add_argument("--case-id") run_case.add_argument("--slot", default="baseline", choices=["baseline", "rerun"]) run_case.add_argument("--analysis-date") run_case.add_argument("--backend-url", default=DEFAULT_BACKEND_URL) run_case.add_argument("--output-root", default=str(DEFAULT_ARTIFACTS_ROOT)) run_case.add_argument("--sessions-dir", default=str(DEFAULT_SESSIONS_DIR)) run_case.add_argument("--reports-dir", default=str(DEFAULT_REPORTS_DIR)) run_case.add_argument("--prompt-version", default=DEFAULT_PROMPT_VERSION) run_case.add_argument("--llm-provider", default=DEFAULT_LLM_PROVIDER, choices=["openai", "local"]) run_case.add_argument("--llm-model", default=DEFAULT_LLM_MODEL) run_case.add_argument("--llm-base-url", default=DEFAULT_LLM_BASE_URL) run_case.add_argument("--llm-api-key", default=DEFAULT_LLM_API_KEY) run_case.add_argument("--temperature", type=float, default=DEFAULT_TEMPERATURE) run_case.add_argument("--max-output-tokens", type=int, default=DEFAULT_MAX_OUTPUT_TOKENS) run_case.add_argument("--timeout-seconds", type=int, default=300) run_case.add_argument("--poll-interval-seconds", type=float, default=1.5) run_case.add_argument("--expected-capability") run_case.add_argument("--expected-result-mode") run_case.add_argument("--use-mock", action="store_true") run_case.set_defaults(func=handle_run_case) import_export = subparsers.add_parser("import-export", help="Import an existing technical export markdown and build artifacts") import_export.add_argument("--domain", required=True) import_export.add_argument("--input", required=True) import_export.add_argument("--question") import_export.add_argument("--case-id") import_export.add_argument("--slot", default="baseline", choices=["baseline", "rerun"]) import_export.add_argument("--output-root", default=str(DEFAULT_ARTIFACTS_ROOT)) import_export.add_argument("--expected-capability") import_export.add_argument("--expected-result-mode") import_export.set_defaults(func=handle_import_export) run_scenario = subparsers.add_parser( "run-scenario", help="Run one multi-step domain scenario in a shared assistant session and save per-step artifacts", ) run_scenario.add_argument("--manifest", required=True) run_scenario.add_argument("--scenario-id") run_scenario.add_argument("--analysis-date") run_scenario.add_argument("--backend-url", default=DEFAULT_BACKEND_URL) run_scenario.add_argument("--output-root", default=str(DEFAULT_ARTIFACTS_ROOT)) run_scenario.add_argument("--prompt-version", default=DEFAULT_PROMPT_VERSION) run_scenario.add_argument("--llm-provider", default=DEFAULT_LLM_PROVIDER, choices=["openai", "local"]) run_scenario.add_argument("--llm-model", default=DEFAULT_LLM_MODEL) run_scenario.add_argument("--llm-base-url", default=DEFAULT_LLM_BASE_URL) run_scenario.add_argument("--llm-api-key", default=DEFAULT_LLM_API_KEY) run_scenario.add_argument("--temperature", type=float, default=DEFAULT_TEMPERATURE) run_scenario.add_argument("--max-output-tokens", type=int, default=DEFAULT_MAX_OUTPUT_TOKENS) run_scenario.add_argument("--timeout-seconds", type=int, default=180) run_scenario.add_argument("--use-mock", action="store_true") run_scenario.set_defaults(func=handle_run_scenario) run_pack = subparsers.add_parser( "run-pack", help="Run a multi-scenario domain pack and save aggregate orchestration artifacts", ) run_pack.add_argument("--manifest", required=True) run_pack.add_argument("--pack-id") run_pack.add_argument("--analysis-date") run_pack.add_argument("--max-scenarios", type=int) run_pack.add_argument("--backend-url", default=DEFAULT_BACKEND_URL) run_pack.add_argument("--output-root", default=str(DEFAULT_ARTIFACTS_ROOT)) run_pack.add_argument("--prompt-version", default=DEFAULT_PROMPT_VERSION) run_pack.add_argument("--llm-provider", default=DEFAULT_LLM_PROVIDER, choices=["openai", "local"]) run_pack.add_argument("--llm-model", default=DEFAULT_LLM_MODEL) run_pack.add_argument("--llm-base-url", default=DEFAULT_LLM_BASE_URL) run_pack.add_argument("--llm-api-key", default=DEFAULT_LLM_API_KEY) run_pack.add_argument("--temperature", type=float, default=DEFAULT_TEMPERATURE) run_pack.add_argument("--max-output-tokens", type=int, default=DEFAULT_MAX_OUTPUT_TOKENS) run_pack.add_argument("--timeout-seconds", type=int, default=180) run_pack.add_argument("--use-mock", action="store_true") run_pack.set_defaults(func=handle_run_pack) run_pack_loop = subparsers.add_parser( "run-pack-loop", help="Run autonomous analyst -> coder -> rerun iterations for a domain pack until the acceptance gate is reached", ) run_pack_loop.add_argument("--manifest", required=True) run_pack_loop.add_argument("--loop-id") run_pack_loop.add_argument("--analysis-date") run_pack_loop.add_argument("--max-scenarios", type=int) run_pack_loop.add_argument("--target-score", type=int, default=80) run_pack_loop.add_argument("--max-iterations", type=int, default=8) run_pack_loop.add_argument("--backend-url", default=DEFAULT_BACKEND_URL) run_pack_loop.add_argument("--output-root", default=str(DEFAULT_ARTIFACTS_ROOT)) run_pack_loop.add_argument("--prompt-version", default=DEFAULT_PROMPT_VERSION) run_pack_loop.add_argument("--llm-provider", default=DEFAULT_LLM_PROVIDER, choices=["openai", "local"]) run_pack_loop.add_argument("--llm-model", default=DEFAULT_LLM_MODEL) run_pack_loop.add_argument("--llm-base-url", default=DEFAULT_LLM_BASE_URL) run_pack_loop.add_argument("--llm-api-key", default=DEFAULT_LLM_API_KEY) run_pack_loop.add_argument("--temperature", type=float, default=DEFAULT_TEMPERATURE) run_pack_loop.add_argument("--max-output-tokens", type=int, default=DEFAULT_MAX_OUTPUT_TOKENS) run_pack_loop.add_argument("--timeout-seconds", type=int, default=180) run_pack_loop.add_argument("--use-mock", action="store_true") run_pack_loop.add_argument("--codex-binary", default="codex") run_pack_loop.add_argument("--codex-profile") run_pack_loop.add_argument("--codex-model") run_pack_loop.add_argument("--analyst-codex-model", default="gpt-5.4") run_pack_loop.add_argument("--coder-codex-model", default="gpt-5.4-mini") run_pack_loop.add_argument("--analyst-reasoning-effort", default="medium") run_pack_loop.add_argument("--coder-reasoning-effort", default="low") run_pack_loop.add_argument("--codex-timeout-seconds", type=int, default=1800) run_pack_loop.add_argument( "--analyst-schema", default=str(DEFAULT_LOOP_SCHEMA_DIR / "domain_loop_analyst_verdict.schema.json"), ) run_pack_loop.add_argument( "--coder-schema", default=str(DEFAULT_LOOP_SCHEMA_DIR / "domain_loop_coder_result.schema.json"), ) run_pack_loop.set_defaults(func=handle_run_pack_loop) return parser def main() -> int: parser = build_parser() args = parser.parse_args() try: return int(args.func(args)) except Exception as error: # noqa: BLE001 print(f"[domain-case-loop] error: {error}", file=sys.stderr) return 1 if __name__ == "__main__": raise SystemExit(main())