diff --git a/scripts/domain_case_loop.py b/scripts/domain_case_loop.py index 8e4364b..08e5fc2 100644 --- a/scripts/domain_case_loop.py +++ b/scripts/domain_case_loop.py @@ -137,9 +137,68 @@ BUSINESS_TECHNICAL_GARBAGE_MARKERS = ( "probe ", "query_movements", "query_documents", + "surrogate-формул", + "vat-объект", + "truth gate", + "coverage", + "checked_sources_only", + "строк с суммой", + "строк в выборке", + "эвристический shortlist", ) BUSINESS_DIRECT_ANSWER_SOFT_LIMIT = 1800 +BUSINESS_LIMITED_ANSWER_MARKERS = ( + "не удалось", + "не могу подтвердить", + "не подтвержден", + "не подтверждён", + "не хватает", + "нужен период", + "нужна организация", + "нужно уточнить", + "нельзя честно", + "нельзя построить", + "нет достаточной базы", +) +BUSINESS_NEXT_ACTION_MARKERS = ( + "могу", + "уточните", + "следующий шаг", + "что проверить дальше", + "дальше", + "можно", + "предлагаю", + "показать найден", + "расширить", + "проверить 90", + "посчитать по", + "построить по", +) +NOMENCLATURE_MARGIN_EXPECTED_ANSWER_MARKERS = ( + "период", + "выручк", + "себестоим", + "валов", + "марж", +) +NOMENCLATURE_MARGIN_WRONG_DOMAIN_ANSWER_MARKERS = ( + "амортизац", + "основн", + "объект ос", + "карточк", + "оплата завис", + "зависш", + "закрытие расчет", + "закрытие расчёт", + "списание с расчетного", + "списание с расчётного", + "банковск", + "settlement", + "payment_document", + "unresolved settlement", +) + GUARDED_INSUFFICIENCY_PRIMARY_MARKERS = ( "\u0442\u043e\u0447\u043d\u044b\u0439", "\u0442\u043e\u0447\u043d\u044b\u0435", @@ -188,8 +247,11 @@ DEFAULT_INVARIANT_SEVERITY: dict[str, str] = { "business_direct_answer_missing": "P0", "technical_garbage_in_answer": "P0", "counterparty_value_flow_misrouted_to_company_profit": "P0", + "domain_leak_accounting_route": "P0", "answer_layering_noise": "P1", + "accounting_contract_missing": "P1", "business_answer_too_verbose": "P1", + "business_next_step_missing": "P2", } REPAIR_TARGET_SEVERITY_ORDER = {"P0": 0, "P1": 1, "P2": 2} @@ -1845,6 +1907,10 @@ def _marker_hits(text: str, markers: tuple[str, ...]) -> list[str]: return [marker for marker in markers if marker and marker in lowered] +def _has_any_marker(text: str, markers: tuple[str, ...]) -> bool: + return bool(_marker_hits(text, markers)) + + def is_report_style_business_question(question: str) -> bool: return bool(_marker_hits(question, BUSINESS_REPORT_REQUEST_MARKERS)) @@ -1855,6 +1921,20 @@ def is_direct_style_business_question(question: str) -> bool: return bool(_marker_hits(question, BUSINESS_DIRECT_QUESTION_MARKERS)) +def is_nomenclature_margin_context(step_state: dict[str, Any], question: str) -> bool: + detected_intent = str(step_state.get("detected_intent") or "").strip() + capability_id = str(step_state.get("capability_id") or "").strip() + if detected_intent == "inventory_margin_ranking_for_nomenclature": + return True + if capability_id == "inventory_inventory_margin_ranking_for_nomenclature": + return True + lowered_question = _review_text(question) + has_subject = "номенклатур" in lowered_question or "товар" in lowered_question + has_margin_signal = any(marker in lowered_question for marker in ("прибыл", "марж", "реализован", "реализац")) + has_rank_signal = any(marker in lowered_question for marker in ("высок", "низк", "какая", "какие")) + return has_subject and has_margin_signal and has_rank_signal + + def build_business_first_review(step_state: dict[str, Any]) -> dict[str, Any]: question = str(step_state.get("question_resolved") or step_state.get("question_template") or "").strip() assistant_text = str(step_state.get("assistant_text") or "") @@ -1882,11 +1962,38 @@ def build_business_first_review(step_state: dict[str, Any]) -> dict[str, Any]: if too_verbose_for_direct: issue_codes.append("business_answer_too_verbose") + limited_answer = _has_any_marker(assistant_text, BUSINESS_LIMITED_ANSWER_MARKERS) + has_next_action = _has_any_marker(assistant_text, BUSINESS_NEXT_ACTION_MARKERS) + nomenclature_margin_context = is_nomenclature_margin_context(step_state, question) + wrong_margin_domain_hits = ( + _marker_hits(assistant_text, NOMENCLATURE_MARGIN_WRONG_DOMAIN_ANSWER_MARKERS) + if nomenclature_margin_context + else [] + ) + margin_contract_hits = ( + _marker_hits(assistant_text, NOMENCLATURE_MARGIN_EXPECTED_ANSWER_MARKERS) + if nomenclature_margin_context + else [] + ) + if wrong_margin_domain_hits: + issue_codes.append("domain_leak_accounting_route") + if nomenclature_margin_context and len(set(margin_contract_hits)) < 2: + issue_codes.append("accounting_contract_missing") + if nomenclature_margin_context and limited_answer and not has_next_action: + issue_codes.append("business_next_step_missing") + root_cause_layers: list[str] = [] if "business_direct_answer_missing" in issue_codes or "answer_layering_noise" in issue_codes: root_cause_layers.append("answer_shape_mismatch") if "business_answer_too_verbose" in issue_codes or "technical_garbage_in_answer" in issue_codes: root_cause_layers.append("business_utility_gap") + if "domain_leak_accounting_route" in issue_codes: + root_cause_layers.append("domain_purity_gap") + root_cause_layers.append("route_gap") + if "accounting_contract_missing" in issue_codes: + root_cause_layers.append("accounting_contract_gap") + if "business_next_step_missing" in issue_codes: + root_cause_layers.append("business_utility_gap") return { "schema_version": "business_first_step_review_v1", @@ -1903,8 +2010,13 @@ def build_business_first_review(step_state: dict[str, Any]) -> dict[str, Any]: "top_line_scaffold_present": bool(scaffold_hits or top_noise), "top_line_scaffold_hits": scaffold_hits, "too_verbose_for_direct_question": too_verbose_for_direct, + "limited_answer_detected": limited_answer, + "next_action_present": has_next_action, + "nomenclature_margin_context": nomenclature_margin_context, + "domain_leak_hits": wrong_margin_domain_hits, + "accounting_contract_hits": margin_contract_hits, "business_usefulness_ok": not issue_codes, - "issue_codes": issue_codes, + "issue_codes": list(dict.fromkeys(issue_codes)), "suggested_root_cause_layers": list(dict.fromkeys(root_cause_layers)), } @@ -1920,7 +2032,7 @@ def derive_invariant_severity(step_state: dict[str, Any], violation_code: str) - overrides = step_state.get("invariant_severity") if isinstance(overrides, dict): override = str(overrides.get(violation_code) or "").strip().upper() - if override in {"P0", "P1", "WARNING"}: + if override in {"P0", "P1", "P2", "WARNING"}: return override return DEFAULT_INVARIANT_SEVERITY.get(violation_code, "P1") diff --git a/scripts/domain_truth_harness.py b/scripts/domain_truth_harness.py index 9d6e6dc..7fc3fa9 100644 --- a/scripts/domain_truth_harness.py +++ b/scripts/domain_truth_harness.py @@ -351,16 +351,22 @@ BUSINESS_REVIEW_FINDING_MESSAGES = { "technical_garbage_in_answer": "User-facing answer leaked internal runtime or MCP identifiers.", "business_direct_answer_missing": "The answer did not put the direct business answer first.", "counterparty_value_flow_misrouted_to_company_profit": "Counterparty received/paid/net flow question was answered with company profit instead of counterparty cashflow.", + "domain_leak_accounting_route": "The answer leaked into the wrong accounting domain for the user's business question.", + "accounting_contract_missing": "The answer did not expose the required accounting contract for the requested business calculation.", "answer_layering_noise": "The answer opened with scaffolding or report framing instead of a clean business result.", "business_answer_too_verbose": "The answer is too verbose for a direct business question.", + "business_next_step_missing": "A bounded or insufficient answer did not offer a useful next action.", } BUSINESS_REVIEW_FINDING_SEVERITY = { "technical_garbage_in_answer": "critical", "business_direct_answer_missing": "critical", "counterparty_value_flow_misrouted_to_company_profit": "critical", + "domain_leak_accounting_route": "critical", + "accounting_contract_missing": "warning", "answer_layering_noise": "critical", "business_answer_too_verbose": "warning", + "business_next_step_missing": "warning", } @@ -1191,15 +1197,20 @@ def build_business_review_summary(spec: dict[str, Any], scenario_state: dict[str "suggested_root_cause_layers": business_review.get("suggested_root_cause_layers") or [], } ) - failed = sum( - 1 - for step in steps - if any( - issue in {"technical_garbage_in_answer", "business_direct_answer_missing", "answer_layering_noise"} - for issue in step["issue_codes"] - ) - ) - warnings = sum(1 for step in steps if "business_answer_too_verbose" in step["issue_codes"]) + failure_issues = { + "technical_garbage_in_answer", + "business_direct_answer_missing", + "answer_layering_noise", + "counterparty_value_flow_misrouted_to_company_profit", + "domain_leak_accounting_route", + } + warning_issues = { + "business_answer_too_verbose", + "accounting_contract_missing", + "business_next_step_missing", + } + failed = sum(1 for step in steps if any(issue in failure_issues for issue in step["issue_codes"])) + warnings = sum(1 for step in steps if any(issue in warning_issues for issue in step["issue_codes"])) semantic_status = "fail" if failed or review_failures else ("warning" if warnings or review_warnings else "pass") return { "schema_version": "business_first_run_review_v1", diff --git a/scripts/test_review_assistant_stage1_run.py b/scripts/test_review_assistant_stage1_run.py index c84a65a..cea5345 100644 --- a/scripts/test_review_assistant_stage1_run.py +++ b/scripts/test_review_assistant_stage1_run.py @@ -268,6 +268,127 @@ class AssistantStage1RunReviewTests(unittest.TestCase): self.assertGreaterEqual(review["tag_counts"]["contextual_followup"], 3) self.assertGreaterEqual(review["tag_counts"]["direct_business_question"], 2) + def test_review_flags_nomenclature_margin_answer_that_leaks_to_os_and_settlements(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + sessions_dir = root / "sessions" + reports_dir = root / "reports" + run_id = "assistant-stage1-margin-domain-leak" + session_file = sessions_dir / f"{run_id}-SAVED-001.json" + report_file = reports_dir / f"{run_id}.md" + write_json( + session_file, + session_payload( + [ + { + "role": "user", + "text": "Какая номенклатура товара реализована с высокой прибылью какая с низкой", + }, + { + "role": "assistant", + "text": "По объекту ОС видна амортизация и зависшая оплата. Проверьте карточку ОС и закрытие расчетов.", + "reply_type": "factual_with_explanation", + "message_id": "a-margin-leak", + "trace_id": "trace-margin-leak", + "debug": {"detected_intent": "inventory_margin_ranking_for_nomenclature"}, + }, + ] + ), + ) + report_file.parent.mkdir(parents=True, exist_ok=True) + report_file.write_text(f"# Assistant Stage 1 Eval Run\n\n- run_id: {run_id}\n", encoding="utf-8") + + review = reviewer.build_run_review( + run_id=run_id, + session_files=[session_file], + report_path=report_file, + ) + + self.assertEqual(review["summary"]["overall_business_status"], "fail") + self.assertIn("domain_leak_accounting_route", review["summary"]["issue_counts"]) + target_by_issue = {item["issue_code"]: item for item in review["repair_targets"]} + self.assertEqual(target_by_issue["domain_leak_accounting_route"]["severity"], "P0") + + def test_review_warns_when_limited_business_answer_has_no_next_action(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + sessions_dir = root / "sessions" + reports_dir = root / "reports" + run_id = "assistant-stage1-limited-no-next-action" + session_file = sessions_dir / f"{run_id}-SAVED-001.json" + report_file = reports_dir / f"{run_id}.md" + write_json( + session_file, + session_payload( + [ + { + "role": "user", + "text": "Какая номенклатура товара реализована с высокой прибылью какая с низкой", + }, + { + "role": "assistant", + "text": "За май 2020 рейтинг прибыльности номенклатуры построить нельзя: не подтверждена себестоимость реализации.", + "reply_type": "partial_coverage", + "message_id": "a-limited-no-next", + "trace_id": "trace-limited-no-next", + "debug": {"detected_intent": "inventory_margin_ranking_for_nomenclature"}, + }, + ] + ), + ) + report_file.parent.mkdir(parents=True, exist_ok=True) + report_file.write_text(f"# Assistant Stage 1 Eval Run\n\n- run_id: {run_id}\n", encoding="utf-8") + + review = reviewer.build_run_review( + run_id=run_id, + session_files=[session_file], + report_path=report_file, + ) + + self.assertIn("business_next_step_missing", review["summary"]["issue_counts"]) + target_by_issue = {item["issue_code"]: item for item in review["repair_targets"]} + self.assertEqual(target_by_issue["business_next_step_missing"]["severity"], "P2") + + def test_review_accepts_margin_clarification_with_accounting_contract_and_next_action(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + sessions_dir = root / "sessions" + reports_dir = root / "reports" + run_id = "assistant-stage1-margin-clarification-clean" + session_file = sessions_dir / f"{run_id}-SAVED-001.json" + report_file = reports_dir / f"{run_id}.md" + write_json( + session_file, + session_payload( + [ + { + "role": "user", + "text": "Какая номенклатура товара реализована с высокой прибылью какая с низкой", + }, + { + "role": "assistant", + "text": "Для рейтинга прибыльности нужен период. Могу посчитать по номенклатуре: выручку без НДС, себестоимость реализации, валовую прибыль и маржинальность. Уточните месяц, квартал или год.", + "reply_type": "partial_coverage", + "message_id": "a-margin-clean", + "trace_id": "trace-margin-clean", + "debug": {"detected_intent": "inventory_margin_ranking_for_nomenclature"}, + }, + ] + ), + ) + report_file.parent.mkdir(parents=True, exist_ok=True) + report_file.write_text(f"# Assistant Stage 1 Eval Run\n\n- run_id: {run_id}\n", encoding="utf-8") + + review = reviewer.build_run_review( + run_id=run_id, + session_files=[session_file], + report_path=report_file, + ) + + self.assertNotIn("domain_leak_accounting_route", review["summary"]["issue_counts"]) + self.assertNotIn("accounting_contract_missing", review["summary"]["issue_counts"]) + self.assertNotIn("business_next_step_missing", review["summary"]["issue_counts"]) + if __name__ == "__main__": unittest.main()