307 lines
13 KiB
Python
307 lines
13 KiB
Python
from __future__ import annotations
|
|
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
|
|
|
|
# Schema identifiers written into the "schema_version" field of the artifacts
# built by this module.
SCENARIO_ACCEPTANCE_MATRIX_SCHEMA_VERSION = "scenario_acceptance_matrix_v1"
TRUTH_HARNESS_PACK_STATE_SCHEMA_VERSION = "truth_harness_pack_state_v1"

# Review-finding severity -> priority bucket.
SEVERITY_TO_PRIORITY = {"critical": "P0", "warning": "P1", "info": "P2"}

# Ordering rank per priority label; the lowest rank wins when picking the
# highest priority of a step. "none" is the label used for steps without findings.
PRIORITY_RANK = {
    "P0": 0,
    "P1": 1,
    "P2": 2,
    "none": 3,
}

# Expected intents that mark a step as operating on a previously selected object.
SELECTED_OBJECT_INTENTS = {
    "inventory_purchase_provenance_for_item",
    "inventory_purchase_documents_for_item",
    "inventory_sale_trace_for_item",
    "inventory_profitability_for_item",
    "inventory_purchase_to_sale_chain",
}
|
|
|
|
|
|
def _now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with whole-second precision."""
    # timespec="seconds" truncates the sub-second part, yielding the same text as
    # zeroing the microseconds before formatting.
    return datetime.now(tz=timezone.utc).isoformat(timespec="seconds")
|
|
|
|
|
|
def _normalize_step_outputs(scenario_state: dict[str, Any]) -> dict[str, dict[str, Any]]:
    """Return the per-step output mapping stored under ``step_outputs``.

    A missing or non-dict ``step_outputs`` value yields an empty dict. Entries
    whose value is not itself a dict (for example a JSON ``null``) are dropped,
    so every returned step state can safely be queried with ``.get`` -- the same
    way ``_normalize_findings`` discards malformed findings.

    Args:
        scenario_state: Scenario state mapping; only ``step_outputs`` is read.

    Returns:
        A new dict mapping step ids to their dict-typed step state.
    """
    raw = scenario_state.get("step_outputs")
    if not isinstance(raw, dict):
        return {}
    return {step_id: state for step_id, state in raw.items() if isinstance(state, dict)}
|
|
|
|
|
|
def _normalize_findings(step_state: dict[str, Any]) -> list[dict[str, Any]]:
    """Return the dict-typed entries of ``step_state["review_findings"]``.

    Anything other than a list under ``review_findings`` yields an empty list;
    non-dict entries inside the list are skipped.
    """
    candidates = step_state.get("review_findings")
    if not isinstance(candidates, list):
        return []
    return [entry for entry in candidates if isinstance(entry, dict)]
|
|
|
|
|
|
def _priority_from_finding(finding: dict[str, Any]) -> str:
    """Translate a finding's ``severity`` into its priority label.

    The severity is compared case-insensitively with surrounding whitespace
    removed; a missing or unrecognised severity maps to ``"P2"``.
    """
    raw_severity = finding.get("severity") or ""
    normalized = str(raw_severity).strip().lower()
    return SEVERITY_TO_PRIORITY.get(normalized, "P2")
|
|
|
|
|
|
def _highest_priority(findings: list[dict[str, Any]]) -> str:
    """Return the most urgent priority label among ``findings``.

    Returns ``"none"`` when there are no findings. Labels missing from
    ``PRIORITY_RANK`` rank after every known label.
    """
    if not findings:
        return "none"
    # min() returns the first of equally ranked labels, which is also what a
    # stable sort followed by taking the first element produces.
    return min(
        (_priority_from_finding(finding) for finding in findings),
        key=lambda label: PRIORITY_RANK.get(label, 99),
    )
|
|
|
|
|
|
def _has_selected_object_signal(step: dict[str, Any]) -> bool:
    """Tell whether a scenario step refers to a previously selected object.

    A step qualifies when any of its ``expected_intents`` is listed in
    ``SELECTED_OBJECT_INTENTS``, or when its lower-cased ``question_template``
    contains one of the textual markers below (a double quote counts as one).
    """
    declared_intents = set()
    for raw_intent in step.get("expected_intents") or []:
        intent = str(raw_intent).strip()
        if intent:
            declared_intents.add(intent)
    if declared_intents & SELECTED_OBJECT_INTENTS:
        return True

    question_text = str(step.get("question_template") or "").lower()
    markers = (
        "выбранному объекту",
        "по этой позиции",
        "по ней",
        "по нему",
        "\"",
    )
    for marker in markers:
        if marker in question_text:
            return True
    return False
|
|
|
|
|
|
def _is_direct_answer_code(code: str) -> bool:
    """Return True for finding codes about required or forbidden direct answers."""
    prefixes = ("required_direct_answer_", "forbidden_direct_answer_")
    return code.startswith(prefixes)
|
|
|
|
|
|
def _is_temporal_code(code: str) -> bool:
    """Return True for finding codes that count against the temporal-honesty invariant.

    Matching is by prefix: filter problems, period carry-over problems and
    missing previous-step references.
    """
    prefixes = (
        "missing_filter:",
        "wrong_filter:",
        "forbidden_filter_key:",
        "forbidden_filter_value:",
        "period_carryover_",
        "previous_step_missing:",
    )
    return code.startswith(prefixes)
|
|
|
|
|
|
def _is_truth_gate_code(code: str) -> bool:
    """Return True for finding codes that count against the truth-gate invariant."""
    truth_gate_codes = {
        "unexpected_reply_type",
        "unexpected_limited_reason_category",
        "wrong_result_mode",
    }
    return code in truth_gate_codes
|
|
|
|
|
|
def _is_route_code(code: str) -> bool:
    """Return True for finding codes that report a routing mismatch."""
    route_codes = {
        "wrong_intent",
        "wrong_capability",
        "wrong_recipe",
        "question_sequence_mismatch",
    }
    return code in route_codes
|
|
|
|
|
|
def _is_human_answer_quality_code(code: str) -> bool:
    """Return True for finding codes about required or forbidden answer patterns."""
    answer_pattern_codes = {
        "required_answer_patterns_any_missing",
        "required_answer_patterns_all_missing",
        "forbidden_answer_pattern_hit",
    }
    return code in answer_pattern_codes
|
|
|
|
|
|
def _derive_step_invariant_failures(step: dict[str, Any], findings: list[dict[str, Any]]) -> dict[str, bool]:
    """Classify a step's findings into per-invariant failure flags.

    Each flag is True when at least one finding code falls into the matching
    code family. Routing mismatches only count against
    ``selected_object_continuity`` when the step itself is a selected-object step.
    """
    finding_codes = [str(finding.get("code") or "").strip() for finding in findings]

    def any_code(matches) -> bool:
        # True when at least one finding code satisfies the given predicate.
        return any(matches(code) for code in finding_codes)

    is_selected_object_step = _has_selected_object_signal(step)
    return {
        "direct_answer": any_code(_is_direct_answer_code),
        "temporal_honesty": any_code(_is_temporal_code),
        "selected_object_continuity": is_selected_object_step and any_code(_is_route_code),
        "truth_gate": any_code(_is_truth_gate_code),
        "human_answer_quality": any_code(_is_human_answer_quality_code),
    }
|
|
|
|
|
|
def build_scenario_acceptance_matrix(
    spec: dict[str, Any], scenario_state: dict[str, Any], review_summary: dict[str, Any]
) -> dict[str, Any]:
    """Build the per-step acceptance matrix for one scenario run.

    Args:
        spec: Scenario specification; ``steps``, ``scenario_id``, ``domain`` and
            ``title`` are read.
        scenario_state: Run state; ``step_outputs`` and ``session_id`` are read.
        review_summary: Review summary; only ``review_source`` is read.

    Returns:
        A dict with one row per spec step plus a summary holding critical-path
        totals, unresolved priority counts, invariant failure counts and the
        derived ``*_ok`` invariant flags.
    """
    outputs_by_step = _normalize_step_outputs(scenario_state)
    priority_totals = dict.fromkeys(("P0", "P1", "P2"), 0)
    failure_totals = dict.fromkeys(
        (
            "direct_answer",
            "temporal_honesty",
            "selected_object_continuity",
            "truth_gate",
            "human_answer_quality",
        ),
        0,
    )
    matrix_rows: list[dict[str, Any]] = []

    for position, step in enumerate(spec.get("steps") or [], start=1):
        step_id = str(step.get("step_id") or "").strip()
        # A step without an id never matches recorded output.
        state = outputs_by_step.get(step_id, {}) if step_id else {}
        step_findings = _normalize_findings(state)

        failure_flags = _derive_step_invariant_failures(step, step_findings)
        failed_invariants = [name for name, failed in failure_flags.items() if failed]
        for name in failed_invariants:
            failure_totals[name] += 1

        top_priority = _highest_priority(step_findings)
        if top_priority in priority_totals:
            # Only findings at the step's highest priority are counted; lower
            # priority findings of the same step are not added to their bucket.
            # NOTE(review): confirm this per-step counting rule is intended.
            priority_totals[top_priority] += sum(
                1 for finding in step_findings if _priority_from_finding(finding) == top_priority
            )

        matrix_rows.append(
            {
                "index": position,
                "step_id": step_id,
                "title": step.get("title"),
                "question": step.get("question_template"),
                "criticality": str(step.get("criticality") or "critical"),
                "review_status": str(state.get("review_status") or "unknown"),
                "reply_type": state.get("reply_type"),
                "detected_intent": state.get("detected_intent"),
                "capability_id": state.get("capability_id"),
                "selected_object_step": _has_selected_object_signal(step),
                "highest_unresolved_priority": top_priority,
                "unresolved_findings_count": len(step_findings),
                "invariant_failures": failed_invariants,
                "findings": step_findings,
            }
        )

    critical_rows = [row for row in matrix_rows if row["criticality"] == "critical"]
    critical_passed = sum(1 for row in critical_rows if row["review_status"] == "pass")
    # Green requires at least one critical step and every critical step passing.
    critical_path_green = bool(critical_rows) and critical_passed == len(critical_rows)

    invariants = {
        "direct_answer_ok": failure_totals["direct_answer"] == 0,
        "temporal_honesty_ok": failure_totals["temporal_honesty"] == 0,
        "selected_object_continuity_ok": failure_totals["selected_object_continuity"] == 0,
        "truth_gate_ok": failure_totals["truth_gate"] == 0,
        "human_answer_quality_ok": failure_totals["human_answer_quality"] == 0,
    }
    summary = {
        "steps_total": len(matrix_rows),
        "critical_steps_total": len(critical_rows),
        "critical_steps_passed": critical_passed,
        "critical_path_green": critical_path_green,
        "unresolved_p0_count": priority_totals["P0"],
        "unresolved_p1_count": priority_totals["P1"],
        "unresolved_p2_count": priority_totals["P2"],
        "invariant_failure_counts": failure_totals,
        "invariants": invariants,
    }

    return {
        "schema_version": SCENARIO_ACCEPTANCE_MATRIX_SCHEMA_VERSION,
        "scenario_id": spec.get("scenario_id"),
        "domain": spec.get("domain"),
        "title": spec.get("title"),
        "review_source": review_summary.get("review_source"),
        "session_id": scenario_state.get("session_id"),
        "rows": matrix_rows,
        "summary": summary,
        "updated_at": _now_iso(),
    }
|
|
|
|
|
|
def derive_truth_harness_pack_state(
    spec: dict[str, Any],
    scenario_state: dict[str, Any],
    review_summary: dict[str, Any],
    acceptance_matrix: dict[str, Any],
) -> dict[str, Any]:
    """Derive the final pack state (blocked / accepted / partial) for a scenario.

    The state is ``blocked`` when the scenario has no step outputs, ``accepted``
    when the review passed, no P0 finding is unresolved and every acceptance
    invariant is green, and ``partial`` otherwise.

    Args:
        spec: Scenario specification; ``scenario_id``, ``domain``, ``title`` are read.
        scenario_state: Run state; ``step_outputs`` and ``session_id`` are read.
        review_summary: Review summary; ``overall_status``, ``review_source`` and
            the step counters are read.
        acceptance_matrix: Acceptance matrix whose ``summary`` supplies the
            unresolved counts, ``critical_path_green`` and the invariants.

    Returns:
        A dict describing the pack state, including ``final_status``,
        ``final_status_reason`` and ``acceptance_gate_passed``.
    """
    summary = acceptance_matrix.get("summary")
    if not isinstance(summary, dict):
        summary = {}
    invariants = summary.get("invariants")
    if not isinstance(invariants, dict):
        invariants = {}

    unresolved_p0_count = int(summary.get("unresolved_p0_count") or 0)
    review_overall_status = str(review_summary.get("overall_status") or "unknown")
    step_outputs = _normalize_step_outputs(scenario_state)

    # all() is vacuously True for an empty mapping, so a missing or malformed
    # matrix summary must not count as "every invariant is green". This mirrors
    # how critical_path_green requires at least one critical row.
    invariants_green = bool(invariants) and all(invariants.values())

    if not step_outputs:
        final_status = "blocked"
        final_status_reason = "no_step_outputs"
    elif review_overall_status == "pass" and unresolved_p0_count == 0 and invariants_green:
        final_status = "accepted"
        final_status_reason = "scenario_acceptance_gate_passed"
    else:
        final_status = "partial"
        # Reason precedence: unresolved P0 first, then the review status, and
        # finally the invariants.
        if unresolved_p0_count > 0:
            final_status_reason = "unresolved_p0"
        elif review_overall_status == "warning":
            final_status_reason = "review_warning_remaining"
        elif review_overall_status == "fail":
            final_status_reason = "review_failures_remaining"
        else:
            final_status_reason = "acceptance_invariants_not_green"

    return {
        "schema_version": TRUTH_HARNESS_PACK_STATE_SCHEMA_VERSION,
        "pack_id": spec.get("scenario_id"),
        "scenario_id": spec.get("scenario_id"),
        "domain": spec.get("domain"),
        "title": spec.get("title"),
        "review_source": review_summary.get("review_source"),
        "session_id": scenario_state.get("session_id"),
        "steps_total": review_summary.get("steps_total"),
        "steps_passed": review_summary.get("steps_passed"),
        "steps_with_warning": review_summary.get("steps_with_warning"),
        "steps_failed": review_summary.get("steps_failed"),
        "review_overall_status": review_overall_status,
        "execution_status": "exact" if review_overall_status == "pass" else "partial",
        "final_status": final_status,
        "final_status_reason": final_status_reason,
        "acceptance_gate_passed": final_status == "accepted",
        "no_unresolved_p0": unresolved_p0_count == 0,
        "unresolved_p0_count": unresolved_p0_count,
        "unresolved_p1_count": int(summary.get("unresolved_p1_count") or 0),
        "unresolved_p2_count": int(summary.get("unresolved_p2_count") or 0),
        "critical_path_green": bool(summary.get("critical_path_green")),
        "invariants": invariants,
        "updated_at": _now_iso(),
    }
|
|
|
|
|
|
def build_scenario_acceptance_matrix_markdown(acceptance_matrix: dict[str, Any]) -> str:
    """Render an acceptance matrix as a Markdown report.

    The report has a header with scenario metadata and unresolved counts, an
    "Acceptance invariants" section and a "Steps" section with one entry per
    row. Missing header values are shown as ``n/a``; a non-dict ``summary`` or
    ``invariants`` is treated as empty. The result ends with one newline.
    """
    summary = acceptance_matrix.get("summary")
    if not isinstance(summary, dict):
        summary = {}
    invariants = summary.get("invariants")
    if not isinstance(invariants, dict):
        invariants = {}

    def header_value(key: str) -> Any:
        # Falsy metadata values are rendered as "n/a".
        return acceptance_matrix.get(key) or "n/a"

    report = ["# Scenario acceptance matrix", ""]
    report.append(f"- scenario_id: `{header_value('scenario_id')}`")
    report.append(f"- domain: `{header_value('domain')}`")
    report.append(f"- title: {header_value('title')}")
    report.append(f"- review_source: `{header_value('review_source')}`")
    report.append(f"- session_id: `{header_value('session_id')}`")
    report.append(f"- critical_path_green: `{summary.get('critical_path_green')}`")
    report.append(f"- unresolved_p0_count: `{summary.get('unresolved_p0_count')}`")
    report.append(f"- unresolved_p1_count: `{summary.get('unresolved_p1_count')}`")
    report.append(f"- unresolved_p2_count: `{summary.get('unresolved_p2_count')}`")

    report.append("")
    report.append("## Acceptance invariants")
    report.append(f"- direct_answer_ok: `{invariants.get('direct_answer_ok')}`")
    report.append(f"- temporal_honesty_ok: `{invariants.get('temporal_honesty_ok')}`")
    report.append(f"- selected_object_continuity_ok: `{invariants.get('selected_object_continuity_ok')}`")
    report.append(f"- truth_gate_ok: `{invariants.get('truth_gate_ok')}`")
    report.append(f"- human_answer_quality_ok: `{invariants.get('human_answer_quality_ok')}`")

    report.append("")
    report.append("## Steps")
    for row in acceptance_matrix.get("rows") or []:
        failed = ", ".join(row.get("invariant_failures") or []) or "none"
        report.append(f"- `{row.get('step_id')}`")
        report.append(f" review_status: `{row.get('review_status')}`")
        report.append(f" criticality: `{row.get('criticality')}`")
        report.append(f" highest_unresolved_priority: `{row.get('highest_unresolved_priority')}`")
        report.append(f" selected_object_step: `{row.get('selected_object_step')}`")
        report.append(f" invariant_failures: {failed}")

    text = "\n".join(report)
    return text.strip() + "\n"
|
|
|
|
|
|
def build_truth_harness_final_status_markdown(pack_state: dict[str, Any]) -> str:
    """Render the final verdict of a pack state as a short Markdown report.

    Lists the final status, its reason, the overall review status, the
    no-unresolved-P0 flag and the acceptance invariants. Missing status fields
    are shown as ``n/a``; a non-dict ``invariants`` value is treated as empty.

    Args:
        pack_state: Pack state mapping, as built by ``derive_truth_harness_pack_state``.

    Returns:
        The Markdown text, ending with a newline.
    """
    invariants = pack_state.get("invariants")
    if not isinstance(invariants, dict):
        invariants = {}
    return (
        "# Final status\n\n"
        f"- status: `{pack_state.get('final_status') or 'n/a'}`\n"
        f"- reason: `{pack_state.get('final_status_reason') or 'n/a'}`\n"
        f"- review_overall_status: `{pack_state.get('review_overall_status') or 'n/a'}`\n"
        f"- no_unresolved_p0: `{pack_state.get('no_unresolved_p0')}`\n"
        f"- direct_answer_ok: `{invariants.get('direct_answer_ok')}`\n"
        f"- temporal_honesty_ok: `{invariants.get('temporal_honesty_ok')}`\n"
        f"- selected_object_continuity_ok: `{invariants.get('selected_object_continuity_ok')}`\n"
        f"- truth_gate_ok: `{invariants.get('truth_gate_ok')}`\n"
        # The acceptance gate evaluates every invariant, so this one is listed
        # too; otherwise a "partial" verdict caused by it would be unexplained.
        f"- human_answer_quality_ok: `{invariants.get('human_answer_quality_ok')}`\n"
    )
|