Добавить repair handoff для GUI-прогонов stage-loop

This commit is contained in:
dctouch 2026-05-09 12:02:21 +03:00
parent 4089708dfd
commit 2244c62554
3 changed files with 132 additions and 0 deletions

View File

@ -137,6 +137,8 @@ It stores the GUI review under `artifacts/domain_runs/stage_agent_loops/<stage_i
- `continue_repair_from_gui_review_p1` when the run is semantically usable but still noisy, over-broad, or poorly layered; - `continue_repair_from_gui_review_p1` when the run is semantically usable but still noisy, over-broad, or poorly layered;
- `manual_gui_confirmation_or_stage_close` when the GUI run is clean enough for final human confirmation. - `manual_gui_confirmation_or_stage_close` when the GUI run is clean enough for final human confirmation.
It also writes `stage_repair_handoff.md/json` next to the stage summary. That handoff is the preferred input for the next coder pass: it lists primary repair targets and sample user-facing failures without forcing the coder to reread the entire GUI conversation first.
## Placeholder contract ## Placeholder contract
Scenario questions can reference earlier step outputs with placeholders such as: Scenario questions can reference earlier step outputs with placeholders such as:

View File

@ -364,6 +364,93 @@ def build_gui_review_stage_summary(
return base return base
def build_stage_repair_handoff(summary: dict[str, Any], review: dict[str, Any]) -> dict[str, Any]:
latest = summary.get("latest_gui_review") if isinstance(summary.get("latest_gui_review"), dict) else {}
repair_targets = review.get("repair_targets") if isinstance(review.get("repair_targets"), list) else []
findings = review.get("findings") if isinstance(review.get("findings"), list) else []
ordered_targets = [
target
for target in repair_targets
if isinstance(target, dict)
]
primary_targets = ordered_targets[:5]
target_issue_codes = {str(target.get("issue_code") or "") for target in primary_targets}
sample_findings = [
finding
for finding in findings
if isinstance(finding, dict)
and (
not target_issue_codes
or any(str(code) in target_issue_codes for code in finding.get("issue_codes", []))
)
][:8]
return {
"schema_version": "stage_gui_repair_handoff_v1",
"stage_id": summary.get("stage_id"),
"run_id": latest.get("run_id") or review.get("run_id"),
"next_action": summary.get("next_action"),
"overall_business_status": latest.get("overall_business_status"),
"p0_findings": latest.get("p0_findings"),
"p1_findings": latest.get("p1_findings"),
"question_quality_score": latest.get("question_quality_score"),
"review_markdown": latest.get("review_markdown"),
"repair_targets_json": latest.get("repair_targets_json"),
"primary_repair_targets": primary_targets,
"sample_findings": sample_findings,
}
def build_stage_repair_handoff_markdown(handoff: dict[str, Any]) -> str:
lines = [
"# Stage GUI Repair Handoff",
"",
f"- stage_id: `{handoff.get('stage_id')}`",
f"- run_id: `{handoff.get('run_id')}`",
f"- next_action: `{handoff.get('next_action')}`",
f"- overall_business_status: `{handoff.get('overall_business_status')}`",
f"- p0_findings: `{handoff.get('p0_findings')}`",
f"- p1_findings: `{handoff.get('p1_findings')}`",
f"- question_quality_score: `{handoff.get('question_quality_score')}`",
f"- review_markdown: `{handoff.get('review_markdown')}`",
f"- repair_targets_json: `{handoff.get('repair_targets_json')}`",
"",
"## Primary Repair Targets",
]
targets = handoff.get("primary_repair_targets") if isinstance(handoff.get("primary_repair_targets"), list) else []
if not targets:
lines.append("- no repair targets")
else:
for target in targets:
if not isinstance(target, dict):
continue
lines.append(
f"- `{target.get('severity')}` `{target.get('problem_layer')}` / `{target.get('issue_code')}`: "
f"{target.get('occurrences')} occurrence(s)"
)
lines.extend(["", "## Sample Findings"])
findings = handoff.get("sample_findings") if isinstance(handoff.get("sample_findings"), list) else []
if not findings:
lines.append("- no sample findings")
else:
for finding in findings:
if not isinstance(finding, dict):
continue
lines.extend(
[
f"- turn `{finding.get('turn_index')}` `{finding.get('severity')}` "
f"`{', '.join(str(code) for code in finding.get('issue_codes', []))}`",
f" question: {str(finding.get('question') or '').strip()}",
f" first_line: {str(finding.get('assistant_first_line') or '').strip()}",
]
)
return "\n".join(lines).strip() + "\n"
def save_stage_repair_handoff(stage_dir: Path, handoff: dict[str, Any]) -> None:
write_json(stage_dir / "stage_repair_handoff.json", handoff)
write_text(stage_dir / "stage_repair_handoff.md", build_stage_repair_handoff_markdown(handoff))
def handle_ingest_gui_run(args: argparse.Namespace) -> int: def handle_ingest_gui_run(args: argparse.Namespace) -> int:
stage_manifest_path = repo_path(args.manifest) stage_manifest_path = repo_path(args.manifest)
stage_manifest = load_stage_manifest(stage_manifest_path) stage_manifest = load_stage_manifest(stage_manifest_path)
@ -393,6 +480,7 @@ def handle_ingest_gui_run(args: argparse.Namespace) -> int:
previous_summary=previous_summary, previous_summary=previous_summary,
) )
save_stage_summary(stage_dir, summary) save_stage_summary(stage_dir, summary)
save_stage_repair_handoff(stage_dir, build_stage_repair_handoff(summary, review))
print(json.dumps(summary, ensure_ascii=False, indent=2)) print(json.dumps(summary, ensure_ascii=False, indent=2))
return 0 return 0

View File

@ -204,6 +204,46 @@ class StageAgentLoopTests(unittest.TestCase):
self.assertFalse(summary["accepted_gate"]) self.assertFalse(summary["accepted_gate"])
self.assertEqual(summary["latest_gui_review"]["repair_targets_count"], 1) self.assertEqual(summary["latest_gui_review"]["repair_targets_count"], 1)
def test_stage_repair_handoff_keeps_primary_targets_and_samples(self) -> None:
summary = {
"stage_id": "agent_loop",
"next_action": "continue_repair_from_gui_review_p0",
"latest_gui_review": {
"run_id": "assistant-stage1-test",
"overall_business_status": "fail",
"p0_findings": 1,
"p1_findings": 0,
"question_quality_score": 100,
"review_markdown": "review.md",
"repair_targets_json": "repair_targets.json",
},
}
review = {
"repair_targets": [
{
"problem_layer": "answer_shape_mismatch",
"issue_code": "business_direct_answer_missing",
"severity": "P0",
"occurrences": 2,
}
],
"findings": [
{
"turn_index": 19,
"severity": "P0",
"issue_codes": ["business_direct_answer_missing"],
"question": "какой у нас самый доходный год",
"assistant_first_line": "Коротко: Ограниченный бизнес-обзор...",
}
],
}
handoff = stage_loop.build_stage_repair_handoff(summary, review)
self.assertEqual(handoff["next_action"], "continue_repair_from_gui_review_p0")
self.assertEqual(handoff["primary_repair_targets"][0]["issue_code"], "business_direct_answer_missing")
self.assertEqual(handoff["sample_findings"][0]["turn_index"], 19)
def test_handle_ingest_gui_run_materializes_stage_review(self) -> None: def test_handle_ingest_gui_run_materializes_stage_review(self) -> None:
with tempfile.TemporaryDirectory() as tmp: with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp) root = Path(tmp)
@ -261,11 +301,13 @@ class StageAgentLoopTests(unittest.TestCase):
stage_dir = output_root / "agent_loop" stage_dir = output_root / "agent_loop"
summary = json.loads((stage_dir / "stage_loop_summary.json").read_text(encoding="utf-8")) summary = json.loads((stage_dir / "stage_loop_summary.json").read_text(encoding="utf-8"))
handoff_exists = (stage_dir / "stage_loop_handoff.md").exists() handoff_exists = (stage_dir / "stage_loop_handoff.md").exists()
repair_handoff_exists = (stage_dir / "stage_repair_handoff.md").exists()
review_exists = (stage_dir / "gui_run_reviews" / run_id / "run_review.json").exists() review_exists = (stage_dir / "gui_run_reviews" / run_id / "run_review.json").exists()
self.assertEqual(exit_code, 0) self.assertEqual(exit_code, 0)
self.assertEqual(summary["next_action"], "continue_repair_from_gui_review_p0") self.assertEqual(summary["next_action"], "continue_repair_from_gui_review_p0")
self.assertTrue(handoff_exists) self.assertTrue(handoff_exists)
self.assertTrue(repair_handoff_exists)
self.assertTrue(review_exists) self.assertTrue(review_exists)