# NODEDC_1C/scripts/scenario_acceptance_policy.py
#
# 363 lines
# 16 KiB
# Python

from __future__ import annotations
from datetime import datetime, timezone
from typing import Any
# Schema identifiers written into the "schema_version" field of the two
# payloads this module builds (acceptance matrix and pack state).
SCENARIO_ACCEPTANCE_MATRIX_SCHEMA_VERSION = "scenario_acceptance_matrix_v1"
TRUTH_HARNESS_PACK_STATE_SCHEMA_VERSION = "truth_harness_pack_state_v1"

# Review-finding severity -> unresolved-priority bucket.
SEVERITY_TO_PRIORITY = {
    "critical": "P0",
    "warning": "P1",
    "info": "P2",
}

# Ordering key for priorities: a lower rank is more urgent. "none" is the
# value reported for a step that has no findings.
PRIORITY_RANK = {"P0": 0, "P1": 1, "P2": 2, "none": 3}

# Expected intents that mark a step as operating on a selected object.
SELECTED_OBJECT_INTENTS = {
    "inventory_purchase_provenance_for_item",
    "inventory_purchase_documents_for_item",
    "inventory_sale_trace_for_item",
    "inventory_profitability_for_item",
    "inventory_purchase_to_sale_chain",
}

# Semantic tags that mark a step as a meta/context step.
META_CONTEXT_TAGS = {
    "meta_smalltalk",
    "meta_scope",
    "meta_capability",
    "meta_memory",
    "meta_historical_capability",
}
def _now_iso() -> str:
return datetime.now(timezone.utc).replace(microsecond=0).isoformat()
def _normalize_step_outputs(scenario_state: dict[str, Any]) -> dict[str, dict[str, Any]]:
raw = scenario_state.get("step_outputs")
return raw if isinstance(raw, dict) else {}
def _normalize_findings(step_state: dict[str, Any]) -> list[dict[str, Any]]:
raw = step_state.get("review_findings")
return [item for item in raw if isinstance(item, dict)] if isinstance(raw, list) else []
def _priority_from_finding(finding: dict[str, Any]) -> str:
    """Map a finding's ``severity`` to its priority bucket.

    The severity is stringified, stripped and lower-cased before the lookup
    in ``SEVERITY_TO_PRIORITY``. A missing or unrecognised severity maps to
    ``"P2"``.
    """
    raw_severity = finding.get("severity") or ""
    normalized = str(raw_severity).strip().lower()
    if normalized in SEVERITY_TO_PRIORITY:
        return SEVERITY_TO_PRIORITY[normalized]
    return "P2"
def _highest_priority(findings: list[dict[str, Any]]) -> str:
    """Return the most urgent priority among ``findings``.

    Returns ``"none"`` for an empty list. Urgency follows ``PRIORITY_RANK``
    (lower rank wins); a priority missing from that table ranks as 99.
    """
    if not findings:
        return "none"

    def _rank(priority: str) -> int:
        return PRIORITY_RANK.get(priority, 99)

    # min() returns the first of equally ranked items, the same element a
    # stable sort would place at index 0.
    return min((_priority_from_finding(entry) for entry in findings), key=_rank)
def _normalize_semantic_tags(step: dict[str, Any]) -> set[str]:
raw = step.get("semantic_tags")
if not isinstance(raw, list):
return set()
return {str(item).strip().lower() for item in raw if str(item).strip()}
def _has_selected_object_signal(step: dict[str, Any]) -> bool:
    """Tell whether a scenario step refers to a selected object.

    A step qualifies when any of the following holds:

    * its normalized semantic tags contain ``selected_object``;
    * one of its ``expected_intents`` is listed in ``SELECTED_OBJECT_INTENTS``;
    * its lower-cased ``question_template`` contains one of the Russian
      reference phrases below, or a double-quote character.
    """
    question_markers = (
        "выбранному объекту",
        "по этой позиции",
        "по ней",
        "по нему",
        "\"",
    )
    question_text = str(step.get("question_template") or "").lower()
    tags = _normalize_semantic_tags(step)

    # Intents are only stripped, not lower-cased, so the match against
    # SELECTED_OBJECT_INTENTS is case-sensitive.
    intents: set[str] = set()
    for raw_intent in step.get("expected_intents") or []:
        intent = str(raw_intent).strip()
        if intent:
            intents.add(intent)

    tagged = "selected_object" in tags
    intent_match = bool(intents & SELECTED_OBJECT_INTENTS)
    phrase_match = any(marker in question_text for marker in question_markers)
    return tagged or intent_match or phrase_match
def _has_meta_context_signal(step: dict[str, Any]) -> bool:
    """Tell whether a scenario step is a meta/context step.

    True when the step carries any tag from ``META_CONTEXT_TAGS``, or when
    the lower-cased ``step_id``, ``title`` and ``question_template`` (joined
    by single spaces) contain one of the substring markers below.
    """
    if _normalize_semantic_tags(step) & META_CONTEXT_TAGS:
        return True

    text_markers = (
        "meta",
        "memory",
        "smalltalk",
        "историческ",
        "что ты умеешь",
        "что можешь",
        "по какой компании",
        "по какой базе",
    )
    fields = (step.get("step_id"), step.get("title"), step.get("question_template"))
    haystack = " ".join(str(value or "").lower() for value in fields)
    # Plain substring search: a marker matches anywhere inside the text.
    return any(marker in haystack for marker in text_markers)
def _is_direct_answer_code(code: str) -> bool:
return code.startswith("required_direct_answer_") or code.startswith("forbidden_direct_answer_")
def _is_temporal_code(code: str) -> bool:
return (
code.startswith("missing_filter:")
or code.startswith("wrong_filter:")
or code.startswith("forbidden_filter_key:")
or code.startswith("forbidden_filter_value:")
or code.startswith("period_carryover_")
or code.startswith("previous_step_missing:")
)
def _is_truth_gate_code(code: str) -> bool:
return code in {
"unexpected_reply_type",
"unexpected_limited_reason_category",
"wrong_result_mode",
}
def _is_route_code(code: str) -> bool:
return code in {"wrong_intent", "wrong_capability", "wrong_recipe", "question_sequence_mismatch"}
def _is_human_answer_quality_code(code: str) -> bool:
return code in {
"required_answer_patterns_any_missing",
"required_answer_patterns_all_missing",
"forbidden_answer_pattern_hit",
}
def _is_meta_context_code(code: str) -> bool:
    """Return True when ``code`` is a route, truth-gate, answer-quality or direct-answer code."""
    # Same predicates, same order as a chained ``or``; any() stops at the
    # first match.
    predicates = (
        _is_route_code,
        _is_truth_gate_code,
        _is_human_answer_quality_code,
        _is_direct_answer_code,
    )
    return any(predicate(code) for predicate in predicates)
def _derive_step_invariant_failures(step: dict[str, Any], findings: list[dict[str, Any]]) -> dict[str, bool]:
    """Classify a step's findings into per-invariant failure flags.

    Args:
        step: Scenario step definition, used to decide whether the step is a
            selected-object step and/or a meta-context step.
        findings: Review findings for the step; only each finding's ``code``
            is inspected.

    Returns:
        A dict keyed by invariant name, True where the invariant failed.
        ``selected_object_continuity`` and ``meta_context_integrity`` can
        only fail on steps that carry the corresponding signal.
    """
    finding_codes = [str(entry.get("code") or "").strip() for entry in findings]

    def _any_code(predicate) -> bool:
        return any(predicate(code) for code in finding_codes)

    is_selected_object_step = _has_selected_object_signal(step)
    is_meta_context_step = _has_meta_context_signal(step)

    failures: dict[str, bool] = {}
    failures["direct_answer"] = _any_code(_is_direct_answer_code)
    failures["temporal_honesty"] = _any_code(_is_temporal_code)
    # Routing mistakes break continuity only on selected-object steps.
    failures["selected_object_continuity"] = is_selected_object_step and _any_code(_is_route_code)
    failures["truth_gate"] = _any_code(_is_truth_gate_code)
    failures["human_answer_quality"] = _any_code(_is_human_answer_quality_code)
    failures["meta_context_integrity"] = is_meta_context_step and _any_code(_is_meta_context_code)
    return failures
def build_scenario_acceptance_matrix(
    spec: dict[str, Any], scenario_state: dict[str, Any], review_summary: dict[str, Any]
) -> dict[str, Any]:
    """Build the per-step acceptance matrix for one scenario run.

    Args:
        spec: Scenario specification; ``steps`` is read as a list (or tuple)
            of step dicts, plus ``scenario_id``, ``domain`` and ``title``.
        scenario_state: Run state; ``step_outputs`` (keyed by step id) and
            ``session_id`` are read.
        review_summary: Review summary; only ``review_source`` is read.

    Returns:
        A dict with one row per step under ``rows`` and aggregated counters
        and invariant flags under ``summary``. A malformed step entry (not a
        dict) or a malformed step output is treated as empty rather than
        raising, so such a step appears with ``review_status`` ``"unknown"``
        and the default criticality ``"critical"``, which keeps
        ``critical_path_green`` False.
    """
    step_outputs = _normalize_step_outputs(scenario_state)

    # Anything other than a real sequence of steps is treated as "no steps";
    # iterating a dict or a string here would yield keys/characters.
    raw_steps = spec.get("steps")
    steps = raw_steps if isinstance(raw_steps, (list, tuple)) else []

    rows: list[dict[str, Any]] = []
    unresolved_priority_counts = {"P0": 0, "P1": 0, "P2": 0}
    invariant_failure_counts = {
        "direct_answer": 0,
        "temporal_honesty": 0,
        "selected_object_continuity": 0,
        "truth_gate": 0,
        "human_answer_quality": 0,
        "meta_context_integrity": 0,
    }

    for index, raw_step in enumerate(steps, start=1):
        # Keep a row for malformed entries so numbering and steps_total still
        # match the spec.
        step = raw_step if isinstance(raw_step, dict) else {}
        step_id = str(step.get("step_id") or "").strip()

        step_state = step_outputs.get(step_id) if step_id else None
        if not isinstance(step_state, dict):
            # Missing output, or a stored value that is not a mapping.
            step_state = {}

        findings = _normalize_findings(step_state)
        invariant_failures = _derive_step_invariant_failures(step, findings)
        for invariant_name, failed in invariant_failures.items():
            if failed:
                invariant_failure_counts[invariant_name] += 1

        highest_priority = _highest_priority(findings)
        if highest_priority in unresolved_priority_counts:
            # NOTE(review): only findings at the step's highest priority are
            # counted; lower-priority findings on the same step are not added
            # to their own bucket. Behaviour kept as-is - confirm intended.
            unresolved_priority_counts[highest_priority] += sum(
                1 for item in findings if _priority_from_finding(item) == highest_priority
            )

        rows.append(
            {
                "index": index,
                "step_id": step_id,
                "title": step.get("title"),
                "question": step.get("question_template"),
                "criticality": str(step.get("criticality") or "critical"),
                "semantic_tags": sorted(_normalize_semantic_tags(step)),
                "review_status": str(step_state.get("review_status") or "unknown"),
                "reply_type": step_state.get("reply_type"),
                "detected_intent": step_state.get("detected_intent"),
                "capability_id": step_state.get("capability_id"),
                "mcp_discovery_catalog_chain_alignment_status": step_state.get(
                    "mcp_discovery_catalog_chain_alignment_status"
                ),
                "mcp_discovery_catalog_chain_top_match": step_state.get(
                    "mcp_discovery_catalog_chain_top_match"
                ),
                "mcp_discovery_catalog_chain_selected_matches_top": step_state.get(
                    "mcp_discovery_catalog_chain_selected_matches_top"
                ),
                "selected_object_step": _has_selected_object_signal(step),
                "meta_context_step": _has_meta_context_signal(step),
                "highest_unresolved_priority": highest_priority,
                "unresolved_findings_count": len(findings),
                "invariant_failures": [name for name, failed in invariant_failures.items() if failed],
                "findings": findings,
            }
        )

    # An invariant is green only if no step failed it.
    invariants = {
        "direct_answer_ok": invariant_failure_counts["direct_answer"] == 0,
        "temporal_honesty_ok": invariant_failure_counts["temporal_honesty"] == 0,
        "selected_object_continuity_ok": invariant_failure_counts["selected_object_continuity"] == 0,
        "truth_gate_ok": invariant_failure_counts["truth_gate"] == 0,
        "human_answer_quality_ok": invariant_failure_counts["human_answer_quality"] == 0,
        "meta_context_integrity_ok": invariant_failure_counts["meta_context_integrity"] == 0,
    }

    critical_rows = [row for row in rows if row["criticality"] == "critical"]
    # Requires at least one critical step; an empty critical path is not green.
    critical_path_green = bool(critical_rows) and all(row["review_status"] == "pass" for row in critical_rows)

    return {
        "schema_version": SCENARIO_ACCEPTANCE_MATRIX_SCHEMA_VERSION,
        "scenario_id": spec.get("scenario_id"),
        "domain": spec.get("domain"),
        "title": spec.get("title"),
        "review_source": review_summary.get("review_source"),
        "session_id": scenario_state.get("session_id"),
        "rows": rows,
        "summary": {
            "steps_total": len(rows),
            "critical_steps_total": len(critical_rows),
            "critical_steps_passed": sum(1 for row in critical_rows if row["review_status"] == "pass"),
            "critical_path_green": critical_path_green,
            "unresolved_p0_count": unresolved_priority_counts["P0"],
            "unresolved_p1_count": unresolved_priority_counts["P1"],
            "unresolved_p2_count": unresolved_priority_counts["P2"],
            "invariant_failure_counts": invariant_failure_counts,
            "invariants": invariants,
        },
        "updated_at": _now_iso(),
    }
def derive_truth_harness_pack_state(
    spec: dict[str, Any],
    scenario_state: dict[str, Any],
    review_summary: dict[str, Any],
    acceptance_matrix: dict[str, Any],
) -> dict[str, Any]:
    """Derive the final pack verdict from the review summary and acceptance matrix.

    Status rules, in order:

    * ``blocked`` / ``no_step_outputs`` - ``scenario_state`` carries no step
      outputs;
    * ``accepted`` / ``scenario_acceptance_gate_passed`` - the review overall
      status is ``"pass"``, there are no unresolved P0 findings, and the
      matrix summary holds at least one invariant with every invariant
      truthy;
    * ``partial`` otherwise, with the reason being the first that applies of
      ``unresolved_p0``, ``review_warning_remaining``,
      ``review_failures_remaining``, ``acceptance_invariants_not_green``.

    Args:
        spec: Scenario specification (``scenario_id``, ``domain``, ``title``).
        scenario_state: Run state (``step_outputs``, ``session_id``).
        review_summary: Review summary (``overall_status``, step counters,
            ``review_source``).
        acceptance_matrix: Acceptance matrix whose ``summary`` is read.

    Returns:
        The pack-state dict, including ``final_status``,
        ``final_status_reason`` and ``acceptance_gate_passed``.
    """
    raw_summary = acceptance_matrix.get("summary")
    summary = raw_summary if isinstance(raw_summary, dict) else {}
    raw_invariants = summary.get("invariants")
    invariants = raw_invariants if isinstance(raw_invariants, dict) else {}

    unresolved_p0_count = int(summary.get("unresolved_p0_count") or 0)
    review_overall_status = str(review_summary.get("overall_status") or "unknown")
    step_outputs = _normalize_step_outputs(scenario_state)

    # all() over an empty mapping is vacuously True. Require at least one
    # invariant so a missing or malformed matrix summary cannot pass the gate.
    invariants_green = bool(invariants) and all(bool(value) for value in invariants.values())

    if not step_outputs:
        final_status = "blocked"
        final_status_reason = "no_step_outputs"
    elif review_overall_status == "pass" and unresolved_p0_count == 0 and invariants_green:
        final_status = "accepted"
        final_status_reason = "scenario_acceptance_gate_passed"
    else:
        final_status = "partial"
        if unresolved_p0_count > 0:
            final_status_reason = "unresolved_p0"
        elif review_overall_status == "warning":
            final_status_reason = "review_warning_remaining"
        elif review_overall_status == "fail":
            final_status_reason = "review_failures_remaining"
        else:
            final_status_reason = "acceptance_invariants_not_green"

    return {
        "schema_version": TRUTH_HARNESS_PACK_STATE_SCHEMA_VERSION,
        # The pack id is the scenario id.
        "pack_id": spec.get("scenario_id"),
        "scenario_id": spec.get("scenario_id"),
        "domain": spec.get("domain"),
        "title": spec.get("title"),
        "review_source": review_summary.get("review_source"),
        "session_id": scenario_state.get("session_id"),
        "steps_total": review_summary.get("steps_total"),
        "steps_passed": review_summary.get("steps_passed"),
        "steps_with_warning": review_summary.get("steps_with_warning"),
        "steps_failed": review_summary.get("steps_failed"),
        "review_overall_status": review_overall_status,
        "execution_status": "exact" if review_overall_status == "pass" else "partial",
        "final_status": final_status,
        "final_status_reason": final_status_reason,
        "acceptance_gate_passed": final_status == "accepted",
        "no_unresolved_p0": unresolved_p0_count == 0,
        "unresolved_p0_count": unresolved_p0_count,
        "unresolved_p1_count": int(summary.get("unresolved_p1_count") or 0),
        "unresolved_p2_count": int(summary.get("unresolved_p2_count") or 0),
        "critical_path_green": bool(summary.get("critical_path_green")),
        "invariants": invariants,
        "updated_at": _now_iso(),
    }
def build_scenario_acceptance_matrix_markdown(acceptance_matrix: dict[str, Any]) -> str:
    """Render an acceptance matrix as a Markdown report.

    The report has a header with scenario metadata and summary counters, an
    "Acceptance invariants" section, and a "Steps" section with one bullet
    per row. Missing header values are shown as ``n/a``; a ``summary`` or
    ``invariants`` value that is not a dict is treated as empty.

    Returns:
        The Markdown text, stripped and terminated by a single newline.
    """
    summary = acceptance_matrix.get("summary")
    if not isinstance(summary, dict):
        summary = {}
    invariants = summary.get("invariants")
    if not isinstance(invariants, dict):
        invariants = {}

    def _or_na(value: Any) -> Any:
        return value or "n/a"

    out = ["# Scenario acceptance matrix", ""]
    for key in ("scenario_id", "domain"):
        out.append(f"- {key}: `{_or_na(acceptance_matrix.get(key))}`")
    # The title is the only header value rendered without backticks.
    out.append(f"- title: {_or_na(acceptance_matrix.get('title'))}")
    for key in ("review_source", "session_id"):
        out.append(f"- {key}: `{_or_na(acceptance_matrix.get(key))}`")
    for key in (
        "critical_path_green",
        "unresolved_p0_count",
        "unresolved_p1_count",
        "unresolved_p2_count",
    ):
        out.append(f"- {key}: `{summary.get(key)}`")

    out.extend(["", "## Acceptance invariants"])
    for key in (
        "direct_answer_ok",
        "temporal_honesty_ok",
        "selected_object_continuity_ok",
        "truth_gate_ok",
        "human_answer_quality_ok",
        "meta_context_integrity_ok",
    ):
        out.append(f"- {key}: `{invariants.get(key)}`")

    out.extend(["", "## Steps"])
    for row in acceptance_matrix.get("rows") or []:
        tags_text = ", ".join(row.get("semantic_tags") or []) or "none"
        failures_text = ", ".join(row.get("invariant_failures") or []) or "none"
        alignment = _or_na(row.get("mcp_discovery_catalog_chain_alignment_status"))
        top_match = _or_na(row.get("mcp_discovery_catalog_chain_top_match"))
        selected_matches_top = row.get("mcp_discovery_catalog_chain_selected_matches_top")
        out.append(f"- `{row.get('step_id')}`")
        out.append(f" review_status: `{row.get('review_status')}`")
        out.append(f" criticality: `{row.get('criticality')}`")
        out.append(f" semantic_tags: {tags_text}")
        out.append(f" catalog_alignment_status: `{alignment}`")
        out.append(f" catalog_top_match: `{top_match}`")
        out.append(f" catalog_selected_matches_top: `{selected_matches_top}`")
        out.append(f" highest_unresolved_priority: `{row.get('highest_unresolved_priority')}`")
        out.append(f" selected_object_step: `{row.get('selected_object_step')}`")
        out.append(f" meta_context_step: `{row.get('meta_context_step')}`")
        out.append(f" invariant_failures: {failures_text}")

    return "\n".join(out).strip() + "\n"
def build_truth_harness_final_status_markdown(pack_state: dict[str, Any]) -> str:
    """Render the pack's final status as a short Markdown document.

    Falsy ``final_status``, ``final_status_reason`` and
    ``review_overall_status`` values are shown as ``n/a``; every other value
    is printed as stored (``None`` when absent). An ``invariants`` value that
    is not a dict is treated as empty.
    """
    invariants = pack_state.get("invariants")
    if not isinstance(invariants, dict):
        invariants = {}

    status = pack_state.get("final_status") or "n/a"
    reason = pack_state.get("final_status_reason") or "n/a"
    review_status = pack_state.get("review_overall_status") or "n/a"

    out = [
        "# Final status",
        "",
        f"- status: `{status}`",
        f"- reason: `{reason}`",
        f"- review_overall_status: `{review_status}`",
        f"- no_unresolved_p0: `{pack_state.get('no_unresolved_p0')}`",
    ]
    for key in (
        "direct_answer_ok",
        "temporal_honesty_ok",
        "selected_object_continuity_ok",
        "truth_gate_ok",
        "human_answer_quality_ok",
        "meta_context_integrity_ok",
    ):
        out.append(f"- {key}: `{invariants.get(key)}`")
    # Every line, including the last, ends with a newline.
    return "\n".join(out) + "\n"