Усилить агентный ревью-гейт маржинальности 1С

This commit is contained in:
dctouch 2026-05-23 22:35:47 +03:00
parent 6ddd61f975
commit f56bde3a45
3 changed files with 255 additions and 11 deletions

View File

@ -137,9 +137,68 @@ BUSINESS_TECHNICAL_GARBAGE_MARKERS = (
"probe ",
"query_movements",
"query_documents",
"surrogate-формул",
"vat-объект",
"truth gate",
"coverage",
"checked_sources_only",
"строк с суммой",
"строк в выборке",
"эвристический shortlist",
)
# NOTE(review): presumably the length threshold above which an answer to a
# direct business question is reported as `business_answer_too_verbose`; the
# comparison itself is not visible in this hunk — confirm the unit
# (characters vs. tokens) against build_business_first_review.
BUSINESS_DIRECT_ANSWER_SOFT_LIMIT = 1800
# Substrings that mark an assistant answer as limited / insufficient
# ("could not", "cannot confirm", "a period is needed", ...).
# build_business_first_review checks the assistant text for any of them via
# _has_any_marker; a hit sets `limited_answer_detected` in the step review.
# Both "е" and "ё" spellings are listed because matching is plain substring.
BUSINESS_LIMITED_ANSWER_MARKERS = (
"не удалось",
"не могу подтвердить",
"не подтвержден",
"не подтверждён",
"не хватает",
"нужен период",
"нужна организация",
"нужно уточнить",
"нельзя честно",
"нельзя построить",
"нет достаточной базы",
)
# Substrings that count as "the answer offers a next action" ("I can",
# "please clarify", "next step", "I suggest", ...).
# build_business_first_review raises `business_next_step_missing` only when a
# nomenclature-margin answer is limited AND none of these markers is present.
# NOTE(review): matching is plain substring, so "могу" also matches inside the
# limited-answer phrase "не могу подтвердить", and "построить по" can follow
# "нельзя построить". Answers phrased that way are treated as already having
# a next action, which masks `business_next_step_missing`. Consider excluding
# negated forms in the matching logic.
BUSINESS_NEXT_ACTION_MARKERS = (
"могу",
"уточните",
"следующий шаг",
"что проверить дальше",
"дальше",
"можно",
"предлагаю",
"показать найден",
"расширить",
"проверить 90",
"посчитать по",
"построить по",
)
# Word stems a nomenclature-margin answer is expected to mention: period,
# revenue, cost of sales, gross (profit), margin.
# build_business_first_review reports `accounting_contract_missing` when fewer
# than two distinct stems from this tuple occur in the assistant text.
NOMENCLATURE_MARGIN_EXPECTED_ANSWER_MARKERS = (
"период",
"выручк",
"себестоим",
"валов",
"марж",
)
# Substrings from other accounting domains (depreciation / fixed assets,
# stuck payments and settlement closing, bank write-offs, internal route ids).
# In a nomenclature-margin context any single hit makes
# build_business_first_review report `domain_leak_accounting_route`.
# NOTE(review): "основн" and "карточк" are broad stems (e.g. "в основном",
# "карточка номенклатуры") and may produce false positives for a finding whose
# default severity is P0 — confirm this breadth is intended.
NOMENCLATURE_MARGIN_WRONG_DOMAIN_ANSWER_MARKERS = (
"амортизац",
"основн",
"объект ос",
"карточк",
"оплата завис",
"зависш",
"закрытие расчет",
"закрытие расчёт",
"списание с расчетного",
"списание с расчётного",
"банковск",
"settlement",
"payment_document",
"unresolved settlement",
)
GUARDED_INSUFFICIENCY_PRIMARY_MARKERS = (
"\u0442\u043e\u0447\u043d\u044b\u0439",
"\u0442\u043e\u0447\u043d\u044b\u0435",
@ -188,8 +247,11 @@ DEFAULT_INVARIANT_SEVERITY: dict[str, str] = {
"business_direct_answer_missing": "P0",
"technical_garbage_in_answer": "P0",
"counterparty_value_flow_misrouted_to_company_profit": "P0",
"domain_leak_accounting_route": "P0",
"answer_layering_noise": "P1",
"accounting_contract_missing": "P1",
"business_answer_too_verbose": "P1",
"business_next_step_missing": "P2",
}
# Numeric rank per severity label; P0 has the lowest rank.
# NOTE(review): presumably used as a sort key for repair targets — the
# consumer is outside this view. derive_invariant_severity can also return
# "WARNING", which has no rank here; confirm consumers handle a missing key.
REPAIR_TARGET_SEVERITY_ORDER = {"P0": 0, "P1": 1, "P2": 2}
@ -1845,6 +1907,10 @@ def _marker_hits(text: str, markers: tuple[str, ...]) -> list[str]:
return [marker for marker in markers if marker and marker in lowered]
def _has_any_marker(text: str, markers: tuple[str, ...]) -> bool:
    """Report whether ``_marker_hits`` finds at least one of ``markers`` in ``text``."""
    found = _marker_hits(text, markers)
    return True if found else False
def is_report_style_business_question(question: str) -> bool:
    """Report whether ``question`` contains any BUSINESS_REPORT_REQUEST_MARKERS entry.

    Matching is delegated to ``_marker_hits``; the result is True when that
    helper returns a non-empty list of hits.
    """
    report_hits = _marker_hits(question, BUSINESS_REPORT_REQUEST_MARKERS)
    if report_hits:
        return True
    return False
@ -1855,6 +1921,20 @@ def is_direct_style_business_question(question: str) -> bool:
return bool(_marker_hits(question, BUSINESS_DIRECT_QUESTION_MARKERS))
def is_nomenclature_margin_context(step_state: dict[str, Any], question: str) -> bool:
    """Decide whether a step is about ranking nomenclature items by margin.

    The step qualifies immediately when its ``detected_intent`` or
    ``capability_id`` (whitespace-stripped) equals the margin-ranking
    identifier. Otherwise the question text is inspected: it must mention a
    subject stem (nomenclature / goods), a margin stem (profit / margin /
    sold / sale) and a ranking stem (high / low / "which").

    Args:
        step_state: Step record; only ``detected_intent`` and
            ``capability_id`` are read, missing or falsy values count as "".
        question: User question text, normalized with ``_review_text``
            before substring matching (presumably lower-cased — confirm).

    Returns:
        True when the step is in the nomenclature-margin context.
    """
    intent = str(step_state.get("detected_intent") or "").strip()
    capability = str(step_state.get("capability_id") or "").strip()
    routed_to_margin_ranking = (
        intent == "inventory_margin_ranking_for_nomenclature"
        or capability == "inventory_inventory_margin_ranking_for_nomenclature"
    )
    if routed_to_margin_ranking:
        return True

    # No explicit routing signal: fall back to keyword stems in the question.
    normalized = _review_text(question)

    def mentions(*stems: str) -> bool:
        for stem in stems:
            if stem in normalized:
                return True
        return False

    if not mentions("номенклатур", "товар"):
        return False
    if not mentions("прибыл", "марж", "реализован", "реализац"):
        return False
    return mentions("высок", "низк", "какая", "какие")
def build_business_first_review(step_state: dict[str, Any]) -> dict[str, Any]:
question = str(step_state.get("question_resolved") or step_state.get("question_template") or "").strip()
assistant_text = str(step_state.get("assistant_text") or "")
@ -1882,11 +1962,38 @@ def build_business_first_review(step_state: dict[str, Any]) -> dict[str, Any]:
if too_verbose_for_direct:
issue_codes.append("business_answer_too_verbose")
limited_answer = _has_any_marker(assistant_text, BUSINESS_LIMITED_ANSWER_MARKERS)
has_next_action = _has_any_marker(assistant_text, BUSINESS_NEXT_ACTION_MARKERS)
nomenclature_margin_context = is_nomenclature_margin_context(step_state, question)
wrong_margin_domain_hits = (
_marker_hits(assistant_text, NOMENCLATURE_MARGIN_WRONG_DOMAIN_ANSWER_MARKERS)
if nomenclature_margin_context
else []
)
margin_contract_hits = (
_marker_hits(assistant_text, NOMENCLATURE_MARGIN_EXPECTED_ANSWER_MARKERS)
if nomenclature_margin_context
else []
)
if wrong_margin_domain_hits:
issue_codes.append("domain_leak_accounting_route")
if nomenclature_margin_context and len(set(margin_contract_hits)) < 2:
issue_codes.append("accounting_contract_missing")
if nomenclature_margin_context and limited_answer and not has_next_action:
issue_codes.append("business_next_step_missing")
root_cause_layers: list[str] = []
if "business_direct_answer_missing" in issue_codes or "answer_layering_noise" in issue_codes:
root_cause_layers.append("answer_shape_mismatch")
if "business_answer_too_verbose" in issue_codes or "technical_garbage_in_answer" in issue_codes:
root_cause_layers.append("business_utility_gap")
if "domain_leak_accounting_route" in issue_codes:
root_cause_layers.append("domain_purity_gap")
root_cause_layers.append("route_gap")
if "accounting_contract_missing" in issue_codes:
root_cause_layers.append("accounting_contract_gap")
if "business_next_step_missing" in issue_codes:
root_cause_layers.append("business_utility_gap")
return {
"schema_version": "business_first_step_review_v1",
@ -1903,8 +2010,13 @@ def build_business_first_review(step_state: dict[str, Any]) -> dict[str, Any]:
"top_line_scaffold_present": bool(scaffold_hits or top_noise),
"top_line_scaffold_hits": scaffold_hits,
"too_verbose_for_direct_question": too_verbose_for_direct,
"limited_answer_detected": limited_answer,
"next_action_present": has_next_action,
"nomenclature_margin_context": nomenclature_margin_context,
"domain_leak_hits": wrong_margin_domain_hits,
"accounting_contract_hits": margin_contract_hits,
"business_usefulness_ok": not issue_codes,
"issue_codes": issue_codes,
"issue_codes": list(dict.fromkeys(issue_codes)),
"suggested_root_cause_layers": list(dict.fromkeys(root_cause_layers)),
}
@ -1920,7 +2032,7 @@ def derive_invariant_severity(step_state: dict[str, Any], violation_code: str) -
overrides = step_state.get("invariant_severity")
if isinstance(overrides, dict):
override = str(overrides.get(violation_code) or "").strip().upper()
if override in {"P0", "P1", "WARNING"}:
if override in {"P0", "P1", "P2", "WARNING"}:
return override
return DEFAULT_INVARIANT_SEVERITY.get(violation_code, "P1")

View File

@ -351,16 +351,22 @@ BUSINESS_REVIEW_FINDING_MESSAGES = {
"technical_garbage_in_answer": "User-facing answer leaked internal runtime or MCP identifiers.",
"business_direct_answer_missing": "The answer did not put the direct business answer first.",
"counterparty_value_flow_misrouted_to_company_profit": "Counterparty received/paid/net flow question was answered with company profit instead of counterparty cashflow.",
"domain_leak_accounting_route": "The answer leaked into the wrong accounting domain for the user's business question.",
"accounting_contract_missing": "The answer did not expose the required accounting contract for the requested business calculation.",
"answer_layering_noise": "The answer opened with scaffolding or report framing instead of a clean business result.",
"business_answer_too_verbose": "The answer is too verbose for a direct business question.",
"business_next_step_missing": "A bounded or insufficient answer did not offer a useful next action.",
}
# Finding severity ("critical" or "warning") per business-review issue code.
# The "critical" codes are the same set that build_business_review_summary
# counts as failures and the "warning" codes the set it counts as warnings;
# keep the two in sync when adding a code.
BUSINESS_REVIEW_FINDING_SEVERITY = {
"technical_garbage_in_answer": "critical",
"business_direct_answer_missing": "critical",
"counterparty_value_flow_misrouted_to_company_profit": "critical",
"domain_leak_accounting_route": "critical",
"accounting_contract_missing": "warning",
"answer_layering_noise": "critical",
"business_answer_too_verbose": "warning",
"business_next_step_missing": "warning",
}
@ -1191,15 +1197,20 @@ def build_business_review_summary(spec: dict[str, Any], scenario_state: dict[str
"suggested_root_cause_layers": business_review.get("suggested_root_cause_layers") or [],
}
)
failed = sum(
1
for step in steps
if any(
issue in {"technical_garbage_in_answer", "business_direct_answer_missing", "answer_layering_noise"}
for issue in step["issue_codes"]
)
)
warnings = sum(1 for step in steps if "business_answer_too_verbose" in step["issue_codes"])
failure_issues = {
"technical_garbage_in_answer",
"business_direct_answer_missing",
"answer_layering_noise",
"counterparty_value_flow_misrouted_to_company_profit",
"domain_leak_accounting_route",
}
warning_issues = {
"business_answer_too_verbose",
"accounting_contract_missing",
"business_next_step_missing",
}
failed = sum(1 for step in steps if any(issue in failure_issues for issue in step["issue_codes"]))
warnings = sum(1 for step in steps if any(issue in warning_issues for issue in step["issue_codes"]))
semantic_status = "fail" if failed or review_failures else ("warning" if warnings or review_warnings else "pass")
return {
"schema_version": "business_first_run_review_v1",

View File

@ -268,6 +268,127 @@ class AssistantStage1RunReviewTests(unittest.TestCase):
self.assertGreaterEqual(review["tag_counts"]["contextual_followup"], 3)
self.assertGreaterEqual(review["tag_counts"]["direct_business_question"], 2)
def _review_margin_dialogue(
    self,
    run_id: str,
    assistant_text: str,
    *,
    reply_type: str,
    message_id: str,
    trace_id: str,
):
    """Build a run review for a one-question nomenclature-margin dialogue.

    Writes a saved session (fixed user question plus one assistant reply
    tagged with the margin-ranking intent) and a minimal report file into a
    temporary directory, then returns ``reviewer.build_run_review(...)``.

    The temporary directory is removed when this helper returns; callers
    only inspect the in-memory review object.
    """
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        session_file = root / "sessions" / f"{run_id}-SAVED-001.json"
        report_file = root / "reports" / f"{run_id}.md"
        write_json(
            session_file,
            session_payload(
                [
                    {
                        "role": "user",
                        "text": "Какая номенклатура товара реализована с высокой прибылью какая с низкой",
                    },
                    {
                        "role": "assistant",
                        "text": assistant_text,
                        "reply_type": reply_type,
                        "message_id": message_id,
                        "trace_id": trace_id,
                        "debug": {"detected_intent": "inventory_margin_ranking_for_nomenclature"},
                    },
                ]
            ),
        )
        report_file.parent.mkdir(parents=True, exist_ok=True)
        report_file.write_text(f"# Assistant Stage 1 Eval Run\n\n- run_id: {run_id}\n", encoding="utf-8")
        return reviewer.build_run_review(
            run_id=run_id,
            session_files=[session_file],
            report_path=report_file,
        )

def test_review_flags_nomenclature_margin_answer_that_leaks_to_os_and_settlements(self) -> None:
    """A margin question answered with fixed-asset/settlement content must fail as P0."""
    review = self._review_margin_dialogue(
        "assistant-stage1-margin-domain-leak",
        "По объекту ОС видна амортизация и зависшая оплата. Проверьте карточку ОС и закрытие расчетов.",
        reply_type="factual_with_explanation",
        message_id="a-margin-leak",
        trace_id="trace-margin-leak",
    )

    self.assertEqual(review["summary"]["overall_business_status"], "fail")
    self.assertIn("domain_leak_accounting_route", review["summary"]["issue_counts"])
    target_by_issue = {item["issue_code"]: item for item in review["repair_targets"]}
    self.assertEqual(target_by_issue["domain_leak_accounting_route"]["severity"], "P0")

def test_review_warns_when_limited_business_answer_has_no_next_action(self) -> None:
    """A limited margin answer without any proposed next step yields a P2 target."""
    review = self._review_margin_dialogue(
        "assistant-stage1-limited-no-next-action",
        "За май 2020 рейтинг прибыльности номенклатуры построить нельзя: не подтверждена себестоимость реализации.",
        reply_type="partial_coverage",
        message_id="a-limited-no-next",
        trace_id="trace-limited-no-next",
    )

    self.assertIn("business_next_step_missing", review["summary"]["issue_counts"])
    target_by_issue = {item["issue_code"]: item for item in review["repair_targets"]}
    self.assertEqual(target_by_issue["business_next_step_missing"]["severity"], "P2")

def test_review_accepts_margin_clarification_with_accounting_contract_and_next_action(self) -> None:
    """A clarification naming the accounting contract and a next step raises none of the new issues."""
    review = self._review_margin_dialogue(
        "assistant-stage1-margin-clarification-clean",
        "Для рейтинга прибыльности нужен период. Могу посчитать по номенклатуре: выручку без НДС, себестоимость реализации, валовую прибыль и маржинальность. Уточните месяц, квартал или год.",
        reply_type="partial_coverage",
        message_id="a-margin-clean",
        trace_id="trace-margin-clean",
    )

    self.assertNotIn("domain_leak_accounting_route", review["summary"]["issue_counts"])
    self.assertNotIn("accounting_contract_missing", review["summary"]["issue_counts"])
    self.assertNotIn("business_next_step_missing", review["summary"]["issue_counts"])
if __name__ == "__main__":
unittest.main()