NODEDC_1C/scripts/domain_case_loop.py

3874 lines
169 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import argparse
import json
import re
import subprocess
import sys
import textwrap
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
REPO_ROOT = Path(__file__).resolve().parent.parent
DEFAULT_ARTIFACTS_ROOT = REPO_ROOT / "artifacts" / "domain_runs"
DEFAULT_SESSIONS_DIR = REPO_ROOT / "llm_normalizer" / "data" / "assistant_sessions"
DEFAULT_REPORTS_DIR = REPO_ROOT / "llm_normalizer" / "reports"
DEFAULT_LOOP_SCHEMA_DIR = REPO_ROOT / "docs" / "orchestration" / "schemas"
SHARED_LLM_CONNECTION_CONFIG = REPO_ROOT / "llm_normalizer" / "data" / "shared_llm_connection.json"
DEFAULT_BACKEND_URL = "http://127.0.0.1:8787"
DEFAULT_PROMPT_VERSION = "address_query_runtime_v1"
BASE_DEFAULT_LLM_PROVIDER = "local"
BASE_DEFAULT_LLM_MODEL = "qwen2.5-14b-instruct-1m"
BASE_DEFAULT_LLM_BASE_URL = "http://127.0.0.1:1234/v1"
DEFAULT_LLM_API_KEY = ""
BASE_DEFAULT_TEMPERATURE = 0.0
BASE_DEFAULT_MAX_OUTPUT_TOKENS = 900
TECH_SECTION_HEADER = "### technical_debug_payload_json"
SCENARIO_MANIFEST_SCHEMA_VERSION = "domain_scenario_manifest_v1"
SCENARIO_STATE_SCHEMA_VERSION = "domain_scenario_state_v1"
SCENARIO_STEP_STATE_SCHEMA_VERSION = "domain_scenario_step_state_v1"
SCENARIO_PACK_SCHEMA_VERSION = "domain_scenario_pack_v1"
ACTIVE_DOMAIN_CONTRACT_SCHEMA_VERSION = "active_domain_contract_v1"
AUTONOMOUS_LOOP_SCHEMA_VERSION = "domain_autonomous_loop_v1"
def load_shared_local_llm_defaults(config_path: Path | None = None) -> dict[str, Any]:
defaults: dict[str, Any] = {
"llm_provider": BASE_DEFAULT_LLM_PROVIDER,
"llm_model": BASE_DEFAULT_LLM_MODEL,
"llm_base_url": BASE_DEFAULT_LLM_BASE_URL,
"temperature": BASE_DEFAULT_TEMPERATURE,
"max_output_tokens": BASE_DEFAULT_MAX_OUTPUT_TOKENS,
}
target = config_path or SHARED_LLM_CONNECTION_CONFIG
if not target.exists():
return defaults
try:
raw = json.loads(target.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
return defaults
connection = raw.get("connection")
if not isinstance(connection, dict):
return defaults
if str(connection.get("llmProvider") or "").strip().lower() != "local":
return defaults
model = str(connection.get("model") or "").strip()
base_url = str(connection.get("baseUrl") or "").strip()
temperature = connection.get("temperature")
max_output_tokens = connection.get("maxOutputTokens")
if model:
defaults["llm_model"] = model
if base_url:
defaults["llm_base_url"] = base_url
if isinstance(temperature, (int, float)) and not isinstance(temperature, bool):
defaults["temperature"] = float(temperature)
if isinstance(max_output_tokens, (int, float)) and not isinstance(max_output_tokens, bool):
defaults["max_output_tokens"] = max(1, int(max_output_tokens))
return defaults
SHARED_LLM_DEFAULTS = load_shared_local_llm_defaults()
DEFAULT_LLM_PROVIDER = str(SHARED_LLM_DEFAULTS["llm_provider"])
DEFAULT_LLM_MODEL = str(SHARED_LLM_DEFAULTS["llm_model"])
DEFAULT_LLM_BASE_URL = str(SHARED_LLM_DEFAULTS["llm_base_url"])
DEFAULT_TEMPERATURE = float(SHARED_LLM_DEFAULTS["temperature"])
DEFAULT_MAX_OUTPUT_TOKENS = int(SHARED_LLM_DEFAULTS["max_output_tokens"])
TOP_LEVEL_NOISE_PATTERNS = (
re.compile(r"^(?:status|статус(?: результата)?)\b", re.IGNORECASE),
re.compile(r"^(?:что учтено|сводка)\b", re.IGNORECASE),
re.compile(r"^блок\s+\d+\b", re.IGNORECASE),
re.compile(r"^(?:подтверждение|опорные документы|сервисно)\b", re.IGNORECASE),
)
DEFAULT_INVARIANT_SEVERITY: dict[str, str] = {
"wrong_intent": "P0",
"wrong_capability": "P0",
"wrong_followup_action": "P0",
"wrong_recipe": "P0",
"wrong_result_mode": "P0",
"wrong_as_of_date": "P0",
"wrong_period_from": "P0",
"wrong_period_to": "P0",
"missing_required_filter": "P0",
"forbidden_capability_selected": "P0",
"forbidden_recipe_selected": "P0",
"focus_object_missing": "P0",
"wrong_date_scope_state": "P0",
"direct_answer_missing": "P0",
"top_level_noise_present": "P0",
}
REPAIR_TARGET_SEVERITY_ORDER = {"P0": 0, "P1": 1, "P2": 2}
REPAIR_TARGET_PROBLEM_ORDER = {
"temporal_honesty_gap": 0,
"edge_carryover_gap": 1,
"followup_action_resolution_gap": 2,
"object_memory_gap": 3,
"route_gap": 4,
"answer_shape_mismatch": 5,
"presentation_gap": 6,
"domain_anchor_gap": 7,
"capability_gap": 8,
"evidence_gap": 9,
"other": 10,
}
REPAIR_TARGET_FILE_HINTS: dict[str, list[str]] = {
"followup_action_resolution_gap": [
"llm_normalizer/backend/src/services/addressIntentResolver.ts",
"llm_normalizer/backend/src/services/address_runtime/decomposeStage.ts",
"llm_normalizer/backend/src/services/assistantService.ts",
],
"object_memory_gap": [
"llm_normalizer/backend/src/services/addressNavigationState.ts",
"llm_normalizer/backend/src/services/address_runtime/decomposeStage.ts",
"llm_normalizer/backend/src/services/assistantService.ts",
],
"edge_carryover_gap": [
"llm_normalizer/backend/src/services/addressNavigationState.ts",
"llm_normalizer/backend/src/services/address_runtime/decomposeStage.ts",
"llm_normalizer/backend/src/services/addressFilterExtractor.ts",
],
"temporal_honesty_gap": [
"llm_normalizer/backend/src/services/addressFilterExtractor.ts",
"llm_normalizer/backend/src/services/address_runtime/decomposeStage.ts",
"llm_normalizer/backend/src/services/address_runtime/composeStage.ts",
],
"route_gap": [
"llm_normalizer/backend/src/services/addressQueryClassifier.ts",
"llm_normalizer/backend/src/services/addressIntentResolver.ts",
"llm_normalizer/backend/src/services/address_runtime/decomposeStage.ts",
"llm_normalizer/backend/src/services/assistantService.ts",
],
"capability_gap": [
"llm_normalizer/backend/src/services/addressCapabilityPolicy.ts",
"llm_normalizer/backend/src/services/addressRecipeCatalog.ts",
"llm_normalizer/backend/src/services/addressQueryService.ts",
],
"presentation_gap": [
"llm_normalizer/backend/src/services/address_runtime/composeStage.ts",
"llm_normalizer/backend/src/services/assistantService.ts",
],
"evidence_gap": [
"llm_normalizer/backend/src/services/addressQueryService.ts",
"llm_normalizer/backend/src/services/addressRecipeCatalog.ts",
"llm_normalizer/backend/src/services/address_runtime/composeStage.ts",
],
"domain_anchor_gap": [
"llm_normalizer/backend/src/services/addressQueryService.ts",
"llm_normalizer/backend/src/services/address_runtime/decomposeStage.ts",
"llm_normalizer/backend/src/services/addressNavigationState.ts",
],
"other": [
"llm_normalizer/backend/src/services/addressQueryService.ts",
"llm_normalizer/backend/src/services/assistantService.ts",
],
}
def dump_json(payload: Any) -> str:
return json.dumps(payload, ensure_ascii=False, indent=2)
def write_text(file_path: Path, text: str) -> None:
file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.write_text(text, encoding="utf-8", newline="\n")
def write_json(file_path: Path, payload: Any) -> None:
write_text(file_path, dump_json(payload) + "\n")
def read_text_file(file_path: Path) -> str:
return file_path.read_text(encoding="utf-8-sig")
def resolve_repo_relative_path(raw_path: str | None) -> Path | None:
candidate = str(raw_path or "").strip().replace("\\", "/")
if not candidate:
return None
path = Path(candidate)
if path.is_absolute() or any(part == ".." for part in path.parts):
return None
return REPO_ROOT / path
def build_coder_snapshot_paths(repair_targets: dict[str, Any]) -> list[Path]:
collected: list[Path] = []
seen: set[Path] = set()
groups = []
if isinstance(repair_targets, dict):
groups.extend(repair_targets.get("priority_foci") or [])
groups.extend(repair_targets.get("targets") or [])
for item in groups:
if not isinstance(item, dict):
continue
for raw_path in normalize_string_list(item.get("candidate_files")):
resolved = resolve_repo_relative_path(raw_path)
if resolved is None or not resolved.exists() or resolved in seen:
continue
seen.add(resolved)
collected.append(resolved)
return collected
def snapshot_coder_candidate_files(paths: list[Path]) -> dict[str, bytes]:
snapshots: dict[str, bytes] = {}
for path in paths:
if not path.exists() or not path.is_file():
continue
snapshots[str(path)] = path.read_bytes()
return snapshots
def restore_line_collapsed_files_from_snapshot(snapshots: dict[str, bytes]) -> list[str]:
restored: list[str] = []
for raw_path, before_bytes in snapshots.items():
path = Path(raw_path)
if not path.exists() or not path.is_file():
continue
after_bytes = path.read_bytes()
before_line_count = before_bytes.count(b"\n")
after_line_count = after_bytes.count(b"\n")
if before_line_count < 1 or after_line_count != 0:
continue
if before_bytes.replace(b"\r", b"").replace(b"\n", b"") != after_bytes.replace(b"\r", b"").replace(b"\n", b""):
continue
path.write_bytes(before_bytes)
try:
restored.append(str(path.relative_to(REPO_ROOT)).replace("\\", "/"))
except ValueError:
restored.append(str(path))
return restored
def sanitize_export_text(value: str) -> str:
raw = str(value or "")
debug_heading = re.search(
r"(?:^|\n)\s*#{0,6}\s*(?:debug_payload_json|technical_breakdown_json|route_summary_json|debug_payload|technical_breakdown)\b",
raw,
flags=re.IGNORECASE,
)
pre_cut = raw[: debug_heading.start()] if debug_heading else raw
without_debug = re.sub(
r"###\s*(?:debug_payload_json|technical_breakdown_json|route_summary_json)[\s\S]*?(?:```[\s\S]*?```|$)",
"",
pre_cut,
flags=re.IGNORECASE,
)
without_debug = re.sub(
r"(?:^|\n)\s*#{0,6}\s*(?:debug_payload_json|technical_breakdown_json|route_summary_json)\b[\s\S]*$",
"",
without_debug,
flags=re.IGNORECASE,
)
inline_patterns = [
re.compile(r"\b(?:debug_payload_json|technical_breakdown_json)\b", re.IGNORECASE),
re.compile(r"\b(?:route_summary|semantic_profile|domain_scope|relation_patterns|account_scope)\b", re.IGNORECASE),
re.compile(r"\b(?:coverage_report|retrieval_status|problem_unit_state|candidate_evidence)\b", re.IGNORECASE),
re.compile(r"\b(?:graph_domain_scope|graph_runtime|selection_reason|why_included)\b", re.IGNORECASE),
]
output_lines: list[str] = []
for line in without_debug.splitlines():
cleaned = line.rstrip()
if not cleaned.strip():
continue
if any(pattern.search(cleaned) for pattern in inline_patterns):
continue
output_lines.append(cleaned)
return "\n".join(output_lines).strip()
def build_conversation_export(session_id: str, conversation: list[dict[str, Any]], mode: str = "technical") -> str:
include_debug = mode == "technical"
lines = [
"# Assistant conversation export",
f"session_id: {session_id or 'n/a'}",
f"export_mode: {mode}",
f"exported_at: {datetime.now(timezone.utc).replace(microsecond=0).isoformat()}",
"",
]
for index, item in enumerate(conversation, start=1):
safe_text = sanitize_export_text(str(item.get("text") or ""))
lines.append(f"## {index}. {item.get('role', 'unknown')}")
lines.append(f"message_id: {item.get('message_id') or 'n/a'}")
lines.append(f"created_at: {item.get('created_at') or 'n/a'}")
lines.append(f"reply_type: {item.get('reply_type') or 'n/a'}")
trace_id = item.get("trace_id")
if trace_id:
lines.append(f"trace_id: {trace_id}")
lines.extend(["", safe_text or "(empty)", ""])
if include_debug and item.get("role") == "assistant" and item.get("debug") is not None:
lines.extend([TECH_SECTION_HEADER, "```json", dump_json(item["debug"]), "```", ""])
return "\n".join(lines)
def slugify_token(value: str | None, fallback: str) -> str:
cleaned = re.sub(r"[^\w-]+", "_", str(value or "").strip(), flags=re.UNICODE).strip("_")
return cleaned or fallback
def slugify_case_id(domain: str, explicit_case_id: str | None) -> str:
if explicit_case_id:
normalized = explicit_case_id.strip()
if normalized:
return normalized
timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
return f"{slugify_token(domain, 'domain_case')}_{timestamp}"
def normalize_iso_date(value: Any) -> str | None:
if not isinstance(value, str):
return None
candidate = value.strip()
if not re.fullmatch(r"\d{4}-\d{2}-\d{2}", candidate):
return None
try:
datetime.strptime(candidate, "%Y-%m-%d")
except ValueError:
return None
return candidate
def normalize_analysis_context(raw_context: Any) -> dict[str, Any]:
source = raw_context if isinstance(raw_context, dict) else {}
output: dict[str, Any] = {}
as_of_date = normalize_iso_date(source.get("as_of_date"))
period_from = normalize_iso_date(source.get("period_from"))
period_to = normalize_iso_date(source.get("period_to"))
snapshot_mode_raw = str(source.get("snapshot_mode") or "").strip()
source_raw = str(source.get("source") or "").strip()
if as_of_date:
output["as_of_date"] = as_of_date
if period_from:
output["period_from"] = period_from
if period_to:
output["period_to"] = period_to
if snapshot_mode_raw in {"auto", "force_snapshot", "force_live"}:
output["snapshot_mode"] = snapshot_mode_raw
if source_raw:
output["source"] = source_raw
return output
def merge_analysis_context(base_context: Any, override_context: Any) -> dict[str, Any]:
merged = normalize_analysis_context(base_context)
merged.update(normalize_analysis_context(override_context))
return merged
def carry_forward_analysis_context(
scenario_state: dict[str, Any],
analysis_context: dict[str, Any],
*,
prefer_carryover: bool = False,
) -> dict[str, Any]:
carried = dict(analysis_context)
semantic_memory = scenario_state.get("semantic_memory")
if isinstance(semantic_memory, dict):
date_scope = semantic_memory.get("date_scope")
if isinstance(date_scope, dict):
carried_as_of_date = normalize_iso_date(date_scope.get("as_of_date"))
if carried_as_of_date and (prefer_carryover or not carried.get("as_of_date")):
carried["as_of_date"] = carried_as_of_date
if not carried.get("source"):
carried["source"] = "scenario_state_carryover"
for key in (
"focus_object",
"selected_object_ref",
"warehouse_scope",
"organization_scope",
"provenance_bundle",
"sale_trace_bundle",
"purchase_documents_bundle",
"supplier_if_known",
"first_purchase_date",
):
if (prefer_carryover or key not in carried) and semantic_memory.get(key) is not None:
carried[key] = semantic_memory.get(key)
return carried
def merge_scenario_date_scope(
previous_date_scope: Any,
current_date_scope: Any,
*,
depends_on: list[str],
) -> Any:
previous = previous_date_scope if isinstance(previous_date_scope, dict) else None
current = current_date_scope if isinstance(current_date_scope, dict) else None
if not current:
return previous or current_date_scope
if not depends_on or not previous:
return current
previous_as_of_date = normalize_iso_date(previous.get("as_of_date"))
current_as_of_date = normalize_iso_date(current.get("as_of_date"))
if previous_as_of_date and current_as_of_date and current_as_of_date != previous_as_of_date:
merged = dict(current)
merged["as_of_date"] = previous_as_of_date
if not merged.get("source"):
merged["source"] = "scenario_state_carryover"
return merged
return current
def repair_text_mojibake(value: str) -> str:
if not value:
return value
for encoding in ("utf-8", "cp1251", "cp866"):
try:
repaired = value.encode("latin1").decode(encoding)
except (UnicodeEncodeError, UnicodeDecodeError):
continue
if repaired != value:
return repaired
return value
def normalize_binding_value(value: Any) -> Any:
if isinstance(value, str):
return repair_text_mojibake(value)
if isinstance(value, list):
return [normalize_binding_value(item) for item in value]
if isinstance(value, dict):
return {normalize_binding_value(key) if isinstance(key, str) else key: normalize_binding_value(item) for key, item in value.items()}
return value
def normalize_bindings(raw_bindings: Any) -> dict[str, Any]:
if not isinstance(raw_bindings, dict):
return {}
return {str(key): normalize_binding_value(value) for key, value in raw_bindings.items()}
def normalize_string_list(raw_values: Any) -> list[str]:
if isinstance(raw_values, str):
value = raw_values.strip()
return [value] if value else []
if not isinstance(raw_values, list):
return []
values: list[str] = []
for item in raw_values:
value = str(item or "").strip()
if value:
values.append(value)
return values
def normalize_validation_filters(raw_filters: Any) -> dict[str, str]:
if not isinstance(raw_filters, dict):
return {}
normalized: dict[str, str] = {}
for raw_key, raw_value in raw_filters.items():
key = str(raw_key or "").strip()
if not key:
continue
if key in {"as_of_date", "period_from", "period_to"}:
normalized_value = normalize_iso_date(raw_value)
if normalized_value:
normalized[key] = normalized_value
continue
text_value = str(raw_value or "").strip()
if text_value:
normalized[key] = text_value
return normalized
def normalize_invariant_severity(raw_mapping: Any) -> dict[str, str]:
if not isinstance(raw_mapping, dict):
return {}
normalized: dict[str, str] = {}
for raw_key, raw_value in raw_mapping.items():
key = str(raw_key or "").strip()
value = str(raw_value or "").strip().upper()
if not key or value not in {"P0", "P1", "WARNING"}:
continue
normalized[key] = value
return normalized
def normalize_identifier(value: Any) -> str:
return str(value or "").strip().lower()
def identifiers_match(actual: Any, expected: Any) -> bool:
actual_id = normalize_identifier(actual)
expected_id = normalize_identifier(expected)
if not actual_id or not expected_id:
return False
return actual_id == expected_id or actual_id.endswith(expected_id) or expected_id.endswith(actual_id)
def identifier_in_list(actual: Any, expected_values: list[str]) -> bool:
return any(identifiers_match(actual, expected) for expected in expected_values if expected)
def first_non_empty_lines(text: str, limit: int = 3) -> list[str]:
output: list[str] = []
for raw_line in str(text or "").splitlines():
cleaned = raw_line.strip()
if not cleaned:
continue
output.append(cleaned)
if len(output) >= limit:
break
return output
def build_node_contract_index(raw_contract: dict[str, Any]) -> dict[str, dict[str, Any]]:
scenario_tree = raw_contract.get("scenario_tree")
if not isinstance(scenario_tree, dict):
return {}
node_index: dict[str, dict[str, Any]] = {}
for section_key in ("root_nodes", "critical_nodes", "supporting_nodes"):
raw_nodes = scenario_tree.get(section_key)
if not isinstance(raw_nodes, list):
continue
for raw_node in raw_nodes:
if not isinstance(raw_node, dict):
continue
node_id = str(raw_node.get("node_id") or "").strip()
if not node_id:
continue
node_index[node_id] = {
"expected_intents": normalize_string_list(raw_node.get("expected_intents")),
"required_answer_shape": str(raw_node.get("expected_answer_shape") or "").strip() or None,
"required_carryover_invariants": normalize_string_list(raw_node.get("required_carryover_invariants")),
"ordering_rule": str(raw_node.get("ordering_rule") or "").strip() or None,
}
return node_index
def enrich_step_with_node_contract(raw_step: Any, node_contract_index: dict[str, dict[str, Any]]) -> Any:
if not isinstance(raw_step, dict):
return raw_step
node_id = str(raw_step.get("node_id") or "").strip()
node_defaults = node_contract_index.get(node_id) or {}
if not node_defaults:
return raw_step
enriched = dict(raw_step)
if not enriched.get("expected_intents") and node_defaults.get("expected_intents"):
enriched["expected_intents"] = list(node_defaults["expected_intents"])
if not enriched.get("required_answer_shape") and node_defaults.get("required_answer_shape"):
enriched["required_answer_shape"] = node_defaults["required_answer_shape"]
merged_invariants = list(
dict.fromkeys(
normalize_string_list(node_defaults.get("required_carryover_invariants"))
+ normalize_string_list(enriched.get("required_carryover_invariants"))
)
)
if merged_invariants:
enriched["required_carryover_invariants"] = merged_invariants
if not enriched.get("ordering_rule") and node_defaults.get("ordering_rule"):
enriched["ordering_rule"] = node_defaults["ordering_rule"]
return enriched
def drop_none_values(payload: dict[str, Any]) -> dict[str, Any]:
return {key: value for key, value in payload.items() if value is not None}
def http_json(url: str, *, method: str = "GET", payload: dict[str, Any] | None = None, timeout: int = 30) -> dict[str, Any]:
data = None
headers = {"Accept": "application/json"}
if payload is not None:
data = json.dumps(payload).encode("utf-8")
headers["Content-Type"] = "application/json; charset=utf-8"
request = Request(url, data=data, method=method, headers=headers)
try:
with urlopen(request, timeout=timeout) as response:
body = response.read().decode("utf-8")
except HTTPError as error:
detail = error.read().decode("utf-8", errors="replace")
raise RuntimeError(f"HTTP {error.code} for {url}: {detail}") from error
except URLError as error:
raise RuntimeError(f"Failed to reach backend at {url}: {error}") from error
try:
return json.loads(body)
except json.JSONDecodeError as error:
raise RuntimeError(f"Backend returned non-JSON payload for {url}") from error
def ensure_backend_health(backend_url: str, timeout_seconds: int) -> dict[str, Any]:
health = http_json(f"{backend_url}/api/health", timeout=max(5, min(timeout_seconds, 30)))
if not isinstance(health, dict):
raise RuntimeError(f"Backend health endpoint returned invalid payload at {backend_url}/api/health")
if health.get("ok") is False:
raise RuntimeError(f"Backend health check failed for {backend_url}: {dump_json(health)}")
return health
def wait_for_job(backend_url: str, job_id: str, timeout_seconds: int, poll_interval_seconds: float) -> dict[str, Any]:
deadline = time.time() + timeout_seconds
last_status = None
while time.time() < deadline:
response = http_json(f"{backend_url}/api/eval/run-async/{job_id}")
job = response.get("job")
if not isinstance(job, dict):
raise RuntimeError("Async job response does not contain `job` object")
status = str(job.get("status") or "unknown")
if status != last_status:
print(f"[domain-case-loop] job {job_id}: {status}")
last_status = status
if status in {"completed", "failed"}:
return job
time.sleep(poll_interval_seconds)
raise TimeoutError(f"Async job {job_id} did not finish within {timeout_seconds} seconds")
def wait_for_file(file_path: Path, timeout_seconds: int = 30) -> None:
deadline = time.time() + timeout_seconds
while time.time() < deadline:
if file_path.exists():
return
time.sleep(0.5)
raise FileNotFoundError(f"Timed out waiting for file: {file_path}")
def read_json_file(file_path: Path) -> dict[str, Any]:
return json.loads(read_text_file(file_path))
def extract_conversation_from_session(session_record: dict[str, Any]) -> list[dict[str, Any]]:
items = session_record.get("items")
if isinstance(items, list) and items:
output: list[dict[str, Any]] = []
for item in items:
if not isinstance(item, dict):
continue
output.append(
{
"message_id": item.get("message_id"),
"role": item.get("role"),
"text": item.get("text") or "",
"reply_type": item.get("reply_type"),
"created_at": item.get("created_at"),
"trace_id": item.get("trace_id"),
"debug": item.get("debug"),
}
)
if output:
return output
conversation = session_record.get("conversation")
if isinstance(conversation, list) and conversation:
output = []
for item in conversation:
if not isinstance(item, dict):
continue
output.append(
{
"message_id": item.get("message_id"),
"role": item.get("role"),
"text": item.get("text") or "",
"reply_type": item.get("reply_type"),
"created_at": item.get("created_at"),
"trace_id": item.get("trace_id"),
"debug": item.get("debug"),
}
)
if output:
return output
turns = session_record.get("turns")
if not isinstance(turns, list):
return []
output = []
for turn in turns:
if not isinstance(turn, dict):
continue
technical_json = turn.get("technical_json")
if not isinstance(technical_json, dict):
continue
for item in (technical_json.get("user_message"), technical_json.get("assistant_message")):
if not isinstance(item, dict):
continue
output.append(
{
"message_id": item.get("message_id"),
"role": item.get("role"),
"text": item.get("text") or "",
"reply_type": item.get("reply_type"),
"created_at": item.get("created_at"),
"trace_id": item.get("trace_id"),
"debug": item.get("debug"),
}
)
return output
def find_last_assistant(conversation: list[dict[str, Any]]) -> dict[str, Any]:
for item in reversed(conversation):
if item.get("role") == "assistant":
return item
raise RuntimeError("Conversation does not contain assistant message")
def find_last_user_before(conversation: list[dict[str, Any]], assistant_message_id: Any) -> dict[str, Any] | None:
before_assistant: list[dict[str, Any]] = []
for item in conversation:
if item.get("message_id") == assistant_message_id:
break
before_assistant.append(item)
for item in reversed(before_assistant):
if item.get("role") == "user":
return item
return None
def extract_last_turn(session_record: dict[str, Any]) -> dict[str, Any] | None:
turns = session_record.get("turns")
if not isinstance(turns, list) or not turns:
return None
last_turn = turns[-1]
return last_turn if isinstance(last_turn, dict) else None
def extract_report_case(report_record: dict[str, Any], case_id: str) -> dict[str, Any] | None:
results = report_record.get("results")
if not isinstance(results, list):
return None
for item in results:
if isinstance(item, dict) and str(item.get("case_id") or "") == case_id:
return item
return None
def build_turn_artifact(
*,
slot: str,
domain: str,
case_id: str,
question: str | None,
session_id: str,
conversation: list[dict[str, Any]],
session_record: dict[str, Any] | None,
job_record: dict[str, Any] | None,
report_case: dict[str, Any] | None,
export_file_name: str,
) -> dict[str, Any]:
last_assistant = find_last_assistant(conversation)
last_user = find_last_user_before(conversation, last_assistant.get("message_id"))
final_question = question or (last_user.get("text") if isinstance(last_user, dict) else None)
last_turn = extract_last_turn(session_record or {})
session_record_safe = session_record or {}
return {
"schema_version": "domain_case_turn_artifact_v1",
"artifact_slot": slot,
"domain": domain,
"case_id": case_id,
"question": final_question,
"session_id": session_id,
"run": {
"job_id": job_record.get("job_id") if isinstance(job_record, dict) else None,
"run_id": job_record.get("run_id") if isinstance(job_record, dict) else None,
"analysis_date": job_record.get("analysis_date") if isinstance(job_record, dict) else None,
"report_case_available": report_case is not None,
},
"user_message": last_user,
"assistant_message": last_assistant,
"technical_debug_payload": last_assistant.get("debug"),
"session_summary": {
"schema_version": session_record_safe.get("schema_version"),
"updated_at": session_record_safe.get("updated_at"),
"trace_ids": session_record_safe.get("trace_ids"),
"reply_types": session_record_safe.get("reply_types"),
"investigation_state": session_record_safe.get("investigation_state"),
"address_navigation_state": session_record_safe.get("address_navigation_state"),
"items_count": len(session_record_safe.get("items", [])) if isinstance(session_record_safe.get("items"), list) else None,
"last_turn": last_turn,
},
"report_case": report_case,
"export_markdown_file": export_file_name,
}
def ensure_case_brief(
case_dir: Path,
*,
domain: str,
question: str | None,
expected_capability: str | None,
expected_result_mode: str | None,
) -> None:
file_path = case_dir / "case_brief.md"
if file_path.exists():
return
content = textwrap.dedent(
f"""\
# Case brief
## Domain
`{domain}`
## Raw user question
`{question or "<fill me>"}`
## Expected business meaning
- <fill me>
## Expected capability
- {expected_capability or "<fill me>"}
## Expected result mode
- {expected_result_mode or "<fill me>"}
## Constraints
- no architecture changes
- 1C/MCP first
- no fabricated values
- heuristic is not product success
- accepted requires analyst quality score >= 80 and zero unresolved P0
## Known current behavior
- <fill me>
## Draft acceptance criteria
- <fill me>
"""
)
write_text(file_path, content)
def save_capture_bundle(
*,
case_dir: Path,
slot: str,
export_markdown: str,
debug_payload: Any,
turn_artifact: dict[str, Any],
session_record: dict[str, Any] | None,
job_record: dict[str, Any] | None,
report_case: dict[str, Any] | None,
) -> None:
write_text(case_dir / f"{slot}_output.md", export_markdown)
write_json(case_dir / f"{slot}_debug.json", debug_payload if debug_payload is not None else {})
write_json(case_dir / f"{slot}_turn.json", turn_artifact)
if session_record is not None:
write_json(case_dir / f"{slot}_session.json", session_record)
if job_record is not None:
write_json(case_dir / f"{slot}_job.json", job_record)
if report_case is not None:
write_json(case_dir / f"{slot}_report_case.json", report_case)
def parse_metadata_line(line: str) -> tuple[str, str] | None:
if ":" not in line:
return None
key, value = line.split(":", 1)
return key.strip(), value.strip()
def parse_export_markdown(text: str) -> tuple[str, list[dict[str, Any]]]:
session_id = "n/a"
session_match = re.search(r"^session_id:\s*(.+?)\s*$", text, flags=re.MULTILINE)
if session_match:
session_id = session_match.group(1).strip()
section_pattern = re.compile(r"^##\s+\d+\.\s+(user|assistant)\s*$", flags=re.MULTILINE)
sections = list(section_pattern.finditer(text))
conversation: list[dict[str, Any]] = []
for index, match in enumerate(sections):
role = match.group(1)
start = match.end()
end = sections[index + 1].start() if index + 1 < len(sections) else len(text)
block = text[start:end].lstrip("\r\n")
lines = block.splitlines()
metadata: dict[str, Any] = {"role": role}
cursor = 0
while cursor < len(lines):
line = lines[cursor]
if not line.strip():
cursor += 1
break
meta = parse_metadata_line(line)
if not meta:
break
key, value = meta
metadata[key] = value
cursor += 1
body_lines = lines[cursor:]
debug_payload = None
debug_start = None
for body_index, line in enumerate(body_lines):
if line.strip().lower() == TECH_SECTION_HEADER.lower():
debug_start = body_index
break
if debug_start is not None:
body_text_lines = body_lines[:debug_start]
debug_lines = body_lines[debug_start + 1 :]
debug_text = "\n".join(debug_lines).strip()
fenced = re.search(r"```json\s*(.*?)\s*```", debug_text, flags=re.DOTALL | re.IGNORECASE)
if fenced:
debug_text = fenced.group(1).strip()
if debug_text:
debug_payload = json.loads(debug_text)
else:
body_text_lines = body_lines
conversation.append(
{
"message_id": metadata.get("message_id"),
"role": role,
"text": "\n".join(body_text_lines).strip(),
"reply_type": metadata.get("reply_type"),
"created_at": metadata.get("created_at"),
"trace_id": metadata.get("trace_id"),
"debug": debug_payload,
}
)
if not conversation:
raise RuntimeError("Could not parse conversation sections from export markdown")
return session_id, conversation
def build_normalize_config(args: argparse.Namespace) -> dict[str, Any]:
return {
"llmProvider": args.llm_provider,
"apiKey": args.llm_api_key,
"model": args.llm_model,
"baseUrl": args.llm_base_url,
"temperature": args.temperature,
"maxOutputTokens": args.max_output_tokens,
"promptVersion": args.prompt_version,
}
def fetch_session_snapshot(backend_url: str, session_id: str, timeout_seconds: int) -> dict[str, Any]:
deadline = time.time() + timeout_seconds
last_error: Exception | None = None
timeout_per_call = max(10, min(30, timeout_seconds))
while time.time() < deadline:
try:
response = http_json(f"{backend_url}/api/assistant/session/{session_id}", timeout=timeout_per_call)
session = response.get("session")
if isinstance(session, dict):
return session
except Exception as error: # noqa: BLE001
last_error = error
time.sleep(0.5)
if last_error is not None:
raise RuntimeError(f"Failed to fetch assistant session `{session_id}`: {last_error}") from last_error
raise RuntimeError(f"Assistant session `{session_id}` was not available within {timeout_seconds} seconds")
def build_assistant_message_payload(
args: argparse.Namespace,
*,
question: str,
session_id: str | None,
analysis_context: dict[str, Any],
) -> dict[str, Any]:
context_payload: dict[str, Any] = {}
normalized_analysis_context = normalize_analysis_context(analysis_context)
if normalized_analysis_context:
context_payload["analysis_context"] = normalized_analysis_context
as_of_date = normalized_analysis_context.get("as_of_date")
if isinstance(as_of_date, str):
context_payload["period_hint"] = as_of_date
payload = drop_none_values(
{
"session_id": session_id,
"user_message": question,
"message": question,
"mode": "assistant",
"llmProvider": args.llm_provider,
"apiKey": args.llm_api_key,
"model": args.llm_model,
"baseUrl": args.llm_base_url,
"temperature": args.temperature,
"maxOutputTokens": args.max_output_tokens,
"promptVersion": args.prompt_version,
"context": context_payload or None,
"useMock": bool(args.use_mock),
}
)
return payload
def parse_path_tokens(path_expression: str) -> list[str | int]:
tokens: list[str | int] = []
cursor = 0
path = path_expression.strip()
while cursor < len(path):
current = path[cursor]
if current == ".":
cursor += 1
continue
if current == "[":
closing = path.find("]", cursor)
if closing < 0:
raise RuntimeError(f"Invalid placeholder path: {path_expression}")
raw_index = path[cursor + 1 : closing].strip()
if not raw_index.isdigit():
raise RuntimeError(f"Only numeric list indexes are supported in placeholders: {path_expression}")
tokens.append(int(raw_index))
cursor = closing + 1
continue
end = cursor
while end < len(path) and path[end] not in ".[":
end += 1
tokens.append(path[cursor:end])
cursor = end
return tokens
def lookup_placeholder_value(path_expression: str, scenario_state: dict[str, Any]) -> Any:
root: dict[str, Any] = {
"scenario_state": scenario_state,
"step_outputs": scenario_state.get("step_outputs", {}),
"semantic_memory": scenario_state.get("semantic_memory", {}),
"bindings": scenario_state.get("bindings", {}),
}
if isinstance(scenario_state.get("step_outputs"), dict):
root.update(scenario_state["step_outputs"])
current: Any = root
for token in parse_path_tokens(path_expression):
if isinstance(token, int):
if not isinstance(current, list):
raise RuntimeError(f"Placeholder `{path_expression}` does not point to a list before index access")
if token >= len(current):
raise RuntimeError(f"Placeholder `{path_expression}` index {token} is out of range")
current = current[token]
continue
if not isinstance(current, dict) or token not in current:
raise RuntimeError(f"Placeholder `{path_expression}` could not be resolved at `{token}`")
current = current[token]
return current
def resolve_question_template(question_template: str, scenario_state: dict[str, Any]) -> str:
pattern = re.compile(r"{{\s*([^{}]+?)\s*}}")
def replace(match: re.Match[str]) -> str:
value = lookup_placeholder_value(match.group(1), scenario_state)
if isinstance(value, (dict, list)):
return dump_json(value)
return str(value)
return pattern.sub(replace, question_template).strip()
def build_failed_step_state(
*,
scenario_id: str,
domain: str,
step: dict[str, Any],
step_index: int,
question_resolved: str,
status: str,
failure_type: str,
error_message: str,
) -> dict[str, Any]:
return {
"schema_version": SCENARIO_STEP_STATE_SCHEMA_VERSION,
"scenario_id": scenario_id,
"domain": domain,
"step_id": step["step_id"],
"step_index": step_index,
"title": step["title"],
"depends_on": step["depends_on"],
"question_template": step["question_template"],
"question_resolved": question_resolved,
"expected_intents": step.get("expected_intents") or [],
"expected_capability": step.get("expected_capability"),
"expected_recipe": step.get("expected_recipe"),
"expected_result_mode": step.get("expected_result_mode"),
"required_filters": step.get("required_filters") or {},
"forbidden_capabilities": step.get("forbidden_capabilities") or [],
"forbidden_recipes": step.get("forbidden_recipes") or [],
"required_state_objects": step.get("required_state_objects") or [],
"required_answer_shape": step.get("required_answer_shape"),
"forbidden_answer_patterns": step.get("forbidden_answer_patterns") or [],
"required_carryover_invariants": step.get("required_carryover_invariants") or [],
"invariant_severity": step.get("invariant_severity") or {},
"reply_type": "backend_error" if status == "blocked" else "unresolved_followup",
"assistant_message_id": None,
"trace_id": None,
"detected_mode": None,
"detected_intent": None,
"selected_recipe": None,
"capability_id": None,
"capability_route_mode": None,
"route_expectation_status": None,
"result_mode": None,
"response_type": None,
"assistant_text": "",
"top_non_empty_lines": [],
"actual_direct_answer": None,
"extracted_filters": {},
"focus_object": None,
"fallback_type": failure_type,
"mcp_call_status": None,
"balance_confirmed": None,
"active_result_set_id": None,
"last_confirmed_route": None,
"date_scope": None,
"organization_scope": None,
"entries": [],
"execution_status": status,
"acceptance_status": status,
"violated_invariants": [],
"warnings": [],
"hard_fail": status == "blocked",
"status": status,
"failure_type": failure_type,
"error_message": error_message,
}
def normalize_step_definition(index: int, raw_step: Any) -> dict[str, Any]:
if isinstance(raw_step, str):
step_id = f"step_{index:02d}"
question_template = raw_step.strip()
if not question_template:
raise RuntimeError(f"Scenario step {index} must not be empty")
return {
"step_id": step_id,
"title": step_id,
"question_template": question_template,
"depends_on": [],
"analysis_context": {},
"expected_intents": [],
"expected_capability": None,
"expected_recipe": None,
"expected_result_mode": None,
"question_id": None,
"node_id": None,
"node_role": None,
"paraphrase_family": None,
"required_filters": {},
"forbidden_capabilities": [],
"forbidden_recipes": [],
"required_state_objects": [],
"required_answer_shape": None,
"forbidden_answer_patterns": [],
"required_carryover_invariants": [],
"invariant_severity": {},
"ordering_rule": None,
}
if not isinstance(raw_step, dict):
raise RuntimeError(f"Scenario step {index} must be a string or object")
raw_step_id = str(raw_step.get("step_id") or "").strip()
step_id = slugify_token(raw_step_id, f"step_{index:02d}")
question_template = str(raw_step.get("question") or raw_step.get("question_template") or "").strip()
if not question_template:
raise RuntimeError(f"Scenario step `{step_id}` must define `question` or `question_template`")
depends_on_raw = raw_step.get("depends_on")
depends_on: list[str] = []
if isinstance(depends_on_raw, str) and depends_on_raw.strip():
depends_on = [depends_on_raw.strip()]
elif isinstance(depends_on_raw, list):
depends_on = [str(item).strip() for item in depends_on_raw if str(item).strip()]
return {
"step_id": step_id,
"title": str(raw_step.get("title") or step_id).strip() or step_id,
"question_template": question_template,
"depends_on": depends_on,
"analysis_context": normalize_analysis_context(raw_step.get("analysis_context")),
"expected_intents": normalize_string_list(raw_step.get("expected_intents") or raw_step.get("expected_intent")),
"expected_capability": str(raw_step.get("expected_capability") or "").strip() or None,
"expected_recipe": str(raw_step.get("expected_recipe") or raw_step.get("expected_selected_recipe") or "").strip() or None,
"expected_result_mode": str(raw_step.get("expected_result_mode") or "").strip() or None,
"question_id": str(raw_step.get("question_id") or "").strip() or None,
"node_id": str(raw_step.get("node_id") or "").strip() or None,
"node_role": str(raw_step.get("node_role") or raw_step.get("role") or "").strip() or None,
"paraphrase_family": str(raw_step.get("paraphrase_family") or raw_step.get("wording_family") or "").strip() or None,
"required_filters": normalize_validation_filters(raw_step.get("required_filters")),
"forbidden_capabilities": normalize_string_list(raw_step.get("forbidden_capabilities")),
"forbidden_recipes": normalize_string_list(raw_step.get("forbidden_recipes")),
"required_state_objects": normalize_string_list(raw_step.get("required_state_objects")),
"required_answer_shape": (
str(raw_step.get("required_answer_shape") or raw_step.get("expected_answer_shape") or "").strip() or None
),
"forbidden_answer_patterns": normalize_string_list(raw_step.get("forbidden_answer_patterns")),
"required_carryover_invariants": normalize_string_list(raw_step.get("required_carryover_invariants")),
"invariant_severity": normalize_invariant_severity(raw_step.get("invariant_severity")),
"ordering_rule": str(raw_step.get("ordering_rule") or "").strip() or None,
}
def normalize_scenario_manifest(
raw_manifest: dict[str, Any],
*,
fallback_domain: str | None = None,
fallback_analysis_context: Any = None,
fallback_bindings: Any = None,
default_scenario_id: str | None = None,
) -> dict[str, Any]:
domain = str(raw_manifest.get("domain") or fallback_domain or "").strip()
if not domain:
raise RuntimeError("Scenario manifest must define `domain`")
raw_steps = raw_manifest.get("steps")
if not isinstance(raw_steps, list) or not raw_steps:
raise RuntimeError("Scenario manifest must define non-empty `steps`")
steps = [normalize_step_definition(index + 1, raw_step) for index, raw_step in enumerate(raw_steps)]
scenario_id = str(raw_manifest.get("scenario_id") or "").strip() or default_scenario_id or slugify_case_id(domain, None)
title = str(raw_manifest.get("title") or domain).strip() or domain
description = str(raw_manifest.get("description") or "").strip() or None
analysis_context = merge_analysis_context(fallback_analysis_context, raw_manifest.get("analysis_context"))
if analysis_context and "source" not in analysis_context:
analysis_context["source"] = "scenario_manifest"
bindings = normalize_bindings(fallback_bindings)
bindings.update(normalize_bindings(raw_manifest.get("bindings")))
return {
"schema_version": str(raw_manifest.get("schema_version") or SCENARIO_MANIFEST_SCHEMA_VERSION),
"scenario_id": scenario_id,
"domain": domain,
"title": title,
"description": description,
"analysis_context": analysis_context,
"bindings": bindings,
"question_ids": normalize_string_list(raw_manifest.get("question_ids")),
"node_ids": normalize_string_list(raw_manifest.get("node_ids")),
"acceptance_canon": raw_manifest.get("acceptance_canon") if isinstance(raw_manifest.get("acceptance_canon"), dict) else {},
"steps": steps,
}
def load_scenario_manifest(file_path: Path) -> dict[str, Any]:
raw_manifest = read_json_file(file_path)
return normalize_scenario_manifest(raw_manifest)
def build_active_contract_bindings(raw_contract: dict[str, Any]) -> dict[str, Any]:
observed_anchors = raw_contract.get("observed_anchors")
anchors = observed_anchors if isinstance(observed_anchors, dict) else {}
bindings = {
"target_date_historical": anchors.get("historical_as_of_date"),
"target_date_current": anchors.get("current_as_of_date_example"),
"observed_warehouse": anchors.get("warehouse"),
"observed_organization": anchors.get("organization"),
"focus_item_current": anchors.get("focus_item_current"),
"focus_item_historical": anchors.get("focus_item_historical"),
"focus_item_small_residue": anchors.get("focus_item_small_residue"),
"observed_supplier_candidate": anchors.get("supplier_candidate"),
"observed_supplier_candidate_alt": anchors.get("supplier_candidate_alt"),
"observed_customer_candidate": anchors.get("buyer_candidate"),
}
bindings.update(normalize_bindings(raw_contract.get("bindings")))
return normalize_bindings(bindings)
def convert_active_domain_contract_to_pack(raw_contract: dict[str, Any]) -> dict[str, Any]:
orchestration_pack = raw_contract.get("orchestration_pack")
if not isinstance(orchestration_pack, dict):
raise RuntimeError("Active domain contract must define object `orchestration_pack`")
raw_scenarios = orchestration_pack.get("scenarios")
if not isinstance(raw_scenarios, list) or not raw_scenarios:
raise RuntimeError("Active domain contract must define non-empty `orchestration_pack.scenarios`")
runtime_domain = (
str(raw_contract.get("runtime_domain") or raw_contract.get("domain") or "").strip()
or ("inventory_stock" if str(raw_contract.get("domain_id") or "").startswith("inventory_stock") else "")
)
if not runtime_domain:
raise RuntimeError("Active domain contract must define `runtime_domain`")
domain_id = str(raw_contract.get("domain_id") or runtime_domain).strip() or runtime_domain
node_contract_index = build_node_contract_index(raw_contract)
bindings = build_active_contract_bindings(raw_contract)
bindings.update(normalize_bindings(orchestration_pack.get("bindings")))
analysis_context = merge_analysis_context(raw_contract.get("default_analysis_context"), raw_contract.get("analysis_context"))
analysis_context = merge_analysis_context(analysis_context, orchestration_pack.get("analysis_context"))
if analysis_context and "source" not in analysis_context:
analysis_context["source"] = "active_domain_contract"
pack_id = str(orchestration_pack.get("pack_id") or "").strip() or slugify_case_id(domain_id, None)
title = str(orchestration_pack.get("title") or raw_contract.get("title") or domain_id).strip() or domain_id
description = str(orchestration_pack.get("description") or raw_contract.get("domain_goal") or "").strip() or None
enriched_scenarios: list[Any] = []
for raw_scenario in raw_scenarios:
if not isinstance(raw_scenario, dict):
enriched_scenarios.append(raw_scenario)
continue
enriched_scenario = dict(raw_scenario)
raw_steps = enriched_scenario.get("steps")
if isinstance(raw_steps, list):
enriched_scenario["steps"] = [
enrich_step_with_node_contract(raw_step, node_contract_index) for raw_step in raw_steps
]
enriched_scenarios.append(enriched_scenario)
return {
"schema_version": SCENARIO_PACK_SCHEMA_VERSION,
"source_schema_version": ACTIVE_DOMAIN_CONTRACT_SCHEMA_VERSION,
"source_contract_id": domain_id,
"pack_id": pack_id,
"domain": runtime_domain,
"title": title,
"description": description,
"analysis_context": analysis_context,
"bindings": bindings,
"scenarios": enriched_scenarios,
"scenario_tree": raw_contract.get("scenario_tree") if isinstance(raw_contract.get("scenario_tree"), dict) else {},
"acceptance_contract": (
raw_contract.get("acceptance_contract") if isinstance(raw_contract.get("acceptance_contract"), dict) else {}
),
"question_pool": raw_contract.get("question_pool") if isinstance(raw_contract.get("question_pool"), dict) else {},
"wording_families": raw_contract.get("wording_families") if isinstance(raw_contract.get("wording_families"), list) else [],
"known_failure_patterns_to_watch": (
raw_contract.get("known_failure_patterns_to_watch")
if isinstance(raw_contract.get("known_failure_patterns_to_watch"), list)
else []
),
"source_contract": {
"domain_id": domain_id,
"title": str(raw_contract.get("title") or domain_id).strip() or domain_id,
"status": str(raw_contract.get("status") or "active").strip() or "active",
},
}
def load_scenario_pack(file_path: Path) -> dict[str, Any]:
raw_pack = read_json_file(file_path)
schema_version = str(raw_pack.get("schema_version") or "").strip()
if schema_version == ACTIVE_DOMAIN_CONTRACT_SCHEMA_VERSION:
raw_pack = convert_active_domain_contract_to_pack(raw_pack)
domain = str(raw_pack.get("domain") or "").strip()
if not domain:
raise RuntimeError("Scenario pack must define `domain`")
raw_scenarios = raw_pack.get("scenarios")
if not isinstance(raw_scenarios, list) or not raw_scenarios:
raise RuntimeError("Scenario pack must define non-empty `scenarios`")
pack_id = str(raw_pack.get("pack_id") or "").strip() or slugify_case_id(domain, None)
title = str(raw_pack.get("title") or domain).strip() or domain
description = str(raw_pack.get("description") or "").strip() or None
analysis_context = normalize_analysis_context(raw_pack.get("analysis_context"))
if analysis_context and "source" not in analysis_context:
analysis_context["source"] = "scenario_pack"
bindings = normalize_bindings(raw_pack.get("bindings"))
scenarios = [
normalize_scenario_manifest(
raw_scenario,
fallback_domain=domain,
fallback_analysis_context=analysis_context,
fallback_bindings=bindings,
default_scenario_id=f"{pack_id}_scenario_{index + 1:02d}",
)
for index, raw_scenario in enumerate(raw_scenarios)
]
normalized_pack = {
"schema_version": str(raw_pack.get("schema_version") or SCENARIO_PACK_SCHEMA_VERSION),
"pack_id": pack_id,
"domain": domain,
"title": title,
"description": description,
"analysis_context": analysis_context,
"bindings": bindings,
"scenarios": scenarios,
}
for optional_key in (
"source_schema_version",
"source_contract_id",
"scenario_tree",
"acceptance_contract",
"question_pool",
"wording_families",
"known_failure_patterns_to_watch",
"source_contract",
):
optional_value = raw_pack.get(optional_key)
if optional_value is not None:
normalized_pack[optional_key] = optional_value
return normalized_pack
def ensure_scenario_brief(scenario_dir: Path, manifest: dict[str, Any]) -> None:
file_path = scenario_dir / "scenario_brief.md"
if file_path.exists():
return
steps_lines = "\n".join(
f"{index}. `{step['step_id']}` - {step['question_template']}" for index, step in enumerate(manifest["steps"], start=1)
)
content = textwrap.dedent(
f"""\
# Scenario brief
## Domain
`{manifest["domain"]}`
## Scenario id
`{manifest["scenario_id"]}`
## Title
{manifest["title"]}
## Description
{manifest.get("description") or "<fill me>"}
## Shared analysis context
```json
{dump_json(manifest.get("analysis_context") or {})}
```
## Bindings
```json
{dump_json(manifest.get("bindings") or {})}
```
## Steps
{steps_lines}
## Constraints
- no architecture changes
- reuse current assistant runtime and session state
- 1C/MCP first
- no fabricated values
- missing route or capability is domain enablement work, not silent rejection
"""
)
write_text(file_path, content)
def normalize_field_key(raw_key: str) -> str:
normalized = re.sub(r"\s+", " ", raw_key.strip().lower().replace("ё", "е")).strip(" :")
aliases = {
"склад": "warehouse",
"количество": "quantity",
"стоимость": "amount",
"сумма": "amount",
"организация": "organization",
"дата строки": "row_date",
"дата": "date",
"поставщик": "supplier",
"договор": "contract",
"документ": "document",
"документы": "documents",
"покупатель": "customer",
}
if normalized in aliases:
return aliases[normalized]
return slugify_token(normalized, "field").lower()
def extract_structured_entries(answer_text: str) -> list[dict[str, Any]]:
entries: list[dict[str, Any]] = []
for line in answer_text.splitlines():
match = re.match(r"^\s*(\d+)\.\s+(.*\S)\s*$", line)
if not match:
continue
index = int(match.group(1))
payload = match.group(2).strip()
segments = [segment.strip() for segment in payload.split(" | ") if segment.strip()]
title = segments[0] if segments else payload
fields: dict[str, str] = {}
raw_fields: dict[str, str] = {}
for segment in segments[1:]:
if ":" not in segment:
continue
raw_key, raw_value = segment.split(":", 1)
key = normalize_field_key(raw_key)
value = raw_value.strip()
fields[key] = value
raw_fields[raw_key.strip()] = value
entry: dict[str, Any] = {
"index": index,
"title": title,
"item": title,
"fields": fields,
"raw_fields": raw_fields,
"raw_line": line.strip(),
}
for key, value in fields.items():
entry[key] = value
entries.append(entry)
return entries
def derive_step_execution_status(reply_type: str | None, debug_payload: dict[str, Any]) -> str:
if reply_type == "backend_error":
return "blocked"
capability_route_mode = str(debug_payload.get("capability_route_mode") or "").strip()
fallback_type = str(debug_payload.get("fallback_type") or "").strip()
selected_recipe = str(debug_payload.get("selected_recipe") or "").strip()
detected_intent = str(debug_payload.get("detected_intent") or "").strip()
detected_mode = str(debug_payload.get("detected_mode") or "").strip()
if capability_route_mode == "exact" and fallback_type in {"", "none"} and reply_type in {"factual", "factual_with_explanation", "empty_but_valid"}:
return "exact"
if fallback_type in {"out_of_scope", "unknown"}:
return "needs_exact_capability"
if capability_route_mode == "exact" and detected_mode == "address_query":
return "partial"
if detected_mode == "address_query" and (selected_recipe or detected_intent):
return "partial"
if reply_type in {"partial_coverage", "clarification_required", "out_of_scope", "no_grounded_answer", "route_mismatch_blocked"}:
return "needs_exact_capability"
return "needs_exact_capability"
def derive_step_status(reply_type: str | None, debug_payload: dict[str, Any]) -> str:
return derive_step_execution_status(reply_type, debug_payload)
def should_require_direct_answer(step_state: dict[str, Any]) -> bool:
required_answer_shape = str(step_state.get("required_answer_shape") or "").strip()
if required_answer_shape:
return True
return str(step_state.get("node_role") or "").strip() in {"root", "critical_child"}
def is_top_level_noise_line(line: str) -> bool:
cleaned = str(line or "").strip()
if not cleaned:
return False
return any(pattern.search(cleaned) for pattern in TOP_LEVEL_NOISE_PATTERNS)
def derive_invariant_severity(step_state: dict[str, Any], violation_code: str) -> str:
overrides = step_state.get("invariant_severity")
if isinstance(overrides, dict):
override = str(overrides.get(violation_code) or "").strip().upper()
if override in {"P0", "P1", "WARNING"}:
return override
return DEFAULT_INVARIANT_SEVERITY.get(violation_code, "P1")
def acceptance_status_from_execution(execution_status: str, hard_fail: bool) -> str:
if execution_status == "blocked":
return "blocked"
if execution_status == "needs_exact_capability":
return "needs_exact_capability"
if hard_fail:
return "rejected"
if execution_status == "exact":
return "validated"
return "rejected"
def validate_step_contract(step_state: dict[str, Any]) -> dict[str, Any]:
state = dict(step_state)
execution_status = str(state.get("execution_status") or state.get("status") or "").strip() or "needs_exact_capability"
actual_direct_answer = str(state.get("actual_direct_answer") or "").strip()
top_non_empty_lines = state.get("top_non_empty_lines") if isinstance(state.get("top_non_empty_lines"), list) else []
extracted_filters = state.get("extracted_filters") if isinstance(state.get("extracted_filters"), dict) else {}
date_scope = state.get("date_scope") if isinstance(state.get("date_scope"), dict) else {}
violated_invariants: list[str] = []
warnings: list[str] = []
expected_intents = normalize_string_list(state.get("expected_intents"))
if expected_intents and not identifier_in_list(state.get("detected_intent"), expected_intents):
violated_invariants.append("wrong_intent")
expected_capability = state.get("expected_capability")
if expected_capability and not identifiers_match(state.get("capability_id"), expected_capability):
required_state_objects = set(normalize_string_list(state.get("required_state_objects")))
required_state_objects.update(normalize_string_list(state.get("required_carryover_invariants")))
violation_code = "wrong_followup_action" if "focus_object" in required_state_objects else "wrong_capability"
violated_invariants.append(violation_code)
expected_recipe = state.get("expected_recipe")
if expected_recipe and not identifiers_match(state.get("selected_recipe"), expected_recipe):
violated_invariants.append("wrong_recipe")
expected_result_mode = str(state.get("expected_result_mode") or "").strip()
actual_result_mode = str(state.get("result_mode") or "").strip()
if expected_result_mode and actual_result_mode and normalize_identifier(actual_result_mode) != normalize_identifier(expected_result_mode):
violated_invariants.append("wrong_result_mode")
for forbidden_capability in normalize_string_list(state.get("forbidden_capabilities")):
if identifiers_match(state.get("capability_id"), forbidden_capability):
violated_invariants.append("forbidden_capability_selected")
break
for forbidden_recipe in normalize_string_list(state.get("forbidden_recipes")):
if identifiers_match(state.get("selected_recipe"), forbidden_recipe):
violated_invariants.append("forbidden_recipe_selected")
break
required_filters = normalize_validation_filters(state.get("required_filters"))
required_as_of_date_from_context = normalize_iso_date(
(state.get("analysis_context") or {}).get("as_of_date") if isinstance(state.get("analysis_context"), dict) else None
)
if required_as_of_date_from_context and "as_of_date" not in required_filters:
required_filters["as_of_date"] = required_as_of_date_from_context
for filter_key, expected_value in required_filters.items():
actual_value = ""
if filter_key in {"as_of_date", "period_from", "period_to"}:
actual_value = normalize_iso_date(extracted_filters.get(filter_key))
else:
actual_value = str(extracted_filters.get(filter_key) or "").strip()
if not actual_value:
violated_invariants.append("missing_required_filter")
continue
if actual_value != expected_value:
if filter_key == "as_of_date":
violated_invariants.append("wrong_as_of_date")
elif filter_key == "period_from":
violated_invariants.append("wrong_period_from")
elif filter_key == "period_to":
violated_invariants.append("wrong_period_to")
else:
violated_invariants.append("missing_required_filter")
required_state_objects = set(normalize_string_list(state.get("required_state_objects")))
required_state_objects.update(
item
for item in normalize_string_list(state.get("required_carryover_invariants"))
if item in {"focus_object"}
)
focus_object = state.get("focus_object") if isinstance(state.get("focus_object"), dict) else {}
if "focus_object" in required_state_objects:
has_focus_object = bool(str(focus_object.get("object_id") or "").strip() or str(focus_object.get("label") or "").strip())
if not has_focus_object:
violated_invariants.append("focus_object_missing")
if "date_scope" in normalize_string_list(state.get("required_carryover_invariants")) and required_filters.get("as_of_date"):
current_date_scope = normalize_iso_date(date_scope.get("as_of_date"))
if current_date_scope and current_date_scope != required_filters["as_of_date"]:
violated_invariants.append("wrong_date_scope_state")
if should_require_direct_answer(state):
if not actual_direct_answer or is_top_level_noise_line(actual_direct_answer):
violated_invariants.append("direct_answer_missing")
first_top_line = str(top_non_empty_lines[0] if top_non_empty_lines else "").strip()
if first_top_line and is_top_level_noise_line(first_top_line):
violated_invariants.append("top_level_noise_present")
forbidden_answer_patterns = normalize_string_list(state.get("forbidden_answer_patterns"))
if forbidden_answer_patterns and top_non_empty_lines:
joined_top_block = "\n".join(str(line) for line in top_non_empty_lines)
for pattern in forbidden_answer_patterns:
if pattern and re.search(pattern, joined_top_block, flags=re.IGNORECASE):
warnings.append(f"forbidden_answer_pattern:{pattern}")
unique_violations = list(dict.fromkeys(violated_invariants))
hard_fail = any(derive_invariant_severity(state, code) == "P0" for code in unique_violations)
state["violated_invariants"] = unique_violations
state["warnings"] = list(dict.fromkeys(warnings))
state["hard_fail"] = hard_fail
state["acceptance_status"] = acceptance_status_from_execution(execution_status, hard_fail)
state["status"] = state["acceptance_status"]
return state
def build_scenario_step_state(
*,
scenario_id: str,
domain: str,
step: dict[str, Any],
step_index: int,
question_resolved: str,
analysis_context: dict[str, Any],
turn_artifact: dict[str, Any],
entries: list[dict[str, Any]],
) -> dict[str, Any]:
debug_payload = turn_artifact.get("technical_debug_payload")
if not isinstance(debug_payload, dict):
assistant_debug = turn_artifact.get("assistant_message", {}).get("debug") if isinstance(turn_artifact.get("assistant_message"), dict) else None
debug_payload = assistant_debug if isinstance(assistant_debug, dict) else {}
debug = debug_payload if isinstance(debug_payload, dict) else {}
session_summary = turn_artifact.get("session_summary")
summary = session_summary if isinstance(session_summary, dict) else {}
address_state = summary.get("address_navigation_state")
navigation_state = address_state if isinstance(address_state, dict) else {}
session_context = navigation_state.get("session_context")
context = session_context if isinstance(session_context, dict) else {}
assistant_message = turn_artifact.get("assistant_message")
assistant_item = assistant_message if isinstance(assistant_message, dict) else {}
reply_type = assistant_item.get("reply_type")
assistant_text = str(assistant_item.get("text") or "")
top_non_empty = first_non_empty_lines(assistant_text, limit=3)
turn_scenario = turn_artifact.get("scenario") if isinstance(turn_artifact.get("scenario"), dict) else {}
effective_analysis_context = normalize_analysis_context(turn_scenario.get("analysis_context"))
if not effective_analysis_context:
effective_analysis_context = normalize_analysis_context(analysis_context)
if not effective_analysis_context:
effective_analysis_context = step.get("analysis_context") if isinstance(step.get("analysis_context"), dict) else {}
step_state = {
"schema_version": SCENARIO_STEP_STATE_SCHEMA_VERSION,
"scenario_id": scenario_id,
"domain": domain,
"step_id": step["step_id"],
"step_index": step_index,
"title": step["title"],
"depends_on": step["depends_on"],
"question_template": step["question_template"],
"question_resolved": question_resolved,
"analysis_context": effective_analysis_context,
"expected_intents": step.get("expected_intents") or [],
"expected_capability": step.get("expected_capability"),
"expected_recipe": step.get("expected_recipe"),
"expected_result_mode": step.get("expected_result_mode"),
"required_filters": step.get("required_filters") or {},
"forbidden_capabilities": step.get("forbidden_capabilities") or [],
"forbidden_recipes": step.get("forbidden_recipes") or [],
"required_state_objects": step.get("required_state_objects") or [],
"required_answer_shape": step.get("required_answer_shape"),
"forbidden_answer_patterns": step.get("forbidden_answer_patterns") or [],
"required_carryover_invariants": step.get("required_carryover_invariants") or [],
"invariant_severity": step.get("invariant_severity") or {},
"reply_type": reply_type,
"assistant_message_id": assistant_item.get("message_id"),
"trace_id": assistant_item.get("trace_id"),
"detected_mode": debug.get("detected_mode"),
"detected_intent": debug.get("detected_intent"),
"selected_recipe": debug.get("selected_recipe"),
"capability_id": debug.get("capability_id"),
"capability_route_mode": debug.get("capability_route_mode"),
"mcp_discovery_catalog_chain_alignment_status": debug.get("mcp_discovery_catalog_chain_alignment_status"),
"mcp_discovery_catalog_chain_top_match": debug.get("mcp_discovery_catalog_chain_top_match"),
"mcp_discovery_catalog_chain_selected_matches_top": debug.get("mcp_discovery_catalog_chain_selected_matches_top"),
"route_expectation_status": debug.get("route_expectation_status"),
"result_mode": debug.get("result_mode"),
"response_type": debug.get("response_type"),
"assistant_text": assistant_text,
"top_non_empty_lines": top_non_empty,
"actual_direct_answer": top_non_empty[0] if top_non_empty else None,
"extracted_filters": debug.get("extracted_filters") if isinstance(debug.get("extracted_filters"), dict) else {},
"focus_object": context.get("active_focus_object") if isinstance(context.get("active_focus_object"), dict) else None,
"fallback_type": debug.get("fallback_type"),
"mcp_call_status": debug.get("mcp_call_status"),
"balance_confirmed": debug.get("balance_confirmed"),
"active_result_set_id": context.get("active_result_set_id"),
"last_confirmed_route": context.get("last_confirmed_route"),
"date_scope": context.get("date_scope"),
"organization_scope": context.get("organization_scope"),
"entries": entries,
}
step_state["execution_status"] = derive_step_execution_status(reply_type if isinstance(reply_type, str) else None, debug)
step_state["acceptance_status"] = step_state["execution_status"]
step_state["status"] = step_state["acceptance_status"]
return validate_step_contract(step_state)
def save_scenario_step_bundle(
*,
step_dir: Path,
export_markdown: str,
turn_artifact: dict[str, Any],
session_record: dict[str, Any],
response_payload: dict[str, Any],
step_state: dict[str, Any],
) -> None:
debug_payload = turn_artifact.get("technical_debug_payload")
write_text(step_dir / "output.md", export_markdown)
write_json(step_dir / "debug.json", debug_payload if debug_payload is not None else {})
write_json(step_dir / "turn.json", turn_artifact)
write_json(step_dir / "session.json", session_record)
write_json(step_dir / "assistant_response.json", response_payload)
write_json(step_dir / "step_state.json", step_state)
write_text(step_dir / "resolved_question.txt", f"{step_state['question_resolved']}\n")
def derive_scenario_execution_status(step_outputs: dict[str, dict[str, Any]]) -> str:
statuses = [str(item.get("execution_status") or item.get("status") or "") for item in step_outputs.values()]
if not statuses:
return "blocked"
if any(status == "blocked" for status in statuses):
return "blocked"
if any(status == "needs_exact_capability" for status in statuses):
return "needs_exact_capability"
if any(status == "partial" for status in statuses):
return "partial"
return "exact"
def derive_scenario_status(step_outputs: dict[str, dict[str, Any]]) -> str:
statuses = [str(item.get("acceptance_status") or item.get("status") or "") for item in step_outputs.values()]
if not statuses:
return "blocked"
if any(status == "blocked" for status in statuses):
return "blocked"
if any(status == "needs_exact_capability" for status in statuses):
return "needs_exact_capability"
if any(status in {"partial", "rejected"} for status in statuses):
return "partial"
return "accepted" if all(status == "validated" for status in statuses) else "partial"
def build_scenario_summary(manifest: dict[str, Any], scenario_state: dict[str, Any], final_status: str, execution_status: str) -> str:
lines = [
"# Scenario summary",
"",
f"- scenario_id: `{manifest['scenario_id']}`",
f"- domain: `{manifest['domain']}`",
f"- title: {manifest['title']}",
f"- session_id: `{scenario_state.get('session_id') or 'n/a'}`",
f"- execution_status: `{execution_status}`",
f"- final_status: `{final_status}`",
"",
"## Steps",
]
for index, step in enumerate(manifest["steps"], start=1):
step_output = scenario_state.get("step_outputs", {}).get(step["step_id"], {})
lines.extend(
[
f"{index}. `{step['step_id']}` - {step['question_template']}",
f"execution_status: `{step_output.get('execution_status') or 'n/a'}`",
f"acceptance_status: `{step_output.get('acceptance_status') or step_output.get('status') or 'n/a'}`",
f"question_resolved: {step_output.get('question_resolved') or 'n/a'}",
f"intent: `{step_output.get('detected_intent') or 'n/a'}`",
f"recipe: `{step_output.get('selected_recipe') or 'n/a'}`",
f"capability: `{step_output.get('capability_id') or 'n/a'}`",
f"result_mode: `{step_output.get('result_mode') or 'n/a'}`",
f"result_set: `{step_output.get('active_result_set_id') or 'n/a'}`",
f"violated_invariants: {', '.join(step_output.get('violated_invariants') or []) or 'none'}",
"",
]
)
return "\n".join(lines).strip() + "\n"
def build_scenario_final_status(
manifest: dict[str, Any],
scenario_state: dict[str, Any],
final_status: str,
execution_status: str,
) -> str:
reason = {
"accepted": "all scenario steps executed in one assistant session with no unresolved route or capability gaps",
"partial": "scenario captured successfully, but at least one step still needs exact capability enablement or route hardening",
"needs_exact_capability": "scenario is valid for the project, but at least one step still requires exact capability or route enablement",
"blocked": "scenario run was interrupted by runtime or backend failure",
}.get(final_status, "scenario status unknown")
return textwrap.dedent(
f"""\
# Final status
- status: `{final_status}`
- execution_status: `{execution_status}`
- scenario_id: `{manifest['scenario_id']}`
- session_id: `{scenario_state.get('session_id') or 'n/a'}`
- reason: {reason}
"""
)
def run_assistant_step(
*,
args: argparse.Namespace,
domain: str,
scenario_id: str,
step: dict[str, Any],
step_index: int,
session_id: str | None,
question_resolved: str,
analysis_context: dict[str, Any],
) -> dict[str, Any]:
payload = build_assistant_message_payload(
args,
question=question_resolved,
session_id=session_id,
analysis_context=analysis_context,
)
response_payload = http_json(
f"{args.backend_url}/api/assistant/message",
method="POST",
payload=payload,
timeout=max(30, int(args.timeout_seconds)),
)
resolved_session_id = str(response_payload.get("session_id") or session_id or "").strip()
if not resolved_session_id:
raise RuntimeError(f"Assistant response for step `{step['step_id']}` does not contain session_id")
session_record = fetch_session_snapshot(args.backend_url, resolved_session_id, args.timeout_seconds)
conversation = extract_conversation_from_session(session_record)
export_markdown = build_conversation_export(resolved_session_id, conversation, mode="technical")
turn_artifact = build_turn_artifact(
slot="step",
domain=domain,
case_id=scenario_id,
question=question_resolved,
session_id=resolved_session_id,
conversation=conversation,
session_record=session_record,
job_record=None,
report_case=None,
export_file_name="output.md",
)
turn_artifact["schema_version"] = "domain_scenario_turn_artifact_v1"
turn_artifact["scenario"] = {
"scenario_id": scenario_id,
"step_id": step["step_id"],
"step_index": step_index,
"question_template": step["question_template"],
"question_resolved": question_resolved,
"depends_on": step["depends_on"],
"analysis_context": analysis_context,
}
last_assistant = find_last_assistant(conversation)
entries = extract_structured_entries(str(last_assistant.get("text") or ""))
step_state = build_scenario_step_state(
scenario_id=scenario_id,
domain=domain,
step=step,
step_index=step_index,
question_resolved=question_resolved,
analysis_context=analysis_context,
turn_artifact=turn_artifact,
entries=entries,
)
return {
"session_id": resolved_session_id,
"response_payload": response_payload,
"session_record": session_record,
"conversation": conversation,
"export_markdown": export_markdown,
"turn_artifact": turn_artifact,
"step_state": step_state,
}
def execute_scenario_manifest(
*,
args: argparse.Namespace,
manifest: dict[str, Any],
scenario_dir: Path,
manifest_source_label: str | None,
) -> tuple[dict[str, Any], str]:
steps_dir = scenario_dir / "steps"
steps_dir.mkdir(parents=True, exist_ok=True)
write_json(scenario_dir / "scenario_manifest.json", manifest)
if manifest_source_label:
write_text(scenario_dir / "manifest_source.txt", f"{manifest_source_label}\n")
ensure_scenario_brief(scenario_dir, manifest)
scenario_state: dict[str, Any] = {
"schema_version": SCENARIO_STATE_SCHEMA_VERSION,
"scenario_id": manifest["scenario_id"],
"domain": manifest["domain"],
"title": manifest["title"],
"session_id": None,
"analysis_context": manifest.get("analysis_context") or {},
"bindings": manifest.get("bindings") or {},
"step_outputs": {},
"semantic_memory": {},
"updated_at": datetime.now(timezone.utc).replace(microsecond=0).isoformat(),
}
write_json(scenario_dir / "scenario_state.json", scenario_state)
last_export_markdown = ""
for step_index, step in enumerate(manifest["steps"], start=1):
step_dir = steps_dir / step["step_id"]
step_analysis_context = merge_analysis_context(manifest.get("analysis_context"), step.get("analysis_context"))
step_analysis_context = carry_forward_analysis_context(
scenario_state,
step_analysis_context,
prefer_carryover=bool(step.get("depends_on")),
)
try:
resolved_question = resolve_question_template(step["question_template"], scenario_state)
result = run_assistant_step(
args=args,
domain=manifest["domain"],
scenario_id=manifest["scenario_id"],
step=step,
step_index=step_index,
session_id=scenario_state.get("session_id"),
question_resolved=resolved_question,
analysis_context=step_analysis_context,
)
except RuntimeError as exc:
failure_type = "runtime_error"
failure_status = "blocked"
if "Placeholder `" in str(exc):
failure_type = "placeholder_resolution_error"
failure_status = "needs_exact_capability"
step_state = build_failed_step_state(
scenario_id=manifest["scenario_id"],
domain=manifest["domain"],
step=step,
step_index=step_index,
question_resolved=step["question_template"],
status=failure_status,
failure_type=failure_type,
error_message=str(exc),
)
scenario_state["step_outputs"][step["step_id"]] = step_state
scenario_state["semantic_memory"] = {
**(scenario_state.get("semantic_memory") or {}),
"latest_step_id": step["step_id"],
"latest_step_status": step_state["status"],
}
scenario_state["updated_at"] = datetime.now(timezone.utc).replace(microsecond=0).isoformat()
save_scenario_step_bundle(
step_dir=step_dir,
export_markdown="",
turn_artifact={},
session_record={},
response_payload={},
step_state=step_state,
)
write_json(scenario_dir / "scenario_state.json", scenario_state)
print(
f"[domain-case-loop] scenario {manifest['scenario_id']} step {step_index}/{len(manifest['steps'])}: "
f"{step['step_id']} -> {step_state['status']} ({failure_type})"
)
if failure_status == "blocked":
break
continue
scenario_state["session_id"] = result["session_id"]
scenario_state["step_outputs"][step["step_id"]] = result["step_state"]
previous_semantic_memory = scenario_state.get("semantic_memory") or {}
previous_date_scope = previous_semantic_memory.get("date_scope") if isinstance(previous_semantic_memory, dict) else None
current_date_scope = result["step_state"].get("date_scope")
scenario_state["semantic_memory"] = {
"latest_step_id": step["step_id"],
"latest_step_status": result["step_state"].get("status"),
"active_result_set_id": result["step_state"].get("active_result_set_id"),
"last_confirmed_route": result["step_state"].get("last_confirmed_route"),
"date_scope": merge_scenario_date_scope(
previous_date_scope,
current_date_scope,
depends_on=step.get("depends_on") or [],
),
"organization_scope": result["step_state"].get("organization_scope"),
"entries": result["step_state"].get("entries"),
}
scenario_state["updated_at"] = datetime.now(timezone.utc).replace(microsecond=0).isoformat()
save_scenario_step_bundle(
step_dir=step_dir,
export_markdown=result["export_markdown"],
turn_artifact=result["turn_artifact"],
session_record=result["session_record"],
response_payload=result["response_payload"],
step_state=result["step_state"],
)
write_json(scenario_dir / "scenario_state.json", scenario_state)
last_export_markdown = result["export_markdown"]
print(
f"[domain-case-loop] scenario {manifest['scenario_id']} step {step_index}/{len(manifest['steps'])}: "
f"{step['step_id']} -> {result['step_state']['status']}"
)
execution_status = derive_scenario_execution_status(scenario_state["step_outputs"])
final_status = derive_scenario_status(scenario_state["step_outputs"])
write_text(scenario_dir / "scenario_output.md", last_export_markdown or "")
write_text(scenario_dir / "scenario_summary.md", build_scenario_summary(manifest, scenario_state, final_status, execution_status))
write_text(scenario_dir / "final_status.md", build_scenario_final_status(manifest, scenario_state, final_status, execution_status))
if scenario_state.get("session_id"):
write_text(scenario_dir / "session_id.txt", f"{scenario_state['session_id']}\n")
print(f"[domain-case-loop] saved scenario artifacts to {scenario_dir}")
print(f"[domain-case-loop] execution_status={execution_status} final_status={final_status}")
return scenario_state, final_status
def handle_run_case(args: argparse.Namespace) -> int:
ensure_backend_health(args.backend_url, args.timeout_seconds)
case_id = slugify_case_id(args.domain, args.case_id)
case_dir = Path(args.output_root).resolve() / case_id
case_dir.mkdir(parents=True, exist_ok=True)
ensure_case_brief(
case_dir,
domain=args.domain,
question=args.question,
expected_capability=args.expected_capability,
expected_result_mode=args.expected_result_mode,
)
payload: dict[str, Any] = {
"normalizeConfig": build_normalize_config(args),
"eval_target": "assistant_stage1",
"questions": [args.question],
"useMock": bool(args.use_mock),
"mode": "standard",
}
if args.analysis_date:
payload["analysis_date"] = args.analysis_date
start_response = http_json(f"{args.backend_url}/api/eval/run-async/start", method="POST", payload=payload)
job = start_response.get("job")
if not isinstance(job, dict):
raise RuntimeError("Async start response does not contain `job` object")
job_id = str(job.get("job_id") or "")
if not job_id:
raise RuntimeError("Async start response does not contain job_id")
final_job = wait_for_job(args.backend_url, job_id, args.timeout_seconds, args.poll_interval_seconds)
if str(final_job.get("status") or "") != "completed":
raise RuntimeError(f"Async job did not complete successfully: {final_job.get('status')}")
run_id = str(final_job.get("run_id") or "")
report_case_id = "AUTO-001"
session_id = f"{run_id}-{report_case_id}"
session_file = Path(args.sessions_dir).resolve() / f"{session_id}.json"
wait_for_file(session_file)
session_record = read_json_file(session_file)
conversation = extract_conversation_from_session(session_record)
export_markdown = build_conversation_export(session_id, conversation, mode="technical")
report_case = None
report_file = Path(args.reports_dir).resolve() / f"{run_id}.json"
if report_file.exists():
report_record = read_json_file(report_file)
report_case = extract_report_case(report_record, report_case_id)
turn_artifact = build_turn_artifact(
slot=args.slot,
domain=args.domain,
case_id=case_id,
question=args.question,
session_id=session_id,
conversation=conversation,
session_record=session_record,
job_record=final_job,
report_case=report_case,
export_file_name=f"{args.slot}_output.md",
)
save_capture_bundle(
case_dir=case_dir,
slot=args.slot,
export_markdown=export_markdown,
debug_payload=turn_artifact.get("technical_debug_payload"),
turn_artifact=turn_artifact,
session_record=session_record,
job_record=final_job,
report_case=report_case,
)
print(f"[domain-case-loop] saved {args.slot} artifacts to {case_dir}")
print(f"[domain-case-loop] session_id={session_id}")
return 0
def handle_import_export(args: argparse.Namespace) -> int:
export_text = Path(args.input).read_text(encoding="utf-8-sig")
session_id, conversation = parse_export_markdown(export_text)
case_id = slugify_case_id(args.domain, args.case_id)
case_dir = Path(args.output_root).resolve() / case_id
case_dir.mkdir(parents=True, exist_ok=True)
last_assistant = find_last_assistant(conversation)
last_user = find_last_user_before(conversation, last_assistant.get("message_id"))
question = args.question or (last_user.get("text") if isinstance(last_user, dict) else None)
ensure_case_brief(
case_dir,
domain=args.domain,
question=question,
expected_capability=args.expected_capability,
expected_result_mode=args.expected_result_mode,
)
turn_artifact = build_turn_artifact(
slot=args.slot,
domain=args.domain,
case_id=case_id,
question=question,
session_id=session_id,
conversation=conversation,
session_record=None,
job_record=None,
report_case=None,
export_file_name=f"{args.slot}_output.md",
)
save_capture_bundle(
case_dir=case_dir,
slot=args.slot,
export_markdown=export_text,
debug_payload=last_assistant.get("debug"),
turn_artifact=turn_artifact,
session_record=None,
job_record=None,
report_case=None,
)
print(f"[domain-case-loop] imported {args.slot} artifacts to {case_dir}")
return 0
def handle_run_scenario(args: argparse.Namespace) -> int:
ensure_backend_health(args.backend_url, args.timeout_seconds)
manifest_path = Path(args.manifest).resolve()
manifest = load_scenario_manifest(manifest_path)
if args.scenario_id:
manifest["scenario_id"] = args.scenario_id.strip()
if args.analysis_date:
manifest["analysis_context"] = merge_analysis_context(
manifest.get("analysis_context"),
{"as_of_date": args.analysis_date, "source": "cli_override"},
)
scenario_dir = Path(args.output_root).resolve() / manifest["scenario_id"]
execute_scenario_manifest(
args=args,
manifest=manifest,
scenario_dir=scenario_dir,
manifest_source_label=str(manifest_path),
)
return 0
def build_pack_summary(
pack: dict[str, Any],
scenario_results: list[dict[str, Any]],
final_status: str,
execution_status: str,
) -> str:
lines = [
"# Pack summary",
"",
f"- pack_id: `{pack['pack_id']}`",
f"- domain: `{pack['domain']}`",
f"- title: {pack['title']}",
f"- execution_status: `{execution_status}`",
f"- final_status: `{final_status}`",
"",
"## Scenarios",
]
for index, item in enumerate(scenario_results, start=1):
lines.extend(
[
f"{index}. `{item['scenario_id']}` - {item['title']}",
f"execution_status: `{item.get('execution_status') or 'n/a'}`",
f"acceptance_status: `{item['final_status']}`",
f"session_id: `{item.get('session_id') or 'n/a'}`",
f"artifact_dir: `{item['artifact_dir']}`",
"",
]
)
return "\n".join(lines).strip() + "\n"
def build_pack_final_status(
pack: dict[str, Any],
scenario_results: list[dict[str, Any]],
final_status: str,
execution_status: str,
) -> str:
expected_scenarios = len(pack.get("scenarios") or [])
executed_scenarios = len(scenario_results)
has_missing_scenarios = executed_scenarios < expected_scenarios
reason = {
"accepted": "all declared scenarios in the pack executed without blocked or missing-capability states",
"partial": "the pack executed, but at least one scenario still needs capability enablement or route hardening",
"needs_exact_capability": "the pack is valid, but at least one scenario still requires exact capability or route enablement",
"blocked": "the pack could not be completed because at least one scenario failed at runtime",
}.get(final_status, "pack status unknown")
if final_status == "accepted" and has_missing_scenarios:
reason = (
f"only {executed_scenarios}/{expected_scenarios} declared scenarios were executed; "
"missing scenarios keep the pack from being confirmed as accepted"
)
return textwrap.dedent(
f"""\
# Final status
- status: `{final_status}`
- execution_status: `{execution_status}`
- pack_id: `{pack['pack_id']}`
- domain: `{pack['domain']}`
- reason: {reason}
"""
)
def derive_coverage_status(statuses: list[str]) -> str:
normalized = [str(status or "").strip() for status in statuses if str(status or "").strip()]
if not normalized:
return "unmapped"
if all(status == "accepted" for status in normalized):
return "green"
if any(status == "blocked" for status in normalized):
return "blocked"
if any(status == "needs_exact_capability" for status in normalized):
return "needs_exact_capability"
return "partial"
def derive_pack_execution_status(scenario_results: list[dict[str, Any]]) -> str:
aggregate_statuses = [str(item.get("execution_status") or "") for item in scenario_results if isinstance(item, dict)]
if not aggregate_statuses:
return "blocked"
if any(status == "blocked" for status in aggregate_statuses):
return "blocked"
if any(status == "needs_exact_capability" for status in aggregate_statuses):
return "needs_exact_capability"
if any(status == "partial" for status in aggregate_statuses):
return "partial"
return "exact"
def derive_pack_final_status(pack: dict[str, Any], scenario_results: list[dict[str, Any]]) -> str:
aggregate_statuses = [item["final_status"] for item in scenario_results]
if not aggregate_statuses:
return "blocked"
if any(status == "blocked" for status in aggregate_statuses):
return "blocked"
if any(status == "needs_exact_capability" for status in aggregate_statuses):
return "needs_exact_capability"
if any(status == "partial" for status in aggregate_statuses):
return "partial"
acceptance_matrix = build_scenario_acceptance_matrix(pack, scenario_results)
if "| partial |" in acceptance_matrix:
return "partial"
return "accepted" if len(scenario_results) == len(pack.get("scenarios") or []) else "partial"
def build_scenario_acceptance_matrix(pack: dict[str, Any], scenario_results: list[dict[str, Any]]) -> str:
scenario_status_map = {
str(item.get("scenario_id") or ""): str(item.get("final_status") or "unknown")
for item in scenario_results
if isinstance(item, dict)
}
scenarios = pack.get("scenarios") if isinstance(pack.get("scenarios"), list) else []
question_pool = pack.get("question_pool") if isinstance(pack.get("question_pool"), dict) else {}
raw_questions = question_pool.get("questions") if isinstance(question_pool.get("questions"), list) else []
question_index: dict[str, dict[str, Any]] = {}
for raw_question in raw_questions:
if not isinstance(raw_question, dict):
continue
question_id = str(raw_question.get("question_id") or "").strip()
if question_id:
question_index[question_id] = raw_question
scenario_questions_map: dict[str, list[str]] = {}
scenario_nodes_map: dict[str, list[str]] = {}
scenario_wording_map: dict[str, list[str]] = {}
for scenario in scenarios:
if not isinstance(scenario, dict):
continue
scenario_id = str(scenario.get("scenario_id") or "").strip()
if not scenario_id:
continue
question_ids = normalize_string_list(scenario.get("question_ids"))
if not question_ids:
question_ids = [
str(step.get("question_id") or "").strip()
for step in scenario.get("steps", [])
if isinstance(step, dict) and str(step.get("question_id") or "").strip()
]
node_ids = normalize_string_list(scenario.get("node_ids"))
if not node_ids:
node_ids = [
str(step.get("node_id") or "").strip()
for step in scenario.get("steps", [])
if isinstance(step, dict) and str(step.get("node_id") or "").strip()
]
if not node_ids:
for question_id in question_ids:
question_meta = question_index.get(question_id) or {}
node_id = str(question_meta.get("node_id") or "").strip()
if node_id:
node_ids.append(node_id)
scenario_questions_map[scenario_id] = question_ids
scenario_nodes_map[scenario_id] = list(dict.fromkeys(node_ids))
scenario_wording_map[scenario_id] = _scenario_observed_wording_families(scenario)
scenario_tree = pack.get("scenario_tree") if isinstance(pack.get("scenario_tree"), dict) else {}
source_contract = pack.get("source_contract") if isinstance(pack.get("source_contract"), dict) else {}
all_nodes: list[dict[str, Any]] = []
for section_key in ("root_nodes", "critical_nodes", "supporting_nodes"):
raw_nodes = scenario_tree.get(section_key)
if isinstance(raw_nodes, list):
all_nodes.extend(node for node in raw_nodes if isinstance(node, dict))
lines = [
"# Scenario acceptance matrix",
"",
f"- pack_id: `{pack.get('pack_id') or 'n/a'}`",
f"- domain: `{pack.get('domain') or 'n/a'}`",
f"- source_contract_id: `{source_contract.get('domain_id') or pack.get('source_contract_id') or 'n/a'}`",
f"- source_contract_title: {source_contract.get('title') or pack.get('title') or 'n/a'}",
"",
"## Scenario coverage",
"",
"| scenario_id | status | question_ids | node_ids |",
"| --- | --- | --- | --- |",
]
for scenario in scenarios:
if not isinstance(scenario, dict):
continue
scenario_id = str(scenario.get("scenario_id") or "").strip()
if not scenario_id:
continue
lines.append(
"| "
+ " | ".join(
[
scenario_id,
scenario_status_map.get(scenario_id, "not_run"),
", ".join(scenario_questions_map.get(scenario_id) or []) or "-",
", ".join(scenario_nodes_map.get(scenario_id) or []) or "-",
]
)
+ " |"
)
def append_node_section(title: str, section_key: str) -> None:
raw_nodes = scenario_tree.get(section_key)
nodes = raw_nodes if isinstance(raw_nodes, list) else []
if not nodes:
return
lines.extend(
[
"",
f"## {title}",
"",
"| node_id | status | backed_by_scenarios | question_ids | required_wording_families | observed_wording_families | missing_wording_families |",
"| --- | --- | --- | --- | --- | --- | --- |",
]
)
for node in nodes:
if not isinstance(node, dict):
continue
node_id = str(node.get("node_id") or "").strip()
if not node_id:
continue
backed_by = sorted(
scenario_id for scenario_id, node_ids in scenario_nodes_map.items() if node_id in node_ids
)
statuses = [scenario_status_map.get(scenario_id, "not_run") for scenario_id in backed_by]
required_wording_families = normalize_string_list(node.get("required_wording_families"))
observed_wording_families = sorted(
{family for scenario_id in backed_by for family in scenario_wording_map.get(scenario_id, [])}
)
missing_wording_families = [family for family in required_wording_families if family not in observed_wording_families]
status = derive_coverage_status(statuses)
if status == "green" and missing_wording_families:
status = "partial"
lines.append(
"| "
+ " | ".join(
[
node_id,
status,
", ".join(backed_by) or "-",
", ".join(normalize_string_list(node.get("covers_question_ids"))) or "-",
", ".join(required_wording_families) or "-",
", ".join(observed_wording_families) or "-",
", ".join(missing_wording_families) or "-",
]
)
+ " |"
)
append_node_section("Root nodes", "root_nodes")
append_node_section("Critical nodes", "critical_nodes")
raw_edges = scenario_tree.get("critical_edges")
edges = raw_edges if isinstance(raw_edges, list) else []
if edges:
lines.extend(
[
"",
"## Critical edges",
"",
"| edge_id | status | from_node | to_node | backed_by_scenarios | primary_user_path | observed_wording_families | missing_wording_families |",
"| --- | --- | --- | --- | --- | --- | --- | --- |",
]
)
for edge in edges:
if not isinstance(edge, dict):
continue
edge_id = str(edge.get("edge_id") or "").strip()
from_node = str(edge.get("from_node") or "").strip()
to_node = str(edge.get("to_node") or "").strip()
if not edge_id:
continue
backed_by = sorted(
scenario_id
for scenario_id, node_ids in scenario_nodes_map.items()
if from_node in node_ids and to_node in node_ids
)
statuses = [scenario_status_map.get(scenario_id, "not_run") for scenario_id in backed_by]
from_required = []
to_required = []
for node in all_nodes:
node_id = str(node.get("node_id") or "").strip()
if node_id == from_node:
from_required = normalize_string_list(node.get("required_wording_families"))
elif node_id == to_node:
to_required = normalize_string_list(node.get("required_wording_families"))
observed_wording_families = sorted(
{family for scenario_id in backed_by for family in scenario_wording_map.get(scenario_id, [])}
)
edge_required_families = list(dict.fromkeys(from_required + [family for family in to_required if family not in from_required]))
missing_wording_families = [family for family in edge_required_families if family not in observed_wording_families]
status = derive_coverage_status(statuses)
if status == "green" and missing_wording_families:
status = "partial"
lines.append(
"| "
+ " | ".join(
[
edge_id,
status,
from_node or "-",
to_node or "-",
", ".join(backed_by) or "-",
"yes" if bool(edge.get("primary_user_path")) else "no",
", ".join(observed_wording_families) or "-",
", ".join(missing_wording_families) or "-",
]
)
+ " |"
)
raw_paths = scenario_tree.get("primary_user_paths")
paths = raw_paths if isinstance(raw_paths, list) else []
if paths:
lines.extend(
[
"",
"## Primary user paths",
"",
"| path_id | status | nodes | backed_by_scenarios |",
"| --- | --- | --- | --- |",
]
)
for path in paths:
if not isinstance(path, dict):
continue
path_id = str(path.get("path_id") or "").strip()
node_ids = normalize_string_list(path.get("nodes"))
backed_by = sorted(
scenario_id
for scenario_id, scenario_node_ids in scenario_nodes_map.items()
if node_ids and all(node_id in scenario_node_ids for node_id in node_ids)
)
statuses = [scenario_status_map.get(scenario_id, "not_run") for scenario_id in backed_by]
lines.append(
"| "
+ " | ".join(
[
path_id or "-",
derive_coverage_status(statuses),
" -> ".join(node_ids) or "-",
", ".join(backed_by) or "-",
]
)
+ " |"
)
return "\n".join(lines).strip() + "\n"
def run_subprocess_command(
command: list[str],
*,
cwd: Path,
timeout_seconds: int,
input_text: str | None = None,
stdout_path: Path | None = None,
stderr_path: Path | None = None,
) -> subprocess.CompletedProcess[str]:
result = subprocess.run(
command,
cwd=str(cwd),
input=input_text,
text=True,
encoding="utf-8",
errors="replace",
capture_output=True,
timeout=timeout_seconds,
check=False,
)
if stdout_path is not None:
write_text(stdout_path, result.stdout)
if stderr_path is not None:
write_text(stderr_path, result.stderr)
if result.returncode != 0:
raise RuntimeError(
f"Command failed with exit code {result.returncode}: {' '.join(command)}\n{result.stderr.strip()}"
)
return result
def build_run_pack_command(
args: argparse.Namespace,
*,
manifest_path: Path,
pack_id: str,
output_root: Path,
) -> list[str]:
command = [
sys.executable,
str(Path(__file__).resolve()),
"run-pack",
"--manifest",
str(manifest_path),
"--pack-id",
pack_id,
"--output-root",
str(output_root),
"--backend-url",
str(args.backend_url),
"--prompt-version",
str(args.prompt_version),
"--llm-provider",
str(args.llm_provider),
"--llm-model",
str(args.llm_model),
"--llm-base-url",
str(args.llm_base_url),
"--llm-api-key",
str(args.llm_api_key),
"--temperature",
str(args.temperature),
"--max-output-tokens",
str(args.max_output_tokens),
"--timeout-seconds",
str(args.timeout_seconds),
]
if getattr(args, "analysis_date", None):
command.extend(["--analysis-date", str(args.analysis_date)])
max_scenarios = getattr(args, "max_scenarios", None)
if max_scenarios is not None:
command.extend(["--max-scenarios", str(int(max_scenarios))])
if bool(getattr(args, "use_mock", False)):
command.append("--use-mock")
return command
def build_codex_exec_command(
args: argparse.Namespace,
*,
output_file: Path,
schema_file: Path,
sandbox_mode: str,
model_override: str | None = None,
reasoning_effort: str | None = None,
) -> list[str]:
command = [
str(args.codex_binary),
"exec",
"-C",
str(REPO_ROOT),
"-s",
sandbox_mode,
"-c",
'approval_policy="never"',
"--output-schema",
str(schema_file),
"-o",
str(output_file),
"--color",
"never",
]
if reasoning_effort:
command.extend(["-c", f'model_reasoning_effort="{reasoning_effort}"'])
if getattr(args, "codex_profile", None):
command.extend(["-p", str(args.codex_profile)])
selected_model = model_override or getattr(args, "codex_model", None)
if selected_model:
command.extend(["-m", str(selected_model)])
return command
def read_json_output(file_path: Path) -> dict[str, Any]:
payload = json.loads(read_text_file(file_path))
if not isinstance(payload, dict):
raise RuntimeError(f"Expected JSON object in {file_path}")
return payload
def compact_step_output_for_review(step_output: Any) -> dict[str, Any]:
if not isinstance(step_output, dict):
return {}
entry_titles_sample: list[str] = []
entries = step_output.get("entries")
if isinstance(entries, list):
for item in entries[:5]:
if not isinstance(item, dict):
continue
title = str(item.get("item") or item.get("title") or "").strip()
if title:
entry_titles_sample.append(title)
return {
"status": step_output.get("status"),
"execution_status": step_output.get("execution_status"),
"acceptance_status": step_output.get("acceptance_status"),
"question_resolved": step_output.get("question_resolved"),
"detected_intent": step_output.get("detected_intent"),
"selected_recipe": step_output.get("selected_recipe"),
"capability_id": step_output.get("capability_id"),
"result_mode": step_output.get("result_mode"),
"answer_shape": step_output.get("answer_shape"),
"actual_direct_answer": step_output.get("actual_direct_answer"),
"violated_invariants": step_output.get("violated_invariants"),
"warnings": step_output.get("warnings"),
"fallback_type": step_output.get("fallback_type"),
"mcp_call_status": step_output.get("mcp_call_status"),
"failure_type": step_output.get("failure_type"),
"error_message": step_output.get("error_message"),
"entry_titles_sample": entry_titles_sample,
}
def collect_pack_scenario_artifacts(pack_dir: Path) -> list[dict[str, Any]]:
scenarios_root = pack_dir / "scenarios"
artifacts: list[dict[str, Any]] = []
if not scenarios_root.exists():
return artifacts
for scenario_dir in sorted(path for path in scenarios_root.iterdir() if path.is_dir()):
scenario_state = read_json_file(scenario_dir / "scenario_state.json") if (scenario_dir / "scenario_state.json").exists() else {}
artifacts.append(
{
"scenario_id": scenario_state.get("scenario_id") or scenario_dir.name,
"title": scenario_state.get("title"),
"session_id": scenario_state.get("session_id"),
"artifact_dir": str(scenario_dir),
"summary": read_text_file(scenario_dir / "scenario_summary.md") if (scenario_dir / "scenario_summary.md").exists() else "",
"scenario_state": scenario_state,
}
)
return artifacts
def derive_repair_target_severity(step_output: dict[str, Any]) -> str:
if bool(step_output.get("hard_fail")):
return "P0"
violated_invariants = normalize_string_list(step_output.get("violated_invariants"))
if any(derive_invariant_severity(step_output, code) == "P0" for code in violated_invariants):
return "P0"
execution_status = str(step_output.get("execution_status") or "").strip()
acceptance_status = str(step_output.get("acceptance_status") or step_output.get("status") or "").strip()
reply_type = str(step_output.get("reply_type") or "").strip()
if execution_status == "blocked":
return "P0"
if acceptance_status in {"rejected", "needs_exact_capability"}:
return "P1"
if execution_status in {"partial", "needs_exact_capability"} or reply_type == "partial_coverage":
return "P1"
if normalize_string_list(step_output.get("warnings")):
return "P2"
return "P2"
def derive_repair_problem_type(step_output: dict[str, Any]) -> str:
violated = set(normalize_string_list(step_output.get("violated_invariants")))
execution_status = str(step_output.get("execution_status") or "").strip()
acceptance_status = str(step_output.get("acceptance_status") or step_output.get("status") or "").strip()
reply_type = str(step_output.get("reply_type") or "").strip()
fallback_type = str(step_output.get("fallback_type") or "").strip()
mcp_call_status = str(step_output.get("mcp_call_status") or "").strip()
if "wrong_followup_action" in violated:
return "followup_action_resolution_gap"
if "focus_object_missing" in violated:
return "object_memory_gap"
if "wrong_date_scope_state" in violated:
return "edge_carryover_gap"
if {"wrong_as_of_date", "wrong_period_from", "wrong_period_to"} & violated:
return "temporal_honesty_gap"
if {
"wrong_intent",
"wrong_capability",
"wrong_recipe",
"wrong_result_mode",
"forbidden_capability_selected",
"forbidden_recipe_selected",
} & violated:
return "route_gap"
if {"direct_answer_missing", "top_level_noise_present"} & violated:
return "presentation_gap"
if mcp_call_status == "materialized_but_not_anchor_matched":
return "domain_anchor_gap"
if acceptance_status == "needs_exact_capability" or execution_status == "needs_exact_capability":
return "capability_gap"
if reply_type in {"partial_coverage", "clarification_required", "route_mismatch_blocked"} or fallback_type == "partial":
return "evidence_gap"
return "other"
def derive_repair_root_cause_layers(step_output: dict[str, Any], problem_type: str) -> list[str]:
violated = set(normalize_string_list(step_output.get("violated_invariants")))
layers: list[str] = []
if problem_type == "followup_action_resolution_gap":
layers.append("followup_action_resolution_gap")
if "focus_object_missing" in violated:
layers.append("object_memory_gap")
elif problem_type == "object_memory_gap":
layers.append("object_memory_gap")
elif problem_type == "edge_carryover_gap":
layers.append("edge_carryover_gap")
if "wrong_as_of_date" in violated or "wrong_period_from" in violated or "wrong_period_to" in violated:
layers.append("temporal_honesty_gap")
elif problem_type == "temporal_honesty_gap":
layers.append("temporal_honesty_gap")
if "wrong_date_scope_state" in violated:
layers.append("edge_carryover_gap")
elif problem_type == "route_gap":
layers.append("semantic_understanding_gap")
elif problem_type == "capability_gap":
layers.append("runtime_capability_gap")
elif problem_type == "presentation_gap":
layers.append("business_utility_gap")
if str(step_output.get("required_answer_shape") or "").strip():
layers.append("answer_shape_mismatch")
elif problem_type == "evidence_gap":
layers.append("runtime_capability_gap")
elif problem_type == "domain_anchor_gap":
layers.append("domain_anchor_gap")
else:
layers.append("other")
return list(dict.fromkeys(layers))
def build_repair_fix_goal(step_output: dict[str, Any], problem_type: str) -> str:
question = str(step_output.get("question_resolved") or step_output.get("title") or step_output.get("step_id") or "").strip()
if problem_type == "followup_action_resolution_gap":
return f"Resolve `{question}` on the current business object and keep the requested micro-action instead of drifting to another drilldown."
if problem_type == "object_memory_gap":
return f"Preserve the selected business object for `{question}` so the follow-up resolves without re-anchoring from scratch."
if problem_type == "edge_carryover_gap":
return f"Carry forward the selected-object state and historical date scope into `{question}` without resetting the follow-up context."
if problem_type == "temporal_honesty_gap":
return f"Keep `{question}` on the requested historical date/period and separate exact-window evidence from nearest available out-of-window evidence."
if problem_type == "route_gap":
return f"Keep `{question}` on the expected exact route/capability instead of letting wording drift into a different semantic lane."
if problem_type == "capability_gap":
return f"Enable an exact route for `{question}` so the loop no longer falls back to partial or unsupported behavior."
if problem_type == "presentation_gap":
return f"Make `{question}` answer-first: direct business answer in the first line, proof second, service notes last."
if problem_type == "evidence_gap":
return f"Return grounded evidence for `{question}` instead of a limited empty response when the correct route already fires."
if problem_type == "domain_anchor_gap":
return f"Match the selected business anchor for `{question}` against materialized rows so the exact route returns a grounded answer instead of an anchor-mismatch limit."
return f"Improve `{question}` with the smallest patch that removes the current acceptance failure without architecture drift."
def build_step_repair_target(
*,
scenario_id: str,
scenario_title: str,
scenario_dir: Path,
step_id: str,
step_output: dict[str, Any],
) -> dict[str, Any] | None:
acceptance_status = str(step_output.get("acceptance_status") or step_output.get("status") or "").strip() or "unknown"
execution_status = str(step_output.get("execution_status") or "").strip() or "unknown"
violated_invariants = normalize_string_list(step_output.get("violated_invariants"))
warnings = normalize_string_list(step_output.get("warnings"))
if acceptance_status in {"validated", "accepted"} and execution_status == "exact" and not violated_invariants and not warnings:
return None
problem_type = derive_repair_problem_type(step_output)
severity = derive_repair_target_severity(step_output)
root_cause_layers = derive_repair_root_cause_layers(step_output, problem_type)
step_state_path = scenario_dir / "steps" / step_id / "step_state.json"
signals: list[str] = []
for field_name in ("reply_type", "fallback_type", "mcp_call_status", "selected_recipe", "capability_id"):
value = str(step_output.get(field_name) or "").strip()
if value:
signals.append(f"{field_name}={value}")
for violation in violated_invariants:
signals.append(f"violation={violation}")
for warning in warnings[:3]:
signals.append(f"warning={warning}")
return {
"target_id": f"{scenario_id}:{step_id}",
"scenario_id": scenario_id,
"scenario_title": scenario_title,
"step_id": step_id,
"step_title": str(step_output.get("title") or "").strip() or None,
"question_resolved": str(step_output.get("question_resolved") or "").strip() or None,
"severity": severity,
"problem_type": problem_type,
"root_cause_layers": root_cause_layers,
"execution_status": execution_status,
"acceptance_status": acceptance_status,
"violated_invariants": violated_invariants,
"fix_goal": build_repair_fix_goal(step_output, problem_type),
"candidate_files": REPAIR_TARGET_FILE_HINTS.get(problem_type, REPAIR_TARGET_FILE_HINTS["other"]),
"signals": signals,
"artifact_refs": {
"scenario_dir": str(scenario_dir),
"step_state_json": str(step_state_path),
},
}
def build_repair_focus_signature(target: dict[str, Any]) -> str:
problem_type = str(target.get("problem_type") or "other").strip() or "other"
candidate_files = normalize_string_list(target.get("candidate_files"))
primary_file = candidate_files[0] if candidate_files else "no_file_hint"
return f"{problem_type}|{primary_file}"
def build_priority_repair_foci(targets: list[dict[str, Any]]) -> list[dict[str, Any]]:
grouped: dict[str, dict[str, Any]] = {}
for target in targets:
focus_id = build_repair_focus_signature(target)
focus = grouped.setdefault(
focus_id,
{
"focus_id": focus_id,
"severity": str(target.get("severity") or "P2"),
"problem_type": str(target.get("problem_type") or "other"),
"root_cause_layers": normalize_string_list(target.get("root_cause_layers")),
"candidate_files": normalize_string_list(target.get("candidate_files")),
"target_ids": [],
"scenario_ids": set(),
},
)
focus["target_ids"].append(str(target.get("target_id") or ""))
scenario_id = str(target.get("scenario_id") or "").strip()
if scenario_id:
focus["scenario_ids"].add(scenario_id)
priority_foci: list[dict[str, Any]] = []
for focus in grouped.values():
scenario_ids = sorted(focus.pop("scenario_ids"))
target_ids = [target_id for target_id in focus.get("target_ids", []) if target_id]
focus["target_count"] = len(target_ids)
focus["scenario_count"] = len(scenario_ids)
focus["target_ids"] = target_ids
focus["scenario_ids"] = scenario_ids
priority_foci.append(focus)
priority_foci.sort(
key=lambda item: (
REPAIR_TARGET_SEVERITY_ORDER.get(str(item.get("severity") or "P2"), 99),
-int(item.get("target_count") or 0),
-int(item.get("scenario_count") or 0),
REPAIR_TARGET_PROBLEM_ORDER.get(str(item.get("problem_type") or "other"), 99),
str(item.get("focus_id") or ""),
)
)
for index, focus in enumerate(priority_foci, start=1):
primary_file = normalize_string_list(focus.get("candidate_files"))[:1]
focus["focus_rank"] = index
focus["rank_reason"] = (
f"severity={focus.get('severity')} targets={focus.get('target_count')} "
f"scenarios={focus.get('scenario_count')} primary_file={primary_file[0] if primary_file else 'n/a'}"
)
return priority_foci
def build_deterministic_repair_targets(
pack_state: dict[str, Any],
scenario_artifacts: list[dict[str, Any]],
) -> dict[str, Any]:
targets: list[dict[str, Any]] = []
for scenario_artifact in scenario_artifacts:
scenario_id = str(scenario_artifact.get("scenario_id") or "").strip()
scenario_title = str(scenario_artifact.get("title") or "").strip()
scenario_dir = Path(str(scenario_artifact.get("artifact_dir") or ""))
scenario_state = scenario_artifact.get("scenario_state")
if not isinstance(scenario_state, dict):
continue
step_outputs = scenario_state.get("step_outputs")
if not isinstance(step_outputs, dict):
continue
for step_id, raw_step_output in step_outputs.items():
if not isinstance(raw_step_output, dict):
continue
target = build_step_repair_target(
scenario_id=scenario_id,
scenario_title=scenario_title,
scenario_dir=scenario_dir,
step_id=str(step_id),
step_output=raw_step_output,
)
if target:
targets.append(target)
priority_foci = build_priority_repair_foci(targets)
focus_rank_by_id = {
str(focus.get("focus_id") or ""): int(focus.get("focus_rank") or 999)
for focus in priority_foci
if isinstance(focus, dict)
}
focus_target_count_by_id = {
str(focus.get("focus_id") or ""): int(focus.get("target_count") or 0)
for focus in priority_foci
if isinstance(focus, dict)
}
focus_scenario_count_by_id = {
str(focus.get("focus_id") or ""): int(focus.get("scenario_count") or 0)
for focus in priority_foci
if isinstance(focus, dict)
}
for target in targets:
focus_id = build_repair_focus_signature(target)
target["repair_focus_id"] = focus_id
target["repair_focus_rank"] = focus_rank_by_id.get(focus_id, 999)
target["repair_focus_target_count"] = focus_target_count_by_id.get(focus_id, 0)
target["repair_focus_scenario_count"] = focus_scenario_count_by_id.get(focus_id, 0)
targets.sort(
key=lambda item: (
REPAIR_TARGET_SEVERITY_ORDER.get(str(item.get("severity") or "P2"), 99),
int(item.get("repair_focus_rank") or 999),
-int(item.get("repair_focus_target_count") or 0),
-int(item.get("repair_focus_scenario_count") or 0),
REPAIR_TARGET_PROBLEM_ORDER.get(str(item.get("problem_type") or "other"), 99),
str(item.get("scenario_id") or ""),
str(item.get("step_id") or ""),
)
)
severity_counts = {"P0": 0, "P1": 0, "P2": 0}
for target in targets:
severity = str(target.get("severity") or "P2")
if severity in severity_counts:
severity_counts[severity] += 1
return {
"schema_version": "domain_pack_repair_targets_v1",
"pack_id": pack_state.get("pack_id"),
"domain": pack_state.get("domain"),
"final_status": pack_state.get("final_status"),
"target_count": len(targets),
"severity_counts": severity_counts,
"priority_foci": priority_foci,
"targets": targets,
}
def select_primary_repair_focus(repair_targets: dict[str, Any]) -> dict[str, Any] | None:
if not isinstance(repair_targets, dict):
return None
priority_foci = repair_targets.get("priority_foci")
if not isinstance(priority_foci, list) or not priority_foci:
return None
primary_focus = priority_foci[0]
return primary_focus if isinstance(primary_focus, dict) else None
def build_repair_targets_summary(repair_targets: dict[str, Any]) -> str:
lines = [
"# Repair targets",
"",
f"- pack_id: `{repair_targets.get('pack_id') or 'n/a'}`",
f"- domain: `{repair_targets.get('domain') or 'n/a'}`",
f"- target_count: `{repair_targets.get('target_count') or 0}`",
f"- severity_counts: `{dump_json(repair_targets.get('severity_counts') or {})}`",
]
priority_foci = repair_targets.get("priority_foci") or []
if isinstance(priority_foci, list) and priority_foci:
lines.extend(
[
"",
"## Priority foci",
]
)
for focus in priority_foci:
if not isinstance(focus, dict):
continue
lines.extend(
[
f"- `{focus.get('focus_id')}`",
f" focus_rank: `{focus.get('focus_rank')}`",
f" severity: `{focus.get('severity')}`",
f" problem_type: `{focus.get('problem_type')}`",
f" target_count: `{focus.get('target_count')}`",
f" scenario_count: `{focus.get('scenario_count')}`",
f" candidate_files: {', '.join(focus.get('candidate_files') or []) or 'none'}",
f" rank_reason: {focus.get('rank_reason') or 'n/a'}",
]
)
lines.extend(
[
"",
"## Targets",
]
)
for target in repair_targets.get("targets") or []:
if not isinstance(target, dict):
continue
lines.extend(
[
f"- `{target.get('target_id')}`",
f" severity: `{target.get('severity')}`",
f" problem_type: `{target.get('problem_type')}`",
f" repair_focus_rank: `{target.get('repair_focus_rank')}`",
f" repair_focus_target_count: `{target.get('repair_focus_target_count')}`",
f" root_cause_layers: {', '.join(target.get('root_cause_layers') or []) or 'none'}",
f" fix_goal: {target.get('fix_goal') or 'n/a'}",
f" candidate_files: {', '.join(target.get('candidate_files') or []) or 'none'}",
]
)
return "\n".join(lines).strip() + "\n"
def evaluate_deterministic_loop_gate(
pack_state: dict[str, Any],
repair_targets: dict[str, Any],
) -> tuple[bool, str]:
pack_final_status = str(pack_state.get("final_status") or "").strip() or "partial"
if pack_final_status != "accepted":
return False, f"pack_final_status={pack_final_status}"
severity_counts = repair_targets.get("severity_counts") if isinstance(repair_targets, dict) else {}
if isinstance(severity_counts, dict):
p0_count = int(severity_counts.get("P0") or 0)
p1_count = int(severity_counts.get("P1") or 0)
if p0_count > 0 or p1_count > 0:
return False, f"repair_targets_remaining=P0:{p0_count},P1:{p1_count}"
return True, "deterministic_gate_passed"
def build_pack_review_bundle(pack_dir: Path) -> str:
pack_state = read_json_file(pack_dir / "pack_state.json") if (pack_dir / "pack_state.json").exists() else {}
scenario_artifacts = collect_pack_scenario_artifacts(pack_dir)
scenarios_bundle: list[dict[str, Any]] = []
for scenario_artifact in scenario_artifacts:
scenario_state = scenario_artifact.get("scenario_state") if isinstance(scenario_artifact.get("scenario_state"), dict) else {}
step_outputs_raw = scenario_state.get("step_outputs") if isinstance(scenario_state, dict) else {}
compact_steps: dict[str, Any] = {}
if isinstance(step_outputs_raw, dict):
for step_id, step_output in step_outputs_raw.items():
compact_steps[str(step_id)] = compact_step_output_for_review(step_output)
scenarios_bundle.append(
{
"scenario_id": scenario_artifact.get("scenario_id"),
"title": scenario_artifact.get("title"),
"session_id": scenario_artifact.get("session_id"),
"artifact_dir": scenario_artifact.get("artifact_dir"),
"summary": scenario_artifact.get("summary") or "",
"step_outputs": compact_steps,
}
)
repair_targets = (
read_json_file(pack_dir / "repair_targets.json")
if (pack_dir / "repair_targets.json").exists()
else build_deterministic_repair_targets(pack_state, scenario_artifacts)
)
bundle = {
"pack_state": {
"pack_id": pack_state.get("pack_id"),
"domain": pack_state.get("domain"),
"title": pack_state.get("title"),
"execution_status": pack_state.get("execution_status"),
"final_status": pack_state.get("final_status"),
"scenario_results": pack_state.get("scenario_results"),
},
"pack_manifest": read_json_file(pack_dir / "pack_manifest.json") if (pack_dir / "pack_manifest.json").exists() else {},
"pack_summary": read_text_file(pack_dir / "pack_summary.md") if (pack_dir / "pack_summary.md").exists() else "",
"scenario_acceptance_matrix": (
read_text_file(pack_dir / "scenario_acceptance_matrix.md")
if (pack_dir / "scenario_acceptance_matrix.md").exists()
else ""
),
"deterministic_repair_targets": repair_targets,
"scenarios": scenarios_bundle,
}
return dump_json(bundle)
def _scenario_observed_wording_families(scenario: dict[str, Any]) -> list[str]:
families: list[str] = []
steps = scenario.get("steps")
if not isinstance(steps, list):
return families
for step in steps:
if not isinstance(step, dict):
continue
family = str(step.get("paraphrase_family") or step.get("wording_family") or "").strip()
if family:
families.append(family)
return list(dict.fromkeys(families))
def build_analyst_loop_prompt(
*,
loop_dir: Path,
iteration_dir: Path,
pack_dir: Path,
repair_targets_path: Path,
previous_pack_dir: Path | None,
previous_verdict_path: Path | None,
target_score: int,
review_bundle_json: str,
repair_targets_json: str,
previous_verdict_json: str | None,
) -> str:
comparison_block = ""
if previous_pack_dir is not None:
comparison_block = textwrap.dedent(
f"""\
Compare the current run with the previous run:
- previous_pack_dir: `{previous_pack_dir}`
"""
)
if previous_verdict_path is not None and previous_verdict_path.exists():
comparison_block += f"- previous_analyst_verdict: `{previous_verdict_path}`\n"
previous_verdict_block = ""
if previous_verdict_json:
previous_verdict_block = textwrap.dedent(
f"""\
Previous analyst verdict JSON:
```json
{previous_verdict_json}
```
"""
)
return textwrap.dedent(
f"""\
You are the strict `domain_analyst` for NDC_1C.
Use the repo rules from:
- `.codex/agents/domain_analyst.toml`
- `.codex/skills/domain-case-loop/SKILL.md`
- `.codex/skills/domain-case-loop/references/verdict_template.md`
- `.codex/skills/domain-case-loop/references/business_first_analyst_rubric.md`
Current loop context:
- loop_dir: `{loop_dir}`
- iteration_dir: `{iteration_dir}`
- current_pack_dir: `{pack_dir}`
{comparison_block}
Required artifacts to inspect:
- `{pack_dir / 'pack_summary.md'}`
- `{pack_dir / 'pack_state.json'}`
- `{pack_dir / 'scenario_acceptance_matrix.md'}`
- `{repair_targets_path}`
- all `scenario_summary.md`, `scenario_state.json`, and problematic `steps/*/step_state.json` files inside `{pack_dir / 'scenarios'}`
Goal:
- evaluate current domain-pack correctness for business meaning, route/capability quality, evidence quality, and absence of silent heuristic masking;
- evaluate business usefulness, direct-answer-first behavior, state continuity, and field truthfulness, not only technical groundedness;
- evaluate object-centric dialog continuity: stable `focus_object`, stable `answer_object`, reusable bundles such as `provenance_bundle`, and correct action resolution for pronoun-style follow-ups;
- evaluate action-first follow-up behavior, answer layering, compactness of narrow micro-actions, and temporal honesty when the runtime broadens beyond the requested date window;
- determine whether the gate `quality_score >= {target_score}` is reached;
- if not, provide the smallest high-value fix targets for the coder.
Rules:
- `accepted` is allowed only if quality_score >= {target_score}, unresolved_p0_count = 0, and regression_detected = false;
- `accepted` is forbidden if the evidence bundle shows `pack_state.final_status != accepted` or the deterministic repair targets still contain any `P0` or `P1` items;
- `accepted` also requires `direct_answer_ok = true`, `business_usefulness_ok = true`, `temporal_honesty_ok = true`, and `field_truth_ok = true`;
- when several failing steps share one deterministic repair focus, call out the highest-leverage shared focus first instead of only the lexicographically first failing step;
- `partial` means the pack is usable but exactness, routing, or coverage is still insufficient;
- `needs_exact_capability` means the primary blocker is a missing exact route or capability, but the loop should still continue autonomously unless a user decision is required;
- `continue` means there is a clear next patch cycle;
- `blocked` means the loop is stopped by a real hard blocker such as runtime/infrastructure failure, unavailable 1C data, or another condition that the repo cannot autonomously repair;
- set `requires_user_decision = true` when the next step cannot be chosen safely without user input, for example:
- an architecture fork or risky contour expansion;
- a scope tradeoff or important business ambiguity;
- a required observation anchor is missing and cannot be recovered safely from artifacts, 1C, or the current scenario state;
- the only remaining implementation path would rely on a hack, brittle workaround, heuristic masking, or disproportionate complexity/risk;
- if `requires_user_decision = true`, fill `user_decision_type` and `user_decision_prompt`;
- if the pack is below {target_score} but there is still safe autonomous implementation work, keep `requires_user_decision = false`;
- do not request user input merely because the score is still below {target_score}; request it only when the loop would otherwise guess, overfit, or risk architecture drift.
- return machine-readable fields for: `user_intent_summary`, `expected_direct_answer`, `actual_direct_answer`, `direct_answer_ok`, `business_usefulness_ok`, `business_utility_score`, `direct_answer_priority_score`, `state_continuity_score`, `answer_shape_score`, `evidence_clarity_score`, `focus_object_continuity_ok`, `bundle_reuse_ok`, `followup_action_resolution_ok`, `temporal_honesty_ok`, `field_truth_ok`, `answer_layering_ok`, `recommended_state_objects`, `root_cause_layers`, `broken_edge_ids`, `violated_invariants`;
- if the product found the evidence but failed to retain the selected object, provenance bundle, or another reusable resolved object across turns, classify that as `object_memory_gap` or `edge_carryover_gap`, not as a generic route problem;
- if the product retained the item but resolved the wrong action over that item, for example `покажи документы по этой позиции` -> `documents_by_counterparty`, classify that as `followup_action_resolution_gap`;
- if the product already resolved supplier/date/document details for the active item but failed to reuse that bundle for adjacent follow-ups, classify that as `bundle_reuse_gap`;
- if a narrow business follow-up opens with numbered scaffolding such as `Блок 1/2/3` or a full generic trace packet instead of a compact direct answer, lower business usefulness explicitly rather than treating it as harmless formatting;
- if the surfaced business field looks mislabeled, for example supplier vs organization, classify that as `field_mapping_gap`;
- if the answer blurs exact-window evidence with nearest available out-of-window evidence, classify that as `temporal_honesty_gap`;
- if the answer is technically grounded but still weak for a manager/accountant/operator, classify that as `business_utility_gap`.
Use this UTF-8 evidence bundle as the source of truth for artifact contents. Do not treat shell rendering artifacts as file corruption if the embedded bundle is readable.
Current evidence bundle:
```json
{review_bundle_json}
```
Deterministic repair targets:
```json
{repair_targets_json}
```
{previous_verdict_block}
Return JSON only and follow the schema exactly.
"""
).strip()
def build_coder_loop_prompt(
*,
loop_dir: Path,
iteration_dir: Path,
pack_dir: Path,
repair_targets_path: Path,
repair_targets_json: str,
assigned_focus: dict[str, Any] | None,
analyst_verdict_path: Path,
analyst_verdict_json: str,
) -> str:
assigned_focus_block = (
textwrap.dedent(
f"""\
Assigned deterministic repair focus for this iteration:
```json
{dump_json(assigned_focus)}
```
"""
).strip()
if assigned_focus
else "Assigned deterministic repair focus for this iteration: none"
)
return textwrap.dedent(
f"""\
You are the `domain_coder` for NDC_1C.
Use the repo rules from:
- `.codex/agents/domain_coder.toml`
- `.codex/skills/domain-case-loop/SKILL.md`
Current loop context:
- loop_dir: `{loop_dir}`
- iteration_dir: `{iteration_dir}`
- current_pack_dir: `{pack_dir}`
- deterministic_repair_targets: `{repair_targets_path}`
- analyst_verdict_json: `{analyst_verdict_path}`
Make the smallest domain-only patch in the working tree that improves the failing or partial scenarios named in the analyst verdict.
Hard rules:
- do not change the architecture;
- do not fabricate data;
- do not present heuristic answers as confirmed;
- do not touch unrelated files;
- preserve already successful baseline flows.
- preserve UTF-8 without BOM and the existing line structure of edited files; do not leave whole-file normalization-only rewrites or single-line collapses in the worktree;
- use minimal local edits; if a tool rewrites a file into normalization noise, restore the original file first and then apply only the intended semantic patch;
- use `root_cause_layers`, `broken_edge_ids`, `violated_invariants`, and business-utility scores from the analyst verdict to choose the smallest fix;
- use the deterministic repair targets to choose the highest-leverage repair focus first; within that focus, patch the narrowest shared layer that can clear the most `P0`/`P1` targets without architecture drift;
- the assigned deterministic repair focus below is mandatory for this iteration; do not switch to a lower-priority focus unless you are blocked from making a safe patch for the assigned focus;
- if the analyst verdict is optimistic but deterministic repair targets still contain `P0` or `P1`, trust the deterministic repair targets and keep fixing the pack;
- prioritize state continuity, selected-object persistence, stable `focus_object`, stable `answer_object`, reusable `provenance_bundle` / `sale_trace_bundle`, action-first answer behavior, compact micro-action answers, answer layering, temporal honesty, and field-truth mapping when those are the blocking layers;
- do not broaden scope when the analyst says the defect is mainly `object_memory_gap`, `followup_action_resolution_gap`, `bundle_reuse_gap`, `field_mapping_gap`, `temporal_honesty_gap`, `answer_shape_mismatch`, or `business_utility_gap`;
- when the verdict points to pronoun follow-ups or item-centric drilldowns, prefer a narrow object-state or follow-up-action fix over prompt inflation.
Required outputs:
- create `{iteration_dir / 'coder_plan.md'}` with a short plan;
- create `{iteration_dir / 'patch_summary.md'}` with a short summary of the patch;
Analyst verdict JSON:
```json
{analyst_verdict_json}
```
Deterministic repair targets JSON:
```json
{repair_targets_json}
```
{assigned_focus_block}
- then return JSON only and follow the schema exactly.
"""
).strip()
def evaluate_analyst_gate(
verdict: dict[str, Any], target_score: int
) -> tuple[bool, str, bool, str, str | None]:
quality_score = int(verdict.get("quality_score") or 0)
unresolved_p0_count = int(verdict.get("unresolved_p0_count") or 0)
regression_detected = bool(verdict.get("regression_detected"))
direct_answer_ok = bool(verdict.get("direct_answer_ok", True))
business_usefulness_ok = bool(verdict.get("business_usefulness_ok", True))
temporal_honesty_ok = bool(verdict.get("temporal_honesty_ok", True))
field_truth_ok = bool(verdict.get("field_truth_ok", True))
answer_layering_ok = bool(verdict.get("answer_layering_ok", True))
loop_decision = str(verdict.get("loop_decision") or "").strip() or "continue"
requires_user_decision = bool(verdict.get("requires_user_decision"))
user_decision_type = str(verdict.get("user_decision_type") or "").strip() or "none"
user_decision_prompt_raw = verdict.get("user_decision_prompt")
user_decision_prompt = str(user_decision_prompt_raw).strip() if user_decision_prompt_raw else None
accepted = (
quality_score >= target_score
and unresolved_p0_count == 0
and not regression_detected
and direct_answer_ok
and business_usefulness_ok
and temporal_honesty_ok
and field_truth_ok
and answer_layering_ok
and loop_decision == "accepted"
)
return accepted, loop_decision, requires_user_decision, user_decision_type, user_decision_prompt
def handle_run_pack(args: argparse.Namespace) -> int:
ensure_backend_health(args.backend_url, args.timeout_seconds)
pack_path = Path(args.manifest).resolve()
pack = load_scenario_pack(pack_path)
if args.pack_id:
pack["pack_id"] = args.pack_id.strip()
if args.analysis_date:
pack["analysis_context"] = merge_analysis_context(
pack.get("analysis_context"),
{"as_of_date": args.analysis_date, "source": "cli_override"},
)
pack_dir = Path(args.output_root).resolve() / pack["pack_id"]
scenarios_dir = pack_dir / "scenarios"
scenarios_dir.mkdir(parents=True, exist_ok=True)
write_json(pack_dir / "pack_manifest.json", pack)
write_text(pack_dir / "manifest_source.txt", f"{pack_path}\n")
scenario_results: list[dict[str, Any]] = []
max_scenarios = max(0, int(args.max_scenarios)) if args.max_scenarios is not None else None
scenarios_to_run = pack["scenarios"][:max_scenarios] if max_scenarios else pack["scenarios"]
for scenario in scenarios_to_run:
scenario_manifest = normalize_scenario_manifest(
scenario,
fallback_domain=pack["domain"],
fallback_analysis_context=pack.get("analysis_context"),
fallback_bindings=pack.get("bindings"),
default_scenario_id=scenario["scenario_id"],
)
scenario_dir = scenarios_dir / scenario_manifest["scenario_id"]
scenario_state, scenario_final_status = execute_scenario_manifest(
args=args,
manifest=scenario_manifest,
scenario_dir=scenario_dir,
manifest_source_label=f"{pack_path}#{scenario_manifest['scenario_id']}",
)
scenario_results.append(
{
"scenario_id": scenario_manifest["scenario_id"],
"title": scenario_manifest["title"],
"execution_status": derive_scenario_execution_status(scenario_state.get("step_outputs") or {}),
"final_status": scenario_final_status,
"session_id": scenario_state.get("session_id"),
"artifact_dir": str(scenario_dir),
}
)
execution_status = derive_pack_execution_status(scenario_results)
final_status = derive_pack_final_status(pack, scenario_results)
pack_state = {
"schema_version": SCENARIO_PACK_SCHEMA_VERSION,
"pack_id": pack["pack_id"],
"domain": pack["domain"],
"title": pack["title"],
"analysis_context": pack.get("analysis_context") or {},
"bindings": pack.get("bindings") or {},
"scenario_results": scenario_results,
"execution_status": execution_status,
"final_status": final_status,
"updated_at": datetime.now(timezone.utc).replace(microsecond=0).isoformat(),
}
scenario_artifacts = collect_pack_scenario_artifacts(pack_dir)
repair_targets = build_deterministic_repair_targets(pack_state, scenario_artifacts)
write_text(pack_dir / "scenario_acceptance_matrix.md", build_scenario_acceptance_matrix(pack, scenario_results))
write_json(pack_dir / "pack_state.json", pack_state)
write_json(pack_dir / "repair_targets.json", repair_targets)
write_text(pack_dir / "repair_targets.md", build_repair_targets_summary(repair_targets))
write_text(pack_dir / "pack_summary.md", build_pack_summary(pack, scenario_results, final_status, execution_status))
write_text(pack_dir / "final_status.md", build_pack_final_status(pack, scenario_results, final_status, execution_status))
print(f"[domain-case-loop] saved pack artifacts to {pack_dir}")
print(f"[domain-case-loop] execution_status={execution_status} final_status={final_status}")
return 0
def build_loop_summary(loop_state: dict[str, Any]) -> str:
lines = [
"# Loop summary",
"",
f"- loop_id: `{loop_state['loop_id']}`",
f"- target_score: `{loop_state['target_score']}`",
f"- max_iterations: `{loop_state['max_iterations']}`",
f"- final_status: `{loop_state['final_status']}`",
f"- last_analyst_decision: `{loop_state.get('last_analyst_decision') or 'n/a'}`",
"",
"## Iterations",
]
for item in loop_state.get("iterations", []):
lines.extend(
[
f"- `{item['iteration_id']}`",
f" baseline_pack_dir: `{item['pack_dir']}`",
f" analyst_score: `{item.get('quality_score')}`",
f" analyst_decision: `{item.get('loop_decision')}`",
f" analyst_accepted_gate: `{item.get('analyst_accepted_gate')}`",
f" accepted_gate: `{item.get('accepted_gate')}`",
f" deterministic_gate_ok: `{item.get('deterministic_gate_ok')}`",
f" deterministic_gate_reason: `{item.get('deterministic_gate_reason') or 'n/a'}`",
f" requires_user_decision: `{item.get('requires_user_decision')}`",
f" user_decision_type: `{item.get('user_decision_type') or 'none'}`",
f" coder_status: `{item.get('coder_status') or 'n/a'}`",
f" assigned_repair_focus_id: `{item.get('assigned_repair_focus_id') or 'none'}`",
f" coder_workspace_hygiene_restored_files: `{', '.join(item.get('coder_workspace_hygiene_restored_files') or []) or 'none'}`",
f" analyst_verdict: `{item.get('analyst_verdict_path') or 'n/a'}`",
f" repair_targets: `{item.get('repair_targets_path') or 'n/a'}`",
f" repair_target_count: `{item.get('repair_target_count')}`",
f" repair_target_severity_counts: `{dump_json(item.get('repair_target_severity_counts') or {})}`",
]
)
return "\n".join(lines).strip() + "\n"
def build_loop_final_status(loop_state: dict[str, Any]) -> str:
return textwrap.dedent(
f"""\
# Final status
- status: `{loop_state['final_status']}`
- loop_id: `{loop_state['loop_id']}`
- target_score: `{loop_state['target_score']}`
- iterations_ran: `{len(loop_state.get('iterations', []))}`
- last_analyst_decision: `{loop_state.get('last_analyst_decision') or 'n/a'}`
- stop_reason: {loop_state.get('stop_reason') or 'n/a'}
"""
)
def handle_run_pack_loop(args: argparse.Namespace) -> int:
manifest_path = Path(args.manifest).resolve()
loop_id = str(args.loop_id or slugify_case_id("domain_pack_loop", None)).strip()
loop_dir = Path(args.output_root).resolve() / loop_id
iterations_dir = loop_dir / "iterations"
iterations_dir.mkdir(parents=True, exist_ok=True)
write_text(loop_dir / "manifest_source.txt", f"{manifest_path}\n")
target_score = int(args.target_score)
max_iterations = int(args.max_iterations)
if max_iterations < 1:
raise RuntimeError("--max-iterations must be >= 1")
loop_state: dict[str, Any] = {
"schema_version": AUTONOMOUS_LOOP_SCHEMA_VERSION,
"loop_id": loop_id,
"manifest_path": str(manifest_path),
"target_score": target_score,
"max_iterations": max_iterations,
"iterations": [],
"final_status": "partial",
"stop_reason": None,
"last_analyst_decision": None,
"last_user_decision_type": "none",
"last_user_decision_prompt": None,
"updated_at": datetime.now(timezone.utc).replace(microsecond=0).isoformat(),
}
write_json(loop_dir / "loop_state.json", loop_state)
previous_pack_dir: Path | None = None
previous_verdict_path: Path | None = None
for iteration_index in range(max_iterations):
iteration_id = f"iteration_{iteration_index:02d}"
iteration_dir = iterations_dir / iteration_id
iteration_dir.mkdir(parents=True, exist_ok=True)
pack_output_root = iteration_dir / "pack_output"
pack_id = "pack_run"
pack_command = build_run_pack_command(
args,
manifest_path=manifest_path,
pack_id=pack_id,
output_root=pack_output_root,
)
run_subprocess_command(
pack_command,
cwd=REPO_ROOT,
timeout_seconds=max(600, int(args.timeout_seconds) * 20),
stdout_path=iteration_dir / "pack_run.stdout.log",
stderr_path=iteration_dir / "pack_run.stderr.log",
)
pack_dir = pack_output_root / pack_id
analyst_verdict_path = iteration_dir / "analyst_verdict.json"
review_bundle_json = build_pack_review_bundle(pack_dir)
repair_targets_path = pack_dir / "repair_targets.json"
repair_targets = read_json_file(repair_targets_path) if repair_targets_path.exists() else {}
repair_targets_json = dump_json(repair_targets)
pack_state = read_json_file(pack_dir / "pack_state.json") if (pack_dir / "pack_state.json").exists() else {}
previous_verdict_json = read_text_file(previous_verdict_path) if previous_verdict_path is not None and previous_verdict_path.exists() else None
analyst_prompt = build_analyst_loop_prompt(
loop_dir=loop_dir,
iteration_dir=iteration_dir,
pack_dir=pack_dir,
repair_targets_path=repair_targets_path,
previous_pack_dir=previous_pack_dir,
previous_verdict_path=previous_verdict_path,
target_score=target_score,
review_bundle_json=review_bundle_json,
repair_targets_json=repair_targets_json,
previous_verdict_json=previous_verdict_json,
)
write_text(iteration_dir / "analyst_prompt.md", analyst_prompt + "\n")
analyst_command = build_codex_exec_command(
args,
output_file=analyst_verdict_path,
schema_file=Path(args.analyst_schema).resolve(),
sandbox_mode="read-only",
model_override=getattr(args, "analyst_codex_model", None),
reasoning_effort=getattr(args, "analyst_reasoning_effort", None),
)
run_subprocess_command(
analyst_command,
cwd=REPO_ROOT,
timeout_seconds=int(args.codex_timeout_seconds),
input_text=analyst_prompt,
stdout_path=iteration_dir / "analyst_exec.stdout.log",
stderr_path=iteration_dir / "analyst_exec.stderr.log",
)
analyst_verdict = read_json_output(analyst_verdict_path)
analyst_accepted_gate, loop_decision, requires_user_decision, user_decision_type, user_decision_prompt = evaluate_analyst_gate(
analyst_verdict, target_score
)
deterministic_gate_ok, deterministic_gate_reason = evaluate_deterministic_loop_gate(pack_state, repair_targets)
accepted_gate = analyst_accepted_gate and deterministic_gate_ok
repair_target_count = int(repair_targets.get("target_count") or 0) if isinstance(repair_targets, dict) else 0
repair_target_severity_counts = (
repair_targets.get("severity_counts")
if isinstance(repair_targets, dict) and isinstance(repair_targets.get("severity_counts"), dict)
else {}
)
loop_state["last_analyst_decision"] = loop_decision
loop_state["last_user_decision_type"] = user_decision_type
loop_state["last_user_decision_prompt"] = user_decision_prompt
iteration_record: dict[str, Any] = {
"iteration_id": iteration_id,
"pack_dir": str(pack_dir),
"quality_score": int(analyst_verdict.get("quality_score") or 0),
"loop_decision": loop_decision,
"analyst_accepted_gate": analyst_accepted_gate,
"accepted_gate": accepted_gate,
"deterministic_gate_ok": deterministic_gate_ok,
"deterministic_gate_reason": deterministic_gate_reason,
"requires_user_decision": requires_user_decision,
"user_decision_type": user_decision_type,
"user_decision_prompt": user_decision_prompt,
"analyst_verdict_path": str(analyst_verdict_path),
"repair_targets_path": str(repair_targets_path),
"repair_target_count": repair_target_count,
"repair_target_severity_counts": repair_target_severity_counts,
"coder_status": None,
}
if accepted_gate:
loop_state["iterations"].append(iteration_record)
loop_state["final_status"] = "accepted"
loop_state["stop_reason"] = f"analyst accepted + deterministic gate passed at {iteration_id}"
loop_state["updated_at"] = datetime.now(timezone.utc).replace(microsecond=0).isoformat()
write_json(loop_dir / "loop_state.json", loop_state)
break
if requires_user_decision:
loop_state["iterations"].append(iteration_record)
if loop_decision in {"needs_exact_capability", "partial", "blocked"}:
loop_state["final_status"] = loop_decision
else:
loop_state["final_status"] = "partial"
prompt_suffix = f" | prompt: {user_decision_prompt}" if user_decision_prompt else ""
loop_state["stop_reason"] = f"user_decision_required at {iteration_id}: {user_decision_type}{prompt_suffix}"
loop_state["updated_at"] = datetime.now(timezone.utc).replace(microsecond=0).isoformat()
write_json(loop_dir / "loop_state.json", loop_state)
break
if loop_decision == "blocked":
loop_state["iterations"].append(iteration_record)
loop_state["final_status"] = "blocked"
loop_state["stop_reason"] = f"analyst blocked at {iteration_id}"
loop_state["updated_at"] = datetime.now(timezone.utc).replace(microsecond=0).isoformat()
write_json(loop_dir / "loop_state.json", loop_state)
break
coder_result_path = iteration_dir / "coder_result.json"
assigned_focus = select_primary_repair_focus(repair_targets)
coder_prompt = build_coder_loop_prompt(
loop_dir=loop_dir,
iteration_dir=iteration_dir,
pack_dir=pack_dir,
repair_targets_path=repair_targets_path,
repair_targets_json=repair_targets_json,
assigned_focus=assigned_focus,
analyst_verdict_path=analyst_verdict_path,
analyst_verdict_json=dump_json(analyst_verdict),
)
write_text(iteration_dir / "coder_prompt.md", coder_prompt + "\n")
coder_snapshot_paths = build_coder_snapshot_paths(repair_targets)
coder_snapshots = snapshot_coder_candidate_files(coder_snapshot_paths)
coder_command = build_codex_exec_command(
args,
output_file=coder_result_path,
schema_file=Path(args.coder_schema).resolve(),
sandbox_mode="workspace-write",
model_override=getattr(args, "coder_codex_model", None),
reasoning_effort=getattr(args, "coder_reasoning_effort", None),
)
run_subprocess_command(
coder_command,
cwd=REPO_ROOT,
timeout_seconds=int(args.codex_timeout_seconds),
input_text=coder_prompt,
stdout_path=iteration_dir / "coder_exec.stdout.log",
stderr_path=iteration_dir / "coder_exec.stderr.log",
)
restored_files = restore_line_collapsed_files_from_snapshot(coder_snapshots)
coder_result = read_json_output(coder_result_path)
coder_status = str(coder_result.get("status") or "").strip() or "unknown"
iteration_record["coder_status"] = coder_status
iteration_record["coder_result_path"] = str(coder_result_path)
if assigned_focus:
iteration_record["assigned_repair_focus_id"] = str(assigned_focus.get("focus_id") or "")
if restored_files:
iteration_record["coder_workspace_hygiene_restored_files"] = restored_files
loop_state["iterations"].append(iteration_record)
loop_state["updated_at"] = datetime.now(timezone.utc).replace(microsecond=0).isoformat()
write_json(loop_dir / "loop_state.json", loop_state)
if coder_status == "blocked":
loop_state["final_status"] = "blocked"
loop_state["stop_reason"] = f"coder stopped progress at {iteration_id}: {coder_status}"
loop_state["updated_at"] = datetime.now(timezone.utc).replace(microsecond=0).isoformat()
write_json(loop_dir / "loop_state.json", loop_state)
break
previous_pack_dir = pack_dir
previous_verdict_path = analyst_verdict_path
else:
if loop_state.get("last_analyst_decision") == "needs_exact_capability":
loop_state["final_status"] = "needs_exact_capability"
else:
loop_state["final_status"] = "partial"
loop_state["stop_reason"] = f"max_iterations_reached ({max_iterations})"
loop_state["updated_at"] = datetime.now(timezone.utc).replace(microsecond=0).isoformat()
write_json(loop_dir / "loop_state.json", loop_state)
write_text(loop_dir / "loop_summary.md", build_loop_summary(loop_state))
write_text(loop_dir / "final_status.md", build_loop_final_status(loop_state))
print(f"[domain-case-loop] saved loop artifacts to {loop_dir}")
print(f"[domain-case-loop] final_status={loop_state['final_status']}")
return 0
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Repo-native helper for NDC_1C domain-case and scenario orchestration")
subparsers = parser.add_subparsers(dest="command", required=True)
run_case = subparsers.add_parser("run-case", help="Run one assistant_stage1 case through the existing backend and save artifacts")
run_case.add_argument("--domain", required=True)
run_case.add_argument("--question", required=True)
run_case.add_argument("--case-id")
run_case.add_argument("--slot", default="baseline", choices=["baseline", "rerun"])
run_case.add_argument("--analysis-date")
run_case.add_argument("--backend-url", default=DEFAULT_BACKEND_URL)
run_case.add_argument("--output-root", default=str(DEFAULT_ARTIFACTS_ROOT))
run_case.add_argument("--sessions-dir", default=str(DEFAULT_SESSIONS_DIR))
run_case.add_argument("--reports-dir", default=str(DEFAULT_REPORTS_DIR))
run_case.add_argument("--prompt-version", default=DEFAULT_PROMPT_VERSION)
run_case.add_argument("--llm-provider", default=DEFAULT_LLM_PROVIDER, choices=["openai", "local"])
run_case.add_argument("--llm-model", default=DEFAULT_LLM_MODEL)
run_case.add_argument("--llm-base-url", default=DEFAULT_LLM_BASE_URL)
run_case.add_argument("--llm-api-key", default=DEFAULT_LLM_API_KEY)
run_case.add_argument("--temperature", type=float, default=DEFAULT_TEMPERATURE)
run_case.add_argument("--max-output-tokens", type=int, default=DEFAULT_MAX_OUTPUT_TOKENS)
run_case.add_argument("--timeout-seconds", type=int, default=300)
run_case.add_argument("--poll-interval-seconds", type=float, default=1.5)
run_case.add_argument("--expected-capability")
run_case.add_argument("--expected-result-mode")
run_case.add_argument("--use-mock", action="store_true")
run_case.set_defaults(func=handle_run_case)
import_export = subparsers.add_parser("import-export", help="Import an existing technical export markdown and build artifacts")
import_export.add_argument("--domain", required=True)
import_export.add_argument("--input", required=True)
import_export.add_argument("--question")
import_export.add_argument("--case-id")
import_export.add_argument("--slot", default="baseline", choices=["baseline", "rerun"])
import_export.add_argument("--output-root", default=str(DEFAULT_ARTIFACTS_ROOT))
import_export.add_argument("--expected-capability")
import_export.add_argument("--expected-result-mode")
import_export.set_defaults(func=handle_import_export)
run_scenario = subparsers.add_parser(
"run-scenario",
help="Run one multi-step domain scenario in a shared assistant session and save per-step artifacts",
)
run_scenario.add_argument("--manifest", required=True)
run_scenario.add_argument("--scenario-id")
run_scenario.add_argument("--analysis-date")
run_scenario.add_argument("--backend-url", default=DEFAULT_BACKEND_URL)
run_scenario.add_argument("--output-root", default=str(DEFAULT_ARTIFACTS_ROOT))
run_scenario.add_argument("--prompt-version", default=DEFAULT_PROMPT_VERSION)
run_scenario.add_argument("--llm-provider", default=DEFAULT_LLM_PROVIDER, choices=["openai", "local"])
run_scenario.add_argument("--llm-model", default=DEFAULT_LLM_MODEL)
run_scenario.add_argument("--llm-base-url", default=DEFAULT_LLM_BASE_URL)
run_scenario.add_argument("--llm-api-key", default=DEFAULT_LLM_API_KEY)
run_scenario.add_argument("--temperature", type=float, default=DEFAULT_TEMPERATURE)
run_scenario.add_argument("--max-output-tokens", type=int, default=DEFAULT_MAX_OUTPUT_TOKENS)
run_scenario.add_argument("--timeout-seconds", type=int, default=180)
run_scenario.add_argument("--use-mock", action="store_true")
run_scenario.set_defaults(func=handle_run_scenario)
run_pack = subparsers.add_parser(
"run-pack",
help="Run a multi-scenario domain pack and save aggregate orchestration artifacts",
)
run_pack.add_argument("--manifest", required=True)
run_pack.add_argument("--pack-id")
run_pack.add_argument("--analysis-date")
run_pack.add_argument("--max-scenarios", type=int)
run_pack.add_argument("--backend-url", default=DEFAULT_BACKEND_URL)
run_pack.add_argument("--output-root", default=str(DEFAULT_ARTIFACTS_ROOT))
run_pack.add_argument("--prompt-version", default=DEFAULT_PROMPT_VERSION)
run_pack.add_argument("--llm-provider", default=DEFAULT_LLM_PROVIDER, choices=["openai", "local"])
run_pack.add_argument("--llm-model", default=DEFAULT_LLM_MODEL)
run_pack.add_argument("--llm-base-url", default=DEFAULT_LLM_BASE_URL)
run_pack.add_argument("--llm-api-key", default=DEFAULT_LLM_API_KEY)
run_pack.add_argument("--temperature", type=float, default=DEFAULT_TEMPERATURE)
run_pack.add_argument("--max-output-tokens", type=int, default=DEFAULT_MAX_OUTPUT_TOKENS)
run_pack.add_argument("--timeout-seconds", type=int, default=180)
run_pack.add_argument("--use-mock", action="store_true")
run_pack.set_defaults(func=handle_run_pack)
run_pack_loop = subparsers.add_parser(
"run-pack-loop",
help="Run autonomous analyst -> coder -> rerun iterations for a domain pack until the acceptance gate is reached",
)
run_pack_loop.add_argument("--manifest", required=True)
run_pack_loop.add_argument("--loop-id")
run_pack_loop.add_argument("--analysis-date")
run_pack_loop.add_argument("--max-scenarios", type=int)
run_pack_loop.add_argument("--target-score", type=int, default=80)
run_pack_loop.add_argument("--max-iterations", type=int, default=8)
run_pack_loop.add_argument("--backend-url", default=DEFAULT_BACKEND_URL)
run_pack_loop.add_argument("--output-root", default=str(DEFAULT_ARTIFACTS_ROOT))
run_pack_loop.add_argument("--prompt-version", default=DEFAULT_PROMPT_VERSION)
run_pack_loop.add_argument("--llm-provider", default=DEFAULT_LLM_PROVIDER, choices=["openai", "local"])
run_pack_loop.add_argument("--llm-model", default=DEFAULT_LLM_MODEL)
run_pack_loop.add_argument("--llm-base-url", default=DEFAULT_LLM_BASE_URL)
run_pack_loop.add_argument("--llm-api-key", default=DEFAULT_LLM_API_KEY)
run_pack_loop.add_argument("--temperature", type=float, default=DEFAULT_TEMPERATURE)
run_pack_loop.add_argument("--max-output-tokens", type=int, default=DEFAULT_MAX_OUTPUT_TOKENS)
run_pack_loop.add_argument("--timeout-seconds", type=int, default=180)
run_pack_loop.add_argument("--use-mock", action="store_true")
run_pack_loop.add_argument("--codex-binary", default="codex")
run_pack_loop.add_argument("--codex-profile")
run_pack_loop.add_argument("--codex-model")
run_pack_loop.add_argument("--analyst-codex-model", default="gpt-5.4")
run_pack_loop.add_argument("--coder-codex-model", default="gpt-5.4-mini")
run_pack_loop.add_argument("--analyst-reasoning-effort", default="medium")
run_pack_loop.add_argument("--coder-reasoning-effort", default="low")
run_pack_loop.add_argument("--codex-timeout-seconds", type=int, default=1800)
run_pack_loop.add_argument(
"--analyst-schema",
default=str(DEFAULT_LOOP_SCHEMA_DIR / "domain_loop_analyst_verdict.schema.json"),
)
run_pack_loop.add_argument(
"--coder-schema",
default=str(DEFAULT_LOOP_SCHEMA_DIR / "domain_loop_coder_result.schema.json"),
)
run_pack_loop.set_defaults(func=handle_run_pack_loop)
return parser
def main() -> int:
parser = build_parser()
args = parser.parse_args()
try:
return int(args.func(args))
except Exception as error: # noqa: BLE001
print(f"[domain-case-loop] error: {error}", file=sys.stderr)
return 1
if __name__ == "__main__":
raise SystemExit(main())