NODEDC_1C/scripts/domain_case_loop.py

5361 lines
243 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import argparse
import json
import re
import subprocess
import sys
import textwrap
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
# Repository root: two directory levels above this script's resolved location.
REPO_ROOT = Path(__file__).resolve().parent.parent
# Default directories and files, all anchored at REPO_ROOT.
DEFAULT_ARTIFACTS_ROOT = REPO_ROOT / "artifacts" / "domain_runs"
DEFAULT_SESSIONS_DIR = REPO_ROOT / "llm_normalizer" / "data" / "assistant_sessions"
DEFAULT_REPORTS_DIR = REPO_ROOT / "llm_normalizer" / "reports"
DEFAULT_LOOP_SCHEMA_DIR = REPO_ROOT / "docs" / "orchestration" / "schemas"
# Optional JSON file read by load_shared_local_llm_defaults() to override the BASE_DEFAULT_* values.
SHARED_LLM_CONNECTION_CONFIG = REPO_ROOT / "llm_normalizer" / "data" / "shared_llm_connection.json"
DEFAULT_BACKEND_URL = "http://127.0.0.1:8787"
DEFAULT_PROMPT_VERSION = "address_query_runtime_v1"
# Built-in LLM connection defaults, used when the shared config is absent or unusable.
BASE_DEFAULT_LLM_PROVIDER = "local"
BASE_DEFAULT_LLM_MODEL = "qwen2.5-14b-instruct-1m"
BASE_DEFAULT_LLM_BASE_URL = "http://127.0.0.1:1234/v1"
DEFAULT_LLM_API_KEY = ""
BASE_DEFAULT_TEMPERATURE = 0.0
BASE_DEFAULT_MAX_OUTPUT_TOKENS = 900
# Markdown heading that build_conversation_export() places before an assistant debug payload.
TECH_SECTION_HEADER = "### technical_debug_payload_json"
# Schema version identifiers (string tags only; consumers are outside this part of the file).
SCENARIO_MANIFEST_SCHEMA_VERSION = "domain_scenario_manifest_v1"
SCENARIO_STATE_SCHEMA_VERSION = "domain_scenario_state_v1"
SCENARIO_STEP_STATE_SCHEMA_VERSION = "domain_scenario_step_state_v1"
SCENARIO_PACK_SCHEMA_VERSION = "domain_scenario_pack_v1"
ACTIVE_DOMAIN_CONTRACT_SCHEMA_VERSION = "active_domain_contract_v1"
AUTONOMOUS_LOOP_SCHEMA_VERSION = "domain_autonomous_loop_v1"
# Canonical repair mode names returned by normalize_repair_mode().
REPAIR_MODE_LEAD_HANDOFF = "lead-handoff"
REPAIR_MODE_AUTO_CODER = "auto-coder"
def load_shared_local_llm_defaults(config_path: Path | None = None) -> dict[str, Any]:
    """Return LLM connection defaults, overridden by the shared config when usable.

    The built-in ``BASE_DEFAULT_*`` values are returned unchanged when the
    config file is missing, unreadable, not valid JSON, not a JSON object,
    lacks a ``connection`` object, or declares a provider other than
    ``"local"``. Otherwise ``model``, ``baseUrl``, ``temperature`` and
    ``maxOutputTokens`` override the matching defaults when they hold usable
    values.

    This function is called at import time, so it never raises on bad config
    content.

    Args:
        config_path: Config file to read; defaults to
            ``SHARED_LLM_CONNECTION_CONFIG``.

    Returns:
        A dict with keys ``llm_provider``, ``llm_model``, ``llm_base_url``,
        ``temperature`` and ``max_output_tokens``.
    """
    defaults: dict[str, Any] = {
        "llm_provider": BASE_DEFAULT_LLM_PROVIDER,
        "llm_model": BASE_DEFAULT_LLM_MODEL,
        "llm_base_url": BASE_DEFAULT_LLM_BASE_URL,
        "temperature": BASE_DEFAULT_TEMPERATURE,
        "max_output_tokens": BASE_DEFAULT_MAX_OUTPUT_TOKENS,
    }
    target = config_path or SHARED_LLM_CONNECTION_CONFIG
    try:
        # utf-8-sig tolerates a BOM, matching read_text_file(). ValueError covers
        # both json.JSONDecodeError and UnicodeDecodeError; OSError covers a
        # missing or unreadable file.
        raw = json.loads(target.read_text(encoding="utf-8-sig"))
    except (OSError, ValueError):
        return defaults
    if not isinstance(raw, dict):
        # A JSON array/scalar root has no ``connection`` section.
        return defaults
    connection = raw.get("connection")
    if not isinstance(connection, dict):
        return defaults
    if str(connection.get("llmProvider") or "").strip().lower() != "local":
        return defaults

    model = str(connection.get("model") or "").strip()
    base_url = str(connection.get("baseUrl") or "").strip()
    temperature = connection.get("temperature")
    max_output_tokens = connection.get("maxOutputTokens")
    if model:
        defaults["llm_model"] = model
    if base_url:
        defaults["llm_base_url"] = base_url
    # bool is a subclass of int, so it is excluded explicitly.
    if isinstance(temperature, (int, float)) and not isinstance(temperature, bool):
        defaults["temperature"] = float(temperature)
    if isinstance(max_output_tokens, (int, float)) and not isinstance(max_output_tokens, bool):
        try:
            defaults["max_output_tokens"] = max(1, int(max_output_tokens))
        except (OverflowError, ValueError):
            # json.loads accepts Infinity/NaN; keep the built-in default for those.
            pass
    return defaults
# Effective LLM defaults, resolved once when the module is imported (this reads the
# shared connection config from disk if it exists).
SHARED_LLM_DEFAULTS = load_shared_local_llm_defaults()
DEFAULT_LLM_PROVIDER = str(SHARED_LLM_DEFAULTS["llm_provider"])
DEFAULT_LLM_MODEL = str(SHARED_LLM_DEFAULTS["llm_model"])
DEFAULT_LLM_BASE_URL = str(SHARED_LLM_DEFAULTS["llm_base_url"])
DEFAULT_TEMPERATURE = float(SHARED_LLM_DEFAULTS["temperature"])
DEFAULT_MAX_OUTPUT_TOKENS = int(SHARED_LLM_DEFAULTS["max_output_tokens"])
# Case-insensitive patterns anchored at the start of a string; they match English/Russian
# scaffold words such as "status", "сводка" or "блок <N>".
# NOTE(review): presumably applied to the leading lines of an answer to detect
# "top_level_noise_present" -- the consumer is outside this part of the file; confirm.
TOP_LEVEL_NOISE_PATTERNS = (
    re.compile(r"^(?:status|статус(?: результата)?)\b", re.IGNORECASE),
    re.compile(r"^(?:что учтено|сводка)\b", re.IGNORECASE),
    re.compile(r"^блок\s+\d+\b", re.IGNORECASE),
    re.compile(r"^(?:подтверждение|опорные документы|сервисно)\b", re.IGNORECASE),
)
# Lower-case Russian question words, written as \u escapes of Cyrillic text
# (the first entry decodes to "сколько", the last to "покажи").
# NOTE(review): presumably matched as substrings of the user question to recognise a
# direct business question -- the consumer is outside this part of the file; confirm.
BUSINESS_DIRECT_QUESTION_MARKERS = (
    "\u0441\u043a\u043e\u043b\u044c\u043a\u043e",
    "\u0441\u043a\u043e\u043a",
    "\u043a\u0430\u043a\u043e\u0439",
    "\u043a\u0430\u043a\u0430\u044f",
    "\u043a\u0430\u043a\u0438\u0435",
    "\u043a\u0442\u043e",
    "\u043a\u043e\u043c\u0443",
    "\u043a\u043e\u0433\u0434\u0430",
    "\u0433\u0434\u0435",
    "\u043a\u0443\u0434\u0430",
    "\u043f\u043e\u0447\u0435\u043c\u0443",
    "\u0437\u0430\u0447\u0435\u043c",
    "\u043a\u0430\u043a\u0438\u043c \u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u043e\u043c",
    "\u043f\u043e\u043a\u0430\u0436\u0438",
)
# Lower-case Russian word stems, written as \u escapes of Cyrillic text
# (the first entry decodes to "обзор", the last to "аудит").
# NOTE(review): presumably marks questions that ask for a report-style answer --
# the consumer is outside this part of the file; confirm.
BUSINESS_REPORT_REQUEST_MARKERS = (
    "\u043e\u0431\u0437\u043e\u0440",
    "\u0430\u043d\u0430\u043b\u0438\u0437",
    "\u043f\u043e\u0434\u0440\u043e\u0431",
    "\u0440\u0430\u0437\u0432\u0435\u0440\u043d",
    "\u043e\u0446\u0435\u043d",
    "\u0430\u0443\u0434\u0438\u0442",
)
# Lower-case Russian phrases, written as \u escapes of Cyrillic text
# (the last two entries decode to "блок 1" and "статус").
# NOTE(review): presumably identifies scaffold wording in the first line of an
# answer -- the consumer is outside this part of the file; confirm.
BUSINESS_TOP_LINE_SCAFFOLD_MARKERS = (
    "\u043e\u0433\u0440\u0430\u043d\u0438\u0447\u0435\u043d\u043d\u044b\u0439 \u0431\u0438\u0437\u043d\u0435\u0441-\u043e\u0431\u0437\u043e\u0440",
    "\u0447\u0442\u043e \u043f\u043e\u0434\u0442\u0432\u0435\u0440\u0436\u0434\u0435\u043d\u043e",
    "\u043f\u0440\u043e\u0432\u0435\u0440\u0435\u043d\u043d\u044b\u0435 \u043a\u043e\u043d\u0442\u0443\u0440\u044b",
    "\u0431\u043b\u043e\u043a 1",
    "\u0441\u0442\u0430\u0442\u0443\u0441",
)
# Lower-case fragments of internal/technical vocabulary (MCP wording, runtime
# identifiers, query names).
# NOTE(review): presumably backs the "technical_garbage_in_answer" invariant -- the
# consumer is outside this part of the file; confirm.
BUSINESS_TECHNICAL_GARBAGE_MARKERS = (
    "mcp-срез",
    "лимит выборки mcp",
    "через mcp",
    "mcp-провер",
    "mcp_discovery",
    "runtime_",
    "capability_id",
    "selected_chain_id",
    "business_overview_route_template_v1",
    "probe ",
    "query_movements",
    "query_documents",
)
# NOTE(review): presumably a character-count threshold for "business_answer_too_verbose";
# the unit and consumer are not visible here -- confirm.
BUSINESS_DIRECT_ANSWER_SOFT_LIMIT = 1800
# Lower-case Russian phrases, written as \u escapes of Cyrillic text; pairs of entries
# differ only in the "е"/"ё" spelling of the same phrase.
# NOTE(review): the GUARDED_INSUFFICIENCY_* values presumably describe an answer that
# honestly reports limited evidence -- the consumer is outside this part of the file; confirm.
GUARDED_INSUFFICIENCY_PRIMARY_MARKERS = (
    "\u0442\u043e\u0447\u043d\u044b\u0439",
    "\u0442\u043e\u0447\u043d\u044b\u0435",
    "\u043d\u0435 \u043f\u043e\u0434\u0442\u0432\u0435\u0440\u0436\u0434\u0435\u043d",
    "\u043d\u0435 \u043f\u043e\u0434\u0442\u0432\u0435\u0440\u0436\u0434\u0451\u043d",
)
GUARDED_INSUFFICIENCY_LIMITATION_MARKERS = (
    "\u043f\u0440\u0435\u0434\u0432\u0430\u0440\u0438\u0442\u0435\u043b\u044c\u043d",
    "\u0434\u0432\u0438\u0436\u0435\u043d\u0438\u044f-\u043a\u0430\u043d\u0434\u0438\u0434\u0430\u0442",
    "\u0434\u0432\u0438\u0436\u0435\u043d\u0438\u0439-\u043a\u0430\u043d\u0434\u0438\u0434\u0430\u0442",
    "\u043d\u0435 \u043f\u043e\u0434\u0442\u0432\u0435\u0440\u0436\u0434\u0435\u043d\u043d\u043e\u0435 \u0441\u0430\u043b\u044c\u0434\u043e",
    "\u043d\u0435 \u043f\u043e\u0434\u0442\u0432\u0435\u0440\u0436\u0434\u0451\u043d\u043d\u043e\u0435 \u0441\u0430\u043b\u044c\u0434\u043e",
    "\u043d\u0435 \u0434\u043e\u043a\u0430\u0437\u044b\u0432\u0430\u0435\u0442 \u043e\u0441\u0442\u0430\u0442\u043e\u043a",
    "\u043d\u0435 \u0444\u0438\u043d\u0430\u043b\u044c\u043d\u044b\u0439 \u0440\u0435\u0435\u0441\u0442\u0440",
)
# Accepted values for the corresponding result-mode / truth-mode / answer-shape fields.
GUARDED_INSUFFICIENCY_RESULT_MODES = {"heuristic_candidates"}
GUARDED_INSUFFICIENCY_TRUTH_MODES = {"limited"}
GUARDED_INSUFFICIENCY_ANSWER_SHAPES = {"limited_with_reason"}
# Maps an identifier to a tuple of alias identifiers.
# NOTE(review): presumably keys are MCP discovery chain ids and values are the intent
# names accepted as equivalent -- the consumer is outside this part of the file; confirm.
MCP_DISCOVERY_CHAIN_INTENT_ALIASES: dict[str, tuple[str, ...]] = {
    "business_overview": ("business_overview",),
    "value_flow_comparison": (
        "value_flow",
        "bidirectional_value_flow_comparison",
        "counterparty_value_flow",
    ),
    "inventory_stock_snapshot": ("inventory_on_hand_as_of_date",),
}
# Default severity label per invariant-violation code; every entry is "P0" except the
# two verbosity/layering codes, which are "P1".
DEFAULT_INVARIANT_SEVERITY: dict[str, str] = {
    "wrong_intent": "P0",
    "wrong_capability": "P0",
    "wrong_followup_action": "P0",
    "wrong_recipe": "P0",
    "wrong_result_mode": "P0",
    "wrong_as_of_date": "P0",
    "wrong_period_from": "P0",
    "wrong_period_to": "P0",
    "missing_required_filter": "P0",
    "forbidden_capability_selected": "P0",
    "forbidden_recipe_selected": "P0",
    "focus_object_missing": "P0",
    "wrong_date_scope_state": "P0",
    "direct_answer_missing": "P0",
    "top_level_noise_present": "P0",
    "business_direct_answer_missing": "P0",
    "technical_garbage_in_answer": "P0",
    "answer_layering_noise": "P1",
    "business_answer_too_verbose": "P1",
}
# Numeric rank per severity label (P0 has the lowest number).
# NOTE(review): presumably a sort key for repair targets -- confirm against the consumer.
REPAIR_TARGET_SEVERITY_ORDER = {"P0": 0, "P1": 1, "P2": 2}
# Numeric rank per problem type; several problem types intentionally share a rank.
# NOTE(review): presumably a secondary sort key for repair targets (lower first) --
# the consumer is outside this part of the file; confirm.
REPAIR_TARGET_PROBLEM_ORDER = {
    "temporal_honesty_gap": 0,
    "edge_carryover_gap": 1,
    "bundle_reuse_gap": 1,
    "followup_action_resolution_gap": 2,
    "object_memory_gap": 3,
    "route_gap": 4,
    "route_candidate_enablement_gap": 4,
    "answer_shape_mismatch": 5,
    "field_mapping_gap": 5,
    "business_utility_gap": 6,
    "presentation_gap": 7,
    "domain_anchor_gap": 8,
    "capability_gap": 9,
    "evidence_gap": 10,
    "partial_masking_gap": 10,
    "other": 11,
}
# Repo-relative source files listed per problem type (same keys as
# REPAIR_TARGET_PROBLEM_ORDER).
# NOTE(review): presumably offered as candidate files to inspect when repairing a
# problem of that type -- the consumer is outside this part of the file; confirm.
REPAIR_TARGET_FILE_HINTS: dict[str, list[str]] = {
    "followup_action_resolution_gap": [
        "llm_normalizer/backend/src/services/addressIntentResolver.ts",
        "llm_normalizer/backend/src/services/address_runtime/decomposeStage.ts",
        "llm_normalizer/backend/src/services/assistantService.ts",
    ],
    "object_memory_gap": [
        "llm_normalizer/backend/src/services/addressNavigationState.ts",
        "llm_normalizer/backend/src/services/address_runtime/decomposeStage.ts",
        "llm_normalizer/backend/src/services/assistantService.ts",
    ],
    "edge_carryover_gap": [
        "llm_normalizer/backend/src/services/addressNavigationState.ts",
        "llm_normalizer/backend/src/services/address_runtime/decomposeStage.ts",
        "llm_normalizer/backend/src/services/addressFilterExtractor.ts",
    ],
    "temporal_honesty_gap": [
        "llm_normalizer/backend/src/services/addressFilterExtractor.ts",
        "llm_normalizer/backend/src/services/address_runtime/decomposeStage.ts",
        "llm_normalizer/backend/src/services/address_runtime/composeStage.ts",
    ],
    "route_gap": [
        "llm_normalizer/backend/src/services/addressQueryClassifier.ts",
        "llm_normalizer/backend/src/services/addressIntentResolver.ts",
        "llm_normalizer/backend/src/services/address_runtime/decomposeStage.ts",
        "llm_normalizer/backend/src/services/assistantService.ts",
    ],
    "route_candidate_enablement_gap": [
        "llm_normalizer/backend/src/services/assistantMcpDiscoveryRuntimeBridge.ts",
        "llm_normalizer/backend/src/services/assistantMcpDiscoveryPlanner.ts",
        "llm_normalizer/backend/src/services/assistantMcpDiscoveryPilotExecutor.ts",
        "llm_normalizer/backend/src/services/assistantMcpDiscoveryTurnInputAdapter.ts",
        "llm_normalizer/backend/src/services/addressRecipeCatalog.ts",
    ],
    "capability_gap": [
        "llm_normalizer/backend/src/services/addressCapabilityPolicy.ts",
        "llm_normalizer/backend/src/services/addressRecipeCatalog.ts",
        "llm_normalizer/backend/src/services/addressQueryService.ts",
    ],
    "presentation_gap": [
        "llm_normalizer/backend/src/services/address_runtime/composeStage.ts",
        "llm_normalizer/backend/src/services/assistantService.ts",
    ],
    "answer_shape_mismatch": [
        "llm_normalizer/backend/src/services/address_runtime/composeStage.ts",
        "llm_normalizer/backend/src/services/assistantMcpDiscoveryResponseCandidate.ts",
        "llm_normalizer/backend/src/services/assistantService.ts",
    ],
    "business_utility_gap": [
        "llm_normalizer/backend/src/services/address_runtime/composeStage.ts",
        "llm_normalizer/backend/src/services/assistantMcpDiscoveryResponseCandidate.ts",
        "llm_normalizer/backend/src/services/assistantService.ts",
    ],
    "bundle_reuse_gap": [
        "llm_normalizer/backend/src/services/assistantMcpDiscoveryResponseCandidate.ts",
        "llm_normalizer/backend/src/services/assistantMcpDiscoveryTurnInputAdapter.ts",
        "llm_normalizer/backend/src/services/assistantService.ts",
        "scripts/domain_case_loop.py",
    ],
    "field_mapping_gap": [
        "llm_normalizer/backend/src/services/assistantMcpDiscoveryResponseCandidate.ts",
        "llm_normalizer/backend/src/services/assistantMcpDiscoveryDataNeedGraph.ts",
        "llm_normalizer/backend/src/services/assistantMcpDiscoveryPolicy.ts",
    ],
    "evidence_gap": [
        "llm_normalizer/backend/src/services/addressQueryService.ts",
        "llm_normalizer/backend/src/services/addressRecipeCatalog.ts",
        "llm_normalizer/backend/src/services/address_runtime/composeStage.ts",
    ],
    "partial_masking_gap": [
        "scripts/domain_case_loop.py",
        "llm_normalizer/backend/src/services/assistantTruthAnswerPolicyRuntimeAdapter.ts",
        "llm_normalizer/backend/src/services/assistantMcpDiscoveryResponsePolicy.ts",
    ],
    "domain_anchor_gap": [
        "llm_normalizer/backend/src/services/addressQueryService.ts",
        "llm_normalizer/backend/src/services/address_runtime/decomposeStage.ts",
        "llm_normalizer/backend/src/services/addressNavigationState.ts",
    ],
    "other": [
        "llm_normalizer/backend/src/services/addressQueryService.ts",
        "llm_normalizer/backend/src/services/assistantService.ts",
    ],
}
def dump_json(payload: Any) -> str:
    """Serialize *payload* to indented JSON text, leaving non-ASCII characters unescaped."""
    formatting = {"ensure_ascii": False, "indent": 2}
    return json.dumps(payload, **formatting)
def write_text(file_path: Path, text: str) -> None:
    """Write *text* to *file_path* as UTF-8 with LF newlines, creating parent directories."""
    parent_dir = file_path.parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    # newline="\n" stops the platform from translating "\n" into "\r\n".
    with file_path.open("w", encoding="utf-8", newline="\n") as handle:
        handle.write(text)
def write_json(file_path: Path, payload: Any) -> None:
    """Write *payload* to *file_path* as pretty-printed JSON followed by a newline."""
    serialized = dump_json(payload)
    write_text(file_path, serialized + "\n")
def read_text_file(file_path: Path) -> str:
    """Return the text of *file_path*, decoded as UTF-8 with any leading BOM removed."""
    with file_path.open("r", encoding="utf-8-sig") as handle:
        return handle.read()
def repo_relative(file_path: Path) -> str:
    """Render *file_path* relative to ``REPO_ROOT`` with forward slashes.

    Falls back to the resolved absolute path when the file lies outside the
    repository.
    """
    absolute = file_path.resolve()
    try:
        shown = absolute.relative_to(REPO_ROOT)
    except ValueError:
        shown = absolute
    return str(shown).replace("\\", "/")
def resolve_repo_relative_path(raw_path: str | None) -> Path | None:
    """Map a repo-relative path string onto ``REPO_ROOT``, rejecting escapes.

    Backslashes are treated as separators. ``None`` is returned for empty
    input, for any path that carries a root or drive, and for any path that
    contains a ``..`` component.

    Args:
        raw_path: Candidate path text, expected to be relative to the repo root.

    Returns:
        ``REPO_ROOT / path`` for an acceptable input, otherwise ``None``.
    """
    candidate = str(raw_path or "").strip().replace("\\", "/")
    if not candidate:
        return None
    path = Path(candidate)
    # ``is_absolute()`` alone is not enough on Windows: "/x" (root without drive)
    # and "C:x" (drive without root) are not absolute there, yet joining either
    # onto REPO_ROOT discards part of REPO_ROOT. A non-empty anchor catches both.
    if path.is_absolute() or path.anchor or ".." in path.parts:
        return None
    return REPO_ROOT / path
def build_coder_snapshot_paths(repair_targets: dict[str, Any]) -> list[Path]:
    """Collect the existing, de-duplicated candidate files named by *repair_targets*.

    Entries under ``priority_foci`` are visited before those under ``targets``;
    each dict entry contributes its ``candidate_files`` after they are resolved
    against the repository root. First-seen order is preserved.
    """
    entries: list[Any] = []
    if isinstance(repair_targets, dict):
        for section in ("priority_foci", "targets"):
            entries.extend(repair_targets.get(section) or [])

    ordered: list[Path] = []
    known: set[Path] = set()
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        for raw_path in normalize_string_list(entry.get("candidate_files")):
            candidate = resolve_repo_relative_path(raw_path)
            if candidate is not None and candidate.exists() and candidate not in known:
                known.add(candidate)
                ordered.append(candidate)
    return ordered
def snapshot_coder_candidate_files(paths: list[Path]) -> dict[str, bytes]:
    """Read the current bytes of every regular file in *paths*, keyed by ``str(path)``.

    Paths that do not exist or are not regular files are skipped.
    """
    return {
        str(candidate): candidate.read_bytes()
        for candidate in paths
        if candidate.exists() and candidate.is_file()
    }
def restore_line_collapsed_files_from_snapshot(snapshots: dict[str, bytes]) -> list[str]:
    """Restore files whose line breaks were all removed since the snapshot.

    A file is rewritten with its snapshot bytes only when the snapshot had at
    least one ``\\n``, the current file has none, and both versions are equal
    once every ``\\r`` and ``\\n`` is removed. Any other difference is left
    untouched.

    Returns:
        The restored paths, relative to ``REPO_ROOT`` with forward slashes
        where possible, otherwise as given.
    """

    def without_line_breaks(blob: bytes) -> bytes:
        return blob.replace(b"\r", b"").replace(b"\n", b"")

    reverted: list[str] = []
    for stored_path, original_bytes in snapshots.items():
        target = Path(stored_path)
        if not (target.exists() and target.is_file()):
            continue
        current_bytes = target.read_bytes()
        had_line_breaks = original_bytes.count(b"\n") >= 1
        is_collapsed = current_bytes.count(b"\n") == 0
        if not (had_line_breaks and is_collapsed):
            continue
        if without_line_breaks(original_bytes) != without_line_breaks(current_bytes):
            continue
        target.write_bytes(original_bytes)
        try:
            label = str(target.relative_to(REPO_ROOT)).replace("\\", "/")
        except ValueError:
            label = str(target)
        reverted.append(label)
    return reverted
def sanitize_export_text(value: str) -> str:
    """Strip debug sections and technical lines from a message before export.

    The text is cut at the first debug-style heading, any remaining debug
    sections are removed, and then blank lines and lines that mention internal
    field names are dropped. Surviving lines are right-trimmed and joined with
    ``\\n``.
    """
    text = str(value or "")

    # Everything from the first debug heading onward is discarded.
    heading_match = re.search(
        r"(?:^|\n)\s*#{0,6}\s*(?:debug_payload_json|technical_breakdown_json|route_summary_json|debug_payload|technical_breakdown)\b",
        text,
        flags=re.IGNORECASE,
    )
    if heading_match:
        text = text[: heading_match.start()]

    text = re.sub(
        r"###\s*(?:debug_payload_json|technical_breakdown_json|route_summary_json)[\s\S]*?(?:```[\s\S]*?```|$)",
        "",
        text,
        flags=re.IGNORECASE,
    )
    text = re.sub(
        r"(?:^|\n)\s*#{0,6}\s*(?:debug_payload_json|technical_breakdown_json|route_summary_json)\b[\s\S]*$",
        "",
        text,
        flags=re.IGNORECASE,
    )

    technical_line_patterns = (
        re.compile(r"\b(?:debug_payload_json|technical_breakdown_json)\b", re.IGNORECASE),
        re.compile(r"\b(?:route_summary|semantic_profile|domain_scope|relation_patterns|account_scope)\b", re.IGNORECASE),
        re.compile(r"\b(?:coverage_report|retrieval_status|problem_unit_state|candidate_evidence)\b", re.IGNORECASE),
        re.compile(r"\b(?:graph_domain_scope|graph_runtime|selection_reason|why_included)\b", re.IGNORECASE),
    )

    kept: list[str] = []
    for raw_line in text.splitlines():
        trimmed = raw_line.rstrip()
        if not trimmed:
            # A whitespace-only line is empty after right-trimming.
            continue
        is_technical = False
        for pattern in technical_line_patterns:
            if pattern.search(trimmed):
                is_technical = True
                break
        if not is_technical:
            kept.append(trimmed)
    return "\n".join(kept).strip()
def build_conversation_export(session_id: str, conversation: list[dict[str, Any]], mode: str = "technical") -> str:
    """Render *conversation* as a Markdown export document.

    The header records the session id, export mode and a UTC timestamp. Each
    message becomes a numbered section with its metadata and sanitized text.
    In ``"technical"`` mode, assistant messages that carry a ``debug`` value
    also get a fenced JSON block under ``TECH_SECTION_HEADER``.
    """
    exported_at = datetime.now(timezone.utc).replace(microsecond=0).isoformat()
    parts = [
        "# Assistant conversation export",
        f"session_id: {session_id or 'n/a'}",
        f"export_mode: {mode}",
        f"exported_at: {exported_at}",
        "",
    ]
    with_debug = mode == "technical"
    for position, message in enumerate(conversation, start=1):
        body = sanitize_export_text(str(message.get("text") or ""))
        parts += [
            f"## {position}. {message.get('role', 'unknown')}",
            f"message_id: {message.get('message_id') or 'n/a'}",
            f"created_at: {message.get('created_at') or 'n/a'}",
            f"reply_type: {message.get('reply_type') or 'n/a'}",
        ]
        trace = message.get("trace_id")
        if trace:
            parts.append(f"trace_id: {trace}")
        parts += ["", body or "(empty)", ""]
        has_debug = message.get("role") == "assistant" and message.get("debug") is not None
        if with_debug and has_debug:
            parts += [TECH_SECTION_HEADER, "```json", dump_json(message["debug"]), "```", ""]
    return "\n".join(parts)
def slugify_token(value: str | None, fallback: str) -> str:
    """Turn *value* into an identifier-like token, or return *fallback* if nothing remains.

    Runs of characters other than word characters and ``-`` collapse into a
    single ``_``; leading and trailing underscores are removed.
    """
    source = str(value or "").strip()
    collapsed = re.sub(r"[^\w-]+", "_", source, flags=re.UNICODE)
    token = collapsed.strip("_")
    if token:
        return token
    return fallback
def slugify_case_id(domain: str, explicit_case_id: str | None) -> str:
    """Return the trimmed explicit case id, or generate ``<domain-slug>_<UTC timestamp>``."""
    explicit = (explicit_case_id or "").strip()
    if explicit:
        return explicit
    domain_slug = slugify_token(domain, "domain_case")
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    return f"{domain_slug}_{stamp}"
def normalize_iso_date(value: Any) -> str | None:
    """Return *value* as a trimmed ``YYYY-MM-DD`` string, or ``None`` if it is not one.

    Only strings made of ASCII digits in exactly that layout, naming a real
    calendar date, are accepted.

    Args:
        value: Candidate date; anything that is not a ``str`` is rejected.

    Returns:
        The trimmed date string, or ``None``.
    """
    if not isinstance(value, str):
        return None
    candidate = value.strip()
    # [0-9] rather than \d: \d also matches non-ASCII Unicode digits, which
    # strptime would accept too, letting a non-ISO string through.
    if not re.fullmatch(r"[0-9]{4}-[0-9]{2}-[0-9]{2}", candidate):
        return None
    try:
        # Rejects well-formed but impossible dates such as 2023-02-29.
        datetime.strptime(candidate, "%Y-%m-%d")
    except ValueError:
        return None
    return candidate
def normalize_analysis_context(raw_context: Any) -> dict[str, Any]:
    """Keep only the recognised, valid fields of an analysis context.

    The result may contain ``as_of_date``, ``period_from`` and ``period_to``
    (valid ISO dates only), ``snapshot_mode`` (one of ``auto``,
    ``force_snapshot``, ``force_live``) and a non-empty ``source``. A non-dict
    input yields an empty dict.
    """
    fields = raw_context if isinstance(raw_context, dict) else {}
    normalized: dict[str, Any] = {}

    for date_key in ("as_of_date", "period_from", "period_to"):
        iso_value = normalize_iso_date(fields.get(date_key))
        if iso_value:
            normalized[date_key] = iso_value

    snapshot_mode = str(fields.get("snapshot_mode") or "").strip()
    if snapshot_mode in {"auto", "force_snapshot", "force_live"}:
        normalized["snapshot_mode"] = snapshot_mode

    origin = str(fields.get("source") or "").strip()
    if origin:
        normalized["source"] = origin
    return normalized
def merge_analysis_context(base_context: Any, override_context: Any) -> dict[str, Any]:
    """Normalize both contexts and layer the override's fields on top of the base's."""
    base_fields = normalize_analysis_context(base_context)
    override_fields = normalize_analysis_context(override_context)
    return {**base_fields, **override_fields}
def carry_forward_analysis_context(
    scenario_state: dict[str, Any],
    analysis_context: dict[str, Any],
    *,
    prefer_carryover: bool = False,
    carry_date_scope: bool = True,
) -> dict[str, Any]:
    """Return a copy of *analysis_context* enriched from the scenario's semantic memory.

    When ``scenario_state["semantic_memory"]`` is a dict:

    * if *carry_date_scope* is set, a valid remembered ``date_scope.as_of_date``
      is carried over, and ``source`` is set to ``"scenario_state_carryover"``
      when no source was present;
    * a fixed list of memory keys (focus object, scopes, bundles, ...) is
      copied when the remembered value is not ``None``.

    By default existing values win; with *prefer_carryover* the remembered
    values overwrite them.
    """
    result = dict(analysis_context)
    memory = scenario_state.get("semantic_memory")
    if not isinstance(memory, dict):
        return result

    remembered_scope = memory.get("date_scope") if carry_date_scope else None
    if isinstance(remembered_scope, dict):
        remembered_date = normalize_iso_date(remembered_scope.get("as_of_date"))
        may_set_date = prefer_carryover or not result.get("as_of_date")
        if remembered_date and may_set_date:
            result["as_of_date"] = remembered_date
            if not result.get("source"):
                result["source"] = "scenario_state_carryover"

    memory_keys = (
        "focus_object",
        "selected_object_ref",
        "warehouse_scope",
        "organization_scope",
        "provenance_bundle",
        "sale_trace_bundle",
        "purchase_documents_bundle",
        "supplier_if_known",
        "first_purchase_date",
    )
    for memory_key in memory_keys:
        if not prefer_carryover and memory_key in result:
            continue
        remembered_value = memory.get(memory_key)
        if remembered_value is not None:
            result[memory_key] = remembered_value
    return result
def merge_scenario_date_scope(
    previous_date_scope: Any,
    current_date_scope: Any,
    *,
    depends_on: list[str],
) -> Any:
    """Pick the date scope to keep for a scenario step.

    A non-empty dict in *current_date_scope* wins. Otherwise a non-empty dict
    in *previous_date_scope* is kept, and failing that *current_date_scope* is
    returned unchanged. *depends_on* is accepted but not consulted.
    """
    if isinstance(current_date_scope, dict) and current_date_scope:
        return current_date_scope
    if isinstance(previous_date_scope, dict) and previous_date_scope:
        return previous_date_scope
    return current_date_scope
def question_resets_temporal_scope(value: Any) -> bool:
    """Report whether the question text contains a phrase that resets the time scope.

    Matching is a substring test on the case-folded text with "ё" replaced by
    "е", against a fixed list of Russian and English phrases.
    """
    reset_phrases = (
        "за все доступное время",
        "за все время",
        "за весь период",
        "за все подтвержденные данные",
        "по всем подтвержденным данным",
        "в целом",
        "all-time",
        "all time",
        "не тащи ндс",
        "не тащи период",
        "не тащи старый период",
    )
    folded = str(value or "").casefold().replace("ё", "е")
    for phrase in reset_phrases:
        if phrase in folded:
            return True
    return False
def repair_text_mojibake(value: str) -> str:
    """Try to undo text that was decoded as Latin-1 instead of its real encoding.

    The text is re-encoded as Latin-1 and decoded as UTF-8, then cp1251, then
    cp866; the first decoding that succeeds and differs from the input is
    returned. Text that cannot be encoded as Latin-1, or that no decoding
    changes, is returned unchanged.
    """
    if not value:
        return value
    try:
        raw_bytes = value.encode("latin1")
    except UnicodeEncodeError:
        # Characters outside Latin-1: not this kind of mojibake.
        return value
    for codec in ("utf-8", "cp1251", "cp866"):
        try:
            candidate = raw_bytes.decode(codec)
        except UnicodeDecodeError:
            continue
        if candidate != value:
            return candidate
    return value
def normalize_binding_value(value: Any) -> Any:
    """Recursively repair mojibake in strings nested inside lists and dicts.

    Strings (including string dict keys) go through ``repair_text_mojibake``;
    lists and dicts are rebuilt; every other value is returned as is.
    """
    if isinstance(value, str):
        return repair_text_mojibake(value)
    if isinstance(value, list):
        return list(map(normalize_binding_value, value))
    if isinstance(value, dict):
        rebuilt = {}
        for raw_key, raw_item in value.items():
            clean_key = normalize_binding_value(raw_key) if isinstance(raw_key, str) else raw_key
            rebuilt[clean_key] = normalize_binding_value(raw_item)
        return rebuilt
    return value
def normalize_bindings(raw_bindings: Any) -> dict[str, Any]:
    """Return bindings with stringified keys and normalized values; ``{}`` for a non-dict."""
    normalized: dict[str, Any] = {}
    if isinstance(raw_bindings, dict):
        for name, bound in raw_bindings.items():
            normalized[str(name)] = normalize_binding_value(bound)
    return normalized
def normalize_string_list(raw_values: Any) -> list[str]:
    """Coerce *raw_values* into a list of non-empty, trimmed strings.

    A single string becomes a one-element list (or empty, if blank). A list
    has each item stringified and trimmed, with falsy and blank items dropped.
    Any other input yields an empty list.
    """
    if isinstance(raw_values, str):
        single = raw_values.strip()
        return [single] if single else []
    if not isinstance(raw_values, list):
        return []
    trimmed = (str(item or "").strip() for item in raw_values)
    return [text for text in trimmed if text]
def first_non_empty_string(*values: Any) -> str | None:
    """Return the first argument that is non-blank once stringified and trimmed.

    Dict arguments are skipped; falsy arguments count as blank. ``None`` is
    returned when nothing qualifies.
    """
    rendered = (
        str(candidate or "").strip()
        for candidate in values
        if not isinstance(candidate, dict)
    )
    return next((text for text in rendered if text), None)
def normalize_repair_mode(raw_value: Any) -> str:
    """Map a repair-mode alias onto its canonical constant.

    The input is trimmed, lower-cased and has ``_`` replaced by ``-`` before
    lookup; a falsy input selects the lead-handoff mode.

    Raises:
        RuntimeError: If the value is not a known alias.
    """
    requested = str(raw_value or REPAIR_MODE_LEAD_HANDOFF).strip().lower().replace("_", "-")
    aliases_by_mode = (
        (REPAIR_MODE_LEAD_HANDOFF, {"lead", "lead-codex", "manual", "manual-lead", "lead-handoff"}),
        (REPAIR_MODE_AUTO_CODER, {"auto", "autonomous", "auto-coder", "autonomous-coder"}),
    )
    for canonical, aliases in aliases_by_mode:
        if requested in aliases:
            return canonical
    raise RuntimeError(f"Unknown repair mode: {raw_value}")
def normalize_validation_filters(raw_filters: Any) -> dict[str, str]:
    """Return the filters with trimmed keys and cleaned, non-empty string values.

    ``as_of_date``, ``period_from`` and ``period_to`` must be valid ISO dates
    and are dropped otherwise. Every other value is stringified and trimmed;
    falsy or blank values are dropped. A non-dict input yields ``{}``.
    """
    if not isinstance(raw_filters, dict):
        return {}
    date_keys = {"as_of_date", "period_from", "period_to"}
    cleaned: dict[str, str] = {}
    for raw_key, raw_value in raw_filters.items():
        name = str(raw_key or "").strip()
        if not name:
            continue
        if name in date_keys:
            rendered = normalize_iso_date(raw_value)
        else:
            rendered = str(raw_value or "").strip()
        if rendered:
            cleaned[name] = rendered
    return cleaned
def normalize_invariant_severity(raw_mapping: Any) -> dict[str, str]:
    """Return the mapping restricted to entries with a valid severity label.

    Keys are trimmed; values are trimmed and upper-cased and must be one of
    ``P0``, ``P1`` or ``WARNING``. Entries with a blank key or any other value
    are dropped. A non-dict input yields ``{}``.
    """
    if not isinstance(raw_mapping, dict):
        return {}
    allowed_levels = {"P0", "P1", "WARNING"}
    accepted: dict[str, str] = {}
    for raw_key, raw_value in raw_mapping.items():
        name = str(raw_key or "").strip()
        level = str(raw_value or "").strip().upper()
        if name and level in allowed_levels:
            accepted[name] = level
    return accepted
def normalize_identifier(value: Any) -> str:
    """Return *value* as trimmed, lower-cased text; falsy values become ``""``."""
    text = str(value or "")
    return text.strip().lower()
def identifiers_match(actual: Any, expected: Any) -> bool:
    """Compare two identifiers after normalization, allowing a suffix match either way.

    Returns ``False`` when either identifier is empty after normalization.
    """
    left = normalize_identifier(actual)
    right = normalize_identifier(expected)
    if left and right:
        return left == right or left.endswith(right) or right.endswith(left)
    return False
def identifier_in_list(actual: Any, expected_values: list[str]) -> bool:
    """Report whether *actual* matches any non-empty entry of *expected_values*."""
    for expected in expected_values:
        if expected and identifiers_match(actual, expected):
            return True
    return False
def first_non_empty_lines(text: str, limit: int = 3) -> list[str]:
    """Return up to *limit* leading non-blank lines of *text*, each trimmed.

    Args:
        text: Source text; a falsy value is treated as empty.
        limit: Maximum number of lines to return. Zero or a negative value
            yields an empty list.

    Returns:
        The trimmed non-blank lines in their original order.
    """
    if limit <= 0:
        # Without this guard the first line would be collected before the
        # limit is checked, returning one line for limit=0.
        return []
    collected: list[str] = []
    for raw_line in str(text or "").splitlines():
        stripped = raw_line.strip()
        if not stripped:
            continue
        collected.append(stripped)
        if len(collected) >= limit:
            break
    return collected
def build_node_contract_index(raw_contract: dict[str, Any]) -> dict[str, dict[str, Any]]:
    """Index the contract's scenario-tree nodes by ``node_id``.

    Nodes are read from ``root_nodes``, ``critical_nodes`` and
    ``supporting_nodes`` of ``scenario_tree``, in that order; a later node
    with the same id replaces an earlier one. Each entry holds the node's
    expected intents, required answer shape (read from the node's
    ``expected_answer_shape``), required carryover invariants and ordering
    rule. Returns ``{}`` when there is no scenario tree.
    """
    tree = raw_contract.get("scenario_tree")
    if not isinstance(tree, dict):
        return {}

    def optional_text(raw: Any) -> str | None:
        return str(raw or "").strip() or None

    index: dict[str, dict[str, Any]] = {}
    for section in ("root_nodes", "critical_nodes", "supporting_nodes"):
        nodes = tree.get(section)
        if not isinstance(nodes, list):
            continue
        for node in nodes:
            if not isinstance(node, dict):
                continue
            node_id = str(node.get("node_id") or "").strip()
            if node_id:
                index[node_id] = {
                    "expected_intents": normalize_string_list(node.get("expected_intents")),
                    "required_answer_shape": optional_text(node.get("expected_answer_shape")),
                    "required_carryover_invariants": normalize_string_list(node.get("required_carryover_invariants")),
                    "ordering_rule": optional_text(node.get("ordering_rule")),
                }
    return index
def enrich_step_with_node_contract(raw_step: Any, node_contract_index: dict[str, dict[str, Any]]) -> Any:
    """Fill gaps in a scenario step from the defaults of its contract node.

    The step is returned untouched when it is not a dict or its ``node_id``
    has no (non-empty) entry in *node_contract_index*. Otherwise a copy is
    returned in which missing ``expected_intents``, ``required_answer_shape``
    and ``ordering_rule`` come from the node, and
    ``required_carryover_invariants`` is the de-duplicated union with the
    node's invariants listed first.
    """
    if not isinstance(raw_step, dict):
        return raw_step
    step_node_id = str(raw_step.get("node_id") or "").strip()
    defaults = node_contract_index.get(step_node_id) or {}
    if not defaults:
        return raw_step

    result = dict(raw_step)
    if defaults.get("expected_intents") and not result.get("expected_intents"):
        result["expected_intents"] = list(defaults["expected_intents"])
    if defaults.get("required_answer_shape") and not result.get("required_answer_shape"):
        result["required_answer_shape"] = defaults["required_answer_shape"]

    combined: list[str] = []
    node_invariants = normalize_string_list(defaults.get("required_carryover_invariants"))
    step_invariants = normalize_string_list(result.get("required_carryover_invariants"))
    for invariant in node_invariants + step_invariants:
        if invariant not in combined:
            combined.append(invariant)
    if combined:
        result["required_carryover_invariants"] = combined

    if defaults.get("ordering_rule") and not result.get("ordering_rule"):
        result["ordering_rule"] = defaults["ordering_rule"]
    return result
def drop_none_values(payload: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of *payload* without the entries whose value is ``None``."""
    kept: dict[str, Any] = {}
    for name, item in payload.items():
        if item is not None:
            kept[name] = item
    return kept
def http_json(url: str, *, method: str = "GET", payload: dict[str, Any] | None = None, timeout: int = 30) -> dict[str, Any]:
    """Send an HTTP request and return the response body parsed as JSON.

    When *payload* is given it is sent as a UTF-8 JSON body with a matching
    ``Content-Type`` header.

    Raises:
        RuntimeError: On an HTTP error status (the response body is included),
            when the URL cannot be reached, or when the body is not valid JSON.
    """
    request_headers = {"Accept": "application/json"}
    encoded_body = None
    if payload is not None:
        encoded_body = json.dumps(payload).encode("utf-8")
        request_headers["Content-Type"] = "application/json; charset=utf-8"
    outgoing = Request(url, data=encoded_body, method=method, headers=request_headers)

    try:
        with urlopen(outgoing, timeout=timeout) as reply:
            raw_text = reply.read().decode("utf-8")
    except HTTPError as http_error:
        # HTTPError must be handled before URLError, which is its base class.
        detail = http_error.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"HTTP {http_error.code} for {url}: {detail}") from http_error
    except URLError as url_error:
        raise RuntimeError(f"Failed to reach backend at {url}: {url_error}") from url_error

    try:
        parsed = json.loads(raw_text)
    except json.JSONDecodeError as parse_error:
        raise RuntimeError(f"Backend returned non-JSON payload for {url}") from parse_error
    return parsed
def ensure_backend_health(backend_url: str, timeout_seconds: int) -> dict[str, Any]:
    """Query ``/api/health`` and return its payload, raising if the backend is unhealthy.

    The request timeout is *timeout_seconds* clamped to the range 5..30.

    Raises:
        RuntimeError: If the payload is not a JSON object or its ``ok`` field
            is ``False``.
    """
    probe_timeout = max(5, min(timeout_seconds, 30))
    health = http_json(f"{backend_url}/api/health", timeout=probe_timeout)
    if not isinstance(health, dict):
        raise RuntimeError(f"Backend health endpoint returned invalid payload at {backend_url}/api/health")
    reported_ok = health.get("ok")
    if reported_ok is False:
        raise RuntimeError(f"Backend health check failed for {backend_url}: {dump_json(health)}")
    return health
def wait_for_job(backend_url: str, job_id: str, timeout_seconds: int, poll_interval_seconds: float) -> dict[str, Any]:
    """Poll an async eval job until it reaches ``completed`` or ``failed``.

    The job is polled at least once, even with a zero timeout, and one last
    time when the deadline is reached. Each status change is printed.

    Args:
        backend_url: Base URL of the backend.
        job_id: Identifier of the async job.
        timeout_seconds: Overall time budget for the wait.
        poll_interval_seconds: Pause between polls.

    Returns:
        The ``job`` object from the final poll.

    Raises:
        RuntimeError: If a response does not contain a ``job`` object.
        TimeoutError: If the job is still running when the budget is spent.
    """
    # Monotonic clock: the deadline is unaffected by wall-clock adjustments.
    deadline = time.monotonic() + timeout_seconds
    last_status = None
    while True:
        response = http_json(f"{backend_url}/api/eval/run-async/{job_id}")
        job = response.get("job") if isinstance(response, dict) else None
        if not isinstance(job, dict):
            raise RuntimeError("Async job response does not contain `job` object")
        status = str(job.get("status") or "unknown")
        if status != last_status:
            print(f"[domain-case-loop] job {job_id}: {status}")
            last_status = status
        if status in {"completed", "failed"}:
            return job
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline, so the final poll happens on time.
        time.sleep(min(poll_interval_seconds, remaining))
    raise TimeoutError(f"Async job {job_id} did not finish within {timeout_seconds} seconds")
def wait_for_file(file_path: Path, timeout_seconds: int = 30) -> None:
    """Block until *file_path* exists or the time budget is spent.

    The path is checked at least once, even with a zero timeout, and roughly
    every half second after that.

    Args:
        file_path: Path expected to appear.
        timeout_seconds: Maximum time to wait.

    Raises:
        FileNotFoundError: If the path still does not exist at the deadline.
    """
    # Monotonic clock: the deadline is unaffected by wall-clock adjustments.
    deadline = time.monotonic() + timeout_seconds
    while True:
        if file_path.exists():
            return
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline, so the last check happens on time.
        time.sleep(min(0.5, remaining))
    raise FileNotFoundError(f"Timed out waiting for file: {file_path}")
def read_json_file(file_path: Path) -> dict[str, Any]:
    """Read *file_path* as text via ``read_text_file`` and return the parsed JSON."""
    raw_text = read_text_file(file_path)
    return json.loads(raw_text)
def extract_conversation_from_session(session_record: dict[str, Any]) -> list[dict[str, Any]]:
    """Return the session's messages in a uniform shape.

    Sources are tried in order: ``items``, then ``conversation``, then the
    ``user_message`` / ``assistant_message`` pairs inside each turn's
    ``technical_json``. The first source that yields at least one dict message
    wins. Every message is reduced to the keys ``message_id``, ``role``,
    ``text`` (never ``None``), ``reply_type``, ``created_at``, ``trace_id``
    and ``debug``.
    """

    def project(message: dict[str, Any]) -> dict[str, Any]:
        # Fixed key set and order shared by all three sources.
        return {
            "message_id": message.get("message_id"),
            "role": message.get("role"),
            "text": message.get("text") or "",
            "reply_type": message.get("reply_type"),
            "created_at": message.get("created_at"),
            "trace_id": message.get("trace_id"),
            "debug": message.get("debug"),
        }

    for section in ("items", "conversation"):
        entries = session_record.get(section)
        if isinstance(entries, list) and entries:
            projected = [project(entry) for entry in entries if isinstance(entry, dict)]
            if projected:
                return projected

    turns = session_record.get("turns")
    if not isinstance(turns, list):
        return []
    flattened: list[dict[str, Any]] = []
    for turn in turns:
        technical = turn.get("technical_json") if isinstance(turn, dict) else None
        if not isinstance(technical, dict):
            continue
        for message in (technical.get("user_message"), technical.get("assistant_message")):
            if isinstance(message, dict):
                flattened.append(project(message))
    return flattened
def find_last_assistant(conversation: list[dict[str, Any]]) -> dict[str, Any]:
    """Return the last message whose role is ``"assistant"``.

    Raises:
        RuntimeError: If the conversation has no assistant message.
    """
    assistant_messages = (
        message for message in reversed(conversation) if message.get("role") == "assistant"
    )
    latest = next(assistant_messages, None)
    if latest is None:
        raise RuntimeError("Conversation does not contain assistant message")
    return latest
def find_last_user_before(conversation: list[dict[str, Any]], assistant_message_id: Any) -> dict[str, Any] | None:
    """Return the last user message that precedes the given assistant message.

    The assistant message is located by the first entry whose ``message_id``
    equals *assistant_message_id*. When the id is ``None`` (the message has no
    id), the last message with role ``"assistant"`` is used instead, so that
    id-less messages earlier in the conversation do not end the search early.
    If no anchor is found, the whole conversation is searched.

    Args:
        conversation: Messages in chronological order.
        assistant_message_id: Id of the assistant message, or ``None``.

    Returns:
        The matching user message, or ``None`` if there is none.
    """
    cutoff = len(conversation)
    if assistant_message_id is None:
        # Comparing ids would match the first id-less message, which is
        # typically the opening user message; anchor on the role instead.
        for index in range(len(conversation) - 1, -1, -1):
            if conversation[index].get("role") == "assistant":
                cutoff = index
                break
    else:
        for index, item in enumerate(conversation):
            if item.get("message_id") == assistant_message_id:
                cutoff = index
                break
    for item in reversed(conversation[:cutoff]):
        if item.get("role") == "user":
            return item
    return None
def extract_last_turn(session_record: dict[str, Any]) -> dict[str, Any] | None:
    """Return the final entry of ``turns`` when it is a dict, otherwise ``None``."""
    turns = session_record.get("turns")
    if isinstance(turns, list) and turns:
        candidate = turns[-1]
        if isinstance(candidate, dict):
            return candidate
    return None
def extract_report_case(report_record: dict[str, Any], case_id: str) -> dict[str, Any] | None:
    """Return the first dict in ``results`` whose ``case_id`` equals *case_id*, if any.

    The stored id is compared after conversion to ``str`` (a falsy stored id
    counts as the empty string).
    """
    results = report_record.get("results")
    if not isinstance(results, list):
        return None
    matching = (
        entry
        for entry in results
        if isinstance(entry, dict) and str(entry.get("case_id") or "") == case_id
    )
    return next(matching, None)
def build_turn_artifact(
    *,
    slot: str,
    domain: str,
    case_id: str,
    question: str | None,
    session_id: str,
    conversation: list[dict[str, Any]],
    session_record: dict[str, Any] | None,
    job_record: dict[str, Any] | None,
    report_case: dict[str, Any] | None,
    export_file_name: str,
) -> dict[str, Any]:
    """Assemble the ``domain_case_turn_artifact_v1`` record for a captured turn.

    The artifact combines the last assistant message, the user message that
    preceded it, run identifiers from *job_record*, a summary of
    *session_record* and the optional *report_case*. When *question* is empty,
    the text of the preceding user message is used instead.

    Raises:
        RuntimeError: If *conversation* has no assistant message (raised by
            ``find_last_assistant``).
    """
    assistant_entry = find_last_assistant(conversation)
    user_entry = find_last_user_before(conversation, assistant_entry.get("message_id"))
    fallback_question = user_entry.get("text") if isinstance(user_entry, dict) else None
    final_turn = extract_last_turn(session_record or {})

    record = session_record or {}
    # An empty dict stands in for a missing job so each lookup yields None.
    job = job_record if isinstance(job_record, dict) else {}
    session_items = record.get("items")

    run_section = {
        "job_id": job.get("job_id"),
        "run_id": job.get("run_id"),
        "analysis_date": job.get("analysis_date"),
        "report_case_available": report_case is not None,
    }
    session_section = {
        "schema_version": record.get("schema_version"),
        "updated_at": record.get("updated_at"),
        "trace_ids": record.get("trace_ids"),
        "reply_types": record.get("reply_types"),
        "investigation_state": record.get("investigation_state"),
        "address_navigation_state": record.get("address_navigation_state"),
        "items_count": len(session_items) if isinstance(session_items, list) else None,
        "last_turn": final_turn,
    }
    return {
        "schema_version": "domain_case_turn_artifact_v1",
        "artifact_slot": slot,
        "domain": domain,
        "case_id": case_id,
        "question": question or fallback_question,
        "session_id": session_id,
        "run": run_section,
        "user_message": user_entry,
        "assistant_message": assistant_entry,
        "technical_debug_payload": assistant_entry.get("debug"),
        "session_summary": session_section,
        "report_case": report_case,
        "export_markdown_file": export_file_name,
    }
def ensure_case_brief(
    case_dir: Path,
    *,
    domain: str,
    question: str | None,
    expected_capability: str | None,
    expected_result_mode: str | None,
) -> None:
    """Create ``case_brief.md`` in *case_dir* unless the file already exists.

    The brief is assembled line by line instead of through
    ``textwrap.dedent`` on an f-string: interpolation happens before dedent
    runs, so a multi-line value (for example a question containing a newline)
    could remove the common margin and leave the whole document indented.

    Args:
        case_dir: Directory that receives the brief.
        domain: Domain name rendered in the ``Domain`` section.
        question: Raw user question; ``<fill me>`` is written when falsy.
        expected_capability: Expected capability; ``<fill me>`` when falsy.
        expected_result_mode: Expected result mode; ``<fill me>`` when falsy.
    """
    file_path = case_dir / "case_brief.md"
    if file_path.exists():
        # Never overwrite a brief that may already have been edited.
        return
    placeholder = "<fill me>"
    lines = [
        "# Case brief",
        "## Domain",
        f"`{domain}`",
        "## Raw user question",
        f"`{question or placeholder}`",
        "## Expected business meaning",
        "- <fill me>",
        "## Expected capability",
        f"- {expected_capability or placeholder}",
        "## Expected result mode",
        f"- {expected_result_mode or placeholder}",
        "## Constraints",
        "- no architecture changes",
        "- 1C/MCP first",
        "- no fabricated values",
        "- heuristic is not product success",
        "- accepted requires analyst quality score >= 80 and zero unresolved P0",
        "## Known current behavior",
        "- <fill me>",
        "## Draft acceptance criteria",
        "- <fill me>",
    ]
    write_text(file_path, "\n".join(lines) + "\n")
def save_capture_bundle(
    *,
    case_dir: Path,
    slot: str,
    export_markdown: str,
    debug_payload: Any,
    turn_artifact: dict[str, Any],
    session_record: dict[str, Any] | None,
    job_record: dict[str, Any] | None,
    report_case: dict[str, Any] | None,
) -> None:
    """Write the files of one capture bundle into *case_dir*.

    ``<slot>_output.md``, ``<slot>_debug.json`` and ``<slot>_turn.json`` are
    always written; a missing debug payload is stored as ``{}``. The session,
    job and report-case files are written only when the record is not ``None``.
    """
    write_text(case_dir / f"{slot}_output.md", export_markdown)
    write_json(case_dir / f"{slot}_debug.json", {} if debug_payload is None else debug_payload)
    write_json(case_dir / f"{slot}_turn.json", turn_artifact)

    optional_records = (
        ("session", session_record),
        ("job", job_record),
        ("report_case", report_case),
    )
    for suffix, record in optional_records:
        if record is not None:
            write_json(case_dir / f"{slot}_{suffix}.json", record)
def parse_metadata_line(line: str) -> tuple[str, str] | None:
    """Split ``key: value`` at the first colon.

    Returns:
        The stripped ``(key, value)`` pair, or ``None`` when *line* has no colon.
    """
    key, separator, value = line.partition(":")
    if not separator:
        return None
    return key.strip(), value.strip()
def parse_export_markdown(text: str) -> tuple[str, list[dict[str, Any]]]:
    """Parse an exported conversation markdown into a session id and messages.

    Each ``## <n>. user|assistant`` heading starts one message. The lines
    after the heading are read as ``key: value`` metadata until a blank line
    or a line without a colon; the rest is the message body. When the body
    contains the ``TECH_SECTION_HEADER`` line, everything after it is parsed
    as JSON (unwrapping a json code fence if present) and stored as ``debug``.

    Args:
        text: Full export markdown.

    Returns:
        ``(session_id, conversation)``; ``session_id`` is ``"n/a"`` when no
        ``session_id:`` line is found.

    Raises:
        RuntimeError: If no conversation heading is found.
        json.JSONDecodeError: If a debug section is not valid JSON.
    """
    session_match = re.search(r"^session_id:\s*(.+?)\s*$", text, flags=re.MULTILINE)
    session_id = session_match.group(1).strip() if session_match else "n/a"

    headings = list(re.finditer(r"^##\s+\d+\.\s+(user|assistant)\s*$", text, flags=re.MULTILINE))
    if not headings:
        raise RuntimeError("Could not parse conversation sections from export markdown")

    # A block runs from the end of its heading to the start of the next one.
    block_ends = [heading.start() for heading in headings[1:]] + [len(text)]
    tech_header = TECH_SECTION_HEADER.lower()

    conversation: list[dict[str, Any]] = []
    for heading, block_end in zip(headings, block_ends):
        role = heading.group(1)
        lines = text[heading.end() : block_end].lstrip("\r\n").splitlines()

        metadata: dict[str, Any] = {"role": role}
        body_start = 0
        for line in lines:
            if not line.strip():
                # The blank separator line belongs to neither metadata nor body.
                body_start += 1
                break
            parsed = parse_metadata_line(line)
            if not parsed:
                break
            meta_key, meta_value = parsed
            metadata[meta_key] = meta_value
            body_start += 1
        body_lines = lines[body_start:]

        debug_index = next(
            (position for position, line in enumerate(body_lines) if line.strip().lower() == tech_header),
            None,
        )
        debug_payload = None
        if debug_index is None:
            text_lines = body_lines
        else:
            text_lines = body_lines[:debug_index]
            debug_text = "\n".join(body_lines[debug_index + 1 :]).strip()
            fenced = re.search(r"```json\s*(.*?)\s*```", debug_text, flags=re.DOTALL | re.IGNORECASE)
            if fenced:
                debug_text = fenced.group(1).strip()
            if debug_text:
                debug_payload = json.loads(debug_text)

        conversation.append(
            {
                "message_id": metadata.get("message_id"),
                "role": role,
                "text": "\n".join(text_lines).strip(),
                "reply_type": metadata.get("reply_type"),
                "created_at": metadata.get("created_at"),
                "trace_id": metadata.get("trace_id"),
                "debug": debug_payload,
            }
        )
    return session_id, conversation
def build_normalize_config(args: argparse.Namespace) -> dict[str, Any]:
    """Map the LLM-related CLI arguments onto the normalizer config keys."""
    config_to_arg = (
        ("llmProvider", "llm_provider"),
        ("apiKey", "llm_api_key"),
        ("model", "llm_model"),
        ("baseUrl", "llm_base_url"),
        ("temperature", "temperature"),
        ("maxOutputTokens", "max_output_tokens"),
        ("promptVersion", "prompt_version"),
    )
    return {config_key: getattr(args, arg_name) for config_key, arg_name in config_to_arg}
def fetch_session_snapshot(backend_url: str, session_id: str, timeout_seconds: int) -> dict[str, Any]:
    """Poll the backend until the assistant session snapshot is available.

    The session endpoint is requested every 0.5 seconds until the response
    carries a dict under ``session`` or *timeout_seconds* have elapsed. The
    deadline is measured with ``time.monotonic()`` so that wall-clock
    adjustments cannot shorten or extend the polling window.

    Args:
        backend_url: Backend base URL (no trailing slash is added or removed).
        session_id: Identifier of the assistant session to fetch.
        timeout_seconds: Overall polling budget in seconds.

    Returns:
        The ``session`` dictionary from the backend response.

    Raises:
        RuntimeError: If no session dict arrived before the deadline. The most
            recent request error, if any, is chained as the cause.
    """
    deadline = time.monotonic() + timeout_seconds
    last_error: Exception | None = None
    # Per-request timeout is clamped to the 10..30 second range.
    timeout_per_call = max(10, min(30, timeout_seconds))
    url = f"{backend_url}/api/assistant/session/{session_id}"
    while time.monotonic() < deadline:
        try:
            response = http_json(url, timeout=timeout_per_call)
            session = response.get("session")
            if isinstance(session, dict):
                return session
        except Exception as error:  # noqa: BLE001
            # Best-effort polling: remember the failure and retry until the deadline.
            last_error = error
        time.sleep(0.5)
    if last_error is not None:
        raise RuntimeError(f"Failed to fetch assistant session `{session_id}`: {last_error}") from last_error
    raise RuntimeError(f"Assistant session `{session_id}` was not available within {timeout_seconds} seconds")
def build_assistant_message_payload(
    args: argparse.Namespace,
    *,
    question: str,
    session_id: str | None,
    analysis_context: dict[str, Any],
) -> dict[str, Any]:
    """Build the request body for one assistant message.

    The question is sent under both ``user_message`` and ``message``. A
    ``context`` entry is included only when the normalized analysis context is
    non-empty; a string ``as_of_date`` in it is also sent as ``period_hint``.
    The assembled dict is passed through ``drop_none_values`` before returning.
    """
    normalized_context = normalize_analysis_context(analysis_context)
    context_payload: dict[str, Any] = {}
    if normalized_context:
        context_payload["analysis_context"] = normalized_context
        as_of_date = normalized_context.get("as_of_date")
        if isinstance(as_of_date, str):
            context_payload["period_hint"] = as_of_date

    request_fields = {
        "session_id": session_id,
        "user_message": question,
        "message": question,
        "mode": "assistant",
        "llmProvider": args.llm_provider,
        "apiKey": args.llm_api_key,
        "model": args.llm_model,
        "baseUrl": args.llm_base_url,
        "temperature": args.temperature,
        "maxOutputTokens": args.max_output_tokens,
        "promptVersion": args.prompt_version,
        # An empty context becomes None so that it is dropped from the payload.
        "context": context_payload if context_payload else None,
        "useMock": bool(args.use_mock),
    }
    return drop_none_values(request_fields)
def parse_path_tokens(path_expression: str) -> list[str | int]:
    """Split a placeholder path such as ``step_01.entries[0].item`` into tokens.

    Dots separate names, ``[n]`` yields an integer list index, and everything
    else up to the next ``.`` or ``[`` is a name token.

    Args:
        path_expression: Path text; surrounding whitespace is ignored.

    Returns:
        Name tokens as ``str`` and list indexes as ``int``, in path order.

    Raises:
        RuntimeError: If a ``[`` has no closing ``]`` or the index is not a
            decimal number.
    """
    tokens: list[str | int] = []
    path = path_expression.strip()
    cursor = 0
    while cursor < len(path):
        current = path[cursor]
        if current == ".":
            cursor += 1
            continue
        if current == "[":
            closing = path.find("]", cursor)
            if closing < 0:
                raise RuntimeError(f"Invalid placeholder path: {path_expression}")
            raw_index = path[cursor + 1 : closing].strip()
            # isdecimal() rather than isdigit(): isdigit() accepts characters such as
            # superscript digits that int() rejects, which would leak a ValueError
            # instead of the RuntimeError used for every other malformed path.
            if not raw_index.isdecimal():
                raise RuntimeError(f"Only numeric list indexes are supported in placeholders: {path_expression}")
            tokens.append(int(raw_index))
            cursor = closing + 1
            continue
        end = cursor
        while end < len(path) and path[end] not in ".[":
            end += 1
        tokens.append(path[cursor:end])
        cursor = end
    return tokens
def lookup_placeholder_value(path_expression: str, scenario_state: dict[str, Any]) -> Any:
    """Resolve a placeholder path against the scenario state.

    The lookup root exposes ``scenario_state``, ``step_outputs``,
    ``semantic_memory`` and ``bindings``; when ``step_outputs`` is a dict its
    entries are also merged into the root, so a path may start with a step id.

    Raises:
        RuntimeError: If an index is applied to a non-list, is out of range,
            or a name cannot be found in the current dict.
    """
    root: dict[str, Any] = {
        "scenario_state": scenario_state,
        "step_outputs": scenario_state.get("step_outputs", {}),
        "semantic_memory": scenario_state.get("semantic_memory", {}),
        "bindings": scenario_state.get("bindings", {}),
    }
    step_outputs = scenario_state.get("step_outputs")
    if isinstance(step_outputs, dict):
        # NOTE(review): a step output key equal to one of the root names above
        # replaces that root entry — confirm step ids can never collide.
        root.update(step_outputs)

    node: Any = root
    for token in parse_path_tokens(path_expression):
        if isinstance(token, int):
            if not isinstance(node, list):
                raise RuntimeError(f"Placeholder `{path_expression}` does not point to a list before index access")
            if token >= len(node):
                raise RuntimeError(f"Placeholder `{path_expression}` index {token} is out of range")
        elif not isinstance(node, dict) or token not in node:
            raise RuntimeError(f"Placeholder `{path_expression}` could not be resolved at `{token}`")
        node = node[token]
    return node
def resolve_question_template(question_template: str, scenario_state: dict[str, Any]) -> str:
    """Substitute every ``{{ path }}`` placeholder in *question_template*.

    Each path is resolved with ``lookup_placeholder_value``; dict and list
    values are rendered with ``dump_json``, everything else with ``str``.
    The result is stripped of surrounding whitespace.
    """

    def render(match: re.Match[str]) -> str:
        resolved = lookup_placeholder_value(match.group(1), scenario_state)
        return dump_json(resolved) if isinstance(resolved, (dict, list)) else str(resolved)

    return re.sub(r"{{\s*([^{}]+?)\s*}}", render, question_template).strip()
def build_failed_step_state(
    *,
    scenario_id: str,
    domain: str,
    step: dict[str, Any],
    step_index: int,
    question_resolved: str,
    status: str,
    failure_type: str,
    error_message: str,
) -> dict[str, Any]:
    """Build the step-state record for a step that produced no assistant answer.

    Step expectations are copied from *step*; every observed-runtime field is
    empty or ``None``. A ``"blocked"`` status yields reply type
    ``"backend_error"`` and ``hard_fail=True``; any other status yields
    ``"unresolved_followup"``.

    Raises:
        KeyError: If *step* lacks ``step_id``, ``title``, ``depends_on`` or
            ``question_template``.
    """
    blocked = status == "blocked"

    identity: dict[str, Any] = {
        "schema_version": SCENARIO_STEP_STATE_SCHEMA_VERSION,
        "scenario_id": scenario_id,
        "domain": domain,
        "step_id": step["step_id"],
        "step_index": step_index,
        "title": step["title"],
        "depends_on": step["depends_on"],
        "question_template": step["question_template"],
        "question_resolved": question_resolved,
    }

    # (key, factory for the fallback container); None means "copy the value as-is".
    expectation_spec = (
        ("expected_intents", list),
        ("expected_capability", None),
        ("expected_recipe", None),
        ("expected_result_mode", None),
        ("required_filters", dict),
        ("forbidden_capabilities", list),
        ("forbidden_recipes", list),
        ("required_state_objects", list),
        ("required_answer_shape", None),
        ("forbidden_answer_patterns", list),
        ("required_carryover_invariants", list),
        ("invariant_severity", dict),
    )
    expectations: dict[str, Any] = {}
    for key, make_default in expectation_spec:
        value = step.get(key)
        expectations[key] = value if make_default is None else (value or make_default())

    observed: dict[str, Any] = {
        "reply_type": "backend_error" if blocked else "unresolved_followup",
        "assistant_message_id": None,
        "trace_id": None,
        "detected_mode": None,
        "detected_intent": None,
        "selected_recipe": None,
        "capability_id": None,
        "capability_route_mode": None,
        "route_expectation_status": None,
        "result_mode": None,
        "response_type": None,
        "assistant_text": "",
        "top_non_empty_lines": [],
        "actual_direct_answer": None,
        "extracted_filters": {},
        "focus_object": None,
        "fallback_type": failure_type,
        "mcp_call_status": None,
        "balance_confirmed": None,
        "active_result_set_id": None,
        "last_confirmed_route": None,
        "date_scope": None,
        "organization_scope": None,
        "entries": [],
    }

    outcome: dict[str, Any] = {
        "execution_status": status,
        "acceptance_status": status,
        "violated_invariants": [],
        "warnings": [],
        "hard_fail": blocked,
        "status": status,
        "failure_type": failure_type,
        "error_message": error_message,
    }
    # Merge order fixes the key order of the resulting record.
    return {**identity, **expectations, **observed, **outcome}
def normalize_step_definition(index: int, raw_step: Any) -> dict[str, Any]:
    """Normalize one scenario step into the canonical step dictionary.

    A plain string is treated as the question template of an otherwise empty
    step named ``step_<index>``. A dict is read field by field, accepting the
    alternative key names visible below (``question``/``question_template``,
    ``expected_intents``/``expected_intent``, and so on).

    Args:
        index: 1-based step position, used for the default step id and messages.
        raw_step: Step definition, either ``str`` or ``dict``.

    Raises:
        RuntimeError: If the step is empty, has no question, or is neither a
            string nor a dict.
    """
    fallback_id = f"step_{index:02d}"

    if isinstance(raw_step, str):
        question_template = raw_step.strip()
        if not question_template:
            raise RuntimeError(f"Scenario step {index} must not be empty")
        return {
            "step_id": fallback_id,
            "title": fallback_id,
            "question_template": question_template,
            "depends_on": [],
            "analysis_context": {},
            "expected_intents": [],
            "expected_capability": None,
            "expected_recipe": None,
            "expected_result_mode": None,
            "question_id": None,
            "node_id": None,
            "node_role": None,
            "paraphrase_family": None,
            "required_filters": {},
            "forbidden_capabilities": [],
            "forbidden_recipes": [],
            "required_state_objects": [],
            "required_answer_shape": None,
            "forbidden_answer_patterns": [],
            "required_answer_patterns_any": [],
            "required_answer_patterns_all": [],
            "semantic_tags": [],
            "required_carryover_invariants": [],
            "invariant_severity": {},
            "ordering_rule": None,
        }

    if not isinstance(raw_step, dict):
        raise RuntimeError(f"Scenario step {index} must be a string or object")

    def optional_text(*keys: str) -> str | None:
        # First truthy value among *keys*, stringified and stripped; None when empty.
        for key in keys:
            candidate = raw_step.get(key)
            if candidate:
                return str(candidate).strip() or None
        return None

    step_id = slugify_token(str(raw_step.get("step_id") or "").strip(), fallback_id)

    question_template = optional_text("question", "question_template")
    if not question_template:
        raise RuntimeError(f"Scenario step `{step_id}` must define `question` or `question_template`")

    depends_on_raw = raw_step.get("depends_on")
    if isinstance(depends_on_raw, str):
        single_dependency = depends_on_raw.strip()
        depends_on = [single_dependency] if single_dependency else []
    elif isinstance(depends_on_raw, list):
        stripped_items = (str(item).strip() for item in depends_on_raw)
        depends_on = [item for item in stripped_items if item]
    else:
        depends_on = []

    return {
        "step_id": step_id,
        "title": str(raw_step.get("title") or step_id).strip() or step_id,
        "question_template": question_template,
        "depends_on": depends_on,
        "analysis_context": normalize_analysis_context(raw_step.get("analysis_context")),
        "expected_intents": normalize_string_list(raw_step.get("expected_intents") or raw_step.get("expected_intent")),
        "expected_capability": optional_text("expected_capability"),
        "expected_recipe": optional_text("expected_recipe", "expected_selected_recipe"),
        "expected_result_mode": optional_text("expected_result_mode"),
        "question_id": optional_text("question_id"),
        "node_id": optional_text("node_id"),
        "node_role": optional_text("node_role", "role"),
        "paraphrase_family": optional_text("paraphrase_family", "wording_family"),
        "required_filters": normalize_validation_filters(raw_step.get("required_filters")),
        "forbidden_capabilities": normalize_string_list(raw_step.get("forbidden_capabilities")),
        "forbidden_recipes": normalize_string_list(raw_step.get("forbidden_recipes")),
        "required_state_objects": normalize_string_list(raw_step.get("required_state_objects")),
        "required_answer_shape": optional_text("required_answer_shape", "expected_answer_shape"),
        "forbidden_answer_patterns": normalize_string_list(raw_step.get("forbidden_answer_patterns")),
        "required_answer_patterns_any": normalize_string_list(raw_step.get("required_answer_patterns_any")),
        "required_answer_patterns_all": normalize_string_list(raw_step.get("required_answer_patterns_all")),
        "semantic_tags": normalize_string_list(raw_step.get("semantic_tags")),
        "required_carryover_invariants": normalize_string_list(raw_step.get("required_carryover_invariants")),
        "invariant_severity": normalize_invariant_severity(raw_step.get("invariant_severity")),
        "ordering_rule": optional_text("ordering_rule"),
    }
def normalize_scenario_manifest(
    raw_manifest: dict[str, Any],
    *,
    fallback_domain: str | None = None,
    fallback_analysis_context: Any = None,
    fallback_bindings: Any = None,
    default_scenario_id: str | None = None,
) -> dict[str, Any]:
    """Normalize a raw scenario manifest into the canonical manifest dict.

    Manifest values win over the ``fallback_*`` arguments: the domain falls
    back to *fallback_domain*, the analysis context is merged on top of
    *fallback_analysis_context*, and manifest bindings override
    *fallback_bindings*.

    Raises:
        RuntimeError: If no domain can be determined or ``steps`` is not a
            non-empty list (step-level errors propagate as well).
    """
    domain = str(raw_manifest.get("domain") or fallback_domain or "").strip()
    if not domain:
        raise RuntimeError("Scenario manifest must define `domain`")

    raw_steps = raw_manifest.get("steps")
    if not (isinstance(raw_steps, list) and raw_steps):
        raise RuntimeError("Scenario manifest must define non-empty `steps`")
    steps = [normalize_step_definition(position, raw_step) for position, raw_step in enumerate(raw_steps, start=1)]

    explicit_scenario_id = str(raw_manifest.get("scenario_id") or "").strip()
    scenario_id = explicit_scenario_id or default_scenario_id or slugify_case_id(domain, None)
    title = str(raw_manifest.get("title") or domain).strip() or domain
    description = str(raw_manifest.get("description") or "").strip() or None

    analysis_context = merge_analysis_context(fallback_analysis_context, raw_manifest.get("analysis_context"))
    if analysis_context and "source" not in analysis_context:
        analysis_context["source"] = "scenario_manifest"

    # NOTE(review): update() mutates whatever normalize_bindings returns —
    # confirm it hands back a fresh dict rather than the fallback itself.
    bindings = normalize_bindings(fallback_bindings)
    bindings.update(normalize_bindings(raw_manifest.get("bindings")))

    acceptance_canon = raw_manifest.get("acceptance_canon")
    return {
        "schema_version": str(raw_manifest.get("schema_version") or SCENARIO_MANIFEST_SCHEMA_VERSION),
        "scenario_id": scenario_id,
        "domain": domain,
        "title": title,
        "description": description,
        "analysis_context": analysis_context,
        "bindings": bindings,
        "question_ids": normalize_string_list(raw_manifest.get("question_ids")),
        "node_ids": normalize_string_list(raw_manifest.get("node_ids")),
        "acceptance_canon": acceptance_canon if isinstance(acceptance_canon, dict) else {},
        "steps": steps,
    }
def load_scenario_manifest(file_path: Path) -> dict[str, Any]:
    """Read the JSON manifest at *file_path* and return its normalized form."""
    return normalize_scenario_manifest(read_json_file(file_path))
def build_active_contract_bindings(raw_contract: dict[str, Any]) -> dict[str, Any]:
    """Derive scenario bindings from an active domain contract.

    Values are taken from ``observed_anchors`` (ignored unless it is a dict),
    then overridden by the contract's own ``bindings``, and the combined dict
    is passed through ``normalize_bindings``.
    """
    observed_anchors = raw_contract.get("observed_anchors")
    anchors = observed_anchors if isinstance(observed_anchors, dict) else {}

    # (binding name, anchor key it is read from)
    anchor_sources = (
        ("target_date_historical", "historical_as_of_date"),
        ("target_date_current", "current_as_of_date_example"),
        ("observed_warehouse", "warehouse"),
        ("observed_organization", "organization"),
        ("focus_item_current", "focus_item_current"),
        ("focus_item_historical", "focus_item_historical"),
        ("focus_item_small_residue", "focus_item_small_residue"),
        ("observed_supplier_candidate", "supplier_candidate"),
        ("observed_supplier_candidate_alt", "supplier_candidate_alt"),
        ("observed_customer_candidate", "buyer_candidate"),
    )
    bindings = {binding_name: anchors.get(anchor_key) for binding_name, anchor_key in anchor_sources}
    bindings.update(normalize_bindings(raw_contract.get("bindings")))
    return normalize_bindings(bindings)
def convert_active_domain_contract_to_pack(raw_contract: dict[str, Any]) -> dict[str, Any]:
    """Convert an active domain contract into a scenario-pack dictionary.

    The pack's scenarios come from ``orchestration_pack.scenarios``; the steps
    of each dict scenario are passed through ``enrich_step_with_node_contract``.
    Bindings and analysis context are layered contract first, then
    orchestration pack.

    Raises:
        RuntimeError: If ``orchestration_pack`` is not a dict, has no
            scenarios, or no runtime domain can be determined.
    """
    orchestration_pack = raw_contract.get("orchestration_pack")
    if not isinstance(orchestration_pack, dict):
        raise RuntimeError("Active domain contract must define object `orchestration_pack`")
    raw_scenarios = orchestration_pack.get("scenarios")
    if not (isinstance(raw_scenarios, list) and raw_scenarios):
        raise RuntimeError("Active domain contract must define non-empty `orchestration_pack.scenarios`")

    runtime_domain = str(raw_contract.get("runtime_domain") or raw_contract.get("domain") or "").strip()
    if not runtime_domain and str(raw_contract.get("domain_id") or "").startswith("inventory_stock"):
        # Contracts identified as inventory_stock* may omit the runtime domain.
        runtime_domain = "inventory_stock"
    if not runtime_domain:
        raise RuntimeError("Active domain contract must define `runtime_domain`")
    domain_id = str(raw_contract.get("domain_id") or runtime_domain).strip() or runtime_domain

    node_contract_index = build_node_contract_index(raw_contract)

    bindings = build_active_contract_bindings(raw_contract)
    bindings.update(normalize_bindings(orchestration_pack.get("bindings")))

    analysis_context = merge_analysis_context(raw_contract.get("default_analysis_context"), raw_contract.get("analysis_context"))
    analysis_context = merge_analysis_context(analysis_context, orchestration_pack.get("analysis_context"))
    if analysis_context and "source" not in analysis_context:
        analysis_context["source"] = "active_domain_contract"

    pack_id = str(orchestration_pack.get("pack_id") or "").strip() or slugify_case_id(domain_id, None)
    title = str(orchestration_pack.get("title") or raw_contract.get("title") or domain_id).strip() or domain_id
    description = str(orchestration_pack.get("description") or raw_contract.get("domain_goal") or "").strip() or None

    def enrich_scenario(raw_scenario: Any) -> Any:
        # Non-dict scenarios are passed through untouched.
        if not isinstance(raw_scenario, dict):
            return raw_scenario
        scenario_copy = dict(raw_scenario)
        scenario_steps = scenario_copy.get("steps")
        if isinstance(scenario_steps, list):
            scenario_copy["steps"] = [
                enrich_step_with_node_contract(raw_step, node_contract_index) for raw_step in scenario_steps
            ]
        return scenario_copy

    def contract_section(key: str, expected_type: type) -> Any:
        # Contract value when it has the expected container type, else a fresh empty one.
        value = raw_contract.get(key)
        return value if isinstance(value, expected_type) else expected_type()

    return {
        "schema_version": SCENARIO_PACK_SCHEMA_VERSION,
        "source_schema_version": ACTIVE_DOMAIN_CONTRACT_SCHEMA_VERSION,
        "source_contract_id": domain_id,
        "pack_id": pack_id,
        "domain": runtime_domain,
        "title": title,
        "description": description,
        "analysis_context": analysis_context,
        "bindings": bindings,
        "scenarios": [enrich_scenario(raw_scenario) for raw_scenario in raw_scenarios],
        "scenario_tree": contract_section("scenario_tree", dict),
        "acceptance_contract": contract_section("acceptance_contract", dict),
        "question_pool": contract_section("question_pool", dict),
        "wording_families": contract_section("wording_families", list),
        "known_failure_patterns_to_watch": contract_section("known_failure_patterns_to_watch", list),
        "source_contract": {
            "domain_id": domain_id,
            "title": str(raw_contract.get("title") or domain_id).strip() or domain_id,
            "status": str(raw_contract.get("status") or "active").strip() or "active",
        },
    }
def load_scenario_pack(file_path: Path) -> dict[str, Any]:
    """Load and normalize a scenario pack from a JSON file.

    A file whose ``schema_version`` is the active-domain-contract version is
    first converted with ``convert_active_domain_contract_to_pack``. Pack-level
    domain, analysis context and bindings serve as fallbacks for each scenario.

    Raises:
        RuntimeError: If the pack has no domain or no non-empty ``scenarios``
            list (scenario-level errors propagate as well).
    """
    raw_pack = read_json_file(file_path)
    if str(raw_pack.get("schema_version") or "").strip() == ACTIVE_DOMAIN_CONTRACT_SCHEMA_VERSION:
        raw_pack = convert_active_domain_contract_to_pack(raw_pack)

    domain = str(raw_pack.get("domain") or "").strip()
    if not domain:
        raise RuntimeError("Scenario pack must define `domain`")
    raw_scenarios = raw_pack.get("scenarios")
    if not (isinstance(raw_scenarios, list) and raw_scenarios):
        raise RuntimeError("Scenario pack must define non-empty `scenarios`")

    pack_id = str(raw_pack.get("pack_id") or "").strip() or slugify_case_id(domain, None)
    title = str(raw_pack.get("title") or domain).strip() or domain
    description = str(raw_pack.get("description") or "").strip() or None

    analysis_context = normalize_analysis_context(raw_pack.get("analysis_context"))
    if analysis_context and "source" not in analysis_context:
        analysis_context["source"] = "scenario_pack"
    bindings = normalize_bindings(raw_pack.get("bindings"))

    scenarios = []
    for position, raw_scenario in enumerate(raw_scenarios, start=1):
        scenarios.append(
            normalize_scenario_manifest(
                raw_scenario,
                fallback_domain=domain,
                fallback_analysis_context=analysis_context,
                fallback_bindings=bindings,
                default_scenario_id=f"{pack_id}_scenario_{position:02d}",
            )
        )

    normalized_pack: dict[str, Any] = {
        "schema_version": str(raw_pack.get("schema_version") or SCENARIO_PACK_SCHEMA_VERSION),
        "pack_id": pack_id,
        "domain": domain,
        "title": title,
        "description": description,
        "analysis_context": analysis_context,
        "bindings": bindings,
        "scenarios": scenarios,
    }

    # Pass-through sections are copied only when present and not None.
    passthrough_keys = (
        "source_schema_version",
        "source_contract_id",
        "scenario_tree",
        "acceptance_contract",
        "question_pool",
        "wording_families",
        "known_failure_patterns_to_watch",
        "source_contract",
    )
    normalized_pack.update(
        {key: raw_pack.get(key) for key in passthrough_keys if raw_pack.get(key) is not None}
    )
    return normalized_pack
def ensure_scenario_brief(scenario_dir: Path, manifest: dict[str, Any]) -> None:
    """Create ``scenario_brief.md`` in *scenario_dir* unless it already exists.

    The brief is assembled line by line instead of through
    ``textwrap.dedent`` on an f-string: interpolation happens before dedent
    runs, so multi-line values (the ``dump_json`` output, the step list, a
    multi-line description) could remove the common margin and leave the
    whole document indented.

    Args:
        scenario_dir: Directory that receives the brief.
        manifest: Scenario manifest; ``domain``, ``scenario_id``, ``title``
            and ``steps`` are required keys.
    """
    file_path = scenario_dir / "scenario_brief.md"
    if file_path.exists():
        # Never overwrite a brief that may already have been edited.
        return
    steps_lines = "\n".join(
        f"{index}. `{step['step_id']}` - {step['question_template']}"
        for index, step in enumerate(manifest["steps"], start=1)
    )
    lines = [
        "# Scenario brief",
        "## Domain",
        f"`{manifest['domain']}`",
        "## Scenario id",
        f"`{manifest['scenario_id']}`",
        "## Title",
        f"{manifest['title']}",
        "## Description",
        f"{manifest.get('description') or '<fill me>'}",
        "## Shared analysis context",
        "```json",
        f"{dump_json(manifest.get('analysis_context') or {})}",
        "```",
        "## Bindings",
        "```json",
        f"{dump_json(manifest.get('bindings') or {})}",
        "```",
        "## Steps",
        steps_lines,
        "## Constraints",
        "- no architecture changes",
        "- reuse current assistant runtime and session state",
        "- 1C/MCP first",
        "- no fabricated values",
        "- missing route or capability is domain enablement work, not silent rejection",
    ]
    write_text(file_path, "\n".join(lines) + "\n")
def normalize_field_key(raw_key: str) -> str:
    """Map a field label from an answer line onto a canonical key.

    The label is stripped, lower-cased, ``ё`` is folded to ``е``, whitespace
    runs are collapsed and surrounding spaces/colons removed. Known Russian
    labels map to fixed English keys; anything else goes through
    ``slugify_token`` and is lower-cased.
    """
    folded = raw_key.strip().lower().replace("ё", "е")
    normalized = re.sub(r"\s+", " ", folded).strip(" :")
    aliases = {
        "склад": "warehouse",
        "количество": "quantity",
        "стоимость": "amount",
        "сумма": "amount",
        "организация": "organization",
        "дата строки": "row_date",
        "дата": "date",
        "поставщик": "supplier",
        "договор": "contract",
        "документ": "document",
        "документы": "documents",
        "покупатель": "customer",
    }
    alias = aliases.get(normalized)
    if alias is not None:
        return alias
    return slugify_token(normalized, "field").lower()
def extract_structured_entries(answer_text: str) -> list[dict[str, Any]]:
    """Extract numbered list rows (``1. title | key: value | ...``) from an answer.

    Only lines starting with ``<number>.`` are considered. The first
    `` | ``-separated segment is the title; each later segment containing a
    colon becomes a field, stored under its normalized key in ``fields`` and
    under its original label in ``raw_fields``. Normalized fields are also
    copied onto the entry itself.

    Returns:
        One dict per matching line, in input order.
    """
    row_pattern = re.compile(r"^\s*(\d+)\.\s+(.*\S)\s*$")
    entries: list[dict[str, Any]] = []
    for line in answer_text.splitlines():
        row = row_pattern.match(line)
        if row is None:
            continue
        payload = row.group(2).strip()
        segments = [part.strip() for part in payload.split(" | ") if part.strip()]
        title = segments[0] if segments else payload

        fields: dict[str, str] = {}
        raw_fields: dict[str, str] = {}
        for segment in segments[1:]:
            label, separator, remainder = segment.partition(":")
            if not separator:
                continue
            field_value = remainder.strip()
            fields[normalize_field_key(label)] = field_value
            raw_fields[label.strip()] = field_value

        entries.append(
            {
                "index": int(row.group(1)),
                "title": title,
                "item": title,
                "fields": fields,
                "raw_fields": raw_fields,
                "raw_line": line.strip(),
                # Flattened copy of the fields; a field whose normalized key equals
                # one of the names above replaces that value.
                **fields,
            }
        )
    return entries
def derive_step_execution_status(reply_type: str | None, debug_payload: dict[str, Any]) -> str:
    """Classify how a step was executed from its reply type and debug payload.

    Rules are checked in order; the first match wins:

    1. ``backend_error`` reply -> ``"blocked"``.
    2. Applied MCP discovery response with a ``ready_for_guarded_use``
       candidate and a factual/partial-coverage reply -> ``"partial"``.
    3. Exact capability route, no fallback, grounded reply -> ``"exact"``.
    4. Deterministic memory-recap chat answer, no fallback, grounded reply
       -> ``"partial"``.
    5. Fallback ``out_of_scope``/``unknown`` -> ``"needs_exact_capability"``.
    6. Address-query mode with an exact route, a selected recipe or a detected
       intent -> ``"partial"``.
    7. Everything else -> ``"needs_exact_capability"``.
    """
    if reply_type == "backend_error":
        return "blocked"

    def text_field(key: str) -> str:
        return str(debug_payload.get(key) or "").strip()

    candidate = debug_payload.get("mcp_discovery_response_candidate_v1")
    if not isinstance(candidate, dict):
        candidate = {}
    candidate_ready = str(candidate.get("candidate_status") or "").strip() == "ready_for_guarded_use"
    discovery_applied = debug_payload.get("mcp_discovery_response_applied") is True
    if discovery_applied and candidate_ready and reply_type in {"factual", "factual_with_explanation", "partial_coverage"}:
        return "partial"

    route_mode = text_field("capability_route_mode")
    fallback_type = text_field("fallback_type")
    detected_mode = text_field("detected_mode")
    no_fallback = fallback_type in {"", "none"}
    grounded_reply = reply_type in {"factual", "factual_with_explanation", "empty_but_valid"}

    if route_mode == "exact" and no_fallback and grounded_reply:
        return "exact"

    memory_recap_answer = (
        detected_mode == "chat"
        and text_field("living_router_reason") == "memory_recap_followup_detected"
        and text_field("living_chat_response_source") == "deterministic_memory_recap_contract"
    )
    if memory_recap_answer and no_fallback and grounded_reply:
        return "partial"

    if fallback_type in {"out_of_scope", "unknown"}:
        return "needs_exact_capability"

    if detected_mode == "address_query" and (
        route_mode == "exact" or text_field("selected_recipe") or text_field("detected_intent")
    ):
        return "partial"

    # Remaining reply types (partial_coverage, clarification_required, out_of_scope,
    # no_grounded_answer, route_mismatch_blocked, or anything else) share one verdict.
    return "needs_exact_capability"
def derive_step_status(reply_type: str | None, debug_payload: dict[str, Any]) -> str:
    """Return the step status; currently identical to the execution status."""
    execution_status = derive_step_execution_status(reply_type, debug_payload)
    return execution_status
def derive_mcp_discovery_effective_intents(debug_payload: dict[str, Any]) -> list[str]:
    """List the intents implied by an applied MCP discovery response.

    Returns ``[]`` unless the discovery response was applied and its candidate
    status is ``ready_for_guarded_use``. Otherwise the selected chain id and
    the catalog top match are each expanded through
    ``MCP_DISCOVERY_CHAIN_INTENT_ALIASES`` (a chain id without aliases stands
    for itself) and the result is de-duplicated in first-seen order.
    """
    if debug_payload.get("mcp_discovery_response_applied") is not True:
        return []
    candidate = debug_payload.get("mcp_discovery_response_candidate_v1")
    candidate_status = candidate.get("candidate_status") if isinstance(candidate, dict) else None
    if str(candidate_status or "").strip() != "ready_for_guarded_use":
        return []

    collected: list[str] = []
    for payload_key in ("mcp_discovery_selected_chain_id", "mcp_discovery_catalog_chain_top_match"):
        chain_id = str(debug_payload.get(payload_key) or "").strip()
        if chain_id:
            collected.extend(MCP_DISCOVERY_CHAIN_INTENT_ALIASES.get(chain_id, (chain_id,)))
    # dict.fromkeys keeps the first occurrence of each intent and drops repeats.
    return list(dict.fromkeys(intent for intent in collected if intent))
def nested_dict(source: Any, *keys: str) -> dict[str, Any]:
    """Follow *keys* through nested dicts and return the dict found there.

    Returns ``{}`` when any level on the way, or the final value, is not a dict.
    """
    node = source
    for key in keys:
        # A non-dict level collapses to None, which stays None for the remaining keys.
        node = node.get(key) if isinstance(node, dict) else None
    if isinstance(node, dict):
        return node
    return {}
def derive_mcp_discovery_business_overview_date_scope(debug_payload: dict[str, Any]) -> dict[str, Any] | None:
    """Derive a date scope from an applied ``business_overview`` discovery chain.

    Returns ``None`` unless the discovery response was applied, the selected
    chain is ``business_overview`` and a non-empty derived overview exists in
    the debug payload. Otherwise the overview's ``period_scope`` decides:

    * missing or blank -> ``all_time`` scope with empty dates;
    * dict accepted by ``normalize_date_scope`` -> that result, with ``source`` set;
    * four-digit label -> ``year`` scope spanning Jan 1 to Dec 31;
    * anything else -> ``period`` scope carrying only the label.
    """
    if debug_payload.get("mcp_discovery_response_applied") is not True:
        return None
    if str(debug_payload.get("mcp_discovery_selected_chain_id") or "").strip() != "business_overview":
        return None
    overview = nested_dict(
        debug_payload,
        "assistant_mcp_discovery_entry_point_v1",
        "bridge",
        "pilot",
        "derived_business_overview",
    )
    if not overview:
        return None

    source = "mcp_discovery_business_overview"
    period_scope = overview.get("period_scope")
    period_label = "" if period_scope is None else str(period_scope).strip()
    if not period_label:
        return {
            "scope": "all_time",
            "source": source,
            "as_of_date": None,
            "period_from": None,
            "period_to": None,
        }

    if isinstance(period_scope, dict):
        normalized = normalize_date_scope(period_scope)
        if normalized:
            normalized["source"] = source
            return normalized
        # NOTE(review): a dict that does not normalize falls through and its
        # string form becomes the period label — confirm this is intended.

    if re.fullmatch(r"(\d{4})", period_label) is None:
        return {
            "scope": "period",
            "source": source,
            "period_label": period_label,
        }
    # The label is exactly the four-digit year here.
    return {
        "scope": "year",
        "source": source,
        "period_label": period_label,
        "as_of_date": f"{period_label}-12-31",
        "period_from": f"{period_label}-01-01",
        "period_to": f"{period_label}-12-31",
    }
def derive_step_active_result_set_id(
    context: dict[str, Any],
    debug_payload: dict[str, Any],
    assistant_item: dict[str, Any],
) -> Any:
    """Pick the active result-set id for a step.

    When an MCP discovery response was applied and the assistant item has a
    non-blank ``message_id``, the id is ``mcp-discovery-<message_id>``;
    otherwise the context's ``active_result_set_id`` is returned.
    """
    discovery_applied = debug_payload.get("mcp_discovery_response_applied") is True
    message_id = str(assistant_item.get("message_id") or "").strip() if discovery_applied else ""
    if message_id:
        return f"mcp-discovery-{message_id}"
    return context.get("active_result_set_id")
def should_require_analysis_context_as_of_filter(state: dict[str, Any], required_filters: dict[str, str]) -> bool:
    """Decide whether the analysis-context ``as_of_date`` must be enforced as a filter.

    Returns ``False`` when ``as_of_date`` is already an explicit required
    filter. Otherwise returns ``True`` if the analysis context sets
    ``enforce_as_of_date_filter`` or ``as_of_date_is_business_filter`` to
    ``True``, if ``date_scope`` is a required carry-over invariant, or if any
    expected intent, capability or recipe name contains ``as_of_date``.
    """
    if "as_of_date" in required_filters:
        return False

    raw_context = state.get("analysis_context")
    analysis_context = raw_context if isinstance(raw_context, dict) else {}
    enforcing_flags = ("enforce_as_of_date_filter", "as_of_date_is_business_filter")
    if any(analysis_context.get(flag) is True for flag in enforcing_flags):
        return True

    if "date_scope" in set(normalize_string_list(state.get("required_carryover_invariants"))):
        return True

    expected_tokens = list(normalize_string_list(state.get("expected_intents")))
    expected_tokens.append(str(state.get("expected_capability") or ""))
    expected_tokens.append(str(state.get("expected_recipe") or ""))
    for token in expected_tokens:
        if "as_of_date" in token:
            return True
    return False
def step_intent_matches_expected(state: dict[str, Any], expected_intents: list[str]) -> bool:
    """Tell whether any observed intent of the step is among *expected_intents*.

    Observed intents are the stripped ``detected_intent`` followed by the
    step's ``mcp_discovery_effective_intents``; blank values are ignored and
    membership is decided by ``identifier_in_list``.
    """
    detected_intent = str(state.get("detected_intent") or "").strip()
    observed_intents = [detected_intent]
    observed_intents.extend(normalize_string_list(state.get("mcp_discovery_effective_intents")))
    for intent in observed_intents:
        if intent and identifier_in_list(intent, expected_intents):
            return True
    return False
def should_require_direct_answer(step_state: dict[str, Any]) -> bool:
    """Tell whether the step must open with a direct answer.

    True when the step declares a non-blank ``required_answer_shape`` or its
    ``node_role`` is ``root`` or ``critical_child``.
    """
    answer_shape = str(step_state.get("required_answer_shape") or "").strip()
    node_role = str(step_state.get("node_role") or "").strip()
    return bool(answer_shape) or node_role in {"root", "critical_child"}
def _review_text(value: Any) -> str:
return str(value or "").strip().lower()
def _marker_hits(text: str, markers: tuple[str, ...]) -> list[str]:
    """Return the non-empty *markers* that occur in the normalized *text*.

    The text is normalized with ``_review_text``; markers are matched as
    given, in their original order.
    """
    haystack = _review_text(text)
    hits: list[str] = []
    for marker in markers:
        if marker and marker in haystack:
            hits.append(marker)
    return hits
def is_report_style_business_question(question: str) -> bool:
    """Tell whether *question* contains any ``BUSINESS_REPORT_REQUEST_MARKERS`` entry."""
    report_markers_found = _marker_hits(question, BUSINESS_REPORT_REQUEST_MARKERS)
    return len(report_markers_found) > 0
def is_direct_style_business_question(question: str) -> bool:
    """Tell whether *question* is a direct question rather than a report request.

    A report-style question is never direct; otherwise the question is direct
    when it contains any ``BUSINESS_DIRECT_QUESTION_MARKERS`` entry.
    """
    return (not is_report_style_business_question(question)) and bool(
        _marker_hits(question, BUSINESS_DIRECT_QUESTION_MARKERS)
    )
def build_business_first_review(step_state: dict[str, Any]) -> dict[str, Any]:
    """Build the ``business_first_step_review_v1`` record for a step answer.

    The review inspects the resolved question, the full assistant text and the
    opening line of the answer, and reports issue codes plus suggested
    root-cause layers.

    Args:
        step_state: Step state with the question, ``assistant_text``,
            ``top_non_empty_lines`` and ``actual_direct_answer``.

    Returns:
        A dict with the review flags, the raw marker hits, ``issue_codes`` and
        ``suggested_root_cause_layers``.
    """
    question = str(step_state.get("question_resolved") or step_state.get("question_template") or "").strip()
    answer_text = str(step_state.get("assistant_text") or "")

    # The opening line is the first captured top line, falling back to the
    # recorded direct answer when no top lines are available.
    raw_top_lines = step_state.get("top_non_empty_lines")
    if isinstance(raw_top_lines, list) and raw_top_lines:
        opening_source = raw_top_lines[0]
    else:
        opening_source = step_state.get("actual_direct_answer") or ""
    opening_line = str(opening_source).strip()

    needs_direct_answer = should_require_direct_answer(step_state) or is_direct_style_business_question(question)
    report_style = is_report_style_business_question(question)

    garbage_anywhere = _marker_hits(answer_text, BUSINESS_TECHNICAL_GARBAGE_MARKERS)
    garbage_in_opening = _marker_hits(opening_line, BUSINESS_TECHNICAL_GARBAGE_MARKERS)
    scaffold_in_opening = _marker_hits(opening_line, BUSINESS_TOP_LINE_SCAFFOLD_MARKERS)
    opening_is_noise = bool(opening_line and is_top_level_noise_line(opening_line))
    layering_noisy = bool(scaffold_in_opening or opening_is_noise)
    opening_answers_directly = bool(opening_line) and not layering_noisy and not garbage_in_opening

    # Length is only penalised for direct questions; report requests may be long.
    too_verbose = bool(
        needs_direct_answer
        and not report_style
        and len(answer_text) > BUSINESS_DIRECT_ANSWER_SOFT_LIMIT
    )

    # Order matters: issue codes are emitted in this fixed sequence.
    checks = (
        ("technical_garbage_in_answer", bool(garbage_anywhere)),
        ("business_direct_answer_missing", needs_direct_answer and not opening_answers_directly),
        ("answer_layering_noise", layering_noisy),
        ("business_answer_too_verbose", too_verbose),
    )
    issue_codes: list[str] = [code for code, raised in checks if raised]

    raised_codes = set(issue_codes)
    root_cause_layers: list[str] = []
    if raised_codes & {"business_direct_answer_missing", "answer_layering_noise"}:
        root_cause_layers.append("answer_shape_mismatch")
    if raised_codes & {"business_answer_too_verbose", "technical_garbage_in_answer"}:
        root_cause_layers.append("business_utility_gap")

    non_blank_line_count = sum(1 for line in answer_text.splitlines() if line.strip())

    review: dict[str, Any] = {}
    review["schema_version"] = "business_first_step_review_v1"
    review["question"] = question
    review["direct_answer_required"] = needs_direct_answer
    review["report_style_question"] = report_style
    review["answer_length_chars"] = len(answer_text)
    review["answer_line_count"] = non_blank_line_count
    review["actual_direct_answer"] = opening_line or None
    # When no direct answer is required the opening line is never held against the step.
    review["direct_answer_first_ok"] = (not needs_direct_answer) or opening_answers_directly
    review["answer_layering_ok"] = not layering_noisy
    review["technical_garbage_present"] = bool(garbage_anywhere)
    review["technical_garbage_hits"] = garbage_anywhere
    review["top_line_scaffold_present"] = layering_noisy
    review["top_line_scaffold_hits"] = scaffold_in_opening
    review["too_verbose_for_direct_question"] = too_verbose
    review["business_usefulness_ok"] = not issue_codes
    review["issue_codes"] = issue_codes
    review["suggested_root_cause_layers"] = root_cause_layers
    return review
def is_top_level_noise_line(line: str) -> bool:
    """Return whether the stripped line matches any ``TOP_LEVEL_NOISE_PATTERNS`` entry.

    Empty or whitespace-only input is never noise.
    """
    candidate = str(line or "").strip()
    if candidate:
        for noise_pattern in TOP_LEVEL_NOISE_PATTERNS:
            if noise_pattern.search(candidate):
                return True
    return False
def derive_invariant_severity(step_state: dict[str, Any], violation_code: str) -> str:
    """Return the severity for a violation code.

    A per-step ``invariant_severity`` override is used when it normalizes
    (stripped, upper-cased) to ``P0``, ``P1`` or ``WARNING``. Otherwise the
    value comes from ``DEFAULT_INVARIANT_SEVERITY``, defaulting to ``P1``.
    """
    per_step_overrides = step_state.get("invariant_severity")
    if isinstance(per_step_overrides, dict):
        requested = str(per_step_overrides.get(violation_code) or "").strip().upper()
        if requested in ("P0", "P1", "WARNING"):
            return requested
    return DEFAULT_INVARIANT_SEVERITY.get(violation_code, "P1")
def is_validated_bounded_mcp_answer(
    state: dict[str, Any],
    execution_status: str,
    business_review: dict[str, Any],
    violations: list[str],
) -> bool:
    """Return whether a partial step counts as a validated bounded MCP-discovery answer.

    All of the following must hold: execution status is ``partial``; the MCP
    discovery response was applied; its candidate status is
    ``ready_for_guarded_use``; its reply type is factual, factual with
    explanation or partial coverage; there are no violations; and the business
    review is clean on all four checked flags.
    """
    accepted_reply_types = ("factual", "factual_with_explanation", "partial_coverage")
    return (
        execution_status == "partial"
        and state.get("mcp_discovery_response_applied") is True
        and str(state.get("mcp_discovery_response_candidate_status") or "").strip() == "ready_for_guarded_use"
        and str(state.get("mcp_discovery_response_reply_type") or "").strip() in accepted_reply_types
        and not violations
        and business_review.get("business_usefulness_ok") is True
        and business_review.get("direct_answer_first_ok") is True
        and business_review.get("answer_layering_ok") is True
        and business_review.get("technical_garbage_present") is False
    )
# Lowercase Russian phrase fragments ("not selected", "nothing selected",
# "not recorded", "I don't see / didn't find a selected ...", "not confirmed",
# "don't make up ..."). is_validated_memory_checkpoint_answer looks for them as
# substrings of the casefolded assistant text, so stems without endings match
# several grammatical forms.
MEMORY_CHECKPOINT_NO_CONTEXT_MARKERS = (
    "не выбрана",
    "не выбран",
    "не выбрано",
    "нет выбран",
    "нет уже выбран",
    "не зафиксирован",
    "не вижу выбран",
    "не нашёл выбран",
    "не нашел выбран",
    "не подтверждена",
    "не подтвержден",
    "не выдум",
)
def is_validated_memory_checkpoint_answer(
    state: dict[str, Any],
    business_review: dict[str, Any],
    violations: list[str],
) -> bool:
    """Return whether a ``memory``-tagged step counts as a validated checkpoint answer.

    Requires the ``memory`` semantic tag, no violations, at least one
    ``MEMORY_CHECKPOINT_NO_CONTEXT_MARKERS`` entry in the casefolded assistant
    text, and a clean opening line, layering and technical-garbage check in
    the business review.
    """
    semantic_tags = set(normalize_string_list(state.get("semantic_tags")))
    if "memory" not in semantic_tags or violations:
        return False

    folded_answer = str(state.get("assistant_text") or "").casefold()
    admits_missing_context = False
    for marker in MEMORY_CHECKPOINT_NO_CONTEXT_MARKERS:
        if marker in folded_answer:
            admits_missing_context = True
            break
    if not admits_missing_context:
        return False

    if business_review.get("direct_answer_first_ok") is not True:
        return False
    if business_review.get("answer_layering_ok") is not True:
        return False
    return business_review.get("technical_garbage_present") is False
def is_validated_confirmed_runtime_answer(
    state: dict[str, Any],
    execution_status: str,
    business_review: dict[str, Any],
    violations: list[str],
) -> bool:
    """Return whether a runtime (non-discovery) answer counts as confirmed factual.

    The step must have executed as ``partial`` or ``exact`` with no violations,
    without an applied MCP discovery response, with a factual reply type, no
    fallback, a ``matched_non_empty`` MCP call and a factual list/summary
    response type. Any guarded-insufficiency result mode, truth mode or answer
    shape, a truth mode other than ``confirmed``, or ``balance_confirmed`` being
    ``False`` disqualifies it. The business review must be clean on all four
    checked flags.
    """

    def text_field(key: str) -> str:
        # Missing or falsy values collapse to an empty string.
        return str(state.get(key) or "").strip()

    if execution_status not in ("partial", "exact") or violations:
        return False
    if state.get("mcp_discovery_response_applied") is True:
        return False
    if text_field("reply_type") not in ("factual", "factual_with_explanation", "empty_but_valid"):
        return False
    if text_field("fallback_type") not in ("", "none"):
        return False
    if text_field("mcp_call_status") != "matched_non_empty":
        return False
    if text_field("response_type") not in ("FACTUAL_LIST", "FACTUAL_SUMMARY"):
        return False

    observed_result_mode = text_field("result_mode")
    observed_truth_mode = text_field("truth_mode")
    observed_answer_shape = text_field("answer_shape")
    signals_insufficiency = (
        observed_result_mode in GUARDED_INSUFFICIENCY_RESULT_MODES
        or observed_truth_mode in GUARDED_INSUFFICIENCY_TRUTH_MODES
        or observed_answer_shape in GUARDED_INSUFFICIENCY_ANSWER_SHAPES
    )
    if signals_insufficiency:
        return False
    # An absent truth mode is tolerated; a present one must be "confirmed".
    if observed_truth_mode and observed_truth_mode != "confirmed":
        return False
    if state.get("balance_confirmed") is False:
        return False

    for flag_name in ("business_usefulness_ok", "direct_answer_first_ok", "answer_layering_ok"):
        if business_review.get(flag_name) is not True:
            return False
    return business_review.get("technical_garbage_present") is False
def has_guarded_insufficiency_language(answer_text: str) -> bool:
    """Return whether the answer contains both a primary and a limitation marker.

    The casefolded text must contain at least one
    ``GUARDED_INSUFFICIENCY_PRIMARY_MARKERS`` entry and at least one
    ``GUARDED_INSUFFICIENCY_LIMITATION_MARKERS`` entry.
    """
    folded = str(answer_text or "").casefold()

    def mentions_any(markers) -> bool:
        return any(marker in folded for marker in markers)

    return mentions_any(GUARDED_INSUFFICIENCY_PRIMARY_MARKERS) and mentions_any(
        GUARDED_INSUFFICIENCY_LIMITATION_MARKERS
    )
def is_validated_guarded_insufficiency_answer(
    state: dict[str, Any],
    execution_status: str,
    business_review: dict[str, Any],
    violations: list[str],
) -> bool:
    """Return whether a partial runtime answer honestly reports insufficient evidence.

    The step must be ``partial`` with no violations, without an applied MCP
    discovery response, with a factual or partial-coverage reply type, no
    fallback and a ``matched_non_empty`` MCP call. At least one limited-evidence
    signal must be present (guarded result mode, truth mode or answer shape, or
    ``balance_confirmed`` being ``False``), the answer text must use guarded
    insufficiency language, and the business review must be clean on opening
    line, layering and technical garbage.
    """

    def text_field(key: str) -> str:
        # Missing or falsy values collapse to an empty string.
        return str(state.get(key) or "").strip()

    if execution_status != "partial" or violations:
        return False
    if state.get("mcp_discovery_response_applied") is True:
        return False
    if text_field("reply_type") not in ("factual", "factual_with_explanation", "partial_coverage"):
        return False
    if text_field("fallback_type") not in ("", "none"):
        return False
    if text_field("mcp_call_status") != "matched_non_empty":
        return False

    observed_result_mode = text_field("result_mode")
    observed_truth_mode = text_field("truth_mode")
    observed_answer_shape = text_field("answer_shape")
    evidence_is_limited = (
        observed_result_mode in GUARDED_INSUFFICIENCY_RESULT_MODES
        or observed_truth_mode in GUARDED_INSUFFICIENCY_TRUTH_MODES
        or observed_answer_shape in GUARDED_INSUFFICIENCY_ANSWER_SHAPES
        or state.get("balance_confirmed") is False
    )
    if not evidence_is_limited:
        return False
    if not has_guarded_insufficiency_language(str(state.get("assistant_text") or "")):
        return False

    for flag_name in ("direct_answer_first_ok", "answer_layering_ok"):
        if business_review.get(flag_name) is not True:
            return False
    return business_review.get("technical_garbage_present") is False
def acceptance_status_from_execution(execution_status: str, hard_fail: bool, semantic_validated: bool = False) -> str:
    """Map an execution status plus validation flags to an acceptance status.

    ``blocked`` and ``needs_exact_capability`` pass through unchanged. A hard
    fail is ``rejected``. Otherwise the step is ``validated`` when it was
    semantically validated or executed as ``exact``, and ``rejected`` in every
    other case.
    """
    for passthrough in ("blocked", "needs_exact_capability"):
        if execution_status == passthrough:
            return passthrough
    if hard_fail:
        return "rejected"
    accepted = bool(semantic_validated) or execution_status == "exact"
    return "validated" if accepted else "rejected"
def validate_step_contract(step_state: dict[str, Any]) -> dict[str, Any]:
    """Check a step state against its declared expectations and assign an acceptance status.

    The function works on a shallow copy of ``step_state``. It collects
    violation codes and warnings, runs the four semantic validators and writes
    the outcome back into the copy under ``business_first_review``,
    ``violated_invariants``, ``warnings``, ``hard_fail``, the four
    ``*_validated`` flags, ``acceptance_status`` and ``status``.

    Args:
        step_state: Step state holding both the expectations (``expected_*``,
            ``required_*``, ``forbidden_*``) and the observed values
            (``capability_id``, ``selected_recipe``, ``extracted_filters``,
            ``assistant_text`` and so on).

    Returns:
        The annotated copy of the step state.
    """
    state = dict(step_state)
    # A state without any status is treated as "needs_exact_capability".
    execution_status = str(state.get("execution_status") or state.get("status") or "").strip() or "needs_exact_capability"
    actual_direct_answer = str(state.get("actual_direct_answer") or "").strip()
    assistant_text = str(state.get("assistant_text") or "")
    top_non_empty_lines = state.get("top_non_empty_lines") if isinstance(state.get("top_non_empty_lines"), list) else []
    extracted_filters = state.get("extracted_filters") if isinstance(state.get("extracted_filters"), dict) else {}
    date_scope = state.get("date_scope") if isinstance(state.get("date_scope"), dict) else {}
    violated_invariants: list[str] = []
    warnings: list[str] = []
    business_review = build_business_first_review(state)
    state["business_first_review"] = business_review
    # --- Routing expectations: intent, capability, recipe, result mode. ---
    expected_intents = normalize_string_list(state.get("expected_intents"))
    if expected_intents and not step_intent_matches_expected(state, expected_intents):
        violated_invariants.append("wrong_intent")
    expected_capability = state.get("expected_capability")
    if expected_capability and not identifiers_match(state.get("capability_id"), expected_capability):
        # When the step depends on a focus object, a capability mismatch is
        # reported as a wrong follow-up action instead of a wrong capability.
        required_state_objects = set(normalize_string_list(state.get("required_state_objects")))
        required_state_objects.update(normalize_string_list(state.get("required_carryover_invariants")))
        violation_code = "wrong_followup_action" if "focus_object" in required_state_objects else "wrong_capability"
        violated_invariants.append(violation_code)
    expected_recipe = state.get("expected_recipe")
    if expected_recipe and not identifiers_match(state.get("selected_recipe"), expected_recipe):
        violated_invariants.append("wrong_recipe")
    expected_result_mode = str(state.get("expected_result_mode") or "").strip()
    actual_result_mode = str(state.get("result_mode") or "").strip()
    # The result mode is only compared when both sides are present.
    if expected_result_mode and actual_result_mode and normalize_identifier(actual_result_mode) != normalize_identifier(expected_result_mode):
        violated_invariants.append("wrong_result_mode")
    # --- Forbidden capability / recipe: one violation each at most. ---
    for forbidden_capability in normalize_string_list(state.get("forbidden_capabilities")):
        if identifiers_match(state.get("capability_id"), forbidden_capability):
            violated_invariants.append("forbidden_capability_selected")
            break
    for forbidden_recipe in normalize_string_list(state.get("forbidden_recipes")):
        if identifiers_match(state.get("selected_recipe"), forbidden_recipe):
            violated_invariants.append("forbidden_recipe_selected")
            break
    # --- Required filters, optionally extended with the context as_of_date. ---
    required_filters = normalize_validation_filters(state.get("required_filters"))
    required_as_of_date_from_context = normalize_iso_date(
        (state.get("analysis_context") or {}).get("as_of_date") if isinstance(state.get("analysis_context"), dict) else None
    )
    if required_as_of_date_from_context and should_require_analysis_context_as_of_filter(state, required_filters):
        required_filters["as_of_date"] = required_as_of_date_from_context
    for filter_key, expected_value in required_filters.items():
        actual_value = ""
        # Date-like filters are normalized before comparison; others are compared as stripped strings.
        if filter_key in {"as_of_date", "period_from", "period_to"}:
            actual_value = normalize_iso_date(extracted_filters.get(filter_key))
        else:
            actual_value = str(extracted_filters.get(filter_key) or "").strip()
        if not actual_value:
            violated_invariants.append("missing_required_filter")
            continue
        if actual_value != expected_value:
            if filter_key == "as_of_date":
                violated_invariants.append("wrong_as_of_date")
            elif filter_key == "period_from":
                violated_invariants.append("wrong_period_from")
            elif filter_key == "period_to":
                violated_invariants.append("wrong_period_to")
            else:
                # A wrong value on a non-date filter is reported with the same code as a missing one.
                violated_invariants.append("missing_required_filter")
    # --- Carried-over state: focus object and date scope. ---
    required_state_objects = set(normalize_string_list(state.get("required_state_objects")))
    required_state_objects.update(
        item
        for item in normalize_string_list(state.get("required_carryover_invariants"))
        if item in {"focus_object"}
    )
    focus_object = state.get("focus_object") if isinstance(state.get("focus_object"), dict) else {}
    if "focus_object" in required_state_objects:
        # Either an object id or a label is enough to count as a focus object.
        has_focus_object = bool(str(focus_object.get("object_id") or "").strip() or str(focus_object.get("label") or "").strip())
        if not has_focus_object:
            violated_invariants.append("focus_object_missing")
    if "date_scope" in normalize_string_list(state.get("required_carryover_invariants")) and required_filters.get("as_of_date"):
        current_date_scope = normalize_iso_date(date_scope.get("as_of_date"))
        # An absent date scope is tolerated; only a conflicting one is a violation.
        if current_date_scope and current_date_scope != required_filters["as_of_date"]:
            violated_invariants.append("wrong_date_scope_state")
    # --- Answer shape: direct answer, top-line noise, business review issues. ---
    if should_require_direct_answer(state):
        if not actual_direct_answer or is_top_level_noise_line(actual_direct_answer):
            violated_invariants.append("direct_answer_missing")
    first_top_line = str(top_non_empty_lines[0] if top_non_empty_lines else "").strip()
    if first_top_line and is_top_level_noise_line(first_top_line):
        violated_invariants.append("top_level_noise_present")
    for issue_code in normalize_string_list(business_review.get("issue_codes")):
        # Verbosity is recorded both as a warning and as a violation; every
        # other review issue is recorded as a violation only.
        if issue_code == "business_answer_too_verbose":
            warnings.append(issue_code)
            violated_invariants.append(issue_code)
            continue
        violated_invariants.append(issue_code)
    # --- Regex expectations on the answer text (case-insensitive). ---
    # NOTE(review): patterns are passed to re.search as given; an invalid
    # pattern would raise re.error here — confirm they are validated upstream.
    required_answer_patterns_any = normalize_string_list(state.get("required_answer_patterns_any"))
    if required_answer_patterns_any and not any(
        re.search(pattern, assistant_text, flags=re.IGNORECASE) for pattern in required_answer_patterns_any
    ):
        violated_invariants.append("required_answer_patterns_any_missing")
    required_answer_patterns_all = normalize_string_list(state.get("required_answer_patterns_all"))
    missing_answer_patterns = [
        pattern
        for pattern in required_answer_patterns_all
        if not re.search(pattern, assistant_text, flags=re.IGNORECASE)
    ]
    if missing_answer_patterns:
        violated_invariants.append("required_answer_patterns_all_missing")
    forbidden_answer_patterns = normalize_string_list(state.get("forbidden_answer_patterns"))
    if forbidden_answer_patterns and top_non_empty_lines:
        # Forbidden patterns are checked against the top lines only and
        # produce warnings, not violations.
        joined_top_block = "\n".join(str(line) for line in top_non_empty_lines)
        for pattern in forbidden_answer_patterns:
            if pattern and re.search(pattern, joined_top_block, flags=re.IGNORECASE):
                warnings.append(f"forbidden_answer_pattern:{pattern}")
    # --- Verdict. ---
    # De-duplicate while keeping first-seen order.
    unique_violations = list(dict.fromkeys(violated_invariants))
    # Any P0 violation is a hard fail.
    hard_fail = any(derive_invariant_severity(state, code) == "P0" for code in unique_violations)
    bounded_validated = is_validated_bounded_mcp_answer(state, execution_status, business_review, unique_violations)
    memory_validated = is_validated_memory_checkpoint_answer(state, business_review, unique_violations)
    runtime_factual_validated = is_validated_confirmed_runtime_answer(
        state,
        execution_status,
        business_review,
        unique_violations,
    )
    guarded_insufficiency_validated = is_validated_guarded_insufficiency_answer(
        state,
        execution_status,
        business_review,
        unique_violations,
    )
    state["violated_invariants"] = unique_violations
    state["warnings"] = list(dict.fromkeys(warnings))
    state["hard_fail"] = hard_fail
    state["bounded_mcp_answer_validated"] = bounded_validated
    state["memory_checkpoint_validated"] = memory_validated
    state["runtime_factual_answer_validated"] = runtime_factual_validated
    state["guarded_insufficiency_validated"] = guarded_insufficiency_validated
    # Passing any one of the four semantic validators is enough.
    state["acceptance_status"] = acceptance_status_from_execution(
        execution_status,
        hard_fail,
        bounded_validated or memory_validated or runtime_factual_validated or guarded_insufficiency_validated,
    )
    # "status" mirrors the acceptance status after validation.
    state["status"] = state["acceptance_status"]
    return state
def build_scenario_step_state(
    *,
    scenario_id: str,
    domain: str,
    step: dict[str, Any],
    step_index: int,
    question_resolved: str,
    analysis_context: dict[str, Any],
    turn_artifact: dict[str, Any],
    entries: list[dict[str, Any]],
) -> dict[str, Any]:
    """Assemble the step state for one scenario turn and validate it.

    Combines the step's expectations from the manifest with what was observed
    in ``turn_artifact`` (debug payload, session summary, assistant message),
    derives the execution status, and passes the result through
    ``validate_step_contract``.

    Args:
        scenario_id: Identifier of the scenario being run.
        domain: Domain the scenario belongs to.
        step: Manifest step; ``step_id``, ``title``, ``depends_on`` and
            ``question_template`` are read with subscript access and must be present.
        step_index: Index of the step as supplied by the caller.
        question_resolved: Question text after template resolution.
        analysis_context: Analysis context used for the step request.
        turn_artifact: Turn artifact built for this step.
        entries: Structured entries extracted from the assistant text.

    Returns:
        The step state as returned by ``validate_step_contract``.

    Raises:
        KeyError: If ``step`` lacks one of the subscript-accessed keys.
    """
    # Prefer the artifact-level debug payload; fall back to the assistant message's own debug.
    debug_payload = turn_artifact.get("technical_debug_payload")
    if not isinstance(debug_payload, dict):
        assistant_debug = turn_artifact.get("assistant_message", {}).get("debug") if isinstance(turn_artifact.get("assistant_message"), dict) else None
        debug_payload = assistant_debug if isinstance(assistant_debug, dict) else {}
    debug = debug_payload if isinstance(debug_payload, dict) else {}
    # Walk session_summary -> address_navigation_state -> session_context,
    # substituting an empty dict at every level that is not a dict.
    session_summary = turn_artifact.get("session_summary")
    summary = session_summary if isinstance(session_summary, dict) else {}
    address_state = summary.get("address_navigation_state")
    navigation_state = address_state if isinstance(address_state, dict) else {}
    session_context = navigation_state.get("session_context")
    context = session_context if isinstance(session_context, dict) else {}
    assistant_message = turn_artifact.get("assistant_message")
    assistant_item = assistant_message if isinstance(assistant_message, dict) else {}
    reply_type = assistant_item.get("reply_type")
    assistant_text = str(assistant_item.get("text") or "")
    top_non_empty = first_non_empty_lines(assistant_text, limit=3)
    # Analysis context precedence: turn artifact scenario, then the argument, then the step itself.
    turn_scenario = turn_artifact.get("scenario") if isinstance(turn_artifact.get("scenario"), dict) else {}
    effective_analysis_context = normalize_analysis_context(turn_scenario.get("analysis_context"))
    if not effective_analysis_context:
        effective_analysis_context = normalize_analysis_context(analysis_context)
    if not effective_analysis_context:
        # NOTE(review): unlike the two sources above, the step-level context is not normalized — confirm intended.
        effective_analysis_context = step.get("analysis_context") if isinstance(step.get("analysis_context"), dict) else {}
    # A date scope derived from the debug payload wins over the session context's date scope.
    current_date_scope = derive_mcp_discovery_business_overview_date_scope(debug)
    if current_date_scope is None:
        current_date_scope = context.get("date_scope")
    truth_policy = debug.get("truth_answer_policy_v1") if isinstance(debug.get("truth_answer_policy_v1"), dict) else {}
    truth_gate = truth_policy.get("truth_gate") if isinstance(truth_policy.get("truth_gate"), dict) else {}
    truth_answer_shape = truth_policy.get("answer_shape") if isinstance(truth_policy.get("answer_shape"), dict) else {}
    coverage_gate_contract = (
        debug.get("coverage_gate_contract") if isinstance(debug.get("coverage_gate_contract"), dict) else {}
    )
    answer_shape_contract = (
        debug.get("answer_shape_contract") if isinstance(debug.get("answer_shape_contract"), dict) else {}
    )
    # debug["answer_shape"] may be either a dict wrapping the value or the value itself.
    raw_answer_shape = debug.get("answer_shape")
    debug_answer_shape = raw_answer_shape.get("answer_shape") if isinstance(raw_answer_shape, dict) else raw_answer_shape
    step_state = {
        "schema_version": SCENARIO_STEP_STATE_SCHEMA_VERSION,
        "scenario_id": scenario_id,
        "domain": domain,
        "step_id": step["step_id"],
        "step_index": step_index,
        "title": step["title"],
        "depends_on": step["depends_on"],
        "question_template": step["question_template"],
        "question_resolved": question_resolved,
        "analysis_context": effective_analysis_context,
        # Expectations copied from the manifest step.
        "expected_intents": step.get("expected_intents") or [],
        "expected_capability": step.get("expected_capability"),
        "expected_recipe": step.get("expected_recipe"),
        "expected_result_mode": step.get("expected_result_mode"),
        "required_filters": step.get("required_filters") or {},
        "forbidden_capabilities": step.get("forbidden_capabilities") or [],
        "forbidden_recipes": step.get("forbidden_recipes") or [],
        "required_state_objects": step.get("required_state_objects") or [],
        "required_answer_shape": step.get("required_answer_shape"),
        "forbidden_answer_patterns": step.get("forbidden_answer_patterns") or [],
        "required_answer_patterns_any": step.get("required_answer_patterns_any") or [],
        "required_answer_patterns_all": step.get("required_answer_patterns_all") or [],
        "semantic_tags": step.get("semantic_tags") or [],
        "required_carryover_invariants": step.get("required_carryover_invariants") or [],
        "invariant_severity": step.get("invariant_severity") or {},
        # Observations from the assistant message and the debug payload.
        "reply_type": reply_type,
        "assistant_message_id": assistant_item.get("message_id"),
        "trace_id": assistant_item.get("trace_id"),
        "detected_mode": debug.get("detected_mode"),
        "detected_intent": debug.get("detected_intent"),
        "selected_recipe": debug.get("selected_recipe"),
        "capability_id": debug.get("capability_id"),
        "capability_route_mode": debug.get("capability_route_mode"),
        "mcp_discovery_catalog_chain_alignment_status": debug.get("mcp_discovery_catalog_chain_alignment_status"),
        "mcp_discovery_catalog_chain_top_match": debug.get("mcp_discovery_catalog_chain_top_match"),
        "mcp_discovery_catalog_chain_selected_matches_top": debug.get("mcp_discovery_catalog_chain_selected_matches_top"),
        "mcp_discovery_route_candidate_status": debug.get("mcp_discovery_route_candidate_status"),
        "mcp_discovery_route_candidate_fact_family": debug.get("mcp_discovery_route_candidate_fact_family"),
        "mcp_discovery_route_candidate_action_family": debug.get("mcp_discovery_route_candidate_action_family"),
        "mcp_discovery_route_candidate_proof_expectation": debug.get("mcp_discovery_route_candidate_proof_expectation"),
        "mcp_discovery_route_candidate_missing_axes": normalize_string_list(
            debug.get("mcp_discovery_route_candidate_missing_axes")
        ),
        "mcp_discovery_route_candidate_provided_axes": normalize_string_list(
            debug.get("mcp_discovery_route_candidate_provided_axes")
        ),
        "mcp_discovery_route_candidate_executable_now": debug.get("mcp_discovery_route_candidate_executable_now"),
        "mcp_discovery_route_candidate_enablement_reason": debug.get(
            "mcp_discovery_route_candidate_enablement_reason"
        ),
        "mcp_discovery_route_candidate_next_action": debug.get("mcp_discovery_route_candidate_next_action"),
        "mcp_discovery_response_applied": debug.get("mcp_discovery_response_applied"),
        "mcp_discovery_selected_chain_id": debug.get("mcp_discovery_selected_chain_id"),
        "mcp_discovery_response_candidate_status": (
            debug.get("mcp_discovery_response_candidate_v1", {}).get("candidate_status")
            if isinstance(debug.get("mcp_discovery_response_candidate_v1"), dict)
            else None
        ),
        "mcp_discovery_response_reply_type": (
            debug.get("mcp_discovery_response_candidate_v1", {}).get("reply_type")
            if isinstance(debug.get("mcp_discovery_response_candidate_v1"), dict)
            else None
        ),
        "mcp_discovery_effective_intents": derive_mcp_discovery_effective_intents(debug),
        "route_expectation_status": debug.get("route_expectation_status"),
        "result_mode": debug.get("result_mode"),
        "response_type": debug.get("response_type"),
        # For the next four fields the top-level debug value is tried first,
        # then the coverage/answer-shape contract, then the truth policy.
        "truth_mode": first_non_empty_string(
            debug.get("truth_mode"),
            coverage_gate_contract.get("truth_mode"),
            truth_gate.get("truth_mode"),
        ),
        "answer_shape": first_non_empty_string(
            debug_answer_shape,
            answer_shape_contract.get("answer_shape"),
            truth_answer_shape.get("answer_shape"),
        ),
        "coverage_status": first_non_empty_string(
            debug.get("coverage_status"),
            coverage_gate_contract.get("coverage_status"),
            truth_gate.get("coverage_status"),
        ),
        "evidence_grade": first_non_empty_string(
            debug.get("evidence_grade"),
            coverage_gate_contract.get("evidence_grade"),
            truth_gate.get("evidence_grade"),
        ),
        "assistant_text": assistant_text,
        "top_non_empty_lines": top_non_empty,
        # The first non-empty line of the answer is taken as the direct answer.
        "actual_direct_answer": top_non_empty[0] if top_non_empty else None,
        "extracted_filters": debug.get("extracted_filters") if isinstance(debug.get("extracted_filters"), dict) else {},
        "focus_object": context.get("active_focus_object") if isinstance(context.get("active_focus_object"), dict) else None,
        "fallback_type": debug.get("fallback_type"),
        "mcp_call_status": debug.get("mcp_call_status"),
        "balance_confirmed": debug.get("balance_confirmed"),
        "active_result_set_id": derive_step_active_result_set_id(context, debug, assistant_item),
        "last_confirmed_route": context.get("last_confirmed_route"),
        "date_scope": current_date_scope,
        "organization_scope": context.get("organization_scope"),
        "entries": entries,
    }
    # Seed all three status fields with the execution status; validation
    # then replaces acceptance_status and status.
    step_state["execution_status"] = derive_step_execution_status(reply_type if isinstance(reply_type, str) else None, debug)
    step_state["acceptance_status"] = step_state["execution_status"]
    step_state["status"] = step_state["acceptance_status"]
    return validate_step_contract(step_state)
def save_scenario_step_bundle(
    *,
    step_dir: Path,
    export_markdown: str,
    turn_artifact: dict[str, Any],
    session_record: dict[str, Any],
    response_payload: dict[str, Any],
    step_state: dict[str, Any],
) -> None:
    """Write all artifacts of one scenario step into ``step_dir``.

    Files written, in order: ``output.md``, ``debug.json``, ``turn.json``,
    ``session.json``, ``assistant_response.json``, ``step_state.json`` and
    ``resolved_question.txt``. ``debug.json`` holds an empty object when the
    turn artifact has no ``technical_debug_payload``.

    Raises:
        KeyError: If ``step_state`` has no ``question_resolved`` entry.
    """
    technical_debug = turn_artifact.get("technical_debug_payload")
    if technical_debug is None:
        technical_debug = {}

    write_text(step_dir / "output.md", export_markdown)

    json_artifacts = (
        ("debug.json", technical_debug),
        ("turn.json", turn_artifact),
        ("session.json", session_record),
        ("assistant_response.json", response_payload),
        ("step_state.json", step_state),
    )
    for file_name, content in json_artifacts:
        write_json(step_dir / file_name, content)

    write_text(step_dir / "resolved_question.txt", f"{step_state['question_resolved']}\n")
def derive_scenario_execution_status(step_outputs: dict[str, dict[str, Any]]) -> str:
    """Aggregate per-step execution statuses into one scenario execution status.

    Each step contributes its ``execution_status``, falling back to ``status``.
    Precedence is ``blocked`` > ``needs_exact_capability`` > ``partial``.
    ``exact`` is returned only when every step reports ``exact``.

    A missing or unrecognised step status used to fall through to ``exact``,
    overstating the result. It now yields ``partial``, matching the fallback
    in ``derive_scenario_status``.

    Args:
        step_outputs: Mapping of step id to step state.

    Returns:
        ``blocked`` for an empty mapping, otherwise the aggregated status.
    """
    statuses = [str(item.get("execution_status") or item.get("status") or "") for item in step_outputs.values()]
    if not statuses:
        return "blocked"
    # The worst status present decides the outcome.
    for dominant in ("blocked", "needs_exact_capability", "partial"):
        if dominant in statuses:
            return dominant
    # Never claim exactness for a step whose status is missing or unknown.
    return "exact" if all(status == "exact" for status in statuses) else "partial"
def derive_scenario_status(step_outputs: dict[str, dict[str, Any]]) -> str:
    """Aggregate per-step acceptance statuses into the scenario's final status.

    Each step contributes its ``acceptance_status``, falling back to
    ``status``. ``blocked`` wins over ``needs_exact_capability``. Otherwise the
    scenario is ``accepted`` only when every step is ``validated``, and
    ``partial`` in all remaining cases. An empty mapping is ``blocked``.
    """
    collected = []
    for step_output in step_outputs.values():
        collected.append(str(step_output.get("acceptance_status") or step_output.get("status") or ""))
    if len(collected) == 0:
        return "blocked"
    for dominant in ("blocked", "needs_exact_capability"):
        if dominant in collected:
            return dominant
    # Anything short of "validated" on every step, including partial or
    # rejected steps, leaves the scenario partial.
    for status in collected:
        if status != "validated":
            return "partial"
    return "accepted"
def build_scenario_summary(manifest: dict[str, Any], scenario_state: dict[str, Any], final_status: str, execution_status: str) -> str:
    """Render the Markdown summary of a scenario run.

    The summary has a header block (ids, title, session, statuses) followed by
    one block per manifest step with the observed routing and validation
    outcome. Values that are missing are rendered as ``n/a``, and an empty
    violation list as ``none``.

    A ``step_outputs`` entry that is present but not a dict (for example
    ``None``), or a single step output that is not a dict, is treated as
    empty instead of raising. This follows the ``isinstance`` guards used
    throughout the file.

    Args:
        manifest: Scenario manifest; ``scenario_id``, ``domain``, ``title`` and
            ``steps`` (each with ``step_id`` and ``question_template``) are required.
        scenario_state: Scenario state with ``session_id`` and ``step_outputs``.
        final_status: Aggregated acceptance status of the scenario.
        execution_status: Aggregated execution status of the scenario.

    Returns:
        The Markdown text, ending with exactly one newline.

    Raises:
        KeyError: If the manifest or one of its steps lacks a required key.
    """
    raw_step_outputs = scenario_state.get("step_outputs")
    step_outputs = raw_step_outputs if isinstance(raw_step_outputs, dict) else {}
    lines = [
        "# Scenario summary",
        "",
        f"- scenario_id: `{manifest['scenario_id']}`",
        f"- domain: `{manifest['domain']}`",
        f"- title: {manifest['title']}",
        f"- session_id: `{scenario_state.get('session_id') or 'n/a'}`",
        f"- execution_status: `{execution_status}`",
        f"- final_status: `{final_status}`",
        "",
        "## Steps",
    ]
    for index, step in enumerate(manifest["steps"], start=1):
        raw_step_output = step_outputs.get(step["step_id"])
        step_output = raw_step_output if isinstance(raw_step_output, dict) else {}
        # Stringify each code so a stray non-string entry cannot break the join.
        violations = ", ".join(str(code) for code in (step_output.get("violated_invariants") or []))
        lines.extend(
            [
                f"{index}. `{step['step_id']}` - {step['question_template']}",
                f"execution_status: `{step_output.get('execution_status') or 'n/a'}`",
                f"acceptance_status: `{step_output.get('acceptance_status') or step_output.get('status') or 'n/a'}`",
                f"question_resolved: {step_output.get('question_resolved') or 'n/a'}",
                f"intent: `{step_output.get('detected_intent') or 'n/a'}`",
                f"recipe: `{step_output.get('selected_recipe') or 'n/a'}`",
                f"capability: `{step_output.get('capability_id') or 'n/a'}`",
                f"result_mode: `{step_output.get('result_mode') or 'n/a'}`",
                f"result_set: `{step_output.get('active_result_set_id') or 'n/a'}`",
                f"violated_invariants: {violations or 'none'}",
                "",
            ]
        )
    return "\n".join(lines).strip() + "\n"
def build_scenario_final_status(
    manifest: dict[str, Any],
    scenario_state: dict[str, Any],
    final_status: str,
    execution_status: str,
) -> str:
    """Render the short Markdown "Final status" note for a scenario run.

    The note lists the final status, execution status, scenario id, session id
    (``n/a`` when absent) and a fixed human-readable reason chosen by
    ``final_status``. Unknown statuses get the reason "scenario status unknown".

    Raises:
        KeyError: If ``manifest`` has no ``scenario_id``.
    """
    reasons_by_status = {
        "accepted": "all scenario steps executed in one assistant session with no unresolved route or capability gaps",
        "partial": "scenario captured successfully, but at least one step still needs exact capability enablement or route hardening",
        "needs_exact_capability": "scenario is valid for the project, but at least one step still requires exact capability or route enablement",
        "blocked": "scenario run was interrupted by runtime or backend failure",
    }
    reason = reasons_by_status.get(final_status, "scenario status unknown")
    scenario_id = manifest["scenario_id"]
    session_label = scenario_state.get("session_id") or "n/a"
    # The template is indented for readability; dedent strips the common margin.
    note = f"""\
        # Final status
        - status: `{final_status}`
        - execution_status: `{execution_status}`
        - scenario_id: `{scenario_id}`
        - session_id: `{session_label}`
        - reason: {reason}
        """
    return textwrap.dedent(note)
def run_assistant_step(
    *,
    args: argparse.Namespace,
    domain: str,
    scenario_id: str,
    step: dict[str, Any],
    step_index: int,
    session_id: str | None,
    question_resolved: str,
    analysis_context: dict[str, Any],
) -> dict[str, Any]:
    """Send one scenario question to the assistant backend and collect its artifacts.

    Posts the question to ``{args.backend_url}/api/assistant/message``, fetches
    the session snapshot, builds the technical conversation export and the turn
    artifact, and derives the validated step state.

    Args:
        args: CLI namespace; ``backend_url`` and ``timeout_seconds`` are read here.
        domain: Domain of the scenario.
        scenario_id: Identifier of the scenario.
        step: Manifest step; ``step_id``, ``question_template`` and
            ``depends_on`` are required.
        step_index: Index of the step as supplied by the caller.
        session_id: Existing assistant session id, or ``None`` for the first step.
        question_resolved: Question text after template resolution.
        analysis_context: Analysis context sent with the request.

    Returns:
        A dict with ``session_id``, ``response_payload``, ``session_record``,
        ``conversation``, ``export_markdown``, ``turn_artifact`` and ``step_state``.

    Raises:
        RuntimeError: If neither the response nor the caller provides a session id.
    """
    request_body = build_assistant_message_payload(
        args,
        question=question_resolved,
        session_id=session_id,
        analysis_context=analysis_context,
    )
    message_url = f"{args.backend_url}/api/assistant/message"
    # The message call never waits less than 30 seconds.
    request_timeout = max(30, int(args.timeout_seconds))
    response_payload = http_json(
        message_url,
        method="POST",
        payload=request_body,
        timeout=request_timeout,
    )

    # Prefer the session id reported by the backend; fall back to the caller's one.
    active_session_id = str(response_payload.get("session_id") or session_id or "").strip()
    if not active_session_id:
        raise RuntimeError(f"Assistant response for step `{step['step_id']}` does not contain session_id")

    session_record = fetch_session_snapshot(args.backend_url, active_session_id, args.timeout_seconds)
    conversation = extract_conversation_from_session(session_record)
    export_markdown = build_conversation_export(active_session_id, conversation, mode="technical")

    turn_artifact = build_turn_artifact(
        slot="step",
        domain=domain,
        case_id=scenario_id,
        question=question_resolved,
        session_id=active_session_id,
        conversation=conversation,
        session_record=session_record,
        job_record=None,
        report_case=None,
        export_file_name="output.md",
    )
    turn_artifact["schema_version"] = "domain_scenario_turn_artifact_v1"
    scenario_block = {
        "scenario_id": scenario_id,
        "step_id": step["step_id"],
        "step_index": step_index,
        "question_template": step["question_template"],
        "question_resolved": question_resolved,
        "depends_on": step["depends_on"],
        "analysis_context": analysis_context,
    }
    turn_artifact["scenario"] = scenario_block

    # Structured entries come from the text of the last assistant message.
    last_assistant = find_last_assistant(conversation)
    last_assistant_text = str(last_assistant.get("text") or "")
    entries = extract_structured_entries(last_assistant_text)

    step_state = build_scenario_step_state(
        scenario_id=scenario_id,
        domain=domain,
        step=step,
        step_index=step_index,
        question_resolved=question_resolved,
        analysis_context=analysis_context,
        turn_artifact=turn_artifact,
        entries=entries,
    )

    outcome: dict[str, Any] = {}
    outcome["session_id"] = active_session_id
    outcome["response_payload"] = response_payload
    outcome["session_record"] = session_record
    outcome["conversation"] = conversation
    outcome["export_markdown"] = export_markdown
    outcome["turn_artifact"] = turn_artifact
    outcome["step_state"] = step_state
    return outcome
def execute_scenario_manifest(
    *,
    args: argparse.Namespace,
    manifest: dict[str, Any],
    scenario_dir: Path,
    manifest_source_label: str | None,
) -> tuple[dict[str, Any], str]:
    """Run every step of a scenario manifest and persist the resulting artifacts.

    The manifest steps are executed in order against the assistant backend via
    ``run_assistant_step``. After each step the accumulated scenario state is
    rewritten to ``scenario_state.json`` and a per-step bundle is saved under
    ``<scenario_dir>/steps/<step_id>``.

    Args:
        args: Parsed CLI namespace forwarded to ``run_assistant_step``.
        manifest: Scenario manifest; ``scenario_id``, ``domain``, ``title`` and
            ``steps`` are read with subscript access and must be present.
        scenario_dir: Directory that receives all scenario artifacts.
        manifest_source_label: Optional label written to ``manifest_source.txt``.

    Returns:
        A ``(scenario_state, final_status)`` tuple, where ``final_status`` is the
        value produced by ``derive_scenario_status``.

    Only ``RuntimeError`` raised while preparing or running a step is handled
    here; it is recorded as a failed step. Any other exception propagates.
    """
    steps_dir = scenario_dir / "steps"
    steps_dir.mkdir(parents=True, exist_ok=True)
    write_json(scenario_dir / "scenario_manifest.json", manifest)
    if manifest_source_label:
        write_text(scenario_dir / "manifest_source.txt", f"{manifest_source_label}\n")
    ensure_scenario_brief(scenario_dir, manifest)
    # Initial state: no session yet, no step outputs, empty semantic memory.
    scenario_state: dict[str, Any] = {
        "schema_version": SCENARIO_STATE_SCHEMA_VERSION,
        "scenario_id": manifest["scenario_id"],
        "domain": manifest["domain"],
        "title": manifest["title"],
        "session_id": None,
        "analysis_context": manifest.get("analysis_context") or {},
        "bindings": manifest.get("bindings") or {},
        "step_outputs": {},
        "semantic_memory": {},
        "updated_at": datetime.now(timezone.utc).replace(microsecond=0).isoformat(),
    }
    write_json(scenario_dir / "scenario_state.json", scenario_state)
    # Export of the most recent successful step; becomes scenario_output.md.
    last_export_markdown = ""
    for step_index, step in enumerate(manifest["steps"], start=1):
        step_dir = steps_dir / step["step_id"]
        try:
            resolved_question = resolve_question_template(step["question_template"], scenario_state)
            # Manifest-level context is merged with the step-level context, then
            # combined with state carried over from earlier steps.
            step_analysis_context = merge_analysis_context(manifest.get("analysis_context"), step.get("analysis_context"))
            step_analysis_context = carry_forward_analysis_context(
                scenario_state,
                step_analysis_context,
                prefer_carryover=bool(step.get("depends_on")),
                carry_date_scope=not question_resets_temporal_scope(resolved_question),
            )
            result = run_assistant_step(
                args=args,
                domain=manifest["domain"],
                scenario_id=manifest["scenario_id"],
                step=step,
                step_index=step_index,
                session_id=scenario_state.get("session_id"),
                question_resolved=resolved_question,
                analysis_context=step_analysis_context,
            )
        except RuntimeError as exc:
            failure_type = "runtime_error"
            failure_status = "blocked"
            # The error is classified by message text: a message containing
            # "Placeholder `" is treated as a placeholder-resolution failure.
            if "Placeholder `" in str(exc):
                failure_type = "placeholder_resolution_error"
                failure_status = "needs_exact_capability"
            # NOTE: the unresolved template is recorded as the question here,
            # even if template resolution itself had succeeded.
            step_state = build_failed_step_state(
                scenario_id=manifest["scenario_id"],
                domain=manifest["domain"],
                step=step,
                step_index=step_index,
                question_resolved=step["question_template"],
                status=failure_status,
                failure_type=failure_type,
                error_message=str(exc),
            )
            scenario_state["step_outputs"][step["step_id"]] = step_state
            # On failure the previous semantic memory is kept and only the
            # "latest step" markers are overwritten.
            scenario_state["semantic_memory"] = {
                **(scenario_state.get("semantic_memory") or {}),
                "latest_step_id": step["step_id"],
                "latest_step_status": step_state["status"],
            }
            scenario_state["updated_at"] = datetime.now(timezone.utc).replace(microsecond=0).isoformat()
            save_scenario_step_bundle(
                step_dir=step_dir,
                export_markdown="",
                turn_artifact={},
                session_record={},
                response_payload={},
                step_state=step_state,
            )
            write_json(scenario_dir / "scenario_state.json", scenario_state)
            print(
                f"[domain-case-loop] scenario {manifest['scenario_id']} step {step_index}/{len(manifest['steps'])}: "
                f"{step['step_id']} -> {step_state['status']} ({failure_type})"
            )
            # A blocked step aborts the remaining steps; a placeholder failure
            # only skips to the next step.
            if failure_status == "blocked":
                break
            continue
        # Success path: adopt the session id returned by the backend so that
        # later steps continue the same conversation.
        scenario_state["session_id"] = result["session_id"]
        scenario_state["step_outputs"][step["step_id"]] = result["step_state"]
        previous_semantic_memory = scenario_state.get("semantic_memory") or {}
        previous_date_scope = previous_semantic_memory.get("date_scope") if isinstance(previous_semantic_memory, dict) else None
        current_date_scope = result["step_state"].get("date_scope")
        # On success the semantic memory is rebuilt from the step state; only
        # the date scope is merged with the previous value.
        scenario_state["semantic_memory"] = {
            "latest_step_id": step["step_id"],
            "latest_step_status": result["step_state"].get("status"),
            "active_result_set_id": result["step_state"].get("active_result_set_id"),
            "last_confirmed_route": result["step_state"].get("last_confirmed_route"),
            "date_scope": merge_scenario_date_scope(
                previous_date_scope,
                current_date_scope,
                depends_on=step.get("depends_on") or [],
            ),
            "organization_scope": result["step_state"].get("organization_scope"),
            "entries": result["step_state"].get("entries"),
        }
        scenario_state["updated_at"] = datetime.now(timezone.utc).replace(microsecond=0).isoformat()
        save_scenario_step_bundle(
            step_dir=step_dir,
            export_markdown=result["export_markdown"],
            turn_artifact=result["turn_artifact"],
            session_record=result["session_record"],
            response_payload=result["response_payload"],
            step_state=result["step_state"],
        )
        write_json(scenario_dir / "scenario_state.json", scenario_state)
        last_export_markdown = result["export_markdown"]
        print(
            f"[domain-case-loop] scenario {manifest['scenario_id']} step {step_index}/{len(manifest['steps'])}: "
            f"{step['step_id']} -> {result['step_state']['status']}"
        )
    # Scenario-level statuses are derived from all recorded step outputs.
    execution_status = derive_scenario_execution_status(scenario_state["step_outputs"])
    final_status = derive_scenario_status(scenario_state["step_outputs"])
    write_text(scenario_dir / "scenario_output.md", last_export_markdown or "")
    write_text(scenario_dir / "scenario_summary.md", build_scenario_summary(manifest, scenario_state, final_status, execution_status))
    write_text(scenario_dir / "final_status.md", build_scenario_final_status(manifest, scenario_state, final_status, execution_status))
    if scenario_state.get("session_id"):
        write_text(scenario_dir / "session_id.txt", f"{scenario_state['session_id']}\n")
    print(f"[domain-case-loop] saved scenario artifacts to {scenario_dir}")
    print(f"[domain-case-loop] execution_status={execution_status} final_status={final_status}")
    return scenario_state, final_status
def handle_run_case(args: argparse.Namespace) -> int:
    """Run a single question through the async eval endpoint and capture artifacts.

    Starts an async eval job on the backend, waits for it to finish, loads the
    session file the backend wrote for report case ``AUTO-001`` and stores the
    capture bundle for ``args.slot`` in the case directory.

    Args:
        args: Parsed CLI namespace (backend URL, domain, question, slot,
            output/sessions/reports directories, timeouts, ...).

    Returns:
        ``0`` on success.

    Raises:
        RuntimeError: If the start response has no ``job`` object or ``job_id``,
            the job does not finish with status ``completed``, or the finished
            job carries no ``run_id``.
    """
    ensure_backend_health(args.backend_url, args.timeout_seconds)
    case_id = slugify_case_id(args.domain, args.case_id)
    case_dir = Path(args.output_root).resolve() / case_id
    case_dir.mkdir(parents=True, exist_ok=True)
    ensure_case_brief(
        case_dir,
        domain=args.domain,
        question=args.question,
        expected_capability=args.expected_capability,
        expected_result_mode=args.expected_result_mode,
    )
    payload: dict[str, Any] = {
        "normalizeConfig": build_normalize_config(args),
        "eval_target": "assistant_stage1",
        "questions": [args.question],
        "useMock": bool(args.use_mock),
        "mode": "standard",
    }
    if args.analysis_date:
        payload["analysis_date"] = args.analysis_date
    start_response = http_json(f"{args.backend_url}/api/eval/run-async/start", method="POST", payload=payload)
    job = start_response.get("job")
    if not isinstance(job, dict):
        raise RuntimeError("Async start response does not contain `job` object")
    job_id = str(job.get("job_id") or "")
    if not job_id:
        raise RuntimeError("Async start response does not contain job_id")
    final_job = wait_for_job(args.backend_url, job_id, args.timeout_seconds, args.poll_interval_seconds)
    if str(final_job.get("status") or "") != "completed":
        raise RuntimeError(f"Async job did not complete successfully: {final_job.get('status')}")
    run_id = str(final_job.get("run_id") or "")
    if not run_id:
        # Without a run_id the session id would degrade to "-AUTO-001" and the
        # code below would wait for a session file that can never appear.
        raise RuntimeError(f"Async job `{job_id}` completed without run_id")
    report_case_id = "AUTO-001"
    session_id = f"{run_id}-{report_case_id}"
    session_file = Path(args.sessions_dir).resolve() / f"{session_id}.json"
    wait_for_file(session_file)
    session_record = read_json_file(session_file)
    conversation = extract_conversation_from_session(session_record)
    export_markdown = build_conversation_export(session_id, conversation, mode="technical")
    # The report file is optional; when absent the artifact has no report case.
    report_case = None
    report_file = Path(args.reports_dir).resolve() / f"{run_id}.json"
    if report_file.exists():
        report_record = read_json_file(report_file)
        report_case = extract_report_case(report_record, report_case_id)
    turn_artifact = build_turn_artifact(
        slot=args.slot,
        domain=args.domain,
        case_id=case_id,
        question=args.question,
        session_id=session_id,
        conversation=conversation,
        session_record=session_record,
        job_record=final_job,
        report_case=report_case,
        export_file_name=f"{args.slot}_output.md",
    )
    save_capture_bundle(
        case_dir=case_dir,
        slot=args.slot,
        export_markdown=export_markdown,
        debug_payload=turn_artifact.get("technical_debug_payload"),
        turn_artifact=turn_artifact,
        session_record=session_record,
        job_record=final_job,
        report_case=report_case,
    )
    print(f"[domain-case-loop] saved {args.slot} artifacts to {case_dir}")
    print(f"[domain-case-loop] session_id={session_id}")
    return 0
def handle_import_export(args: argparse.Namespace) -> int:
    """Import an existing markdown export as the capture bundle for a case slot.

    The export at ``args.input`` is parsed into a session id and conversation.
    The question defaults to the text of the last user message preceding the
    last assistant message when ``args.question`` is not given.

    Returns:
        ``0`` on success.
    """
    raw_export = Path(args.input).read_text(encoding="utf-8-sig")
    session_id, conversation = parse_export_markdown(raw_export)

    case_id = slugify_case_id(args.domain, args.case_id)
    case_dir = Path(args.output_root).resolve() / case_id
    case_dir.mkdir(parents=True, exist_ok=True)

    assistant_message = find_last_assistant(conversation)
    user_message = find_last_user_before(conversation, assistant_message.get("message_id"))

    # An explicit CLI question wins; otherwise fall back to the user turn.
    if args.question:
        question = args.question
    elif isinstance(user_message, dict):
        question = user_message.get("text")
    else:
        question = None

    ensure_case_brief(
        case_dir,
        domain=args.domain,
        question=question,
        expected_capability=args.expected_capability,
        expected_result_mode=args.expected_result_mode,
    )

    export_name = f"{args.slot}_output.md"
    artifact = build_turn_artifact(
        slot=args.slot,
        domain=args.domain,
        case_id=case_id,
        question=question,
        session_id=session_id,
        conversation=conversation,
        session_record=None,
        job_record=None,
        report_case=None,
        export_file_name=export_name,
    )
    # The imported text is stored verbatim; no session/job/report records exist.
    save_capture_bundle(
        case_dir=case_dir,
        slot=args.slot,
        export_markdown=raw_export,
        debug_payload=assistant_message.get("debug"),
        turn_artifact=artifact,
        session_record=None,
        job_record=None,
        report_case=None,
    )
    print(f"[domain-case-loop] imported {args.slot} artifacts to {case_dir}")
    return 0
def handle_run_scenario(args: argparse.Namespace) -> int:
    """Load a scenario manifest, apply CLI overrides and execute it.

    ``args.scenario_id`` (stripped) replaces the manifest's scenario id, and
    ``args.analysis_date`` is merged into the manifest analysis context as
    ``as_of_date`` with source ``cli_override``.

    Returns:
        ``0`` once the scenario has been executed.
    """
    ensure_backend_health(args.backend_url, args.timeout_seconds)

    source_path = Path(args.manifest).resolve()
    manifest = load_scenario_manifest(source_path)

    if args.scenario_id:
        manifest["scenario_id"] = args.scenario_id.strip()

    if args.analysis_date:
        date_override = {"as_of_date": args.analysis_date, "source": "cli_override"}
        manifest["analysis_context"] = merge_analysis_context(
            manifest.get("analysis_context"),
            date_override,
        )

    target_dir = Path(args.output_root).resolve() / manifest["scenario_id"]
    execute_scenario_manifest(
        args=args,
        manifest=manifest,
        scenario_dir=target_dir,
        manifest_source_label=str(source_path),
    )
    return 0
def build_pack_summary(
    pack: dict[str, Any],
    scenario_results: list[dict[str, Any]],
    final_status: str,
    execution_status: str,
) -> str:
    """Render the markdown summary of a pack run.

    Args:
        pack: Pack manifest; ``pack_id``, ``domain`` and ``title`` are required.
        scenario_results: One dict per executed scenario; ``scenario_id``,
            ``title``, ``final_status`` and ``artifact_dir`` are required,
            ``execution_status`` and ``session_id`` fall back to ``n/a``.
        final_status: Pack-level acceptance status.
        execution_status: Pack-level execution status.

    Returns:
        Markdown text ending with exactly one newline.
    """
    header = [
        "# Pack summary",
        "",
        f"- pack_id: `{pack['pack_id']}`",
        f"- domain: `{pack['domain']}`",
        f"- title: {pack['title']}",
        f"- execution_status: `{execution_status}`",
        f"- final_status: `{final_status}`",
        "",
        "## Scenarios",
    ]
    scenario_lines: list[str] = []
    for position, scenario in enumerate(scenario_results, start=1):
        execution_label = scenario.get("execution_status") or "n/a"
        session_label = scenario.get("session_id") or "n/a"
        scenario_lines += [
            f"{position}. `{scenario['scenario_id']}` - {scenario['title']}",
            f"execution_status: `{execution_label}`",
            f"acceptance_status: `{scenario['final_status']}`",
            f"session_id: `{session_label}`",
            f"artifact_dir: `{scenario['artifact_dir']}`",
            "",
        ]
    document = "\n".join(header + scenario_lines)
    return document.strip() + "\n"
def build_pack_final_status(
    pack: dict[str, Any],
    scenario_results: list[dict[str, Any]],
    final_status: str,
    execution_status: str,
) -> str:
    """Render the markdown "final status" note for a pack run.

    The reason line is chosen from ``final_status``. An ``accepted`` status
    combined with fewer executed than declared scenarios gets a dedicated
    reason explaining that scenarios are missing.

    Args:
        pack: Pack manifest; ``pack_id`` and ``domain`` are required,
            ``scenarios`` is optional.
        scenario_results: Results of the scenarios that were executed.
        final_status: Pack-level acceptance status.
        execution_status: Pack-level execution status.

    Returns:
        Markdown text for ``final_status.md``.
    """
    declared_count = len(pack.get("scenarios") or [])
    executed_count = len(scenario_results)

    if final_status == "accepted":
        if executed_count < declared_count:
            reason = (
                f"only {executed_count}/{declared_count} declared scenarios were executed; "
                "missing scenarios keep the pack from being confirmed as accepted"
            )
        else:
            reason = "all declared scenarios in the pack executed without blocked or missing-capability states"
    elif final_status == "partial":
        reason = "the pack executed, but at least one scenario still needs capability enablement or route hardening"
    elif final_status == "needs_exact_capability":
        reason = "the pack is valid, but at least one scenario still requires exact capability or route enablement"
    elif final_status == "blocked":
        reason = "the pack could not be completed because at least one scenario failed at runtime"
    else:
        reason = "pack status unknown"

    report_lines = [
        "# Final status",
        f"- status: `{final_status}`",
        f"- execution_status: `{execution_status}`",
        f"- pack_id: `{pack['pack_id']}`",
        f"- domain: `{pack['domain']}`",
        f"- reason: {reason}",
    ]
    return textwrap.dedent("\n".join(report_lines) + "\n")
def derive_coverage_status(statuses: list[str]) -> str:
    """Collapse a list of scenario statuses into one coverage status.

    Blank or falsy entries are ignored. The result is ``unmapped`` when nothing
    remains, ``green`` when every status is ``accepted``, otherwise ``blocked``
    or ``needs_exact_capability`` (in that priority) when present, and
    ``partial`` for anything else.
    """
    cleaned: list[str] = []
    for raw_status in statuses:
        text = str(raw_status or "").strip()
        if text:
            cleaned.append(text)
    if not cleaned:
        return "unmapped"

    distinct = set(cleaned)
    if distinct == {"accepted"}:
        return "green"
    for escalated in ("blocked", "needs_exact_capability"):
        if escalated in distinct:
            return escalated
    return "partial"
def derive_pack_execution_status(scenario_results: list[dict[str, Any]]) -> str:
    """Aggregate the per-scenario execution statuses into a pack-level status.

    Non-dict entries are ignored. With no usable entries the pack counts as
    ``blocked``. Otherwise the worst status present wins, in the order
    ``blocked``, ``needs_exact_capability``, ``partial``; if none of these
    occurs the pack is ``exact``.
    """
    observed = {
        str(result.get("execution_status") or "")
        for result in scenario_results
        if isinstance(result, dict)
    }
    if not observed:
        return "blocked"
    for level in ("blocked", "needs_exact_capability", "partial"):
        if level in observed:
            return level
    return "exact"
def derive_pack_final_status(pack: dict[str, Any], scenario_results: list[dict[str, Any]]) -> str:
    """Derive the pack-level acceptance status.

    The worst scenario ``final_status`` wins (``blocked``, then
    ``needs_exact_capability``, then ``partial``); an empty result list is
    ``blocked``. When no scenario is degraded, the rendered acceptance matrix
    is searched for a ``partial`` cell, and the pack is ``accepted`` only if
    every declared scenario was executed.

    Raises:
        KeyError: If a scenario result has no ``final_status`` key.
    """
    observed = [result["final_status"] for result in scenario_results]
    if not observed:
        return "blocked"
    for level in ("blocked", "needs_exact_capability", "partial"):
        if level in observed:
            return level

    # Node/edge coverage can still be partial even if all scenarios passed;
    # that is detected from the rendered markdown table cells.
    matrix_markdown = build_scenario_acceptance_matrix(pack, scenario_results)
    if "| partial |" in matrix_markdown:
        return "partial"

    declared = pack.get("scenarios") or []
    if len(scenario_results) == len(declared):
        return "accepted"
    return "partial"
def build_scenario_acceptance_matrix(pack: dict[str, Any], scenario_results: list[dict[str, Any]]) -> str:
    """Render the markdown acceptance matrix for a pack.

    The document contains a header, a per-scenario coverage table and, when the
    pack's ``scenario_tree`` provides them, tables for root nodes, critical
    nodes, critical edges and primary user paths. Each node/edge/path status is
    derived with ``derive_coverage_status`` from the statuses of the scenarios
    that back it; a ``green`` node or edge is downgraded to ``partial`` when
    required wording families were not observed.

    Args:
        pack: Pack manifest (``scenarios``, ``question_pool``,
            ``scenario_tree``, ``source_contract``, ... are all optional and
            type-checked before use).
        scenario_results: Executed scenario results; ``scenario_id`` and
            ``final_status`` are read from each dict entry.

    Returns:
        Markdown text ending with exactly one newline.
    """
    # scenario_id -> final status of executed scenarios ("unknown" if absent).
    scenario_status_map = {
        str(item.get("scenario_id") or ""): str(item.get("final_status") or "unknown")
        for item in scenario_results
        if isinstance(item, dict)
    }
    scenarios = pack.get("scenarios") if isinstance(pack.get("scenarios"), list) else []
    question_pool = pack.get("question_pool") if isinstance(pack.get("question_pool"), dict) else {}
    raw_questions = question_pool.get("questions") if isinstance(question_pool.get("questions"), list) else []
    # question_id -> question metadata from the pack's question pool.
    question_index: dict[str, dict[str, Any]] = {}
    for raw_question in raw_questions:
        if not isinstance(raw_question, dict):
            continue
        question_id = str(raw_question.get("question_id") or "").strip()
        if question_id:
            question_index[question_id] = raw_question
    scenario_questions_map: dict[str, list[str]] = {}
    scenario_nodes_map: dict[str, list[str]] = {}
    scenario_wording_map: dict[str, list[str]] = {}
    for scenario in scenarios:
        if not isinstance(scenario, dict):
            continue
        scenario_id = str(scenario.get("scenario_id") or "").strip()
        if not scenario_id:
            continue
        # Question ids: explicit scenario-level list, else collected from steps.
        question_ids = normalize_string_list(scenario.get("question_ids"))
        if not question_ids:
            question_ids = [
                str(step.get("question_id") or "").strip()
                for step in scenario.get("steps", [])
                if isinstance(step, dict) and str(step.get("question_id") or "").strip()
            ]
        # Node ids: explicit list, else from steps, else via the question pool.
        node_ids = normalize_string_list(scenario.get("node_ids"))
        if not node_ids:
            node_ids = [
                str(step.get("node_id") or "").strip()
                for step in scenario.get("steps", [])
                if isinstance(step, dict) and str(step.get("node_id") or "").strip()
            ]
        if not node_ids:
            for question_id in question_ids:
                question_meta = question_index.get(question_id) or {}
                node_id = str(question_meta.get("node_id") or "").strip()
                if node_id:
                    node_ids.append(node_id)
        scenario_questions_map[scenario_id] = question_ids
        # Deduplicate node ids while keeping their first-seen order.
        scenario_nodes_map[scenario_id] = list(dict.fromkeys(node_ids))
        scenario_wording_map[scenario_id] = _scenario_observed_wording_families(scenario)
    scenario_tree = pack.get("scenario_tree") if isinstance(pack.get("scenario_tree"), dict) else {}
    source_contract = pack.get("source_contract") if isinstance(pack.get("source_contract"), dict) else {}
    # Flat list of every node dict; used below to look up the wording families
    # required by the endpoints of an edge.
    all_nodes: list[dict[str, Any]] = []
    for section_key in ("root_nodes", "critical_nodes", "supporting_nodes"):
        raw_nodes = scenario_tree.get(section_key)
        if isinstance(raw_nodes, list):
            all_nodes.extend(node for node in raw_nodes if isinstance(node, dict))
    lines = [
        "# Scenario acceptance matrix",
        "",
        f"- pack_id: `{pack.get('pack_id') or 'n/a'}`",
        f"- domain: `{pack.get('domain') or 'n/a'}`",
        f"- source_contract_id: `{source_contract.get('domain_id') or pack.get('source_contract_id') or 'n/a'}`",
        f"- source_contract_title: {source_contract.get('title') or pack.get('title') or 'n/a'}",
        "",
        "## Scenario coverage",
        "",
        "| scenario_id | status | question_ids | node_ids |",
        "| --- | --- | --- | --- |",
    ]
    # One row per declared scenario; scenarios without a result show "not_run".
    for scenario in scenarios:
        if not isinstance(scenario, dict):
            continue
        scenario_id = str(scenario.get("scenario_id") or "").strip()
        if not scenario_id:
            continue
        lines.append(
            "| "
            + " | ".join(
                [
                    scenario_id,
                    scenario_status_map.get(scenario_id, "not_run"),
                    ", ".join(scenario_questions_map.get(scenario_id) or []) or "-",
                    ", ".join(scenario_nodes_map.get(scenario_id) or []) or "-",
                ]
            )
            + " |"
        )

    def append_node_section(title: str, section_key: str) -> None:
        """Append a node coverage table for one scenario_tree section, if non-empty."""
        raw_nodes = scenario_tree.get(section_key)
        nodes = raw_nodes if isinstance(raw_nodes, list) else []
        if not nodes:
            return
        lines.extend(
            [
                "",
                f"## {title}",
                "",
                "| node_id | status | backed_by_scenarios | question_ids | required_wording_families | observed_wording_families | missing_wording_families |",
                "| --- | --- | --- | --- | --- | --- | --- |",
            ]
        )
        for node in nodes:
            if not isinstance(node, dict):
                continue
            node_id = str(node.get("node_id") or "").strip()
            if not node_id:
                continue
            # Scenarios whose node list contains this node.
            backed_by = sorted(
                scenario_id for scenario_id, node_ids in scenario_nodes_map.items() if node_id in node_ids
            )
            statuses = [scenario_status_map.get(scenario_id, "not_run") for scenario_id in backed_by]
            required_wording_families = normalize_string_list(node.get("required_wording_families"))
            observed_wording_families = sorted(
                {family for scenario_id in backed_by for family in scenario_wording_map.get(scenario_id, [])}
            )
            missing_wording_families = [family for family in required_wording_families if family not in observed_wording_families]
            status = derive_coverage_status(statuses)
            # Fully accepted scenarios are not enough when wording is missing.
            if status == "green" and missing_wording_families:
                status = "partial"
            lines.append(
                "| "
                + " | ".join(
                    [
                        node_id,
                        status,
                        ", ".join(backed_by) or "-",
                        ", ".join(normalize_string_list(node.get("covers_question_ids"))) or "-",
                        ", ".join(required_wording_families) or "-",
                        ", ".join(observed_wording_families) or "-",
                        ", ".join(missing_wording_families) or "-",
                    ]
                )
                + " |"
            )

    # Only root and critical nodes get a table; supporting nodes do not.
    append_node_section("Root nodes", "root_nodes")
    append_node_section("Critical nodes", "critical_nodes")
    raw_edges = scenario_tree.get("critical_edges")
    edges = raw_edges if isinstance(raw_edges, list) else []
    if edges:
        lines.extend(
            [
                "",
                "## Critical edges",
                "",
                "| edge_id | status | from_node | to_node | backed_by_scenarios | primary_user_path | observed_wording_families | missing_wording_families |",
                "| --- | --- | --- | --- | --- | --- | --- | --- |",
            ]
        )
        for edge in edges:
            if not isinstance(edge, dict):
                continue
            edge_id = str(edge.get("edge_id") or "").strip()
            from_node = str(edge.get("from_node") or "").strip()
            to_node = str(edge.get("to_node") or "").strip()
            if not edge_id:
                continue
            # An edge is backed by scenarios that touch both of its endpoints.
            backed_by = sorted(
                scenario_id
                for scenario_id, node_ids in scenario_nodes_map.items()
                if from_node in node_ids and to_node in node_ids
            )
            statuses = [scenario_status_map.get(scenario_id, "not_run") for scenario_id in backed_by]
            from_required = []
            to_required = []
            for node in all_nodes:
                node_id = str(node.get("node_id") or "").strip()
                if node_id == from_node:
                    from_required = normalize_string_list(node.get("required_wording_families"))
                elif node_id == to_node:
                    to_required = normalize_string_list(node.get("required_wording_families"))
            observed_wording_families = sorted(
                {family for scenario_id in backed_by for family in scenario_wording_map.get(scenario_id, [])}
            )
            # Union of both endpoints' required families, order-preserving.
            edge_required_families = list(dict.fromkeys(from_required + [family for family in to_required if family not in from_required]))
            missing_wording_families = [family for family in edge_required_families if family not in observed_wording_families]
            status = derive_coverage_status(statuses)
            if status == "green" and missing_wording_families:
                status = "partial"
            lines.append(
                "| "
                + " | ".join(
                    [
                        edge_id,
                        status,
                        from_node or "-",
                        to_node or "-",
                        ", ".join(backed_by) or "-",
                        "yes" if bool(edge.get("primary_user_path")) else "no",
                        ", ".join(observed_wording_families) or "-",
                        ", ".join(missing_wording_families) or "-",
                    ]
                )
                + " |"
            )
    raw_paths = scenario_tree.get("primary_user_paths")
    paths = raw_paths if isinstance(raw_paths, list) else []
    if paths:
        lines.extend(
            [
                "",
                "## Primary user paths",
                "",
                "| path_id | status | nodes | backed_by_scenarios |",
                "| --- | --- | --- | --- |",
            ]
        )
        for path in paths:
            if not isinstance(path, dict):
                continue
            path_id = str(path.get("path_id") or "").strip()
            node_ids = normalize_string_list(path.get("nodes"))
            # A path is backed by scenarios that cover every node of the path;
            # a path without nodes is backed by nothing.
            backed_by = sorted(
                scenario_id
                for scenario_id, scenario_node_ids in scenario_nodes_map.items()
                if node_ids and all(node_id in scenario_node_ids for node_id in node_ids)
            )
            statuses = [scenario_status_map.get(scenario_id, "not_run") for scenario_id in backed_by]
            lines.append(
                "| "
                + " | ".join(
                    [
                        path_id or "-",
                        derive_coverage_status(statuses),
                        " -> ".join(node_ids) or "-",
                        ", ".join(backed_by) or "-",
                    ]
                )
                + " |"
            )
    return "\n".join(lines).strip() + "\n"
def run_subprocess_command(
    command: list[str],
    *,
    cwd: Path,
    timeout_seconds: int,
    input_text: str | None = None,
    stdout_path: Path | None = None,
    stderr_path: Path | None = None,
) -> subprocess.CompletedProcess[str]:
    """Run a command, optionally persist its output, and fail on non-zero exit.

    Args:
        command: Argument vector to execute (no shell involved).
        cwd: Working directory for the child process.
        timeout_seconds: Timeout handed to ``subprocess.run``.
        input_text: Optional text sent to the child's stdin.
        stdout_path: When given, captured stdout is written there.
        stderr_path: When given, captured stderr is written there.

    Returns:
        The completed process with UTF-8 decoded ``stdout``/``stderr``.

    Raises:
        RuntimeError: If the command exits with a non-zero code. Output files
            are written before the error is raised.
    """
    completed = subprocess.run(
        command,
        cwd=str(cwd),
        input=input_text,
        text=True,
        encoding="utf-8",
        errors="replace",
        capture_output=True,
        timeout=timeout_seconds,
        check=False,
    )
    captured_streams = (
        (stdout_path, completed.stdout),
        (stderr_path, completed.stderr),
    )
    for destination, captured in captured_streams:
        if destination is not None:
            write_text(destination, captured)
    if completed.returncode == 0:
        return completed
    raise RuntimeError(
        f"Command failed with exit code {completed.returncode}: {' '.join(command)}\n{completed.stderr.strip()}"
    )
def build_run_pack_command(
    args: argparse.Namespace,
    *,
    manifest_path: Path,
    pack_id: str,
    output_root: Path,
) -> list[str]:
    """Build the argv that re-invokes this script with the ``run-pack`` command.

    Backend and LLM settings are copied from ``args``. ``--analysis-date``,
    ``--max-scenarios`` and ``--use-mock`` are appended only when the
    corresponding attribute is present and set on ``args``.

    Returns:
        The command as a list of strings, starting with ``sys.executable``.
    """
    option_pairs = [
        ("--manifest", str(manifest_path)),
        ("--pack-id", pack_id),
        ("--output-root", str(output_root)),
        ("--backend-url", str(args.backend_url)),
        ("--prompt-version", str(args.prompt_version)),
        ("--llm-provider", str(args.llm_provider)),
        ("--llm-model", str(args.llm_model)),
        ("--llm-base-url", str(args.llm_base_url)),
        ("--llm-api-key", str(args.llm_api_key)),
        ("--temperature", str(args.temperature)),
        ("--max-output-tokens", str(args.max_output_tokens)),
        ("--timeout-seconds", str(args.timeout_seconds)),
    ]
    argv = [sys.executable, str(Path(__file__).resolve()), "run-pack"]
    for flag, value in option_pairs:
        argv += [flag, value]

    # Optional flags: these attributes may be missing on the namespace.
    if getattr(args, "analysis_date", None):
        argv += ["--analysis-date", str(args.analysis_date)]
    scenario_cap = getattr(args, "max_scenarios", None)
    if scenario_cap is not None:
        argv += ["--max-scenarios", str(int(scenario_cap))]
    if bool(getattr(args, "use_mock", False)):
        argv.append("--use-mock")
    return argv
def build_codex_exec_command(
    args: argparse.Namespace,
    *,
    output_file: Path,
    schema_file: Path,
    sandbox_mode: str,
    model_override: str | None = None,
    reasoning_effort: str | None = None,
) -> list[str]:
    """Build the argv for a ``<codex_binary> exec`` invocation rooted at the repo.

    Args:
        args: Parsed CLI namespace; ``codex_binary`` is required,
            ``codex_profile`` and ``codex_model`` are optional attributes.
        output_file: Passed after ``-o``.
        schema_file: Passed after ``--output-schema``.
        sandbox_mode: Passed after ``-s``.
        model_override: Takes precedence over ``args.codex_model`` when truthy.
        reasoning_effort: When truthy, added as a ``model_reasoning_effort``
            config override.

    Returns:
        The command as a list of strings.
    """
    base_argv = [
        str(args.codex_binary),
        "exec",
        "-C",
        str(REPO_ROOT),
        "-s",
        sandbox_mode,
        "-c",
        'approval_policy="never"',
        "--output-schema",
        str(schema_file),
        "-o",
        str(output_file),
        "--color",
        "never",
    ]
    optional_argv: list[str] = []
    if reasoning_effort:
        optional_argv += ["-c", f'model_reasoning_effort="{reasoning_effort}"']
    profile = getattr(args, "codex_profile", None)
    if profile:
        optional_argv += ["-p", str(profile)]
    model_name = model_override or getattr(args, "codex_model", None)
    if model_name:
        optional_argv += ["-m", str(model_name)]
    return base_argv + optional_argv
def read_json_output(file_path: Path) -> dict[str, Any]:
    """Read ``file_path`` and return its content parsed as a JSON object.

    Raises:
        RuntimeError: If the parsed JSON value is not an object (dict).
    """
    parsed = json.loads(read_text_file(file_path))
    if isinstance(parsed, dict):
        return parsed
    raise RuntimeError(f"Expected JSON object in {file_path}")
def compact_step_output_for_review(step_output: Any) -> dict[str, Any]:
    """Reduce a step output to the fields passed on for review.

    A fixed set of keys is copied as-is (missing keys become ``None``), the
    assistant text is truncated to 3200 characters under
    ``assistant_text_excerpt``, and ``entry_titles_sample`` holds the non-empty
    ``item``/``title`` values found among the first five entries.

    Returns:
        The compact dict, or ``{}`` when ``step_output`` is not a dict.
    """
    if not isinstance(step_output, dict):
        return {}

    # Keys copied verbatim, listed in output order around the text excerpt.
    keys_before_excerpt = (
        "status",
        "execution_status",
        "acceptance_status",
        "question_resolved",
        "detected_intent",
        "selected_recipe",
        "capability_id",
        "depends_on",
        "semantic_tags",
        "required_state_objects",
        "required_carryover_invariants",
        "extracted_filters",
        "focus_object",
        "date_scope",
        "result_mode",
        "truth_mode",
        "answer_shape",
        "coverage_status",
        "evidence_grade",
        "balance_confirmed",
    )
    keys_after_excerpt = (
        "actual_direct_answer",
        "business_first_review",
        "violated_invariants",
        "warnings",
        "bounded_mcp_answer_validated",
        "memory_checkpoint_validated",
        "runtime_factual_answer_validated",
        "guarded_insufficiency_validated",
        "fallback_type",
        "mcp_call_status",
        "mcp_discovery_response_applied",
        "mcp_discovery_selected_chain_id",
        "mcp_discovery_response_candidate_status",
        "mcp_discovery_route_candidate_status",
        "mcp_discovery_route_candidate_fact_family",
        "mcp_discovery_route_candidate_action_family",
        "mcp_discovery_route_candidate_missing_axes",
        "mcp_discovery_route_candidate_executable_now",
        "mcp_discovery_route_candidate_enablement_reason",
        "mcp_discovery_route_candidate_next_action",
        "mcp_discovery_effective_intents",
        "failure_type",
        "error_message",
    )

    titles: list[str] = []
    raw_entries = step_output.get("entries")
    if isinstance(raw_entries, list):
        # Only the first five entries are inspected, whether or not they
        # yield a title.
        for entry in raw_entries[:5]:
            if isinstance(entry, dict):
                label = str(entry.get("item") or entry.get("title") or "").strip()
                if label:
                    titles.append(label)

    compact: dict[str, Any] = {key: step_output.get(key) for key in keys_before_excerpt}
    compact["assistant_text_excerpt"] = str(step_output.get("assistant_text") or "")[:3200]
    for key in keys_after_excerpt:
        compact[key] = step_output.get(key)
    compact["entry_titles_sample"] = titles
    return compact
def collect_pack_scenario_artifacts(pack_dir: Path) -> list[dict[str, Any]]:
    """Collect the stored state and summary of every scenario under a pack dir.

    Each sub-directory of ``<pack_dir>/scenarios`` (in sorted order) yields one
    record. A missing ``scenario_state.json`` yields an empty state, and a
    missing ``scenario_summary.md`` yields an empty summary.

    Returns:
        One dict per scenario directory, or an empty list when the
        ``scenarios`` directory does not exist.
    """
    scenarios_root = pack_dir / "scenarios"
    if not scenarios_root.exists():
        return []

    collected: list[dict[str, Any]] = []
    scenario_dirs = sorted(child for child in scenarios_root.iterdir() if child.is_dir())
    for scenario_dir in scenario_dirs:
        state_path = scenario_dir / "scenario_state.json"
        summary_path = scenario_dir / "scenario_summary.md"
        scenario_state = read_json_file(state_path) if state_path.exists() else {}
        summary_text = read_text_file(summary_path) if summary_path.exists() else ""
        record = {
            # The directory name stands in for a missing scenario id.
            "scenario_id": scenario_state.get("scenario_id") or scenario_dir.name,
            "title": scenario_state.get("title"),
            "session_id": scenario_state.get("session_id"),
            "artifact_dir": str(scenario_dir),
            "summary": summary_text,
            "scenario_state": scenario_state,
        }
        collected.append(record)
    return collected
def derive_repair_target_severity(step_output: dict[str, Any]) -> str:
    """Classify how urgently a step output needs repair.

    Returns:
        ``"P0"`` for a hard fail, a violated invariant rated P0 by
        ``derive_invariant_severity``, or a blocked execution; ``"P1"`` for a
        route candidate that needs enablement, a rejected or
        needs-exact-capability acceptance status, a partial or
        needs-exact-capability execution, a ``partial_coverage`` reply, or a
        violated invariant rated P1; ``"P2"`` otherwise.
    """
    if bool(step_output.get("hard_fail")):
        return "P0"
    route_candidate_status = str(step_output.get("mcp_discovery_route_candidate_status") or "").strip()
    # Normalized once and reused for both the P0 and the P1 invariant checks.
    violated_invariants = normalize_string_list(step_output.get("violated_invariants"))
    if any(derive_invariant_severity(step_output, code) == "P0" for code in violated_invariants):
        return "P0"
    execution_status = str(step_output.get("execution_status") or "").strip()
    acceptance_status = str(step_output.get("acceptance_status") or step_output.get("status") or "").strip()
    reply_type = str(step_output.get("reply_type") or "").strip()
    if execution_status == "blocked":
        return "P0"
    if route_candidate_status == "needs_route_enablement":
        return "P1"
    if acceptance_status in {"rejected", "needs_exact_capability"}:
        return "P1"
    if execution_status in {"partial", "needs_exact_capability"} or reply_type == "partial_coverage":
        return "P1"
    if any(derive_invariant_severity(step_output, code) == "P1" for code in violated_invariants):
        return "P1"
    # Everything else, with or without warnings, is the lowest severity.
    return "P2"
def derive_repair_problem_type(step_output: dict[str, Any]) -> str:
    """Map a step output to a single repair problem type.

    Violated invariants are checked first, in a fixed priority order. If none
    of them matches, the MCP call status, route candidate status,
    acceptance/execution status and reply/fallback type are consulted, in that
    order.

    Returns:
        The problem type identifier, or ``"other"`` when nothing matches.
    """
    violated = set(normalize_string_list(step_output.get("violated_invariants")))

    # (invariant codes, resulting problem type); the first overlap wins.
    invariant_rules = (
        ({"wrong_followup_action"}, "followup_action_resolution_gap"),
        ({"focus_object_missing"}, "object_memory_gap"),
        ({"wrong_date_scope_state"}, "edge_carryover_gap"),
        ({"wrong_as_of_date", "wrong_period_from", "wrong_period_to"}, "temporal_honesty_gap"),
        (
            {
                "wrong_intent",
                "wrong_capability",
                "wrong_recipe",
                "wrong_result_mode",
                "forbidden_capability_selected",
                "forbidden_recipe_selected",
            },
            "route_gap",
        ),
        ({"business_direct_answer_missing", "answer_layering_noise"}, "answer_shape_mismatch"),
        ({"business_answer_too_verbose", "technical_garbage_in_answer"}, "business_utility_gap"),
        ({"direct_answer_missing", "top_level_noise_present"}, "presentation_gap"),
    )
    for codes, problem_type in invariant_rules:
        if not violated.isdisjoint(codes):
            return problem_type

    def text_field(key: str) -> str:
        """Return the stripped string value of ``key`` ("" when falsy)."""
        return str(step_output.get(key) or "").strip()

    if text_field("mcp_call_status") == "materialized_but_not_anchor_matched":
        return "domain_anchor_gap"
    if text_field("mcp_discovery_route_candidate_status") == "needs_route_enablement":
        return "route_candidate_enablement_gap"

    acceptance_status = str(step_output.get("acceptance_status") or step_output.get("status") or "").strip()
    if "needs_exact_capability" in (acceptance_status, text_field("execution_status")):
        return "capability_gap"

    evidence_reply_types = {"partial_coverage", "clarification_required", "route_mismatch_blocked"}
    if text_field("reply_type") in evidence_reply_types or text_field("fallback_type") == "partial":
        return "evidence_gap"
    return "other"
def derive_repair_root_cause_layers(step_output: dict[str, Any], problem_type: str) -> list[str]:
    """Return the ordered, de-duplicated root-cause layers for a problem type.

    Each known problem type maps to a fixed list of base layers; a few types
    gain one extra layer when a related invariant is violated (or, for
    ``presentation_gap``, when ``required_answer_shape`` is non-empty).
    Unknown problem types yield ``["other"]``.
    """
    violated = set(normalize_string_list(step_output.get("violated_invariants")))

    base_layers_by_type = {
        "followup_action_resolution_gap": ["followup_action_resolution_gap"],
        "object_memory_gap": ["object_memory_gap"],
        "edge_carryover_gap": ["edge_carryover_gap"],
        "temporal_honesty_gap": ["temporal_honesty_gap"],
        "route_gap": ["semantic_understanding_gap"],
        "route_candidate_enablement_gap": ["runtime_capability_gap", "route_candidate_enablement_gap"],
        "capability_gap": ["runtime_capability_gap"],
        "presentation_gap": ["business_utility_gap"],
        "answer_shape_mismatch": ["answer_shape_mismatch", "business_utility_gap"],
        "business_utility_gap": ["business_utility_gap"],
        "evidence_gap": ["runtime_capability_gap"],
        "domain_anchor_gap": ["domain_anchor_gap"],
    }
    layers = list(base_layers_by_type.get(problem_type, ["other"]))

    # Conditional secondary layers, appended after the base layers.
    if problem_type == "followup_action_resolution_gap":
        if "focus_object_missing" in violated:
            layers.append("object_memory_gap")
    elif problem_type == "edge_carryover_gap":
        if violated & {"wrong_as_of_date", "wrong_period_from", "wrong_period_to"}:
            layers.append("temporal_honesty_gap")
    elif problem_type == "temporal_honesty_gap":
        if "wrong_date_scope_state" in violated:
            layers.append("edge_carryover_gap")
    elif problem_type == "presentation_gap":
        if str(step_output.get("required_answer_shape") or "").strip():
            layers.append("answer_shape_mismatch")
    elif problem_type == "business_utility_gap":
        if "answer_layering_noise" in violated:
            layers.append("answer_shape_mismatch")

    # dict.fromkeys removes duplicates while keeping first-seen order.
    return list(dict.fromkeys(layers))
def build_repair_fix_goal(step_output: dict[str, Any], problem_type: str) -> str:
    """Return the one-sentence fix goal for a step and its problem type.

    The step is referenced by the first non-empty value among
    ``question_resolved``, ``title`` and ``step_id``. Problem types without a
    dedicated sentence receive a generic "smallest patch" goal.
    """
    question = str(
        step_output.get("question_resolved") or step_output.get("title") or step_output.get("step_id") or ""
    ).strip()

    # (problem type, sentence template) pairs; "{question}" is the only placeholder.
    goal_templates = (
        (
            "followup_action_resolution_gap",
            "Resolve `{question}` on the current business object and keep the requested micro-action instead of drifting to another drilldown.",
        ),
        (
            "object_memory_gap",
            "Preserve the selected business object for `{question}` so the follow-up resolves without re-anchoring from scratch.",
        ),
        (
            "edge_carryover_gap",
            "Carry forward the selected-object state and historical date scope into `{question}` without resetting the follow-up context.",
        ),
        (
            "temporal_honesty_gap",
            "Keep `{question}` on the requested historical date/period and separate exact-window evidence from nearest available out-of-window evidence.",
        ),
        (
            "route_gap",
            "Keep `{question}` on the expected exact route/capability instead of letting wording drift into a different semantic lane.",
        ),
        (
            "route_candidate_enablement_gap",
            "Review and enable the route candidate for `{question}` instead of leaving the understood business ask as a non-executable handoff.",
        ),
        (
            "capability_gap",
            "Enable an exact route for `{question}` so the loop no longer falls back to partial or unsupported behavior.",
        ),
        (
            "presentation_gap",
            "Make `{question}` answer-first: direct business answer in the first line, proof second, service notes last.",
        ),
        (
            "answer_shape_mismatch",
            "Make `{question}` start with the exact business answer requested, then put proof and caveats after it.",
        ),
        (
            "business_utility_gap",
            "Make `{question}` useful for a business reader: remove technical/scaffold noise and keep direct answers compact.",
        ),
        (
            "evidence_gap",
            "Return grounded evidence for `{question}` instead of a limited empty response when the correct route already fires.",
        ),
        (
            "domain_anchor_gap",
            "Match the selected business anchor for `{question}` against materialized rows so the exact route returns a grounded answer instead of an anchor-mismatch limit.",
        ),
    )
    for known_type, template in goal_templates:
        if problem_type == known_type:
            return template.format(question=question)
    return f"Improve `{question}` with the smallest patch that removes the current acceptance failure without architecture drift."
def route_candidate_requires_enablement(step_output: dict[str, Any]) -> bool:
    """Return True when the step's route candidate status is ``needs_route_enablement``."""
    candidate_status = step_output.get("mcp_discovery_route_candidate_status") or ""
    return str(candidate_status).strip() == "needs_route_enablement"
def compact_route_candidate_handoff(
    *,
    scenario_id: str,
    step_id: str,
    step_output: dict[str, Any],
) -> dict[str, Any] | None:
    """Extract the route-candidate fields of a step into a compact handoff dict.

    Returns ``None`` when the step carries no route candidate status. Text
    fields that are missing or blank are reported as ``None``.
    """

    def optional_text(key: str) -> str | None:
        # Blank / missing values become None instead of an empty string.
        return str(step_output.get(key) or "").strip() or None

    candidate_status = optional_text("mcp_discovery_route_candidate_status")
    if candidate_status is None:
        return None

    handoff: dict[str, Any] = {
        "scenario_id": scenario_id,
        "step_id": step_id,
        "target_id": f"{scenario_id}:{step_id}",
        "question_resolved": optional_text("question_resolved"),
        "candidate_status": candidate_status,
        "selected_chain_id": optional_text("mcp_discovery_selected_chain_id"),
        "fact_family": optional_text("mcp_discovery_route_candidate_fact_family"),
        "action_family": optional_text("mcp_discovery_route_candidate_action_family"),
        "missing_axes": normalize_string_list(step_output.get("mcp_discovery_route_candidate_missing_axes")),
        # Only a literal True counts as executable.
        "executable_now": step_output.get("mcp_discovery_route_candidate_executable_now") is True,
        "enablement_reason": optional_text("mcp_discovery_route_candidate_enablement_reason"),
        "next_action": optional_text("mcp_discovery_route_candidate_next_action"),
    }
    return handoff
def build_route_candidate_focus_signature(candidate: dict[str, Any]) -> str:
    """Build the ``status|chain|missing_axes`` grouping key for a route candidate.

    Blank parts are replaced by the placeholders ``unknown``, ``no_chain`` and
    ``no_missing_axes`` respectively.
    """

    def text_or(key: str, placeholder: str) -> str:
        return str(candidate.get(key) or placeholder).strip() or placeholder

    axes_part = ",".join(normalize_string_list(candidate.get("missing_axes"))) or "no_missing_axes"
    parts = (
        text_or("candidate_status", "unknown"),
        text_or("selected_chain_id", "no_chain"),
        axes_part,
    )
    return "|".join(parts)
def build_route_candidate_handoff_groups(candidates: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Group route-candidate handoffs by their focus signature.

    Each group keeps the descriptive fields of the first candidate seen, all
    target ids, the sorted scenario ids, up to three sample questions, and the
    distinct next actions / enablement reasons in first-seen order. Groups
    that need route enablement sort first, then larger groups, then group id.
    """

    def clean(candidate: dict[str, Any], key: str) -> str:
        return str(candidate.get(key) or "").strip()

    groups: dict[str, dict[str, Any]] = {}
    # Scenario ids are collected as sets on the side and attached, sorted, at the end.
    scenarios_by_group: dict[str, set[str]] = {}

    for candidate in candidates:
        signature = build_route_candidate_focus_signature(candidate)
        if signature not in groups:
            groups[signature] = {
                "group_id": signature,
                "candidate_status": candidate.get("candidate_status"),
                "selected_chain_id": candidate.get("selected_chain_id"),
                "fact_family": candidate.get("fact_family"),
                "action_family": candidate.get("action_family"),
                "missing_axes": normalize_string_list(candidate.get("missing_axes")),
                "executable_now": candidate.get("executable_now") is True,
                "target_ids": [],
                "sample_questions": [],
                "next_actions": [],
                "enablement_reasons": [],
            }
            scenarios_by_group[signature] = set()
        group = groups[signature]

        target_id = clean(candidate, "target_id")
        if target_id:
            group["target_ids"].append(target_id)

        scenario_id = clean(candidate, "scenario_id")
        if scenario_id:
            scenarios_by_group[signature].add(scenario_id)

        question = clean(candidate, "question_resolved")
        if question and len(group["sample_questions"]) < 3:
            group["sample_questions"].append(question)

        for source_key, bucket_key in (("next_action", "next_actions"), ("enablement_reason", "enablement_reasons")):
            value = clean(candidate, source_key)
            if value and value not in group[bucket_key]:
                group[bucket_key].append(value)

    ordered_groups: list[dict[str, Any]] = []
    for signature, group in groups.items():
        group["scenario_ids"] = sorted(scenarios_by_group[signature])
        group["candidate_count"] = len(group.get("target_ids") or [])
        ordered_groups.append(group)

    def sort_key(item: dict[str, Any]) -> tuple[int, int, str]:
        needs_enablement = str(item.get("candidate_status") or "") == "needs_route_enablement"
        return (
            0 if needs_enablement else 1,
            -int(item.get("candidate_count") or 0),
            str(item.get("group_id") or ""),
        )

    ordered_groups.sort(key=sort_key)
    return ordered_groups
def build_step_repair_target(
    *,
    scenario_id: str,
    scenario_title: str,
    scenario_dir: Path,
    step_id: str,
    step_output: dict[str, Any],
) -> dict[str, Any] | None:
    """Build the repair-target record for one step, or ``None`` for a clean step.

    A step is clean when its acceptance status is ``validated`` / ``accepted``,
    it has no violated invariants and no warnings, and its route candidate does
    not require enablement. Otherwise the returned dict carries the derived
    problem type, severity, root-cause layers, fix goal, candidate files,
    diagnostic signals and artifact paths.
    """

    def text_field(key: str) -> str:
        return str(step_output.get(key) or "").strip()

    acceptance_status = (
        str(step_output.get("acceptance_status") or step_output.get("status") or "").strip() or "unknown"
    )
    execution_status = text_field("execution_status") or "unknown"
    violated_invariants = normalize_string_list(step_output.get("violated_invariants"))
    warnings = normalize_string_list(step_output.get("warnings"))

    step_is_clean = acceptance_status in {"validated", "accepted"} and not violated_invariants and not warnings
    if step_is_clean and not route_candidate_requires_enablement(step_output):
        return None

    problem_type = derive_repair_problem_type(step_output)
    severity = derive_repair_target_severity(step_output)
    root_cause_layers = derive_repair_root_cause_layers(step_output, problem_type)

    # Signals: routing fields first, then violations, then at most three warnings.
    routing_fields = ("reply_type", "fallback_type", "mcp_call_status", "selected_recipe", "capability_id")
    routing_values = [(name, text_field(name)) for name in routing_fields]
    signals: list[str] = [f"{name}={value}" for name, value in routing_values if value]
    signals.extend(f"violation={violation}" for violation in violated_invariants)
    signals.extend(f"warning={warning}" for warning in warnings[:3])

    route_candidate_status = text_field("mcp_discovery_route_candidate_status")
    if route_candidate_status:
        signals.append(f"route_candidate_status={route_candidate_status}")
    selected_chain_id = text_field("mcp_discovery_selected_chain_id")
    if selected_chain_id:
        signals.append(f"selected_chain_id={selected_chain_id}")
    missing_axes = normalize_string_list(step_output.get("mcp_discovery_route_candidate_missing_axes"))
    if missing_axes:
        signals.append(f"route_candidate_missing_axes={','.join(missing_axes)}")

    candidate_files = REPAIR_TARGET_FILE_HINTS.get(problem_type, REPAIR_TARGET_FILE_HINTS["other"])
    artifact_refs = {
        "scenario_dir": str(scenario_dir),
        "step_state_json": str(scenario_dir / "steps" / step_id / "step_state.json"),
    }
    target: dict[str, Any] = {
        "target_id": f"{scenario_id}:{step_id}",
        "scenario_id": scenario_id,
        "scenario_title": scenario_title,
        "step_id": step_id,
        "step_title": text_field("title") or None,
        "question_resolved": text_field("question_resolved") or None,
        "severity": severity,
        "problem_type": problem_type,
        "root_cause_layers": root_cause_layers,
        "execution_status": execution_status,
        "acceptance_status": acceptance_status,
        "violated_invariants": violated_invariants,
        "fix_goal": build_repair_fix_goal(step_output, problem_type),
        "candidate_files": candidate_files,
        "signals": signals,
        "artifact_refs": artifact_refs,
    }

    route_candidate = compact_route_candidate_handoff(
        scenario_id=scenario_id,
        step_id=step_id,
        step_output=step_output,
    )
    if route_candidate:
        target["route_candidate"] = route_candidate
    if route_candidate_status == "needs_route_enablement":
        target["target_source"] = "route_candidate_enablement"
    return target
def build_repair_focus_signature(target: dict[str, Any]) -> str:
    """Return the ``problem_type|primary_file`` key used to group targets into foci.

    A blank problem type becomes ``other``; a target without candidate files
    uses the ``no_file_hint`` placeholder.
    """
    file_hints = normalize_string_list(target.get("candidate_files"))
    problem_part = str(target.get("problem_type") or "other").strip() or "other"
    file_part = file_hints[0] if file_hints else "no_file_hint"
    return "|".join((problem_part, file_part))
def build_priority_repair_foci(targets: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Group repair targets into ranked repair foci.

    Targets sharing a focus signature (problem type + primary candidate file)
    form one focus. The focus severity is the most severe severity among its
    targets according to ``REPAIR_TARGET_SEVERITY_ORDER`` (lower order value =
    more severe); previously it was whatever the first target happened to
    carry, which could under-rank a focus containing a more severe target.
    Descriptive fields (problem type, root-cause layers, candidate files) are
    still taken from the first target of the group.

    Returns the foci sorted by severity, target count (desc), scenario count
    (desc), problem-type order and focus id, each annotated with
    ``focus_rank`` (1-based) and a human-readable ``rank_reason``.
    """
    grouped: dict[str, dict[str, Any]] = {}
    for target in targets:
        focus_id = build_repair_focus_signature(target)
        target_severity = str(target.get("severity") or "P2")
        focus = grouped.get(focus_id)
        if focus is None:
            focus = {
                "focus_id": focus_id,
                "severity": target_severity,
                "problem_type": str(target.get("problem_type") or "other"),
                "root_cause_layers": normalize_string_list(target.get("root_cause_layers")),
                "candidate_files": normalize_string_list(target.get("candidate_files")),
                "target_ids": [],
                "scenario_ids": set(),
            }
            grouped[focus_id] = focus
        elif REPAIR_TARGET_SEVERITY_ORDER.get(target_severity, 99) < REPAIR_TARGET_SEVERITY_ORDER.get(
            str(focus.get("severity") or "P2"), 99
        ):
            # Escalate: a focus is as severe as its most severe target.
            focus["severity"] = target_severity
        focus["target_ids"].append(str(target.get("target_id") or ""))
        scenario_id = str(target.get("scenario_id") or "").strip()
        if scenario_id:
            focus["scenario_ids"].add(scenario_id)

    priority_foci: list[dict[str, Any]] = []
    for focus in grouped.values():
        # Replace the working set with a sorted list so the focus is JSON-serializable.
        scenario_ids = sorted(focus.pop("scenario_ids"))
        target_ids = [target_id for target_id in focus.get("target_ids", []) if target_id]
        focus["target_count"] = len(target_ids)
        focus["scenario_count"] = len(scenario_ids)
        focus["target_ids"] = target_ids
        focus["scenario_ids"] = scenario_ids
        priority_foci.append(focus)

    priority_foci.sort(
        key=lambda item: (
            REPAIR_TARGET_SEVERITY_ORDER.get(str(item.get("severity") or "P2"), 99),
            -int(item.get("target_count") or 0),
            -int(item.get("scenario_count") or 0),
            REPAIR_TARGET_PROBLEM_ORDER.get(str(item.get("problem_type") or "other"), 99),
            str(item.get("focus_id") or ""),
        )
    )
    for index, focus in enumerate(priority_foci, start=1):
        primary_file = normalize_string_list(focus.get("candidate_files"))[:1]
        focus["focus_rank"] = index
        focus["rank_reason"] = (
            f"severity={focus.get('severity')} targets={focus.get('target_count')} "
            f"scenarios={focus.get('scenario_count')} primary_file={primary_file[0] if primary_file else 'n/a'}"
        )
    return priority_foci
def build_deterministic_repair_targets(
    pack_state: dict[str, Any],
    scenario_artifacts: list[dict[str, Any]],
) -> dict[str, Any]:
    """Derive the repair-targets payload for a pack from its scenario artifacts.

    Every dict-shaped step output contributes a compact snapshot to
    ``step_validation_index``, a route-candidate handoff when it has a
    candidate status, and a repair target when the step is not clean. Targets
    are annotated with their focus rank / counts and sorted by severity, focus
    rank, focus size, problem-type order, scenario id and step id.
    """
    targets: list[dict[str, Any]] = []
    route_candidates: list[dict[str, Any]] = []
    step_validation_index: dict[str, dict[str, Any]] = {}

    for artifact in scenario_artifacts:
        scenario_state = artifact.get("scenario_state")
        step_outputs = scenario_state.get("step_outputs") if isinstance(scenario_state, dict) else None
        if not isinstance(step_outputs, dict):
            continue
        scenario_id = str(artifact.get("scenario_id") or "").strip()
        scenario_title = str(artifact.get("title") or "").strip()
        scenario_dir = Path(str(artifact.get("artifact_dir") or ""))

        for step_id, step_output in step_outputs.items():
            if not isinstance(step_output, dict):
                continue
            step_name = str(step_id)
            step_validation_index[f"{scenario_id}:{step_id}"] = compact_step_output_for_review(step_output)

            handoff = compact_route_candidate_handoff(
                scenario_id=scenario_id,
                step_id=step_name,
                step_output=step_output,
            )
            if handoff:
                route_candidates.append(handoff)

            repair_target = build_step_repair_target(
                scenario_id=scenario_id,
                scenario_title=scenario_title,
                scenario_dir=scenario_dir,
                step_id=step_name,
                step_output=step_output,
            )
            if repair_target:
                targets.append(repair_target)

    priority_foci = build_priority_repair_foci(targets)

    # focus_id -> (rank, target_count, scenario_count)
    focus_stats: dict[str, tuple[int, int, int]] = {}
    for focus in priority_foci:
        if not isinstance(focus, dict):
            continue
        focus_stats[str(focus.get("focus_id") or "")] = (
            int(focus.get("focus_rank") or 999),
            int(focus.get("target_count") or 0),
            int(focus.get("scenario_count") or 0),
        )

    for target in targets:
        focus_id = build_repair_focus_signature(target)
        rank, target_count, scenario_count = focus_stats.get(focus_id, (999, 0, 0))
        target["repair_focus_id"] = focus_id
        target["repair_focus_rank"] = rank
        target["repair_focus_target_count"] = target_count
        target["repair_focus_scenario_count"] = scenario_count

    def target_sort_key(item: dict[str, Any]) -> tuple[int, int, int, int, int, str, str]:
        return (
            REPAIR_TARGET_SEVERITY_ORDER.get(str(item.get("severity") or "P2"), 99),
            int(item.get("repair_focus_rank") or 999),
            -int(item.get("repair_focus_target_count") or 0),
            -int(item.get("repair_focus_scenario_count") or 0),
            REPAIR_TARGET_PROBLEM_ORDER.get(str(item.get("problem_type") or "other"), 99),
            str(item.get("scenario_id") or ""),
            str(item.get("step_id") or ""),
        )

    targets.sort(key=target_sort_key)

    # Only the three known severities are tallied; anything else is ignored.
    severity_counts = {"P0": 0, "P1": 0, "P2": 0}
    for target in targets:
        level = str(target.get("severity") or "P2")
        if level in severity_counts:
            severity_counts[level] += 1

    route_candidate_groups = build_route_candidate_handoff_groups(route_candidates)
    route_candidate_status_counts: dict[str, int] = {}
    for handoff in route_candidates:
        status = str(handoff.get("candidate_status") or "unknown").strip() or "unknown"
        route_candidate_status_counts[status] = route_candidate_status_counts.get(status, 0) + 1

    return {
        "schema_version": "domain_pack_repair_targets_v1",
        "pack_id": pack_state.get("pack_id"),
        "domain": pack_state.get("domain"),
        "final_status": pack_state.get("final_status"),
        "target_count": len(targets),
        "severity_counts": severity_counts,
        "priority_foci": priority_foci,
        "targets": targets,
        "step_validation_index": step_validation_index,
        "route_candidate_handoff_count": len(route_candidates),
        "route_candidate_status_counts": route_candidate_status_counts,
        "route_candidate_groups": route_candidate_groups,
    }
def normalize_repair_target_severity(value: Any) -> str:
    """Upper-case and trim a severity label, falling back to ``"P2"`` when unknown."""
    candidate = str(value or "").strip().upper()
    if candidate in REPAIR_TARGET_SEVERITY_ORDER:
        return candidate
    return "P2"
def normalize_repair_problem_type(value: Any, root_cause_layers: list[str] | None = None) -> str:
    """Map a raw problem type onto a known one.

    Preference order: the trimmed value itself if it has file hints, then the
    first root-cause layer that has file hints, then the value if it is at
    least known to the problem ordering table, otherwise ``"other"``.
    """
    candidate = str(value or "").strip()
    if candidate in REPAIR_TARGET_FILE_HINTS:
        return candidate

    hinted_layers = [layer for layer in (root_cause_layers or []) if layer in REPAIR_TARGET_FILE_HINTS]
    if hinted_layers:
        return hinted_layers[0]

    return candidate if candidate in REPAIR_TARGET_PROBLEM_ORDER else "other"
def normalize_analyst_priority_repair_target(raw_target: dict[str, Any], index: int) -> dict[str, Any]:
    """Normalize an analyst-provided priority target into the repair-target shape.

    Args:
        raw_target: Target dict taken from the analyst verdict.
        index: 1-based position of the target; used to synthesize a step id
            (``target_NN``) when the analyst gave none.

    Returns:
        A repair-target dict with defaults filled in: scenario ``analyst``,
        severity ``P2``, file hints for the resolved problem type, a generic
        fix goal, and ``analyst_flagged`` / ``requires_repair`` statuses.

    Blank or whitespace-only ``execution_status`` / ``acceptance_status``
    values fall back to their defaults (they previously produced an empty
    string), matching how ``scenario_id`` and ``step_id`` are handled here.
    """
    scenario_id = str(raw_target.get("scenario_id") or "analyst").strip() or "analyst"
    step_id = str(raw_target.get("step_id") or f"target_{index:02d}").strip() or f"target_{index:02d}"
    root_cause_layers = normalize_string_list(raw_target.get("root_cause_layers"))
    problem_type = normalize_repair_problem_type(raw_target.get("problem_type"), root_cause_layers)
    severity = normalize_repair_target_severity(raw_target.get("severity"))
    candidate_files = normalize_string_list(raw_target.get("candidate_files"))
    if not candidate_files:
        candidate_files = REPAIR_TARGET_FILE_HINTS.get(problem_type, REPAIR_TARGET_FILE_HINTS["other"])
    fix_goal = str(raw_target.get("fix_goal") or "").strip()
    if not fix_goal:
        fix_goal = f"Resolve the analyst-identified `{problem_type}` on `{scenario_id}:{step_id}` without masking partial evidence as accepted."
    if not root_cause_layers:
        root_cause_layers = [problem_type]
    return {
        "target_id": f"{scenario_id}:{step_id}",
        "scenario_id": scenario_id,
        "scenario_title": str(raw_target.get("scenario_title") or "").strip() or None,
        "step_id": step_id,
        "step_title": str(raw_target.get("step_title") or "").strip() or None,
        "question_resolved": str(raw_target.get("question_resolved") or "").strip() or None,
        "severity": severity,
        "problem_type": problem_type,
        "root_cause_layers": root_cause_layers,
        # Strip first, then default, so whitespace-only input cannot yield "".
        "execution_status": str(raw_target.get("execution_status") or "").strip() or "analyst_flagged",
        "acceptance_status": str(raw_target.get("acceptance_status") or "").strip() or "requires_repair",
        "violated_invariants": normalize_string_list(raw_target.get("violated_invariants")),
        "fix_goal": fix_goal,
        "candidate_files": candidate_files,
        "signals": ["analyst_priority_target"],
        "target_source": "analyst_verdict.priority_targets",
    }
def analyst_target_contradicts_validated_step(target: dict[str, Any], step_snapshot: dict[str, Any] | None) -> bool:
    """Tell whether an analyst target conflicts with a step that already validated cleanly.

    Only steps that are ``validated`` / ``accepted``, carry no violated
    invariants or warnings, and have at least one validator flag set to True
    are considered. For those, the target is a contradiction when:

    * its text (problem type, fix goal, invariants, layers) contains one of
      the stale-validation markers; or
    * it is an evidence problem on a bounded-MCP-validated step, mentions a
      legacy metadata marker, and the assistant text contains a bounded
      boundary marker; or
    * it is a follow-up / memory style problem on a runtime-factual-validated
      step whose focus label appears in the assistant text.
    """
    if not isinstance(step_snapshot, dict):
        return False

    step_status = str(step_snapshot.get("acceptance_status") or step_snapshot.get("status") or "").strip()
    if step_status not in {"validated", "accepted"}:
        return False
    if normalize_string_list(step_snapshot.get("violated_invariants")):
        return False
    if normalize_string_list(step_snapshot.get("warnings")):
        return False

    validator_flags = (
        "bounded_mcp_answer_validated",
        "memory_checkpoint_validated",
        "runtime_factual_answer_validated",
        "guarded_insufficiency_validated",
    )
    # At least one validator must have explicitly signed the step off.
    if all(step_snapshot.get(flag) is not True for flag in validator_flags):
        return False

    problem_type_raw = str(target.get("problem_type") or "")
    layer_names = normalize_string_list(target.get("root_cause_layers"))
    text_parts = [
        problem_type_raw,
        str(target.get("fix_goal") or ""),
        " ".join(normalize_string_list(target.get("violated_invariants"))),
        " ".join(layer_names),
    ]
    target_text = " ".join(text_parts).casefold()

    stale_validation_markers = (
        "guarded_insufficiency_validated",
        "runtime_factual_answer_validated",
        "bounded_mcp_answer_validated",
        "heuristic_candidates",
        "silent heuristic",
        "unvalidated",
        "validated silently",
        "masking",
    )
    for marker in stale_validation_markers:
        if marker in target_text:
            return True

    assistant_text = str(step_snapshot.get("assistant_text_excerpt") or "").casefold()
    problem_type = problem_type_raw.strip()
    root_cause_layers = set(layer_names)

    is_evidence_problem = problem_type == "evidence_gap" or "evidence_gap" in root_cause_layers
    if step_snapshot.get("bounded_mcp_answer_validated") is True and is_evidence_problem:
        legacy_metadata_markers = (
            "unsupported",
            "blocked",
            "exact validated compute",
            "machine-provable",
            "legacy residue",
            "asserted winner-year",
            "winner-year",
            "n/a/blocked",
        )
        bounded_boundary_markers = ("лимит", "доступн", "проверенн", "не чист", "не как чист")
        mentions_legacy = any(marker in target_text for marker in legacy_metadata_markers)
        if mentions_legacy and any(marker in assistant_text for marker in bounded_boundary_markers):
            return True

    followup_types = {
        "followup_action_resolution_gap",
        "object_memory_gap",
        "bundle_reuse_gap",
        "loop_coverage_gap",
    }
    is_followup_problem = problem_type in followup_types or bool(root_cause_layers & followup_types)
    if step_snapshot.get("runtime_factual_answer_validated") is True and is_followup_problem:
        focus_object = step_snapshot.get("focus_object")
        if not isinstance(focus_object, dict):
            focus_object = {}
        extracted_filters = step_snapshot.get("extracted_filters")
        if not isinstance(extracted_filters, dict):
            extracted_filters = {}
        # Focus label: explicit focus object label, else the counterparty filter.
        focus_label = str(focus_object.get("label") or extracted_filters.get("counterparty") or "").strip().casefold()
        if focus_label and focus_label in assistant_text:
            return True
    return False
def merge_analyst_priority_repair_targets(
    repair_targets: dict[str, Any],
    analyst_verdict: dict[str, Any],
) -> dict[str, Any]:
    """Merge analyst priority targets into the deterministic repair targets.

    Args:
        repair_targets: Deterministic repair-targets payload; a non-dict value
            is treated as an empty payload.
        analyst_verdict: Analyst verdict whose ``priority_targets`` list is
            merged in; a non-dict value contributes no targets.

    Returns:
        A copy of ``repair_targets`` with recomputed ``targets``,
        ``priority_foci``, ``severity_counts`` and merge bookkeeping counters.

    Merge rules:
        * Analyst targets that contradict a cleanly validated step are
          suppressed and reported under ``suppressed_analyst_priority_targets``.
        * When an analyst target hits an existing target id, analyst fields
          override the deterministic ones, except that ``None`` / empty-list
          analyst placeholders never erase deterministic values, and the more
          severe of the two severities is kept.
        * Severities are normalized before foci are built and targets sorted,
          so ranking and counts agree.
    """
    base_payload = repair_targets if isinstance(repair_targets, dict) else {}
    raw_existing = base_payload.get("targets")
    existing_targets = raw_existing if isinstance(raw_existing, list) else []
    raw_index = base_payload.get("step_validation_index")
    step_validation_index = raw_index if isinstance(raw_index, dict) else {}

    merged_by_id: dict[str, dict[str, Any]] = {}
    for raw_target in existing_targets:
        if not isinstance(raw_target, dict):
            continue
        target_id = str(raw_target.get("target_id") or "").strip()
        if not target_id:
            # Synthesize an id so targets without one are still kept.
            scenario_id = str(raw_target.get("scenario_id") or "").strip()
            step_id = str(raw_target.get("step_id") or "").strip()
            target_id = (
                f"{scenario_id}:{step_id}" if scenario_id and step_id else f"deterministic_{len(merged_by_id) + 1:02d}"
            )
            raw_target = {**raw_target, "target_id": target_id}
        merged_by_id[target_id] = dict(raw_target)

    analyst_priority_targets = analyst_verdict.get("priority_targets") if isinstance(analyst_verdict, dict) else None
    analyst_count = 0
    suppressed_analyst_targets: list[dict[str, Any]] = []
    if isinstance(analyst_priority_targets, list):
        for index, raw_target in enumerate(analyst_priority_targets, start=1):
            if not isinstance(raw_target, dict):
                continue
            analyst_target = normalize_analyst_priority_repair_target(raw_target, index)
            analyst_count += 1
            target_id = str(analyst_target.get("target_id") or "").strip()
            step_snapshot = step_validation_index.get(target_id)
            if analyst_target_contradicts_validated_step(analyst_target, step_snapshot):
                suppressed_analyst_targets.append(
                    {
                        "target_id": target_id,
                        "reason": "analyst_target_contradicts_validated_step",
                        "validator_flags": {
                            "bounded_mcp_answer_validated": bool(step_snapshot.get("bounded_mcp_answer_validated")),
                            "memory_checkpoint_validated": bool(step_snapshot.get("memory_checkpoint_validated")),
                            "runtime_factual_answer_validated": bool(
                                step_snapshot.get("runtime_factual_answer_validated")
                            ),
                            "guarded_insufficiency_validated": bool(
                                step_snapshot.get("guarded_insufficiency_validated")
                            ),
                        }
                        if isinstance(step_snapshot, dict)
                        else {},
                    }
                )
                continue
            existing = merged_by_id.get(target_id)
            if existing:
                existing_severity = normalize_repair_target_severity(existing.get("severity"))
                analyst_severity = normalize_repair_target_severity(analyst_target.get("severity"))
                # Analyst placeholders (None / []) must not erase deterministic data
                # such as scenario_title, question_resolved or violated_invariants.
                analyst_overrides = {
                    key: value for key, value in analyst_target.items() if value is not None and value != []
                }
                merged = {**existing, **analyst_overrides}
                if REPAIR_TARGET_SEVERITY_ORDER.get(existing_severity, 99) < REPAIR_TARGET_SEVERITY_ORDER.get(
                    analyst_severity, 99
                ):
                    merged["severity"] = existing_severity
                merged["signals"] = list(
                    dict.fromkeys(normalize_string_list(existing.get("signals")) + ["analyst_priority_target"])
                )
                merged["target_source"] = "deterministic+analyst_verdict.priority_targets"
                merged_by_id[target_id] = merged
            else:
                merged_by_id[target_id] = analyst_target

    merged_targets = list(merged_by_id.values())
    # Normalize first so focus grouping, sorting and counting use the same labels.
    for target in merged_targets:
        target["severity"] = normalize_repair_target_severity(target.get("severity"))

    priority_foci = build_priority_repair_foci(merged_targets)
    focus_rank_by_id = {
        str(focus.get("focus_id") or ""): int(focus.get("focus_rank") or 999)
        for focus in priority_foci
        if isinstance(focus, dict)
    }
    focus_target_count_by_id = {
        str(focus.get("focus_id") or ""): int(focus.get("target_count") or 0)
        for focus in priority_foci
        if isinstance(focus, dict)
    }
    focus_scenario_count_by_id = {
        str(focus.get("focus_id") or ""): int(focus.get("scenario_count") or 0)
        for focus in priority_foci
        if isinstance(focus, dict)
    }
    for target in merged_targets:
        focus_id = build_repair_focus_signature(target)
        target["repair_focus_id"] = focus_id
        target["repair_focus_rank"] = focus_rank_by_id.get(focus_id, 999)
        target["repair_focus_target_count"] = focus_target_count_by_id.get(focus_id, 0)
        target["repair_focus_scenario_count"] = focus_scenario_count_by_id.get(focus_id, 0)
    merged_targets.sort(
        key=lambda item: (
            REPAIR_TARGET_SEVERITY_ORDER.get(str(item.get("severity") or "P2"), 99),
            int(item.get("repair_focus_rank") or 999),
            -int(item.get("repair_focus_target_count") or 0),
            -int(item.get("repair_focus_scenario_count") or 0),
            REPAIR_TARGET_PROBLEM_ORDER.get(str(item.get("problem_type") or "other"), 99),
            str(item.get("scenario_id") or ""),
            str(item.get("step_id") or ""),
        )
    )

    severity_counts = {"P0": 0, "P1": 0, "P2": 0}
    for target in merged_targets:
        severity = str(target.get("severity") or "P2")
        # .get keeps this safe if the severity table knows levels beyond P0-P2.
        severity_counts[severity] = severity_counts.get(severity, 0) + 1

    return {
        **base_payload,
        "schema_version": "domain_pack_repair_targets_v1",
        "target_count": len(merged_targets),
        "deterministic_target_count": len(existing_targets),
        "analyst_priority_target_count": analyst_count,
        "suppressed_analyst_priority_target_count": len(suppressed_analyst_targets),
        "suppressed_analyst_priority_targets": suppressed_analyst_targets,
        "severity_counts": severity_counts,
        "priority_foci": priority_foci,
        "targets": merged_targets,
    }
def select_primary_repair_focus(repair_targets: dict[str, Any]) -> dict[str, Any] | None:
    """Return the first entry of ``priority_foci`` if it is a dict, else ``None``.

    ``None`` is also returned when ``repair_targets`` is not a dict or when
    ``priority_foci`` is missing, empty or not a list.
    """
    foci = repair_targets.get("priority_foci") if isinstance(repair_targets, dict) else None
    if isinstance(foci, list) and foci and isinstance(foci[0], dict):
        return foci[0]
    return None
def build_repair_targets_summary(repair_targets: dict[str, Any]) -> str:
    """Render a repair-targets payload as a Markdown summary.

    The document has a header with counters, an optional "Route Candidate
    Groups" section, an optional "Priority foci" section, and a "Targets"
    section that is always present. The result ends with exactly one newline.
    """

    def joined(values: Any, empty: str) -> str:
        # Comma-join a possibly missing list, with a placeholder when empty.
        return ", ".join(values or []) or empty

    def joined_normalized(values: Any) -> str:
        return ", ".join(normalize_string_list(values)) or "n/a"

    lines: list[str] = ["# Repair targets", ""]
    lines.append(f"- pack_id: `{repair_targets.get('pack_id') or 'n/a'}`")
    lines.append(f"- domain: `{repair_targets.get('domain') or 'n/a'}`")
    lines.append(f"- target_count: `{repair_targets.get('target_count') or 0}`")
    lines.append(f"- deterministic_target_count: `{repair_targets.get('deterministic_target_count', 'n/a')}`")
    lines.append(f"- analyst_priority_target_count: `{repair_targets.get('analyst_priority_target_count', 'n/a')}`")
    lines.append(
        f"- suppressed_analyst_priority_target_count: `{repair_targets.get('suppressed_analyst_priority_target_count', 0)}`"
    )
    lines.append(f"- severity_counts: `{dump_json(repair_targets.get('severity_counts') or {})}`")
    lines.append(f"- route_candidate_handoff_count: `{repair_targets.get('route_candidate_handoff_count', 0)}`")
    lines.append(
        f"- route_candidate_status_counts: `{dump_json(repair_targets.get('route_candidate_status_counts') or {})}`"
    )

    route_candidate_groups = repair_targets.get("route_candidate_groups") or []
    if isinstance(route_candidate_groups, list) and route_candidate_groups:
        lines += ["", "## Route Candidate Groups"]
        for group in route_candidate_groups:
            if not isinstance(group, dict):
                continue
            lines.append(f"- group_id: `{group.get('group_id') or 'n/a'}`")
            lines.append(f" candidate_count: `{group.get('candidate_count') or 0}`")
            lines.append(f" status: `{group.get('candidate_status') or 'n/a'}`")
            lines.append(f" selected_chain_id: `{group.get('selected_chain_id') or 'n/a'}`")
            lines.append(f" missing_axes: `{joined_normalized(group.get('missing_axes'))}`")
            lines.append(f" executable_now: `{group.get('executable_now')}`")
            lines.append(f" target_ids: `{joined_normalized(group.get('target_ids'))}`")

    priority_foci = repair_targets.get("priority_foci") or []
    if isinstance(priority_foci, list) and priority_foci:
        lines += ["", "## Priority foci"]
        for focus in priority_foci:
            if not isinstance(focus, dict):
                continue
            lines.append(f"- `{focus.get('focus_id')}`")
            for field_name in ("focus_rank", "severity", "problem_type", "target_count", "scenario_count"):
                lines.append(f" {field_name}: `{focus.get(field_name)}`")
            lines.append(f" candidate_files: {joined(focus.get('candidate_files'), 'none')}")
            lines.append(f" rank_reason: {focus.get('rank_reason') or 'n/a'}")

    lines += ["", "## Targets"]
    for target in repair_targets.get("targets") or []:
        if not isinstance(target, dict):
            continue
        lines.append(f"- `{target.get('target_id')}`")
        for field_name in ("severity", "problem_type", "repair_focus_rank", "repair_focus_target_count"):
            lines.append(f" {field_name}: `{target.get(field_name)}`")
        lines.append(f" root_cause_layers: {joined(target.get('root_cause_layers'), 'none')}")
        lines.append(f" fix_goal: {target.get('fix_goal') or 'n/a'}")
        lines.append(f" candidate_files: {joined(target.get('candidate_files'), 'none')}")

    return "\n".join(lines).strip() + "\n"
def evaluate_deterministic_loop_gate(
    pack_state: dict[str, Any],
    repair_targets: dict[str, Any],
) -> tuple[bool, str]:
    """Decide whether the deterministic gate passes, with a reason string.

    The gate fails when the pack's final status is not ``accepted`` (a blank
    status counts as ``partial``) or when any P0 / P1 repair targets remain.
    """
    final_status = str(pack_state.get("final_status") or "").strip() or "partial"
    if final_status != "accepted":
        return False, f"pack_final_status={final_status}"

    counts = repair_targets.get("severity_counts") if isinstance(repair_targets, dict) else {}
    if not isinstance(counts, dict):
        # Counts of an unexpected shape cannot block the gate.
        return True, "deterministic_gate_passed"

    blocking_p0 = int(counts.get("P0") or 0)
    blocking_p1 = int(counts.get("P1") or 0)
    if blocking_p0 > 0 or blocking_p1 > 0:
        return False, f"repair_targets_remaining=P0:{blocking_p0},P1:{blocking_p1}"
    return True, "deterministic_gate_passed"
def build_pack_review_bundle(pack_dir: Path) -> str:
    """Collect the review evidence for one pack directory and serialize it.

    Reads ``pack_state.json``, ``pack_manifest.json``, ``pack_summary.md`` and
    ``scenario_acceptance_matrix.md`` from ``pack_dir`` when they exist (empty
    dict / empty string otherwise), adds a compacted view of every scenario's
    step outputs, and attaches the repair targets. Repair targets are loaded
    from ``repair_targets.json`` when present and rebuilt from the pack state
    and scenario artifacts otherwise.

    Args:
        pack_dir: Directory holding the pack artifacts.

    Returns:
        The bundle serialized by ``dump_json``.
    """

    def _json_or_empty(file_name: str) -> Any:
        # Missing JSON artifacts degrade to an empty mapping.
        candidate = pack_dir / file_name
        if candidate.exists():
            return read_json_file(candidate)
        return {}

    def _text_or_empty(file_name: str) -> Any:
        # Missing text artifacts degrade to an empty string.
        candidate = pack_dir / file_name
        if candidate.exists():
            return read_text_file(candidate)
        return ""

    pack_state = _json_or_empty("pack_state.json")
    artifacts = collect_pack_scenario_artifacts(pack_dir)

    scenarios_bundle: list[dict[str, Any]] = []
    for artifact in artifacts:
        state = artifact.get("scenario_state")
        raw_steps = state.get("step_outputs") if isinstance(state, dict) else None
        compact_steps: dict[str, Any] = {}
        if isinstance(raw_steps, dict):
            compact_steps = {
                str(step_id): compact_step_output_for_review(output)
                for step_id, output in raw_steps.items()
            }
        scenarios_bundle.append(
            {
                "scenario_id": artifact.get("scenario_id"),
                "title": artifact.get("title"),
                "session_id": artifact.get("session_id"),
                "artifact_dir": artifact.get("artifact_dir"),
                "summary": artifact.get("summary") or "",
                "step_outputs": compact_steps,
            }
        )

    repair_targets_file = pack_dir / "repair_targets.json"
    if repair_targets_file.exists():
        repair_targets = read_json_file(repair_targets_file)
    else:
        repair_targets = build_deterministic_repair_targets(pack_state, artifacts)

    # Only a fixed subset of the pack state is exposed to the reviewer.
    exposed_state_keys = (
        "pack_id",
        "domain",
        "title",
        "execution_status",
        "final_status",
        "scenario_results",
    )
    bundle = {
        "pack_state": {key: pack_state.get(key) for key in exposed_state_keys},
        "pack_manifest": _json_or_empty("pack_manifest.json"),
        "pack_summary": _text_or_empty("pack_summary.md"),
        "scenario_acceptance_matrix": _text_or_empty("scenario_acceptance_matrix.md"),
        "deterministic_repair_targets": repair_targets,
        "scenarios": scenarios_bundle,
    }
    return dump_json(bundle)
def _scenario_observed_wording_families(scenario: dict[str, Any]) -> list[str]:
families: list[str] = []
steps = scenario.get("steps")
if not isinstance(steps, list):
return families
for step in steps:
if not isinstance(step, dict):
continue
family = str(step.get("paraphrase_family") or step.get("wording_family") or "").strip()
if family:
families.append(family)
return list(dict.fromkeys(families))
def build_analyst_loop_prompt(
    *,
    loop_dir: Path,
    iteration_dir: Path,
    pack_dir: Path,
    repair_targets_path: Path,
    previous_pack_dir: Path | None,
    previous_verdict_path: Path | None,
    target_score: int,
    review_bundle_json: str,
    repair_targets_json: str,
    previous_verdict_json: str | None,
) -> str:
    """Render the prompt text for the strict `domain_analyst` review step.

    Args:
        loop_dir: Loop directory, quoted in the "Current loop context" section.
        iteration_dir: Iteration directory, quoted in the same section.
        pack_dir: Current pack directory; also used to spell out the artifact
            paths the analyst must inspect.
        repair_targets_path: Path of the repair-targets file listed among the
            required artifacts.
        previous_pack_dir: Previous pack directory; when not ``None`` a
            "Compare the current run with the previous run" block is added.
        previous_verdict_path: Previous analyst verdict file; referenced in the
            comparison block only when it exists on disk.
        target_score: Score threshold interpolated into the gate rules.
        review_bundle_json: Evidence bundle text embedded verbatim in a fenced
            ``json`` block.
        repair_targets_json: Repair-targets text embedded verbatim in a fenced
            ``json`` block.
        previous_verdict_json: Previous verdict text; embedded in its own
            fenced block when truthy.

    Returns:
        The prompt with leading and trailing whitespace stripped.
    """
    # Optional pointer to the previous run so the analyst can compare iterations.
    comparison_block = ""
    if previous_pack_dir is not None:
        comparison_block = textwrap.dedent(
            f"""\
Compare the current run with the previous run:
- previous_pack_dir: `{previous_pack_dir}`
"""
        )
        if previous_verdict_path is not None and previous_verdict_path.exists():
            comparison_block += f"- previous_analyst_verdict: `{previous_verdict_path}`\n"
    # Optional inline copy of the previous verdict JSON.
    previous_verdict_block = ""
    if previous_verdict_json:
        previous_verdict_block = textwrap.dedent(
            f"""\
Previous analyst verdict JSON:
```json
{previous_verdict_json}
```
"""
        )
    # NOTE(review): textwrap.dedent runs on the already-interpolated text, so it
    # only removes a margin shared by every non-blank line, including the lines
    # contributed by the embedded blocks and JSON values — confirm the template
    # renders with the intended indentation.
    return textwrap.dedent(
        f"""\
You are the strict `domain_analyst` for NDC_1C.
Use the repo rules from:
- `.codex/agents/domain_analyst.toml`
- `.codex/skills/domain-case-loop/SKILL.md`
- `.codex/skills/domain-case-loop/references/verdict_template.md`
- `.codex/skills/domain-case-loop/references/business_first_analyst_rubric.md`
Current loop context:
- loop_dir: `{loop_dir}`
- iteration_dir: `{iteration_dir}`
- current_pack_dir: `{pack_dir}`
{comparison_block}
Required artifacts to inspect:
- `{pack_dir / 'pack_summary.md'}`
- `{pack_dir / 'pack_state.json'}`
- `{pack_dir / 'scenario_acceptance_matrix.md'}`
- `{repair_targets_path}`
- all `scenario_summary.md`, `scenario_state.json`, and problematic `steps/*/step_state.json` files inside `{pack_dir / 'scenarios'}`
Goal:
- evaluate current domain-pack correctness for business meaning, route/capability quality, evidence quality, and absence of silent heuristic masking;
- evaluate business usefulness, direct-answer-first behavior, state continuity, and field truthfulness, not only technical groundedness;
- evaluate object-centric dialog continuity: stable `focus_object`, stable `answer_object`, reusable bundles such as `provenance_bundle`, and correct action resolution for pronoun-style follow-ups;
- evaluate action-first follow-up behavior, answer layering, compactness of narrow micro-actions, and temporal honesty when the runtime broadens beyond the requested date window;
- determine whether the gate `quality_score >= {target_score}` is reached;
- if not, provide the smallest high-value fix targets for the coder.
Rules:
- `accepted` is allowed only if quality_score >= {target_score}, unresolved_p0_count = 0, and regression_detected = false;
- `accepted` is forbidden if the evidence bundle shows `pack_state.final_status != accepted` or the deterministic repair targets still contain any `P0` or `P1` items;
- `accepted` also requires `direct_answer_ok = true`, `business_usefulness_ok = true`, `temporal_honesty_ok = true`, and `field_truth_ok = true`;
- Treat validated bounded MCP discovery as the semantic route when `bounded_mcp_answer_validated = true`, `mcp_discovery_response_applied = true`, `mcp_discovery_response_candidate_status = ready_for_guarded_use`, and `mcp_discovery_effective_intents` matches the business question. The legacy address route may be only a seed lane; do not call that silent heuristic masking unless the selected discovery chain is wrong, not ready, not applied, or the user-facing answer/state is semantically wrong.
- Use `mcp_discovery_route_candidate_status`, `mcp_discovery_route_candidate_missing_axes`, and `mcp_discovery_route_candidate_enablement_reason` to distinguish a valid user-scope clarification from a real missing reviewed route. Do not ask the coder to overfit the visible answer when the correct next action is route enablement or missing-axis clarification.
- Treat `guarded_insufficiency_validated = true` as an acceptable limited answer only when the user-facing text explicitly says the exact fact is not confirmed and separates movement/candidate evidence from confirmed balances.
- Before creating a priority target about missing validation, inspect the compact step state flags `bounded_mcp_answer_validated`, `memory_checkpoint_validated`, `runtime_factual_answer_validated`, and `guarded_insufficiency_validated`; do not repeat a stale target that contradicts those machine-readable flags.
- Conversely, if a step is accepted only by first-line shape while `bounded_mcp_answer_validated`, `memory_checkpoint_validated`, `runtime_factual_answer_validated`, and `guarded_insufficiency_validated` are all false, treat that as potential silent heuristic or partial masking.
- For broad report-style questions, judge the whole `assistant_text_excerpt`, not only `actual_direct_answer`; the first line may be an executive money summary while the following lines cover VAT, debt, inventory, role boundaries, and unknowns.
- For narrow follow-ups, do not require a full broad business overview. If the user asks only for money totals or best operating-flow year, a compact money-focused answer is correct when it states the method and boundaries honestly.
- when several failing steps share one deterministic repair focus, call out the highest-leverage shared focus first instead of only the lexicographically first failing step;
- `partial` means the pack is usable but exactness, routing, or coverage is still insufficient;
- `needs_exact_capability` means the primary blocker is a missing exact route or capability, but the loop should still continue autonomously unless a user decision is required;
- `continue` means there is a clear next patch cycle;
- `blocked` means the loop is stopped by a real hard blocker such as runtime/infrastructure failure, unavailable 1C data, or another condition that the repo cannot autonomously repair;
- set `requires_user_decision = true` when the next step cannot be chosen safely without user input, for example:
- an architecture fork or risky contour expansion;
- a scope tradeoff or important business ambiguity;
- a required observation anchor is missing and cannot be recovered safely from artifacts, 1C, or the current scenario state;
- the only remaining implementation path would rely on a hack, brittle workaround, heuristic masking, or disproportionate complexity/risk;
- if `requires_user_decision = true`, fill `user_decision_type` and `user_decision_prompt`;
- if the pack is below {target_score} but there is still safe autonomous implementation work, keep `requires_user_decision = false`;
- do not request user input merely because the score is still below {target_score}; request it only when the loop would otherwise guess, overfit, or risk architecture drift.
- return machine-readable fields for: `user_intent_summary`, `expected_direct_answer`, `actual_direct_answer`, `direct_answer_ok`, `business_usefulness_ok`, `business_utility_score`, `direct_answer_priority_score`, `state_continuity_score`, `answer_shape_score`, `evidence_clarity_score`, `focus_object_continuity_ok`, `bundle_reuse_ok`, `followup_action_resolution_ok`, `temporal_honesty_ok`, `field_truth_ok`, `answer_layering_ok`, `recommended_state_objects`, `root_cause_layers`, `broken_edge_ids`, `violated_invariants`;
- if the product found the evidence but failed to retain the selected object, provenance bundle, or another reusable resolved object across turns, classify that as `object_memory_gap` or `edge_carryover_gap`, not as a generic route problem;
- if the product retained the item but resolved the wrong action over that item, for example `покажи документы по этой позиции` -> `documents_by_counterparty`, classify that as `followup_action_resolution_gap`;
- if the product already resolved supplier/date/document details for the active item but failed to reuse that bundle for adjacent follow-ups, classify that as `bundle_reuse_gap`;
- if a narrow business follow-up opens with numbered scaffolding such as `Блок 1/2/3` or a full generic trace packet instead of a compact direct answer, lower business usefulness explicitly rather than treating it as harmless formatting;
- if the surfaced business field looks mislabeled, for example supplier vs organization, classify that as `field_mapping_gap`;
- if the answer blurs exact-window evidence with nearest available out-of-window evidence, classify that as `temporal_honesty_gap`;
- if the answer is technically grounded but still weak for a manager/accountant/operator, classify that as `business_utility_gap`.
Use this UTF-8 evidence bundle as the source of truth for artifact contents. Do not treat shell rendering artifacts as file corruption if the embedded bundle is readable.
Current evidence bundle:
```json
{review_bundle_json}
```
Deterministic repair targets:
```json
{repair_targets_json}
```
{previous_verdict_block}
Return JSON only and follow the schema exactly.
"""
    ).strip()
def build_coder_loop_prompt(
    *,
    loop_dir: Path,
    iteration_dir: Path,
    pack_dir: Path,
    repair_targets_path: Path,
    repair_targets_json: str,
    assigned_focus: dict[str, Any] | None,
    analyst_verdict_path: Path,
    analyst_verdict_json: str,
) -> str:
    """Render the prompt text for the `domain_coder` repair step.

    Args:
        loop_dir: Loop directory, quoted in the "Current loop context" section.
        iteration_dir: Iteration directory; also used to name the required
            ``coder_plan.md`` and ``patch_summary.md`` outputs.
        pack_dir: Current pack directory, quoted in the context section.
        repair_targets_path: Path of the repair-targets file, quoted in the
            context section.
        repair_targets_json: Repair-targets text embedded verbatim in a fenced
            ``json`` block.
        assigned_focus: Repair focus assigned to this iteration; serialized with
            ``dump_json`` when truthy, otherwise reported as ``none``.
        analyst_verdict_path: Path of the analyst verdict file, quoted in the
            context section.
        analyst_verdict_json: Analyst verdict text embedded verbatim in a fenced
            ``json`` block.

    Returns:
        The prompt with leading and trailing whitespace stripped.
    """
    # Either a fenced JSON dump of the assigned focus or a one-line "none" marker.
    assigned_focus_block = (
        textwrap.dedent(
            f"""\
Assigned deterministic repair focus for this iteration:
```json
{dump_json(assigned_focus)}
```
"""
        ).strip()
        if assigned_focus
        else "Assigned deterministic repair focus for this iteration: none"
    )
    # NOTE(review): textwrap.dedent runs on the already-interpolated text, so it
    # only removes a margin shared by every non-blank line, including the lines
    # contributed by the embedded JSON values — confirm the template renders
    # with the intended indentation.
    return textwrap.dedent(
        f"""\
You are the `domain_coder` for NDC_1C.
Use the repo rules from:
- `.codex/agents/domain_coder.toml`
- `.codex/skills/domain-case-loop/SKILL.md`
Current loop context:
- loop_dir: `{loop_dir}`
- iteration_dir: `{iteration_dir}`
- current_pack_dir: `{pack_dir}`
- deterministic_repair_targets: `{repair_targets_path}`
- analyst_verdict_json: `{analyst_verdict_path}`
Make the smallest domain-only patch in the working tree that improves the failing or partial scenarios named in the analyst verdict.
Hard rules:
- do not change the architecture;
- do not fabricate data;
- do not present heuristic answers as confirmed;
- do not touch unrelated files;
- preserve already successful baseline flows.
- preserve UTF-8 without BOM and the existing line structure of edited files; do not leave whole-file normalization-only rewrites or single-line collapses in the worktree;
- use minimal local edits; if a tool rewrites a file into normalization noise, restore the original file first and then apply only the intended semantic patch;
- use `root_cause_layers`, `broken_edge_ids`, `violated_invariants`, and business-utility scores from the analyst verdict to choose the smallest fix;
- use the deterministic repair targets to choose the highest-leverage repair focus first; within that focus, patch the narrowest shared layer that can clear the most `P0`/`P1` targets without architecture drift;
- the assigned deterministic repair focus below is mandatory for this iteration; do not switch to a lower-priority focus unless you are blocked from making a safe patch for the assigned focus;
- if the analyst verdict is optimistic but deterministic repair targets still contain `P0` or `P1`, trust the deterministic repair targets and keep fixing the pack;
- prioritize state continuity, selected-object persistence, stable `focus_object`, stable `answer_object`, reusable `provenance_bundle` / `sale_trace_bundle`, action-first answer behavior, compact micro-action answers, answer layering, temporal honesty, and field-truth mapping when those are the blocking layers;
- do not broaden scope when the analyst says the defect is mainly `object_memory_gap`, `followup_action_resolution_gap`, `bundle_reuse_gap`, `field_mapping_gap`, `temporal_honesty_gap`, `answer_shape_mismatch`, or `business_utility_gap`;
- when the verdict points to pronoun follow-ups or item-centric drilldowns, prefer a narrow object-state or follow-up-action fix over prompt inflation.
Required outputs:
- create `{iteration_dir / 'coder_plan.md'}` with a short plan;
- create `{iteration_dir / 'patch_summary.md'}` with a short summary of the patch;
Analyst verdict JSON:
```json
{analyst_verdict_json}
```
Deterministic repair targets JSON:
```json
{repair_targets_json}
```
{assigned_focus_block}
- then return JSON only and follow the schema exactly.
"""
    ).strip()
def evaluate_analyst_gate(
    verdict: dict[str, Any], target_score: int
) -> tuple[bool, str, bool, str, str | None]:
    """Evaluate the analyst verdict against the acceptance gate.

    Args:
        verdict: Analyst verdict mapping. Missing numeric fields count as 0,
            a missing ``regression_detected`` counts as false, and the quality
            flags (``direct_answer_ok``, ``business_usefulness_ok``,
            ``temporal_honesty_ok``, ``field_truth_ok``, ``answer_layering_ok``)
            default to true when absent.
        target_score: Minimum ``quality_score`` required for acceptance.

    Returns:
        ``(accepted, loop_decision, requires_user_decision, user_decision_type,
        user_decision_prompt)``. ``loop_decision`` falls back to ``"continue"``
        and ``user_decision_type`` to ``"none"`` when blank.
        ``user_decision_prompt`` is ``None`` when missing or blank.
    """
    quality_score = int(verdict.get("quality_score") or 0)
    unresolved_p0_count = int(verdict.get("unresolved_p0_count") or 0)
    regression_detected = bool(verdict.get("regression_detected"))
    direct_answer_ok = bool(verdict.get("direct_answer_ok", True))
    business_usefulness_ok = bool(verdict.get("business_usefulness_ok", True))
    temporal_honesty_ok = bool(verdict.get("temporal_honesty_ok", True))
    field_truth_ok = bool(verdict.get("field_truth_ok", True))
    answer_layering_ok = bool(verdict.get("answer_layering_ok", True))
    loop_decision = str(verdict.get("loop_decision") or "").strip() or "continue"
    requires_user_decision = bool(verdict.get("requires_user_decision"))
    user_decision_type = str(verdict.get("user_decision_type") or "").strip() or "none"
    user_decision_prompt_raw = verdict.get("user_decision_prompt")
    # A whitespace-only prompt carries no question, so collapse it to None the
    # same way the blank-handling above collapses the other string fields.
    user_decision_prompt = (
        (str(user_decision_prompt_raw).strip() or None) if user_decision_prompt_raw else None
    )
    # Every condition must hold; the analyst's own decision alone is not enough.
    accepted = (
        quality_score >= target_score
        and unresolved_p0_count == 0
        and not regression_detected
        and direct_answer_ok
        and business_usefulness_ok
        and temporal_honesty_ok
        and field_truth_ok
        and answer_layering_ok
        and loop_decision == "accepted"
    )
    return accepted, loop_decision, requires_user_decision, user_decision_type, user_decision_prompt
def _limited_dict_items(raw_items: Any, limit: int = 5) -> list[dict[str, Any]]:
if not isinstance(raw_items, list):
return []
return [item for item in raw_items[:limit] if isinstance(item, dict)]
def build_business_audit_markdown(
    *,
    analyst_verdict: dict[str, Any],
    repair_targets: dict[str, Any],
    target_score: int,
) -> str:
    """Render the business-first audit of a replay as a Markdown document.

    Sections, in order: gate values, human meaning, quality scores, root cause,
    repair pressure, priority foci (at most five, via ``_limited_dict_items``)
    and analyst notes. Notes come from the verdict's ``findings`` and fall back
    to ``recommended_actions`` when no findings are present.

    Args:
        analyst_verdict: Analyst verdict mapping.
        repair_targets: Repair-target mapping; a non-dict value is tolerated and
            reported with placeholder counts.
        target_score: Score threshold shown next to the quality score.

    Returns:
        Markdown text ending with a single newline.
    """
    verdict = analyst_verdict
    targets_is_dict = isinstance(repair_targets, dict)

    def _ok_flag(key: str) -> bool:
        # Quality flags absent from the verdict are reported as True.
        return bool(verdict.get(key, True))

    def _target_field(key: str, fallback: Any) -> Any:
        # Use the fallback only when repair_targets itself is not a mapping.
        return repair_targets.get(key) if targets_is_dict else fallback

    report: list[str] = [
        "# Business audit",
        "",
        "This is the business-first verdict for the replay. Read it before looking at route ids, debug fields, or code.",
        "",
        "## Gate",
        f"- quality_score: `{verdict.get('quality_score')}` / target `{target_score}`",
        f"- loop_decision: `{verdict.get('loop_decision') or 'n/a'}`",
        f"- unresolved_p0_count: `{verdict.get('unresolved_p0_count')}`",
        f"- regression_detected: `{bool(verdict.get('regression_detected'))}`",
        f"- direct_answer_ok: `{_ok_flag('direct_answer_ok')}`",
        f"- business_usefulness_ok: `{_ok_flag('business_usefulness_ok')}`",
        f"- temporal_honesty_ok: `{_ok_flag('temporal_honesty_ok')}`",
        f"- field_truth_ok: `{_ok_flag('field_truth_ok')}`",
        f"- answer_layering_ok: `{_ok_flag('answer_layering_ok')}`",
        "",
        "## Human Meaning",
        f"- user_intent_summary: {verdict.get('user_intent_summary') or 'n/a'}",
        f"- expected_direct_answer: {verdict.get('expected_direct_answer') or 'n/a'}",
        f"- actual_direct_answer: {verdict.get('actual_direct_answer') or 'n/a'}",
        "",
        "## Quality Scores",
        f"- business_utility_score: `{verdict.get('business_utility_score')}`",
        f"- direct_answer_priority_score: `{verdict.get('direct_answer_priority_score')}`",
        f"- state_continuity_score: `{verdict.get('state_continuity_score')}`",
        f"- answer_shape_score: `{verdict.get('answer_shape_score')}`",
        f"- evidence_clarity_score: `{verdict.get('evidence_clarity_score')}`",
        "",
        "## Root Cause",
    ]

    # One bullet per value, or a single `n/a` bullet when the list is empty.
    for bullet_label, verdict_key in (
        ("root_cause_layer", "root_cause_layers"),
        ("broken_edge_id", "broken_edge_ids"),
        ("violated_invariant", "violated_invariants"),
    ):
        values = normalize_string_list(verdict.get(verdict_key))
        if values:
            report.extend(f"- {bullet_label}: `{value}`" for value in values)
        else:
            report.append(f"- {bullet_label}: `n/a`")

    report += [
        "",
        "## Repair Pressure",
        f"- repair_target_count: `{_target_field('target_count', 0)}`",
        f"- deterministic_target_count: `{_target_field('deterministic_target_count', 'n/a')}`",
        f"- analyst_priority_target_count: `{_target_field('analyst_priority_target_count', 'n/a')}`",
        f"- suppressed_analyst_priority_target_count: `{_target_field('suppressed_analyst_priority_target_count', 0)}`",
        f"- severity_counts: `{dump_json(_target_field('severity_counts', {}))}`",
        "",
        "## Priority Foci",
    ]

    foci = _limited_dict_items(_target_field("priority_foci", []))
    if not foci:
        report.append("- no priority foci")
    for focus in foci:
        files = normalize_string_list(focus.get("candidate_files"))
        files_text = ", ".join(files) if files else "n/a"
        report += [
            f"- focus_id: `{focus.get('focus_id') or 'n/a'}`",
            f" severity: `{focus.get('severity') or 'n/a'}`",
            f" problem_type: `{focus.get('problem_type') or 'n/a'}`",
            f" target_count: `{focus.get('target_count') or 0}`",
            f" rank_reason: {focus.get('rank_reason') or 'n/a'}",
            f" candidate_files: `{files_text}`",
        ]

    report += ["", "## Analyst Notes"]
    # Findings win; recommended actions are only a fallback.
    notes = normalize_string_list(verdict.get("findings")) or normalize_string_list(
        verdict.get("recommended_actions")
    )
    if notes:
        report.extend(f"- {note}" for note in notes)
    else:
        report.append("- no additional analyst notes")
    return "\n".join(report).strip() + "\n"
def build_lead_coder_handoff(
    *,
    loop_state: dict[str, Any],
    iteration_id: str,
    pack_dir: Path,
    analyst_verdict_path: Path,
    repair_targets_path: Path,
    business_audit_path: Path,
    analyst_verdict: dict[str, Any],
    repair_targets: dict[str, Any],
    target_score: int,
    loop_decision: str,
    analyst_accepted_gate: bool,
    accepted_gate: bool,
    deterministic_gate_ok: bool,
    deterministic_gate_reason: str,
    requires_user_decision: bool,
    user_decision_type: str,
    user_decision_prompt: str | None,
) -> dict[str, Any]:
    """Build the handoff payload that asks the lead coder to repair the pack.

    The payload records the gate outcome passed in by the caller, repo-relative
    references to the audit artifacts, the analyst's human-meaning and
    root-cause fields, the primary repair focus, capped lists of priority foci
    (5), repair targets (8) and route-candidate groups (8), the repair targets
    whose ``target_source`` is ``route_candidate_enablement``, the candidate
    files, and a fixed list of instructions.

    Returns:
        A dict with ``schema_version`` ``domain_loop_lead_coder_handoff_v1``.
    """
    primary_focus = select_primary_repair_focus(repair_targets)

    targets_is_dict = isinstance(repair_targets, dict)

    def _capped(key: str, **kwargs: Any) -> list[dict[str, Any]]:
        # Pull a capped list of dict entries out of repair_targets.
        source = repair_targets.get(key) if targets_is_dict else []
        return _limited_dict_items(source, **kwargs)

    foci = _capped("priority_foci")
    top_targets = _capped("targets", limit=8)
    candidate_groups = _capped("route_candidate_groups", limit=8)

    enablement_targets: list[dict[str, Any]] = []
    for entry in top_targets:
        if not isinstance(entry, dict):
            continue
        if str(entry.get("target_source") or "") == "route_candidate_enablement":
            enablement_targets.append(entry)

    files = [repo_relative(path) for path in build_coder_snapshot_paths(repair_targets)]

    artifact_refs = {
        "pack_dir": repo_relative(pack_dir),
        "business_audit": repo_relative(business_audit_path),
        "analyst_verdict": repo_relative(analyst_verdict_path),
        "repair_targets": repo_relative(repair_targets_path),
        "pack_summary": repo_relative(pack_dir / "pack_summary.md"),
        "pack_state": repo_relative(pack_dir / "pack_state.json"),
        "scenario_acceptance_matrix": repo_relative(pack_dir / "scenario_acceptance_matrix.md"),
    }
    human_meaning = {
        key: analyst_verdict.get(key)
        for key in ("user_intent_summary", "expected_direct_answer", "actual_direct_answer")
    }
    lead_instructions = [
        "Read business_audit.md first and judge the user-facing answer before debug metadata.",
        "Inspect analyst_verdict.json and repair_targets.json only after the semantic defect is clear.",
        "Use route_candidate_groups to distinguish missing user scope from a reviewed-route enablement gap before patching.",
        "Patch code manually in the main Codex context; do not launch a weak autonomous coder by default.",
        "Keep the patch narrow, preserve UTF-8 without BOM, run targeted tests/build, rebuild graphify after code edits, then rerun the same semantic pack.",
    ]

    handoff: dict[str, Any] = {
        "schema_version": "domain_loop_lead_coder_handoff_v1",
        "repair_mode": REPAIR_MODE_LEAD_HANDOFF,
        # Second-resolution UTC timestamp.
        "created_at": datetime.now(timezone.utc).replace(microsecond=0).isoformat(),
        "loop_id": loop_state.get("loop_id"),
        "iteration_id": iteration_id,
        "status": "lead_coder_repair_required",
        "reason": "semantic replay did not meet the business-first acceptance gate; code repair must be done by Lead Codex in the main project context",
        "quality_score": int(analyst_verdict.get("quality_score") or 0),
        "target_score": target_score,
        "loop_decision": loop_decision,
        "analyst_accepted_gate": analyst_accepted_gate,
        "accepted_gate": accepted_gate,
        "deterministic_gate_ok": deterministic_gate_ok,
        "deterministic_gate_reason": deterministic_gate_reason,
        "requires_user_decision": requires_user_decision,
        "user_decision_type": user_decision_type,
        "user_decision_prompt": user_decision_prompt,
        "artifact_refs": artifact_refs,
        "human_meaning": human_meaning,
        "root_cause_layers": normalize_string_list(analyst_verdict.get("root_cause_layers")),
        "broken_edge_ids": normalize_string_list(analyst_verdict.get("broken_edge_ids")),
        "violated_invariants": normalize_string_list(analyst_verdict.get("violated_invariants")),
        # Any falsy focus is stored as None.
        "assigned_primary_focus": primary_focus or None,
        "priority_foci": foci,
        "top_repair_targets": top_targets,
        "route_candidate_groups": candidate_groups,
        "route_candidate_enablement_targets": enablement_targets,
        "candidate_files": files,
        "lead_instructions": lead_instructions,
    }
    return handoff
def build_lead_coder_handoff_markdown(handoff: dict[str, Any]) -> str:
    """Render a lead-coder handoff payload as a Markdown document.

    Sections, in order: header fields, "Read First" artifact references, human
    meaning, primary focus, top repair targets, route-candidate handoff groups,
    route-candidate enablement targets, candidate files and lead instructions.
    The list sections are capped at eight dict entries each and show a
    placeholder bullet when empty.

    Args:
        handoff: Handoff mapping; nested values of an unexpected type are
            treated as empty.

    Returns:
        Markdown text ending with a single newline.
    """

    def _dict_field(key: str) -> dict[str, Any]:
        # Nested sections are only trusted when they are real mappings.
        value = handoff.get(key)
        return value if isinstance(value, dict) else {}

    def _joined(values: list[str], separator: str = ", ") -> str:
        # Join values for display, or show `n/a` for an empty list.
        return separator.join(values) if values else "n/a"

    refs = _dict_field("artifact_refs")
    meaning = _dict_field("human_meaning")

    doc: list[str] = [
        "# Lead Codex repair handoff",
        "",
        f"- repair_mode: `{handoff.get('repair_mode')}`",
        f"- loop_id: `{handoff.get('loop_id')}`",
        f"- iteration_id: `{handoff.get('iteration_id')}`",
        f"- quality_score: `{handoff.get('quality_score')}` / target `{handoff.get('target_score')}`",
        f"- loop_decision: `{handoff.get('loop_decision')}`",
        f"- deterministic_gate_ok: `{handoff.get('deterministic_gate_ok')}`",
        f"- deterministic_gate_reason: `{handoff.get('deterministic_gate_reason') or 'n/a'}`",
        "",
        "## Read First",
        f"- business_audit: `{refs.get('business_audit')}`",
        f"- analyst_verdict: `{refs.get('analyst_verdict')}`",
        f"- repair_targets: `{refs.get('repair_targets')}`",
        f"- pack_dir: `{refs.get('pack_dir')}`",
        "",
        "## Human Meaning",
        f"- user_intent_summary: {meaning.get('user_intent_summary') or 'n/a'}",
        f"- expected_direct_answer: {meaning.get('expected_direct_answer') or 'n/a'}",
        f"- actual_direct_answer: {meaning.get('actual_direct_answer') or 'n/a'}",
        "",
        "## Primary Focus",
    ]

    focus = _dict_field("assigned_primary_focus")
    if not focus:
        doc.append("- no assigned focus")
    else:
        focus_files = normalize_string_list(focus.get("candidate_files"))
        focus_layers = normalize_string_list(focus.get("root_cause_layers"))
        doc += [
            f"- focus_id: `{focus.get('focus_id') or 'n/a'}`",
            f"- severity: `{focus.get('severity') or 'n/a'}`",
            f"- problem_type: `{focus.get('problem_type') or 'n/a'}`",
            f"- target_count: `{focus.get('target_count') or 0}`",
            f"- root_cause_layers: `{_joined(focus_layers)}`",
            f"- rank_reason: {focus.get('rank_reason') or 'n/a'}",
            f"- candidate_files: `{_joined(focus_files)}`",
        ]

    doc += ["", "## Top Repair Targets"]
    top_targets = _limited_dict_items(handoff.get("top_repair_targets"), limit=8)
    if not top_targets:
        doc.append("- no repair targets")
    for entry in top_targets:
        entry_files = normalize_string_list(entry.get("candidate_files"))
        doc += [
            f"- `{entry.get('target_id') or 'n/a'}`",
            f" severity: `{entry.get('severity') or 'n/a'}`",
            f" problem_type: `{entry.get('problem_type') or 'n/a'}`",
            f" source: `{entry.get('target_source') or 'n/a'}`",
            f" fix_goal: {entry.get('fix_goal') or 'n/a'}",
            f" candidate_files: `{_joined(entry_files)}`",
        ]

    groups = _limited_dict_items(handoff.get("route_candidate_groups"), limit=8)
    doc += ["", "## Route Candidate Handoff Groups"]
    if not groups:
        doc.append("- no route candidates")
    for group in groups:
        ids = normalize_string_list(group.get("target_ids"))
        axes = normalize_string_list(group.get("missing_axes"))
        questions = normalize_string_list(group.get("sample_questions"))
        actions = normalize_string_list(group.get("next_actions"))
        doc += [
            f"- group_id: `{group.get('group_id') or 'n/a'}`",
            f" status: `{group.get('candidate_status') or 'n/a'}`",
            f" selected_chain_id: `{group.get('selected_chain_id') or 'n/a'}`",
            f" fact/action: `{group.get('fact_family') or 'n/a'}` / `{group.get('action_family') or 'n/a'}`",
            f" missing_axes: `{_joined(axes)}`",
            f" executable_now: `{group.get('executable_now')}`",
            f" target_ids: `{_joined(ids)}`",
            f" sample_questions: `{_joined(questions, '; ')}`",
            f" next_actions: `{_joined(actions, '; ')}`",
        ]

    enablement = _limited_dict_items(
        handoff.get("route_candidate_enablement_targets"),
        limit=8,
    )
    doc += ["", "## Route Candidate Enablement Targets"]
    if not enablement:
        doc.append("- no route-candidate enablement targets")
    for entry in enablement:
        candidate = entry.get("route_candidate")
        if not isinstance(candidate, dict):
            candidate = {}
        # Falls back on the joined text itself, not on the list being empty.
        axes_text = ", ".join(normalize_string_list(candidate.get("missing_axes"))) or "n/a"
        doc += [
            f"- `{entry.get('target_id') or 'n/a'}`",
            f" severity: `{entry.get('severity') or 'n/a'}`",
            f" selected_chain_id: `{candidate.get('selected_chain_id') or 'n/a'}`",
            f" missing_axes: `{axes_text}`",
            f" fix_goal: {entry.get('fix_goal') or 'n/a'}",
        ]

    doc += ["", "## Candidate Files"]
    all_files = normalize_string_list(handoff.get("candidate_files"))
    if all_files:
        doc.extend(f"- `{path}`" for path in all_files)
    else:
        doc.append("- no candidate files")

    doc += ["", "## Lead Instructions"]
    doc.extend(f"- {line}" for line in normalize_string_list(handoff.get("lead_instructions")))
    return "\n".join(doc).strip() + "\n"
def save_lead_coder_handoff(
    *,
    loop_dir: Path,
    iteration_dir: Path,
    handoff: dict[str, Any],
) -> dict[str, str]:
    """Write the handoff as JSON and Markdown for the iteration and the loop.

    The same payload is written to ``lead_coder_handoff.json`` and
    ``lead_coder_handoff.md`` in ``iteration_dir`` and again in ``loop_dir``
    (the "latest" copy).

    Returns:
        The four written paths as strings, keyed by their role.
    """
    destinations = {
        "lead_coder_handoff_path": iteration_dir / "lead_coder_handoff.json",
        "lead_coder_handoff_markdown_path": iteration_dir / "lead_coder_handoff.md",
        "latest_lead_coder_handoff_path": loop_dir / "lead_coder_handoff.json",
        "latest_lead_coder_handoff_markdown_path": loop_dir / "lead_coder_handoff.md",
    }
    markdown = build_lead_coder_handoff_markdown(handoff)
    # Iteration copies are written first, then the loop-level copies.
    for destination in destinations.values():
        if destination.suffix == ".json":
            write_json(destination, handoff)
        else:
            write_text(destination, markdown)
    return {role: str(destination) for role, destination in destinations.items()}
def handle_run_pack(args: argparse.Namespace) -> int:
    """Run the scenarios of a pack manifest and persist the pack artifacts.

    Calls ``ensure_backend_health`` first, loads the pack from
    ``args.manifest``, applies the optional ``pack_id`` / ``analysis_date``
    overrides, executes the selected scenarios one by one, and writes the pack
    state, repair targets, acceptance matrix, summary and final status under
    ``<output_root>/<pack_id>``.

    Args:
        args: Parsed CLI namespace. Reads ``backend_url``, ``timeout_seconds``,
            ``manifest``, ``pack_id``, ``analysis_date``, ``output_root`` and
            ``max_scenarios``; the namespace is also forwarded to
            ``execute_scenario_manifest``.

    Returns:
        Always ``0``.
    """
    ensure_backend_health(args.backend_url, args.timeout_seconds)

    manifest_path = Path(args.manifest).resolve()
    pack = load_scenario_pack(manifest_path)
    if args.pack_id:
        pack["pack_id"] = args.pack_id.strip()
    if args.analysis_date:
        cli_context = {"as_of_date": args.analysis_date, "source": "cli_override"}
        pack["analysis_context"] = merge_analysis_context(pack.get("analysis_context"), cli_context)

    pack_dir = Path(args.output_root).resolve() / pack["pack_id"]
    scenarios_dir = pack_dir / "scenarios"
    scenarios_dir.mkdir(parents=True, exist_ok=True)
    write_json(pack_dir / "pack_manifest.json", pack)
    write_text(pack_dir / "manifest_source.txt", f"{manifest_path}\n")

    # NOTE(review): a cap of 0 (including negative values clamped to 0) leaves
    # the scenario list unrestricted — confirm that "0 means no limit" is intended.
    selected_scenarios = pack["scenarios"]
    if args.max_scenarios is not None:
        cap = max(0, int(args.max_scenarios))
        if cap:
            selected_scenarios = selected_scenarios[:cap]

    scenario_results: list[dict[str, Any]] = []
    for raw_scenario in selected_scenarios:
        manifest = normalize_scenario_manifest(
            raw_scenario,
            fallback_domain=pack["domain"],
            fallback_analysis_context=pack.get("analysis_context"),
            fallback_bindings=pack.get("bindings"),
            default_scenario_id=raw_scenario["scenario_id"],
        )
        scenario_id = manifest["scenario_id"]
        scenario_dir = scenarios_dir / scenario_id
        state, scenario_status = execute_scenario_manifest(
            args=args,
            manifest=manifest,
            scenario_dir=scenario_dir,
            manifest_source_label=f"{manifest_path}#{scenario_id}",
        )
        result = {
            "scenario_id": scenario_id,
            "title": manifest["title"],
            "execution_status": derive_scenario_execution_status(state.get("step_outputs") or {}),
            "final_status": scenario_status,
            "session_id": state.get("session_id"),
            "artifact_dir": str(scenario_dir),
        }
        scenario_results.append(result)

    execution_status = derive_pack_execution_status(scenario_results)
    final_status = derive_pack_final_status(pack, scenario_results)
    pack_state = {
        "schema_version": SCENARIO_PACK_SCHEMA_VERSION,
        "pack_id": pack["pack_id"],
        "domain": pack["domain"],
        "title": pack["title"],
        "analysis_context": pack.get("analysis_context") or {},
        "bindings": pack.get("bindings") or {},
        "scenario_results": scenario_results,
        "execution_status": execution_status,
        "final_status": final_status,
        "updated_at": datetime.now(timezone.utc).replace(microsecond=0).isoformat(),
    }

    # Repair targets are derived from the scenario artifacts already on disk.
    artifacts = collect_pack_scenario_artifacts(pack_dir)
    repair_targets = build_deterministic_repair_targets(pack_state, artifacts)

    write_text(pack_dir / "scenario_acceptance_matrix.md", build_scenario_acceptance_matrix(pack, scenario_results))
    write_json(pack_dir / "pack_state.json", pack_state)
    write_json(pack_dir / "repair_targets.json", repair_targets)
    write_text(pack_dir / "repair_targets.md", build_repair_targets_summary(repair_targets))
    write_text(pack_dir / "pack_summary.md", build_pack_summary(pack, scenario_results, final_status, execution_status))
    write_text(pack_dir / "final_status.md", build_pack_final_status(pack, scenario_results, final_status, execution_status))

    print(f"[domain-case-loop] saved pack artifacts to {pack_dir}")
    print(f"[domain-case-loop] execution_status={execution_status} final_status={final_status}")
    return 0
def build_loop_summary(loop_state: dict[str, Any]) -> str:
    """Render the markdown summary of a pack loop.

    The document starts with loop-level fields (``loop_id``, ``repair_mode``,
    ``target_score``, ``max_iterations``, ``final_status``,
    ``last_analyst_decision``) and is followed by an ``## Iterations`` section
    holding one bullet group per entry of ``loop_state["iterations"]``.

    Args:
        loop_state: Loop state mapping. ``loop_id``, ``target_score``,
            ``max_iterations`` and ``final_status`` are read with ``[]`` and
            must be present; each iteration record must carry
            ``iteration_id`` and ``pack_dir``. Every other field is optional
            and falls back to a placeholder.

    Returns:
        The markdown text, stripped and terminated by a single newline.
    """

    def render_iteration(record: dict[str, Any]) -> list[str]:
        # One bullet for the iteration id followed by its detail lines.
        return [
            f"- `{record['iteration_id']}`",
            f" baseline_pack_dir: `{record['pack_dir']}`",
            f" analyst_score: `{record.get('quality_score')}`",
            f" analyst_decision: `{record.get('loop_decision')}`",
            f" analyst_accepted_gate: `{record.get('analyst_accepted_gate')}`",
            f" accepted_gate: `{record.get('accepted_gate')}`",
            f" deterministic_gate_ok: `{record.get('deterministic_gate_ok')}`",
            f" deterministic_gate_reason: `{record.get('deterministic_gate_reason') or 'n/a'}`",
            f" requires_user_decision: `{record.get('requires_user_decision')}`",
            f" user_decision_type: `{record.get('user_decision_type') or 'none'}`",
            f" coder_status: `{record.get('coder_status') or 'n/a'}`",
            f" assigned_repair_focus_id: `{record.get('assigned_repair_focus_id') or 'none'}`",
            f" coder_workspace_hygiene_restored_files: `{', '.join(record.get('coder_workspace_hygiene_restored_files') or []) or 'none'}`",
            f" analyst_verdict: `{record.get('analyst_verdict_path') or 'n/a'}`",
            f" business_audit: `{record.get('business_audit_path') or 'n/a'}`",
            f" repair_targets: `{record.get('repair_targets_path') or 'n/a'}`",
            f" lead_coder_handoff: `{record.get('lead_coder_handoff_path') or 'n/a'}`",
            f" repair_target_count: `{record.get('repair_target_count')}`",
            f" repair_target_severity_counts: `{dump_json(record.get('repair_target_severity_counts') or {})}`",
        ]

    report = [
        "# Loop summary",
        "",
        f"- loop_id: `{loop_state['loop_id']}`",
        f"- repair_mode: `{loop_state.get('repair_mode') or 'n/a'}`",
        f"- target_score: `{loop_state['target_score']}`",
        f"- max_iterations: `{loop_state['max_iterations']}`",
        f"- final_status: `{loop_state['final_status']}`",
        f"- last_analyst_decision: `{loop_state.get('last_analyst_decision') or 'n/a'}`",
        "",
        "## Iterations",
    ]
    for record in loop_state.get("iterations", []):
        report += render_iteration(record)
    body = "\n".join(report).strip()
    return f"{body}\n"
def build_loop_final_status(loop_state: dict[str, Any]) -> str:
    """Render the ``final_status.md`` document for a pack loop.

    The document is assembled from explicit lines instead of running
    ``textwrap.dedent`` over an interpolated template: ``stop_reason`` may
    embed a multi-line user decision prompt, and a continuation line without
    leading whitespace collapses dedent's common margin to nothing, which
    would leave every other line of the document indented.

    Args:
        loop_state: Loop state mapping. ``final_status``, ``loop_id`` and
            ``target_score`` are read with ``[]`` and must be present; the
            remaining fields are optional and rendered as ``n/a`` when
            missing or falsy. A missing or ``None`` ``iterations`` value
            counts as zero iterations.

    Returns:
        The markdown text terminated by a single newline.
    """
    iterations = loop_state.get("iterations") or []
    lines = [
        "# Final status",
        f"- status: `{loop_state['final_status']}`",
        f"- loop_id: `{loop_state['loop_id']}`",
        f"- repair_mode: `{loop_state.get('repair_mode') or 'n/a'}`",
        f"- target_score: `{loop_state['target_score']}`",
        f"- iterations_ran: `{len(iterations)}`",
        f"- last_analyst_decision: `{loop_state.get('last_analyst_decision') or 'n/a'}`",
        f"- latest_lead_coder_handoff: `{loop_state.get('latest_lead_coder_handoff_path') or 'n/a'}`",
        # Rendered without backticks, verbatim; may span several lines.
        f"- stop_reason: {loop_state.get('stop_reason') or 'n/a'}",
    ]
    return "\n".join(lines) + "\n"
def _persist_loop_state(loop_dir: Path, loop_state: dict[str, Any]) -> None:
    """Stamp ``updated_at`` with the current UTC time and rewrite ``loop_state.json``."""
    loop_state["updated_at"] = datetime.now(timezone.utc).replace(microsecond=0).isoformat()
    write_json(loop_dir / "loop_state.json", loop_state)


def handle_run_pack_loop(args: argparse.Namespace) -> int:
    """Run the ``run-pack-loop`` command: replay a pack, review it, and repair or hand off.

    Each iteration runs the pack in a subprocess, asks the analyst (a codex
    exec run in a read-only sandbox) for a verdict, merges the analyst's
    priorities into the repair targets, and evaluates the analyst gate and the
    deterministic gate. The loop stops when both gates pass, when a user
    decision is required, when the analyst reports ``blocked``, when the
    repair mode is lead-handoff (handoff artifacts are written and the loop
    always stops), when the coder reports ``blocked``, or when
    ``max_iterations`` is exhausted.

    All artifacts are written under ``<output_root>/<loop_id>``;
    ``loop_state.json`` is rewritten at every state transition, and
    ``loop_summary.md`` plus ``final_status.md`` are written once at the end.

    Args:
        args: Parsed CLI namespace of the ``run-pack-loop`` subcommand.

    Returns:
        ``0`` once the loop artifacts have been written, whatever the final
        status is.

    Raises:
        RuntimeError: If ``--max-iterations`` is lower than 1. Exceptions
            raised by the helpers called here propagate unchanged.
    """
    # Validate the numeric options before touching the filesystem so that an
    # invalid invocation does not leave a half-initialised loop directory.
    target_score = int(args.target_score)
    max_iterations = int(args.max_iterations)
    if max_iterations < 1:
        raise RuntimeError("--max-iterations must be >= 1")
    repair_mode = normalize_repair_mode(getattr(args, "repair_mode", REPAIR_MODE_LEAD_HANDOFF))

    manifest_path = Path(args.manifest).resolve()
    loop_id = str(args.loop_id or slugify_case_id("domain_pack_loop", None)).strip()
    loop_dir = Path(args.output_root).resolve() / loop_id
    iterations_dir = loop_dir / "iterations"
    iterations_dir.mkdir(parents=True, exist_ok=True)
    write_text(loop_dir / "manifest_source.txt", f"{manifest_path}\n")

    loop_state: dict[str, Any] = {
        "schema_version": AUTONOMOUS_LOOP_SCHEMA_VERSION,
        "loop_id": loop_id,
        "manifest_path": str(manifest_path),
        "repair_mode": repair_mode,
        "target_score": target_score,
        "max_iterations": max_iterations,
        "iterations": [],
        "final_status": "partial",
        "stop_reason": None,
        "last_analyst_decision": None,
        "last_user_decision_type": "none",
        "last_user_decision_prompt": None,
        # Placeholder keeps the key position stable; stamped on every persist.
        "updated_at": None,
    }
    _persist_loop_state(loop_dir, loop_state)

    # Artifacts of the previous iteration, handed to the next analyst prompt.
    previous_pack_dir: Path | None = None
    previous_verdict_path: Path | None = None
    for iteration_index in range(max_iterations):
        iteration_id = f"iteration_{iteration_index:02d}"
        iteration_dir = iterations_dir / iteration_id
        iteration_dir.mkdir(parents=True, exist_ok=True)

        # --- 1. Replay the pack in a subprocess. ---
        pack_output_root = iteration_dir / "pack_output"
        pack_id = "pack_run"
        pack_command = build_run_pack_command(
            args,
            manifest_path=manifest_path,
            pack_id=pack_id,
            output_root=pack_output_root,
        )
        run_subprocess_command(
            pack_command,
            cwd=REPO_ROOT,
            # The whole pack gets 20x the per-request timeout, never below 600 s.
            timeout_seconds=max(600, int(args.timeout_seconds) * 20),
            stdout_path=iteration_dir / "pack_run.stdout.log",
            stderr_path=iteration_dir / "pack_run.stderr.log",
        )
        pack_dir = pack_output_root / pack_id

        # --- 2. Collect the pack artifacts and build the analyst prompt. ---
        analyst_verdict_path = iteration_dir / "analyst_verdict.json"
        review_bundle_json = build_pack_review_bundle(pack_dir)
        repair_targets_path = pack_dir / "repair_targets.json"
        repair_targets = read_json_file(repair_targets_path) if repair_targets_path.exists() else {}
        repair_targets_json = dump_json(repair_targets)
        pack_state_path = pack_dir / "pack_state.json"
        pack_state = read_json_file(pack_state_path) if pack_state_path.exists() else {}
        previous_verdict_json = (
            read_text_file(previous_verdict_path)
            if previous_verdict_path is not None and previous_verdict_path.exists()
            else None
        )
        analyst_prompt = build_analyst_loop_prompt(
            loop_dir=loop_dir,
            iteration_dir=iteration_dir,
            pack_dir=pack_dir,
            repair_targets_path=repair_targets_path,
            previous_pack_dir=previous_pack_dir,
            previous_verdict_path=previous_verdict_path,
            target_score=target_score,
            review_bundle_json=review_bundle_json,
            repair_targets_json=repair_targets_json,
            previous_verdict_json=previous_verdict_json,
        )
        write_text(iteration_dir / "analyst_prompt.md", analyst_prompt + "\n")

        # --- 3. Run the analyst in a read-only sandbox. ---
        analyst_command = build_codex_exec_command(
            args,
            output_file=analyst_verdict_path,
            schema_file=Path(args.analyst_schema).resolve(),
            sandbox_mode="read-only",
            model_override=getattr(args, "analyst_codex_model", None),
            reasoning_effort=getattr(args, "analyst_reasoning_effort", None),
        )
        run_subprocess_command(
            analyst_command,
            cwd=REPO_ROOT,
            timeout_seconds=int(args.codex_timeout_seconds),
            input_text=analyst_prompt,
            stdout_path=iteration_dir / "analyst_exec.stdout.log",
            stderr_path=iteration_dir / "analyst_exec.stderr.log",
        )
        analyst_verdict = read_json_output(analyst_verdict_path)

        # --- 4. Merge analyst priorities; from here on the merged "semantic"
        # repair targets replace the pack-level ones. ---
        repair_targets = merge_analyst_priority_repair_targets(repair_targets, analyst_verdict)
        repair_targets_path = iteration_dir / "semantic_repair_targets.json"
        write_json(repair_targets_path, repair_targets)
        write_text(iteration_dir / "semantic_repair_targets.md", build_repair_targets_summary(repair_targets))
        repair_targets_json = dump_json(repair_targets)

        # --- 5. Evaluate both gates and write the business audit. ---
        (
            analyst_accepted_gate,
            loop_decision,
            requires_user_decision,
            user_decision_type,
            user_decision_prompt,
        ) = evaluate_analyst_gate(analyst_verdict, target_score)
        deterministic_gate_ok, deterministic_gate_reason = evaluate_deterministic_loop_gate(pack_state, repair_targets)
        business_audit_path = iteration_dir / "business_audit.md"
        write_text(
            business_audit_path,
            build_business_audit_markdown(
                analyst_verdict=analyst_verdict,
                repair_targets=repair_targets,
                target_score=target_score,
            ),
        )
        # Acceptance requires the analyst gate AND the deterministic gate.
        accepted_gate = analyst_accepted_gate and deterministic_gate_ok
        repair_target_count = int(repair_targets.get("target_count") or 0) if isinstance(repair_targets, dict) else 0
        repair_target_severity_counts = (
            repair_targets.get("severity_counts")
            if isinstance(repair_targets, dict) and isinstance(repair_targets.get("severity_counts"), dict)
            else {}
        )
        loop_state["last_analyst_decision"] = loop_decision
        loop_state["last_user_decision_type"] = user_decision_type
        loop_state["last_user_decision_prompt"] = user_decision_prompt
        iteration_record: dict[str, Any] = {
            "iteration_id": iteration_id,
            "pack_dir": str(pack_dir),
            "quality_score": int(analyst_verdict.get("quality_score") or 0),
            "loop_decision": loop_decision,
            "analyst_accepted_gate": analyst_accepted_gate,
            "accepted_gate": accepted_gate,
            "deterministic_gate_ok": deterministic_gate_ok,
            "deterministic_gate_reason": deterministic_gate_reason,
            "requires_user_decision": requires_user_decision,
            "user_decision_type": user_decision_type,
            "user_decision_prompt": user_decision_prompt,
            "analyst_verdict_path": str(analyst_verdict_path),
            "business_audit_path": str(business_audit_path),
            "repair_targets_path": str(repair_targets_path),
            "repair_target_count": repair_target_count,
            "repair_target_severity_counts": repair_target_severity_counts,
            "coder_status": None,
        }

        # --- 6. Terminal outcomes that need no repair step. ---
        if accepted_gate:
            loop_state["iterations"].append(iteration_record)
            loop_state["final_status"] = "accepted"
            loop_state["stop_reason"] = f"analyst accepted + deterministic gate passed at {iteration_id}"
            _persist_loop_state(loop_dir, loop_state)
            break
        if requires_user_decision:
            loop_state["iterations"].append(iteration_record)
            if loop_decision in {"needs_exact_capability", "partial", "blocked"}:
                loop_state["final_status"] = loop_decision
            else:
                loop_state["final_status"] = "partial"
            prompt_suffix = f" | prompt: {user_decision_prompt}" if user_decision_prompt else ""
            loop_state["stop_reason"] = f"user_decision_required at {iteration_id}: {user_decision_type}{prompt_suffix}"
            _persist_loop_state(loop_dir, loop_state)
            break
        if loop_decision == "blocked":
            loop_state["iterations"].append(iteration_record)
            loop_state["final_status"] = "blocked"
            loop_state["stop_reason"] = f"analyst blocked at {iteration_id}"
            _persist_loop_state(loop_dir, loop_state)
            break

        # --- 7a. Lead-handoff mode: write the handoff artifacts and stop. ---
        if repair_mode == REPAIR_MODE_LEAD_HANDOFF:
            handoff = build_lead_coder_handoff(
                loop_state=loop_state,
                iteration_id=iteration_id,
                pack_dir=pack_dir,
                analyst_verdict_path=analyst_verdict_path,
                repair_targets_path=repair_targets_path,
                business_audit_path=business_audit_path,
                analyst_verdict=analyst_verdict,
                repair_targets=repair_targets,
                target_score=target_score,
                loop_decision=loop_decision,
                analyst_accepted_gate=analyst_accepted_gate,
                accepted_gate=accepted_gate,
                deterministic_gate_ok=deterministic_gate_ok,
                deterministic_gate_reason=deterministic_gate_reason,
                requires_user_decision=requires_user_decision,
                user_decision_type=user_decision_type,
                user_decision_prompt=user_decision_prompt,
            )
            handoff_paths = save_lead_coder_handoff(
                loop_dir=loop_dir,
                iteration_dir=iteration_dir,
                handoff=handoff,
            )
            iteration_record["coder_status"] = "lead_handoff_required"
            iteration_record.update(handoff_paths)
            if isinstance(handoff.get("assigned_primary_focus"), dict):
                iteration_record["assigned_repair_focus_id"] = str(
                    handoff["assigned_primary_focus"].get("focus_id") or ""
                )
            loop_state["iterations"].append(iteration_record)
            loop_state["latest_lead_coder_handoff_path"] = handoff_paths["latest_lead_coder_handoff_path"]
            loop_state["latest_lead_coder_handoff_markdown_path"] = handoff_paths[
                "latest_lead_coder_handoff_markdown_path"
            ]
            if loop_decision in {"needs_exact_capability", "partial"}:
                loop_state["final_status"] = loop_decision
            else:
                loop_state["final_status"] = "partial"
            loop_state["stop_reason"] = f"lead_coder_handoff_required at {iteration_id}"
            _persist_loop_state(loop_dir, loop_state)
            break

        # --- 7b. Auto-coder mode: run the coder with workspace write access. ---
        coder_result_path = iteration_dir / "coder_result.json"
        assigned_focus = select_primary_repair_focus(repair_targets)
        coder_prompt = build_coder_loop_prompt(
            loop_dir=loop_dir,
            iteration_dir=iteration_dir,
            pack_dir=pack_dir,
            repair_targets_path=repair_targets_path,
            repair_targets_json=repair_targets_json,
            assigned_focus=assigned_focus,
            analyst_verdict_path=analyst_verdict_path,
            analyst_verdict_json=dump_json(analyst_verdict),
        )
        write_text(iteration_dir / "coder_prompt.md", coder_prompt + "\n")
        # Snapshot candidate files before the coder runs so they can be
        # restored afterwards (see restore_line_collapsed_files_from_snapshot).
        coder_snapshot_paths = build_coder_snapshot_paths(repair_targets)
        coder_snapshots = snapshot_coder_candidate_files(coder_snapshot_paths)
        coder_command = build_codex_exec_command(
            args,
            output_file=coder_result_path,
            schema_file=Path(args.coder_schema).resolve(),
            sandbox_mode="workspace-write",
            model_override=getattr(args, "coder_codex_model", None),
            reasoning_effort=getattr(args, "coder_reasoning_effort", None),
        )
        run_subprocess_command(
            coder_command,
            cwd=REPO_ROOT,
            timeout_seconds=int(args.codex_timeout_seconds),
            input_text=coder_prompt,
            stdout_path=iteration_dir / "coder_exec.stdout.log",
            stderr_path=iteration_dir / "coder_exec.stderr.log",
        )
        restored_files = restore_line_collapsed_files_from_snapshot(coder_snapshots)
        coder_result = read_json_output(coder_result_path)
        coder_status = str(coder_result.get("status") or "").strip() or "unknown"
        iteration_record["coder_status"] = coder_status
        iteration_record["coder_result_path"] = str(coder_result_path)
        if assigned_focus:
            iteration_record["assigned_repair_focus_id"] = str(assigned_focus.get("focus_id") or "")
        if restored_files:
            iteration_record["coder_workspace_hygiene_restored_files"] = restored_files
        loop_state["iterations"].append(iteration_record)
        _persist_loop_state(loop_dir, loop_state)
        if coder_status == "blocked":
            loop_state["final_status"] = "blocked"
            loop_state["stop_reason"] = f"coder stopped progress at {iteration_id}: {coder_status}"
            _persist_loop_state(loop_dir, loop_state)
            break
        previous_pack_dir = pack_dir
        previous_verdict_path = analyst_verdict_path
    else:
        # The loop ran out of iterations without hitting any break above.
        if loop_state.get("last_analyst_decision") == "needs_exact_capability":
            loop_state["final_status"] = "needs_exact_capability"
        else:
            loop_state["final_status"] = "partial"
        loop_state["stop_reason"] = f"max_iterations_reached ({max_iterations})"
        _persist_loop_state(loop_dir, loop_state)

    write_text(loop_dir / "loop_summary.md", build_loop_summary(loop_state))
    write_text(loop_dir / "final_status.md", build_loop_final_status(loop_state))
    print(f"[domain-case-loop] saved loop artifacts to {loop_dir}")
    print(f"[domain-case-loop] final_status={loop_state['final_status']}")
    return 0
def build_parser() -> argparse.ArgumentParser:
    """Assemble the command-line parser.

    Registers the ``run-case``, ``import-export``, ``run-scenario``,
    ``run-pack`` and ``run-pack-loop`` subcommands; each one stores its
    handler in the ``func`` default so the caller can dispatch on
    ``args.func``. A subcommand is required.

    Returns:
        The configured ``argparse.ArgumentParser``.
    """

    def add_llm_options(command: argparse.ArgumentParser) -> None:
        # Prompt/LLM flags registered identically, and in this order, by every
        # subcommand that talks to the backend.
        command.add_argument("--prompt-version", default=DEFAULT_PROMPT_VERSION)
        command.add_argument("--llm-provider", default=DEFAULT_LLM_PROVIDER, choices=["openai", "local"])
        command.add_argument("--llm-model", default=DEFAULT_LLM_MODEL)
        command.add_argument("--llm-base-url", default=DEFAULT_LLM_BASE_URL)
        command.add_argument("--llm-api-key", default=DEFAULT_LLM_API_KEY)
        command.add_argument("--temperature", type=float, default=DEFAULT_TEMPERATURE)
        command.add_argument("--max-output-tokens", type=int, default=DEFAULT_MAX_OUTPUT_TOKENS)

    root = argparse.ArgumentParser(description="Repo-native helper for NDC_1C domain-case and scenario orchestration")
    commands = root.add_subparsers(dest="command", required=True)

    # run-case: single question through the backend.
    case_cmd = commands.add_parser(
        "run-case",
        help="Run one assistant_stage1 case through the existing backend and save artifacts",
    )
    case_cmd.add_argument("--domain", required=True)
    case_cmd.add_argument("--question", required=True)
    case_cmd.add_argument("--case-id")
    case_cmd.add_argument("--slot", default="baseline", choices=["baseline", "rerun"])
    case_cmd.add_argument("--analysis-date")
    case_cmd.add_argument("--backend-url", default=DEFAULT_BACKEND_URL)
    case_cmd.add_argument("--output-root", default=str(DEFAULT_ARTIFACTS_ROOT))
    case_cmd.add_argument("--sessions-dir", default=str(DEFAULT_SESSIONS_DIR))
    case_cmd.add_argument("--reports-dir", default=str(DEFAULT_REPORTS_DIR))
    add_llm_options(case_cmd)
    case_cmd.add_argument("--timeout-seconds", type=int, default=300)
    case_cmd.add_argument("--poll-interval-seconds", type=float, default=1.5)
    case_cmd.add_argument("--expected-capability")
    case_cmd.add_argument("--expected-result-mode")
    case_cmd.add_argument("--use-mock", action="store_true")
    case_cmd.set_defaults(func=handle_run_case)

    # import-export: build artifacts from an existing export file.
    import_cmd = commands.add_parser(
        "import-export",
        help="Import an existing technical export markdown and build artifacts",
    )
    import_cmd.add_argument("--domain", required=True)
    import_cmd.add_argument("--input", required=True)
    import_cmd.add_argument("--question")
    import_cmd.add_argument("--case-id")
    import_cmd.add_argument("--slot", default="baseline", choices=["baseline", "rerun"])
    import_cmd.add_argument("--output-root", default=str(DEFAULT_ARTIFACTS_ROOT))
    import_cmd.add_argument("--expected-capability")
    import_cmd.add_argument("--expected-result-mode")
    import_cmd.set_defaults(func=handle_import_export)

    # run-scenario: one multi-step scenario.
    scenario_cmd = commands.add_parser(
        "run-scenario",
        help="Run one multi-step domain scenario in a shared assistant session and save per-step artifacts",
    )
    scenario_cmd.add_argument("--manifest", required=True)
    scenario_cmd.add_argument("--scenario-id")
    scenario_cmd.add_argument("--analysis-date")
    scenario_cmd.add_argument("--backend-url", default=DEFAULT_BACKEND_URL)
    scenario_cmd.add_argument("--output-root", default=str(DEFAULT_ARTIFACTS_ROOT))
    add_llm_options(scenario_cmd)
    scenario_cmd.add_argument("--timeout-seconds", type=int, default=180)
    scenario_cmd.add_argument("--use-mock", action="store_true")
    scenario_cmd.set_defaults(func=handle_run_scenario)

    # run-pack: a pack of scenarios.
    pack_cmd = commands.add_parser(
        "run-pack",
        help="Run a multi-scenario domain pack and save aggregate orchestration artifacts",
    )
    pack_cmd.add_argument("--manifest", required=True)
    pack_cmd.add_argument("--pack-id")
    pack_cmd.add_argument("--analysis-date")
    pack_cmd.add_argument("--max-scenarios", type=int)
    pack_cmd.add_argument("--backend-url", default=DEFAULT_BACKEND_URL)
    pack_cmd.add_argument("--output-root", default=str(DEFAULT_ARTIFACTS_ROOT))
    add_llm_options(pack_cmd)
    pack_cmd.add_argument("--timeout-seconds", type=int, default=180)
    pack_cmd.add_argument("--use-mock", action="store_true")
    pack_cmd.set_defaults(func=handle_run_pack)

    # run-pack-loop: pack replay plus analyst review and repair/handoff.
    loop_cmd = commands.add_parser(
        "run-pack-loop",
        help="Run semantic pack replay with strong analyst review and either Lead Codex handoff or opt-in auto-coder repair",
    )
    loop_cmd.add_argument("--manifest", required=True)
    loop_cmd.add_argument("--loop-id")
    loop_cmd.add_argument("--analysis-date")
    loop_cmd.add_argument("--max-scenarios", type=int)
    loop_cmd.add_argument("--target-score", type=int, default=80)
    loop_cmd.add_argument("--max-iterations", type=int, default=8)
    loop_cmd.add_argument(
        "--repair-mode",
        default=REPAIR_MODE_LEAD_HANDOFF,
        choices=[REPAIR_MODE_LEAD_HANDOFF, REPAIR_MODE_AUTO_CODER],
        help="Default lead-handoff stops after the strong analyst verdict and writes Lead Codex repair artifacts; auto-coder preserves the old autonomous coder loop.",
    )
    loop_cmd.add_argument("--backend-url", default=DEFAULT_BACKEND_URL)
    loop_cmd.add_argument("--output-root", default=str(DEFAULT_ARTIFACTS_ROOT))
    add_llm_options(loop_cmd)
    loop_cmd.add_argument("--timeout-seconds", type=int, default=180)
    loop_cmd.add_argument("--use-mock", action="store_true")
    loop_cmd.add_argument("--codex-binary", default="codex")
    loop_cmd.add_argument("--codex-profile")
    loop_cmd.add_argument("--codex-model")
    loop_cmd.add_argument("--analyst-codex-model", default="gpt-5.4")
    loop_cmd.add_argument("--coder-codex-model", default="gpt-5.4")
    loop_cmd.add_argument("--analyst-reasoning-effort", default="medium")
    loop_cmd.add_argument("--coder-reasoning-effort", default="high")
    loop_cmd.add_argument("--codex-timeout-seconds", type=int, default=1800)
    loop_cmd.add_argument(
        "--analyst-schema",
        default=str(DEFAULT_LOOP_SCHEMA_DIR / "domain_loop_analyst_verdict.schema.json"),
    )
    loop_cmd.add_argument(
        "--coder-schema",
        default=str(DEFAULT_LOOP_SCHEMA_DIR / "domain_loop_coder_result.schema.json"),
    )
    loop_cmd.set_defaults(func=handle_run_pack_loop)

    return root
def main() -> int:
    """Parse the command line, dispatch to the selected handler, and return an exit code.

    Argument parsing happens outside the ``try`` block, so argparse keeps its
    own error and exit behaviour. Any ``Exception`` raised by the handler is
    reported on stderr with the ``[domain-case-loop]`` prefix and mapped to
    exit code ``1``.

    Returns:
        The handler's return value coerced to ``int``, or ``1`` on error.
    """
    cli_args = build_parser().parse_args()
    try:
        exit_code = int(cli_args.func(cli_args))
    except Exception as exc:  # noqa: BLE001
        print(f"[domain-case-loop] error: {exc}", file=sys.stderr)
        exit_code = 1
    return exit_code
# Script entry point: exit the process with the code returned by main().
if __name__ == "__main__":
    raise SystemExit(main())