# NODEDC_1C/scripts/domain_case_loop.py
#
# 2676 lines
# 110 KiB
# Python
# Raw Blame History
#
# This file contains ambiguous Unicode characters
#
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import argparse
import json
import re
import subprocess
import sys
import textwrap
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
# Repository root: the parent of the directory that contains this script.
REPO_ROOT = Path(__file__).resolve().parent.parent
# Default filesystem locations, all expressed relative to the repository root.
DEFAULT_ARTIFACTS_ROOT = REPO_ROOT / "artifacts" / "domain_runs"
DEFAULT_SESSIONS_DIR = REPO_ROOT / "llm_normalizer" / "data" / "assistant_sessions"
DEFAULT_REPORTS_DIR = REPO_ROOT / "llm_normalizer" / "reports"
DEFAULT_LOOP_SCHEMA_DIR = REPO_ROOT / "docs" / "orchestration" / "schemas"
# Default backend base URL (a loopback address).
DEFAULT_BACKEND_URL = "http://127.0.0.1:8787"
# Default LLM / prompt settings.
DEFAULT_PROMPT_VERSION = "address_query_runtime_v1"
DEFAULT_LLM_PROVIDER = "local"
DEFAULT_LLM_MODEL = "qwen2.5-14b-instruct-1m"
DEFAULT_LLM_BASE_URL = "http://127.0.0.1:1234/v1"
DEFAULT_LLM_API_KEY = ""
DEFAULT_TEMPERATURE = 0.0
DEFAULT_MAX_OUTPUT_TOKENS = 900
# Markdown heading that introduces a message's debug JSON block in conversation
# exports; written by build_conversation_export and recognised by
# parse_export_markdown.
TECH_SECTION_HEADER = "### technical_debug_payload_json"
# Schema version identifiers stamped into the payloads this module builds.
SCENARIO_MANIFEST_SCHEMA_VERSION = "domain_scenario_manifest_v1"
SCENARIO_STATE_SCHEMA_VERSION = "domain_scenario_state_v1"
SCENARIO_STEP_STATE_SCHEMA_VERSION = "domain_scenario_step_state_v1"
SCENARIO_PACK_SCHEMA_VERSION = "domain_scenario_pack_v1"
ACTIVE_DOMAIN_CONTRACT_SCHEMA_VERSION = "active_domain_contract_v1"
AUTONOMOUS_LOOP_SCHEMA_VERSION = "domain_autonomous_loop_v1"
def dump_json(payload: Any) -> str:
    """Serialize ``payload`` to human-readable JSON text.

    The output is indented by two spaces and keeps non-ASCII characters
    verbatim instead of escaping them.
    """
    rendered = json.dumps(payload, indent=2, ensure_ascii=False)
    return rendered
def write_text(file_path: Path, text: str) -> None:
file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.write_text(text, encoding="utf-8", newline="\n")
def write_json(file_path: Path, payload: Any) -> None:
    """Serialize ``payload`` with ``dump_json`` and write it via ``write_text``.

    A single trailing newline is appended to the serialized text.
    """
    serialized = dump_json(payload)
    write_text(file_path, f"{serialized}\n")
def read_text_file(file_path: Path) -> str:
    """Return the file's content decoded as UTF-8, dropping a leading BOM if present."""
    content = file_path.read_text(encoding="utf-8-sig")
    return content
def sanitize_export_text(value: str) -> str:
    """Strip debug sections and internal field mentions from message text.

    Processing steps, in order:

    1. Cut the text at the first line that starts (optionally after ``#``
       marks) with one of the debug section names.
    2. Remove any remaining ``###``-prefixed debug section, up to its closing
       code fence or the end of the text.
    3. Remove everything from a remaining debug section line to the end.
    4. Drop blank lines and lines that mention any internal field name, and
       strip trailing whitespace from the lines that are kept.

    Args:
        value: Message text; falsy values are treated as an empty string.

    Returns:
        The surviving lines joined with ``\\n`` and stripped.
    """
    text = str(value or "")

    heading_match = re.search(
        r"(?:^|\n)\s*#{0,6}\s*(?:debug_payload_json|technical_breakdown_json|route_summary_json|debug_payload|technical_breakdown)\b",
        text,
        flags=re.IGNORECASE,
    )
    if heading_match:
        text = text[: heading_match.start()]

    text = re.sub(
        r"###\s*(?:debug_payload_json|technical_breakdown_json|route_summary_json)[\s\S]*?(?:```[\s\S]*?```|$)",
        "",
        text,
        flags=re.IGNORECASE,
    )
    text = re.sub(
        r"(?:^|\n)\s*#{0,6}\s*(?:debug_payload_json|technical_breakdown_json|route_summary_json)\b[\s\S]*$",
        "",
        text,
        flags=re.IGNORECASE,
    )

    # Any line mentioning one of these identifiers is dropped entirely.
    leak_markers = (
        re.compile(r"\b(?:debug_payload_json|technical_breakdown_json)\b", re.IGNORECASE),
        re.compile(r"\b(?:route_summary|semantic_profile|domain_scope|relation_patterns|account_scope)\b", re.IGNORECASE),
        re.compile(r"\b(?:coverage_report|retrieval_status|problem_unit_state|candidate_evidence)\b", re.IGNORECASE),
        re.compile(r"\b(?:graph_domain_scope|graph_runtime|selection_reason|why_included)\b", re.IGNORECASE),
    )

    right_trimmed = (raw_line.rstrip() for raw_line in text.splitlines())
    kept_lines = [
        candidate
        for candidate in right_trimmed
        if candidate and not any(marker.search(candidate) for marker in leak_markers)
    ]
    return "\n".join(kept_lines).strip()
def build_conversation_export(session_id: str, conversation: list[dict[str, Any]], mode: str = "technical") -> str:
    """Render a conversation as a markdown export document.

    The document starts with a header (session id, export mode, UTC export
    timestamp) followed by one numbered ``##`` section per message. Message
    text is passed through ``sanitize_export_text``. When ``mode`` is
    ``"technical"``, assistant messages with a non-``None`` ``debug`` value
    also get a fenced JSON block under ``TECH_SECTION_HEADER``.

    Args:
        session_id: Session identifier; rendered as ``n/a`` when falsy.
        conversation: Message dicts (``role``, ``text``, ``message_id``, ...).
        mode: Export mode label written into the header.

    Returns:
        The export as one newline-joined string.
    """
    with_debug = mode == "technical"
    exported_at = datetime.now(timezone.utc).replace(microsecond=0).isoformat()
    out: list[str] = [
        "# Assistant conversation export",
        f"session_id: {session_id or 'n/a'}",
        f"export_mode: {mode}",
        f"exported_at: {exported_at}",
        "",
    ]
    for number, message in enumerate(conversation, start=1):
        body = sanitize_export_text(str(message.get("text") or ""))
        out += [
            f"## {number}. {message.get('role', 'unknown')}",
            f"message_id: {message.get('message_id') or 'n/a'}",
            f"created_at: {message.get('created_at') or 'n/a'}",
            f"reply_type: {message.get('reply_type') or 'n/a'}",
        ]
        trace_id = message.get("trace_id")
        if trace_id:
            out.append(f"trace_id: {trace_id}")
        out += ["", body or "(empty)", ""]
        has_debug = message.get("role") == "assistant" and message.get("debug") is not None
        if with_debug and has_debug:
            out += [TECH_SECTION_HEADER, "```json", dump_json(message["debug"]), "```", ""]
    return "\n".join(out)
def slugify_token(value: str | None, fallback: str) -> str:
    """Turn ``value`` into an identifier-like token.

    Each run of characters other than word characters and ``-`` becomes a
    single ``_``; leading and trailing underscores are then removed.

    Returns:
        The cleaned token, or ``fallback`` when nothing is left.
    """
    trimmed = str(value or "").strip()
    collapsed = re.sub(r"[^\w-]+", "_", trimmed, flags=re.UNICODE)
    token = collapsed.strip("_")
    if token:
        return token
    return fallback
def slugify_case_id(domain: str, explicit_case_id: str | None) -> str:
    """Return the case id to use for a run.

    A non-blank ``explicit_case_id`` is returned stripped but otherwise
    unchanged. Otherwise the id is the slugified ``domain`` (``domain_case``
    when that is empty) followed by a UTC ``YYYYmmdd_HHMMSS`` timestamp.
    """
    explicit = (explicit_case_id or "").strip()
    if explicit:
        return explicit
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    prefix = slugify_token(domain, "domain_case")
    return f"{prefix}_{stamp}"
def normalize_iso_date(value: Any) -> str | None:
    """Validate a ``YYYY-MM-DD`` date string.

    Returns:
        The stripped string when ``value`` is a string in exactly that shape
        and names a real calendar date; ``None`` otherwise.
    """
    if not isinstance(value, str):
        return None
    text = value.strip()
    if re.fullmatch(r"\d{4}-\d{2}-\d{2}", text) is None:
        return None
    try:
        # Parsing rejects well-shaped but impossible dates such as 2023-02-29.
        datetime.strptime(text, "%Y-%m-%d")
    except ValueError:
        return None
    else:
        return text
def normalize_analysis_context(raw_context: Any) -> dict[str, Any]:
    """Reduce an analysis context to its recognised, valid fields.

    Kept fields: ``as_of_date``, ``period_from`` and ``period_to`` (only when
    ``normalize_iso_date`` accepts them), ``snapshot_mode`` (only ``auto``,
    ``force_snapshot`` or ``force_live``) and a non-blank ``source``.
    Non-dict input is treated as an empty context.

    Returns:
        A new dict holding only the accepted fields.
    """
    data = raw_context if isinstance(raw_context, dict) else {}
    normalized: dict[str, Any] = {}

    for date_field in ("as_of_date", "period_from", "period_to"):
        iso_value = normalize_iso_date(data.get(date_field))
        if iso_value:
            normalized[date_field] = iso_value

    snapshot_mode = str(data.get("snapshot_mode") or "").strip()
    if snapshot_mode in {"auto", "force_snapshot", "force_live"}:
        normalized["snapshot_mode"] = snapshot_mode

    origin = str(data.get("source") or "").strip()
    if origin:
        normalized["source"] = origin

    return normalized
def merge_analysis_context(base_context: Any, override_context: Any) -> dict[str, Any]:
    """Normalize both contexts and merge them, with override fields winning."""
    base = normalize_analysis_context(base_context)
    overrides = normalize_analysis_context(override_context)
    return {**base, **overrides}
def carry_forward_analysis_context(
    scenario_state: dict[str, Any],
    analysis_context: dict[str, Any],
) -> dict[str, Any]:
    """Fill a missing ``as_of_date`` from the scenario's remembered date scope.

    The date is read from ``scenario_state["semantic_memory"]["date_scope"]``
    and is applied only when it is a valid ISO date and the context has no
    ``as_of_date`` of its own. When a date is carried over and the context has
    no ``source``, ``source`` is set to ``scenario_state_carryover``.

    Returns:
        A shallow copy of ``analysis_context``; the input is not mutated.
    """
    result = dict(analysis_context)

    memory = scenario_state.get("semantic_memory")
    scope = memory.get("date_scope") if isinstance(memory, dict) else None
    if not isinstance(scope, dict):
        return result

    inherited_date = normalize_iso_date(scope.get("as_of_date"))
    if inherited_date and not result.get("as_of_date"):
        result["as_of_date"] = inherited_date
        if not result.get("source"):
            result["source"] = "scenario_state_carryover"
    return result
def merge_scenario_date_scope(
    previous_date_scope: Any,
    current_date_scope: Any,
    *,
    depends_on: list[str],
) -> Any:
    """Combine the previous and current date scopes of a scenario.

    Rules:

    * With no usable current scope (not a dict, or empty), the previous scope
      is returned, falling back to the raw current value.
    * When the step has no dependencies or there is no previous scope, the
      current scope is returned unchanged.
    * Otherwise, if both scopes carry valid but different ``as_of_date``
      values, a copy of the current scope is returned with the previous date
      restored and ``source`` defaulted to ``scenario_state_carryover``.
    """
    prior = previous_date_scope if isinstance(previous_date_scope, dict) else None
    latest = current_date_scope if isinstance(current_date_scope, dict) else None

    if not latest:
        return prior or current_date_scope
    if not (depends_on and prior):
        return latest

    prior_date = normalize_iso_date(prior.get("as_of_date"))
    latest_date = normalize_iso_date(latest.get("as_of_date"))
    dates_conflict = bool(prior_date and latest_date and latest_date != prior_date)
    if not dates_conflict:
        return latest

    pinned = dict(latest)
    pinned["as_of_date"] = prior_date
    if not pinned.get("source"):
        pinned["source"] = "scenario_state_carryover"
    return pinned
def repair_text_mojibake(value: str) -> str:
    """Try to undo text that was decoded with the wrong single-byte codec.

    The string is re-encoded as Latin-1 and decoded as UTF-8, then cp1251,
    then cp866. The first decoding that succeeds and differs from the input
    is returned.

    Returns:
        The repaired text, or ``value`` unchanged when it is empty, cannot be
        encoded as Latin-1, or no decoding produces a different string.
    """
    if not value:
        return value
    try:
        raw_bytes = value.encode("latin1")
    except UnicodeEncodeError:
        # Characters outside Latin-1: none of the candidate repairs can apply.
        return value
    for codec in ("utf-8", "cp1251", "cp866"):
        try:
            candidate = raw_bytes.decode(codec)
        except UnicodeDecodeError:
            continue
        if candidate != value:
            return candidate
    return value
def normalize_binding_value(value: Any) -> Any:
    """Recursively apply ``repair_text_mojibake`` to strings inside ``value``.

    Strings are repaired, lists are rebuilt element by element, and dicts are
    rebuilt with both string keys and all values normalized. Any other value
    is returned as-is.
    """
    if isinstance(value, str):
        return repair_text_mojibake(value)
    if isinstance(value, list):
        return [normalize_binding_value(element) for element in value]
    if isinstance(value, dict):
        rebuilt: dict[Any, Any] = {}
        for key, item in value.items():
            new_key = normalize_binding_value(key) if isinstance(key, str) else key
            rebuilt[new_key] = normalize_binding_value(item)
        return rebuilt
    return value
def normalize_bindings(raw_bindings: Any) -> dict[str, Any]:
    """Return bindings with string keys and normalized values.

    Non-dict input yields an empty dict. Keys are converted with ``str`` and
    values go through ``normalize_binding_value``.
    """
    if isinstance(raw_bindings, dict):
        return {str(name): normalize_binding_value(item) for name, item in raw_bindings.items()}
    return {}
def normalize_string_list(raw_values: Any) -> list[str]:
    """Coerce ``raw_values`` into a list of non-empty, stripped strings.

    A single string becomes a one-element list (or an empty list when blank).
    A list has each element converted with ``str`` and stripped; falsy
    elements and blank results are dropped. Any other input yields ``[]``.
    """
    if isinstance(raw_values, str):
        single = raw_values.strip()
        return [single] if single else []
    if not isinstance(raw_values, list):
        return []
    stripped = (str(element or "").strip() for element in raw_values)
    return [text for text in stripped if text]
def drop_none_values(payload: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of ``payload`` without entries whose value is ``None``.

    Other falsy values (``0``, ``""``, ``False``, empty containers) are kept.
    """
    kept: dict[str, Any] = {}
    for name, item in payload.items():
        if item is not None:
            kept[name] = item
    return kept
def http_json(url: str, *, method: str = "GET", payload: dict[str, Any] | None = None, timeout: int = 30) -> dict[str, Any]:
    """Send a request and return the decoded JSON response.

    Args:
        url: Target URL.
        method: HTTP method.
        payload: Optional body; when given it is sent as UTF-8 encoded JSON
            with a matching ``Content-Type`` header.
        timeout: Timeout in seconds passed to ``urlopen``.

    Returns:
        The parsed JSON body.

    Raises:
        RuntimeError: On an HTTP error status (message includes the response
            body), on a ``URLError``, or when the response body is not valid
            JSON.
    """
    request_headers = {"Accept": "application/json"}
    body_bytes = None
    if payload is not None:
        body_bytes = json.dumps(payload).encode("utf-8")
        request_headers["Content-Type"] = "application/json; charset=utf-8"

    outgoing = Request(url, data=body_bytes, method=method, headers=request_headers)
    try:
        with urlopen(outgoing, timeout=timeout) as incoming:
            raw_body = incoming.read().decode("utf-8")
    except HTTPError as http_error:
        # HTTPError is handled first because it is a subclass of URLError.
        detail = http_error.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"HTTP {http_error.code} for {url}: {detail}") from http_error
    except URLError as url_error:
        raise RuntimeError(f"Failed to reach backend at {url}: {url_error}") from url_error

    try:
        parsed = json.loads(raw_body)
    except json.JSONDecodeError as decode_error:
        raise RuntimeError(f"Backend returned non-JSON payload for {url}") from decode_error
    return parsed
def ensure_backend_health(backend_url: str, timeout_seconds: int) -> dict[str, Any]:
    """Query ``<backend_url>/api/health`` and return the health payload.

    The request timeout is ``timeout_seconds`` clamped to the 5-30 second
    range.

    Raises:
        RuntimeError: When the payload is not a dict, or when its ``ok``
            field is exactly ``False``.
    """
    probe_timeout = max(5, min(timeout_seconds, 30))
    payload = http_json(f"{backend_url}/api/health", timeout=probe_timeout)
    if not isinstance(payload, dict):
        raise RuntimeError(f"Backend health endpoint returned invalid payload at {backend_url}/api/health")
    # Only an explicit ``ok: false`` fails; a missing ``ok`` key is accepted.
    if payload.get("ok") is False:
        raise RuntimeError(f"Backend health check failed for {backend_url}: {dump_json(payload)}")
    return payload
def wait_for_job(backend_url: str, job_id: str, timeout_seconds: int, poll_interval_seconds: float) -> dict[str, Any]:
    """Poll an async evaluation job until it completes or fails.

    Each poll requests ``<backend_url>/api/eval/run-async/<job_id>`` and reads
    the ``job`` object from the response. Status changes are printed once.

    Args:
        backend_url: Backend base URL.
        job_id: Identifier of the job to poll.
        timeout_seconds: Maximum time to keep polling.
        poll_interval_seconds: Pause between polls.

    Returns:
        The ``job`` object once its status is ``completed`` or ``failed``.

    Raises:
        RuntimeError: When a response has no ``job`` object.
        TimeoutError: When the job does not finish within ``timeout_seconds``.
    """
    # A monotonic clock keeps the deadline immune to system clock adjustments.
    deadline = time.monotonic() + timeout_seconds
    last_status = None
    while time.monotonic() < deadline:
        response = http_json(f"{backend_url}/api/eval/run-async/{job_id}")
        job = response.get("job")
        if not isinstance(job, dict):
            raise RuntimeError("Async job response does not contain `job` object")
        status = str(job.get("status") or "unknown")
        if status != last_status:
            print(f"[domain-case-loop] job {job_id}: {status}")
            last_status = status
        if status in {"completed", "failed"}:
            return job
        time.sleep(poll_interval_seconds)
    raise TimeoutError(f"Async job {job_id} did not finish within {timeout_seconds} seconds")
def wait_for_file(file_path: Path, timeout_seconds: int = 30) -> None:
    """Block until ``file_path`` exists or the timeout elapses.

    The path is always checked at least once, so an existing file is found
    even with a zero or negative timeout.

    Args:
        file_path: Path to wait for.
        timeout_seconds: Maximum time to wait.

    Raises:
        FileNotFoundError: When the file does not appear in time.
    """
    # A monotonic clock keeps the deadline immune to system clock adjustments.
    deadline = time.monotonic() + timeout_seconds
    while True:
        if file_path.exists():
            return
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        time.sleep(min(0.5, remaining))
    raise FileNotFoundError(f"Timed out waiting for file: {file_path}")
def read_json_file(file_path: Path) -> dict[str, Any]:
    """Read ``file_path`` with ``read_text_file`` and parse the text as JSON."""
    raw_text = read_text_file(file_path)
    return json.loads(raw_text)
def extract_conversation_from_session(session_record: dict[str, Any]) -> list[dict[str, Any]]:
    """Build a uniform message list from a stored session record.

    Sources are tried in order and the first one that yields any message
    wins:

    1. ``items`` - a non-empty list of message dicts.
    2. ``conversation`` - a non-empty list of message dicts.
    3. ``turns`` - for each turn, the ``user_message`` and
       ``assistant_message`` found inside its ``technical_json`` dict.

    Non-dict entries are skipped at every level.

    Returns:
        Message dicts with the keys ``message_id``, ``role``, ``text``
        (``""`` when falsy), ``reply_type``, ``created_at``, ``trace_id`` and
        ``debug``. The list may be empty.
    """

    def project(message: dict[str, Any]) -> dict[str, Any]:
        # Keep only the fields used in exports and turn artifacts.
        return {
            "message_id": message.get("message_id"),
            "role": message.get("role"),
            "text": message.get("text") or "",
            "reply_type": message.get("reply_type"),
            "created_at": message.get("created_at"),
            "trace_id": message.get("trace_id"),
            "debug": message.get("debug"),
        }

    def project_all(candidates: Any) -> list[dict[str, Any]]:
        return [project(candidate) for candidate in candidates if isinstance(candidate, dict)]

    for source_key in ("items", "conversation"):
        candidates = session_record.get(source_key)
        if isinstance(candidates, list) and candidates:
            projected = project_all(candidates)
            if projected:
                return projected

    turns = session_record.get("turns")
    if not isinstance(turns, list):
        return []

    messages: list[dict[str, Any]] = []
    for turn in turns:
        if not isinstance(turn, dict):
            continue
        technical = turn.get("technical_json")
        if not isinstance(technical, dict):
            continue
        pair = (technical.get("user_message"), technical.get("assistant_message"))
        messages.extend(project_all(pair))
    return messages
def find_last_assistant(conversation: list[dict[str, Any]]) -> dict[str, Any]:
    """Return the last message whose ``role`` is ``assistant``.

    Raises:
        RuntimeError: When the conversation has no assistant message.
    """
    newest_first = reversed(conversation)
    found = next((message for message in newest_first if message.get("role") == "assistant"), None)
    if found is None:
        raise RuntimeError("Conversation does not contain assistant message")
    return found
def find_last_user_before(conversation: list[dict[str, Any]], assistant_message_id: Any) -> dict[str, Any] | None:
    """Return the last user message that precedes the given assistant message.

    The boundary is the last *assistant* message whose ``message_id`` equals
    ``assistant_message_id``. Anchoring on the assistant role keeps the lookup
    working when ids are missing (``None``) or shared by several messages
    (for example an ``n/a`` placeholder): stopping at the first id match would
    then cut the search off at the start of the conversation.

    If no assistant message matches, the first message with that id is the
    boundary; if no message matches at all, the whole conversation is
    searched.

    Args:
        conversation: Message dicts in chronological order.
        assistant_message_id: Id of the assistant message to search before.

    Returns:
        The closest preceding message with role ``user``, or ``None``.
    """
    id_matches = [
        index for index, item in enumerate(conversation) if item.get("message_id") == assistant_message_id
    ]
    assistant_matches = [index for index in id_matches if conversation[index].get("role") == "assistant"]

    if assistant_matches:
        boundary = assistant_matches[-1]
    elif id_matches:
        boundary = id_matches[0]
    else:
        boundary = len(conversation)

    for item in reversed(conversation[:boundary]):
        if item.get("role") == "user":
            return item
    return None
def extract_last_turn(session_record: dict[str, Any]) -> dict[str, Any] | None:
    """Return the final entry of ``session_record["turns"]`` when it is a dict.

    Returns ``None`` when ``turns`` is missing, not a list, empty, or ends
    with a non-dict entry.
    """
    turns = session_record.get("turns")
    if isinstance(turns, list) and turns:
        final_turn = turns[-1]
        if isinstance(final_turn, dict):
            return final_turn
    return None
def extract_report_case(report_record: dict[str, Any], case_id: str) -> dict[str, Any] | None:
    """Find the first entry in ``report_record["results"]`` for ``case_id``.

    An entry's ``case_id`` is compared as a string, with a missing or falsy
    value treated as ``""``.

    Returns:
        The matching dict, or ``None`` when ``results`` is not a list or no
        entry matches.
    """
    results = report_record.get("results")
    if not isinstance(results, list):
        return None
    candidates = (entry for entry in results if isinstance(entry, dict))
    return next((entry for entry in candidates if str(entry.get("case_id") or "") == case_id), None)
def build_turn_artifact(
    *,
    slot: str,
    domain: str,
    case_id: str,
    question: str | None,
    session_id: str,
    conversation: list[dict[str, Any]],
    session_record: dict[str, Any] | None,
    job_record: dict[str, Any] | None,
    report_case: dict[str, Any] | None,
    export_file_name: str,
) -> dict[str, Any]:
    """Assemble the ``domain_case_turn_artifact_v1`` payload for one turn.

    The artifact is built around the last assistant message of
    ``conversation`` and the user message found before it. When ``question``
    is falsy, the text of that user message is used instead. Run identifiers
    come from ``job_record`` and session-level fields from ``session_record``;
    either may be ``None``, in which case the derived fields are ``None``.

    Raises:
        RuntimeError: When ``conversation`` has no assistant message (raised
            by ``find_last_assistant``).
    """
    assistant_message = find_last_assistant(conversation)
    user_message = find_last_user_before(conversation, assistant_message.get("message_id"))

    resolved_question = question
    if not resolved_question:
        resolved_question = user_message.get("text") if isinstance(user_message, dict) else None

    session = session_record or {}
    job = job_record if isinstance(job_record, dict) else {}
    session_items = session.get("items")

    run_info = {
        "job_id": job.get("job_id"),
        "run_id": job.get("run_id"),
        "analysis_date": job.get("analysis_date"),
        "report_case_available": report_case is not None,
    }
    session_summary = {
        "schema_version": session.get("schema_version"),
        "updated_at": session.get("updated_at"),
        "trace_ids": session.get("trace_ids"),
        "reply_types": session.get("reply_types"),
        "investigation_state": session.get("investigation_state"),
        "address_navigation_state": session.get("address_navigation_state"),
        "items_count": len(session_items) if isinstance(session_items, list) else None,
        "last_turn": extract_last_turn(session),
    }
    return {
        "schema_version": "domain_case_turn_artifact_v1",
        "artifact_slot": slot,
        "domain": domain,
        "case_id": case_id,
        "question": resolved_question,
        "session_id": session_id,
        "run": run_info,
        "user_message": user_message,
        "assistant_message": assistant_message,
        "technical_debug_payload": assistant_message.get("debug"),
        "session_summary": session_summary,
        "report_case": report_case,
        "export_markdown_file": export_file_name,
    }
def ensure_case_brief(
    case_dir: Path,
    *,
    domain: str,
    question: str | None,
    expected_capability: str | None,
    expected_result_mode: str | None,
) -> None:
    """Create ``case_brief.md`` in ``case_dir`` unless it already exists.

    The brief is a markdown template pre-filled with the domain, the raw
    question and the expected capability / result mode. Missing values are
    rendered as ``<fill me>``. An existing file is never overwritten.
    """
    brief_path = case_dir / "case_brief.md"
    if brief_path.exists():
        return

    placeholder = "<fill me>"
    question_text = question or placeholder
    capability_text = expected_capability or placeholder
    result_mode_text = expected_result_mode or placeholder

    brief = textwrap.dedent(
        f"""\
# Case brief
## Domain
`{domain}`
## Raw user question
`{question_text}`
## Expected business meaning
- <fill me>
## Expected capability
- {capability_text}
## Expected result mode
- {result_mode_text}
## Constraints
- no architecture changes
- 1C/MCP first
- no fabricated values
- heuristic is not product success
- accepted requires analyst quality score >= 80 and zero unresolved P0
## Known current behavior
- <fill me>
## Draft acceptance criteria
- <fill me>
"""
    )
    write_text(brief_path, brief)
def save_capture_bundle(
    *,
    case_dir: Path,
    slot: str,
    export_markdown: str,
    debug_payload: Any,
    turn_artifact: dict[str, Any],
    session_record: dict[str, Any] | None,
    job_record: dict[str, Any] | None,
    report_case: dict[str, Any] | None,
) -> None:
    """Write the files that make up one captured turn into ``case_dir``.

    Always written: ``<slot>_output.md``, ``<slot>_debug.json`` (``{}`` when
    ``debug_payload`` is ``None``) and ``<slot>_turn.json``. Written only when
    the corresponding record is not ``None``: ``<slot>_session.json``,
    ``<slot>_job.json`` and ``<slot>_report_case.json``.
    """
    write_text(case_dir / f"{slot}_output.md", export_markdown)
    write_json(case_dir / f"{slot}_debug.json", {} if debug_payload is None else debug_payload)
    write_json(case_dir / f"{slot}_turn.json", turn_artifact)

    optional_records = (
        ("session", session_record),
        ("job", job_record),
        ("report_case", report_case),
    )
    for suffix, record in optional_records:
        if record is not None:
            write_json(case_dir / f"{slot}_{suffix}.json", record)
def parse_metadata_line(line: str) -> tuple[str, str] | None:
    """Split a ``key: value`` line at its first colon.

    Returns:
        ``(key, value)`` with surrounding whitespace removed from both parts,
        or ``None`` when the line contains no colon.
    """
    key, separator, value = line.partition(":")
    if not separator:
        return None
    return key.strip(), value.strip()
def parse_export_markdown(text: str) -> tuple[str, list[dict[str, Any]]]:
    """Parse a conversation export back into a session id and messages.

    The session id comes from the first ``session_id:`` line (``n/a`` when
    absent). Each ``## <n>. user`` / ``## <n>. assistant`` heading starts a
    message section made of:

    * leading ``key: value`` metadata lines, ending at the first blank line or
      the first line without a colon;
    * the message body;
    * optionally a ``TECH_SECTION_HEADER`` line followed by the debug payload,
      read from a fenced ``json`` block when one is present.

    Returns:
        ``(session_id, conversation)``.

    Raises:
        RuntimeError: When no message section is found.
        json.JSONDecodeError: When a debug payload is not valid JSON.
    """
    id_match = re.search(r"^session_id:\s*(.+?)\s*$", text, flags=re.MULTILINE)
    session_id = id_match.group(1).strip() if id_match else "n/a"

    heading_re = re.compile(r"^##\s+\d+\.\s+(user|assistant)\s*$", flags=re.MULTILINE)
    headings = list(heading_re.finditer(text))
    # Each section runs up to the next heading, or to the end of the text.
    section_ends = [heading.start() for heading in headings[1:]] + [len(text)]

    conversation: list[dict[str, Any]] = []
    for heading, section_end in zip(headings, section_ends):
        role = heading.group(1)
        block_lines = text[heading.end() : section_end].lstrip("\r\n").splitlines()

        metadata: dict[str, Any] = {"role": role}
        body_start = 0
        for raw_line in block_lines:
            if not raw_line.strip():
                # The blank separator line belongs to neither metadata nor body.
                body_start += 1
                break
            pair = parse_metadata_line(raw_line)
            if not pair:
                break
            metadata[pair[0]] = pair[1]
            body_start += 1
        body_lines = block_lines[body_start:]

        debug_index = next(
            (
                position
                for position, body_line in enumerate(body_lines)
                if body_line.strip().lower() == TECH_SECTION_HEADER.lower()
            ),
            None,
        )
        debug_payload = None
        if debug_index is None:
            text_lines = body_lines
        else:
            text_lines = body_lines[:debug_index]
            debug_text = "\n".join(body_lines[debug_index + 1 :]).strip()
            fenced = re.search(r"```json\s*(.*?)\s*```", debug_text, flags=re.DOTALL | re.IGNORECASE)
            if fenced:
                debug_text = fenced.group(1).strip()
            if debug_text:
                debug_payload = json.loads(debug_text)

        conversation.append(
            {
                "message_id": metadata.get("message_id"),
                "role": role,
                "text": "\n".join(text_lines).strip(),
                "reply_type": metadata.get("reply_type"),
                "created_at": metadata.get("created_at"),
                "trace_id": metadata.get("trace_id"),
                "debug": debug_payload,
            }
        )

    if not conversation:
        raise RuntimeError("Could not parse conversation sections from export markdown")
    return session_id, conversation
def build_normalize_config(args: argparse.Namespace) -> dict[str, Any]:
    """Map the LLM-related CLI arguments onto the camelCase config keys."""
    config: dict[str, Any] = {}
    config["llmProvider"] = args.llm_provider
    config["apiKey"] = args.llm_api_key
    config["model"] = args.llm_model
    config["baseUrl"] = args.llm_base_url
    config["temperature"] = args.temperature
    config["maxOutputTokens"] = args.max_output_tokens
    config["promptVersion"] = args.prompt_version
    return config
def fetch_session_snapshot(backend_url: str, session_id: str, timeout_seconds: int) -> dict[str, Any]:
    """Fetch an assistant session, retrying until it is available.

    ``<backend_url>/api/assistant/session/<session_id>`` is requested every
    half second until the response carries a ``session`` dict or the timeout
    elapses. Request failures are remembered and retried instead of aborting
    the wait.

    Args:
        backend_url: Backend base URL.
        session_id: Identifier of the session to fetch.
        timeout_seconds: Overall time budget. Each request uses this value
            clamped to the 10-30 second range as its own timeout.

    Returns:
        The ``session`` dict from the first successful response.

    Raises:
        RuntimeError: When the session is not obtained in time; chained to
            the last request error if there was one.
    """
    # A monotonic clock keeps the deadline immune to system clock adjustments.
    deadline = time.monotonic() + timeout_seconds
    last_error: Exception | None = None
    timeout_per_call = max(10, min(30, timeout_seconds))
    while time.monotonic() < deadline:
        try:
            response = http_json(f"{backend_url}/api/assistant/session/{session_id}", timeout=timeout_per_call)
            session = response.get("session")
            if isinstance(session, dict):
                return session
        except Exception as error:  # noqa: BLE001
            # Deliberate best-effort: any failure is retried until the deadline.
            last_error = error
        time.sleep(0.5)
    if last_error is not None:
        raise RuntimeError(f"Failed to fetch assistant session `{session_id}`: {last_error}") from last_error
    raise RuntimeError(f"Assistant session `{session_id}` was not available within {timeout_seconds} seconds")
def build_assistant_message_payload(
    args: argparse.Namespace,
    *,
    question: str,
    session_id: str | None,
    analysis_context: dict[str, Any],
) -> dict[str, Any]:
    """Build the request body for sending one question to the assistant.

    The question is sent under both ``user_message`` and ``message``. LLM
    settings are taken from ``args``. A non-empty normalized analysis context
    is attached under ``context.analysis_context``, and its ``as_of_date`` (if
    any) is repeated as ``context.period_hint``. Entries whose value is
    ``None`` (for example a missing ``session_id`` or an empty context) are
    left out of the payload.
    """
    normalized_context = normalize_analysis_context(analysis_context)
    context_payload: dict[str, Any] = {}
    if normalized_context:
        context_payload["analysis_context"] = normalized_context
    period_hint = normalized_context.get("as_of_date")
    if isinstance(period_hint, str):
        context_payload["period_hint"] = period_hint

    body = {
        "session_id": session_id,
        "user_message": question,
        "message": question,
        "mode": "assistant",
        "llmProvider": args.llm_provider,
        "apiKey": args.llm_api_key,
        "model": args.llm_model,
        "baseUrl": args.llm_base_url,
        "temperature": args.temperature,
        "maxOutputTokens": args.max_output_tokens,
        "promptVersion": args.prompt_version,
        "context": context_payload or None,
        "useMock": bool(args.use_mock),
    }
    return drop_none_values(body)
def parse_path_tokens(path_expression: str) -> list[str | int]:
    """Split a placeholder path such as ``a.b[0].c`` into lookup tokens.

    Dots separate names, and ``[n]`` yields the integer ``n``. Whitespace
    around the whole expression and inside the brackets is ignored.

    Args:
        path_expression: The path text found inside a placeholder.

    Returns:
        Tokens in order: ``str`` for names, ``int`` for list indexes.

    Raises:
        RuntimeError: When a ``[`` has no closing ``]``, or when the bracket
            content is not a non-negative decimal integer.
    """
    tokens: list[str | int] = []
    cursor = 0
    path = path_expression.strip()
    while cursor < len(path):
        current = path[cursor]
        if current == ".":
            cursor += 1
            continue
        if current == "[":
            closing = path.find("]", cursor)
            if closing < 0:
                raise RuntimeError(f"Invalid placeholder path: {path_expression}")
            raw_index = path[cursor + 1 : closing].strip()
            # isdecimal() (unlike isdigit()) rejects characters such as "²" that
            # int() cannot parse, so bad indexes fail with RuntimeError rather
            # than an unexpected ValueError.
            if not raw_index.isdecimal():
                raise RuntimeError(f"Only numeric list indexes are supported in placeholders: {path_expression}")
            tokens.append(int(raw_index))
            cursor = closing + 1
            continue
        end = cursor
        while end < len(path) and path[end] not in ".[":
            end += 1
        tokens.append(path[cursor:end])
        cursor = end
    return tokens
def lookup_placeholder_value(path_expression: str, scenario_state: dict[str, Any]) -> Any:
    """Resolve a placeholder path against the scenario state.

    The lookup namespace exposes ``scenario_state``, ``step_outputs``,
    ``semantic_memory`` and ``bindings``. When ``step_outputs`` is a dict, its
    entries are also merged into the top level of the namespace, so a step id
    can be used as the first path segment.

    Raises:
        RuntimeError: When a name is missing, an index is applied to a
            non-list, or an index is out of range.
    """
    namespace: dict[str, Any] = {
        "scenario_state": scenario_state,
        "step_outputs": scenario_state.get("step_outputs", {}),
        "semantic_memory": scenario_state.get("semantic_memory", {}),
        "bindings": scenario_state.get("bindings", {}),
    }
    step_outputs = scenario_state.get("step_outputs")
    if isinstance(step_outputs, dict):
        namespace.update(step_outputs)

    node: Any = namespace
    for token in parse_path_tokens(path_expression):
        if isinstance(token, int):
            if not isinstance(node, list):
                raise RuntimeError(f"Placeholder `{path_expression}` does not point to a list before index access")
            if token >= len(node):
                raise RuntimeError(f"Placeholder `{path_expression}` index {token} is out of range")
        elif not isinstance(node, dict) or token not in node:
            raise RuntimeError(f"Placeholder `{path_expression}` could not be resolved at `{token}`")
        node = node[token]
    return node
def resolve_question_template(question_template: str, scenario_state: dict[str, Any]) -> str:
    """Substitute ``{{ path }}`` placeholders in a question template.

    Each placeholder is resolved with ``lookup_placeholder_value``. Dict and
    list values are rendered with ``dump_json``; everything else with ``str``.

    Returns:
        The substituted template with surrounding whitespace stripped.
    """

    def render(match: re.Match[str]) -> str:
        resolved = lookup_placeholder_value(match.group(1), scenario_state)
        return dump_json(resolved) if isinstance(resolved, (dict, list)) else str(resolved)

    substituted = re.sub(r"{{\s*([^{}]+?)\s*}}", render, question_template)
    return substituted.strip()
def build_failed_step_state(
    *,
    scenario_id: str,
    domain: str,
    step: dict[str, Any],
    step_index: int,
    question_resolved: str,
    status: str,
    failure_type: str,
    error_message: str,
) -> dict[str, Any]:
    """Build the step-state record for a step that produced no answer.

    Identification fields are copied from ``step``. ``reply_type`` is
    ``backend_error`` when ``status`` is ``blocked`` and
    ``unresolved_followup`` otherwise. ``failure_type`` is stored under both
    ``fallback_type`` and ``failure_type``. Every runtime observation field is
    ``None`` and ``entries`` is empty.
    """
    state: dict[str, Any] = {
        "schema_version": SCENARIO_STEP_STATE_SCHEMA_VERSION,
        "scenario_id": scenario_id,
        "domain": domain,
        "step_id": step["step_id"],
        "step_index": step_index,
        "title": step["title"],
        "depends_on": step["depends_on"],
        "question_template": step["question_template"],
        "question_resolved": question_resolved,
        "expected_capability": step.get("expected_capability"),
        "expected_result_mode": step.get("expected_result_mode"),
        "reply_type": "backend_error" if status == "blocked" else "unresolved_followup",
    }
    # Nothing was observed for a failed step, so these stay empty. The groups
    # are added in sequence to keep the original key order of the record.
    state.update(
        dict.fromkeys(
            (
                "assistant_message_id",
                "trace_id",
                "detected_mode",
                "detected_intent",
                "selected_recipe",
                "capability_id",
                "capability_route_mode",
                "route_expectation_status",
                "result_mode",
                "response_type",
            )
        )
    )
    state["fallback_type"] = failure_type
    state.update(
        dict.fromkeys(
            (
                "mcp_call_status",
                "balance_confirmed",
                "active_result_set_id",
                "last_confirmed_route",
                "date_scope",
                "organization_scope",
            )
        )
    )
    state["entries"] = []
    state["status"] = status
    state["failure_type"] = failure_type
    state["error_message"] = error_message
    return state
def normalize_step_definition(index: int, raw_step: Any) -> dict[str, Any]:
    """Normalize one scenario step into the canonical step dict.

    A plain string is treated as the question template of a step named
    ``step_<index>`` (two-digit index) with no dependencies. A dict may set
    each field explicitly; ``question`` takes precedence over
    ``question_template``, ``node_role`` over ``role`` and
    ``paraphrase_family`` over ``wording_family``.

    Args:
        index: 1-based position, used for the default step id and in errors.
        raw_step: The step as found in the manifest.

    Returns:
        The normalized step dict.

    Raises:
        RuntimeError: When the step is neither a string nor a dict, or when it
            has no question text.
    """
    default_step_id = f"step_{index:02d}"

    if isinstance(raw_step, str):
        template = raw_step.strip()
        if not template:
            raise RuntimeError(f"Scenario step {index} must not be empty")
        return {
            "step_id": default_step_id,
            "title": default_step_id,
            "question_template": template,
            "depends_on": [],
            "analysis_context": {},
            "expected_capability": None,
            "expected_result_mode": None,
            "question_id": None,
            "node_id": None,
            "node_role": None,
            "paraphrase_family": None,
            "required_carryover_invariants": [],
            "ordering_rule": None,
        }

    if not isinstance(raw_step, dict):
        raise RuntimeError(f"Scenario step {index} must be a string or object")

    def optional_text(primary: str, alias: str | None = None) -> str | None:
        # Stripped text of the first truthy field, or None when blank/missing.
        raw = raw_step.get(primary)
        if not raw and alias is not None:
            raw = raw_step.get(alias)
        return str(raw or "").strip() or None

    step_id = slugify_token(str(raw_step.get("step_id") or "").strip(), default_step_id)

    template = str(raw_step.get("question") or raw_step.get("question_template") or "").strip()
    if not template:
        raise RuntimeError(f"Scenario step `{step_id}` must define `question` or `question_template`")

    declared_dependencies = raw_step.get("depends_on")
    if isinstance(declared_dependencies, str) and declared_dependencies.strip():
        depends_on = [declared_dependencies.strip()]
    elif isinstance(declared_dependencies, list):
        depends_on = [str(entry).strip() for entry in declared_dependencies if str(entry).strip()]
    else:
        depends_on = []

    return {
        "step_id": step_id,
        "title": str(raw_step.get("title") or step_id).strip() or step_id,
        "question_template": template,
        "depends_on": depends_on,
        "analysis_context": normalize_analysis_context(raw_step.get("analysis_context")),
        "expected_capability": optional_text("expected_capability"),
        "expected_result_mode": optional_text("expected_result_mode"),
        "question_id": optional_text("question_id"),
        "node_id": optional_text("node_id"),
        "node_role": optional_text("node_role", "role"),
        "paraphrase_family": optional_text("paraphrase_family", "wording_family"),
        "required_carryover_invariants": normalize_string_list(raw_step.get("required_carryover_invariants")),
        "ordering_rule": optional_text("ordering_rule"),
    }
def normalize_scenario_manifest(
    raw_manifest: dict[str, Any],
    *,
    fallback_domain: str | None = None,
    fallback_analysis_context: Any = None,
    fallback_bindings: Any = None,
    default_scenario_id: str | None = None,
) -> dict[str, Any]:
    """Normalize a raw scenario manifest.

    Manifest values take precedence over the fallbacks: analysis context
    fields and bindings from the manifest override the fallback ones. A
    non-empty analysis context without a ``source`` gets
    ``scenario_manifest``.

    Args:
        raw_manifest: The manifest as loaded from JSON.
        fallback_domain: Domain used when the manifest has none.
        fallback_analysis_context: Base analysis context.
        fallback_bindings: Base bindings.
        default_scenario_id: Id used when the manifest has none; when this is
            also missing, an id is generated by ``slugify_case_id``.

    Returns:
        The normalized manifest dict.

    Raises:
        RuntimeError: When no domain is available, or when ``steps`` is not a
            non-empty list.
    """
    domain = str(raw_manifest.get("domain") or fallback_domain or "").strip()
    if not domain:
        raise RuntimeError("Scenario manifest must define `domain`")

    raw_steps = raw_manifest.get("steps")
    if not (isinstance(raw_steps, list) and raw_steps):
        raise RuntimeError("Scenario manifest must define non-empty `steps`")
    steps = [normalize_step_definition(position, raw_step) for position, raw_step in enumerate(raw_steps, start=1)]

    scenario_id = str(raw_manifest.get("scenario_id") or "").strip()
    if not scenario_id:
        scenario_id = default_scenario_id or slugify_case_id(domain, None)

    title = str(raw_manifest.get("title") or domain).strip() or domain
    description = str(raw_manifest.get("description") or "").strip() or None

    analysis_context = merge_analysis_context(fallback_analysis_context, raw_manifest.get("analysis_context"))
    if analysis_context and "source" not in analysis_context:
        analysis_context["source"] = "scenario_manifest"

    bindings = normalize_bindings(fallback_bindings)
    bindings.update(normalize_bindings(raw_manifest.get("bindings")))

    acceptance_canon = raw_manifest.get("acceptance_canon")
    return {
        "schema_version": str(raw_manifest.get("schema_version") or SCENARIO_MANIFEST_SCHEMA_VERSION),
        "scenario_id": scenario_id,
        "domain": domain,
        "title": title,
        "description": description,
        "analysis_context": analysis_context,
        "bindings": bindings,
        "question_ids": normalize_string_list(raw_manifest.get("question_ids")),
        "node_ids": normalize_string_list(raw_manifest.get("node_ids")),
        "acceptance_canon": acceptance_canon if isinstance(acceptance_canon, dict) else {},
        "steps": steps,
    }
def load_scenario_manifest(file_path: Path) -> dict[str, Any]:
    """Read a scenario manifest JSON file and return its normalized form."""
    return normalize_scenario_manifest(read_json_file(file_path))
def build_active_contract_bindings(raw_contract: dict[str, Any]) -> dict[str, Any]:
    """Derive scenario bindings from an active domain contract.

    Each binding name is filled from the matching key in the contract's
    ``observed_anchors`` (``None`` when the anchor is absent). Explicit
    ``bindings`` in the contract are applied on top, and the combined result
    is passed through ``normalize_bindings``.
    """
    observed_anchors = raw_contract.get("observed_anchors")
    anchors = observed_anchors if isinstance(observed_anchors, dict) else {}

    # (binding name, key inside ``observed_anchors``)
    anchor_sources = (
        ("target_date_historical", "historical_as_of_date"),
        ("target_date_current", "current_as_of_date_example"),
        ("observed_warehouse", "warehouse"),
        ("observed_organization", "organization"),
        ("focus_item_current", "focus_item_current"),
        ("focus_item_historical", "focus_item_historical"),
        ("focus_item_small_residue", "focus_item_small_residue"),
        ("observed_supplier_candidate", "supplier_candidate"),
        ("observed_supplier_candidate_alt", "supplier_candidate_alt"),
        ("observed_customer_candidate", "buyer_candidate"),
    )
    combined = {binding_name: anchors.get(anchor_key) for binding_name, anchor_key in anchor_sources}
    combined.update(normalize_bindings(raw_contract.get("bindings")))
    return normalize_bindings(combined)
def convert_active_domain_contract_to_pack(raw_contract: dict[str, Any]) -> dict[str, Any]:
    """Convert an active domain contract into a scenario pack dict.

    The runtime domain is taken from ``runtime_domain`` or ``domain``; when
    both are blank it defaults to ``inventory_stock`` if ``domain_id`` starts
    with that prefix. Bindings and analysis context are layered, with the
    ``orchestration_pack`` values applied last. Scenarios are copied through
    un-normalized.

    Raises:
        RuntimeError: When ``orchestration_pack`` is not a dict, when it has
            no non-empty ``scenarios`` list, or when no runtime domain can be
            determined.
    """
    orchestration_pack = raw_contract.get("orchestration_pack")
    if not isinstance(orchestration_pack, dict):
        raise RuntimeError("Active domain contract must define object `orchestration_pack`")

    raw_scenarios = orchestration_pack.get("scenarios")
    if not (isinstance(raw_scenarios, list) and raw_scenarios):
        raise RuntimeError("Active domain contract must define non-empty `orchestration_pack.scenarios`")

    runtime_domain = str(raw_contract.get("runtime_domain") or raw_contract.get("domain") or "").strip()
    if not runtime_domain and str(raw_contract.get("domain_id") or "").startswith("inventory_stock"):
        runtime_domain = "inventory_stock"
    if not runtime_domain:
        raise RuntimeError("Active domain contract must define `runtime_domain`")

    domain_id = str(raw_contract.get("domain_id") or runtime_domain).strip() or runtime_domain

    bindings = build_active_contract_bindings(raw_contract)
    bindings.update(normalize_bindings(orchestration_pack.get("bindings")))

    contract_context = merge_analysis_context(
        raw_contract.get("default_analysis_context"), raw_contract.get("analysis_context")
    )
    analysis_context = merge_analysis_context(contract_context, orchestration_pack.get("analysis_context"))
    if analysis_context and "source" not in analysis_context:
        analysis_context["source"] = "active_domain_contract"

    pack_id = str(orchestration_pack.get("pack_id") or "").strip() or slugify_case_id(domain_id, None)
    title = str(orchestration_pack.get("title") or raw_contract.get("title") or domain_id).strip() or domain_id
    description = str(orchestration_pack.get("description") or raw_contract.get("domain_goal") or "").strip() or None

    def dict_field(key: str) -> dict[str, Any]:
        candidate = raw_contract.get(key)
        return candidate if isinstance(candidate, dict) else {}

    def list_field(key: str) -> list[Any]:
        candidate = raw_contract.get(key)
        return candidate if isinstance(candidate, list) else []

    return {
        "schema_version": SCENARIO_PACK_SCHEMA_VERSION,
        "source_schema_version": ACTIVE_DOMAIN_CONTRACT_SCHEMA_VERSION,
        "source_contract_id": domain_id,
        "pack_id": pack_id,
        "domain": runtime_domain,
        "title": title,
        "description": description,
        "analysis_context": analysis_context,
        "bindings": bindings,
        "scenarios": raw_scenarios,
        "scenario_tree": dict_field("scenario_tree"),
        "acceptance_contract": dict_field("acceptance_contract"),
        "question_pool": dict_field("question_pool"),
        "wording_families": list_field("wording_families"),
        "known_failure_patterns_to_watch": list_field("known_failure_patterns_to_watch"),
        "source_contract": {
            "domain_id": domain_id,
            "title": str(raw_contract.get("title") or domain_id).strip() or domain_id,
            "status": str(raw_contract.get("status") or "active").strip() or "active",
        },
    }
def load_scenario_pack(file_path: Path) -> dict[str, Any]:
    """Read a scenario pack from ``file_path`` and return its normalized form.

    A file whose ``schema_version`` equals ``ACTIVE_DOMAIN_CONTRACT_SCHEMA_VERSION``
    is first converted with ``convert_active_domain_contract_to_pack``. Every
    scenario is normalized with the pack-level domain, analysis context and
    bindings as fallbacks.

    Raises:
        RuntimeError: if the pack has no ``domain`` or no non-empty ``scenarios`` list.
    """
    pack_source = read_json_file(file_path)
    declared_schema = str(pack_source.get("schema_version") or "").strip()
    if declared_schema == ACTIVE_DOMAIN_CONTRACT_SCHEMA_VERSION:
        pack_source = convert_active_domain_contract_to_pack(pack_source)

    domain = str(pack_source.get("domain") or "").strip()
    if not domain:
        raise RuntimeError("Scenario pack must define `domain`")
    scenario_sources = pack_source.get("scenarios")
    if not (isinstance(scenario_sources, list) and scenario_sources):
        raise RuntimeError("Scenario pack must define non-empty `scenarios`")

    pack_id = str(pack_source.get("pack_id") or "").strip() or slugify_case_id(domain, None)
    title = str(pack_source.get("title") or domain).strip() or domain
    description = str(pack_source.get("description") or "").strip() or None

    shared_context = normalize_analysis_context(pack_source.get("analysis_context"))
    if shared_context and "source" not in shared_context:
        # Tag the context origin only when the pack did not declare one itself.
        shared_context["source"] = "scenario_pack"
    shared_bindings = normalize_bindings(pack_source.get("bindings"))

    scenarios = []
    for position, scenario_source in enumerate(scenario_sources, start=1):
        scenarios.append(
            normalize_scenario_manifest(
                scenario_source,
                fallback_domain=domain,
                fallback_analysis_context=shared_context,
                fallback_bindings=shared_bindings,
                default_scenario_id=f"{pack_id}_scenario_{position:02d}",
            )
        )

    normalized_pack = {
        "schema_version": str(pack_source.get("schema_version") or SCENARIO_PACK_SCHEMA_VERSION),
        "pack_id": pack_id,
        "domain": domain,
        "title": title,
        "description": description,
        "analysis_context": shared_context,
        "bindings": shared_bindings,
        "scenarios": scenarios,
    }

    # Optional sections are passed through untouched, and only when present.
    passthrough_keys = (
        "source_schema_version",
        "source_contract_id",
        "scenario_tree",
        "acceptance_contract",
        "question_pool",
        "wording_families",
        "known_failure_patterns_to_watch",
        "source_contract",
    )
    for key in passthrough_keys:
        value = pack_source.get(key)
        if value is None:
            continue
        normalized_pack[key] = value
    return normalized_pack
def ensure_scenario_brief(scenario_dir: Path, manifest: dict[str, Any]) -> None:
    """Create ``scenario_brief.md`` in ``scenario_dir`` unless it already exists.

    The brief lists the scenario domain, id, title, description, the shared
    analysis context and bindings (as JSON) and the numbered steps, followed
    by a fixed list of constraints. An existing file is never overwritten.
    """
    brief_path = scenario_dir / "scenario_brief.md"
    if brief_path.exists():
        return

    numbered_steps = [
        f"{position}. `{step['step_id']}` - {step['question_template']}"
        for position, step in enumerate(manifest["steps"], start=1)
    ]
    steps_lines = "\n".join(numbered_steps)

    document_lines = [
        "# Scenario brief",
        "## Domain",
        f"`{manifest['domain']}`",
        "## Scenario id",
        f"`{manifest['scenario_id']}`",
        "## Title",
        f"{manifest['title']}",
        "## Description",
        f"{manifest.get('description') or '<fill me>'}",
        "## Shared analysis context",
        "```json",
        f"{dump_json(manifest.get('analysis_context') or {})}",
        "```",
        "## Bindings",
        "```json",
        f"{dump_json(manifest.get('bindings') or {})}",
        "```",
        "## Steps",
        f"{steps_lines}",
        "## Constraints",
        "- no architecture changes",
        "- reuse current assistant runtime and session state",
        "- 1C/MCP first",
        "- no fabricated values",
        "- missing route or capability is domain enablement work, not silent rejection",
    ]
    # dedent is kept so whitespace handling of the rendered text stays as before.
    content = textwrap.dedent("\n".join(document_lines) + "\n")
    write_text(brief_path, content)
def normalize_field_key(raw_key: str) -> str:
    """Map a raw field label to a canonical field key.

    The label is trimmed, lower-cased, has "ё" folded to "е", whitespace runs
    collapsed to one space and surrounding spaces/colons removed. Known
    Russian labels are translated through a fixed alias table; any other
    label is passed to ``slugify_token`` and lower-cased.
    """
    lowered = raw_key.strip().lower().replace("ё", "е")
    collapsed = re.sub(r"\s+", " ", lowered).strip(" :")
    known_aliases = {
        "склад": "warehouse",
        "количество": "quantity",
        "стоимость": "amount",
        "сумма": "amount",
        "организация": "organization",
        "дата строки": "row_date",
        "дата": "date",
        "поставщик": "supplier",
        "договор": "contract",
        "документ": "document",
        "документы": "documents",
        "покупатель": "customer",
    }
    alias = known_aliases.get(collapsed)
    if alias is not None:
        return alias
    return slugify_token(collapsed, "field").lower()
def extract_structured_entries(answer_text: str) -> list[dict[str, Any]]:
    """Parse numbered list rows of an assistant answer into structured entries.

    Only lines shaped like ``"<number>. <payload>"`` are considered. The
    payload is split on ``" | "``: the first segment is the entry title and
    each following ``key: value`` segment becomes a field (segments without a
    colon are ignored). Field keys are normalized with ``normalize_field_key``.

    Returns:
        One dict per matching line with ``index``, ``title``, ``item``,
        ``fields``, ``raw_fields`` and ``raw_line``. Each parsed field is also
        exposed as a top-level key of the entry, unless that key is one of
        the structural keys above; those are never overwritten, and the
        field stays reachable through ``entry["fields"]``.
    """
    row_pattern = re.compile(r"^\s*(\d+)\.\s+(.*\S)\s*$")
    entries: list[dict[str, Any]] = []
    for line in answer_text.splitlines():
        match = row_pattern.match(line)
        if not match:
            continue
        index = int(match.group(1))
        payload = match.group(2).strip()
        segments = [segment.strip() for segment in payload.split(" | ") if segment.strip()]
        title = segments[0] if segments else payload
        fields: dict[str, str] = {}
        raw_fields: dict[str, str] = {}
        for segment in segments[1:]:
            if ":" not in segment:
                continue
            raw_key, raw_value = segment.split(":", 1)
            value = raw_value.strip()
            fields[normalize_field_key(raw_key)] = value
            raw_fields[raw_key.strip()] = value
        entry: dict[str, Any] = {
            "index": index,
            "title": title,
            "item": title,
            "fields": fields,
            "raw_fields": raw_fields,
            "raw_line": line.strip(),
        }
        for key, value in fields.items():
            # setdefault: a field that happens to be named like a structural
            # key (e.g. "index" or "title") must not clobber that key.
            entry.setdefault(key, value)
        entries.append(entry)
    return entries
def derive_step_status(reply_type: str | None, debug_payload: dict[str, Any]) -> str:
    """Classify a scenario step as ``blocked``, ``exact``, ``partial`` or ``needs_exact_capability``.

    Rules, in order:
    * ``backend_error`` replies are ``blocked``;
    * an exact capability route without fallback and with a factual (or
      empty-but-valid) reply is ``exact``;
    * an ``out_of_scope`` / ``unknown`` fallback is ``needs_exact_capability``;
    * an address query that reached an exact route, a recipe or an intent is
      ``partial``;
    * everything else is ``needs_exact_capability``.
    """
    if reply_type == "backend_error":
        return "blocked"

    def debug_text(field: str) -> str:
        # Missing or falsy debug values are treated as the empty string.
        return str(debug_payload.get(field) or "").strip()

    route_mode = debug_text("capability_route_mode")
    fallback = debug_text("fallback_type")
    recipe = debug_text("selected_recipe")
    intent = debug_text("detected_intent")
    mode = debug_text("detected_mode")

    on_exact_route = route_mode == "exact"
    is_address_query = mode == "address_query"
    grounded_replies = {"factual", "factual_with_explanation", "empty_but_valid"}

    if on_exact_route and fallback in {"", "none"} and reply_type in grounded_replies:
        return "exact"
    if fallback in {"out_of_scope", "unknown"}:
        return "needs_exact_capability"
    if is_address_query and (on_exact_route or recipe or intent):
        return "partial"
    # Remaining reply types (partial_coverage, clarification_required,
    # out_of_scope, no_grounded_answer, route_mismatch_blocked, or anything
    # else) all end up here.
    return "needs_exact_capability"
def build_scenario_step_state(
    *,
    scenario_id: str,
    domain: str,
    step: dict[str, Any],
    step_index: int,
    question_resolved: str,
    turn_artifact: dict[str, Any],
    entries: list[dict[str, Any]],
) -> dict[str, Any]:
    """Assemble the persisted state record for one executed scenario step.

    Combines the step definition, the resolved question, routing fields from
    the turn's ``technical_debug_payload``, scope fields from the session's
    address navigation context, the parsed ``entries`` and the status derived
    by ``derive_step_status``. Any nested section that is missing or not a
    dict is treated as empty.
    """

    def as_dict(candidate: Any) -> dict[str, Any]:
        return candidate if isinstance(candidate, dict) else {}

    debug = as_dict(turn_artifact.get("technical_debug_payload"))
    summary = as_dict(turn_artifact.get("session_summary"))
    navigation_state = as_dict(summary.get("address_navigation_state"))
    context = as_dict(navigation_state.get("session_context"))
    assistant_item = as_dict(turn_artifact.get("assistant_message"))
    reply_type = assistant_item.get("reply_type")

    debug_fields = (
        "detected_mode",
        "detected_intent",
        "selected_recipe",
        "capability_id",
        "capability_route_mode",
        "route_expectation_status",
        "result_mode",
        "response_type",
        "fallback_type",
        "mcp_call_status",
        "balance_confirmed",
    )
    context_fields = (
        "active_result_set_id",
        "last_confirmed_route",
        "date_scope",
        "organization_scope",
    )
    return {
        "schema_version": SCENARIO_STEP_STATE_SCHEMA_VERSION,
        "scenario_id": scenario_id,
        "domain": domain,
        "step_id": step["step_id"],
        "step_index": step_index,
        "title": step["title"],
        "depends_on": step["depends_on"],
        "question_template": step["question_template"],
        "question_resolved": question_resolved,
        "expected_capability": step.get("expected_capability"),
        "expected_result_mode": step.get("expected_result_mode"),
        "reply_type": reply_type,
        "assistant_message_id": assistant_item.get("message_id"),
        "trace_id": assistant_item.get("trace_id"),
        **{field: debug.get(field) for field in debug_fields},
        **{field: context.get(field) for field in context_fields},
        "entries": entries,
        # Non-string reply types are passed to the classifier as None.
        "status": derive_step_status(reply_type if isinstance(reply_type, str) else None, debug),
    }
def save_scenario_step_bundle(
    *,
    step_dir: Path,
    export_markdown: str,
    turn_artifact: dict[str, Any],
    session_record: dict[str, Any],
    response_payload: dict[str, Any],
    step_state: dict[str, Any],
) -> None:
    """Write all artifacts of one scenario step into ``step_dir``.

    Files written: ``output.md``, ``debug.json``, ``turn.json``,
    ``session.json``, ``assistant_response.json``, ``step_state.json`` and
    ``resolved_question.txt``.
    """
    debug_payload = turn_artifact.get("technical_debug_payload")
    write_text(step_dir / "output.md", export_markdown)

    json_artifacts = (
        # A missing debug payload is stored as an empty object.
        ("debug.json", {} if debug_payload is None else debug_payload),
        ("turn.json", turn_artifact),
        ("session.json", session_record),
        ("assistant_response.json", response_payload),
        ("step_state.json", step_state),
    )
    for file_name, payload in json_artifacts:
        write_json(step_dir / file_name, payload)

    write_text(step_dir / "resolved_question.txt", f"{step_state['question_resolved']}\n")
def derive_scenario_status(step_outputs: dict[str, dict[str, Any]]) -> str:
    """Reduce per-step statuses to one scenario status.

    The worst status wins, in the order ``blocked`` >
    ``needs_exact_capability`` > ``partial``. A scenario without any step
    output is ``blocked``; otherwise, when none of those statuses is present,
    the scenario is ``accepted``.
    """
    observed = {str(item.get("status") or "") for item in step_outputs.values()}
    if not observed:
        return "blocked"
    for severity in ("blocked", "needs_exact_capability", "partial"):
        if severity in observed:
            return severity
    return "accepted"
def build_scenario_summary(manifest: dict[str, Any], scenario_state: dict[str, Any], final_status: str) -> str:
    """Render the Markdown summary of a scenario run.

    The header carries scenario id, domain, title, session id and final
    status; it is followed by one section per manifest step with the values
    recorded for that step in ``scenario_state["step_outputs"]`` (``n/a``
    where nothing was recorded).
    """
    session_label = scenario_state.get("session_id") or "n/a"
    lines = [
        "# Scenario summary",
        "",
        f"- scenario_id: `{manifest['scenario_id']}`",
        f"- domain: `{manifest['domain']}`",
        f"- title: {manifest['title']}",
        f"- session_id: `{session_label}`",
        f"- final_status: `{final_status}`",
        "",
        "## Steps",
    ]
    for position, step in enumerate(manifest["steps"], start=1):
        recorded = scenario_state.get("step_outputs", {}).get(step["step_id"], {})

        def shown(field: str) -> Any:
            return recorded.get(field) or "n/a"

        lines.append(f"{position}. `{step['step_id']}` - {step['question_template']}")
        lines.append(f"status: `{shown('status')}`")
        lines.append(f"question_resolved: {shown('question_resolved')}")
        lines.append(f"intent: `{shown('detected_intent')}`")
        lines.append(f"recipe: `{shown('selected_recipe')}`")
        lines.append(f"capability: `{shown('capability_id')}`")
        lines.append(f"result_mode: `{shown('result_mode')}`")
        lines.append(f"result_set: `{shown('active_result_set_id')}`")
        lines.append("")
    return "\n".join(lines).strip() + "\n"
def build_scenario_final_status(manifest: dict[str, Any], scenario_state: dict[str, Any], final_status: str) -> str:
    """Render the ``final_status.md`` text for a scenario.

    Contains the status, scenario id, session id (``n/a`` when absent) and a
    fixed human-readable reason for the status; unrecognized statuses get
    the reason ``scenario status unknown``.
    """
    reasons = {
        "accepted": "all scenario steps executed in one assistant session with no unresolved route or capability gaps",
        "partial": "scenario captured successfully, but at least one step still needs exact capability enablement or route hardening",
        "needs_exact_capability": "scenario is valid for the project, but at least one step still requires exact capability or route enablement",
        "blocked": "scenario run was interrupted by runtime or backend failure",
    }
    reason = reasons.get(final_status, "scenario status unknown")
    report_lines = [
        "# Final status",
        f"- status: `{final_status}`",
        f"- scenario_id: `{manifest['scenario_id']}`",
        f"- session_id: `{scenario_state.get('session_id') or 'n/a'}`",
        f"- reason: {reason}",
    ]
    # dedent is kept so whitespace handling of the rendered text stays as before.
    return textwrap.dedent("\n".join(report_lines) + "\n")
def run_assistant_step(
    *,
    args: argparse.Namespace,
    domain: str,
    scenario_id: str,
    step: dict[str, Any],
    step_index: int,
    session_id: str | None,
    question_resolved: str,
    analysis_context: dict[str, Any],
) -> dict[str, Any]:
    """Send one scenario question to the assistant backend and collect its artifacts.

    Posts the resolved question to ``/api/assistant/message``, fetches the
    resulting session snapshot, and derives the technical export, the turn
    artifact and the step state from it.

    Returns:
        A dict with ``session_id``, ``response_payload``, ``session_record``,
        ``conversation``, ``export_markdown``, ``turn_artifact`` and
        ``step_state``.

    Raises:
        RuntimeError: if neither the response nor the caller provides a session id.
    """
    request_body = build_assistant_message_payload(
        args,
        question=question_resolved,
        session_id=session_id,
        analysis_context=analysis_context,
    )
    # The request timeout never drops below 30 seconds.
    request_timeout = max(30, int(args.timeout_seconds))
    response_payload = http_json(
        f"{args.backend_url}/api/assistant/message",
        method="POST",
        payload=request_body,
        timeout=request_timeout,
    )

    # Prefer the session id reported by the backend, fall back to the caller's.
    active_session_id = str(response_payload.get("session_id") or session_id or "").strip()
    if not active_session_id:
        raise RuntimeError(f"Assistant response for step `{step['step_id']}` does not contain session_id")

    session_record = fetch_session_snapshot(args.backend_url, active_session_id, args.timeout_seconds)
    conversation = extract_conversation_from_session(session_record)
    export_markdown = build_conversation_export(active_session_id, conversation, mode="technical")

    turn_artifact = build_turn_artifact(
        slot="step",
        domain=domain,
        case_id=scenario_id,
        question=question_resolved,
        session_id=active_session_id,
        conversation=conversation,
        session_record=session_record,
        job_record=None,
        report_case=None,
        export_file_name="output.md",
    )
    turn_artifact["schema_version"] = "domain_scenario_turn_artifact_v1"
    scenario_section = {
        "scenario_id": scenario_id,
        "step_id": step["step_id"],
        "step_index": step_index,
        "question_template": step["question_template"],
        "question_resolved": question_resolved,
        "depends_on": step["depends_on"],
        "analysis_context": analysis_context,
    }
    turn_artifact["scenario"] = scenario_section

    final_reply = find_last_assistant(conversation)
    parsed_entries = extract_structured_entries(str(final_reply.get("text") or ""))
    step_state = build_scenario_step_state(
        scenario_id=scenario_id,
        domain=domain,
        step=step,
        step_index=step_index,
        question_resolved=question_resolved,
        turn_artifact=turn_artifact,
        entries=parsed_entries,
    )
    return {
        "session_id": active_session_id,
        "response_payload": response_payload,
        "session_record": session_record,
        "conversation": conversation,
        "export_markdown": export_markdown,
        "turn_artifact": turn_artifact,
        "step_state": step_state,
    }
def execute_scenario_manifest(
    *,
    args: argparse.Namespace,
    manifest: dict[str, Any],
    scenario_dir: Path,
    manifest_source_label: str | None,
) -> tuple[dict[str, Any], str]:
    """Run every step of a scenario manifest and persist its artifacts.

    Writes the manifest, an optional ``manifest_source.txt``, the scenario
    brief and an initial ``scenario_state.json`` into ``scenario_dir``, then
    executes the steps in order, reusing the session id returned by earlier
    steps. After each step the step bundle is saved under
    ``steps/<step_id>`` and ``scenario_state.json`` is rewritten.

    A ``RuntimeError`` raised while resolving or running a step is recorded
    as a failed step: placeholder resolution errors are marked
    ``needs_exact_capability`` and the run continues with the next step; any
    other runtime error is marked ``blocked`` and stops the loop.

    Returns:
        ``(scenario_state, final_status)`` where ``final_status`` comes from
        ``derive_scenario_status``.
    """
    steps_dir = scenario_dir / "steps"
    steps_dir.mkdir(parents=True, exist_ok=True)
    write_json(scenario_dir / "scenario_manifest.json", manifest)
    if manifest_source_label:
        write_text(scenario_dir / "manifest_source.txt", f"{manifest_source_label}\n")
    ensure_scenario_brief(scenario_dir, manifest)
    scenario_state: dict[str, Any] = {
        "schema_version": SCENARIO_STATE_SCHEMA_VERSION,
        "scenario_id": manifest["scenario_id"],
        "domain": manifest["domain"],
        "title": manifest["title"],
        "session_id": None,
        "analysis_context": manifest.get("analysis_context") or {},
        "bindings": manifest.get("bindings") or {},
        "step_outputs": {},
        "semantic_memory": {},
        "updated_at": datetime.now(timezone.utc).replace(microsecond=0).isoformat(),
    }
    # Persist the initial state so the file exists even if the first step fails hard.
    write_json(scenario_dir / "scenario_state.json", scenario_state)
    last_export_markdown = ""
    for step_index, step in enumerate(manifest["steps"], start=1):
        step_dir = steps_dir / step["step_id"]
        # Step-level context is merged over the manifest context, then
        # adjusted by carry_forward_analysis_context using the scenario state.
        step_analysis_context = merge_analysis_context(manifest.get("analysis_context"), step.get("analysis_context"))
        step_analysis_context = carry_forward_analysis_context(scenario_state, step_analysis_context)
        try:
            resolved_question = resolve_question_template(step["question_template"], scenario_state)
            result = run_assistant_step(
                args=args,
                domain=manifest["domain"],
                scenario_id=manifest["scenario_id"],
                step=step,
                step_index=step_index,
                session_id=scenario_state.get("session_id"),
                question_resolved=resolved_question,
                analysis_context=step_analysis_context,
            )
        except RuntimeError as exc:
            failure_type = "runtime_error"
            failure_status = "blocked"
            # Placeholder failures are recognised by the error message text.
            if "Placeholder `" in str(exc):
                failure_type = "placeholder_resolution_error"
                failure_status = "needs_exact_capability"
            step_state = build_failed_step_state(
                scenario_id=manifest["scenario_id"],
                domain=manifest["domain"],
                step=step,
                step_index=step_index,
                # The unresolved template is recorded as the question.
                question_resolved=step["question_template"],
                status=failure_status,
                failure_type=failure_type,
                error_message=str(exc),
            )
            scenario_state["step_outputs"][step["step_id"]] = step_state
            # Existing semantic memory is kept; only the latest-step markers change.
            scenario_state["semantic_memory"] = {
                **(scenario_state.get("semantic_memory") or {}),
                "latest_step_id": step["step_id"],
                "latest_step_status": step_state["status"],
            }
            scenario_state["updated_at"] = datetime.now(timezone.utc).replace(microsecond=0).isoformat()
            # Failed steps still get a bundle, with empty placeholders for missing artifacts.
            save_scenario_step_bundle(
                step_dir=step_dir,
                export_markdown="",
                turn_artifact={},
                session_record={},
                response_payload={},
                step_state=step_state,
            )
            write_json(scenario_dir / "scenario_state.json", scenario_state)
            print(
                f"[domain-case-loop] scenario {manifest['scenario_id']} step {step_index}/{len(manifest['steps'])}: "
                f"{step['step_id']} -> {step_state['status']} ({failure_type})"
            )
            if failure_status == "blocked":
                break
            continue
        scenario_state["session_id"] = result["session_id"]
        scenario_state["step_outputs"][step["step_id"]] = result["step_state"]
        previous_semantic_memory = scenario_state.get("semantic_memory") or {}
        previous_date_scope = previous_semantic_memory.get("date_scope") if isinstance(previous_semantic_memory, dict) else None
        current_date_scope = result["step_state"].get("date_scope")
        # On success the semantic memory is rebuilt from this step's state;
        # only the date scope is merged with the previous value.
        scenario_state["semantic_memory"] = {
            "latest_step_id": step["step_id"],
            "latest_step_status": result["step_state"].get("status"),
            "active_result_set_id": result["step_state"].get("active_result_set_id"),
            "last_confirmed_route": result["step_state"].get("last_confirmed_route"),
            "date_scope": merge_scenario_date_scope(
                previous_date_scope,
                current_date_scope,
                depends_on=step.get("depends_on") or [],
            ),
            "organization_scope": result["step_state"].get("organization_scope"),
            "entries": result["step_state"].get("entries"),
        }
        scenario_state["updated_at"] = datetime.now(timezone.utc).replace(microsecond=0).isoformat()
        save_scenario_step_bundle(
            step_dir=step_dir,
            export_markdown=result["export_markdown"],
            turn_artifact=result["turn_artifact"],
            session_record=result["session_record"],
            response_payload=result["response_payload"],
            step_state=result["step_state"],
        )
        write_json(scenario_dir / "scenario_state.json", scenario_state)
        last_export_markdown = result["export_markdown"]
        print(
            f"[domain-case-loop] scenario {manifest['scenario_id']} step {step_index}/{len(manifest['steps'])}: "
            f"{step['step_id']} -> {result['step_state']['status']}"
        )
    final_status = derive_scenario_status(scenario_state["step_outputs"])
    # scenario_output.md holds the export of the last successful step (empty if none).
    write_text(scenario_dir / "scenario_output.md", last_export_markdown or "")
    write_text(scenario_dir / "scenario_summary.md", build_scenario_summary(manifest, scenario_state, final_status))
    write_text(scenario_dir / "final_status.md", build_scenario_final_status(manifest, scenario_state, final_status))
    if scenario_state.get("session_id"):
        write_text(scenario_dir / "session_id.txt", f"{scenario_state['session_id']}\n")
    print(f"[domain-case-loop] saved scenario artifacts to {scenario_dir}")
    print(f"[domain-case-loop] final_status={final_status}")
    return scenario_state, final_status
def handle_run_case(args: argparse.Namespace) -> int:
    """Run a single-question case through the async eval endpoint and save its artifacts.

    Starts an async eval job for ``args.question``, waits for it to complete,
    reads the session file (and the report file, when present) produced for
    the run, and stores the capture bundle under
    ``<output_root>/<case_id>``.

    Returns:
        ``0`` on success.

    Raises:
        RuntimeError: if the start response lacks a ``job`` object or
            ``job_id``, if the job does not finish as ``completed``, or if
            the completed job carries no ``run_id``.
    """
    ensure_backend_health(args.backend_url, args.timeout_seconds)
    case_id = slugify_case_id(args.domain, args.case_id)
    case_dir = Path(args.output_root).resolve() / case_id
    case_dir.mkdir(parents=True, exist_ok=True)
    ensure_case_brief(
        case_dir,
        domain=args.domain,
        question=args.question,
        expected_capability=args.expected_capability,
        expected_result_mode=args.expected_result_mode,
    )
    payload: dict[str, Any] = {
        "normalizeConfig": build_normalize_config(args),
        "eval_target": "assistant_stage1",
        "questions": [args.question],
        "useMock": bool(args.use_mock),
        "mode": "standard",
    }
    if args.analysis_date:
        payload["analysis_date"] = args.analysis_date
    start_response = http_json(f"{args.backend_url}/api/eval/run-async/start", method="POST", payload=payload)
    job = start_response.get("job")
    if not isinstance(job, dict):
        raise RuntimeError("Async start response does not contain `job` object")
    job_id = str(job.get("job_id") or "")
    if not job_id:
        raise RuntimeError("Async start response does not contain job_id")
    final_job = wait_for_job(args.backend_url, job_id, args.timeout_seconds, args.poll_interval_seconds)
    if str(final_job.get("status") or "") != "completed":
        raise RuntimeError(f"Async job did not complete successfully: {final_job.get('status')}")
    run_id = str(final_job.get("run_id") or "")
    if not run_id:
        # Without a run id the session file name below would be bogus and
        # wait_for_file would wait for a file that can never appear.
        raise RuntimeError("Completed async job does not contain run_id")
    # The session and report files are named after the run id; the single
    # submitted question is looked up under the case id "AUTO-001".
    report_case_id = "AUTO-001"
    session_id = f"{run_id}-{report_case_id}"
    session_file = Path(args.sessions_dir).resolve() / f"{session_id}.json"
    wait_for_file(session_file)
    session_record = read_json_file(session_file)
    conversation = extract_conversation_from_session(session_record)
    export_markdown = build_conversation_export(session_id, conversation, mode="technical")
    # The report is optional: it is attached only when its file exists.
    report_case = None
    report_file = Path(args.reports_dir).resolve() / f"{run_id}.json"
    if report_file.exists():
        report_record = read_json_file(report_file)
        report_case = extract_report_case(report_record, report_case_id)
    turn_artifact = build_turn_artifact(
        slot=args.slot,
        domain=args.domain,
        case_id=case_id,
        question=args.question,
        session_id=session_id,
        conversation=conversation,
        session_record=session_record,
        job_record=final_job,
        report_case=report_case,
        export_file_name=f"{args.slot}_output.md",
    )
    save_capture_bundle(
        case_dir=case_dir,
        slot=args.slot,
        export_markdown=export_markdown,
        debug_payload=turn_artifact.get("technical_debug_payload"),
        turn_artifact=turn_artifact,
        session_record=session_record,
        job_record=final_job,
        report_case=report_case,
    )
    print(f"[domain-case-loop] saved {args.slot} artifacts to {case_dir}")
    print(f"[domain-case-loop] session_id={session_id}")
    return 0
def handle_import_export(args: argparse.Namespace) -> int:
    """Import an existing conversation export file as case artifacts.

    Parses the Markdown export at ``args.input``, derives the question from
    ``args.question`` or, failing that, from the last user message before the
    final assistant reply, and saves the capture bundle under
    ``<output_root>/<case_id>``. Session, job and report records are not
    available for imports and are stored as ``None``.

    Returns:
        ``0`` on success.
    """
    # utf-8-sig transparently drops a leading BOM if the export has one.
    export_text = Path(args.input).read_text(encoding="utf-8-sig")
    session_id, conversation = parse_export_markdown(export_text)

    case_id = slugify_case_id(args.domain, args.case_id)
    case_dir = Path(args.output_root).resolve() / case_id
    case_dir.mkdir(parents=True, exist_ok=True)

    final_reply = find_last_assistant(conversation)
    preceding_user = find_last_user_before(conversation, final_reply.get("message_id"))
    question = args.question
    if not question:
        question = preceding_user.get("text") if isinstance(preceding_user, dict) else None

    ensure_case_brief(
        case_dir,
        domain=args.domain,
        question=question,
        expected_capability=args.expected_capability,
        expected_result_mode=args.expected_result_mode,
    )
    export_file_name = f"{args.slot}_output.md"
    turn_artifact = build_turn_artifact(
        slot=args.slot,
        domain=args.domain,
        case_id=case_id,
        question=question,
        session_id=session_id,
        conversation=conversation,
        session_record=None,
        job_record=None,
        report_case=None,
        export_file_name=export_file_name,
    )
    save_capture_bundle(
        case_dir=case_dir,
        slot=args.slot,
        export_markdown=export_text,
        debug_payload=final_reply.get("debug"),
        turn_artifact=turn_artifact,
        session_record=None,
        job_record=None,
        report_case=None,
    )
    print(f"[domain-case-loop] imported {args.slot} artifacts to {case_dir}")
    return 0
def handle_run_scenario(args: argparse.Namespace) -> int:
    """Load a scenario manifest from ``args.manifest`` and execute it.

    ``args.scenario_id`` overrides the manifest's scenario id, and
    ``args.analysis_date`` is merged into the analysis context as
    ``as_of_date`` with source ``cli_override``. Artifacts go to
    ``<output_root>/<scenario_id>``.

    Returns:
        ``0`` once the scenario has been executed.
    """
    ensure_backend_health(args.backend_url, args.timeout_seconds)
    manifest_path = Path(args.manifest).resolve()
    manifest = load_scenario_manifest(manifest_path)

    if args.scenario_id:
        manifest["scenario_id"] = args.scenario_id.strip()
    if args.analysis_date:
        cli_context = {"as_of_date": args.analysis_date, "source": "cli_override"}
        manifest["analysis_context"] = merge_analysis_context(manifest.get("analysis_context"), cli_context)

    output_dir = Path(args.output_root).resolve() / manifest["scenario_id"]
    execute_scenario_manifest(
        args=args,
        manifest=manifest,
        scenario_dir=output_dir,
        manifest_source_label=str(manifest_path),
    )
    return 0
def build_pack_summary(pack: dict[str, Any], scenario_results: list[dict[str, Any]], final_status: str) -> str:
    """Render the Markdown summary of a pack run.

    The header lists pack id, domain, title and final status; each scenario
    result then contributes its id, title, status, session id (``n/a`` when
    absent) and artifact directory.
    """
    header = [
        "# Pack summary",
        "",
        f"- pack_id: `{pack['pack_id']}`",
        f"- domain: `{pack['domain']}`",
        f"- title: {pack['title']}",
        f"- final_status: `{final_status}`",
        "",
        "## Scenarios",
    ]
    scenario_sections = [
        [
            f"{position}. `{item['scenario_id']}` - {item['title']}",
            f"status: `{item['final_status']}`",
            f"session_id: `{item.get('session_id') or 'n/a'}`",
            f"artifact_dir: `{item['artifact_dir']}`",
            "",
        ]
        for position, item in enumerate(scenario_results, start=1)
    ]
    lines = header + [line for section in scenario_sections for line in section]
    return "\n".join(lines).strip() + "\n"
def build_pack_final_status(pack: dict[str, Any], scenario_results: list[dict[str, Any]], final_status: str) -> str:
    """Render the ``final_status.md`` text for a pack run.

    The reason line is a fixed sentence per status (``pack status unknown``
    for unrecognized statuses). When the status is ``accepted`` but fewer
    scenarios were executed than the pack declares, the reason states how
    many ran instead.
    """
    declared_count = len(pack.get("scenarios") or [])
    executed_count = len(scenario_results)

    if final_status == "accepted" and executed_count < declared_count:
        reason = (
            f"only {executed_count}/{declared_count} declared scenarios were executed; "
            "missing scenarios keep the pack from being confirmed as accepted"
        )
    else:
        reasons = {
            "accepted": "all declared scenarios in the pack executed without blocked or missing-capability states",
            "partial": "the pack executed, but at least one scenario still needs capability enablement or route hardening",
            "needs_exact_capability": "the pack is valid, but at least one scenario still requires exact capability or route enablement",
            "blocked": "the pack could not be completed because at least one scenario failed at runtime",
        }
        reason = reasons.get(final_status, "pack status unknown")

    report_lines = [
        "# Final status",
        f"- status: `{final_status}`",
        f"- pack_id: `{pack['pack_id']}`",
        f"- domain: `{pack['domain']}`",
        f"- reason: {reason}",
    ]
    # dedent is kept so whitespace handling of the rendered text stays as before.
    return textwrap.dedent("\n".join(report_lines) + "\n")
def derive_coverage_status(statuses: list[str]) -> str:
    """Reduce the statuses of backing scenarios to one coverage label.

    Blank statuses are ignored. No statuses at all yields ``unmapped``; all
    ``accepted`` yields ``green``; otherwise ``blocked`` wins over
    ``needs_exact_capability``, and anything else is ``partial``.
    """
    cleaned = (str(status or "").strip() for status in statuses)
    distinct = {status for status in cleaned if status}
    if not distinct:
        return "unmapped"
    if distinct == {"accepted"}:
        return "green"
    for severity in ("blocked", "needs_exact_capability"):
        if severity in distinct:
            return severity
    return "partial"
def build_scenario_acceptance_matrix(pack: dict[str, Any], scenario_results: list[dict[str, Any]]) -> str:
    """Render a Markdown matrix relating pack scenarios to scenario-tree coverage.

    The document always contains a "Scenario coverage" table (one row per
    scenario with its run status, question ids and node ids). When the pack's
    ``scenario_tree`` declares them, tables for root nodes, critical nodes,
    critical edges and primary user paths follow; each row carries the
    coverage status derived from the scenarios that back it.

    A scenario's question ids come from its ``question_ids`` or, failing
    that, from its steps. Its node ids come from ``node_ids``, then from its
    steps, then from the question pool entries of its question ids.
    Scenarios without a recorded result are reported as ``not_run``.
    """

    def as_dict(value: Any) -> dict[str, Any]:
        return value if isinstance(value, dict) else {}

    def as_list(value: Any) -> list[Any]:
        return value if isinstance(value, list) else []

    def clean(value: Any) -> str:
        return str(value or "").strip()

    def table_row(cells: list[str]) -> str:
        return "| " + " | ".join(cells) + " |"

    def cell(values: list[str], separator: str = ", ") -> str:
        # Empty collections are shown as a dash.
        return separator.join(values) or "-"

    def values_from_steps(scenario: dict[str, Any], field: str) -> list[str]:
        return [
            clean(step.get(field))
            for step in scenario.get("steps", [])
            if isinstance(step, dict) and clean(step.get(field))
        ]

    status_by_scenario = {
        str(item.get("scenario_id") or ""): str(item.get("final_status") or "unknown")
        for item in scenario_results
        if isinstance(item, dict)
    }
    scenarios = as_list(pack.get("scenarios"))
    question_pool = as_dict(pack.get("question_pool"))

    question_index: dict[str, dict[str, Any]] = {}
    for raw_question in as_list(question_pool.get("questions")):
        if not isinstance(raw_question, dict):
            continue
        question_id = clean(raw_question.get("question_id"))
        if question_id:
            question_index[question_id] = raw_question

    # First pass: collect question and node ids per scenario.
    questions_by_scenario: dict[str, list[str]] = {}
    nodes_by_scenario: dict[str, list[str]] = {}
    for scenario in scenarios:
        if not isinstance(scenario, dict):
            continue
        scenario_id = clean(scenario.get("scenario_id"))
        if not scenario_id:
            continue
        question_ids = normalize_string_list(scenario.get("question_ids"))
        if not question_ids:
            question_ids = values_from_steps(scenario, "question_id")
        node_ids = normalize_string_list(scenario.get("node_ids"))
        if not node_ids:
            node_ids = values_from_steps(scenario, "node_id")
        if not node_ids:
            pooled_nodes = (clean((question_index.get(question_id) or {}).get("node_id")) for question_id in question_ids)
            node_ids = [node_id for node_id in pooled_nodes if node_id]
        questions_by_scenario[scenario_id] = question_ids
        # dict.fromkeys removes duplicate node ids while keeping their order.
        nodes_by_scenario[scenario_id] = list(dict.fromkeys(node_ids))

    scenario_tree = as_dict(pack.get("scenario_tree"))
    source_contract = as_dict(pack.get("source_contract"))

    def backing_scenarios(is_backing: Any) -> list[str]:
        return sorted(scenario_id for scenario_id, node_ids in nodes_by_scenario.items() if is_backing(node_ids))

    def coverage_of(backed_by: list[str]) -> str:
        return derive_coverage_status([status_by_scenario.get(scenario_id, "not_run") for scenario_id in backed_by])

    lines = [
        "# Scenario acceptance matrix",
        "",
        f"- pack_id: `{pack.get('pack_id') or 'n/a'}`",
        f"- domain: `{pack.get('domain') or 'n/a'}`",
        f"- source_contract_id: `{source_contract.get('domain_id') or pack.get('source_contract_id') or 'n/a'}`",
        f"- source_contract_title: {source_contract.get('title') or pack.get('title') or 'n/a'}",
        "",
        "## Scenario coverage",
        "",
        "| scenario_id | status | question_ids | node_ids |",
        "| --- | --- | --- | --- |",
    ]

    # Second pass: one coverage row per scenario.
    for scenario in scenarios:
        if not isinstance(scenario, dict):
            continue
        scenario_id = clean(scenario.get("scenario_id"))
        if not scenario_id:
            continue
        lines.append(
            table_row(
                [
                    scenario_id,
                    status_by_scenario.get(scenario_id, "not_run"),
                    cell(questions_by_scenario.get(scenario_id) or []),
                    cell(nodes_by_scenario.get(scenario_id) or []),
                ]
            )
        )

    def append_node_section(title: str, section_key: str) -> None:
        nodes = as_list(scenario_tree.get(section_key))
        if not nodes:
            return
        lines.extend(
            [
                "",
                f"## {title}",
                "",
                "| node_id | status | backed_by_scenarios | question_ids | required_wording_families |",
                "| --- | --- | --- | --- | --- |",
            ]
        )
        for node in nodes:
            if not isinstance(node, dict):
                continue
            node_id = clean(node.get("node_id"))
            if not node_id:
                continue
            backed_by = backing_scenarios(lambda node_ids: node_id in node_ids)
            lines.append(
                table_row(
                    [
                        node_id,
                        coverage_of(backed_by),
                        cell(backed_by),
                        cell(normalize_string_list(node.get("covers_question_ids"))),
                        cell(normalize_string_list(node.get("required_wording_families"))),
                    ]
                )
            )

    append_node_section("Root nodes", "root_nodes")
    append_node_section("Critical nodes", "critical_nodes")

    edges = as_list(scenario_tree.get("critical_edges"))
    if edges:
        lines.extend(
            [
                "",
                "## Critical edges",
                "",
                "| edge_id | status | from_node | to_node | backed_by_scenarios | primary_user_path |",
                "| --- | --- | --- | --- | --- | --- |",
            ]
        )
    for edge in edges:
        if not isinstance(edge, dict):
            continue
        edge_id = clean(edge.get("edge_id"))
        from_node = clean(edge.get("from_node"))
        to_node = clean(edge.get("to_node"))
        if not edge_id:
            continue
        # An edge is backed by scenarios that touch both of its endpoints.
        backed_by = backing_scenarios(lambda node_ids: from_node in node_ids and to_node in node_ids)
        lines.append(
            table_row(
                [
                    edge_id,
                    coverage_of(backed_by),
                    from_node or "-",
                    to_node or "-",
                    cell(backed_by),
                    "yes" if bool(edge.get("primary_user_path")) else "no",
                ]
            )
        )

    paths = as_list(scenario_tree.get("primary_user_paths"))
    if paths:
        lines.extend(
            [
                "",
                "## Primary user paths",
                "",
                "| path_id | status | nodes | backed_by_scenarios |",
                "| --- | --- | --- | --- |",
            ]
        )
    for path in paths:
        if not isinstance(path, dict):
            continue
        path_id = clean(path.get("path_id"))
        path_nodes = normalize_string_list(path.get("nodes"))
        # A path is backed by scenarios that contain every one of its nodes;
        # a path without nodes is backed by nothing.
        backed_by = backing_scenarios(
            lambda node_ids: bool(path_nodes) and all(node_id in node_ids for node_id in path_nodes)
        )
        lines.append(
            table_row(
                [
                    path_id or "-",
                    coverage_of(backed_by),
                    cell(path_nodes, " -> "),
                    cell(backed_by),
                ]
            )
        )
    return "\n".join(lines).strip() + "\n"
def run_subprocess_command(
    command: list[str],
    *,
    cwd: Path,
    timeout_seconds: int,
    input_text: str | None = None,
    stdout_path: Path | None = None,
    stderr_path: Path | None = None,
) -> subprocess.CompletedProcess[str]:
    """Run ``command`` in ``cwd``, capture its output and fail on a non-zero exit.

    Output is captured as UTF-8 text (undecodable bytes are replaced) and,
    when ``stdout_path`` / ``stderr_path`` are given, written to those files.
    On timeout, whatever output was captured so far is written to the same
    files before the timeout exception propagates, so the logs of a hung
    command are not lost.

    Returns:
        The completed process when the exit code is ``0``.

    Raises:
        RuntimeError: if the command exits with a non-zero code.
        subprocess.TimeoutExpired: if the command exceeds ``timeout_seconds``.
    """

    def as_text(stream: Any) -> str:
        # TimeoutExpired may carry None, bytes or str depending on platform.
        if stream is None:
            return ""
        if isinstance(stream, bytes):
            return stream.decode("utf-8", errors="replace")
        return str(stream)

    def persist_logs(stdout_text: str, stderr_text: str) -> None:
        if stdout_path is not None:
            write_text(stdout_path, stdout_text)
        if stderr_path is not None:
            write_text(stderr_path, stderr_text)

    try:
        result = subprocess.run(
            command,
            cwd=str(cwd),
            input=input_text,
            text=True,
            encoding="utf-8",
            errors="replace",
            capture_output=True,
            timeout=timeout_seconds,
            check=False,
        )
    except subprocess.TimeoutExpired as exc:
        persist_logs(as_text(exc.stdout), as_text(exc.stderr))
        raise
    persist_logs(result.stdout, result.stderr)
    if result.returncode != 0:
        raise RuntimeError(
            f"Command failed with exit code {result.returncode}: {' '.join(command)}\n{result.stderr.strip()}"
        )
    return result
def build_run_pack_command(
    args: argparse.Namespace,
    *,
    manifest_path: Path,
    pack_id: str,
    output_root: Path,
) -> list[str]:
    """Build the argv that re-invokes this script with the ``run-pack`` subcommand.

    Backend, prompt and LLM settings are forwarded from ``args``.
    ``--analysis-date``, ``--max-scenarios`` and ``--use-mock`` are appended
    only when the corresponding attribute is set on ``args``.
    """
    command = [
        sys.executable,
        str(Path(__file__).resolve()),
        "run-pack",
        "--manifest",
        str(manifest_path),
        "--pack-id",
        pack_id,
    ]
    forwarded_options = (
        ("--output-root", output_root),
        ("--backend-url", args.backend_url),
        ("--prompt-version", args.prompt_version),
        ("--llm-provider", args.llm_provider),
        ("--llm-model", args.llm_model),
        ("--llm-base-url", args.llm_base_url),
        ("--llm-api-key", args.llm_api_key),
        ("--temperature", args.temperature),
        ("--max-output-tokens", args.max_output_tokens),
        ("--timeout-seconds", args.timeout_seconds),
    )
    for flag, value in forwarded_options:
        command += [flag, str(value)]

    analysis_date = getattr(args, "analysis_date", None)
    if analysis_date:
        command += ["--analysis-date", str(analysis_date)]
    max_scenarios = getattr(args, "max_scenarios", None)
    if max_scenarios is not None:
        command += ["--max-scenarios", str(int(max_scenarios))]
    if bool(getattr(args, "use_mock", False)):
        command.append("--use-mock")
    return command
def build_codex_exec_command(
    args: argparse.Namespace,
    *,
    output_file: Path,
    schema_file: Path,
    sandbox_mode: str,
    model_override: str | None = None,
    reasoning_effort: str | None = None,
) -> list[str]:
    """Assemble the argv for a non-interactive ``codex exec`` run in the repo root.

    The fixed part selects the working directory, sandbox mode, a "never"
    approval policy, the output schema, the output file and colourless output.
    Optional parts are appended in this order: reasoning effort, profile
    (``args.codex_profile``), model (*model_override*, falling back to
    ``args.codex_model``).
    """
    profile = getattr(args, "codex_profile", None)
    # An explicit override wins; the namespace-wide model is only a fallback.
    model_name = model_override or getattr(args, "codex_model", None)

    optional_tail: list[str] = []
    if reasoning_effort:
        optional_tail += ["-c", f'model_reasoning_effort="{reasoning_effort}"']
    if profile:
        optional_tail += ["-p", str(profile)]
    if model_name:
        optional_tail += ["-m", str(model_name)]

    fixed_head = [
        str(args.codex_binary),
        "exec",
        "-C",
        str(REPO_ROOT),
        "-s",
        sandbox_mode,
        "-c",
        'approval_policy="never"',
        "--output-schema",
        str(schema_file),
        "-o",
        str(output_file),
        "--color",
        "never",
    ]
    return fixed_head + optional_tail
def read_json_output(file_path: Path) -> dict[str, Any]:
    """Parse *file_path* as JSON and return it, insisting on a top-level object.

    Raises:
        RuntimeError: The document parsed, but its root is not a JSON object.
    """
    parsed = json.loads(read_text_file(file_path))
    if isinstance(parsed, dict):
        return parsed
    raise RuntimeError(f"Expected JSON object in {file_path}")
def compact_step_output_for_review(step_output: Any) -> dict[str, Any]:
    """Reduce one step output to the fields included in the review bundle.

    Returns an empty dict when *step_output* is not a dict. Otherwise copies a
    fixed set of scalar fields (missing ones become ``None``) and adds
    ``entry_titles_sample``: the non-empty ``item``/``title`` strings found
    among the first five ``entries``.
    """
    if not isinstance(step_output, dict):
        return {}

    copied_fields = (
        "status",
        "question_resolved",
        "detected_intent",
        "selected_recipe",
        "capability_id",
        "result_mode",
        "fallback_type",
        "mcp_call_status",
        "failure_type",
        "error_message",
    )
    compact: dict[str, Any] = {field: step_output.get(field) for field in copied_fields}

    raw_entries = step_output.get("entries")
    # Only the first five entries are inspected, so the sample may hold fewer than five titles.
    leading_entries = raw_entries[:5] if isinstance(raw_entries, list) else []
    candidate_titles = (
        str(entry.get("item") or entry.get("title") or "").strip()
        for entry in leading_entries
        if isinstance(entry, dict)
    )
    compact["entry_titles_sample"] = [title for title in candidate_titles if title]
    return compact
def build_pack_review_bundle(pack_dir: Path) -> str:
    """Serialise the artifacts of one pack run into a single JSON evidence bundle.

    Missing files are tolerated: absent JSON files contribute ``{}`` and absent
    markdown files contribute ``""``. Scenario directories are visited in
    sorted order and their step outputs are compacted before inclusion.
    """

    def json_or_empty(path: Path) -> Any:
        return read_json_file(path) if path.exists() else {}

    def text_or_blank(path: Path) -> str:
        return read_text_file(path) if path.exists() else ""

    pack_state = json_or_empty(pack_dir / "pack_state.json")

    scenarios_root = pack_dir / "scenarios"
    scenario_dirs = (
        sorted(child for child in scenarios_root.iterdir() if child.is_dir())
        if scenarios_root.exists()
        else []
    )

    scenario_records: list[dict[str, Any]] = []
    for scenario_dir in scenario_dirs:
        state = json_or_empty(scenario_dir / "scenario_state.json")
        raw_steps = state.get("step_outputs")
        if isinstance(raw_steps, dict):
            compact_steps = {
                str(step_id): compact_step_output_for_review(step_output)
                for step_id, step_output in raw_steps.items()
            }
        else:
            compact_steps = {}
        scenario_records.append(
            {
                # Fall back to the directory name when the state file lacks an id.
                "scenario_id": state.get("scenario_id") or scenario_dir.name,
                "title": state.get("title"),
                "session_id": state.get("session_id"),
                "summary": text_or_blank(scenario_dir / "scenario_summary.md"),
                "step_outputs": compact_steps,
            }
        )

    state_keys = ("pack_id", "domain", "title", "final_status", "scenario_results")
    return dump_json(
        {
            "pack_state": {key: pack_state.get(key) for key in state_keys},
            "pack_manifest": json_or_empty(pack_dir / "pack_manifest.json"),
            "pack_summary": text_or_blank(pack_dir / "pack_summary.md"),
            "scenario_acceptance_matrix": text_or_blank(pack_dir / "scenario_acceptance_matrix.md"),
            "scenarios": scenario_records,
        }
    )
def build_analyst_loop_prompt(
    *,
    loop_dir: Path,
    iteration_dir: Path,
    pack_dir: Path,
    previous_pack_dir: Path | None,
    previous_verdict_path: Path | None,
    target_score: int,
    review_bundle_json: str,
    previous_verdict_json: str | None,
) -> str:
    """Render the analyst prompt for one loop iteration.

    Args:
        loop_dir: Root directory of the loop run.
        iteration_dir: Directory of the current iteration.
        pack_dir: Directory with the current pack run artifacts.
        previous_pack_dir: Pack directory of the previous iteration, if any;
            enables the comparison section.
        previous_verdict_path: Previous verdict file; listed in the comparison
            section only when it exists on disk.
        target_score: Quality gate threshold quoted in the goal and rules.
        review_bundle_json: JSON evidence bundle embedded verbatim.
        previous_verdict_json: Previous verdict JSON embedded verbatim, if any.

    Returns:
        The prompt text with surrounding whitespace stripped.
    """
    comparison_block = ""
    if previous_pack_dir is not None:
        comparison_block = (
            "Compare the current run with the previous run:\n"
            f"- previous_pack_dir: `{previous_pack_dir}`\n"
        )
        if previous_verdict_path is not None and previous_verdict_path.exists():
            comparison_block += f"- previous_analyst_verdict: `{previous_verdict_path}`\n"

    previous_verdict_block = ""
    if previous_verdict_json:
        previous_verdict_block = (
            "Previous analyst verdict JSON:\n"
            "```json\n"
            f"{previous_verdict_json}\n"
            "```\n"
        )

    # Dedent the static template BEFORE interpolating. Running dedent over the
    # already-interpolated text lets multi-line JSON (whose closing brace sits
    # at column 0) cancel the dedent, and lets dedent rewrite whitespace-only
    # lines inside the embedded evidence.
    template = textwrap.dedent(
        """\
        You are the strict `domain_analyst` for NDC_1C.
        Use the repo rules from:
        - `.codex/agents/domain_analyst.toml`
        - `.codex/skills/domain-case-loop/SKILL.md`
        - `.codex/skills/domain-case-loop/references/verdict_template.md`
        Current loop context:
        - loop_dir: `{loop_dir}`
        - iteration_dir: `{iteration_dir}`
        - current_pack_dir: `{pack_dir}`
        {comparison_block}
        Required artifacts to inspect:
        - `{pack_summary_path}`
        - `{pack_state_path}`
        - `{acceptance_matrix_path}`
        - all `scenario_summary.md`, `scenario_state.json`, and problematic `steps/*/step_state.json` files inside `{scenarios_dir}`
        Goal:
        - evaluate current domain-pack correctness for business meaning, route/capability quality, evidence quality, and absence of silent heuristic masking;
        - determine whether the gate `quality_score >= {target_score}` is reached;
        - if not, provide the smallest high-value fix targets for the coder.
        Rules:
        - `accepted` is allowed only if quality_score >= {target_score}, unresolved_p0_count = 0, and regression_detected = false;
        - `partial` means the pack is usable but exactness, routing, or coverage is still insufficient;
        - `needs_exact_capability` means the primary blocker is a missing exact route or capability, but the loop should still continue autonomously unless a user decision is required;
        - `continue` means there is a clear next patch cycle;
        - `blocked` means the loop is stopped by a real hard blocker such as runtime/infrastructure failure, unavailable 1C data, or another condition that the repo cannot autonomously repair;
        - set `requires_user_decision = true` when the next step cannot be chosen safely without user input, for example:
        - an architecture fork or risky contour expansion;
        - a scope tradeoff or important business ambiguity;
        - a required observation anchor is missing and cannot be recovered safely from artifacts, 1C, or the current scenario state;
        - the only remaining implementation path would rely on a hack, brittle workaround, heuristic masking, or disproportionate complexity/risk;
        - if `requires_user_decision = true`, fill `user_decision_type` and `user_decision_prompt`;
        - if the pack is below {target_score} but there is still safe autonomous implementation work, keep `requires_user_decision = false`;
        - do not request user input merely because the score is still below {target_score}; request it only when the loop would otherwise guess, overfit, or risk architecture drift.
        Use this UTF-8 evidence bundle as the source of truth for artifact contents. Do not treat shell rendering artifacts as file corruption if the embedded bundle is readable.
        Current evidence bundle:
        ```json
        {review_bundle_json}
        ```
        {previous_verdict_block}
        Return JSON only and follow the schema exactly.
        """
    )
    # str.format does not re-parse substituted values, so braces inside the
    # embedded JSON are safe.
    prompt = template.format(
        loop_dir=loop_dir,
        iteration_dir=iteration_dir,
        pack_dir=pack_dir,
        comparison_block=comparison_block,
        pack_summary_path=pack_dir / "pack_summary.md",
        pack_state_path=pack_dir / "pack_state.json",
        acceptance_matrix_path=pack_dir / "scenario_acceptance_matrix.md",
        scenarios_dir=pack_dir / "scenarios",
        target_score=target_score,
        review_bundle_json=review_bundle_json,
        previous_verdict_block=previous_verdict_block,
    )
    return prompt.strip()
def build_coder_loop_prompt(
    *,
    loop_dir: Path,
    iteration_dir: Path,
    pack_dir: Path,
    analyst_verdict_path: Path,
    analyst_verdict_json: str,
) -> str:
    """Render the coder prompt for one loop iteration.

    Args:
        loop_dir: Root directory of the loop run.
        iteration_dir: Directory of the current iteration; the prompt asks for
            ``coder_plan.md`` and ``patch_summary.md`` to be created there.
        pack_dir: Directory with the current pack run artifacts.
        analyst_verdict_path: Path of the analyst verdict file.
        analyst_verdict_json: Analyst verdict JSON embedded verbatim.

    Returns:
        The prompt text with surrounding whitespace stripped.
    """
    # Dedent the static template BEFORE interpolating: dedent applied to the
    # interpolated text can be cancelled or altered by the embedded multi-line
    # JSON (column-0 closing brace, whitespace-only lines).
    template = textwrap.dedent(
        """\
        You are the `domain_coder` for NDC_1C.
        Use the repo rules from:
        - `.codex/agents/domain_coder.toml`
        - `.codex/skills/domain-case-loop/SKILL.md`
        Current loop context:
        - loop_dir: `{loop_dir}`
        - iteration_dir: `{iteration_dir}`
        - current_pack_dir: `{pack_dir}`
        - analyst_verdict_json: `{analyst_verdict_path}`
        Make the smallest domain-only patch in the working tree that improves the failing or partial scenarios named in the analyst verdict.
        Hard rules:
        - do not change the architecture;
        - do not fabricate data;
        - do not present heuristic answers as confirmed;
        - do not touch unrelated files;
        - preserve already successful baseline flows.
        Required outputs:
        - create `{coder_plan_path}` with a short plan;
        - create `{patch_summary_path}` with a short summary of the patch;
        Analyst verdict JSON:
        ```json
        {analyst_verdict_json}
        ```
        - then return JSON only and follow the schema exactly.
        """
    )
    # str.format does not re-parse substituted values, so braces inside the
    # embedded JSON are safe.
    prompt = template.format(
        loop_dir=loop_dir,
        iteration_dir=iteration_dir,
        pack_dir=pack_dir,
        analyst_verdict_path=analyst_verdict_path,
        coder_plan_path=iteration_dir / "coder_plan.md",
        patch_summary_path=iteration_dir / "patch_summary.md",
        analyst_verdict_json=analyst_verdict_json,
    )
    return prompt.strip()
def evaluate_analyst_gate(
    verdict: dict[str, Any], target_score: int
) -> tuple[bool, str, bool, str, str | None]:
    """Normalise an analyst verdict and decide whether the acceptance gate passed.

    Returns a tuple ``(accepted, loop_decision, requires_user_decision,
    user_decision_type, user_decision_prompt)``. Missing or falsy verdict
    fields fall back to ``0`` / ``"continue"`` / ``False`` / ``"none"`` /
    ``None`` respectively. The gate passes only when the score reaches
    *target_score*, no P0 is open, no regression is flagged, and the analyst's
    own decision is ``"accepted"``.
    """
    score = int(verdict.get("quality_score") or 0)
    open_p0 = int(verdict.get("unresolved_p0_count") or 0)
    has_regression = bool(verdict.get("regression_detected"))
    decision = str(verdict.get("loop_decision") or "").strip() or "continue"
    needs_user = bool(verdict.get("requires_user_decision"))
    decision_type = str(verdict.get("user_decision_type") or "").strip() or "none"

    raw_prompt = verdict.get("user_decision_prompt")
    if raw_prompt:
        decision_prompt = str(raw_prompt).strip()
    else:
        decision_prompt = None

    # Any single blocker keeps the gate closed.
    blockers = (
        score < target_score,
        open_p0 != 0,
        has_regression,
        decision != "accepted",
    )
    gate_passed = not any(blockers)
    return gate_passed, decision, needs_user, decision_type, decision_prompt
def handle_run_pack(args: argparse.Namespace) -> int:
    """Run every selected scenario of a pack and write the aggregate artifacts.

    Checks backend health, loads the pack manifest (applying the ``--pack-id``
    and ``--analysis-date`` overrides), executes the scenarios in manifest
    order, derives the pack status and writes the manifest copy, acceptance
    matrix, pack state, summary and final-status files under
    ``<output_root>/<pack_id>``. Always returns ``0``.
    """
    ensure_backend_health(args.backend_url, args.timeout_seconds)

    pack_path = Path(args.manifest).resolve()
    pack = load_scenario_pack(pack_path)
    if args.pack_id:
        pack["pack_id"] = args.pack_id.strip()
    if args.analysis_date:
        cli_context = {"as_of_date": args.analysis_date, "source": "cli_override"}
        pack["analysis_context"] = merge_analysis_context(pack.get("analysis_context"), cli_context)

    pack_dir = Path(args.output_root).resolve() / pack["pack_id"]
    scenarios_dir = pack_dir / "scenarios"
    scenarios_dir.mkdir(parents=True, exist_ok=True)
    write_json(pack_dir / "pack_manifest.json", pack)
    write_text(pack_dir / "manifest_source.txt", f"{pack_path}\n")

    # A limit of zero (or a negative value, clamped to zero) is falsy and
    # therefore selects the whole pack.
    scenario_limit = None if args.max_scenarios is None else max(0, int(args.max_scenarios))
    selected_scenarios = pack["scenarios"]
    if scenario_limit:
        selected_scenarios = selected_scenarios[:scenario_limit]

    scenario_results: list[dict[str, Any]] = []
    for raw_scenario in selected_scenarios:
        manifest = normalize_scenario_manifest(
            raw_scenario,
            fallback_domain=pack["domain"],
            fallback_analysis_context=pack.get("analysis_context"),
            fallback_bindings=pack.get("bindings"),
            default_scenario_id=raw_scenario["scenario_id"],
        )
        scenario_id = manifest["scenario_id"]
        scenario_dir = scenarios_dir / scenario_id
        scenario_state, scenario_status = execute_scenario_manifest(
            args=args,
            manifest=manifest,
            scenario_dir=scenario_dir,
            manifest_source_label=f"{pack_path}#{scenario_id}",
        )
        scenario_results.append(
            {
                "scenario_id": scenario_id,
                "title": manifest["title"],
                "final_status": scenario_status,
                "session_id": scenario_state.get("session_id"),
                "artifact_dir": str(scenario_dir),
            }
        )

    observed_statuses = [record["final_status"] for record in scenario_results]
    if not observed_statuses:
        final_status = "blocked"
    else:
        # The worst status present wins, checked from most to least severe.
        severity_order = ("blocked", "needs_exact_capability", "partial")
        final_status = next((level for level in severity_order if level in observed_statuses), None)
        if final_status is None:
            # A truncated run can never count as a fully accepted pack.
            ran_whole_pack = len(scenario_results) == len(pack.get("scenarios") or [])
            final_status = "accepted" if ran_whole_pack else "partial"

    pack_state = {
        "schema_version": SCENARIO_PACK_SCHEMA_VERSION,
        "pack_id": pack["pack_id"],
        "domain": pack["domain"],
        "title": pack["title"],
        "analysis_context": pack.get("analysis_context") or {},
        "bindings": pack.get("bindings") or {},
        "scenario_results": scenario_results,
        "final_status": final_status,
        "updated_at": datetime.now(timezone.utc).replace(microsecond=0).isoformat(),
    }
    write_text(pack_dir / "scenario_acceptance_matrix.md", build_scenario_acceptance_matrix(pack, scenario_results))
    write_json(pack_dir / "pack_state.json", pack_state)
    write_text(pack_dir / "pack_summary.md", build_pack_summary(pack, scenario_results, final_status))
    write_text(pack_dir / "final_status.md", build_pack_final_status(pack, scenario_results, final_status))

    print(f"[domain-case-loop] saved pack artifacts to {pack_dir}")
    print(f"[domain-case-loop] final_status={final_status}")
    return 0
def build_loop_summary(loop_state: dict[str, Any]) -> str:
    """Render the markdown loop summary: a header plus one entry per iteration.

    The result is stripped and terminated by exactly one newline.
    """

    def render_iteration(record: dict[str, Any]) -> list[str]:
        return [
            f"- `{record['iteration_id']}`",
            f" baseline_pack_dir: `{record['pack_dir']}`",
            f" analyst_score: `{record.get('quality_score')}`",
            f" analyst_decision: `{record.get('loop_decision')}`",
            f" accepted_gate: `{record.get('accepted_gate')}`",
            f" requires_user_decision: `{record.get('requires_user_decision')}`",
            f" user_decision_type: `{record.get('user_decision_type') or 'none'}`",
            f" coder_status: `{record.get('coder_status') or 'n/a'}`",
            f" analyst_verdict: `{record.get('analyst_verdict_path') or 'n/a'}`",
        ]

    header = [
        "# Loop summary",
        "",
        f"- loop_id: `{loop_state['loop_id']}`",
        f"- target_score: `{loop_state['target_score']}`",
        f"- max_iterations: `{loop_state['max_iterations']}`",
        f"- final_status: `{loop_state['final_status']}`",
        f"- last_analyst_decision: `{loop_state.get('last_analyst_decision') or 'n/a'}`",
        "",
        "## Iterations",
    ]
    iteration_lines = [
        line
        for record in loop_state.get("iterations", [])
        for line in render_iteration(record)
    ]
    return "\n".join(header + iteration_lines).strip() + "\n"
def build_loop_final_status(loop_state: dict[str, Any]) -> str:
    """Render the markdown final-status note for a loop run.

    The text is assembled from explicit lines instead of dedenting an
    interpolated template: ``stop_reason`` can embed the analyst's free-text
    decision prompt, and dedent applied after interpolation would let a
    multi-line or oddly indented value change the layout of the whole note.

    Returns:
        The note, terminated by a newline; ``stop_reason`` is emitted verbatim.
    """
    lines = [
        "# Final status",
        f"- status: `{loop_state['final_status']}`",
        f"- loop_id: `{loop_state['loop_id']}`",
        f"- target_score: `{loop_state['target_score']}`",
        f"- iterations_ran: `{len(loop_state.get('iterations', []))}`",
        f"- last_analyst_decision: `{loop_state.get('last_analyst_decision') or 'n/a'}`",
        f"- stop_reason: {loop_state.get('stop_reason') or 'n/a'}",
    ]
    return "\n".join(lines) + "\n"
def handle_run_pack_loop(args: argparse.Namespace) -> int:
    """Drive the pack-run -> analyst -> coder loop until a stop condition is hit.

    Each iteration runs the pack in a subprocess, asks the analyst (codex,
    read-only sandbox) for a verdict, and, unless the loop stops, asks the
    coder (codex, workspace-write sandbox) to patch the working tree.

    The loop stops when the acceptance gate passes, the analyst requires a
    user decision, the analyst reports ``blocked``, the coder reports
    ``blocked``, or ``max_iterations`` iterations have run.

    ``loop_state.json`` is rewritten after every state change; the summary and
    final-status markdown files are written once at the end. Always returns
    ``0`` when it completes.

    Raises:
        RuntimeError: ``--max-iterations`` is below 1. Errors raised by the
            subprocess helper or the JSON readers are not caught here.
    """
    manifest_path = Path(args.manifest).resolve()
    loop_id = str(args.loop_id or slugify_case_id("domain_pack_loop", None)).strip()
    loop_dir = Path(args.output_root).resolve() / loop_id
    iterations_dir = loop_dir / "iterations"
    iterations_dir.mkdir(parents=True, exist_ok=True)
    write_text(loop_dir / "manifest_source.txt", f"{manifest_path}\n")
    target_score = int(args.target_score)
    max_iterations = int(args.max_iterations)
    if max_iterations < 1:
        raise RuntimeError("--max-iterations must be >= 1")
    # Initial state; "partial" stays in place until a stop branch overrides it.
    loop_state: dict[str, Any] = {
        "schema_version": AUTONOMOUS_LOOP_SCHEMA_VERSION,
        "loop_id": loop_id,
        "manifest_path": str(manifest_path),
        "target_score": target_score,
        "max_iterations": max_iterations,
        "iterations": [],
        "final_status": "partial",
        "stop_reason": None,
        "last_analyst_decision": None,
        "last_user_decision_type": "none",
        "last_user_decision_prompt": None,
        "updated_at": datetime.now(timezone.utc).replace(microsecond=0).isoformat(),
    }
    write_json(loop_dir / "loop_state.json", loop_state)
    # Artifacts of the previous iteration, handed to the analyst for comparison.
    previous_pack_dir: Path | None = None
    previous_verdict_path: Path | None = None
    for iteration_index in range(max_iterations):
        iteration_id = f"iteration_{iteration_index:02d}"
        iteration_dir = iterations_dir / iteration_id
        iteration_dir.mkdir(parents=True, exist_ok=True)
        # Step 1: run the pack in a child process of this same script.
        pack_output_root = iteration_dir / "pack_output"
        pack_id = "pack_run"
        pack_command = build_run_pack_command(
            args,
            manifest_path=manifest_path,
            pack_id=pack_id,
            output_root=pack_output_root,
        )
        run_subprocess_command(
            pack_command,
            cwd=REPO_ROOT,
            # The whole pack gets 20x the per-request timeout, but never less than 600 s.
            timeout_seconds=max(600, int(args.timeout_seconds) * 20),
            stdout_path=iteration_dir / "pack_run.stdout.log",
            stderr_path=iteration_dir / "pack_run.stderr.log",
        )
        pack_dir = pack_output_root / pack_id
        # Step 2: analyst review; the prompt is saved to disk and fed via stdin.
        analyst_verdict_path = iteration_dir / "analyst_verdict.json"
        review_bundle_json = build_pack_review_bundle(pack_dir)
        previous_verdict_json = read_text_file(previous_verdict_path) if previous_verdict_path is not None and previous_verdict_path.exists() else None
        analyst_prompt = build_analyst_loop_prompt(
            loop_dir=loop_dir,
            iteration_dir=iteration_dir,
            pack_dir=pack_dir,
            previous_pack_dir=previous_pack_dir,
            previous_verdict_path=previous_verdict_path,
            target_score=target_score,
            review_bundle_json=review_bundle_json,
            previous_verdict_json=previous_verdict_json,
        )
        write_text(iteration_dir / "analyst_prompt.md", analyst_prompt + "\n")
        analyst_command = build_codex_exec_command(
            args,
            output_file=analyst_verdict_path,
            schema_file=Path(args.analyst_schema).resolve(),
            sandbox_mode="read-only",
            model_override=getattr(args, "analyst_codex_model", None),
            reasoning_effort=getattr(args, "analyst_reasoning_effort", None),
        )
        run_subprocess_command(
            analyst_command,
            cwd=REPO_ROOT,
            timeout_seconds=int(args.codex_timeout_seconds),
            input_text=analyst_prompt,
            stdout_path=iteration_dir / "analyst_exec.stdout.log",
            stderr_path=iteration_dir / "analyst_exec.stderr.log",
        )
        analyst_verdict = read_json_output(analyst_verdict_path)
        accepted_gate, loop_decision, requires_user_decision, user_decision_type, user_decision_prompt = evaluate_analyst_gate(
            analyst_verdict, target_score
        )
        loop_state["last_analyst_decision"] = loop_decision
        loop_state["last_user_decision_type"] = user_decision_type
        loop_state["last_user_decision_prompt"] = user_decision_prompt
        iteration_record: dict[str, Any] = {
            "iteration_id": iteration_id,
            "pack_dir": str(pack_dir),
            "quality_score": int(analyst_verdict.get("quality_score") or 0),
            "loop_decision": loop_decision,
            "accepted_gate": accepted_gate,
            "requires_user_decision": requires_user_decision,
            "user_decision_type": user_decision_type,
            "user_decision_prompt": user_decision_prompt,
            "analyst_verdict_path": str(analyst_verdict_path),
            "coder_status": None,
        }
        # Stop condition 1: the acceptance gate passed.
        if accepted_gate:
            loop_state["iterations"].append(iteration_record)
            loop_state["final_status"] = "accepted"
            loop_state["stop_reason"] = f"analyst accepted at {iteration_id}"
            loop_state["updated_at"] = datetime.now(timezone.utc).replace(microsecond=0).isoformat()
            write_json(loop_dir / "loop_state.json", loop_state)
            break
        # Stop condition 2: the analyst needs a user decision. This is checked
        # before the plain "blocked" branch, so its stop_reason takes precedence.
        if requires_user_decision:
            loop_state["iterations"].append(iteration_record)
            if loop_decision in {"needs_exact_capability", "partial", "blocked"}:
                loop_state["final_status"] = loop_decision
            else:
                loop_state["final_status"] = "partial"
            prompt_suffix = f" | prompt: {user_decision_prompt}" if user_decision_prompt else ""
            loop_state["stop_reason"] = f"user_decision_required at {iteration_id}: {user_decision_type}{prompt_suffix}"
            loop_state["updated_at"] = datetime.now(timezone.utc).replace(microsecond=0).isoformat()
            write_json(loop_dir / "loop_state.json", loop_state)
            break
        # Stop condition 3: the analyst reported a hard blocker.
        if loop_decision == "blocked":
            loop_state["iterations"].append(iteration_record)
            loop_state["final_status"] = "blocked"
            loop_state["stop_reason"] = f"analyst blocked at {iteration_id}"
            loop_state["updated_at"] = datetime.now(timezone.utc).replace(microsecond=0).isoformat()
            write_json(loop_dir / "loop_state.json", loop_state)
            break
        # Step 3: coder patch cycle, driven by the analyst verdict.
        coder_result_path = iteration_dir / "coder_result.json"
        coder_prompt = build_coder_loop_prompt(
            loop_dir=loop_dir,
            iteration_dir=iteration_dir,
            pack_dir=pack_dir,
            analyst_verdict_path=analyst_verdict_path,
            analyst_verdict_json=dump_json(analyst_verdict),
        )
        write_text(iteration_dir / "coder_prompt.md", coder_prompt + "\n")
        coder_command = build_codex_exec_command(
            args,
            output_file=coder_result_path,
            schema_file=Path(args.coder_schema).resolve(),
            sandbox_mode="workspace-write",
            model_override=getattr(args, "coder_codex_model", None),
            reasoning_effort=getattr(args, "coder_reasoning_effort", None),
        )
        run_subprocess_command(
            coder_command,
            cwd=REPO_ROOT,
            timeout_seconds=int(args.codex_timeout_seconds),
            input_text=coder_prompt,
            stdout_path=iteration_dir / "coder_exec.stdout.log",
            stderr_path=iteration_dir / "coder_exec.stderr.log",
        )
        coder_result = read_json_output(coder_result_path)
        coder_status = str(coder_result.get("status") or "").strip() or "unknown"
        iteration_record["coder_status"] = coder_status
        iteration_record["coder_result_path"] = str(coder_result_path)
        loop_state["iterations"].append(iteration_record)
        loop_state["updated_at"] = datetime.now(timezone.utc).replace(microsecond=0).isoformat()
        write_json(loop_dir / "loop_state.json", loop_state)
        # Stop condition 4: the coder could not make progress.
        if coder_status == "blocked":
            loop_state["final_status"] = "blocked"
            loop_state["stop_reason"] = f"coder stopped progress at {iteration_id}: {coder_status}"
            loop_state["updated_at"] = datetime.now(timezone.utc).replace(microsecond=0).isoformat()
            write_json(loop_dir / "loop_state.json", loop_state)
            break
        previous_pack_dir = pack_dir
        previous_verdict_path = analyst_verdict_path
    else:
        # for/else: reached only when no branch above broke out, i.e. every
        # iteration ran. The patch from the last iteration is not re-run here.
        if loop_state.get("last_analyst_decision") == "needs_exact_capability":
            loop_state["final_status"] = "needs_exact_capability"
        else:
            loop_state["final_status"] = "partial"
        loop_state["stop_reason"] = f"max_iterations_reached ({max_iterations})"
        loop_state["updated_at"] = datetime.now(timezone.utc).replace(microsecond=0).isoformat()
        write_json(loop_dir / "loop_state.json", loop_state)
    write_text(loop_dir / "loop_summary.md", build_loop_summary(loop_state))
    write_text(loop_dir / "final_status.md", build_loop_final_status(loop_state))
    print(f"[domain-case-loop] saved loop artifacts to {loop_dir}")
    print(f"[domain-case-loop] final_status={loop_state['final_status']}")
    return 0
def build_parser() -> argparse.ArgumentParser:
    """Build the CLI parser with its five subcommands.

    Subcommands: ``run-case``, ``import-export``, ``run-scenario``,
    ``run-pack`` and ``run-pack-loop``. Each one stores its handler in
    ``func`` via ``set_defaults``. Option groups shared by several
    subcommands are registered through the local helpers below, in the same
    order on every subcommand.
    """
    slot_choices = ["baseline", "rerun"]
    provider_choices = ["openai", "local"]

    def add_backend_and_output(sub: argparse.ArgumentParser) -> None:
        sub.add_argument("--backend-url", default=DEFAULT_BACKEND_URL)
        sub.add_argument("--output-root", default=str(DEFAULT_ARTIFACTS_ROOT))

    def add_llm_options(sub: argparse.ArgumentParser) -> None:
        sub.add_argument("--prompt-version", default=DEFAULT_PROMPT_VERSION)
        sub.add_argument("--llm-provider", default=DEFAULT_LLM_PROVIDER, choices=provider_choices)
        sub.add_argument("--llm-model", default=DEFAULT_LLM_MODEL)
        sub.add_argument("--llm-base-url", default=DEFAULT_LLM_BASE_URL)
        sub.add_argument("--llm-api-key", default=DEFAULT_LLM_API_KEY)
        sub.add_argument("--temperature", type=float, default=DEFAULT_TEMPERATURE)
        sub.add_argument("--max-output-tokens", type=int, default=DEFAULT_MAX_OUTPUT_TOKENS)

    def add_expectations(sub: argparse.ArgumentParser) -> None:
        sub.add_argument("--expected-capability")
        sub.add_argument("--expected-result-mode")

    def add_manifest_runtime(sub: argparse.ArgumentParser) -> None:
        # Shared tail of the manifest-driven subcommands.
        add_backend_and_output(sub)
        add_llm_options(sub)
        sub.add_argument("--timeout-seconds", type=int, default=180)
        sub.add_argument("--use-mock", action="store_true")

    parser = argparse.ArgumentParser(description="Repo-native helper for NDC_1C domain-case and scenario orchestration")
    subparsers = parser.add_subparsers(dest="command", required=True)

    # run-case
    case_cmd = subparsers.add_parser(
        "run-case",
        help="Run one assistant_stage1 case through the existing backend and save artifacts",
    )
    case_cmd.add_argument("--domain", required=True)
    case_cmd.add_argument("--question", required=True)
    case_cmd.add_argument("--case-id")
    case_cmd.add_argument("--slot", default="baseline", choices=slot_choices)
    case_cmd.add_argument("--analysis-date")
    add_backend_and_output(case_cmd)
    case_cmd.add_argument("--sessions-dir", default=str(DEFAULT_SESSIONS_DIR))
    case_cmd.add_argument("--reports-dir", default=str(DEFAULT_REPORTS_DIR))
    add_llm_options(case_cmd)
    case_cmd.add_argument("--timeout-seconds", type=int, default=300)
    case_cmd.add_argument("--poll-interval-seconds", type=float, default=1.5)
    add_expectations(case_cmd)
    case_cmd.add_argument("--use-mock", action="store_true")
    case_cmd.set_defaults(func=handle_run_case)

    # import-export
    import_cmd = subparsers.add_parser(
        "import-export",
        help="Import an existing technical export markdown and build artifacts",
    )
    import_cmd.add_argument("--domain", required=True)
    import_cmd.add_argument("--input", required=True)
    import_cmd.add_argument("--question")
    import_cmd.add_argument("--case-id")
    import_cmd.add_argument("--slot", default="baseline", choices=slot_choices)
    import_cmd.add_argument("--output-root", default=str(DEFAULT_ARTIFACTS_ROOT))
    add_expectations(import_cmd)
    import_cmd.set_defaults(func=handle_import_export)

    # run-scenario
    scenario_cmd = subparsers.add_parser(
        "run-scenario",
        help="Run one multi-step domain scenario in a shared assistant session and save per-step artifacts",
    )
    scenario_cmd.add_argument("--manifest", required=True)
    scenario_cmd.add_argument("--scenario-id")
    scenario_cmd.add_argument("--analysis-date")
    add_manifest_runtime(scenario_cmd)
    scenario_cmd.set_defaults(func=handle_run_scenario)

    # run-pack
    pack_cmd = subparsers.add_parser(
        "run-pack",
        help="Run a multi-scenario domain pack and save aggregate orchestration artifacts",
    )
    pack_cmd.add_argument("--manifest", required=True)
    pack_cmd.add_argument("--pack-id")
    pack_cmd.add_argument("--analysis-date")
    pack_cmd.add_argument("--max-scenarios", type=int)
    add_manifest_runtime(pack_cmd)
    pack_cmd.set_defaults(func=handle_run_pack)

    # run-pack-loop
    loop_cmd = subparsers.add_parser(
        "run-pack-loop",
        help="Run autonomous analyst -> coder -> rerun iterations for a domain pack until the acceptance gate is reached",
    )
    loop_cmd.add_argument("--manifest", required=True)
    loop_cmd.add_argument("--loop-id")
    loop_cmd.add_argument("--analysis-date")
    loop_cmd.add_argument("--max-scenarios", type=int)
    loop_cmd.add_argument("--target-score", type=int, default=80)
    loop_cmd.add_argument("--max-iterations", type=int, default=8)
    add_manifest_runtime(loop_cmd)
    loop_cmd.add_argument("--codex-binary", default="codex")
    loop_cmd.add_argument("--codex-profile")
    # NOTE(review): the role-specific model options below have non-empty
    # defaults and build_codex_exec_command prefers them over --codex-model,
    # so --codex-model looks effectively unused unless a role model is passed
    # as an empty string. Confirm the intended precedence.
    loop_cmd.add_argument("--codex-model")
    loop_cmd.add_argument("--analyst-codex-model", default="gpt-5.4")
    loop_cmd.add_argument("--coder-codex-model", default="gpt-5.4-mini")
    loop_cmd.add_argument("--analyst-reasoning-effort", default="medium")
    loop_cmd.add_argument("--coder-reasoning-effort", default="low")
    loop_cmd.add_argument("--codex-timeout-seconds", type=int, default=1800)
    loop_cmd.add_argument(
        "--analyst-schema",
        default=str(DEFAULT_LOOP_SCHEMA_DIR / "domain_loop_analyst_verdict.schema.json"),
    )
    loop_cmd.add_argument(
        "--coder-schema",
        default=str(DEFAULT_LOOP_SCHEMA_DIR / "domain_loop_coder_result.schema.json"),
    )
    loop_cmd.set_defaults(func=handle_run_pack_loop)

    return parser
def main() -> int:
    """Parse the command line, dispatch to the selected handler, return an exit code.

    Any exception raised by the handler is reported on stderr with the
    ``[domain-case-loop]`` prefix and turned into exit code ``1``.
    """
    namespace = build_parser().parse_args()
    try:
        exit_code = int(namespace.func(namespace))
    except Exception as failure:  # noqa: BLE001
        # Top-level boundary: print a one-line error instead of a traceback.
        print(f"[domain-case-loop] error: {failure}", file=sys.stderr)
        exit_code = 1
    return exit_code
# Script entry point: exit the process with the code returned by main().
if __name__ == "__main__":
    sys.exit(main())