From 5934f5f3fc5020f41c021e20c952bf7054376279 Mon Sep 17 00:00:00 2001 From: dctouch Date: Tue, 14 Apr 2026 23:27:48 +0300 Subject: [PATCH] =?UTF-8?q?=D0=9E=D0=A0=D0=A0=D0=9A=D0=95=D0=A1=D0=A2?= =?UTF-8?q?=D0=A0=D0=90=D0=A6=D0=98=D0=AF=20-=20=D0=9E=D1=80=D0=BA=D0=B5?= =?UTF-8?q?=D1=81=D1=82=D1=80=D0=B0=D1=86=D0=B8=D1=8F=20=D0=B4=D0=BE=D0=BC?= =?UTF-8?q?=D0=B5=D0=BD=D0=B0:=20=D0=B7=D0=B0=D0=BF=D1=80=D0=B5=D1=82?= =?UTF-8?q?=D0=B8=D1=82=D1=8C=20=D0=BB=D0=BE=D0=B6=D0=BD=D1=8B=D0=B9=20acc?= =?UTF-8?q?epted=20=D0=BF=D1=80=D0=B8=20=D0=B6=D0=B8=D0=B2=D1=8B=D1=85=20r?= =?UTF-8?q?epair=20targets?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scripts/domain_case_loop.py | 430 +++++++++++++++++++++++++++++++-- tests/test_domain_case_loop.py | 111 +++++++++ 2 files changed, 521 insertions(+), 20 deletions(-) diff --git a/scripts/domain_case_loop.py b/scripts/domain_case_loop.py index b831a1c..4ad75da 100644 --- a/scripts/domain_case_loop.py +++ b/scripts/domain_case_loop.py @@ -60,6 +60,60 @@ DEFAULT_INVARIANT_SEVERITY: dict[str, str] = { "top_level_noise_present": "P0", } +REPAIR_TARGET_SEVERITY_ORDER = {"P0": 0, "P1": 1, "P2": 2} + +REPAIR_TARGET_FILE_HINTS: dict[str, list[str]] = { + "followup_action_resolution_gap": [ + "llm_normalizer/backend/src/services/addressIntentResolver.ts", + "llm_normalizer/backend/src/services/address_runtime/decomposeStage.ts", + "llm_normalizer/backend/src/services/assistantService.ts", + ], + "object_memory_gap": [ + "llm_normalizer/backend/src/services/addressNavigationState.ts", + "llm_normalizer/backend/src/services/address_runtime/decomposeStage.ts", + "llm_normalizer/backend/src/services/assistantService.ts", + ], + "edge_carryover_gap": [ + "llm_normalizer/backend/src/services/addressNavigationState.ts", + "llm_normalizer/backend/src/services/address_runtime/decomposeStage.ts", + "llm_normalizer/backend/src/services/addressFilterExtractor.ts", + ], + "temporal_honesty_gap": [ + "llm_normalizer/backend/src/services/addressFilterExtractor.ts", + "llm_normalizer/backend/src/services/address_runtime/decomposeStage.ts", + "llm_normalizer/backend/src/services/address_runtime/composeStage.ts", + ], + "route_gap": [ + "llm_normalizer/backend/src/services/addressQueryClassifier.ts", + "llm_normalizer/backend/src/services/addressIntentResolver.ts", + "llm_normalizer/backend/src/services/address_runtime/decomposeStage.ts", + "llm_normalizer/backend/src/services/assistantService.ts", + ], + "capability_gap": [ + "llm_normalizer/backend/src/services/addressCapabilityPolicy.ts", + "llm_normalizer/backend/src/services/addressRecipeCatalog.ts", + "llm_normalizer/backend/src/services/addressQueryService.ts", + ], + "presentation_gap": [ + "llm_normalizer/backend/src/services/address_runtime/composeStage.ts", + "llm_normalizer/backend/src/services/assistantService.ts", + ], + "evidence_gap": [ + "llm_normalizer/backend/src/services/addressQueryService.ts", + "llm_normalizer/backend/src/services/addressRecipeCatalog.ts", + "llm_normalizer/backend/src/services/address_runtime/composeStage.ts", + ], + "domain_anchor_gap": [ + "llm_normalizer/backend/src/services/addressQueryService.ts", + "llm_normalizer/backend/src/services/address_runtime/decomposeStage.ts", + "llm_normalizer/backend/src/services/addressNavigationState.ts", + ], + "other": [ + "llm_normalizer/backend/src/services/addressQueryService.ts", + "llm_normalizer/backend/src/services/assistantService.ts", + ], +} + def dump_json(payload: Any) -> str: return json.dumps(payload, ensure_ascii=False, indent=2) @@ -2525,27 +2579,311 @@ def compact_step_output_for_review(step_output: Any) -> dict[str, Any]: } +def collect_pack_scenario_artifacts(pack_dir: Path) -> list[dict[str, Any]]: + scenarios_root = pack_dir / "scenarios" + artifacts: list[dict[str, Any]] = [] + if not scenarios_root.exists(): + return artifacts + for scenario_dir in sorted(path for path in scenarios_root.iterdir() if path.is_dir()): + scenario_state = read_json_file(scenario_dir / "scenario_state.json") if (scenario_dir / "scenario_state.json").exists() else {} + artifacts.append( + { + "scenario_id": scenario_state.get("scenario_id") or scenario_dir.name, + "title": scenario_state.get("title"), + "session_id": scenario_state.get("session_id"), + "artifact_dir": str(scenario_dir), + "summary": read_text_file(scenario_dir / "scenario_summary.md") if (scenario_dir / "scenario_summary.md").exists() else "", + "scenario_state": scenario_state, + } + ) + return artifacts + + +def derive_repair_target_severity(step_output: dict[str, Any]) -> str: + if bool(step_output.get("hard_fail")): + return "P0" + violated_invariants = normalize_string_list(step_output.get("violated_invariants")) + if any(derive_invariant_severity(step_output, code) == "P0" for code in violated_invariants): + return "P0" + execution_status = str(step_output.get("execution_status") or "").strip() + acceptance_status = str(step_output.get("acceptance_status") or step_output.get("status") or "").strip() + reply_type = str(step_output.get("reply_type") or "").strip() + if execution_status == "blocked": + return "P0" + if acceptance_status in {"rejected", "needs_exact_capability"}: + return "P1" + if execution_status in {"partial", "needs_exact_capability"} or reply_type == "partial_coverage": + return "P1" + if normalize_string_list(step_output.get("warnings")): + return "P2" + return "P2" + + +def derive_repair_problem_type(step_output: dict[str, Any]) -> str: + violated = set(normalize_string_list(step_output.get("violated_invariants"))) + execution_status = str(step_output.get("execution_status") or "").strip() + acceptance_status = str(step_output.get("acceptance_status") or step_output.get("status") or "").strip() + reply_type = str(step_output.get("reply_type") or "").strip() + fallback_type = str(step_output.get("fallback_type") or "").strip() + mcp_call_status = str(step_output.get("mcp_call_status") or "").strip() + + if "wrong_followup_action" in violated: + return "followup_action_resolution_gap" + if "focus_object_missing" in violated: + return "object_memory_gap" + if "wrong_date_scope_state" in violated: + return "edge_carryover_gap" + if {"wrong_as_of_date", "wrong_period_from", "wrong_period_to"} & violated: + return "temporal_honesty_gap" + if { + "wrong_intent", + "wrong_capability", + "wrong_recipe", + "wrong_result_mode", + "forbidden_capability_selected", + "forbidden_recipe_selected", + } & violated: + return "route_gap" + if {"direct_answer_missing", "top_level_noise_present"} & violated: + return "presentation_gap" + if mcp_call_status == "materialized_but_not_anchor_matched": + return "domain_anchor_gap" + if acceptance_status == "needs_exact_capability" or execution_status == "needs_exact_capability": + return "capability_gap" + if reply_type in {"partial_coverage", "clarification_required", "route_mismatch_blocked"} or fallback_type == "partial": + return "evidence_gap" + return "other" + + +def derive_repair_root_cause_layers(step_output: dict[str, Any], problem_type: str) -> list[str]: + violated = set(normalize_string_list(step_output.get("violated_invariants"))) + layers: list[str] = [] + if problem_type == "followup_action_resolution_gap": + layers.append("followup_action_resolution_gap") + if "focus_object_missing" in violated: + layers.append("object_memory_gap") + elif problem_type == "object_memory_gap": + layers.append("object_memory_gap") + elif problem_type == "edge_carryover_gap": + layers.append("edge_carryover_gap") + if "wrong_as_of_date" in violated or "wrong_period_from" in violated or "wrong_period_to" in violated: + layers.append("temporal_honesty_gap") + elif problem_type == "temporal_honesty_gap": + layers.append("temporal_honesty_gap") + if "wrong_date_scope_state" in violated: + layers.append("edge_carryover_gap") + elif problem_type == "route_gap": + layers.append("semantic_understanding_gap") + elif problem_type == "capability_gap": + layers.append("runtime_capability_gap") + elif problem_type == "presentation_gap": + layers.append("business_utility_gap") + if str(step_output.get("required_answer_shape") or "").strip(): + layers.append("answer_shape_mismatch") + elif problem_type == "evidence_gap": + layers.append("runtime_capability_gap") + elif problem_type == "domain_anchor_gap": + layers.append("domain_anchor_gap") + else: + layers.append("other") + return list(dict.fromkeys(layers)) + + +def build_repair_fix_goal(step_output: dict[str, Any], problem_type: str) -> str: + question = str(step_output.get("question_resolved") or step_output.get("title") or step_output.get("step_id") or "").strip() + if problem_type == "followup_action_resolution_gap": + return f"Resolve `{question}` on the current business object and keep the requested micro-action instead of drifting to another drilldown." + if problem_type == "object_memory_gap": + return f"Preserve the selected business object for `{question}` so the follow-up resolves without re-anchoring from scratch." + if problem_type == "edge_carryover_gap": + return f"Carry forward the selected-object state and historical date scope into `{question}` without resetting the follow-up context." + if problem_type == "temporal_honesty_gap": + return f"Keep `{question}` on the requested historical date/period and separate exact-window evidence from nearest available out-of-window evidence." + if problem_type == "route_gap": + return f"Keep `{question}` on the expected exact route/capability instead of letting wording drift into a different semantic lane." + if problem_type == "capability_gap": + return f"Enable an exact route for `{question}` so the loop no longer falls back to partial or unsupported behavior." + if problem_type == "presentation_gap": + return f"Make `{question}` answer-first: direct business answer in the first line, proof second, service notes last." + if problem_type == "evidence_gap": + return f"Return grounded evidence for `{question}` instead of a limited empty response when the correct route already fires." + if problem_type == "domain_anchor_gap": + return f"Match the selected business anchor for `{question}` against materialized rows so the exact route returns a grounded answer instead of an anchor-mismatch limit." + return f"Improve `{question}` with the smallest patch that removes the current acceptance failure without architecture drift." + + +def build_step_repair_target( + *, + scenario_id: str, + scenario_title: str, + scenario_dir: Path, + step_id: str, + step_output: dict[str, Any], +) -> dict[str, Any] | None: + acceptance_status = str(step_output.get("acceptance_status") or step_output.get("status") or "").strip() or "unknown" + execution_status = str(step_output.get("execution_status") or "").strip() or "unknown" + violated_invariants = normalize_string_list(step_output.get("violated_invariants")) + warnings = normalize_string_list(step_output.get("warnings")) + if acceptance_status in {"validated", "accepted"} and execution_status == "exact" and not violated_invariants and not warnings: + return None + + problem_type = derive_repair_problem_type(step_output) + severity = derive_repair_target_severity(step_output) + root_cause_layers = derive_repair_root_cause_layers(step_output, problem_type) + step_state_path = scenario_dir / "steps" / step_id / "step_state.json" + signals: list[str] = [] + for field_name in ("reply_type", "fallback_type", "mcp_call_status", "selected_recipe", "capability_id"): + value = str(step_output.get(field_name) or "").strip() + if value: + signals.append(f"{field_name}={value}") + for violation in violated_invariants: + signals.append(f"violation={violation}") + for warning in warnings[:3]: + signals.append(f"warning={warning}") + + return { + "target_id": f"{scenario_id}:{step_id}", + "scenario_id": scenario_id, + "scenario_title": scenario_title, + "step_id": step_id, + "step_title": str(step_output.get("title") or "").strip() or None, + "question_resolved": str(step_output.get("question_resolved") or "").strip() or None, + "severity": severity, + "problem_type": problem_type, + "root_cause_layers": root_cause_layers, + "execution_status": execution_status, + "acceptance_status": acceptance_status, + "violated_invariants": violated_invariants, + "fix_goal": build_repair_fix_goal(step_output, problem_type), + "candidate_files": REPAIR_TARGET_FILE_HINTS.get(problem_type, REPAIR_TARGET_FILE_HINTS["other"]), + "signals": signals, + "artifact_refs": { + "scenario_dir": str(scenario_dir), + "step_state_json": str(step_state_path), + }, + } + + +def build_deterministic_repair_targets( + pack_state: dict[str, Any], + scenario_artifacts: list[dict[str, Any]], +) -> dict[str, Any]: + targets: list[dict[str, Any]] = [] + for scenario_artifact in scenario_artifacts: + scenario_id = str(scenario_artifact.get("scenario_id") or "").strip() + scenario_title = str(scenario_artifact.get("title") or "").strip() + scenario_dir = Path(str(scenario_artifact.get("artifact_dir") or "")) + scenario_state = scenario_artifact.get("scenario_state") + if not isinstance(scenario_state, dict): + continue + step_outputs = scenario_state.get("step_outputs") + if not isinstance(step_outputs, dict): + continue + for step_id, raw_step_output in step_outputs.items(): + if not isinstance(raw_step_output, dict): + continue + target = build_step_repair_target( + scenario_id=scenario_id, + scenario_title=scenario_title, + scenario_dir=scenario_dir, + step_id=str(step_id), + step_output=raw_step_output, + ) + if target: + targets.append(target) + + targets.sort( + key=lambda item: ( + REPAIR_TARGET_SEVERITY_ORDER.get(str(item.get("severity") or "P2"), 99), + str(item.get("scenario_id") or ""), + str(item.get("step_id") or ""), + ) + ) + severity_counts = {"P0": 0, "P1": 0, "P2": 0} + for target in targets: + severity = str(target.get("severity") or "P2") + if severity in severity_counts: + severity_counts[severity] += 1 + return { + "schema_version": "domain_pack_repair_targets_v1", + "pack_id": pack_state.get("pack_id"), + "domain": pack_state.get("domain"), + "final_status": pack_state.get("final_status"), + "target_count": len(targets), + "severity_counts": severity_counts, + "targets": targets, + } + + +def build_repair_targets_summary(repair_targets: dict[str, Any]) -> str: + lines = [ + "# Repair targets", + "", + f"- pack_id: `{repair_targets.get('pack_id') or 'n/a'}`", + f"- domain: `{repair_targets.get('domain') or 'n/a'}`", + f"- target_count: `{repair_targets.get('target_count') or 0}`", + f"- severity_counts: `{dump_json(repair_targets.get('severity_counts') or {})}`", + "", + "## Targets", + ] + for target in repair_targets.get("targets") or []: + if not isinstance(target, dict): + continue + lines.extend( + [ + f"- `{target.get('target_id')}`", + f" severity: `{target.get('severity')}`", + f" problem_type: `{target.get('problem_type')}`", + f" root_cause_layers: {', '.join(target.get('root_cause_layers') or []) or 'none'}", + f" fix_goal: {target.get('fix_goal') or 'n/a'}", + f" candidate_files: {', '.join(target.get('candidate_files') or []) or 'none'}", + ] + ) + return "\n".join(lines).strip() + "\n" + + +def evaluate_deterministic_loop_gate( + pack_state: dict[str, Any], + repair_targets: dict[str, Any], +) -> tuple[bool, str]: + pack_final_status = str(pack_state.get("final_status") or "").strip() or "partial" + if pack_final_status != "accepted": + return False, f"pack_final_status={pack_final_status}" + severity_counts = repair_targets.get("severity_counts") if isinstance(repair_targets, dict) else {} + if isinstance(severity_counts, dict): + p0_count = int(severity_counts.get("P0") or 0) + p1_count = int(severity_counts.get("P1") or 0) + if p0_count > 0 or p1_count > 0: + return False, f"repair_targets_remaining=P0:{p0_count},P1:{p1_count}" + return True, "deterministic_gate_passed" + + def build_pack_review_bundle(pack_dir: Path) -> str: pack_state = read_json_file(pack_dir / "pack_state.json") if (pack_dir / "pack_state.json").exists() else {} - scenarios_root = pack_dir / "scenarios" + scenario_artifacts = collect_pack_scenario_artifacts(pack_dir) scenarios_bundle: list[dict[str, Any]] = [] - if scenarios_root.exists(): - for scenario_dir in sorted(path for path in scenarios_root.iterdir() if path.is_dir()): - scenario_state = read_json_file(scenario_dir / "scenario_state.json") if (scenario_dir / "scenario_state.json").exists() else {} - step_outputs_raw = scenario_state.get("step_outputs") - compact_steps: dict[str, Any] = {} - if isinstance(step_outputs_raw, dict): - for step_id, step_output in step_outputs_raw.items(): - compact_steps[str(step_id)] = compact_step_output_for_review(step_output) - scenarios_bundle.append( - { - "scenario_id": scenario_state.get("scenario_id") or scenario_dir.name, - "title": scenario_state.get("title"), - "session_id": scenario_state.get("session_id"), - "summary": read_text_file(scenario_dir / "scenario_summary.md") if (scenario_dir / "scenario_summary.md").exists() else "", - "step_outputs": compact_steps, - } - ) + for scenario_artifact in scenario_artifacts: + scenario_state = scenario_artifact.get("scenario_state") if isinstance(scenario_artifact.get("scenario_state"), dict) else {} + step_outputs_raw = scenario_state.get("step_outputs") if isinstance(scenario_state, dict) else {} + compact_steps: dict[str, Any] = {} + if isinstance(step_outputs_raw, dict): + for step_id, step_output in step_outputs_raw.items(): + compact_steps[str(step_id)] = compact_step_output_for_review(step_output) + scenarios_bundle.append( + { + "scenario_id": scenario_artifact.get("scenario_id"), + "title": scenario_artifact.get("title"), + "session_id": scenario_artifact.get("session_id"), + "artifact_dir": scenario_artifact.get("artifact_dir"), + "summary": scenario_artifact.get("summary") or "", + "step_outputs": compact_steps, + } + ) + repair_targets = ( + read_json_file(pack_dir / "repair_targets.json") + if (pack_dir / "repair_targets.json").exists() + else build_deterministic_repair_targets(pack_state, scenario_artifacts) + ) bundle = { "pack_state": { "pack_id": pack_state.get("pack_id"), @@ -2562,6 +2900,7 @@ def build_pack_review_bundle(pack_dir: Path) -> str: if (pack_dir / "scenario_acceptance_matrix.md").exists() else "" ), + "deterministic_repair_targets": repair_targets, "scenarios": scenarios_bundle, } return dump_json(bundle) @@ -2586,10 +2925,12 @@ def build_analyst_loop_prompt( loop_dir: Path, iteration_dir: Path, pack_dir: Path, + repair_targets_path: Path, previous_pack_dir: Path | None, previous_verdict_path: Path | None, target_score: int, review_bundle_json: str, + repair_targets_json: str, previous_verdict_json: str | None, ) -> str: comparison_block = "" @@ -2633,6 +2974,7 @@ def build_analyst_loop_prompt( - `{pack_dir / 'pack_summary.md'}` - `{pack_dir / 'pack_state.json'}` - `{pack_dir / 'scenario_acceptance_matrix.md'}` + - `{repair_targets_path}` - all `scenario_summary.md`, `scenario_state.json`, and problematic `steps/*/step_state.json` files inside `{pack_dir / 'scenarios'}` Goal: @@ -2645,6 +2987,7 @@ def build_analyst_loop_prompt( Rules: - `accepted` is allowed only if quality_score >= {target_score}, unresolved_p0_count = 0, and regression_detected = false; + - `accepted` is forbidden if the evidence bundle shows `pack_state.final_status != accepted` or the deterministic repair targets still contain any `P0` or `P1` items; - `accepted` also requires `direct_answer_ok = true`, `business_usefulness_ok = true`, `temporal_honesty_ok = true`, and `field_truth_ok = true`; - `partial` means the pack is usable but exactness, routing, or coverage is still insufficient; - `needs_exact_capability` means the primary blocker is a missing exact route or capability, but the loop should still continue autonomously unless a user decision is required; @@ -2673,6 +3016,11 @@ def build_analyst_loop_prompt( ```json {review_bundle_json} ``` + + Deterministic repair targets: + ```json + {repair_targets_json} + ``` {previous_verdict_block} Return JSON only and follow the schema exactly. @@ -2685,6 +3033,8 @@ def build_coder_loop_prompt( loop_dir: Path, iteration_dir: Path, pack_dir: Path, + repair_targets_path: Path, + repair_targets_json: str, analyst_verdict_path: Path, analyst_verdict_json: str, ) -> str: @@ -2700,6 +3050,7 @@ def build_coder_loop_prompt( - loop_dir: `{loop_dir}` - iteration_dir: `{iteration_dir}` - current_pack_dir: `{pack_dir}` + - deterministic_repair_targets: `{repair_targets_path}` - analyst_verdict_json: `{analyst_verdict_path}` Make the smallest domain-only patch in the working tree that improves the failing or partial scenarios named in the analyst verdict. @@ -2711,6 +3062,8 @@ def build_coder_loop_prompt( - do not touch unrelated files; - preserve already successful baseline flows. - use `root_cause_layers`, `broken_edge_ids`, `violated_invariants`, and business-utility scores from the analyst verdict to choose the smallest fix; + - use the deterministic repair targets to choose the narrowest failing edge before touching broader scenarios; + - if the analyst verdict is optimistic but deterministic repair targets still contain `P0` or `P1`, trust the deterministic repair targets and keep fixing the pack; - prioritize state continuity, selected-object persistence, stable `focus_object`, stable `answer_object`, reusable `provenance_bundle` / `sale_trace_bundle`, action-first answer behavior, compact micro-action answers, answer layering, temporal honesty, and field-truth mapping when those are the blocking layers; - do not broaden scope when the analyst says the defect is mainly `object_memory_gap`, `followup_action_resolution_gap`, `bundle_reuse_gap`, `field_mapping_gap`, `temporal_honesty_gap`, `answer_shape_mismatch`, or `business_utility_gap`; - when the verdict points to pronoun follow-ups or item-centric drilldowns, prefer a narrow object-state or follow-up-action fix over prompt inflation. @@ -2724,6 +3077,11 @@ def build_coder_loop_prompt( {analyst_verdict_json} ``` + Deterministic repair targets JSON: + ```json + {repair_targets_json} + ``` + - then return JSON only and follow the schema exactly. """ ).strip() @@ -2821,8 +3179,12 @@ def handle_run_pack(args: argparse.Namespace) -> int: "final_status": final_status, "updated_at": datetime.now(timezone.utc).replace(microsecond=0).isoformat(), } + scenario_artifacts = collect_pack_scenario_artifacts(pack_dir) + repair_targets = build_deterministic_repair_targets(pack_state, scenario_artifacts) write_text(pack_dir / "scenario_acceptance_matrix.md", build_scenario_acceptance_matrix(pack, scenario_results)) write_json(pack_dir / "pack_state.json", pack_state) + write_json(pack_dir / "repair_targets.json", repair_targets) + write_text(pack_dir / "repair_targets.md", build_repair_targets_summary(repair_targets)) write_text(pack_dir / "pack_summary.md", build_pack_summary(pack, scenario_results, final_status, execution_status)) write_text(pack_dir / "final_status.md", build_pack_final_status(pack, scenario_results, final_status, execution_status)) print(f"[domain-case-loop] saved pack artifacts to {pack_dir}") @@ -2849,11 +3211,17 @@ def build_loop_summary(loop_state: dict[str, Any]) -> str: f" baseline_pack_dir: `{item['pack_dir']}`", f" analyst_score: `{item.get('quality_score')}`", f" analyst_decision: `{item.get('loop_decision')}`", + f" analyst_accepted_gate: `{item.get('analyst_accepted_gate')}`", f" accepted_gate: `{item.get('accepted_gate')}`", + f" deterministic_gate_ok: `{item.get('deterministic_gate_ok')}`", + f" deterministic_gate_reason: `{item.get('deterministic_gate_reason') or 'n/a'}`", f" requires_user_decision: `{item.get('requires_user_decision')}`", f" user_decision_type: `{item.get('user_decision_type') or 'none'}`", f" coder_status: `{item.get('coder_status') or 'n/a'}`", f" analyst_verdict: `{item.get('analyst_verdict_path') or 'n/a'}`", + f" repair_targets: `{item.get('repair_targets_path') or 'n/a'}`", + f" repair_target_count: `{item.get('repair_target_count')}`", + f" repair_target_severity_counts: `{dump_json(item.get('repair_target_severity_counts') or {})}`", ] ) return "\n".join(lines).strip() + "\n" @@ -2930,15 +3298,21 @@ def handle_run_pack_loop(args: argparse.Namespace) -> int: analyst_verdict_path = iteration_dir / "analyst_verdict.json" review_bundle_json = build_pack_review_bundle(pack_dir) + repair_targets_path = pack_dir / "repair_targets.json" + repair_targets = read_json_file(repair_targets_path) if repair_targets_path.exists() else {} + repair_targets_json = dump_json(repair_targets) + pack_state = read_json_file(pack_dir / "pack_state.json") if (pack_dir / "pack_state.json").exists() else {} previous_verdict_json = read_text_file(previous_verdict_path) if previous_verdict_path is not None and previous_verdict_path.exists() else None analyst_prompt = build_analyst_loop_prompt( loop_dir=loop_dir, iteration_dir=iteration_dir, pack_dir=pack_dir, + repair_targets_path=repair_targets_path, previous_pack_dir=previous_pack_dir, previous_verdict_path=previous_verdict_path, target_score=target_score, review_bundle_json=review_bundle_json, + repair_targets_json=repair_targets_json, previous_verdict_json=previous_verdict_json, ) write_text(iteration_dir / "analyst_prompt.md", analyst_prompt + "\n") @@ -2959,9 +3333,17 @@ def handle_run_pack_loop(args: argparse.Namespace) -> int: stderr_path=iteration_dir / "analyst_exec.stderr.log", ) analyst_verdict = read_json_output(analyst_verdict_path) - accepted_gate, loop_decision, requires_user_decision, user_decision_type, user_decision_prompt = evaluate_analyst_gate( + analyst_accepted_gate, loop_decision, requires_user_decision, user_decision_type, user_decision_prompt = evaluate_analyst_gate( analyst_verdict, target_score ) + deterministic_gate_ok, deterministic_gate_reason = evaluate_deterministic_loop_gate(pack_state, repair_targets) + accepted_gate = analyst_accepted_gate and deterministic_gate_ok + repair_target_count = int(repair_targets.get("target_count") or 0) if isinstance(repair_targets, dict) else 0 + repair_target_severity_counts = ( + repair_targets.get("severity_counts") + if isinstance(repair_targets, dict) and isinstance(repair_targets.get("severity_counts"), dict) + else {} + ) loop_state["last_analyst_decision"] = loop_decision loop_state["last_user_decision_type"] = user_decision_type loop_state["last_user_decision_prompt"] = user_decision_prompt @@ -2971,18 +3353,24 @@ def handle_run_pack_loop(args: argparse.Namespace) -> int: "pack_dir": str(pack_dir), "quality_score": int(analyst_verdict.get("quality_score") or 0), "loop_decision": loop_decision, + "analyst_accepted_gate": analyst_accepted_gate, "accepted_gate": accepted_gate, + "deterministic_gate_ok": deterministic_gate_ok, + "deterministic_gate_reason": deterministic_gate_reason, "requires_user_decision": requires_user_decision, "user_decision_type": user_decision_type, "user_decision_prompt": user_decision_prompt, "analyst_verdict_path": str(analyst_verdict_path), + "repair_targets_path": str(repair_targets_path), + "repair_target_count": repair_target_count, + "repair_target_severity_counts": repair_target_severity_counts, "coder_status": None, } if accepted_gate: loop_state["iterations"].append(iteration_record) loop_state["final_status"] = "accepted" - loop_state["stop_reason"] = f"analyst accepted at {iteration_id}" + loop_state["stop_reason"] = f"analyst accepted + deterministic gate passed at {iteration_id}" loop_state["updated_at"] = datetime.now(timezone.utc).replace(microsecond=0).isoformat() write_json(loop_dir / "loop_state.json", loop_state) break @@ -3012,6 +3400,8 @@ def handle_run_pack_loop(args: argparse.Namespace) -> int: loop_dir=loop_dir, iteration_dir=iteration_dir, pack_dir=pack_dir, + repair_targets_path=repair_targets_path, + repair_targets_json=repair_targets_json, analyst_verdict_path=analyst_verdict_path, analyst_verdict_json=dump_json(analyst_verdict), ) diff --git a/tests/test_domain_case_loop.py b/tests/test_domain_case_loop.py index c2f07e2..0762ac2 100644 --- a/tests/test_domain_case_loop.py +++ b/tests/test_domain_case_loop.py @@ -7,10 +7,12 @@ from pathlib import Path sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from scripts.domain_case_loop import ( + build_deterministic_repair_targets, build_scenario_acceptance_matrix, carry_forward_analysis_context, derive_pack_final_status, evaluate_analyst_gate, + evaluate_deterministic_loop_gate, load_scenario_pack, merge_scenario_date_scope, validate_step_contract, @@ -499,3 +501,112 @@ def test_validate_step_contract_rejects_top_level_noise_as_direct_answer() -> No assert validated["acceptance_status"] == "rejected" assert "direct_answer_missing" in validated["violated_invariants"] assert "top_level_noise_present" in validated["violated_invariants"] + + +def test_build_deterministic_repair_targets_marks_followup_router_gap_as_p0() -> None: + repair_targets = build_deterministic_repair_targets( + {"pack_id": "demo_pack", "domain": "inventory_stock", "final_status": "partial"}, + [ + { + "scenario_id": "inventory_selected_item_provenance", + "title": "Selected item provenance", + "artifact_dir": "artifacts/domain_runs/demo/scenarios/inventory_selected_item_provenance", + "scenario_state": { + "step_outputs": { + "step_02_supplier": { + "step_id": "step_02_supplier", + "question_resolved": 'По выбранному объекту "Столешница": кто поставил', + "execution_status": "exact", + "acceptance_status": "rejected", + "reply_type": "factual", + "selected_recipe": "address_inventory_on_hand_as_of_date_v1", + "capability_id": "confirmed_inventory_on_hand_as_of_date", + "violated_invariants": [ + "wrong_followup_action", + "focus_object_missing", + "forbidden_capability_selected", + ], + "warnings": [], + "hard_fail": True, + } + } + }, + } + ], + ) + + assert repair_targets["target_count"] == 1 + target = repair_targets["targets"][0] + assert target["severity"] == "P0" + assert target["problem_type"] == "followup_action_resolution_gap" + assert "followup_action_resolution_gap" in target["root_cause_layers"] + assert "object_memory_gap" in target["root_cause_layers"] + assert "addressIntentResolver.ts" in " ".join(target["candidate_files"]) + + +def test_build_deterministic_repair_targets_marks_anchor_gap_as_p1() -> None: + repair_targets = build_deterministic_repair_targets( + {"pack_id": "demo_pack", "domain": "inventory_stock", "final_status": "partial"}, + [ + { + "scenario_id": "inventory_sale_trace", + "title": "Sale trace", + "artifact_dir": "artifacts/domain_runs/demo/scenarios/inventory_sale_trace", + "scenario_state": { + "step_outputs": { + "step_02_selected_item_buyer_ui": { + "step_id": "step_02_selected_item_buyer_ui", + "question_resolved": 'По выбранному объекту "Шкаф": кому был продан товар', + "execution_status": "partial", + "acceptance_status": "rejected", + "reply_type": "partial_coverage", + "fallback_type": "partial", + "mcp_call_status": "materialized_but_not_anchor_matched", + "selected_recipe": "address_inventory_sale_trace_for_item_v1", + "capability_id": "inventory_inventory_sale_trace_for_item", + "violated_invariants": [], + "warnings": [], + "hard_fail": False, + } + } + }, + } + ], + ) + + assert repair_targets["target_count"] == 1 + target = repair_targets["targets"][0] + assert target["severity"] == "P1" + assert target["problem_type"] == "domain_anchor_gap" + assert target["root_cause_layers"] == ["domain_anchor_gap"] + assert "addressQueryService.ts" in " ".join(target["candidate_files"]) + + +def test_evaluate_deterministic_loop_gate_rejects_partial_pack_even_without_targets() -> None: + gate_ok, reason = evaluate_deterministic_loop_gate( + {"final_status": "partial"}, + {"severity_counts": {"P0": 0, "P1": 0}}, + ) + + assert gate_ok is False + assert reason == "pack_final_status=partial" + + +def test_evaluate_deterministic_loop_gate_rejects_remaining_p1_targets() -> None: + gate_ok, reason = evaluate_deterministic_loop_gate( + {"final_status": "accepted"}, + {"severity_counts": {"P0": 0, "P1": 2}}, + ) + + assert gate_ok is False + assert reason == "repair_targets_remaining=P0:0,P1:2" + + +def test_evaluate_deterministic_loop_gate_accepts_clean_pack_without_remaining_p0_p1() -> None: + gate_ok, reason = evaluate_deterministic_loop_gate( + {"final_status": "accepted"}, + {"severity_counts": {"P0": 0, "P1": 0, "warning": 1}}, + ) + + assert gate_ok is True + assert reason == "deterministic_gate_passed"