ОРКЕСТРАЦИЯ - Оркестрация домена: запретить ложный accepted при живых repair targets

This commit is contained in:
dctouch 2026-04-14 23:27:48 +03:00
parent 82a020e302
commit 5934f5f3fc
2 changed files with 521 additions and 20 deletions

View File

@ -60,6 +60,60 @@ DEFAULT_INVARIANT_SEVERITY: dict[str, str] = {
"top_level_noise_present": "P0",
}
# Sort rank of each repair-target severity label; a lower rank sorts first.
REPAIR_TARGET_SEVERITY_ORDER = {
    label: rank for rank, label in enumerate(("P0", "P1", "P2"))
}

# Candidate source files to inspect first, keyed by repair problem type.
# The "other" entry is also used as the fallback for unknown problem types.
REPAIR_TARGET_FILE_HINTS: dict[str, list[str]] = {
    # Follow-up micro-action resolved to the wrong action.
    "followup_action_resolution_gap": [
        "llm_normalizer/backend/src/services/addressIntentResolver.ts",
        "llm_normalizer/backend/src/services/address_runtime/decomposeStage.ts",
        "llm_normalizer/backend/src/services/assistantService.ts",
    ],
    # Selected business object was lost between steps.
    "object_memory_gap": [
        "llm_normalizer/backend/src/services/addressNavigationState.ts",
        "llm_normalizer/backend/src/services/address_runtime/decomposeStage.ts",
        "llm_normalizer/backend/src/services/assistantService.ts",
    ],
    # Date-scope state was not carried over into the follow-up.
    "edge_carryover_gap": [
        "llm_normalizer/backend/src/services/addressNavigationState.ts",
        "llm_normalizer/backend/src/services/address_runtime/decomposeStage.ts",
        "llm_normalizer/backend/src/services/addressFilterExtractor.ts",
    ],
    # Wrong as-of date or period boundaries.
    "temporal_honesty_gap": [
        "llm_normalizer/backend/src/services/addressFilterExtractor.ts",
        "llm_normalizer/backend/src/services/address_runtime/decomposeStage.ts",
        "llm_normalizer/backend/src/services/address_runtime/composeStage.ts",
    ],
    # Wrong intent / capability / recipe / result mode selected.
    "route_gap": [
        "llm_normalizer/backend/src/services/addressQueryClassifier.ts",
        "llm_normalizer/backend/src/services/addressIntentResolver.ts",
        "llm_normalizer/backend/src/services/address_runtime/decomposeStage.ts",
        "llm_normalizer/backend/src/services/assistantService.ts",
    ],
    # Step needs an exact capability.
    "capability_gap": [
        "llm_normalizer/backend/src/services/addressCapabilityPolicy.ts",
        "llm_normalizer/backend/src/services/addressRecipeCatalog.ts",
        "llm_normalizer/backend/src/services/addressQueryService.ts",
    ],
    # Direct answer missing or top-level noise present.
    "presentation_gap": [
        "llm_normalizer/backend/src/services/address_runtime/composeStage.ts",
        "llm_normalizer/backend/src/services/assistantService.ts",
    ],
    # Partial coverage / clarification / blocked-route replies.
    "evidence_gap": [
        "llm_normalizer/backend/src/services/addressQueryService.ts",
        "llm_normalizer/backend/src/services/addressRecipeCatalog.ts",
        "llm_normalizer/backend/src/services/address_runtime/composeStage.ts",
    ],
    # Rows materialized but not matched to the selected anchor.
    "domain_anchor_gap": [
        "llm_normalizer/backend/src/services/addressQueryService.ts",
        "llm_normalizer/backend/src/services/address_runtime/decomposeStage.ts",
        "llm_normalizer/backend/src/services/addressNavigationState.ts",
    ],
    # Anything that did not match a more specific problem type.
    "other": [
        "llm_normalizer/backend/src/services/addressQueryService.ts",
        "llm_normalizer/backend/src/services/assistantService.ts",
    ],
}
def dump_json(payload: Any) -> str:
    """Render ``payload`` as 2-space indented JSON, leaving non-ASCII text unescaped."""
    rendered = json.dumps(payload, indent=2, ensure_ascii=False)
    return rendered
@ -2525,27 +2579,311 @@ def compact_step_output_for_review(step_output: Any) -> dict[str, Any]:
}
def build_pack_review_bundle(pack_dir: Path) -> str:
pack_state = read_json_file(pack_dir / "pack_state.json") if (pack_dir / "pack_state.json").exists() else {}
def collect_pack_scenario_artifacts(pack_dir: Path) -> list[dict[str, Any]]:
    """Collect one artifact record per scenario directory of a pack.

    Scans ``pack_dir / "scenarios"`` in sorted order. For every sub-directory
    it loads ``scenario_state.json`` and ``scenario_summary.md`` when they
    exist (``{}`` / ``""`` otherwise).

    Args:
        pack_dir: Root directory of the pack run.

    Returns:
        A list of dicts with the keys ``scenario_id``, ``title``,
        ``session_id``, ``artifact_dir``, ``summary`` and ``scenario_state``.
        The list is empty when the ``scenarios`` directory does not exist.
    """
    scenarios_root = pack_dir / "scenarios"
    artifacts: list[dict[str, Any]] = []
    if not scenarios_root.exists():
        return artifacts

    for scenario_dir in sorted(path for path in scenarios_root.iterdir() if path.is_dir()):
        state_path = scenario_dir / "scenario_state.json"
        summary_path = scenario_dir / "scenario_summary.md"
        scenario_state = read_json_file(state_path) if state_path.exists() else {}
        # Consumers of these records guard ``scenario_state`` with isinstance
        # checks, so a non-dict JSON payload must not crash the metadata reads.
        state_fields = scenario_state if isinstance(scenario_state, dict) else {}
        artifacts.append(
            {
                "scenario_id": state_fields.get("scenario_id") or scenario_dir.name,
                "title": state_fields.get("title"),
                "session_id": state_fields.get("session_id"),
                "artifact_dir": str(scenario_dir),
                "summary": read_text_file(summary_path) if summary_path.exists() else "",
                "scenario_state": scenario_state,
            }
        )
    return artifacts
def derive_repair_target_severity(step_output: dict[str, Any]) -> str:
    """Map a non-clean step output to a repair severity label.

    Args:
        step_output: Step output dict; reads ``hard_fail``,
            ``violated_invariants``, ``execution_status``,
            ``acceptance_status`` (falling back to ``status``) and
            ``reply_type``.

    Returns:
        ``"P0"`` for hard failures, any violated invariant whose derived
        severity is P0, or a blocked execution; ``"P1"`` for rejected,
        partial or needs-exact-capability outcomes; ``"P2"`` otherwise.
    """
    if bool(step_output.get("hard_fail")):
        return "P0"
    violated_invariants = normalize_string_list(step_output.get("violated_invariants"))
    if any(derive_invariant_severity(step_output, code) == "P0" for code in violated_invariants):
        return "P0"

    execution_status = str(step_output.get("execution_status") or "").strip()
    acceptance_status = str(step_output.get("acceptance_status") or step_output.get("status") or "").strip()
    reply_type = str(step_output.get("reply_type") or "").strip()

    if execution_status == "blocked":
        return "P0"
    if acceptance_status in {"rejected", "needs_exact_capability"}:
        return "P1"
    if execution_status in {"partial", "needs_exact_capability"} or reply_type == "partial_coverage":
        return "P1"
    # Everything that remains (including warnings-only steps) is P2; the
    # former explicit warnings check returned the same value and was dropped.
    return "P2"
def derive_repair_problem_type(step_output: dict[str, Any]) -> str:
    """Classify a step output into a single repair problem type.

    Violated invariants are checked first, in a fixed priority order; status
    and reply signals are consulted only when no invariant rule matches.
    Returns ``"other"`` when nothing matches.
    """

    def text_field(*names: str) -> str:
        # First truthy value among ``names``, stringified and stripped.
        for name in names:
            value = step_output.get(name)
            if value:
                return str(value).strip()
        return ""

    violated = set(normalize_string_list(step_output.get("violated_invariants")))
    execution_status = text_field("execution_status")
    acceptance_status = text_field("acceptance_status", "status")
    reply_type = text_field("reply_type")
    fallback_type = text_field("fallback_type")
    mcp_call_status = text_field("mcp_call_status")

    # Ordered (invariant codes, problem type) rules; the first overlap wins.
    invariant_rules = (
        ({"wrong_followup_action"}, "followup_action_resolution_gap"),
        ({"focus_object_missing"}, "object_memory_gap"),
        ({"wrong_date_scope_state"}, "edge_carryover_gap"),
        ({"wrong_as_of_date", "wrong_period_from", "wrong_period_to"}, "temporal_honesty_gap"),
        (
            {
                "wrong_intent",
                "wrong_capability",
                "wrong_recipe",
                "wrong_result_mode",
                "forbidden_capability_selected",
                "forbidden_recipe_selected",
            },
            "route_gap",
        ),
        ({"direct_answer_missing", "top_level_noise_present"}, "presentation_gap"),
    )
    for codes, problem_type in invariant_rules:
        if not violated.isdisjoint(codes):
            return problem_type

    if mcp_call_status == "materialized_but_not_anchor_matched":
        return "domain_anchor_gap"
    if "needs_exact_capability" in (acceptance_status, execution_status):
        return "capability_gap"
    limited_reply = reply_type in {"partial_coverage", "clarification_required", "route_mismatch_blocked"}
    if limited_reply or fallback_type == "partial":
        return "evidence_gap"
    return "other"
def derive_repair_root_cause_layers(step_output: dict[str, Any], problem_type: str) -> list[str]:
    """List the root-cause layers for a step, primary layer first.

    Each problem type maps to one primary layer. Some types add a secondary
    layer when a related invariant is also violated, or, for presentation
    gaps, when ``required_answer_shape`` is set. Unknown problem types yield
    ``["other"]``. The result contains no duplicates.
    """
    violated = set(normalize_string_list(step_output.get("violated_invariants")))

    primary_by_problem = {
        "followup_action_resolution_gap": "followup_action_resolution_gap",
        "object_memory_gap": "object_memory_gap",
        "edge_carryover_gap": "edge_carryover_gap",
        "temporal_honesty_gap": "temporal_honesty_gap",
        "route_gap": "semantic_understanding_gap",
        "capability_gap": "runtime_capability_gap",
        "presentation_gap": "business_utility_gap",
        "evidence_gap": "runtime_capability_gap",
        "domain_anchor_gap": "domain_anchor_gap",
    }
    layers: list[str] = [primary_by_problem.get(problem_type, "other")]

    secondary: str | None = None
    if problem_type == "followup_action_resolution_gap":
        if "focus_object_missing" in violated:
            secondary = "object_memory_gap"
    elif problem_type == "edge_carryover_gap":
        if violated & {"wrong_as_of_date", "wrong_period_from", "wrong_period_to"}:
            secondary = "temporal_honesty_gap"
    elif problem_type == "temporal_honesty_gap":
        if "wrong_date_scope_state" in violated:
            secondary = "edge_carryover_gap"
    elif problem_type == "presentation_gap":
        if str(step_output.get("required_answer_shape") or "").strip():
            secondary = "answer_shape_mismatch"
    if secondary is not None:
        layers.append(secondary)

    # Order-preserving de-duplication.
    return list(dict.fromkeys(layers))
def build_repair_fix_goal(step_output: dict[str, Any], problem_type: str) -> str:
    """Return the one-sentence fix goal for a repair target.

    The sentence quotes the step's question, taken from the first non-empty
    value among ``question_resolved``, ``title`` and ``step_id``. The wording
    is chosen by ``problem_type``; unknown types get a generic goal.
    """
    question = str(
        step_output.get("question_resolved") or step_output.get("title") or step_output.get("step_id") or ""
    ).strip()

    goal_templates = {
        "followup_action_resolution_gap": (
            "Resolve `{question}` on the current business object and keep the requested micro-action "
            "instead of drifting to another drilldown."
        ),
        "object_memory_gap": (
            "Preserve the selected business object for `{question}` so the follow-up resolves "
            "without re-anchoring from scratch."
        ),
        "edge_carryover_gap": (
            "Carry forward the selected-object state and historical date scope into `{question}` "
            "without resetting the follow-up context."
        ),
        "temporal_honesty_gap": (
            "Keep `{question}` on the requested historical date/period and separate exact-window evidence "
            "from nearest available out-of-window evidence."
        ),
        "route_gap": (
            "Keep `{question}` on the expected exact route/capability instead of letting wording drift "
            "into a different semantic lane."
        ),
        "capability_gap": (
            "Enable an exact route for `{question}` so the loop no longer falls back to partial "
            "or unsupported behavior."
        ),
        "presentation_gap": (
            "Make `{question}` answer-first: direct business answer in the first line, proof second, "
            "service notes last."
        ),
        "evidence_gap": (
            "Return grounded evidence for `{question}` instead of a limited empty response "
            "when the correct route already fires."
        ),
        "domain_anchor_gap": (
            "Match the selected business anchor for `{question}` against materialized rows so the exact route "
            "returns a grounded answer instead of an anchor-mismatch limit."
        ),
    }
    generic_template = (
        "Improve `{question}` with the smallest patch that removes the current acceptance failure "
        "without architecture drift."
    )
    return goal_templates.get(problem_type, generic_template).format(question=question)
def build_step_repair_target(
    *,
    scenario_id: str,
    scenario_title: str,
    scenario_dir: Path,
    step_id: str,
    step_output: dict[str, Any],
) -> dict[str, Any] | None:
    """Build the repair-target record for one step, or ``None`` for a clean step.

    A step is clean when its acceptance status is ``validated`` or
    ``accepted``, its execution status is ``exact``, and it has neither
    violated invariants nor warnings.

    Args:
        scenario_id: Identifier of the owning scenario.
        scenario_title: Title of the owning scenario.
        scenario_dir: Artifact directory of the scenario.
        step_id: Identifier of the step.
        step_output: Step output dict to inspect.

    Returns:
        A dict describing severity, problem type, root-cause layers, fix goal,
        candidate files, diagnostic signals and artifact paths; ``None`` when
        the step is clean.
    """
    acceptance_status = str(step_output.get("acceptance_status") or step_output.get("status") or "").strip() or "unknown"
    execution_status = str(step_output.get("execution_status") or "").strip() or "unknown"
    violated_invariants = normalize_string_list(step_output.get("violated_invariants"))
    warnings = normalize_string_list(step_output.get("warnings"))
    if acceptance_status in {"validated", "accepted"} and execution_status == "exact" and not violated_invariants and not warnings:
        return None

    problem_type = derive_repair_problem_type(step_output)
    severity = derive_repair_target_severity(step_output)
    root_cause_layers = derive_repair_root_cause_layers(step_output, problem_type)
    step_state_path = scenario_dir / "steps" / step_id / "step_state.json"

    # Compact "key=value" diagnostics shown alongside the target.
    signals: list[str] = []
    for field_name in ("reply_type", "fallback_type", "mcp_call_status", "selected_recipe", "capability_id"):
        value = str(step_output.get(field_name) or "").strip()
        if value:
            signals.append(f"{field_name}={value}")
    signals.extend(f"violation={violation}" for violation in violated_invariants)
    # Only the first three warnings are surfaced to keep the record small.
    signals.extend(f"warning={warning}" for warning in warnings[:3])

    return {
        "target_id": f"{scenario_id}:{step_id}",
        "scenario_id": scenario_id,
        "scenario_title": scenario_title,
        "step_id": step_id,
        "step_title": str(step_output.get("title") or "").strip() or None,
        "question_resolved": str(step_output.get("question_resolved") or "").strip() or None,
        "severity": severity,
        "problem_type": problem_type,
        "root_cause_layers": root_cause_layers,
        "execution_status": execution_status,
        "acceptance_status": acceptance_status,
        "violated_invariants": violated_invariants,
        "fix_goal": build_repair_fix_goal(step_output, problem_type),
        # Copy the hint list so that mutating a target can never alter the
        # shared module-level REPAIR_TARGET_FILE_HINTS table.
        "candidate_files": list(REPAIR_TARGET_FILE_HINTS.get(problem_type, REPAIR_TARGET_FILE_HINTS["other"])),
        "signals": signals,
        "artifact_refs": {
            "scenario_dir": str(scenario_dir),
            "step_state_json": str(step_state_path),
        },
    }
def build_deterministic_repair_targets(
    pack_state: dict[str, Any],
    scenario_artifacts: list[dict[str, Any]],
) -> dict[str, Any]:
    """Assemble the repair-targets payload for a pack.

    Walks every dict-shaped step output of every scenario artifact, keeps the
    steps that yield a repair target, and sorts them by severity rank, then
    scenario id, then step id. The payload also carries per-severity counts
    and identifying fields copied from ``pack_state``.
    """
    targets: list[dict[str, Any]] = []
    for artifact in scenario_artifacts:
        scenario_id = str(artifact.get("scenario_id") or "").strip()
        scenario_title = str(artifact.get("title") or "").strip()
        scenario_dir = Path(str(artifact.get("artifact_dir") or ""))
        scenario_state = artifact.get("scenario_state")
        step_outputs = scenario_state.get("step_outputs") if isinstance(scenario_state, dict) else None
        if not isinstance(step_outputs, dict):
            continue
        for step_id, step_output in step_outputs.items():
            if not isinstance(step_output, dict):
                continue
            candidate = build_step_repair_target(
                scenario_id=scenario_id,
                scenario_title=scenario_title,
                scenario_dir=scenario_dir,
                step_id=str(step_id),
                step_output=step_output,
            )
            if candidate:
                targets.append(candidate)

    def sort_key(entry: dict[str, Any]) -> tuple[int, str, str]:
        # Unknown severity labels sort after every known one.
        rank = REPAIR_TARGET_SEVERITY_ORDER.get(str(entry.get("severity") or "P2"), 99)
        return rank, str(entry.get("scenario_id") or ""), str(entry.get("step_id") or "")

    targets.sort(key=sort_key)

    severity_counts = dict.fromkeys(("P0", "P1", "P2"), 0)
    for entry in targets:
        label = str(entry.get("severity") or "P2")
        if label in severity_counts:
            severity_counts[label] += 1

    return {
        "schema_version": "domain_pack_repair_targets_v1",
        "pack_id": pack_state.get("pack_id"),
        "domain": pack_state.get("domain"),
        "final_status": pack_state.get("final_status"),
        "target_count": len(targets),
        "severity_counts": severity_counts,
        "targets": targets,
    }
def build_repair_targets_summary(repair_targets: dict[str, Any]) -> str:
    """Render the repair-targets payload as a Markdown summary.

    Emits a header with pack id, domain, target count and severity counts,
    followed by one bullet per dict-shaped target. The text always ends with
    exactly one newline.
    """

    def joined(values: Any) -> str:
        # Comma-separated list, or "none" when empty or missing.
        return ", ".join(values or []) or "none"

    lines = [
        "# Repair targets",
        "",
        f"- pack_id: `{repair_targets.get('pack_id') or 'n/a'}`",
        f"- domain: `{repair_targets.get('domain') or 'n/a'}`",
        f"- target_count: `{repair_targets.get('target_count') or 0}`",
        f"- severity_counts: `{dump_json(repair_targets.get('severity_counts') or {})}`",
        "",
        "## Targets",
    ]
    for entry in repair_targets.get("targets") or []:
        if not isinstance(entry, dict):
            continue
        lines.append(f"- `{entry.get('target_id')}`")
        lines.append(f" severity: `{entry.get('severity')}`")
        lines.append(f" problem_type: `{entry.get('problem_type')}`")
        lines.append(f" root_cause_layers: {joined(entry.get('root_cause_layers'))}")
        lines.append(f" fix_goal: {entry.get('fix_goal') or 'n/a'}")
        lines.append(f" candidate_files: {joined(entry.get('candidate_files'))}")
    return "\n".join(lines).strip() + "\n"
def evaluate_deterministic_loop_gate(
    pack_state: dict[str, Any],
    repair_targets: dict[str, Any],
) -> tuple[bool, str]:
    """Decide whether the deterministic evidence allows an ``accepted`` verdict.

    The gate passes only when the pack's ``final_status`` is ``accepted`` and
    no ``P0`` or ``P1`` repair targets remain.

    Args:
        pack_state: Pack state dict; a missing or empty ``final_status`` is
            treated as ``partial``.
        repair_targets: Repair-targets payload. ``severity_counts`` is used
            when it is a dict; otherwise P0/P1 counts are recomputed from the
            ``targets`` list, so a payload without counters cannot hide
            blocking targets.

    Returns:
        ``(gate_ok, reason)`` where ``reason`` is a short machine-readable
        explanation.
    """
    pack_final_status = str(pack_state.get("final_status") or "").strip() or "partial"
    if pack_final_status != "accepted":
        return False, f"pack_final_status={pack_final_status}"

    payload = repair_targets if isinstance(repair_targets, dict) else {}
    severity_counts = payload.get("severity_counts")
    if not isinstance(severity_counts, dict):
        # No usable counters: recount blocking severities from the raw targets
        # instead of silently passing the gate.
        severity_counts = {"P0": 0, "P1": 0}
        raw_targets = payload.get("targets")
        for target in raw_targets if isinstance(raw_targets, list) else []:
            if not isinstance(target, dict):
                continue
            label = str(target.get("severity") or "")
            if label in severity_counts:
                severity_counts[label] += 1

    p0_count = int(severity_counts.get("P0") or 0)
    p1_count = int(severity_counts.get("P1") or 0)
    if p0_count > 0 or p1_count > 0:
        return False, f"repair_targets_remaining=P0:{p0_count},P1:{p1_count}"
    return True, "deterministic_gate_passed"
def build_pack_review_bundle(pack_dir: Path) -> str:
pack_state = read_json_file(pack_dir / "pack_state.json") if (pack_dir / "pack_state.json").exists() else {}
scenario_artifacts = collect_pack_scenario_artifacts(pack_dir)
scenarios_bundle: list[dict[str, Any]] = []
for scenario_artifact in scenario_artifacts:
scenario_state = scenario_artifact.get("scenario_state") if isinstance(scenario_artifact.get("scenario_state"), dict) else {}
step_outputs_raw = scenario_state.get("step_outputs") if isinstance(scenario_state, dict) else {}
compact_steps: dict[str, Any] = {}
if isinstance(step_outputs_raw, dict):
for step_id, step_output in step_outputs_raw.items():
compact_steps[str(step_id)] = compact_step_output_for_review(step_output)
scenarios_bundle.append(
{
"scenario_id": scenario_state.get("scenario_id") or scenario_dir.name,
"title": scenario_state.get("title"),
"session_id": scenario_state.get("session_id"),
"summary": read_text_file(scenario_dir / "scenario_summary.md") if (scenario_dir / "scenario_summary.md").exists() else "",
"scenario_id": scenario_artifact.get("scenario_id"),
"title": scenario_artifact.get("title"),
"session_id": scenario_artifact.get("session_id"),
"artifact_dir": scenario_artifact.get("artifact_dir"),
"summary": scenario_artifact.get("summary") or "",
"step_outputs": compact_steps,
}
)
repair_targets = (
read_json_file(pack_dir / "repair_targets.json")
if (pack_dir / "repair_targets.json").exists()
else build_deterministic_repair_targets(pack_state, scenario_artifacts)
)
bundle = {
"pack_state": {
"pack_id": pack_state.get("pack_id"),
@ -2562,6 +2900,7 @@ def build_pack_review_bundle(pack_dir: Path) -> str:
if (pack_dir / "scenario_acceptance_matrix.md").exists()
else ""
),
"deterministic_repair_targets": repair_targets,
"scenarios": scenarios_bundle,
}
return dump_json(bundle)
@ -2586,10 +2925,12 @@ def build_analyst_loop_prompt(
loop_dir: Path,
iteration_dir: Path,
pack_dir: Path,
repair_targets_path: Path,
previous_pack_dir: Path | None,
previous_verdict_path: Path | None,
target_score: int,
review_bundle_json: str,
repair_targets_json: str,
previous_verdict_json: str | None,
) -> str:
comparison_block = ""
@ -2633,6 +2974,7 @@ def build_analyst_loop_prompt(
- `{pack_dir / 'pack_summary.md'}`
- `{pack_dir / 'pack_state.json'}`
- `{pack_dir / 'scenario_acceptance_matrix.md'}`
- `{repair_targets_path}`
- all `scenario_summary.md`, `scenario_state.json`, and problematic `steps/*/step_state.json` files inside `{pack_dir / 'scenarios'}`
Goal:
@ -2645,6 +2987,7 @@ def build_analyst_loop_prompt(
Rules:
- `accepted` is allowed only if quality_score >= {target_score}, unresolved_p0_count = 0, and regression_detected = false;
- `accepted` is forbidden if the evidence bundle shows `pack_state.final_status != accepted` or the deterministic repair targets still contain any `P0` or `P1` items;
- `accepted` also requires `direct_answer_ok = true`, `business_usefulness_ok = true`, `temporal_honesty_ok = true`, and `field_truth_ok = true`;
- `partial` means the pack is usable but exactness, routing, or coverage is still insufficient;
- `needs_exact_capability` means the primary blocker is a missing exact route or capability, but the loop should still continue autonomously unless a user decision is required;
@ -2673,6 +3016,11 @@ def build_analyst_loop_prompt(
```json
{review_bundle_json}
```
Deterministic repair targets:
```json
{repair_targets_json}
```
{previous_verdict_block}
Return JSON only and follow the schema exactly.
@ -2685,6 +3033,8 @@ def build_coder_loop_prompt(
loop_dir: Path,
iteration_dir: Path,
pack_dir: Path,
repair_targets_path: Path,
repair_targets_json: str,
analyst_verdict_path: Path,
analyst_verdict_json: str,
) -> str:
@ -2700,6 +3050,7 @@ def build_coder_loop_prompt(
- loop_dir: `{loop_dir}`
- iteration_dir: `{iteration_dir}`
- current_pack_dir: `{pack_dir}`
- deterministic_repair_targets: `{repair_targets_path}`
- analyst_verdict_json: `{analyst_verdict_path}`
Make the smallest domain-only patch in the working tree that improves the failing or partial scenarios named in the analyst verdict.
@ -2711,6 +3062,8 @@ def build_coder_loop_prompt(
- do not touch unrelated files;
- preserve already successful baseline flows.
- use `root_cause_layers`, `broken_edge_ids`, `violated_invariants`, and business-utility scores from the analyst verdict to choose the smallest fix;
- use the deterministic repair targets to choose the narrowest failing edge before touching broader scenarios;
- if the analyst verdict is optimistic but deterministic repair targets still contain `P0` or `P1`, trust the deterministic repair targets and keep fixing the pack;
- prioritize state continuity, selected-object persistence, stable `focus_object`, stable `answer_object`, reusable `provenance_bundle` / `sale_trace_bundle`, action-first answer behavior, compact micro-action answers, answer layering, temporal honesty, and field-truth mapping when those are the blocking layers;
- do not broaden scope when the analyst says the defect is mainly `object_memory_gap`, `followup_action_resolution_gap`, `bundle_reuse_gap`, `field_mapping_gap`, `temporal_honesty_gap`, `answer_shape_mismatch`, or `business_utility_gap`;
- when the verdict points to pronoun follow-ups or item-centric drilldowns, prefer a narrow object-state or follow-up-action fix over prompt inflation.
@ -2724,6 +3077,11 @@ def build_coder_loop_prompt(
{analyst_verdict_json}
```
Deterministic repair targets JSON:
```json
{repair_targets_json}
```
- then return JSON only and follow the schema exactly.
"""
).strip()
@ -2821,8 +3179,12 @@ def handle_run_pack(args: argparse.Namespace) -> int:
"final_status": final_status,
"updated_at": datetime.now(timezone.utc).replace(microsecond=0).isoformat(),
}
scenario_artifacts = collect_pack_scenario_artifacts(pack_dir)
repair_targets = build_deterministic_repair_targets(pack_state, scenario_artifacts)
write_text(pack_dir / "scenario_acceptance_matrix.md", build_scenario_acceptance_matrix(pack, scenario_results))
write_json(pack_dir / "pack_state.json", pack_state)
write_json(pack_dir / "repair_targets.json", repair_targets)
write_text(pack_dir / "repair_targets.md", build_repair_targets_summary(repair_targets))
write_text(pack_dir / "pack_summary.md", build_pack_summary(pack, scenario_results, final_status, execution_status))
write_text(pack_dir / "final_status.md", build_pack_final_status(pack, scenario_results, final_status, execution_status))
print(f"[domain-case-loop] saved pack artifacts to {pack_dir}")
@ -2849,11 +3211,17 @@ def build_loop_summary(loop_state: dict[str, Any]) -> str:
f" baseline_pack_dir: `{item['pack_dir']}`",
f" analyst_score: `{item.get('quality_score')}`",
f" analyst_decision: `{item.get('loop_decision')}`",
f" analyst_accepted_gate: `{item.get('analyst_accepted_gate')}`",
f" accepted_gate: `{item.get('accepted_gate')}`",
f" deterministic_gate_ok: `{item.get('deterministic_gate_ok')}`",
f" deterministic_gate_reason: `{item.get('deterministic_gate_reason') or 'n/a'}`",
f" requires_user_decision: `{item.get('requires_user_decision')}`",
f" user_decision_type: `{item.get('user_decision_type') or 'none'}`",
f" coder_status: `{item.get('coder_status') or 'n/a'}`",
f" analyst_verdict: `{item.get('analyst_verdict_path') or 'n/a'}`",
f" repair_targets: `{item.get('repair_targets_path') or 'n/a'}`",
f" repair_target_count: `{item.get('repair_target_count')}`",
f" repair_target_severity_counts: `{dump_json(item.get('repair_target_severity_counts') or {})}`",
]
)
return "\n".join(lines).strip() + "\n"
@ -2930,15 +3298,21 @@ def handle_run_pack_loop(args: argparse.Namespace) -> int:
analyst_verdict_path = iteration_dir / "analyst_verdict.json"
review_bundle_json = build_pack_review_bundle(pack_dir)
repair_targets_path = pack_dir / "repair_targets.json"
repair_targets = read_json_file(repair_targets_path) if repair_targets_path.exists() else {}
repair_targets_json = dump_json(repair_targets)
pack_state = read_json_file(pack_dir / "pack_state.json") if (pack_dir / "pack_state.json").exists() else {}
previous_verdict_json = read_text_file(previous_verdict_path) if previous_verdict_path is not None and previous_verdict_path.exists() else None
analyst_prompt = build_analyst_loop_prompt(
loop_dir=loop_dir,
iteration_dir=iteration_dir,
pack_dir=pack_dir,
repair_targets_path=repair_targets_path,
previous_pack_dir=previous_pack_dir,
previous_verdict_path=previous_verdict_path,
target_score=target_score,
review_bundle_json=review_bundle_json,
repair_targets_json=repair_targets_json,
previous_verdict_json=previous_verdict_json,
)
write_text(iteration_dir / "analyst_prompt.md", analyst_prompt + "\n")
@ -2959,9 +3333,17 @@ def handle_run_pack_loop(args: argparse.Namespace) -> int:
stderr_path=iteration_dir / "analyst_exec.stderr.log",
)
analyst_verdict = read_json_output(analyst_verdict_path)
accepted_gate, loop_decision, requires_user_decision, user_decision_type, user_decision_prompt = evaluate_analyst_gate(
analyst_accepted_gate, loop_decision, requires_user_decision, user_decision_type, user_decision_prompt = evaluate_analyst_gate(
analyst_verdict, target_score
)
deterministic_gate_ok, deterministic_gate_reason = evaluate_deterministic_loop_gate(pack_state, repair_targets)
accepted_gate = analyst_accepted_gate and deterministic_gate_ok
repair_target_count = int(repair_targets.get("target_count") or 0) if isinstance(repair_targets, dict) else 0
repair_target_severity_counts = (
repair_targets.get("severity_counts")
if isinstance(repair_targets, dict) and isinstance(repair_targets.get("severity_counts"), dict)
else {}
)
loop_state["last_analyst_decision"] = loop_decision
loop_state["last_user_decision_type"] = user_decision_type
loop_state["last_user_decision_prompt"] = user_decision_prompt
@ -2971,18 +3353,24 @@ def handle_run_pack_loop(args: argparse.Namespace) -> int:
"pack_dir": str(pack_dir),
"quality_score": int(analyst_verdict.get("quality_score") or 0),
"loop_decision": loop_decision,
"analyst_accepted_gate": analyst_accepted_gate,
"accepted_gate": accepted_gate,
"deterministic_gate_ok": deterministic_gate_ok,
"deterministic_gate_reason": deterministic_gate_reason,
"requires_user_decision": requires_user_decision,
"user_decision_type": user_decision_type,
"user_decision_prompt": user_decision_prompt,
"analyst_verdict_path": str(analyst_verdict_path),
"repair_targets_path": str(repair_targets_path),
"repair_target_count": repair_target_count,
"repair_target_severity_counts": repair_target_severity_counts,
"coder_status": None,
}
if accepted_gate:
loop_state["iterations"].append(iteration_record)
loop_state["final_status"] = "accepted"
loop_state["stop_reason"] = f"analyst accepted at {iteration_id}"
loop_state["stop_reason"] = f"analyst accepted + deterministic gate passed at {iteration_id}"
loop_state["updated_at"] = datetime.now(timezone.utc).replace(microsecond=0).isoformat()
write_json(loop_dir / "loop_state.json", loop_state)
break
@ -3012,6 +3400,8 @@ def handle_run_pack_loop(args: argparse.Namespace) -> int:
loop_dir=loop_dir,
iteration_dir=iteration_dir,
pack_dir=pack_dir,
repair_targets_path=repair_targets_path,
repair_targets_json=repair_targets_json,
analyst_verdict_path=analyst_verdict_path,
analyst_verdict_json=dump_json(analyst_verdict),
)

View File

@ -7,10 +7,12 @@ from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
from scripts.domain_case_loop import (
build_deterministic_repair_targets,
build_scenario_acceptance_matrix,
carry_forward_analysis_context,
derive_pack_final_status,
evaluate_analyst_gate,
evaluate_deterministic_loop_gate,
load_scenario_pack,
merge_scenario_date_scope,
validate_step_contract,
@ -499,3 +501,112 @@ def test_validate_step_contract_rejects_top_level_noise_as_direct_answer() -> No
assert validated["acceptance_status"] == "rejected"
assert "direct_answer_missing" in validated["violated_invariants"]
assert "top_level_noise_present" in validated["violated_invariants"]
def test_build_deterministic_repair_targets_marks_followup_router_gap_as_p0() -> None:
    """A hard-failed follow-up step with routing violations becomes one P0 follow-up target."""
    supplier_step = {
        "step_id": "step_02_supplier",
        "question_resolved": 'По выбранному объекту "Столешница": кто поставил',
        "execution_status": "exact",
        "acceptance_status": "rejected",
        "reply_type": "factual",
        "selected_recipe": "address_inventory_on_hand_as_of_date_v1",
        "capability_id": "confirmed_inventory_on_hand_as_of_date",
        "violated_invariants": [
            "wrong_followup_action",
            "focus_object_missing",
            "forbidden_capability_selected",
        ],
        "warnings": [],
        "hard_fail": True,
    }
    scenario = {
        "scenario_id": "inventory_selected_item_provenance",
        "title": "Selected item provenance",
        "artifact_dir": "artifacts/domain_runs/demo/scenarios/inventory_selected_item_provenance",
        "scenario_state": {"step_outputs": {"step_02_supplier": supplier_step}},
    }
    pack_state = {"pack_id": "demo_pack", "domain": "inventory_stock", "final_status": "partial"}

    result = build_deterministic_repair_targets(pack_state, [scenario])

    assert result["target_count"] == 1
    first_target = result["targets"][0]
    assert first_target["severity"] == "P0"
    assert first_target["problem_type"] == "followup_action_resolution_gap"
    layers = first_target["root_cause_layers"]
    assert "followup_action_resolution_gap" in layers
    assert "object_memory_gap" in layers
    assert "addressIntentResolver.ts" in " ".join(first_target["candidate_files"])
def test_build_deterministic_repair_targets_marks_anchor_gap_as_p1() -> None:
    """A partial, rejected step with an anchor-mismatch MCP status becomes one P1 anchor target."""
    buyer_step = {
        "step_id": "step_02_selected_item_buyer_ui",
        "question_resolved": 'По выбранному объекту "Шкаф": кому был продан товар',
        "execution_status": "partial",
        "acceptance_status": "rejected",
        "reply_type": "partial_coverage",
        "fallback_type": "partial",
        "mcp_call_status": "materialized_but_not_anchor_matched",
        "selected_recipe": "address_inventory_sale_trace_for_item_v1",
        "capability_id": "inventory_inventory_sale_trace_for_item",
        "violated_invariants": [],
        "warnings": [],
        "hard_fail": False,
    }
    scenario = {
        "scenario_id": "inventory_sale_trace",
        "title": "Sale trace",
        "artifact_dir": "artifacts/domain_runs/demo/scenarios/inventory_sale_trace",
        "scenario_state": {"step_outputs": {"step_02_selected_item_buyer_ui": buyer_step}},
    }
    pack_state = {"pack_id": "demo_pack", "domain": "inventory_stock", "final_status": "partial"}

    result = build_deterministic_repair_targets(pack_state, [scenario])

    assert result["target_count"] == 1
    first_target = result["targets"][0]
    assert first_target["severity"] == "P1"
    assert first_target["problem_type"] == "domain_anchor_gap"
    assert first_target["root_cause_layers"] == ["domain_anchor_gap"]
    assert "addressQueryService.ts" in " ".join(first_target["candidate_files"])
def test_evaluate_deterministic_loop_gate_rejects_partial_pack_even_without_targets() -> None:
    """A pack whose final status is not accepted fails the gate even with zero P0/P1 counts."""
    pack_state = {"final_status": "partial"}
    clean_counts = {"severity_counts": {"P0": 0, "P1": 0}}

    verdict = evaluate_deterministic_loop_gate(pack_state, clean_counts)
    passed, why = verdict

    assert passed is False
    assert why == "pack_final_status=partial"
def test_evaluate_deterministic_loop_gate_rejects_remaining_p1_targets() -> None:
    """An accepted pack still fails the gate while P1 repair targets remain."""
    pack_state = {"final_status": "accepted"}
    remaining = {"severity_counts": {"P0": 0, "P1": 2}}

    verdict = evaluate_deterministic_loop_gate(pack_state, remaining)
    passed, why = verdict

    assert passed is False
    assert why == "repair_targets_remaining=P0:0,P1:2"
def test_evaluate_deterministic_loop_gate_accepts_clean_pack_without_remaining_p0_p1() -> None:
    """An accepted pack with zero P0/P1 counts passes; unrelated count keys are ignored."""
    pack_state = {"final_status": "accepted"}
    counts_with_extra_key = {"severity_counts": {"P0": 0, "P1": 0, "warning": 1}}

    verdict = evaluate_deterministic_loop_gate(pack_state, counts_with_extra_key)
    passed, why = verdict

    assert passed is True
    assert why == "deterministic_gate_passed"