# Scenario acceptance matrix and truth-harness pack-state builders.
#
# Turns a scenario spec, the recorded per-step outputs and a review summary
# into (1) a per-step acceptance matrix, (2) a derived pack state carrying a
# final status, and (3) Markdown renderings of both.
from __future__ import annotations

from datetime import datetime, timezone
from typing import Any

SCENARIO_ACCEPTANCE_MATRIX_SCHEMA_VERSION = "scenario_acceptance_matrix_v1"
TRUTH_HARNESS_PACK_STATE_SCHEMA_VERSION = "truth_harness_pack_state_v1"

# Finding severity -> unresolved priority bucket. Unknown severities map to
# "P2" (see _priority_from_finding).
SEVERITY_TO_PRIORITY = {
    "critical": "P0",
    "warning": "P1",
    "info": "P2",
}
# Lower rank sorts first, i.e. is the more urgent priority.
PRIORITY_RANK = {"P0": 0, "P1": 1, "P2": 2, "none": 3}
# Expected intents that mark a step as a "selected object" step.
SELECTED_OBJECT_INTENTS = {
    "inventory_purchase_provenance_for_item",
    "inventory_purchase_documents_for_item",
    "inventory_sale_trace_for_item",
    "inventory_profitability_for_item",
    "inventory_purchase_to_sale_chain",
}
# Semantic tags that mark a step as a "meta context" step.
META_CONTEXT_TAGS = {
    "meta_smalltalk",
    "meta_scope",
    "meta_capability",
    "meta_memory",
    "meta_historical_capability",
}

# Substrings of the lower-cased question that mark a "selected object" step.
_SELECTED_OBJECT_QUESTION_MARKERS = (
    "выбранному объекту",
    "по этой позиции",
    "по ней",
    "по нему",
    "\"",
)
# Substrings of "<step_id> <title> <question>" (lower-cased) that mark a
# "meta context" step.
_META_CONTEXT_MARKERS = (
    "meta",
    "memory",
    "smalltalk",
    "историческ",
    "что ты умеешь",
    "что можешь",
    "по какой компании",
    "по какой базе",
)
# Invariant names in reporting order. The "<name>_ok" flags, the failure
# counters and both Markdown renderings are all derived from this tuple.
_INVARIANT_NAMES = (
    "direct_answer",
    "temporal_honesty",
    "selected_object_continuity",
    "truth_gate",
    "human_answer_quality",
    "meta_context_integrity",
)

_DIRECT_ANSWER_CODE_PREFIXES = ("required_direct_answer_", "forbidden_direct_answer_")
_TEMPORAL_CODE_PREFIXES = (
    "missing_filter:",
    "wrong_filter:",
    "forbidden_filter_key:",
    "forbidden_filter_value:",
    "period_carryover_",
    "previous_step_missing:",
)
_TRUTH_GATE_CODES = frozenset(
    {
        "unexpected_reply_type",
        "unexpected_limited_reason_category",
        "wrong_result_mode",
    }
)
_ROUTE_CODES = frozenset(
    {"wrong_intent", "wrong_capability", "wrong_recipe", "question_sequence_mismatch"}
)
_HUMAN_ANSWER_QUALITY_CODES = frozenset(
    {
        "required_answer_patterns_any_missing",
        "required_answer_patterns_all_missing",
        "forbidden_answer_pattern_hit",
    }
)


def _as_dict(value: Any) -> dict[str, Any]:
    """Return *value* when it is a dict, otherwise a new empty dict."""
    return value if isinstance(value, dict) else {}


def _now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string without microseconds."""
    return datetime.now(timezone.utc).replace(microsecond=0).isoformat()


def _normalize_step_outputs(scenario_state: dict[str, Any]) -> dict[str, dict[str, Any]]:
    """Return ``scenario_state["step_outputs"]`` if it is a dict, else ``{}``.

    The values of the mapping are not validated here; callers must tolerate
    entries that are not dicts.
    """
    return _as_dict(scenario_state.get("step_outputs"))


def _normalize_findings(step_state: dict[str, Any]) -> list[dict[str, Any]]:
    """Return the dict entries of ``step_state["review_findings"]``.

    Yields an empty list when the step state is not a dict or the findings
    value is not a list; non-dict findings are dropped.
    """
    if not isinstance(step_state, dict):
        return []
    raw = step_state.get("review_findings")
    if not isinstance(raw, list):
        return []
    return [item for item in raw if isinstance(item, dict)]


def _priority_from_finding(finding: dict[str, Any]) -> str:
    """Map a finding's ``severity`` to a priority; unknown severities are "P2"."""
    severity = str(finding.get("severity") or "").strip().lower()
    return SEVERITY_TO_PRIORITY.get(severity, "P2")


def _highest_priority(findings: list[dict[str, Any]]) -> str:
    """Return the most urgent priority among *findings*, or "none" if empty."""
    if not findings:
        return "none"
    priorities = [_priority_from_finding(item) for item in findings]
    return min(priorities, key=lambda item: PRIORITY_RANK.get(item, 99))


def _normalize_semantic_tags(step: dict[str, Any]) -> set[str]:
    """Return the step's ``semantic_tags`` as stripped, lower-cased strings.

    A non-list value yields an empty set; entries that are blank after
    ``str()`` and ``strip()`` are skipped.
    """
    raw = step.get("semantic_tags")
    if not isinstance(raw, list):
        return set()
    return {str(item).strip().lower() for item in raw if str(item).strip()}


def _has_selected_object_signal(step: dict[str, Any]) -> bool:
    """Tell whether *step* refers to a previously selected object.

    True when the step carries the ``selected_object`` tag, expects one of
    ``SELECTED_OBJECT_INTENTS``, or its lower-cased question contains one of
    the selected-object markers (which include a literal double quote).
    """
    if "selected_object" in _normalize_semantic_tags(step):
        return True
    # Intents are compared case-sensitively (only stripped), unlike tags.
    expected_intents = {
        str(item).strip()
        for item in (step.get("expected_intents") or [])
        if str(item).strip()
    }
    if expected_intents & SELECTED_OBJECT_INTENTS:
        return True
    question = str(step.get("question_template") or "").lower()
    return any(marker in question for marker in _SELECTED_OBJECT_QUESTION_MARKERS)


def _has_meta_context_signal(step: dict[str, Any]) -> bool:
    """Tell whether *step* is a meta-context step.

    True when the step has one of ``META_CONTEXT_TAGS`` or when its step id,
    title or question (lower-cased) contains one of the meta markers.
    """
    if _normalize_semantic_tags(step) & META_CONTEXT_TAGS:
        return True
    title = str(step.get("title") or "").lower()
    step_id = str(step.get("step_id") or "").lower()
    question = str(step.get("question_template") or "").lower()
    haystack = f"{step_id} {title} {question}"
    return any(marker in haystack for marker in _META_CONTEXT_MARKERS)


def _is_direct_answer_code(code: str) -> bool:
    """Return True for required/forbidden direct-answer finding codes."""
    return code.startswith(_DIRECT_ANSWER_CODE_PREFIXES)


def _is_temporal_code(code: str) -> bool:
    """Return True for filter, period-carryover and previous-step finding codes."""
    return code.startswith(_TEMPORAL_CODE_PREFIXES)


def _is_truth_gate_code(code: str) -> bool:
    """Return True for reply-type, limited-reason and result-mode finding codes."""
    return code in _TRUTH_GATE_CODES


def _is_route_code(code: str) -> bool:
    """Return True for intent, capability, recipe and question-sequence codes."""
    return code in _ROUTE_CODES


def _is_human_answer_quality_code(code: str) -> bool:
    """Return True for answer-pattern finding codes."""
    return code in _HUMAN_ANSWER_QUALITY_CODES


def _is_meta_context_code(code: str) -> bool:
    """Return True for any route, truth-gate, answer-quality or direct-answer code."""
    return (
        _is_route_code(code)
        or _is_truth_gate_code(code)
        or _is_human_answer_quality_code(code)
        or _is_direct_answer_code(code)
    )


def _derive_step_invariant_failures(
    step: dict[str, Any], findings: list[dict[str, Any]]
) -> dict[str, bool]:
    """Return, per invariant name, whether *findings* violate it for *step*.

    ``selected_object_continuity`` and ``meta_context_integrity`` can only
    fail on steps that carry the corresponding signal. Keys follow the order
    of ``_INVARIANT_NAMES``.
    """
    codes = [str(item.get("code") or "").strip() for item in findings]
    selected_object_step = _has_selected_object_signal(step)
    meta_context_step = _has_meta_context_signal(step)
    return {
        "direct_answer": any(_is_direct_answer_code(code) for code in codes),
        "temporal_honesty": any(_is_temporal_code(code) for code in codes),
        "selected_object_continuity": selected_object_step
        and any(_is_route_code(code) for code in codes),
        "truth_gate": any(_is_truth_gate_code(code) for code in codes),
        "human_answer_quality": any(_is_human_answer_quality_code(code) for code in codes),
        "meta_context_integrity": meta_context_step
        and any(_is_meta_context_code(code) for code in codes),
    }


def build_scenario_acceptance_matrix(
    spec: dict[str, Any], scenario_state: dict[str, Any], review_summary: dict[str, Any]
) -> dict[str, Any]:
    """Build the per-step acceptance matrix for one scenario.

    Args:
        spec: Scenario spec; ``steps``, ``scenario_id``, ``domain`` and
            ``title`` are read.
        scenario_state: Scenario state; ``step_outputs`` (keyed by step id)
            and ``session_id`` are read.
        review_summary: Review summary; only ``review_source`` is read.

    Returns:
        A dict with one row per spec step plus a ``summary`` holding the
        priority counters, the invariant failure counters and the derived
        ``*_ok`` invariant flags.

    Malformed input is tolerated rather than raising: a non-list ``steps``
    value counts as no steps, and a step or step output that is not a dict is
    treated as empty, so its row shows ``review_status == "unknown"``.
    """
    step_outputs = _normalize_step_outputs(scenario_state)
    raw_steps = spec.get("steps")
    steps = raw_steps if isinstance(raw_steps, (list, tuple)) else []

    rows: list[dict[str, Any]] = []
    unresolved_priority_counts = {"P0": 0, "P1": 0, "P2": 0}
    invariant_failure_counts = dict.fromkeys(_INVARIANT_NAMES, 0)

    for index, raw_step in enumerate(steps, start=1):
        # Keep one row per spec entry even if the entry is malformed, so the
        # indices stay aligned with the spec and the bad step fails closed.
        step = _as_dict(raw_step)
        step_id = str(step.get("step_id") or "").strip()
        # A step without recorded output (missing or non-dict entry) is
        # treated as an empty state instead of crashing on `.get`.
        step_state = _as_dict(step_outputs.get(step_id)) if step_id else {}
        findings = _normalize_findings(step_state)

        invariant_failures = _derive_step_invariant_failures(step, findings)
        for invariant_name, failed in invariant_failures.items():
            if failed:
                invariant_failure_counts[invariant_name] += 1

        highest_priority = _highest_priority(findings)
        # NOTE(review): only findings at the step's highest priority are
        # counted; lower-priority findings of the same step are not added to
        # their own bucket. Confirm that this is the intended metric.
        if highest_priority in unresolved_priority_counts:
            unresolved_priority_counts[highest_priority] += sum(
                1 for item in findings if _priority_from_finding(item) == highest_priority
            )

        rows.append(
            {
                "index": index,
                "step_id": step_id,
                "title": step.get("title"),
                "question": step.get("question_template"),
                # Steps are critical unless the spec says otherwise.
                "criticality": str(step.get("criticality") or "critical"),
                "semantic_tags": sorted(_normalize_semantic_tags(step)),
                "review_status": str(step_state.get("review_status") or "unknown"),
                "reply_type": step_state.get("reply_type"),
                "detected_intent": step_state.get("detected_intent"),
                "capability_id": step_state.get("capability_id"),
                "selected_object_step": _has_selected_object_signal(step),
                "meta_context_step": _has_meta_context_signal(step),
                "highest_unresolved_priority": highest_priority,
                "unresolved_findings_count": len(findings),
                "invariant_failures": [
                    name for name, failed in invariant_failures.items() if failed
                ],
                "findings": findings,
            }
        )

    invariants = {
        f"{name}_ok": invariant_failure_counts[name] == 0 for name in _INVARIANT_NAMES
    }
    critical_rows = [row for row in rows if row["criticality"] == "critical"]
    critical_steps_passed = sum(1 for row in critical_rows if row["review_status"] == "pass")
    # Green requires at least one critical step and all of them passing.
    critical_path_green = bool(critical_rows) and critical_steps_passed == len(critical_rows)

    return {
        "schema_version": SCENARIO_ACCEPTANCE_MATRIX_SCHEMA_VERSION,
        "scenario_id": spec.get("scenario_id"),
        "domain": spec.get("domain"),
        "title": spec.get("title"),
        "review_source": review_summary.get("review_source"),
        "session_id": scenario_state.get("session_id"),
        "rows": rows,
        "summary": {
            "steps_total": len(rows),
            "critical_steps_total": len(critical_rows),
            "critical_steps_passed": critical_steps_passed,
            "critical_path_green": critical_path_green,
            "unresolved_p0_count": unresolved_priority_counts["P0"],
            "unresolved_p1_count": unresolved_priority_counts["P1"],
            "unresolved_p2_count": unresolved_priority_counts["P2"],
            "invariant_failure_counts": invariant_failure_counts,
            "invariants": invariants,
        },
        "updated_at": _now_iso(),
    }


def derive_truth_harness_pack_state(
    spec: dict[str, Any],
    scenario_state: dict[str, Any],
    review_summary: dict[str, Any],
    acceptance_matrix: dict[str, Any],
) -> dict[str, Any]:
    """Derive the pack state and final status from an acceptance matrix.

    Final status rules, evaluated in order:
        * ``blocked`` / ``no_step_outputs`` when the scenario state holds no
          step outputs;
        * ``accepted`` when the review status is ``pass``, there are no
          unresolved P0 findings and every invariant flag is truthy;
        * ``partial`` otherwise, with a reason picked from unresolved P0,
          review warning, review failure, or invariants not green.

    Args:
        spec: Scenario spec (``scenario_id``, ``domain``, ``title``).
        scenario_state: Scenario state (``step_outputs``, ``session_id``).
        review_summary: Review summary (``overall_status``, step counters,
            ``review_source``).
        acceptance_matrix: Output of ``build_scenario_acceptance_matrix``; a
            missing or non-dict ``summary`` / ``invariants`` is read as empty.

    Returns:
        The pack-state dict, including ``final_status`` and
        ``acceptance_gate_passed``.
    """
    summary = _as_dict(acceptance_matrix.get("summary"))
    invariants = _as_dict(summary.get("invariants"))
    unresolved_p0_count = int(summary.get("unresolved_p0_count") or 0)
    review_overall_status = str(review_summary.get("overall_status") or "unknown")
    step_outputs = _normalize_step_outputs(scenario_state)

    # NOTE(review): all() over an empty mapping is True, so a matrix without
    # any invariants does not block acceptance. Confirm this is acceptable.
    invariants_green = all(invariants.values())

    if not step_outputs:
        final_status = "blocked"
        final_status_reason = "no_step_outputs"
    elif review_overall_status == "pass" and unresolved_p0_count == 0 and invariants_green:
        final_status = "accepted"
        final_status_reason = "scenario_acceptance_gate_passed"
    else:
        final_status = "partial"
        if unresolved_p0_count > 0:
            final_status_reason = "unresolved_p0"
        elif review_overall_status == "warning":
            final_status_reason = "review_warning_remaining"
        elif review_overall_status == "fail":
            final_status_reason = "review_failures_remaining"
        else:
            final_status_reason = "acceptance_invariants_not_green"

    return {
        "schema_version": TRUTH_HARNESS_PACK_STATE_SCHEMA_VERSION,
        "pack_id": spec.get("scenario_id"),
        "scenario_id": spec.get("scenario_id"),
        "domain": spec.get("domain"),
        "title": spec.get("title"),
        "review_source": review_summary.get("review_source"),
        "session_id": scenario_state.get("session_id"),
        "steps_total": review_summary.get("steps_total"),
        "steps_passed": review_summary.get("steps_passed"),
        "steps_with_warning": review_summary.get("steps_with_warning"),
        "steps_failed": review_summary.get("steps_failed"),
        "review_overall_status": review_overall_status,
        "execution_status": "exact" if review_overall_status == "pass" else "partial",
        "final_status": final_status,
        "final_status_reason": final_status_reason,
        "acceptance_gate_passed": final_status == "accepted",
        "no_unresolved_p0": unresolved_p0_count == 0,
        "unresolved_p0_count": unresolved_p0_count,
        "unresolved_p1_count": int(summary.get("unresolved_p1_count") or 0),
        "unresolved_p2_count": int(summary.get("unresolved_p2_count") or 0),
        "critical_path_green": bool(summary.get("critical_path_green")),
        "invariants": invariants,
        "updated_at": _now_iso(),
    }


def _invariant_markdown_lines(invariants: dict[str, Any]) -> list[str]:
    """Render one ``- <name>_ok: `<value>``` bullet per known invariant."""
    return [f"- {name}_ok: `{invariants.get(name + '_ok')}`" for name in _INVARIANT_NAMES]


def build_scenario_acceptance_matrix_markdown(acceptance_matrix: dict[str, Any]) -> str:
    """Render an acceptance matrix as a Markdown report ending in a newline.

    Missing header fields are shown as ``n/a``; a missing or non-dict
    ``summary`` / ``invariants`` is rendered with ``None`` values.
    """
    summary = _as_dict(acceptance_matrix.get("summary"))
    invariants = _as_dict(summary.get("invariants"))
    lines = [
        "# Scenario acceptance matrix",
        "",
        f"- scenario_id: `{acceptance_matrix.get('scenario_id') or 'n/a'}`",
        f"- domain: `{acceptance_matrix.get('domain') or 'n/a'}`",
        f"- title: {acceptance_matrix.get('title') or 'n/a'}",
        f"- review_source: `{acceptance_matrix.get('review_source') or 'n/a'}`",
        f"- session_id: `{acceptance_matrix.get('session_id') or 'n/a'}`",
        f"- critical_path_green: `{summary.get('critical_path_green')}`",
        f"- unresolved_p0_count: `{summary.get('unresolved_p0_count')}`",
        f"- unresolved_p1_count: `{summary.get('unresolved_p1_count')}`",
        f"- unresolved_p2_count: `{summary.get('unresolved_p2_count')}`",
        "",
        "## Acceptance invariants",
        *_invariant_markdown_lines(invariants),
        "",
        "## Steps",
    ]
    for row in acceptance_matrix.get("rows") or []:
        semantic_tags = ", ".join(row.get("semantic_tags") or []) or "none"
        invariant_failures = ", ".join(row.get("invariant_failures") or []) or "none"
        lines.extend(
            [
                f"- `{row.get('step_id')}`",
                f" review_status: `{row.get('review_status')}`",
                f" criticality: `{row.get('criticality')}`",
                f" semantic_tags: {semantic_tags}",
                f" highest_unresolved_priority: `{row.get('highest_unresolved_priority')}`",
                f" selected_object_step: `{row.get('selected_object_step')}`",
                f" meta_context_step: `{row.get('meta_context_step')}`",
                f" invariant_failures: {invariant_failures}",
            ]
        )
    return "\n".join(lines).strip() + "\n"


def build_truth_harness_final_status_markdown(pack_state: dict[str, Any]) -> str:
    """Render the final status of a pack state as a short Markdown report."""
    invariants = _as_dict(pack_state.get("invariants"))
    lines = [
        "# Final status",
        "",
        f"- status: `{pack_state.get('final_status') or 'n/a'}`",
        f"- reason: `{pack_state.get('final_status_reason') or 'n/a'}`",
        f"- review_overall_status: `{pack_state.get('review_overall_status') or 'n/a'}`",
        f"- no_unresolved_p0: `{pack_state.get('no_unresolved_p0')}`",
        *_invariant_markdown_lines(invariants),
    ]
    return "\n".join(lines) + "\n"