from __future__ import annotations

import argparse
import json
import re
from datetime import date, datetime, timezone
from pathlib import Path
from types import SimpleNamespace
from typing import Any

import domain_case_loop as dcl

# Repository root is two levels above this file; run artifacts are written below it.
REPO_ROOT = Path(__file__).resolve().parent.parent
DEFAULT_OUTPUT_ROOT = REPO_ROOT / "artifacts" / "domain_runs"

TRUTH_HARNESS_SPEC_SCHEMA_VERSION = "domain_truth_harness_spec_v1"
TRUTH_HARNESS_STATE_SCHEMA_VERSION = "domain_truth_harness_state_v1"
TRUTH_HARNESS_REVIEW_SCHEMA_VERSION = "domain_truth_harness_review_v1"
TRUTH_HARNESS_MANIFEST_SCHEMA_VERSION = "domain_truth_harness_manifest_v1"

# Severity assigned to a step whose `criticality` is missing or unrecognized.
DEFAULT_CRITICALITY = "critical"

# Step-spec fields copied verbatim from each spec step into the generated manifest.
TECHNICAL_QUESTION_FIELDS = (
    "expected_intents",
    "expected_capability",
    "expected_recipe",
    "expected_result_mode",
    "required_filters",
    "forbidden_capabilities",
    "forbidden_recipes",
    "required_state_objects",
    "required_answer_shape",
    "forbidden_answer_patterns",
    "required_carryover_invariants",
    "invariant_severity",
)

# Step-spec keys that `normalize_step_spec` normalizes with
# `normalize_pattern_list`, listed in the order they are written onto the step.
_PATTERN_LIST_STEP_FIELDS = (
    "allowed_reply_types",
    "allowed_limited_reason_categories",
    "required_answer_patterns_any",
    "required_answer_patterns_all",
    "required_direct_answer_patterns_any",
    "required_direct_answer_patterns_all",
    "forbidden_direct_answer_patterns",
    "forbidden_filter_keys",
)


def dump_json(payload: Any) -> str:
    """Return *payload* serialized as 2-space-indented JSON with non-ASCII text kept as-is."""
    return json.dumps(payload, ensure_ascii=False, indent=2)


def write_text(file_path: Path, text: str) -> None:
    """Write *text* to *file_path* as UTF-8 with LF newlines, creating parent directories."""
    file_path.parent.mkdir(parents=True, exist_ok=True)
    file_path.write_text(text, encoding="utf-8", newline="\n")


def write_json(file_path: Path, payload: Any) -> None:
    """Write *payload* to *file_path* as pretty-printed JSON followed by a newline."""
    write_text(file_path, dump_json(payload) + "\n")


def read_json(file_path: Path) -> dict[str, Any]:
    """Load a JSON object from *file_path*.

    The file is decoded as ``utf-8-sig`` so a leading BOM is tolerated.

    Raises:
        RuntimeError: if the top-level JSON value is not an object.
    """
    payload = json.loads(file_path.read_text(encoding="utf-8-sig"))
    if not isinstance(payload, dict):
        raise RuntimeError(f"Expected JSON object in {file_path}")
    return payload


def normalize_criticality(value: Any) -> str:
    """Return *value* lower-cased and stripped if it is a known severity.

    Known severities are ``critical``, ``warning`` and ``info``; anything else
    (including ``None`` and empty values) falls back to ``DEFAULT_CRITICALITY``.
    """
    raw = str(value or "").strip().lower()
    if raw in {"critical", "warning", "info"}:
        return raw
    return DEFAULT_CRITICALITY


def normalize_pattern_list(raw_value: Any) -> list[str]:
    """Return the truthy items of ``dcl.normalize_string_list(raw_value)``."""
    return [item for item in dcl.normalize_string_list(raw_value) if item]


def normalize_filter_values_mapping(raw_value: Any) -> dict[str, list[str]]:
    """Normalize a ``{filter_key: values}`` mapping.

    Keys are stringified and stripped; values go through
    ``normalize_pattern_list``. Entries with an empty key or an empty
    normalized value list are dropped. A non-dict input yields ``{}``.
    """
    if not isinstance(raw_value, dict):
        return {}
    output: dict[str, list[str]] = {}
    for key, value in raw_value.items():
        normalized_key = str(key or "").strip()
        if not normalized_key:
            continue
        normalized_values = normalize_pattern_list(value)
        if normalized_values:
            output[normalized_key] = normalized_values
    return output


def normalize_step_spec(index: int, raw_step: Any) -> dict[str, Any]:
    """Build the normalized truth-harness step for *raw_step*.

    Starts from the dict returned by ``dcl.normalize_step_definition(index,
    raw_step)`` and adds the harness-specific review fields on top of it. A
    non-dict *raw_step* is treated as having no harness-specific fields, so
    every added field gets its empty/default value.

    Args:
        index: Step position forwarded to ``dcl.normalize_step_definition``.
        raw_step: Raw step entry from the spec file.

    Returns:
        The normalized step dict with ``criticality``, the pattern-list
        fields, ``forbidden_filter_values``,
        ``required_filter_within_previous_step_period`` and ``notes`` set.
    """
    normalized_step = dcl.normalize_step_definition(index, raw_step)
    step = raw_step if isinstance(raw_step, dict) else {}

    normalized_step["criticality"] = normalize_criticality(step.get("criticality"))
    for field_name in _PATTERN_LIST_STEP_FIELDS:
        normalized_step[field_name] = normalize_pattern_list(step.get(field_name))
    normalized_step["forbidden_filter_values"] = normalize_filter_values_mapping(
        step.get("forbidden_filter_values")
    )

    period_refs = step.get("required_filter_within_previous_step_period")
    normalized_refs: dict[str, str] = {}
    if isinstance(period_refs, dict):
        for key, value in period_refs.items():
            filter_key = str(key).strip()
            # `value or ""` keeps a JSON null from turning into the literal
            # step reference "None"; empty references are dropped instead.
            step_ref = str(value or "").strip()
            if filter_key and step_ref:
                normalized_refs[filter_key] = step_ref
    normalized_step["required_filter_within_previous_step_period"] = normalized_refs

    normalized_step["notes"] = str(step.get("notes") or "").strip() or None
    return normalized_step
def load_truth_harness_spec(file_path: Path) -> dict[str, Any]: raw_spec = read_json(file_path) steps_raw = raw_spec.get("steps") if not isinstance(steps_raw, list) or not steps_raw: raise RuntimeError("Truth harness spec must define non-empty `steps`") scenario_id = str(raw_spec.get("scenario_id") or "").strip() domain = str(raw_spec.get("domain") or "").strip() if not scenario_id: raise RuntimeError("Truth harness spec must define `scenario_id`") if not domain: raise RuntimeError("Truth harness spec must define `domain`") return { "schema_version": str(raw_spec.get("schema_version") or TRUTH_HARNESS_SPEC_SCHEMA_VERSION), "scenario_id": scenario_id, "domain": domain, "title": str(raw_spec.get("title") or scenario_id).strip() or scenario_id, "description": str(raw_spec.get("description") or "").strip() or None, "source_export": str(raw_spec.get("source_export") or "").strip() or None, "bindings": dcl.normalize_bindings(raw_spec.get("bindings")), "steps": [normalize_step_spec(index + 1, raw_step) for index, raw_step in enumerate(steps_raw)], } def build_runtime_bindings() -> dict[str, Any]: today = date.today() today_iso = today.isoformat() today_dot = today.strftime("%d.%m.%Y") return { "today_iso": today_iso, "today_dot": today_dot, "today_iso_regex": re.escape(today_iso), "today_dot_regex": re.escape(today_dot), "generated_at_utc": datetime.now(timezone.utc).replace(microsecond=0).isoformat(), } def build_review_root( bindings: dict[str, Any], runtime_bindings: dict[str, Any], step_results: dict[str, Any] ) -> dict[str, Any]: root: dict[str, Any] = { "bindings": bindings, "runtime": runtime_bindings, "step_results": step_results, } if isinstance(step_results, dict): root.update(step_results) return root def lookup_review_value( path_expression: str, step_results: dict[str, Any], bindings: dict[str, Any], runtime_bindings: dict[str, Any] ) -> Any: current: Any = build_review_root(bindings, runtime_bindings, step_results) for token in 
dcl.parse_path_tokens(path_expression): if isinstance(token, int): if not isinstance(current, list): raise RuntimeError(f"Placeholder `{path_expression}` does not point to a list before index access") if token >= len(current): raise RuntimeError(f"Placeholder `{path_expression}` index {token} is out of range") current = current[token] continue if not isinstance(current, dict) or token not in current: raise RuntimeError(f"Placeholder `{path_expression}` could not be resolved at `{token}`") current = current[token] return current def resolve_template_string( template: str, step_results: dict[str, Any], bindings: dict[str, Any], runtime_bindings: dict[str, Any] ) -> str: pattern = re.compile(r"{{\s*([^{}]+?)\s*}}") def replace(match: re.Match[str]) -> str: value = lookup_review_value(match.group(1), step_results, bindings, runtime_bindings) if isinstance(value, (dict, list)): return dump_json(value) return str(value) return pattern.sub(replace, template) def resolve_nested_placeholders( raw_value: Any, step_results: dict[str, Any], bindings: dict[str, Any], runtime_bindings: dict[str, Any] ) -> Any: if isinstance(raw_value, str): return resolve_template_string(raw_value, step_results, bindings, runtime_bindings) if isinstance(raw_value, list): return [resolve_nested_placeholders(item, step_results, bindings, runtime_bindings) for item in raw_value] if isinstance(raw_value, dict): return { str(key): resolve_nested_placeholders(value, step_results, bindings, runtime_bindings) for key, value in raw_value.items() } return raw_value def build_generated_manifest(spec: dict[str, Any]) -> dict[str, Any]: manifest_steps: list[dict[str, Any]] = [] previous_step_id: str | None = None for step in spec["steps"]: manifest_step: dict[str, Any] = { "step_id": step["step_id"], "title": step["title"], "question": step["question_template"], "depends_on": [previous_step_id] if previous_step_id else [], "analysis_context": {}, } for field_name in TECHNICAL_QUESTION_FIELDS: 
manifest_step[field_name] = step.get(field_name) manifest_steps.append(manifest_step) previous_step_id = step["step_id"] return { "schema_version": TRUTH_HARNESS_MANIFEST_SCHEMA_VERSION, "scenario_id": spec["scenario_id"], "domain": spec["domain"], "title": spec["title"], "description": spec.get("description"), "analysis_context": {}, "bindings": spec.get("bindings") or {}, "steps": manifest_steps, } def build_runner_args(args: argparse.Namespace) -> SimpleNamespace: return SimpleNamespace( backend_url=args.backend_url, prompt_version=args.prompt_version, llm_provider=args.llm_provider, llm_model=args.llm_model, llm_base_url=args.llm_base_url, llm_api_key=args.llm_api_key, temperature=args.temperature, max_output_tokens=args.max_output_tokens, timeout_seconds=args.timeout_seconds, use_mock=bool(args.use_mock), ) def build_plain_assistant_message_payload( args: argparse.Namespace, *, question: str, session_id: str | None ) -> dict[str, Any]: return dcl.drop_none_values( { "session_id": session_id, "user_message": question, "message": question, "mode": "assistant", "llmProvider": args.llm_provider, "apiKey": args.llm_api_key, "model": args.llm_model, "baseUrl": args.llm_base_url, "temperature": args.temperature, "maxOutputTokens": args.max_output_tokens, "promptVersion": args.prompt_version, "useMock": bool(args.use_mock), } ) def build_placeholder_step_result(step_state: dict[str, Any]) -> dict[str, Any]: result = dict(step_state) result["filters"] = dict(step_state.get("extracted_filters") or {}) return result def append_finding( findings: list[dict[str, Any]], step: dict[str, Any], code: str, message: str, *, actual: Any = None, expected: Any = None, ) -> None: findings.append( { "code": code, "severity": step.get("criticality") or DEFAULT_CRITICALITY, "message": message, "actual": actual, "expected": expected, } ) def matches_any_pattern(text: str, patterns: list[str]) -> bool: return any(re.search(pattern, text, flags=re.IGNORECASE) for pattern in patterns if 
pattern) def find_missing_patterns(text: str, patterns: list[str]) -> list[str]: return [pattern for pattern in patterns if pattern and not re.search(pattern, text, flags=re.IGNORECASE)] def normalize_actual_filter_value(filter_key: str, raw_value: Any) -> str: if filter_key in {"as_of_date", "period_from", "period_to"}: return dcl.normalize_iso_date(raw_value) or "" return str(raw_value or "").strip() def evaluate_truth_step( *, step: dict[str, Any], step_state: dict[str, Any], step_results: dict[str, Any], bindings: dict[str, Any], runtime_bindings: dict[str, Any], ) -> dict[str, Any]: findings: list[dict[str, Any]] = [] reply_type = str(step_state.get("reply_type") or "").strip() assistant_text = str(step_state.get("assistant_text") or "") direct_answer = str(step_state.get("actual_direct_answer") or "").strip() detected_intent = str(step_state.get("detected_intent") or "").strip() selected_recipe = str(step_state.get("selected_recipe") or "").strip() capability_id = str(step_state.get("capability_id") or "").strip() limited_reason_category = str(step_state.get("limited_reason_category") or "").strip() extracted_filters = ( step_state.get("extracted_filters") if isinstance(step_state.get("extracted_filters"), dict) else {} ) if step_state.get("question_resolved") != step["question_template"]: append_finding( findings, step, "question_sequence_mismatch", "В live/export прогоне вопрос отличается от зафиксированного exact-сценария.", actual=step_state.get("question_resolved"), expected=step["question_template"], ) if step["allowed_reply_types"] and reply_type not in step["allowed_reply_types"]: append_finding( findings, step, "unexpected_reply_type", "Тип ответа не соответствует ожидаемому режиму шага.", actual=reply_type, expected=step["allowed_reply_types"], ) allowed_limited_reason_categories = step.get("allowed_limited_reason_categories") or [] if allowed_limited_reason_categories and limited_reason_category: if limited_reason_category not in 
allowed_limited_reason_categories: append_finding( findings, step, "unexpected_limited_reason_category", "Категория честного ограничения не соответствует допустимым сценарным условиям.", actual=limited_reason_category, expected=allowed_limited_reason_categories, ) expected_intents = dcl.normalize_string_list( resolve_nested_placeholders(step.get("expected_intents") or [], step_results, bindings, runtime_bindings) ) if expected_intents and not dcl.identifier_in_list(detected_intent, expected_intents): append_finding( findings, step, "wrong_intent", "Интент не соответствует ожидаемому бизнес-смыслу шага.", actual=detected_intent or None, expected=expected_intents, ) expected_capability = str( resolve_nested_placeholders(step.get("expected_capability"), step_results, bindings, runtime_bindings) or "" ).strip() if expected_capability and not dcl.identifiers_match(capability_id, expected_capability): append_finding( findings, step, "wrong_capability", "Выбрана не та capability, которую должен был отработать этот шаг.", actual=capability_id or None, expected=expected_capability, ) expected_recipe = str( resolve_nested_placeholders(step.get("expected_recipe"), step_results, bindings, runtime_bindings) or "" ).strip() if expected_recipe and not dcl.identifiers_match(selected_recipe, expected_recipe): append_finding( findings, step, "wrong_recipe", "Выбран не тот рецепт адресного контура.", actual=selected_recipe or None, expected=expected_recipe, ) expected_result_mode = str( resolve_nested_placeholders(step.get("expected_result_mode"), step_results, bindings, runtime_bindings) or "" ).strip() actual_result_mode = str(step_state.get("result_mode") or "").strip() if expected_result_mode and actual_result_mode and not dcl.identifiers_match(actual_result_mode, expected_result_mode): append_finding( findings, step, "wrong_result_mode", "Режим результата не соответствует ожидаемой форме ответа.", actual=actual_result_mode, expected=expected_result_mode, ) required_filters = 
dcl.normalize_validation_filters( resolve_nested_placeholders(step.get("required_filters") or {}, step_results, bindings, runtime_bindings) ) for filter_key, expected_value in required_filters.items(): actual_value = normalize_actual_filter_value(filter_key, extracted_filters.get(filter_key)) if not actual_value: append_finding( findings, step, f"missing_filter:{filter_key}", f"В техчате отсутствует обязательный фильтр `{filter_key}`.", actual=actual_value or None, expected=expected_value, ) continue if actual_value != expected_value: append_finding( findings, step, f"wrong_filter:{filter_key}", f"Фильтр `{filter_key}` не совпадает с ожидаемым carryover/period состоянием.", actual=actual_value, expected=expected_value, ) forbidden_filter_keys = dcl.normalize_string_list( resolve_nested_placeholders(step.get("forbidden_filter_keys") or [], step_results, bindings, runtime_bindings) ) for filter_key in forbidden_filter_keys: actual_value = normalize_actual_filter_value(filter_key, extracted_filters.get(filter_key)) if actual_value: append_finding( findings, step, f"forbidden_filter_key:{filter_key}", f"Шаг не должен был тащить фильтр `{filter_key}` в техчат.", actual=actual_value, expected="absent", ) forbidden_filter_values = normalize_filter_values_mapping( resolve_nested_placeholders(step.get("forbidden_filter_values") or {}, step_results, bindings, runtime_bindings) ) for filter_key, forbidden_values in forbidden_filter_values.items(): actual_value = normalize_actual_filter_value(filter_key, extracted_filters.get(filter_key)) if actual_value and actual_value in forbidden_values: append_finding( findings, step, f"forbidden_filter_value:{filter_key}", f"Шаг унаследовал запрещенное значение фильтра `{filter_key}`.", actual=actual_value, expected={"not_in": forbidden_values}, ) within_previous_period = step.get("required_filter_within_previous_step_period") or {} for filter_key, step_ref in within_previous_period.items(): resolved_step_ref = str( 
resolve_nested_placeholders(step_ref, step_results, bindings, runtime_bindings) or "" ).strip() previous_step = step_results.get(resolved_step_ref) actual_value = dcl.normalize_iso_date(extracted_filters.get(filter_key)) if not isinstance(previous_step, dict): append_finding( findings, step, f"previous_step_missing:{filter_key}", f"Не удалось разрешить предыдущий шаг `{resolved_step_ref}` для проверки диапазона фильтра `{filter_key}`.", actual=actual_value, expected=resolved_step_ref, ) continue previous_filters = previous_step.get("filters") if isinstance(previous_step.get("filters"), dict) else {} previous_from = dcl.normalize_iso_date(previous_filters.get("period_from")) previous_to = dcl.normalize_iso_date(previous_filters.get("period_to")) if not actual_value or not previous_from or not previous_to: append_finding( findings, step, f"period_carryover_missing:{filter_key}", f"Не удалось подтвердить, что `{filter_key}` попал в диапазон предыдущего шага `{resolved_step_ref}`.", actual=actual_value, expected={"period_from": previous_from, "period_to": previous_to}, ) continue if not (previous_from <= actual_value <= previous_to): append_finding( findings, step, f"period_carryover_outside:{filter_key}", f"Фильтр `{filter_key}` вышел за диапазон предыдущего релевантного периода.", actual=actual_value, expected={"period_from": previous_from, "period_to": previous_to}, ) required_answer_patterns_any = normalize_pattern_list( resolve_nested_placeholders(step.get("required_answer_patterns_any") or [], step_results, bindings, runtime_bindings) ) if required_answer_patterns_any and not matches_any_pattern(assistant_text, required_answer_patterns_any): append_finding( findings, step, "required_answer_patterns_any_missing", "Полный ответ модели не содержит ни одного из обязательных смысловых паттернов.", actual=assistant_text, expected=required_answer_patterns_any, ) required_answer_patterns_all = normalize_pattern_list( 
resolve_nested_placeholders(step.get("required_answer_patterns_all") or [], step_results, bindings, runtime_bindings) ) missing_answer_patterns = find_missing_patterns(assistant_text, required_answer_patterns_all) if missing_answer_patterns: append_finding( findings, step, "required_answer_patterns_all_missing", "Полный ответ модели не закрыл обязательные паттерны шага.", actual=assistant_text, expected=missing_answer_patterns, ) forbidden_answer_patterns = normalize_pattern_list( resolve_nested_placeholders(step.get("forbidden_answer_patterns") or [], step_results, bindings, runtime_bindings) ) forbidden_answer_hits = [pattern for pattern in forbidden_answer_patterns if re.search(pattern, assistant_text, flags=re.IGNORECASE)] if forbidden_answer_hits: append_finding( findings, step, "forbidden_answer_pattern_hit", "В полном ответе модели найден запрещенный паттерн.", actual=forbidden_answer_hits, expected="absent", ) required_direct_any = normalize_pattern_list( resolve_nested_placeholders( step.get("required_direct_answer_patterns_any") or [], step_results, bindings, runtime_bindings, ) ) if required_direct_any and not matches_any_pattern(direct_answer, required_direct_any): append_finding( findings, step, "required_direct_answer_patterns_any_missing", "Первая строка ответа не содержит ни одного из обязательных direct-answer паттернов.", actual=direct_answer, expected=required_direct_any, ) required_direct_all = normalize_pattern_list( resolve_nested_placeholders( step.get("required_direct_answer_patterns_all") or [], step_results, bindings, runtime_bindings, ) ) missing_direct_patterns = find_missing_patterns(direct_answer, required_direct_all) if missing_direct_patterns: append_finding( findings, step, "required_direct_answer_patterns_all_missing", "Первая строка ответа не закрыла обязательные direct-answer паттерны.", actual=direct_answer, expected=missing_direct_patterns, ) forbidden_direct_patterns = normalize_pattern_list( resolve_nested_placeholders( 
step.get("forbidden_direct_answer_patterns") or [], step_results, bindings, runtime_bindings, ) ) forbidden_direct_hits = [pattern for pattern in forbidden_direct_patterns if re.search(pattern, direct_answer, flags=re.IGNORECASE)] if forbidden_direct_hits: append_finding( findings, step, "forbidden_direct_answer_pattern_hit", "Первая строка ответа попала в запрещенный direct-answer паттерн.", actual=forbidden_direct_hits, expected="absent", ) critical_findings = [item for item in findings if item.get("severity") == "critical"] warning_findings = [item for item in findings if item.get("severity") == "warning"] info_findings = [item for item in findings if item.get("severity") == "info"] review_status = "pass" if critical_findings: review_status = "fail" elif warning_findings: review_status = "warning" elif info_findings: review_status = "info" reviewed_state = dict(step_state) reviewed_state["review_findings"] = findings reviewed_state["review_status"] = review_status reviewed_state["critical_findings_count"] = len(critical_findings) reviewed_state["warning_findings_count"] = len(warning_findings) reviewed_state["info_findings_count"] = len(info_findings) reviewed_state["truth_harness_notes"] = step.get("notes") return reviewed_state def build_truth_review_summary(spec: dict[str, Any], scenario_state: dict[str, Any], review_source: str) -> dict[str, Any]: step_outputs = scenario_state.get("step_outputs") if isinstance(scenario_state.get("step_outputs"), dict) else {} review_steps = [step_outputs.get(step["step_id"], {}) for step in spec["steps"]] passed = sum(1 for item in review_steps if str(item.get("review_status") or "") == "pass") warnings = sum(1 for item in review_steps if str(item.get("review_status") or "") == "warning") failed = sum(1 for item in review_steps if str(item.get("review_status") or "") == "fail") return { "schema_version": TRUTH_HARNESS_REVIEW_SCHEMA_VERSION, "review_source": review_source, "scenario_id": spec["scenario_id"], "domain": 
spec["domain"], "title": spec["title"], "session_id": scenario_state.get("session_id"), "steps_total": len(spec["steps"]), "steps_passed": passed, "steps_with_warning": warnings, "steps_failed": failed, "overall_status": "fail" if failed else ("warning" if warnings else "pass"), } def build_truth_review_markdown(spec: dict[str, Any], scenario_state: dict[str, Any], review_summary: dict[str, Any]) -> str: lines = [ "# Truth harness review", "", f"- scenario_id: `{spec['scenario_id']}`", f"- domain: `{spec['domain']}`", f"- title: {spec['title']}", f"- review_source: `{review_summary.get('review_source') or 'n/a'}`", f"- session_id: `{scenario_state.get('session_id') or 'n/a'}`", f"- overall_status: `{review_summary.get('overall_status') or 'n/a'}`", f"- steps_total: `{review_summary.get('steps_total')}`", f"- steps_passed: `{review_summary.get('steps_passed')}`", f"- steps_with_warning: `{review_summary.get('steps_with_warning')}`", f"- steps_failed: `{review_summary.get('steps_failed')}`", "", "## Steps", ] step_outputs = scenario_state.get("step_outputs") if isinstance(scenario_state.get("step_outputs"), dict) else {} for index, step in enumerate(spec["steps"], start=1): step_state = step_outputs.get(step["step_id"], {}) findings = step_state.get("review_findings") if isinstance(step_state.get("review_findings"), list) else [] lines.extend( [ f"{index}. 
`{step['step_id']}` - {step['question_template']}", f"review_status: `{step_state.get('review_status') or 'n/a'}`", f"reply_type: `{step_state.get('reply_type') or 'n/a'}`", f"intent: `{step_state.get('detected_intent') or 'n/a'}`", f"recipe: `{step_state.get('selected_recipe') or 'n/a'}`", f"capability: `{step_state.get('capability_id') or 'n/a'}`", f"limited_reason_category: `{step_state.get('limited_reason_category') or 'n/a'}`", f"filters: `{dump_json(step_state.get('extracted_filters') or {})}`", f"direct_answer: {step_state.get('actual_direct_answer') or 'n/a'}", ] ) if step.get("notes"): lines.append(f"notes: {step['notes']}") if findings: lines.append("findings:") for finding in findings: lines.append( f"- [{finding.get('severity')}] {finding.get('code')}: {finding.get('message')} " f"(actual={dump_json(finding.get('actual'))}, expected={dump_json(finding.get('expected'))})" ) else: lines.append("findings: none") lines.append("") return "\n".join(lines).strip() + "\n" def save_step_bundle( *, step_dir: Path, export_markdown: str, turn_artifact: dict[str, Any], session_record: dict[str, Any] | None, response_payload: dict[str, Any] | None, step_state: dict[str, Any], ) -> None: write_text(step_dir / "output.md", export_markdown) write_json(step_dir / "debug.json", step_state.get("technical_debug_payload") or {}) write_json(step_dir / "turn.json", turn_artifact) write_json(step_dir / "assistant_response.json", response_payload or {}) write_json(step_dir / "step_state.json", step_state) if session_record is not None: write_json(step_dir / "session.json", session_record) write_text(step_dir / "resolved_question.txt", f"{step_state.get('question_resolved') or ''}\n") def build_step_state_from_turn( *, spec: dict[str, Any], step: dict[str, Any], step_index: int, question: str, conversation: list[dict[str, Any]], session_record: dict[str, Any] | None, export_markdown: str, ) -> tuple[dict[str, Any], dict[str, Any]]: mini_conversation = conversation[-2:] if 
len(conversation) >= 2 else conversation turn_artifact = dcl.build_turn_artifact( slot="step", domain=spec["domain"], case_id=spec["scenario_id"], question=question, session_id=str(spec.get("session_id") or "n/a"), conversation=mini_conversation, session_record=session_record, job_record=None, report_case=None, export_file_name="output.md", ) turn_artifact["schema_version"] = "domain_truth_harness_turn_artifact_v1" turn_artifact["scenario"] = { "scenario_id": spec["scenario_id"], "step_id": step["step_id"], "step_index": step_index, "question_template": step["question_template"], "question_resolved": question, "review_mode": "truth_harness_strict_replay", } last_assistant = dcl.find_last_assistant(mini_conversation) entries = dcl.extract_structured_entries(str(last_assistant.get("text") or "")) base_step_state = dcl.build_scenario_step_state( scenario_id=spec["scenario_id"], domain=spec["domain"], step=step, step_index=step_index, question_resolved=question, analysis_context={}, turn_artifact=turn_artifact, entries=entries, ) base_step_state["export_markdown"] = export_markdown base_step_state["technical_debug_payload"] = ( last_assistant.get("debug") if isinstance(last_assistant.get("debug"), dict) else {} ) return base_step_state, turn_artifact def build_conversation_pairs(conversation: list[dict[str, Any]]) -> list[tuple[dict[str, Any], dict[str, Any] | None]]: pairs: list[tuple[dict[str, Any], dict[str, Any] | None]] = [] for index, item in enumerate(conversation): if item.get("role") != "user": continue assistant_item: dict[str, Any] | None = None if index + 1 < len(conversation) and conversation[index + 1].get("role") == "assistant": assistant_item = conversation[index + 1] pairs.append((item, assistant_item)) return pairs def review_export(spec: dict[str, Any], export_path: Path, output_dir: Path) -> dict[str, Any]: export_text = export_path.read_text(encoding="utf-8-sig") session_id, conversation = dcl.parse_export_markdown(export_text) pairs = 
build_conversation_pairs(conversation) if len(pairs) != len(spec["steps"]): raise RuntimeError( f"Spec steps ({len(spec['steps'])}) and export user turns ({len(pairs)}) do not match for {export_path}" ) output_dir.mkdir(parents=True, exist_ok=True) write_json(output_dir / "truth_harness_spec.json", spec) write_text(output_dir / "source_export.md", export_text) step_results: dict[str, Any] = {} runtime_bindings = build_runtime_bindings() scenario_state: dict[str, Any] = { "schema_version": TRUTH_HARNESS_STATE_SCHEMA_VERSION, "scenario_id": spec["scenario_id"], "domain": spec["domain"], "title": spec["title"], "session_id": session_id, "review_source": str(export_path), "step_outputs": {}, "updated_at": datetime.now(timezone.utc).replace(microsecond=0).isoformat(), } steps_dir = output_dir / "steps" steps_dir.mkdir(parents=True, exist_ok=True) for index, (step, pair) in enumerate(zip(spec["steps"], pairs, strict=True), start=1): user_item, assistant_item = pair if assistant_item is None: raise RuntimeError(f"Export pair for step `{step['step_id']}` is missing assistant message") mini_conversation = [user_item, assistant_item] export_markdown = dcl.build_conversation_export(session_id, mini_conversation, mode="technical") base_step_state, turn_artifact = build_step_state_from_turn( spec={**spec, "session_id": session_id}, step=step, step_index=index, question=str(user_item.get("text") or ""), conversation=mini_conversation, session_record=None, export_markdown=export_markdown, ) reviewed_step = evaluate_truth_step( step=step, step_state=base_step_state, step_results=step_results, bindings=spec.get("bindings") or {}, runtime_bindings=runtime_bindings, ) scenario_state["step_outputs"][step["step_id"]] = reviewed_step step_results[step["step_id"]] = build_placeholder_step_result(reviewed_step) save_step_bundle( step_dir=steps_dir / step["step_id"], export_markdown=export_markdown, turn_artifact=turn_artifact, session_record=None, response_payload=None, 
step_state=reviewed_step, ) scenario_state["updated_at"] = datetime.now(timezone.utc).replace(microsecond=0).isoformat() review_summary = build_truth_review_summary(spec, scenario_state, f"export:{export_path}") review_markdown = build_truth_review_markdown(spec, scenario_state, review_summary) write_json(output_dir / "scenario_state.json", scenario_state) write_json(output_dir / "truth_review.json", {"summary": review_summary, "steps": scenario_state["step_outputs"]}) write_text(output_dir / "truth_review.md", review_markdown) return {"scenario_state": scenario_state, "review_summary": review_summary} def run_live(spec: dict[str, Any], output_dir: Path, args: argparse.Namespace) -> dict[str, Any]: runner_args = build_runner_args(args) dcl.ensure_backend_health(runner_args.backend_url, runner_args.timeout_seconds) output_dir.mkdir(parents=True, exist_ok=True) manifest = build_generated_manifest(spec) write_json(output_dir / "truth_harness_spec.json", spec) write_json(output_dir / "scenario_manifest.json", manifest) steps_dir = output_dir / "steps" steps_dir.mkdir(parents=True, exist_ok=True) runtime_bindings = build_runtime_bindings() step_results: dict[str, Any] = {} scenario_state: dict[str, Any] = { "schema_version": TRUTH_HARNESS_STATE_SCHEMA_VERSION, "scenario_id": spec["scenario_id"], "domain": spec["domain"], "title": spec["title"], "session_id": None, "review_source": "live_strict_replay", "step_outputs": {}, "updated_at": datetime.now(timezone.utc).replace(microsecond=0).isoformat(), } write_json(output_dir / "scenario_state.json", scenario_state) current_session_id: str | None = None for index, step in enumerate(spec["steps"], start=1): question = step["question_template"] payload = build_plain_assistant_message_payload(runner_args, question=question, session_id=current_session_id) response_payload = dcl.http_json( f"{runner_args.backend_url}/api/assistant/message", method="POST", payload=payload, timeout=max(30, int(runner_args.timeout_seconds)), ) 
current_session_id = str(response_payload.get("session_id") or current_session_id or "").strip() or None if not current_session_id: raise RuntimeError(f"Assistant response for step `{step['step_id']}` does not contain session_id") session_record = dcl.fetch_session_snapshot( runner_args.backend_url, current_session_id, runner_args.timeout_seconds ) conversation = dcl.extract_conversation_from_session(session_record) export_markdown = dcl.build_conversation_export(current_session_id, conversation, mode="technical") base_step_state, turn_artifact = build_step_state_from_turn( spec={**spec, "session_id": current_session_id}, step=step, step_index=index, question=question, conversation=conversation, session_record=session_record, export_markdown=export_markdown, ) reviewed_step = evaluate_truth_step( step=step, step_state=base_step_state, step_results=step_results, bindings=spec.get("bindings") or {}, runtime_bindings=runtime_bindings, ) scenario_state["session_id"] = current_session_id scenario_state["step_outputs"][step["step_id"]] = reviewed_step step_results[step["step_id"]] = build_placeholder_step_result(reviewed_step) scenario_state["updated_at"] = datetime.now(timezone.utc).replace(microsecond=0).isoformat() save_step_bundle( step_dir=steps_dir / step["step_id"], export_markdown=export_markdown, turn_artifact=turn_artifact, session_record=session_record, response_payload=response_payload, step_state=reviewed_step, ) write_json(output_dir / "scenario_state.json", scenario_state) print( f"[truth-harness] step {index}/{len(spec['steps'])}: {step['step_id']} -> " f"{reviewed_step.get('review_status')} ({reviewed_step.get('reply_type') or 'n/a'})" ) review_summary = build_truth_review_summary(spec, scenario_state, "live_strict_replay") review_markdown = build_truth_review_markdown(spec, scenario_state, review_summary) write_text(output_dir / "session_id.txt", f"{scenario_state.get('session_id') or ''}\n") write_json(output_dir / "scenario_state.json", 
scenario_state) write_json(output_dir / "truth_review.json", {"summary": review_summary, "steps": scenario_state["step_outputs"]}) write_text(output_dir / "truth_review.md", review_markdown) write_text(output_dir / "final_status.md", f"# Final status\n\n- status: `{review_summary['overall_status']}`\n") print(f"[truth-harness] saved artifacts to {output_dir}") print(f"[truth-harness] overall_status={review_summary['overall_status']}") return {"scenario_state": scenario_state, "review_summary": review_summary} def build_bootstrap_spec(export_path: Path, scenario_id: str, domain: str, title: str | None) -> dict[str, Any]: export_text = export_path.read_text(encoding="utf-8-sig") _, conversation = dcl.parse_export_markdown(export_text) pairs = build_conversation_pairs(conversation) steps = [] for index, (user_item, _assistant_item) in enumerate(pairs, start=1): steps.append( { "step_id": f"step_{index:02d}", "title": f"Step {index:02d}", "question": str(user_item.get("text") or "").strip(), "criticality": DEFAULT_CRITICALITY, } ) return { "schema_version": TRUTH_HARNESS_SPEC_SCHEMA_VERSION, "scenario_id": scenario_id, "domain": domain, "title": title or scenario_id, "description": f"Exact sequential replay spec bootstrapped from {export_path}", "source_export": str(export_path), "bindings": {}, "steps": steps, } def default_output_dir(base_name: str) -> Path: stamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S") return DEFAULT_OUTPUT_ROOT / f"{base_name}_{stamp}" def handle_bootstrap(args: argparse.Namespace) -> int: export_path = Path(args.export).resolve() output_path = Path(args.output).resolve() spec = build_bootstrap_spec( export_path=export_path, scenario_id=args.scenario_id, domain=args.domain, title=args.title, ) write_json(output_path, spec) print(f"[truth-harness] wrote bootstrap spec to {output_path}") return 0 def handle_review_export(args: argparse.Namespace) -> int: spec_path = Path(args.spec).resolve() spec = load_truth_harness_spec(spec_path) 
export_path = Path(args.export or spec.get("source_export") or "").resolve() if not export_path.exists(): raise RuntimeError("Export file for review-export does not exist") output_dir = Path(args.output_dir).resolve() if args.output_dir else default_output_dir( f"{spec['scenario_id']}_review" ) result = review_export(spec, export_path, output_dir) print(f"[truth-harness] review-export overall_status={result['review_summary']['overall_status']}") print(f"[truth-harness] artifacts={output_dir}") return 0 def handle_run_live(args: argparse.Namespace) -> int: spec_path = Path(args.spec).resolve() spec = load_truth_harness_spec(spec_path) output_dir = Path(args.output_dir).resolve() if args.output_dir else default_output_dir( f"{spec['scenario_id']}_live" ) result = run_live(spec, output_dir, args) print(f"[truth-harness] run-live overall_status={result['review_summary']['overall_status']}") print(f"[truth-harness] artifacts={output_dir}") return 0 def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="Strict sequential truth harness for real assistant dialogue scenarios") subparsers = parser.add_subparsers(dest="command", required=True) bootstrap = subparsers.add_parser( "bootstrap", help="Create an exact-question truth harness spec from a technical export" ) bootstrap.add_argument("--export", required=True) bootstrap.add_argument("--output", required=True) bootstrap.add_argument("--scenario-id", required=True) bootstrap.add_argument("--domain", required=True) bootstrap.add_argument("--title") bootstrap.set_defaults(func=handle_bootstrap) review_export_cmd = subparsers.add_parser( "review-export", help="Review an existing technical export against a truth harness spec" ) review_export_cmd.add_argument("--spec", required=True) review_export_cmd.add_argument("--export") review_export_cmd.add_argument("--output-dir") review_export_cmd.set_defaults(func=handle_review_export) run_live_cmd = subparsers.add_parser( "run-live", help="Run 
the exact scenario live, strictly sequentially, with no injected carryover context", ) run_live_cmd.add_argument("--spec", required=True) run_live_cmd.add_argument("--output-dir") run_live_cmd.add_argument("--backend-url", default=dcl.DEFAULT_BACKEND_URL) run_live_cmd.add_argument("--prompt-version", default=dcl.DEFAULT_PROMPT_VERSION) run_live_cmd.add_argument("--llm-provider", default=dcl.DEFAULT_LLM_PROVIDER) run_live_cmd.add_argument("--llm-model", default=dcl.DEFAULT_LLM_MODEL) run_live_cmd.add_argument("--llm-base-url", default=dcl.DEFAULT_LLM_BASE_URL) run_live_cmd.add_argument("--llm-api-key", default=dcl.DEFAULT_LLM_API_KEY) run_live_cmd.add_argument("--temperature", type=float, default=dcl.DEFAULT_TEMPERATURE) run_live_cmd.add_argument("--max-output-tokens", type=int, default=dcl.DEFAULT_MAX_OUTPUT_TOKENS) run_live_cmd.add_argument("--timeout-seconds", type=int, default=120) run_live_cmd.add_argument("--use-mock", action="store_true") run_live_cmd.set_defaults(func=handle_run_live) return parser def main() -> int: parser = build_parser() args = parser.parse_args() return int(args.func(args)) if __name__ == "__main__": raise SystemExit(main())