NODEDC_1C/scripts/domain_truth_harness.py

1198 lines
51 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import argparse
import json
import re
from datetime import date, datetime, timezone
from pathlib import Path
from types import SimpleNamespace
from typing import Any
import domain_case_loop as dcl
import scenario_acceptance_policy as sap
# Repository root: two directory levels above this script's resolved path.
REPO_ROOT = Path(__file__).resolve().parent.parent
# `artifacts/domain_runs` under the repository root.
# NOTE(review): presumably the default output location for runs; its usage is outside this view.
DEFAULT_OUTPUT_ROOT = REPO_ROOT / "artifacts" / "domain_runs"
# Schema-version tags stamped into the documents this module produces
# (loaded spec, scenario state, review summary, generated manifest).
TRUTH_HARNESS_SPEC_SCHEMA_VERSION = "domain_truth_harness_spec_v1"
TRUTH_HARNESS_STATE_SCHEMA_VERSION = "domain_truth_harness_state_v1"
TRUTH_HARNESS_REVIEW_SCHEMA_VERSION = "domain_truth_harness_review_v1"
TRUTH_HARNESS_MANIFEST_SCHEMA_VERSION = "domain_truth_harness_manifest_v1"
# Severity used when a step or finding does not supply a recognised one.
DEFAULT_CRITICALITY = "critical"
# Step fields that build_generated_manifest copies from each spec step into the manifest step.
TECHNICAL_QUESTION_FIELDS = (
    "expected_intents",
    "expected_capability",
    "expected_recipe",
    "expected_result_mode",
    "expected_catalog_alignment_status",
    "expected_catalog_chain_top_match",
    "expected_catalog_selected_matches_top",
    "required_filters",
    "forbidden_capabilities",
    "forbidden_recipes",
    "required_state_objects",
    "required_answer_shape",
    "forbidden_answer_patterns",
    "required_carryover_invariants",
    "invariant_severity",
)
def dump_json(payload: Any) -> str:
    """Serialize *payload* to JSON text.

    Output is indented by two spaces and keeps non-ASCII characters unescaped.
    """
    serialized = json.dumps(payload, indent=2, ensure_ascii=False)
    return serialized
def write_text(file_path: Path, text: str) -> None:
file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.write_text(text, encoding="utf-8", newline="\n")
def write_json(file_path: Path, payload: Any) -> None:
    """Serialize *payload* with ``dump_json`` and write it, newline-terminated, via ``write_text``."""
    document = f"{dump_json(payload)}\n"
    write_text(file_path, document)
def read_json(file_path: Path) -> dict[str, Any]:
    """Load a JSON object from *file_path*.

    The file is decoded as ``utf-8-sig``, so a leading BOM is tolerated.

    Raises:
        RuntimeError: If the top-level JSON value is not an object.
    """
    raw_text = file_path.read_text(encoding="utf-8-sig")
    document = json.loads(raw_text)
    if isinstance(document, dict):
        return document
    raise RuntimeError(f"Expected JSON object in {file_path}")
def normalize_criticality(value: Any) -> str:
    """Return *value* as one of ``critical``/``warning``/``info``.

    The input is stringified, stripped and lower-cased; anything else
    (including falsy input) falls back to ``DEFAULT_CRITICALITY``.
    """
    candidate = str(value or "").strip().lower()
    return candidate if candidate in {"critical", "warning", "info"} else DEFAULT_CRITICALITY
def normalize_pattern_list(raw_value: Any) -> list[str]:
    """Normalize *raw_value* via ``dcl.normalize_string_list`` and drop falsy entries."""
    normalized = dcl.normalize_string_list(raw_value)
    return list(filter(None, normalized))
def normalize_filter_values_mapping(raw_value: Any) -> dict[str, list[str]]:
    """Normalize a ``{filter_key: values}`` mapping.

    Non-dict input yields an empty mapping. Keys are stringified and stripped;
    entries with a blank key or with no values left after
    ``normalize_pattern_list`` are omitted.
    """
    if not isinstance(raw_value, dict):
        return {}
    mapping: dict[str, list[str]] = {}
    for raw_key, raw_values in raw_value.items():
        filter_key = str(raw_key or "").strip()
        # A blank key is skipped without normalizing its values.
        values = normalize_pattern_list(raw_values) if filter_key else []
        if values:
            mapping[filter_key] = values
    return mapping
def normalize_step_spec(index: int, raw_step: Any) -> dict[str, Any]:
    """Normalize one raw spec step and attach the truth-harness fields.

    Base normalization is delegated to ``dcl.normalize_step_definition``. The
    harness-specific fields are then read from *raw_step* (treated as empty
    when it is not a dict) and written onto the normalized step.

    Args:
        index: Step position, passed through to ``dcl.normalize_step_definition``.
        raw_step: Step definition as loaded from the spec JSON.

    Returns:
        The normalized step dict with the harness fields populated.
    """
    normalized_step = dcl.normalize_step_definition(index, raw_step)
    step = raw_step if isinstance(raw_step, dict) else {}

    def optional_text(field_name: str) -> str | None:
        # Blank or missing text collapses to None.
        return str(step.get(field_name) or "").strip() or None

    def pattern_list(field_name: str) -> list[str]:
        return normalize_pattern_list(step.get(field_name))

    # Assignment order is kept stable because the spec is later dumped to JSON.
    normalized_step["criticality"] = normalize_criticality(step.get("criticality"))
    normalized_step["semantic_tags"] = dcl.normalize_string_list(step.get("semantic_tags"))
    normalized_step["allowed_reply_types"] = pattern_list("allowed_reply_types")
    normalized_step["allowed_limited_reason_categories"] = pattern_list("allowed_limited_reason_categories")
    normalized_step["expected_catalog_alignment_status"] = optional_text("expected_catalog_alignment_status")
    normalized_step["expected_catalog_chain_top_match"] = optional_text("expected_catalog_chain_top_match")
    # Stored raw; evaluate_truth_step converts it with normalize_optional_bool.
    normalized_step["expected_catalog_selected_matches_top"] = step.get("expected_catalog_selected_matches_top")
    normalized_step["required_answer_patterns_any"] = pattern_list("required_answer_patterns_any")
    normalized_step["required_answer_patterns_all"] = pattern_list("required_answer_patterns_all")
    normalized_step["required_direct_answer_patterns_any"] = pattern_list("required_direct_answer_patterns_any")
    normalized_step["required_direct_answer_patterns_all"] = pattern_list("required_direct_answer_patterns_all")
    normalized_step["forbidden_direct_answer_patterns"] = pattern_list("forbidden_direct_answer_patterns")
    normalized_step["forbidden_filter_keys"] = pattern_list("forbidden_filter_keys")
    normalized_step["forbidden_filter_values"] = normalize_filter_values_mapping(step.get("forbidden_filter_values"))

    period_refs = step.get("required_filter_within_previous_step_period")
    if isinstance(period_refs, dict):
        # Keep only pairs where both the filter key and the step reference are non-blank.
        normalized_step["required_filter_within_previous_step_period"] = {
            str(key).strip(): str(value).strip()
            for key, value in period_refs.items()
            if str(key).strip() and str(value).strip()
        }
    else:
        normalized_step["required_filter_within_previous_step_period"] = {}

    normalized_step["notes"] = optional_text("notes")

    # evaluate_truth_step reads this opt-out from the normalized step, so carry it
    # over explicitly. setdefault keeps any value the base normalizer already set.
    normalized_step.setdefault(
        "allow_catalog_alignment_divergence",
        bool(step.get("allow_catalog_alignment_divergence")),
    )
    return normalized_step
def load_truth_harness_spec(file_path: Path) -> dict[str, Any]:
    """Read a truth-harness spec from *file_path* and return its normalized form.

    Steps are normalized with ``normalize_step_spec`` using 1-based indices and
    bindings with ``dcl.normalize_bindings``.

    Raises:
        RuntimeError: If ``steps`` is not a non-empty list, or if ``scenario_id``
            or ``domain`` is missing or blank.
    """
    raw_spec = read_json(file_path)

    def stripped(field_name: str) -> str:
        return str(raw_spec.get(field_name) or "").strip()

    raw_steps = raw_spec.get("steps")
    if not (isinstance(raw_steps, list) and raw_steps):
        raise RuntimeError("Truth harness spec must define non-empty `steps`")

    scenario_id = stripped("scenario_id")
    domain = stripped("domain")
    if scenario_id == "":
        raise RuntimeError("Truth harness spec must define `scenario_id`")
    if domain == "":
        raise RuntimeError("Truth harness spec must define `domain`")

    return {
        "schema_version": str(raw_spec.get("schema_version") or TRUTH_HARNESS_SPEC_SCHEMA_VERSION),
        "scenario_id": scenario_id,
        "domain": domain,
        # The title falls back to the scenario id when missing or blank.
        "title": str(raw_spec.get("title") or scenario_id).strip() or scenario_id,
        "description": stripped("description") or None,
        "source_export": stripped("source_export") or None,
        "bindings": dcl.normalize_bindings(raw_spec.get("bindings")),
        "steps": [normalize_step_spec(position, raw_step) for position, raw_step in enumerate(raw_steps, start=1)],
    }
def build_runtime_bindings() -> dict[str, Any]:
    """Build the runtime placeholder values for the current run.

    Returns today's date (from ``date.today()``) in ISO and ``DD.MM.YYYY``
    form, regex-escaped variants of both, and the current UTC timestamp
    truncated to whole seconds.
    """
    current_day = date.today()
    day_formats = {
        "today_iso": current_day.isoformat(),
        "today_dot": current_day.strftime("%d.%m.%Y"),
    }
    runtime: dict[str, Any] = dict(day_formats)
    for name, text in day_formats.items():
        runtime[f"{name}_regex"] = re.escape(text)
    utc_now = datetime.now(timezone.utc).replace(microsecond=0)
    runtime["generated_at_utc"] = utc_now.isoformat()
    return runtime
def build_review_root(
    bindings: dict[str, Any], runtime_bindings: dict[str, Any], step_results: dict[str, Any]
) -> dict[str, Any]:
    """Build the root namespace that placeholder paths are resolved against.

    The root always exposes ``bindings``, ``runtime`` and ``step_results``.
    When *step_results* is a dict, each step result is also exposed directly
    under its own id as a shortcut.

    A step whose id collides with a reserved key does not replace that key; it
    remains reachable through ``step_results.<id>``.

    Args:
        bindings: Spec-level bindings.
        runtime_bindings: Values computed for the current run.
        step_results: Results of the steps evaluated so far, keyed by step id.

    Returns:
        A new dict; the argument objects are referenced, not copied.
    """
    root: dict[str, Any] = {
        "bindings": bindings,
        "runtime": runtime_bindings,
        "step_results": step_results,
    }
    if isinstance(step_results, dict):
        for step_id, step_result in step_results.items():
            # setdefault so a step called "runtime"/"bindings"/"step_results"
            # cannot shadow the reserved namespace of the same name.
            root.setdefault(step_id, step_result)
    return root
def lookup_review_value(
    path_expression: str, step_results: dict[str, Any], bindings: dict[str, Any], runtime_bindings: dict[str, Any]
) -> Any:
    """Resolve *path_expression* against the review root and return the value found.

    The path is split by ``dcl.parse_path_tokens``. Integer tokens index into
    lists; every other token is looked up as a dict key.

    Raises:
        RuntimeError: If an index is applied to a non-list, an index is past the
            end of the list, or a key cannot be found.
    """
    node: Any = build_review_root(bindings, runtime_bindings, step_results)
    for token in dcl.parse_path_tokens(path_expression):
        if not isinstance(token, int):
            # Key access: the current node must be a dict containing the key.
            if isinstance(node, dict) and token in node:
                node = node[token]
                continue
            raise RuntimeError(f"Placeholder `{path_expression}` could not be resolved at `{token}`")
        # Index access: the current node must be a list long enough for the index.
        if not isinstance(node, list):
            raise RuntimeError(f"Placeholder `{path_expression}` does not point to a list before index access")
        if token >= len(node):
            raise RuntimeError(f"Placeholder `{path_expression}` index {token} is out of range")
        node = node[token]
    return node
def resolve_template_string(
    template: str, step_results: dict[str, Any], bindings: dict[str, Any], runtime_bindings: dict[str, Any]
) -> str:
    """Substitute every ``{{ path }}`` placeholder in *template*.

    Each path is resolved with ``lookup_review_value``. Dict and list values
    are rendered with ``dump_json``; everything else with ``str``.
    """

    def render(found: re.Match[str]) -> str:
        resolved = lookup_review_value(found.group(1), step_results, bindings, runtime_bindings)
        return dump_json(resolved) if isinstance(resolved, (dict, list)) else str(resolved)

    return re.sub(r"{{\s*([^{}]+?)\s*}}", render, template)
def resolve_nested_placeholders(
    raw_value: Any, step_results: dict[str, Any], bindings: dict[str, Any], runtime_bindings: dict[str, Any]
) -> Any:
    """Resolve placeholders recursively inside strings, lists and dicts.

    Strings go through ``resolve_template_string``. Lists and dicts are rebuilt
    with resolved members, and dict keys are stringified. Any other value is
    returned unchanged.
    """
    context = (step_results, bindings, runtime_bindings)
    if isinstance(raw_value, str):
        return resolve_template_string(raw_value, *context)
    if isinstance(raw_value, list):
        return [resolve_nested_placeholders(member, *context) for member in raw_value]
    if isinstance(raw_value, dict):
        resolved: dict[str, Any] = {}
        for key, member in raw_value.items():
            resolved[str(key)] = resolve_nested_placeholders(member, *context)
        return resolved
    return raw_value
def build_generated_manifest(spec: dict[str, Any]) -> dict[str, Any]:
    """Turn a normalized spec into a linear scenario manifest.

    Every step depends on the step immediately before it (the first step has no
    dependencies). Each manifest step copies the ``TECHNICAL_QUESTION_FIELDS``
    values and the semantic tags from the spec step.
    """
    manifest_steps: list[dict[str, Any]] = []
    predecessor_id: str | None = None
    for step in spec["steps"]:
        step_id = step["step_id"]
        entry: dict[str, Any] = {
            "step_id": step_id,
            "title": step["title"],
            "question": step["question_template"],
            "depends_on": [predecessor_id] if predecessor_id else [],
            "analysis_context": {},
        }
        entry.update((field_name, step.get(field_name)) for field_name in TECHNICAL_QUESTION_FIELDS)
        entry["semantic_tags"] = step.get("semantic_tags") or []
        manifest_steps.append(entry)
        predecessor_id = step_id

    manifest: dict[str, Any] = {
        "schema_version": TRUTH_HARNESS_MANIFEST_SCHEMA_VERSION,
        "scenario_id": spec["scenario_id"],
        "domain": spec["domain"],
        "title": spec["title"],
        "description": spec.get("description"),
        "analysis_context": {},
        "bindings": spec.get("bindings") or {},
        "steps": manifest_steps,
    }
    return manifest
def build_runner_args(args: argparse.Namespace) -> SimpleNamespace:
    """Project the parsed CLI arguments onto the namespace handed to the runner.

    Only the runner-relevant attributes are copied; ``use_mock`` is coerced to ``bool``.
    """
    passthrough = (
        "backend_url",
        "prompt_version",
        "llm_provider",
        "llm_model",
        "llm_base_url",
        "llm_api_key",
        "temperature",
        "max_output_tokens",
        "timeout_seconds",
    )
    settings = {name: getattr(args, name) for name in passthrough}
    settings["use_mock"] = bool(args.use_mock)
    return SimpleNamespace(**settings)
def build_plain_assistant_message_payload(
    args: argparse.Namespace, *, question: str, session_id: str | None
) -> dict[str, Any]:
    """Assemble the assistant-mode request payload for *question*.

    The question is sent under both ``user_message`` and ``message``. LLM
    settings come from *args*. The assembled dict is passed through
    ``dcl.drop_none_values`` before being returned.
    """
    payload: dict[str, Any] = {
        "session_id": session_id,
        "user_message": question,
        "message": question,
        "mode": "assistant",
    }
    payload.update(
        llmProvider=args.llm_provider,
        apiKey=args.llm_api_key,
        model=args.llm_model,
        baseUrl=args.llm_base_url,
        temperature=args.temperature,
        maxOutputTokens=args.max_output_tokens,
        promptVersion=args.prompt_version,
    )
    payload["useMock"] = bool(args.use_mock)
    return dcl.drop_none_values(payload)
def build_placeholder_step_result(step_state: dict[str, Any]) -> dict[str, Any]:
    """Return a shallow copy of *step_state* with ``filters`` set to a copy of ``extracted_filters``.

    ``filters`` is an empty dict when ``extracted_filters`` is missing or falsy.
    """
    return {**step_state, "filters": dict(step_state.get("extracted_filters") or {})}
def append_finding(
findings: list[dict[str, Any]],
step: dict[str, Any],
code: str,
message: str,
*,
actual: Any = None,
expected: Any = None,
severity: str | None = None,
) -> None:
findings.append(
{
"code": code,
"severity": severity or step.get("criticality") or DEFAULT_CRITICALITY,
"message": message,
"actual": actual,
"expected": expected,
}
)
def matches_any_pattern(text: str, patterns: list[str]) -> bool:
    """Return True if any non-empty regex in *patterns* matches *text*, ignoring case."""
    for candidate in patterns:
        if candidate and re.search(candidate, text, flags=re.IGNORECASE):
            return True
    return False
def find_missing_patterns(text: str, patterns: list[str]) -> list[str]:
    """Return the non-empty regexes in *patterns* that do not match *text* (case-insensitive), in order."""
    missing: list[str] = []
    for candidate in patterns:
        if not candidate:
            continue
        if re.search(candidate, text, flags=re.IGNORECASE) is None:
            missing.append(candidate)
    return missing
def normalize_actual_filter_value(filter_key: str, raw_value: Any) -> str:
    """Normalize an extracted filter value for comparison.

    Values for ``as_of_date``, ``period_from`` and ``period_to`` go through
    ``dcl.normalize_iso_date``. Every other value is stringified and stripped.
    Falsy results become ``""``.
    """
    if filter_key not in {"as_of_date", "period_from", "period_to"}:
        return str(raw_value or "").strip()
    return dcl.normalize_iso_date(raw_value) or ""
def normalize_optional_bool(value: Any) -> bool | None:
if isinstance(value, bool):
return value
raw = str(value or "").strip().lower()
if raw in {"true", "1", "yes", "y"}:
return True
if raw in {"false", "0", "no", "n"}:
return False
return None
def evaluate_truth_step(
    *,
    step: dict[str, Any],
    step_state: dict[str, Any],
    step_results: dict[str, Any],
    bindings: dict[str, Any],
    runtime_bindings: dict[str, Any],
) -> dict[str, Any]:
    """Check one executed step against its truth-harness expectations.

    Expectations are read from *step*; most are passed through
    ``resolve_nested_placeholders`` first, so they may reference earlier step
    results, spec bindings and runtime bindings. Each violated expectation is
    recorded through ``append_finding``.

    Args:
        step: Step spec. ``question_template`` and ``allowed_reply_types`` are
            indexed directly; all other expectations are read with ``.get``.
        step_state: Captured state of the executed step (reply type, texts,
            intent, recipe, capability, catalog fields, extracted filters, ...).
        step_results: Results of earlier steps keyed by step id. Used for
            placeholder resolution and the previous-step period check.
        bindings: Spec-level bindings for placeholder resolution.
        runtime_bindings: Run-level bindings for placeholder resolution.

    Returns:
        A shallow copy of *step_state* extended with ``review_findings``,
        ``review_status`` (``fail`` if any critical finding, else ``warning``,
        else ``info``, else ``pass``), the three per-severity counters and
        ``truth_harness_notes``.

    Raises:
        KeyError: If *step* lacks ``question_template`` or ``allowed_reply_types``.
        RuntimeError: If a placeholder cannot be resolved (raised by
            ``lookup_review_value``).
    """
    findings: list[dict[str, Any]] = []
    # Observed values from the executed step, coerced to strings (missing values become "").
    reply_type = str(step_state.get("reply_type") or "").strip()
    # The full assistant text is deliberately not stripped.
    assistant_text = str(step_state.get("assistant_text") or "")
    direct_answer = str(step_state.get("actual_direct_answer") or "").strip()
    detected_intent = str(step_state.get("detected_intent") or "").strip()
    selected_recipe = str(step_state.get("selected_recipe") or "").strip()
    capability_id = str(step_state.get("capability_id") or "").strip()
    catalog_alignment_status = str(step_state.get("mcp_discovery_catalog_chain_alignment_status") or "").strip()
    catalog_chain_top_match = str(step_state.get("mcp_discovery_catalog_chain_top_match") or "").strip()
    limited_reason_category = str(step_state.get("limited_reason_category") or "").strip()
    extracted_filters = (
        step_state.get("extracted_filters") if isinstance(step_state.get("extracted_filters"), dict) else {}
    )

    # --- Catalog-chain alignment -------------------------------------------------
    # A divergent selection is always reported with severity "warning" (not the
    # step criticality) unless the step sets `allow_catalog_alignment_divergence`.
    if (
        catalog_alignment_status in {"selected_lower_rank", "selected_outside_match_set"}
        and not bool(step.get("allow_catalog_alignment_divergence"))
    ):
        append_finding(
            findings,
            step,
            "catalog_alignment_divergence",
            "Planner selected chain diverges from the top reviewed catalog-chain match and needs semantic review.",
            actual={
                "alignment_status": catalog_alignment_status,
                "top_match": step_state.get("mcp_discovery_catalog_chain_top_match"),
                "selected_matches_top": step_state.get("mcp_discovery_catalog_chain_selected_matches_top"),
            },
            expected="selected_matches_top or explicit allow_catalog_alignment_divergence",
            severity="warning",
        )
    # Exact-match expectations on the catalog fields; an empty expectation disables the check.
    expected_catalog_alignment_status = str(
        resolve_nested_placeholders(
            step.get("expected_catalog_alignment_status"),
            step_results,
            bindings,
            runtime_bindings,
        )
        or ""
    ).strip()
    if expected_catalog_alignment_status and catalog_alignment_status != expected_catalog_alignment_status:
        append_finding(
            findings,
            step,
            "wrong_catalog_alignment_status",
            "Catalog-chain alignment status does not match the expected planner/catalog verdict for this step.",
            actual=catalog_alignment_status or None,
            expected=expected_catalog_alignment_status,
        )
    expected_catalog_chain_top_match = str(
        resolve_nested_placeholders(
            step.get("expected_catalog_chain_top_match"),
            step_results,
            bindings,
            runtime_bindings,
        )
        or ""
    ).strip()
    if expected_catalog_chain_top_match and catalog_chain_top_match != expected_catalog_chain_top_match:
        append_finding(
            findings,
            step,
            "wrong_catalog_chain_top_match",
            "Top reviewed catalog-chain match does not match the expected chain for this step.",
            actual=catalog_chain_top_match or None,
            expected=expected_catalog_chain_top_match,
        )
    expected_catalog_selected_matches_top = normalize_optional_bool(
        resolve_nested_placeholders(
            step.get("expected_catalog_selected_matches_top"),
            step_results,
            bindings,
            runtime_bindings,
        )
    )
    if expected_catalog_selected_matches_top is not None:
        # Anything other than a literal True in the state counts as False.
        actual_catalog_selected_matches_top = step_state.get("mcp_discovery_catalog_chain_selected_matches_top") is True
        if actual_catalog_selected_matches_top != expected_catalog_selected_matches_top:
            append_finding(
                findings,
                step,
                "wrong_catalog_selected_matches_top",
                "Selected chain top-match flag does not match the expected planner/catalog verdict for this step.",
                actual=actual_catalog_selected_matches_top,
                expected=expected_catalog_selected_matches_top,
            )

    # --- Question and reply shape ------------------------------------------------
    # The resolved question is compared with the raw template as-is (no placeholder resolution).
    if step_state.get("question_resolved") != step["question_template"]:
        append_finding(
            findings,
            step,
            "question_sequence_mismatch",
            "В live/export прогоне вопрос отличается от зафиксированного exact-сценария.",
            actual=step_state.get("question_resolved"),
            expected=step["question_template"],
        )
    if step["allowed_reply_types"] and reply_type not in step["allowed_reply_types"]:
        append_finding(
            findings,
            step,
            "unexpected_reply_type",
            "Тип ответа не соответствует ожидаемому режиму шага.",
            actual=reply_type,
            expected=step["allowed_reply_types"],
        )
    # Only checked when the step declares allowed categories AND the state reports a category.
    allowed_limited_reason_categories = step.get("allowed_limited_reason_categories") or []
    if allowed_limited_reason_categories and limited_reason_category:
        if limited_reason_category not in allowed_limited_reason_categories:
            append_finding(
                findings,
                step,
                "unexpected_limited_reason_category",
                "Категория честного ограничения не соответствует допустимым сценарным условиям.",
                actual=limited_reason_category,
                expected=allowed_limited_reason_categories,
            )

    # --- Routing: intent, capability, recipe, result mode ------------------------
    expected_intents = dcl.normalize_string_list(
        resolve_nested_placeholders(step.get("expected_intents") or [], step_results, bindings, runtime_bindings)
    )
    if expected_intents and not dcl.identifier_in_list(detected_intent, expected_intents):
        append_finding(
            findings,
            step,
            "wrong_intent",
            "Интент не соответствует ожидаемому бизнес-смыслу шага.",
            actual=detected_intent or None,
            expected=expected_intents,
        )
    expected_capability = str(
        resolve_nested_placeholders(step.get("expected_capability"), step_results, bindings, runtime_bindings) or ""
    ).strip()
    if expected_capability and not dcl.identifiers_match(capability_id, expected_capability):
        append_finding(
            findings,
            step,
            "wrong_capability",
            "Выбрана не та capability, которую должен был отработать этот шаг.",
            actual=capability_id or None,
            expected=expected_capability,
        )
    expected_recipe = str(
        resolve_nested_placeholders(step.get("expected_recipe"), step_results, bindings, runtime_bindings) or ""
    ).strip()
    if expected_recipe and not dcl.identifiers_match(selected_recipe, expected_recipe):
        append_finding(
            findings,
            step,
            "wrong_recipe",
            "Выбран не тот рецепт адресного контура.",
            actual=selected_recipe or None,
            expected=expected_recipe,
        )
    expected_result_mode = str(
        resolve_nested_placeholders(step.get("expected_result_mode"), step_results, bindings, runtime_bindings) or ""
    ).strip()
    actual_result_mode = str(step_state.get("result_mode") or "").strip()
    # The result mode is only compared when the state actually reports one.
    if expected_result_mode and actual_result_mode and not dcl.identifiers_match(actual_result_mode, expected_result_mode):
        append_finding(
            findings,
            step,
            "wrong_result_mode",
            "Режим результата не соответствует ожидаемой форме ответа.",
            actual=actual_result_mode,
            expected=expected_result_mode,
        )

    # --- Filters -----------------------------------------------------------------
    required_filters = dcl.normalize_validation_filters(
        resolve_nested_placeholders(step.get("required_filters") or {}, step_results, bindings, runtime_bindings)
    )
    for filter_key, expected_value in required_filters.items():
        actual_value = normalize_actual_filter_value(filter_key, extracted_filters.get(filter_key))
        if not actual_value:
            append_finding(
                findings,
                step,
                f"missing_filter:{filter_key}",
                f"В техчате отсутствует обязательный фильтр `{filter_key}`.",
                actual=actual_value or None,
                expected=expected_value,
            )
            continue
        if actual_value != expected_value:
            append_finding(
                findings,
                step,
                f"wrong_filter:{filter_key}",
                f"Фильтр `{filter_key}` не совпадает с ожидаемым carryover/period состоянием.",
                actual=actual_value,
                expected=expected_value,
            )
    # Filter keys that must not be present (any non-empty normalized value counts as present).
    forbidden_filter_keys = dcl.normalize_string_list(
        resolve_nested_placeholders(step.get("forbidden_filter_keys") or [], step_results, bindings, runtime_bindings)
    )
    for filter_key in forbidden_filter_keys:
        actual_value = normalize_actual_filter_value(filter_key, extracted_filters.get(filter_key))
        if actual_value:
            append_finding(
                findings,
                step,
                f"forbidden_filter_key:{filter_key}",
                f"Шаг не должен был тащить фильтр `{filter_key}` в техчат.",
                actual=actual_value,
                expected="absent",
            )
    # Specific filter values that must not appear.
    forbidden_filter_values = normalize_filter_values_mapping(
        resolve_nested_placeholders(step.get("forbidden_filter_values") or {}, step_results, bindings, runtime_bindings)
    )
    for filter_key, forbidden_values in forbidden_filter_values.items():
        actual_value = normalize_actual_filter_value(filter_key, extracted_filters.get(filter_key))
        if actual_value and actual_value in forbidden_values:
            append_finding(
                findings,
                step,
                f"forbidden_filter_value:{filter_key}",
                f"Шаг унаследовал запрещенное значение фильтра `{filter_key}`.",
                actual=actual_value,
                expected={"not_in": forbidden_values},
            )
    # Date filters that must fall inside the period_from..period_to range of a referenced earlier step.
    within_previous_period = step.get("required_filter_within_previous_step_period") or {}
    for filter_key, step_ref in within_previous_period.items():
        resolved_step_ref = str(
            resolve_nested_placeholders(step_ref, step_results, bindings, runtime_bindings) or ""
        ).strip()
        previous_step = step_results.get(resolved_step_ref)
        actual_value = dcl.normalize_iso_date(extracted_filters.get(filter_key))
        if not isinstance(previous_step, dict):
            append_finding(
                findings,
                step,
                f"previous_step_missing:{filter_key}",
                f"Не удалось разрешить предыдущий шаг `{resolved_step_ref}` для проверки диапазона фильтра `{filter_key}`.",
                actual=actual_value,
                expected=resolved_step_ref,
            )
            continue
        previous_filters = previous_step.get("filters") if isinstance(previous_step.get("filters"), dict) else {}
        previous_from = dcl.normalize_iso_date(previous_filters.get("period_from"))
        previous_to = dcl.normalize_iso_date(previous_filters.get("period_to"))
        if not actual_value or not previous_from or not previous_to:
            append_finding(
                findings,
                step,
                f"period_carryover_missing:{filter_key}",
                f"Не удалось подтвердить, что `{filter_key}` попал в диапазон предыдущего шага `{resolved_step_ref}`.",
                actual=actual_value,
                expected={"period_from": previous_from, "period_to": previous_to},
            )
            continue
        # NOTE(review): assumes dcl.normalize_iso_date yields values whose ordering is chronological — confirm.
        if not (previous_from <= actual_value <= previous_to):
            append_finding(
                findings,
                step,
                f"period_carryover_outside:{filter_key}",
                f"Фильтр `{filter_key}` вышел за диапазон предыдущего релевантного периода.",
                actual=actual_value,
                expected={"period_from": previous_from, "period_to": previous_to},
            )

    # --- Full assistant text patterns (case-insensitive regex search) ------------
    required_answer_patterns_any = normalize_pattern_list(
        resolve_nested_placeholders(step.get("required_answer_patterns_any") or [], step_results, bindings, runtime_bindings)
    )
    if required_answer_patterns_any and not matches_any_pattern(assistant_text, required_answer_patterns_any):
        append_finding(
            findings,
            step,
            "required_answer_patterns_any_missing",
            "Полный ответ модели не содержит ни одного из обязательных смысловых паттернов.",
            actual=assistant_text,
            expected=required_answer_patterns_any,
        )
    required_answer_patterns_all = normalize_pattern_list(
        resolve_nested_placeholders(step.get("required_answer_patterns_all") or [], step_results, bindings, runtime_bindings)
    )
    missing_answer_patterns = find_missing_patterns(assistant_text, required_answer_patterns_all)
    if missing_answer_patterns:
        # `expected` carries only the patterns that were not found.
        append_finding(
            findings,
            step,
            "required_answer_patterns_all_missing",
            "Полный ответ модели не закрыл обязательные паттерны шага.",
            actual=assistant_text,
            expected=missing_answer_patterns,
        )
    forbidden_answer_patterns = normalize_pattern_list(
        resolve_nested_placeholders(step.get("forbidden_answer_patterns") or [], step_results, bindings, runtime_bindings)
    )
    forbidden_answer_hits = [pattern for pattern in forbidden_answer_patterns if re.search(pattern, assistant_text, flags=re.IGNORECASE)]
    if forbidden_answer_hits:
        append_finding(
            findings,
            step,
            "forbidden_answer_pattern_hit",
            "В полном ответе модели найден запрещенный паттерн.",
            actual=forbidden_answer_hits,
            expected="absent",
        )

    # --- Direct-answer patterns (same three checks against `actual_direct_answer`) ---
    required_direct_any = normalize_pattern_list(
        resolve_nested_placeholders(
            step.get("required_direct_answer_patterns_any") or [],
            step_results,
            bindings,
            runtime_bindings,
        )
    )
    if required_direct_any and not matches_any_pattern(direct_answer, required_direct_any):
        append_finding(
            findings,
            step,
            "required_direct_answer_patterns_any_missing",
            "Первая строка ответа не содержит ни одного из обязательных direct-answer паттернов.",
            actual=direct_answer,
            expected=required_direct_any,
        )
    required_direct_all = normalize_pattern_list(
        resolve_nested_placeholders(
            step.get("required_direct_answer_patterns_all") or [],
            step_results,
            bindings,
            runtime_bindings,
        )
    )
    missing_direct_patterns = find_missing_patterns(direct_answer, required_direct_all)
    if missing_direct_patterns:
        append_finding(
            findings,
            step,
            "required_direct_answer_patterns_all_missing",
            "Первая строка ответа не закрыла обязательные direct-answer паттерны.",
            actual=direct_answer,
            expected=missing_direct_patterns,
        )
    forbidden_direct_patterns = normalize_pattern_list(
        resolve_nested_placeholders(
            step.get("forbidden_direct_answer_patterns") or [],
            step_results,
            bindings,
            runtime_bindings,
        )
    )
    forbidden_direct_hits = [pattern for pattern in forbidden_direct_patterns if re.search(pattern, direct_answer, flags=re.IGNORECASE)]
    if forbidden_direct_hits:
        append_finding(
            findings,
            step,
            "forbidden_direct_answer_pattern_hit",
            "Первая строка ответа попала в запрещенный direct-answer паттерн.",
            actual=forbidden_direct_hits,
            expected="absent",
        )

    # --- Verdict: the most severe finding present decides the status -------------
    critical_findings = [item for item in findings if item.get("severity") == "critical"]
    warning_findings = [item for item in findings if item.get("severity") == "warning"]
    info_findings = [item for item in findings if item.get("severity") == "info"]
    review_status = "pass"
    if critical_findings:
        review_status = "fail"
    elif warning_findings:
        review_status = "warning"
    elif info_findings:
        review_status = "info"
    # Annotate a shallow copy so the caller's step_state dict is not mutated.
    reviewed_state = dict(step_state)
    reviewed_state["review_findings"] = findings
    reviewed_state["review_status"] = review_status
    reviewed_state["critical_findings_count"] = len(critical_findings)
    reviewed_state["warning_findings_count"] = len(warning_findings)
    reviewed_state["info_findings_count"] = len(info_findings)
    reviewed_state["truth_harness_notes"] = step.get("notes")
    return reviewed_state
def build_truth_review_summary(spec: dict[str, Any], scenario_state: dict[str, Any], review_source: str) -> dict[str, Any]:
    """Aggregate per-step review statuses into a scenario-level summary.

    Steps are counted by ``review_status`` (``pass``, ``warning``, ``fail``);
    steps with any other or missing status are not counted in any bucket.
    The overall status is ``fail`` if any step failed, else ``warning`` if any
    step has a warning, else ``pass``.
    """
    raw_outputs = scenario_state.get("step_outputs")
    step_outputs = raw_outputs if isinstance(raw_outputs, dict) else {}
    reviewed_steps = [step_outputs.get(step["step_id"], {}) for step in spec["steps"]]

    tally = {"pass": 0, "warning": 0, "fail": 0}
    for reviewed in reviewed_steps:
        status = str(reviewed.get("review_status") or "")
        if status in tally:
            tally[status] += 1

    if tally["fail"]:
        overall_status = "fail"
    elif tally["warning"]:
        overall_status = "warning"
    else:
        overall_status = "pass"

    return {
        "schema_version": TRUTH_HARNESS_REVIEW_SCHEMA_VERSION,
        "review_source": review_source,
        "scenario_id": spec["scenario_id"],
        "domain": spec["domain"],
        "title": spec["title"],
        "session_id": scenario_state.get("session_id"),
        "steps_total": len(spec["steps"]),
        "steps_passed": tally["pass"],
        "steps_with_warning": tally["warning"],
        "steps_failed": tally["fail"],
        "overall_status": overall_status,
    }
def build_truth_review_markdown(spec: dict[str, Any], scenario_state: dict[str, Any], review_summary: dict[str, Any]) -> str:
    """Render the scenario review as a Markdown report.

    The report has a header with the summary figures followed by one numbered
    section per spec step listing its reviewed state and findings. The returned
    text is stripped and ends with exactly one newline.
    """
    report = [
        "# Truth harness review",
        "",
        f"- scenario_id: `{spec['scenario_id']}`",
        f"- domain: `{spec['domain']}`",
        f"- title: {spec['title']}",
        f"- review_source: `{review_summary.get('review_source') or 'n/a'}`",
        f"- session_id: `{scenario_state.get('session_id') or 'n/a'}`",
        f"- overall_status: `{review_summary.get('overall_status') or 'n/a'}`",
    ]
    for counter_name in ("steps_total", "steps_passed", "steps_with_warning", "steps_failed"):
        report.append(f"- {counter_name}: `{review_summary.get(counter_name)}`")
    report += ["", "## Steps"]

    # (label in the report, key in the reviewed step state); all rendered in backticks with an n/a fallback.
    labelled_state_keys = (
        ("review_status", "review_status"),
        ("reply_type", "reply_type"),
        ("intent", "detected_intent"),
        ("recipe", "selected_recipe"),
        ("capability", "capability_id"),
        ("catalog_alignment_status", "mcp_discovery_catalog_chain_alignment_status"),
        ("catalog_top_match", "mcp_discovery_catalog_chain_top_match"),
    )

    raw_outputs = scenario_state.get("step_outputs")
    step_outputs = raw_outputs if isinstance(raw_outputs, dict) else {}
    for number, step in enumerate(spec["steps"], start=1):
        reviewed = step_outputs.get(step["step_id"], {})
        raw_findings = reviewed.get("review_findings")
        step_findings = raw_findings if isinstance(raw_findings, list) else []

        report.append(f"{number}. `{step['step_id']}` - {step['question_template']}")
        for label, state_key in labelled_state_keys:
            report.append(f"{label}: `{reviewed.get(state_key) or 'n/a'}`")
        # This flag is printed raw (no n/a fallback), so False stays visible.
        report.append(
            f"catalog_selected_matches_top: `{reviewed.get('mcp_discovery_catalog_chain_selected_matches_top')}`"
        )
        report.append(f"limited_reason_category: `{reviewed.get('limited_reason_category') or 'n/a'}`")
        report.append(f"filters: `{dump_json(reviewed.get('extracted_filters') or {})}`")
        report.append(f"direct_answer: {reviewed.get('actual_direct_answer') or 'n/a'}")

        if step.get("notes"):
            report.append(f"notes: {step['notes']}")

        if not step_findings:
            report.append("findings: none")
        else:
            report.append("findings:")
            report.extend(
                f"- [{finding.get('severity')}] {finding.get('code')}: {finding.get('message')} "
                f"(actual={dump_json(finding.get('actual'))}, expected={dump_json(finding.get('expected'))})"
                for finding in step_findings
            )
        report.append("")

    return "\n".join(report).strip() + "\n"
def write_acceptance_artifacts(
    output_dir: Path,
    spec: dict[str, Any],
    scenario_state: dict[str, Any],
    review_summary: dict[str, Any],
) -> dict[str, Any]:
    """Build the acceptance matrix and pack state via ``sap`` and write them to *output_dir*.

    Four files are written: the matrix as JSON and Markdown, the pack state as
    JSON, and the final status as Markdown.

    Returns:
        ``{"acceptance_matrix": ..., "pack_state": ...}`` with the built objects.
    """
    matrix = sap.build_scenario_acceptance_matrix(spec, scenario_state, review_summary)
    pack_state = sap.derive_truth_harness_pack_state(spec, scenario_state, review_summary, matrix)

    write_json(output_dir / "scenario_acceptance_matrix.json", matrix)
    matrix_markdown = sap.build_scenario_acceptance_matrix_markdown(matrix)
    write_text(output_dir / "scenario_acceptance_matrix.md", matrix_markdown)

    write_json(output_dir / "pack_state.json", pack_state)
    final_status_markdown = sap.build_truth_harness_final_status_markdown(pack_state)
    write_text(output_dir / "final_status.md", final_status_markdown)

    return dict(acceptance_matrix=matrix, pack_state=pack_state)
def save_step_bundle(
    *,
    step_dir: Path,
    export_markdown: str,
    turn_artifact: dict[str, Any],
    session_record: dict[str, Any] | None,
    response_payload: dict[str, Any] | None,
    step_state: dict[str, Any],
) -> None:
    """Write all per-step artifacts into *step_dir*.

    Files written: ``output.md``, ``debug.json``, ``turn.json``,
    ``assistant_response.json``, ``step_state.json``, ``session.json`` (only
    when *session_record* is given) and ``resolved_question.txt``.
    """
    write_text(step_dir / "output.md", export_markdown)

    json_documents: list[tuple[str, Any]] = [
        ("debug.json", step_state.get("technical_debug_payload") or {}),
        ("turn.json", turn_artifact),
        ("assistant_response.json", response_payload or {}),
        ("step_state.json", step_state),
    ]
    if session_record is not None:
        json_documents.append(("session.json", session_record))
    for file_name, document in json_documents:
        write_json(step_dir / file_name, document)

    question_text = step_state.get("question_resolved") or ""
    write_text(step_dir / "resolved_question.txt", f"{question_text}\n")
def build_step_state_from_turn(
    *,
    spec: dict[str, Any],
    step: dict[str, Any],
    step_index: int,
    question: str,
    conversation: list[dict[str, Any]],
    session_record: dict[str, Any] | None,
    export_markdown: str,
) -> tuple[dict[str, Any], dict[str, Any]]:
    """Build the base step state and turn artifact for one question/answer turn.

    Only the last two conversation messages are considered. The turn artifact
    comes from ``dcl.build_turn_artifact`` and is tagged with the harness
    schema version and scenario metadata. The step state comes from
    ``dcl.build_scenario_step_state`` and additionally carries the export
    Markdown and the last assistant message's debug payload.

    Returns:
        ``(base_step_state, turn_artifact)``.
    """
    scenario_id = spec["scenario_id"]
    domain = spec["domain"]
    step_id = step["step_id"]
    # With fewer than two messages the conversation is used as-is.
    last_turn = conversation if len(conversation) < 2 else conversation[-2:]

    turn_artifact = dcl.build_turn_artifact(
        slot="step",
        domain=domain,
        case_id=scenario_id,
        question=question,
        session_id=str(spec.get("session_id") or "n/a"),
        conversation=last_turn,
        session_record=session_record,
        job_record=None,
        report_case=None,
        export_file_name="output.md",
    )
    turn_artifact["schema_version"] = "domain_truth_harness_turn_artifact_v1"
    turn_artifact["scenario"] = dict(
        scenario_id=scenario_id,
        step_id=step_id,
        step_index=step_index,
        question_template=step["question_template"],
        question_resolved=question,
        review_mode="truth_harness_strict_replay",
    )

    last_assistant = dcl.find_last_assistant(last_turn)
    assistant_text = str(last_assistant.get("text") or "")
    entries = dcl.extract_structured_entries(assistant_text)

    base_step_state = dcl.build_scenario_step_state(
        scenario_id=scenario_id,
        domain=domain,
        step=step,
        step_index=step_index,
        question_resolved=question,
        analysis_context={},
        turn_artifact=turn_artifact,
        entries=entries,
    )
    base_step_state["export_markdown"] = export_markdown
    debug_payload = last_assistant.get("debug")
    base_step_state["technical_debug_payload"] = debug_payload if isinstance(debug_payload, dict) else {}
    return base_step_state, turn_artifact
def build_conversation_pairs(conversation: list[dict[str, Any]]) -> list[tuple[dict[str, Any], dict[str, Any] | None]]:
pairs: list[tuple[dict[str, Any], dict[str, Any] | None]] = []
for index, item in enumerate(conversation):
if item.get("role") != "user":
continue
assistant_item: dict[str, Any] | None = None
if index + 1 < len(conversation) and conversation[index + 1].get("role") == "assistant":
assistant_item = conversation[index + 1]
pairs.append((item, assistant_item))
return pairs
def review_export(spec: dict[str, Any], export_path: Path, output_dir: Path) -> dict[str, Any]:
    """Review an existing export against the spec, one step per user turn, and write the artifacts.

    The export is parsed into user/assistant pairs; each pair is evaluated against the
    spec step at the same position. Per-step bundles are saved under ``steps/<step_id>``
    and the scenario state, truth review (JSON and Markdown) and acceptance artifacts
    are written into ``output_dir``.

    Args:
        spec: Truth harness spec; ``steps``, ``scenario_id``, ``domain`` and ``title`` are read.
        export_path: Export file to review (read as UTF-8, BOM tolerated).
        output_dir: Directory that receives all artifacts; created if missing.

    Returns:
        Dict with ``scenario_state``, ``review_summary``, ``acceptance_matrix`` and ``pack_state``.

    Raises:
        RuntimeError: If the number of user turns differs from the number of spec steps,
            or a user turn is not directly followed by an assistant message.
    """

    def utc_stamp() -> str:
        # Second-resolution UTC timestamp in ISO format, used for `updated_at`.
        return datetime.now(timezone.utc).replace(microsecond=0).isoformat()

    raw_export = export_path.read_text(encoding="utf-8-sig")
    session_id, conversation = dcl.parse_export_markdown(raw_export)
    turn_pairs = build_conversation_pairs(conversation)
    spec_steps = spec["steps"]
    if len(turn_pairs) != len(spec_steps):
        raise RuntimeError(
            f"Spec steps ({len(spec_steps)}) and export user turns ({len(turn_pairs)}) do not match for {export_path}"
        )

    output_dir.mkdir(parents=True, exist_ok=True)
    write_json(output_dir / "truth_harness_spec.json", spec)
    write_text(output_dir / "source_export.md", raw_export)

    # Results of already reviewed steps; handed to evaluate_truth_step for each later step.
    prior_results: dict[str, Any] = {}
    runtime_bindings = build_runtime_bindings()
    scenario_state: dict[str, Any] = {
        "schema_version": TRUTH_HARNESS_STATE_SCHEMA_VERSION,
        "scenario_id": spec["scenario_id"],
        "domain": spec["domain"],
        "title": spec["title"],
        "session_id": session_id,
        "review_source": str(export_path),
        "step_outputs": {},
        "updated_at": utc_stamp(),
    }
    reviewed_by_step = scenario_state["step_outputs"]

    steps_root = output_dir / "steps"
    steps_root.mkdir(parents=True, exist_ok=True)

    for position, (step, (user_turn, assistant_turn)) in enumerate(
        zip(spec_steps, turn_pairs, strict=True), start=1
    ):
        if assistant_turn is None:
            raise RuntimeError(f"Export pair for step `{step['step_id']}` is missing assistant message")

        # Each step is reviewed on its own two-message conversation.
        turn_conversation = [user_turn, assistant_turn]
        turn_export = dcl.build_conversation_export(session_id, turn_conversation, mode="technical")
        raw_step_state, turn_artifact = build_step_state_from_turn(
            spec={**spec, "session_id": session_id},
            step=step,
            step_index=position,
            question=str(user_turn.get("text") or ""),
            conversation=turn_conversation,
            session_record=None,
            export_markdown=turn_export,
        )
        reviewed_step = evaluate_truth_step(
            step=step,
            step_state=raw_step_state,
            step_results=prior_results,
            bindings=spec.get("bindings") or {},
            runtime_bindings=runtime_bindings,
        )

        step_id = step["step_id"]
        reviewed_by_step[step_id] = reviewed_step
        prior_results[step_id] = build_placeholder_step_result(reviewed_step)
        save_step_bundle(
            step_dir=steps_root / step_id,
            export_markdown=turn_export,
            turn_artifact=turn_artifact,
            session_record=None,
            response_payload=None,
            step_state=reviewed_step,
        )

    scenario_state["updated_at"] = utc_stamp()
    review_summary = build_truth_review_summary(spec, scenario_state, f"export:{export_path}")
    review_markdown = build_truth_review_markdown(spec, scenario_state, review_summary)

    write_json(output_dir / "scenario_state.json", scenario_state)
    write_json(
        output_dir / "truth_review.json",
        {"summary": review_summary, "steps": scenario_state["step_outputs"]},
    )
    write_text(output_dir / "truth_review.md", review_markdown)
    acceptance = write_acceptance_artifacts(output_dir, spec, scenario_state, review_summary)

    return {
        "scenario_state": scenario_state,
        "review_summary": review_summary,
        "acceptance_matrix": acceptance["acceptance_matrix"],
        "pack_state": acceptance["pack_state"],
    }
def run_live(spec: dict[str, Any], output_dir: Path, args: argparse.Namespace) -> dict[str, Any]:
    """Send every spec step to the live backend in order, review each reply, and write the artifacts.

    Each step's ``question_template`` is posted to ``/api/assistant/message``. The session id
    returned by one response is passed on to the next request. After every step the session
    snapshot is fetched, the step is evaluated, its bundle is saved under ``steps/<step_id>``
    and ``scenario_state.json`` is rewritten, so progress is persisted step by step.

    Args:
        spec: Truth harness spec; ``steps``, ``scenario_id``, ``domain`` and ``title`` are read.
        output_dir: Directory that receives all artifacts; created if missing.
        args: Parsed CLI arguments, converted to runner arguments by ``build_runner_args``.

    Returns:
        Dict with ``scenario_state``, ``review_summary``, ``acceptance_matrix`` and ``pack_state``.

    Raises:
        RuntimeError: If no session id is known after a step's response.
    """

    def utc_stamp() -> str:
        # Second-resolution UTC timestamp in ISO format, used for `updated_at`.
        return datetime.now(timezone.utc).replace(microsecond=0).isoformat()

    runner_args = build_runner_args(args)
    dcl.ensure_backend_health(runner_args.backend_url, runner_args.timeout_seconds)

    output_dir.mkdir(parents=True, exist_ok=True)
    manifest = build_generated_manifest(spec)
    write_json(output_dir / "truth_harness_spec.json", spec)
    write_json(output_dir / "scenario_manifest.json", manifest)

    steps_root = output_dir / "steps"
    steps_root.mkdir(parents=True, exist_ok=True)

    runtime_bindings = build_runtime_bindings()
    # Results of already reviewed steps; handed to evaluate_truth_step for each later step.
    prior_results: dict[str, Any] = {}
    scenario_state: dict[str, Any] = {
        "schema_version": TRUTH_HARNESS_STATE_SCHEMA_VERSION,
        "scenario_id": spec["scenario_id"],
        "domain": spec["domain"],
        "title": spec["title"],
        "session_id": None,
        "review_source": "live_strict_replay",
        "step_outputs": {},
        "updated_at": utc_stamp(),
    }
    state_file = output_dir / "scenario_state.json"
    write_json(state_file, scenario_state)

    active_session: str | None = None
    for position, step in enumerate(spec["steps"], start=1):
        question = step["question_template"]
        request_body = build_plain_assistant_message_payload(
            runner_args, question=question, session_id=active_session
        )
        response_payload = dcl.http_json(
            f"{runner_args.backend_url}/api/assistant/message",
            method="POST",
            payload=request_body,
            timeout=max(30, int(runner_args.timeout_seconds)),
        )

        # Prefer the id from this response, otherwise keep the one already known.
        session_candidate = str(response_payload.get("session_id") or active_session or "").strip()
        active_session = session_candidate or None
        if not active_session:
            raise RuntimeError(f"Assistant response for step `{step['step_id']}` does not contain session_id")

        session_record = dcl.fetch_session_snapshot(
            runner_args.backend_url, active_session, runner_args.timeout_seconds
        )
        conversation = dcl.extract_conversation_from_session(session_record)
        turn_export = dcl.build_conversation_export(active_session, conversation, mode="technical")
        raw_step_state, turn_artifact = build_step_state_from_turn(
            spec={**spec, "session_id": active_session},
            step=step,
            step_index=position,
            question=question,
            conversation=conversation,
            session_record=session_record,
            export_markdown=turn_export,
        )
        reviewed_step = evaluate_truth_step(
            step=step,
            step_state=raw_step_state,
            step_results=prior_results,
            bindings=spec.get("bindings") or {},
            runtime_bindings=runtime_bindings,
        )

        scenario_state["session_id"] = active_session
        step_id = step["step_id"]
        scenario_state["step_outputs"][step_id] = reviewed_step
        prior_results[step_id] = build_placeholder_step_result(reviewed_step)
        scenario_state["updated_at"] = utc_stamp()
        save_step_bundle(
            step_dir=steps_root / step_id,
            export_markdown=turn_export,
            turn_artifact=turn_artifact,
            session_record=session_record,
            response_payload=response_payload,
            step_state=reviewed_step,
        )
        # Persist after every step so a later failure keeps the progress made so far.
        write_json(state_file, scenario_state)
        print(
            f"[truth-harness] step {position}/{len(spec['steps'])}: {step['step_id']} -> "
            f"{reviewed_step.get('review_status')} ({reviewed_step.get('reply_type') or 'n/a'})"
        )

    review_summary = build_truth_review_summary(spec, scenario_state, "live_strict_replay")
    review_markdown = build_truth_review_markdown(spec, scenario_state, review_summary)

    write_text(output_dir / "session_id.txt", f"{scenario_state.get('session_id') or ''}\n")
    write_json(state_file, scenario_state)
    write_json(
        output_dir / "truth_review.json",
        {"summary": review_summary, "steps": scenario_state["step_outputs"]},
    )
    write_text(output_dir / "truth_review.md", review_markdown)
    acceptance = write_acceptance_artifacts(output_dir, spec, scenario_state, review_summary)

    print(f"[truth-harness] saved artifacts to {output_dir}")
    print(f"[truth-harness] overall_status={review_summary['overall_status']}")
    return {
        "scenario_state": scenario_state,
        "review_summary": review_summary,
        "acceptance_matrix": acceptance["acceptance_matrix"],
        "pack_state": acceptance["pack_state"],
    }
def build_bootstrap_spec(export_path: Path, scenario_id: str, domain: str, title: str | None) -> dict[str, Any]:
    """Build a truth harness spec whose steps repeat the user questions found in an export.

    Only the user side of each conversation pair is used; assistant replies are ignored.

    Args:
        export_path: Export file to read (UTF-8, BOM tolerated).
        scenario_id: Identifier stored in the spec; also the title fallback.
        domain: Domain name stored in the spec.
        title: Optional title; ``scenario_id`` is used when it is empty or ``None``.

    Returns:
        Spec dict with one step per user turn, numbered ``step_01``, ``step_02``, ...,
        each carrying the default criticality and empty ``bindings`` at the spec level.
    """
    _session_id, conversation = dcl.parse_export_markdown(export_path.read_text(encoding="utf-8-sig"))
    bootstrapped_steps = [
        {
            "step_id": f"step_{ordinal:02d}",
            "title": f"Step {ordinal:02d}",
            "question": str(user_turn.get("text") or "").strip(),
            "criticality": DEFAULT_CRITICALITY,
        }
        for ordinal, (user_turn, _reply) in enumerate(build_conversation_pairs(conversation), start=1)
    ]
    return {
        "schema_version": TRUTH_HARNESS_SPEC_SCHEMA_VERSION,
        "scenario_id": scenario_id,
        "domain": domain,
        "title": title or scenario_id,
        "description": f"Exact sequential replay spec bootstrapped from {export_path}",
        "source_export": str(export_path),
        "bindings": {},
        "steps": bootstrapped_steps,
    }
def default_output_dir(base_name: str) -> Path:
    """Return ``DEFAULT_OUTPUT_ROOT / "<base_name>_<YYYYmmdd_HHMMSS>"`` using the current UTC time."""
    utc_now = datetime.now(timezone.utc)
    # The datetime format spec inside the f-string is applied via strftime.
    return DEFAULT_OUTPUT_ROOT / f"{base_name}_{utc_now:%Y%m%d_%H%M%S}"
def handle_bootstrap(args: argparse.Namespace) -> int:
    """Handle the ``bootstrap`` command: build a spec from an export and write it as JSON.

    Args:
        args: Parsed CLI arguments; ``export``, ``output``, ``scenario_id``, ``domain``
            and ``title`` are read.

    Returns:
        ``0`` once the spec has been written.
    """
    source_export = Path(args.export).resolve()
    destination = Path(args.output).resolve()
    write_json(
        destination,
        build_bootstrap_spec(
            export_path=source_export,
            scenario_id=args.scenario_id,
            domain=args.domain,
            title=args.title,
        ),
    )
    print(f"[truth-harness] wrote bootstrap spec to {destination}")
    return 0
def handle_review_export(args: argparse.Namespace) -> int:
    """Handle the ``review-export`` command: review an export against a spec and print the outcome.

    The export path comes from ``--export`` when given, otherwise from the spec's
    ``source_export`` entry. Artifacts go to ``--output-dir`` when given, otherwise to
    a timestamped default directory named after the scenario.

    Args:
        args: Parsed CLI arguments; ``spec``, ``export`` and ``output_dir`` are read.

    Returns:
        ``0`` once the review has finished and the summary lines have been printed.

    Raises:
        RuntimeError: If no export path is available, the path does not exist,
            or the path is not a regular file.
    """
    spec_path = Path(args.spec).resolve()
    spec = load_truth_harness_spec(spec_path)

    export_source = args.export or spec.get("source_export")
    if not export_source:
        # Without this guard Path("").resolve() yields the current working directory,
        # which exists, so the failure would only surface later as an unrelated read error.
        raise RuntimeError(
            "Export file for review-export is not specified: pass --export or set `source_export` in the spec"
        )
    export_path = Path(export_source).resolve()
    if not export_path.exists():
        raise RuntimeError("Export file for review-export does not exist")
    if not export_path.is_file():
        # A directory (or other non-file) passes exists() but cannot be read as text.
        raise RuntimeError(f"Export path for review-export is not a file: {export_path}")

    output_dir = Path(args.output_dir).resolve() if args.output_dir else default_output_dir(
        f"{spec['scenario_id']}_review"
    )
    result = review_export(spec, export_path, output_dir)
    print(f"[truth-harness] review-export overall_status={result['review_summary']['overall_status']}")
    print(f"[truth-harness] review-export final_status={result['pack_state']['final_status']}")
    print(f"[truth-harness] artifacts={output_dir}")
    return 0
def handle_run_live(args: argparse.Namespace) -> int:
    """Handle the ``run-live`` command: run the spec against the backend and print the outcome.

    Artifacts go to ``--output-dir`` when given, otherwise to a timestamped default
    directory named after the scenario.

    Args:
        args: Parsed CLI arguments; ``spec`` and ``output_dir`` are read here and the
            whole namespace is passed on to ``run_live``.

    Returns:
        ``0`` once the run has finished and the summary lines have been printed.
    """
    spec = load_truth_harness_spec(Path(args.spec).resolve())

    if args.output_dir:
        output_dir = Path(args.output_dir).resolve()
    else:
        output_dir = default_output_dir(f"{spec['scenario_id']}_live")

    outcome = run_live(spec, output_dir, args)
    overall_status = outcome["review_summary"]["overall_status"]
    print(f"[truth-harness] run-live overall_status={overall_status}")
    final_status = outcome["pack_state"]["final_status"]
    print(f"[truth-harness] run-live final_status={final_status}")
    print(f"[truth-harness] artifacts={output_dir}")
    return 0
def build_parser() -> argparse.ArgumentParser:
    """Build the CLI parser with the ``bootstrap``, ``review-export`` and ``run-live`` commands.

    Each sub-command stores its handler in ``func`` via ``set_defaults`` so the caller
    can dispatch with ``args.func(args)``. A sub-command is required.
    """
    root = argparse.ArgumentParser(
        description="Strict sequential truth harness for real assistant dialogue scenarios"
    )
    commands = root.add_subparsers(dest="command", required=True)

    # bootstrap: export -> spec
    bootstrap_cmd = commands.add_parser(
        "bootstrap", help="Create an exact-question truth harness spec from a technical export"
    )
    for required_flag in ("--export", "--output", "--scenario-id", "--domain"):
        bootstrap_cmd.add_argument(required_flag, required=True)
    bootstrap_cmd.add_argument("--title")
    bootstrap_cmd.set_defaults(func=handle_bootstrap)

    # review-export: spec + existing export -> review artifacts
    review_cmd = commands.add_parser(
        "review-export", help="Review an existing technical export against a truth harness spec"
    )
    review_cmd.add_argument("--spec", required=True)
    for optional_flag in ("--export", "--output-dir"):
        review_cmd.add_argument(optional_flag)
    review_cmd.set_defaults(func=handle_review_export)

    # run-live: spec -> live backend run
    live_cmd = commands.add_parser(
        "run-live",
        help="Run the exact scenario live, strictly sequentially, with no injected carryover context",
    )
    live_cmd.add_argument("--spec", required=True)
    live_cmd.add_argument("--output-dir")
    # Plain string options whose defaults come from the domain_case_loop module.
    defaulted_options = (
        ("--backend-url", dcl.DEFAULT_BACKEND_URL),
        ("--prompt-version", dcl.DEFAULT_PROMPT_VERSION),
        ("--llm-provider", dcl.DEFAULT_LLM_PROVIDER),
        ("--llm-model", dcl.DEFAULT_LLM_MODEL),
        ("--llm-base-url", dcl.DEFAULT_LLM_BASE_URL),
        ("--llm-api-key", dcl.DEFAULT_LLM_API_KEY),
    )
    for flag, fallback in defaulted_options:
        live_cmd.add_argument(flag, default=fallback)
    live_cmd.add_argument("--temperature", type=float, default=dcl.DEFAULT_TEMPERATURE)
    live_cmd.add_argument("--max-output-tokens", type=int, default=dcl.DEFAULT_MAX_OUTPUT_TOKENS)
    live_cmd.add_argument("--timeout-seconds", type=int, default=120)
    live_cmd.add_argument("--use-mock", action="store_true")
    live_cmd.set_defaults(func=handle_run_live)

    return root
def main() -> int:
    """Parse the command line, run the selected sub-command handler, and return its result as an int."""
    cli_args = build_parser().parse_args()
    handler = cli_args.func  # set per sub-command via set_defaults(func=...)
    exit_code = handler(cli_args)
    return int(exit_code)
if __name__ == "__main__":
    # Script entry point: exit the process with the status returned by main().
    exit_code = main()
    raise SystemExit(exit_code)