Ужесточить gate auto-coder по issue catalog

This commit is contained in:
dctouch 2026-05-24 12:48:17 +03:00
parent cd8e98bd3f
commit 81acca3332
3 changed files with 288 additions and 5 deletions

View File

@ -120,6 +120,7 @@
"severity": "P0",
"business_meaning": "Ответ не начинает с прямого бизнес-вывода, хотя пользователь задал прямой вопрос.",
"root_layers": ["answer_surface", "business_utility"],
"expected_answer_contract": "direct_answer_surface_v1",
"detectors": ["first_line_not_direct_answer", "top_level_scaffold_before_answer"],
"allowed_patch_targets": [
"llm_normalizer/backend/src/services/address_runtime/composeStage.ts",
@ -135,12 +136,25 @@
"failed_scenario",
"direct_answer_surface_pack",
"accepted_smoke_pack"
],
"acceptance": {
"must_have": [
"direct_answer_first",
"business_meaning_before_service_context",
"honest_unknown_when_evidence_is_limited"
],
"must_not_have": [
"debug ids before answer",
"route ids before answer",
"service scaffold before answer"
]
}
},
"technical_garbage_in_answer": {
"severity": "P0",
"business_meaning": "Финальный ответ протащил debug/runtime/MCP-механику в пользовательскую поверхность.",
"root_layers": ["answer_surface", "business_utility"],
"expected_answer_contract": "technical_garbage_free_answer_v1",
"detectors": ["runtime_tokens_in_user_answer", "capability_ids_in_user_answer"],
"allowed_patch_targets": [
"llm_normalizer/backend/src/services/address_runtime/composeStage.ts",
@ -155,7 +169,19 @@
"failed_scenario",
"technical_garbage_canary_pack",
"accepted_smoke_pack"
],
"acceptance": {
"must_have": [
"business_answer_only",
"evidence_summary_when_useful"
],
"must_not_have": [
"route ids",
"capability ids",
"debug payload",
"raw runtime enums"
]
}
},
"accounting_contract_missing": {
"severity": "P1",
@ -181,6 +207,7 @@
"severity": "P2",
"business_meaning": "Ограниченный ответ не предлагает полезный следующий шаг.",
"root_layers": ["answer_surface", "business_utility"],
"expected_answer_contract": "limited_answer_next_action_v1",
"detectors": ["limited_answer_without_next_action"],
"allowed_patch_targets": [
"llm_normalizer/backend/src/services/address_runtime/composeStage.ts",
@ -194,7 +221,18 @@
"failed_scenario",
"limited_answer_pack",
"accepted_smoke_pack"
],
"acceptance": {
"must_have": [
"clear_limitation",
"next_action_if_limited",
"no_overclaiming"
],
"must_not_have": [
"fake evidence",
"masked route failure"
]
}
},
"route_candidate_enablement_gap": {
"severity": "P1",

View File

@ -1,6 +1,7 @@
from __future__ import annotations
import argparse
import fnmatch
import json
import re
import subprocess
@ -44,6 +45,38 @@ AUTO_CODER_ALLOWED_ISSUE_CODES = {
"business_next_step_missing",
"technical_garbage_in_answer",
}
# Patch targets that are too coarse to hand to the auto-coder.
# is_broad_auto_coder_patch_target() tests membership in this set after the
# candidate has been stripped, had backslashes converted to "/", and been
# lowercased - so every entry here must be lowercase and "/"-separated.
# Directories are listed both with and without a trailing slash because the
# membership test is an exact string comparison.
AUTO_CODER_BROAD_PATCH_TARGETS = {
".",
"./",
"*",
"**",
"llm_normalizer",
"llm_normalizer/",
"llm_normalizer/backend",
"llm_normalizer/backend/",
"llm_normalizer/backend/src",
"llm_normalizer/backend/src/",
"llm_normalizer/backend/src/services",
"llm_normalizer/backend/src/services/",
"scripts",
"scripts/",
"docs",
"docs/",
"docs/orchestration",
"docs/orchestration/",
}
# Substrings that disqualify an allowed patch target outright.
# is_broad_auto_coder_patch_target() reports a target as broad when its
# lowercased text contains any of these markers, so the markers themselves
# must be lowercase or they can never match.
AUTO_CODER_FORBIDDEN_ALLOWED_TARGET_MARKERS = (
"active_domain_contract",
"shared_llm_connection",
"promptbuilder",
"prompt_registry",
"mcp protocol",
"mcp runtime",
"fake data",
"fake fixtures",
"heuristic masking",
"global orchestration",
)
def load_shared_local_llm_defaults(config_path: Path | None = None) -> dict[str, Any]:
@ -947,6 +980,53 @@ def expected_answer_contract_for_issue(issue_code: str, step_output: dict[str, A
return None
def issue_acceptance_contract_name(issue_code: str, catalog_entry: dict[str, Any]) -> str | None:
    """Resolve the answer-contract name a catalog entry commits to.

    Resolution order:
      1. The entry's ``expected_answer_contract`` value, stripped of
         surrounding whitespace, when that is non-empty.
      2. The synthetic name ``issue_acceptance:<issue_code>`` when the entry's
         ``acceptance`` value is a dict whose ``must_have`` or
         ``must_not_have`` list is non-empty after ``normalize_string_list``.
      3. ``None`` when the entry offers neither.
    """
    declared = str(catalog_entry.get("expected_answer_contract") or "").strip()
    if declared:
        return declared

    acceptance = catalog_entry.get("acceptance")
    if not isinstance(acceptance, dict):
        return None

    # Any populated acceptance list is enough to count as an implicit contract;
    # "must_have" is consulted first, "must_not_have" only if it is empty.
    for requirement_key in ("must_have", "must_not_have"):
        if normalize_string_list(acceptance.get(requirement_key)):
            return f"issue_acceptance:{issue_code}"
    return None
def normalize_patch_target(value: Any) -> str:
    """Return *value* as a trimmed, forward-slash path string.

    Falsy inputs (``None``, ``""``, ``0``, ...) become the empty string.
    Everything else is converted with ``str()``, stripped of surrounding
    whitespace, and has every backslash replaced by ``/``.
    """
    text = str(value) if value else ""
    trimmed = text.strip()
    return trimmed.replace("\\", "/")
def patch_target_matches_catalog(target: str, catalog_targets: list[str]) -> bool:
    """Report whether *target* is covered by any entry of *catalog_targets*.

    Both sides are passed through ``normalize_patch_target`` first. An empty
    target never matches, and empty catalog entries are ignored. A catalog
    entry covers the target when one of these holds, checked in this order:

      * the two strings are identical;
      * the entry contains a glob metacharacter (``*``, ``?`` or ``[``) and
        ``fnmatch.fnmatch`` accepts the target against it;
      * the entry ends in ``/**`` and the target lies under the directory
        named by the part before that suffix.
    """
    candidate = normalize_patch_target(target)
    if not candidate:
        return False

    glob_characters = ("*", "?", "[")

    def covers(pattern: str) -> bool:
        # Blank catalog entries carry no information and are skipped.
        if not pattern:
            return False
        if pattern == candidate:
            return True
        looks_like_glob = any(character in pattern for character in glob_characters)
        if looks_like_glob and fnmatch.fnmatch(candidate, pattern):
            return True
        if not pattern.endswith("/**"):
            return False
        # "dir/**" covers everything below "dir/".
        subtree_prefix = pattern[:-3].rstrip("/") + "/"
        return candidate.startswith(subtree_prefix)

    return any(covers(normalize_patch_target(entry)) for entry in catalog_targets)
def is_broad_auto_coder_patch_target(value: Any) -> bool:
    """Return True when *value* is too broad to be an auto-coder patch target.

    The value is normalized with ``normalize_patch_target`` and lowercased.
    It is then judged both in that raw spelling and in a canonical spelling
    with empty and ``.`` path segments removed, so that equivalent spellings
    of a broad directory (``./scripts``, ``scripts//``, ``.//``) cannot slip
    past the exact-match list. For inputs that are already canonical the two
    spellings coincide and the result is the same as checking the raw
    spelling alone.

    A target is broad when any of the following holds:
      * it is empty, or reduces to nothing once ``.``/empty segments are gone;
      * its raw or canonical spelling is in ``AUTO_CODER_BROAD_PATCH_TARGETS``;
      * it ends with the recursive glob ``/**``;
      * it contains fewer than two ``/`` and is either a glob (``*``) or a
        directory (trailing ``/``);
      * it contains one of ``AUTO_CODER_FORBIDDEN_ALLOWED_TARGET_MARKERS``.

    ``..`` segments are not resolved here.
    """
    normalized = normalize_patch_target(value).lower()
    if not normalized or normalized in AUTO_CODER_BROAD_PATCH_TARGETS:
        return True

    # Canonical spelling: drop "." and empty segments ("./a//b/" -> "a/b").
    segments = [segment for segment in normalized.split("/") if segment and segment != "."]
    if not segments:
        # Nothing but "/" and "." - this is the repository root in disguise.
        return True
    canonical = "/".join(segments)
    if canonical in AUTO_CODER_BROAD_PATCH_TARGETS:
        return True

    # Re-attach the trailing slash so the directory/depth rules below see the
    # same shape the caller wrote, minus the redundant segments.
    probe = canonical + "/" if normalized.endswith("/") else canonical
    if probe.endswith("/**"):
        return True
    if probe.count("/") < 2 and ("*" in probe or probe.endswith("/")):
        return True

    # Markers are plain substrings, so the raw spelling is the right haystack.
    return any(marker in normalized for marker in AUTO_CODER_FORBIDDEN_ALLOWED_TARGET_MARKERS)
def evidence_paths_for_step(scenario_dir: Path, step_id: str) -> list[str]:
step_dir = scenario_dir / "steps" / step_id
candidates = [
@ -4501,6 +4581,7 @@ def evaluate_auto_coder_gate(
repair_targets: dict[str, Any],
assigned_focus: dict[str, Any] | None,
) -> dict[str, Any]:
catalog = load_issue_catalog()
issue_codes = normalize_string_list((assigned_focus or {}).get("issue_codes"))
root_layers = normalize_string_list((assigned_focus or {}).get("root_cause_layers"))
allowed_patch_targets = normalize_string_list((assigned_focus or {}).get("allowed_patch_targets"))
@ -4508,6 +4589,9 @@ def evaluate_auto_coder_gate(
rerun_matrix = normalize_string_list((assigned_focus or {}).get("rerun_matrix"))
focus_id = str((assigned_focus or {}).get("focus_id") or "").strip() or None
blocking_reasons: list[str] = []
catalog_allowed_patch_targets: list[str] = []
catalog_forbidden_patch_targets: list[str] = []
issue_catalog_contracts: dict[str, Any] = {}
if not assigned_focus:
blocking_reasons.append("missing_assigned_focus")
@ -4516,6 +4600,41 @@ def evaluate_auto_coder_gate(
for issue_code in issue_codes:
if issue_code not in AUTO_CODER_ALLOWED_ISSUE_CODES:
blocking_reasons.append(f"issue_code_not_allowlisted:{issue_code}")
catalog_entry = issue_catalog_entry(issue_code, catalog)
if not catalog_entry:
blocking_reasons.append(f"issue_code_missing_from_catalog:{issue_code}")
continue
catalog_root_layers = normalize_string_list(catalog_entry.get("root_layers"))
catalog_allowed = normalize_string_list(catalog_entry.get("allowed_patch_targets"))
catalog_forbidden = normalize_string_list(catalog_entry.get("forbidden_patch_targets"))
catalog_rerun = normalize_string_list(catalog_entry.get("rerun_matrix"))
expected_contract = issue_acceptance_contract_name(issue_code, catalog_entry)
if not catalog_root_layers:
blocking_reasons.append(f"catalog_missing_root_layers:{issue_code}")
if not catalog_allowed:
blocking_reasons.append(f"catalog_missing_allowed_patch_targets:{issue_code}")
if not catalog_forbidden:
blocking_reasons.append(f"catalog_missing_forbidden_patch_targets:{issue_code}")
if not catalog_rerun:
blocking_reasons.append(f"catalog_missing_rerun_matrix:{issue_code}")
if catalog_rerun and "accepted_smoke_pack" not in catalog_rerun:
blocking_reasons.append(f"catalog_missing_accepted_smoke_pack:{issue_code}")
if not expected_contract:
blocking_reasons.append(f"catalog_missing_expected_answer_contract:{issue_code}")
for target in catalog_allowed:
if target not in catalog_allowed_patch_targets:
catalog_allowed_patch_targets.append(target)
for target in catalog_forbidden:
if target not in catalog_forbidden_patch_targets:
catalog_forbidden_patch_targets.append(target)
issue_catalog_contracts[issue_code] = {
"severity": catalog_entry.get("severity"),
"root_layers": catalog_root_layers,
"expected_answer_contract": expected_contract,
"allowed_patch_targets": catalog_allowed,
"forbidden_patch_targets": catalog_forbidden,
"rerun_matrix": catalog_rerun,
}
if not root_layers:
blocking_reasons.append("missing_root_layers")
if not allowed_patch_targets:
@ -4526,6 +4645,15 @@ def evaluate_auto_coder_gate(
blocking_reasons.append("missing_rerun_matrix")
if rerun_matrix and "accepted_smoke_pack" not in rerun_matrix:
blocking_reasons.append("missing_accepted_smoke_pack")
for patch_target in allowed_patch_targets:
if is_broad_auto_coder_patch_target(patch_target):
blocking_reasons.append(f"broad_allowed_patch_target:{patch_target}")
if catalog_allowed_patch_targets and not patch_target_matches_catalog(patch_target, catalog_allowed_patch_targets):
blocking_reasons.append(f"allowed_patch_target_not_in_catalog:{patch_target}")
normalized_focus_forbidden = {normalize_patch_target(item) for item in forbidden_patch_targets}
for patch_target in catalog_forbidden_patch_targets:
if normalize_patch_target(patch_target) not in normalized_focus_forbidden:
blocking_reasons.append(f"missing_catalog_forbidden_patch_target:{patch_target}")
target_items = repair_targets.get("targets") if isinstance(repair_targets.get("targets"), list) else []
focus_target_ids = set(normalize_string_list((assigned_focus or {}).get("target_ids")))
@ -4541,12 +4669,39 @@ def evaluate_auto_coder_gate(
target_issue = str(target.get("issue_code") or "").strip()
if not target_issue:
blocking_reasons.append(f"target_missing_issue_code:{target_id}")
elif target_issue not in issue_codes:
blocking_reasons.append(f"target_issue_not_in_focus:{target_id}:{target_issue}")
target_catalog_entry = issue_catalog_entry(target_issue, catalog) if target_issue else {}
target_expected_contract = expected_answer_contract_for_issue(
target_issue,
target,
target_catalog_entry,
) or issue_acceptance_contract_name(target_issue, target_catalog_entry)
if not normalize_string_list(target.get("root_cause_layers")):
blocking_reasons.append(f"target_missing_root_layers:{target_id}")
if not target_expected_contract:
blocking_reasons.append(f"target_missing_expected_answer_contract:{target_id}")
if not normalize_string_list(target.get("evidence_paths")):
blocking_reasons.append(f"target_missing_evidence_paths:{target_id}")
if not normalize_string_list(target.get("allowed_patch_targets")):
blocking_reasons.append(f"target_missing_allowed_patch_targets:{target_id}")
for patch_target in normalize_string_list(target.get("allowed_patch_targets")):
if is_broad_auto_coder_patch_target(patch_target):
blocking_reasons.append(f"target_broad_allowed_patch_target:{target_id}:{patch_target}")
if catalog_allowed_patch_targets and not patch_target_matches_catalog(patch_target, catalog_allowed_patch_targets):
blocking_reasons.append(f"target_allowed_patch_target_not_in_catalog:{target_id}:{patch_target}")
if not normalize_string_list(target.get("forbidden_patch_targets")):
blocking_reasons.append(f"target_missing_forbidden_patch_targets:{target_id}")
normalized_target_forbidden = {
normalize_patch_target(item) for item in normalize_string_list(target.get("forbidden_patch_targets"))
}
for patch_target in normalize_string_list(target_catalog_entry.get("forbidden_patch_targets")):
if normalize_patch_target(patch_target) not in normalized_target_forbidden:
blocking_reasons.append(f"target_missing_catalog_forbidden_patch_target:{target_id}:{patch_target}")
if not normalize_string_list(target.get("rerun_matrix")):
blocking_reasons.append(f"target_missing_rerun_matrix:{target_id}")
elif "accepted_smoke_pack" not in normalize_string_list(target.get("rerun_matrix")):
blocking_reasons.append(f"target_missing_accepted_smoke_pack:{target_id}")
allowed = not blocking_reasons
return {
@ -4560,12 +4715,16 @@ def evaluate_auto_coder_gate(
"forbidden_patch_targets": forbidden_patch_targets,
"rerun_matrix": rerun_matrix,
"allowlisted_issue_codes": sorted(AUTO_CODER_ALLOWED_ISSUE_CODES),
"issue_catalog_contracts": issue_catalog_contracts,
"blocking_reasons": blocking_reasons,
"reason": "auto_coder_gate_passed" if allowed else ";".join(blocking_reasons),
"policy": {
"auto_coder_default": False,
"requires_issue_catalog_contract": True,
"requires_expected_answer_contract": True,
"requires_target_evidence_paths": True,
"requires_accepted_smoke_pack": True,
"requires_catalog_limited_patch_scope": True,
"lead_owns_merge_and_acceptance": True,
},
}

View File

@ -181,8 +181,11 @@ class DomainCaseLoopLeadHandoffTests(unittest.TestCase):
{
"target_id": "pack:s01",
"issue_code": "business_direct_answer_missing",
"root_cause_layers": ["answer_surface"],
"expected_business_answer_contract": "direct_answer_surface_v1",
"evidence_paths": ["artifacts/domain_runs/pack/steps/s01/output.md"],
"allowed_patch_targets": ["llm_normalizer/backend/src/services/address_runtime/composeStage.ts"],
"forbidden_patch_targets": ["routing rewrites"],
"forbidden_patch_targets": ["routing rewrites", "fake evidence", "global runtime rewrite"],
"rerun_matrix": ["failed_scenario", "direct_answer_surface_pack", "accepted_smoke_pack"],
}
],
@ -192,7 +195,7 @@ class DomainCaseLoopLeadHandoffTests(unittest.TestCase):
"issue_codes": ["business_direct_answer_missing"],
"root_cause_layers": ["answer_surface"],
"allowed_patch_targets": ["llm_normalizer/backend/src/services/address_runtime/composeStage.ts"],
"forbidden_patch_targets": ["routing rewrites"],
"forbidden_patch_targets": ["routing rewrites", "fake evidence", "global runtime rewrite"],
"rerun_matrix": ["failed_scenario", "direct_answer_surface_pack", "accepted_smoke_pack"],
"target_ids": ["pack:s01"],
}
@ -201,6 +204,89 @@ class DomainCaseLoopLeadHandoffTests(unittest.TestCase):
self.assertTrue(gate["allowed"])
self.assertEqual(gate["reason"], "auto_coder_gate_passed")
self.assertEqual(
gate["issue_catalog_contracts"]["business_direct_answer_missing"]["expected_answer_contract"],
"direct_answer_surface_v1",
)
def test_auto_coder_gate_blocks_broad_or_blind_patch_scope(self) -> None:
    # The focus and its single target both ask for a whole services directory,
    # and the target deliberately carries no "evidence_paths". The gate must
    # refuse and report both problems.
    broad_scope = "llm_normalizer/backend/src/services/"
    forbidden_scope = ("routing rewrites", "fake evidence", "global runtime rewrite")
    rerun_plan = ("failed_scenario", "direct_answer_surface_pack", "accepted_smoke_pack")

    blind_target = {
        "target_id": "pack:s01",
        "issue_code": "business_direct_answer_missing",
        "root_cause_layers": ["answer_surface"],
        "expected_business_answer_contract": "direct_answer_surface_v1",
        "allowed_patch_targets": [broad_scope],
        "forbidden_patch_targets": list(forbidden_scope),
        "rerun_matrix": list(rerun_plan),
    }
    repair_targets = {"targets": [blind_target]}
    assigned_focus = {
        "focus_id": "answer_shape|services",
        "issue_codes": ["business_direct_answer_missing"],
        "root_cause_layers": ["answer_surface"],
        "allowed_patch_targets": [broad_scope],
        "forbidden_patch_targets": list(forbidden_scope),
        "rerun_matrix": list(rerun_plan),
        "target_ids": ["pack:s01"],
    }

    gate = dcl.evaluate_auto_coder_gate(repair_targets, assigned_focus)
    reasons = gate["blocking_reasons"]

    self.assertFalse(gate["allowed"])
    self.assertIn("broad_allowed_patch_target:llm_normalizer/backend/src/services/", reasons)
    self.assertIn("target_missing_evidence_paths:pack:s01", reasons)
def test_auto_coder_gate_blocks_catalog_issue_without_answer_contract(self) -> None:
    # Swap in a catalog whose entry has neither "expected_answer_contract" nor
    # "acceptance". The focus and target are otherwise well-formed, so the
    # expected blocking reason comes from the catalog check.
    compose_stage = "llm_normalizer/backend/src/services/address_runtime/composeStage.ts"

    def catalog_without_answer_contract() -> dict:
        # A fresh dict per call, as the replaced loader is called by the gate.
        return {
            "schema_version": "agent_issue_catalog_v1",
            "issues": {
                "business_direct_answer_missing": {
                    "severity": "P0",
                    "root_layers": ["answer_surface"],
                    "allowed_patch_targets": [compose_stage],
                    "forbidden_patch_targets": ["routing rewrites"],
                    "rerun_matrix": ["failed_scenario", "accepted_smoke_pack"],
                }
            },
        }

    well_formed_target = {
        "target_id": "pack:s01",
        "issue_code": "business_direct_answer_missing",
        "root_cause_layers": ["answer_surface"],
        "expected_business_answer_contract": "direct_answer_surface_v1",
        "evidence_paths": ["artifacts/domain_runs/pack/steps/s01/output.md"],
        "allowed_patch_targets": [compose_stage],
        "forbidden_patch_targets": ["routing rewrites"],
        "rerun_matrix": ["failed_scenario", "accepted_smoke_pack"],
    }
    repair_targets = {"targets": [well_formed_target]}
    assigned_focus = {
        "focus_id": "answer_shape|composeStage",
        "issue_codes": ["business_direct_answer_missing"],
        "root_cause_layers": ["answer_surface"],
        "allowed_patch_targets": [compose_stage],
        "forbidden_patch_targets": ["routing rewrites"],
        "rerun_matrix": ["failed_scenario", "accepted_smoke_pack"],
        "target_ids": ["pack:s01"],
    }

    saved_loader = dcl.load_issue_catalog
    dcl.load_issue_catalog = catalog_without_answer_contract
    try:
        gate = dcl.evaluate_auto_coder_gate(repair_targets, assigned_focus)
    finally:
        # Always restore the real loader so later tests see the real catalog.
        dcl.load_issue_catalog = saved_loader

    self.assertFalse(gate["allowed"])
    self.assertIn(
        "catalog_missing_expected_answer_contract:business_direct_answer_missing",
        gate["blocking_reasons"],
    )
def test_analyst_priority_targets_become_lead_repair_targets(self) -> None:
repair_targets = {