Добавить безопасное продолжение stage-loop

This commit is contained in:
dctouch 2026-05-09 13:22:09 +03:00
parent 73b9053bab
commit de1aa3d17c
3 changed files with 350 additions and 19 deletions

View File

@ -96,6 +96,7 @@ python scripts/stage_agent_loop.py ingest-gui-run --manifest docs/orchestration/
python scripts/stage_agent_loop.py prepare-repair --manifest docs/orchestration/<stage_loop>.json
python scripts/stage_agent_loop.py run-repair --manifest docs/orchestration/<stage_loop>.json --dry-run
python scripts/stage_agent_loop.py status --manifest docs/orchestration/<stage_loop>.json
python scripts/stage_agent_loop.py continue --manifest docs/orchestration/<stage_loop>.json
python scripts/stage_agent_loop.py summarize --manifest docs/orchestration/<stage_loop>.json
```
@ -144,6 +145,8 @@ It stores the GUI review under `artifacts/domain_runs/stage_agent_loops/<stage_i
Use `python scripts/stage_agent_loop.py status --manifest docs/orchestration/<stage_loop>.json` as the cheap read-only checkpoint before continuing a stage. It prints the current next action, closing gate, latest GUI run, latest repair coder status, and latest repair validation status without modifying artifacts.
Use `python scripts/stage_agent_loop.py continue --manifest docs/orchestration/<stage_loop>.json` as the safe one-command continuation layer. It can prepare a repair iteration and materialize `run-repair --dry-run` automatically; it will not run the real coder pass unless `--execute-repair` is passed, and it waits for a `--run-id assistant-stage1-<id>` when the next required step is post-repair rerun/ingest validation.
It also writes `stage_repair_handoff.md/json` next to the stage summary. That handoff is the preferred input for the next coder pass: it lists primary repair targets and sample user-facing failures without forcing the coder to reread the entire GUI conversation first.
To prepare the next repair iteration from that handoff, run:

View File

@ -954,6 +954,12 @@ def build_stage_repair_coder_command(
def handle_run_repair(args: argparse.Namespace) -> int:
payload = run_stage_repair(args)
print(json.dumps(payload, ensure_ascii=False, indent=2))
return 0
def run_stage_repair(args: argparse.Namespace) -> dict[str, Any]:
stage_manifest_path = repo_path(args.manifest)
stage_manifest = load_stage_manifest(stage_manifest_path)
stage_dir = stage_dir_for(repo_path(args.output_root), stage_manifest["stage_id"])
@ -990,8 +996,7 @@ def handle_run_repair(args: argparse.Namespace) -> int:
execution=payload,
),
)
print(json.dumps(payload, ensure_ascii=False, indent=2))
return 0
return payload
snapshots = dcl.snapshot_coder_candidate_files(repair_candidate_paths(plan))
dcl.run_subprocess_command(
@ -1019,11 +1024,16 @@ def handle_run_repair(args: argparse.Namespace) -> int:
execution=payload,
),
)
print(json.dumps(payload, ensure_ascii=False, indent=2))
return 0
return payload
def handle_ingest_gui_run(args: argparse.Namespace) -> int:
summary = ingest_gui_run_review(args)
print(json.dumps(summary, ensure_ascii=False, indent=2))
return 0
def ingest_gui_run_review(args: argparse.Namespace) -> dict[str, Any]:
stage_manifest_path = repo_path(args.manifest)
stage_manifest = load_stage_manifest(stage_manifest_path)
stage_dir = stage_dir_for(repo_path(args.output_root), stage_manifest["stage_id"])
@ -1053,36 +1063,49 @@ def handle_ingest_gui_run(args: argparse.Namespace) -> int:
)
save_stage_summary(stage_dir, summary)
save_stage_repair_handoff(stage_dir, build_stage_repair_handoff(summary, review))
print(json.dumps(summary, ensure_ascii=False, indent=2))
return 0
return summary
def handle_prepare_repair(args: argparse.Namespace) -> int:
stage_manifest_path = repo_path(args.manifest)
stage_manifest = load_stage_manifest(stage_manifest_path)
stage_dir = stage_dir_for(repo_path(args.output_root), stage_manifest["stage_id"])
handoff_path = repo_path(args.handoff) if args.handoff else stage_dir / "stage_repair_handoff.json"
handoff = load_json_object(handoff_path, "Stage repair handoff")
iteration_id = str(args.iteration_id or f"repair_{slugify(str(handoff.get('run_id') or 'gui_run'))}").strip()
payload = prepare_stage_repair_artifacts(
manifest_path=repo_path(args.manifest),
output_root=repo_path(args.output_root),
handoff_path=repo_path(args.handoff) if args.handoff else None,
iteration_id=args.iteration_id,
)
print(json.dumps(payload, ensure_ascii=False, indent=2))
return 0
def prepare_stage_repair_artifacts(
*,
manifest_path: Path,
output_root: Path,
handoff_path: Path | None = None,
iteration_id: str | None = None,
) -> dict[str, Any]:
stage_manifest = load_stage_manifest(manifest_path)
stage_dir = stage_dir_for(output_root, stage_manifest["stage_id"])
resolved_handoff_path = handoff_path if handoff_path is not None else stage_dir / "stage_repair_handoff.json"
handoff = load_json_object(resolved_handoff_path, "Stage repair handoff")
resolved_iteration_id = str(iteration_id or f"repair_{slugify(str(handoff.get('run_id') or 'gui_run'))}").strip()
plan = build_stage_repair_iteration_plan(
stage_manifest=stage_manifest,
stage_dir=stage_dir,
handoff=handoff,
iteration_id=iteration_id,
iteration_id=resolved_iteration_id,
)
iteration_dir = save_stage_repair_iteration(stage_dir, plan)
payload = {
return {
"schema_version": "stage_gui_repair_prepare_result_v1",
"stage_id": stage_manifest["stage_id"],
"iteration_id": iteration_id,
"iteration_id": resolved_iteration_id,
"iteration_dir": repo_relative(iteration_dir),
"repair_plan": repo_relative(iteration_dir / "repair_iteration_plan.json"),
"repair_prompt": repo_relative(iteration_dir / "repair_prompt.md"),
"repair_checklist": repo_relative(iteration_dir / "repair_checklist.md"),
"candidate_files": plan.get("candidate_files") or [],
}
print(json.dumps(payload, ensure_ascii=False, indent=2))
return 0
def handle_plan(args: argparse.Namespace) -> int:
@ -1105,6 +1128,7 @@ def handle_plan(args: argparse.Namespace) -> int:
def build_stage_status(stage_manifest: dict[str, Any], stage_dir: Path) -> dict[str, Any]:
summary_path = stage_dir / "stage_loop_summary.json"
summary = load_json_object(summary_path, "Existing stage summary") if summary_path.exists() else {}
next_action = str(summary.get("next_action") or "run_stage_loop_or_ingest_gui_run")
latest_gui_review = summary.get("latest_gui_review") if isinstance(summary.get("latest_gui_review"), dict) else {}
latest_repair_execution = (
summary.get("latest_repair_execution")
@ -1132,8 +1156,8 @@ def build_stage_status(stage_manifest: dict[str, Any], stage_dir: Path) -> dict[
"accepted_gate": summary.get("accepted_gate"),
"loop_accepted_gate": summary.get("loop_accepted_gate"),
"stage_closing_gate": stage_closing_gate or None,
"next_action": summary.get("next_action") or "run_stage_loop_or_ingest_gui_run",
"next_step_guidance": summary.get("next_step_guidance") or build_next_step_guidance("run_stage_loop_or_ingest_gui_run"),
"next_action": next_action,
"next_step_guidance": summary.get("next_step_guidance") or build_next_step_guidance(next_action),
"latest_gui_run_id": latest_gui_review.get("run_id"),
"latest_gui_business_status": latest_gui_review.get("overall_business_status"),
"latest_repair_coder_status": latest_repair_execution.get("coder_status"),
@ -1154,6 +1178,107 @@ def handle_status(args: argparse.Namespace) -> int:
return 0
def args_with(args: argparse.Namespace, **overrides: Any) -> argparse.Namespace:
values = vars(args).copy()
values.update(overrides)
return argparse.Namespace(**values)
def handle_continue(args: argparse.Namespace) -> int:
stage_manifest_path = repo_path(args.manifest)
stage_manifest = load_stage_manifest(stage_manifest_path)
stage_dir = stage_dir_for(repo_path(args.output_root), stage_manifest["stage_id"])
stage_dir.mkdir(parents=True, exist_ok=True)
status_before = build_stage_status(stage_manifest, stage_dir)
next_action = str(status_before.get("next_action") or "")
payload: dict[str, Any] = {
"schema_version": "stage_agent_loop_continue_result_v1",
"stage_id": stage_manifest["stage_id"],
"stage_dir": repo_relative(stage_dir),
"observed_next_action": next_action,
"performed_action": "none",
"created_at": now_iso(),
"status_before": status_before,
}
if next_action in {"continue_repair_from_gui_review_p0", "continue_repair_from_gui_review_p1"}:
prepare_payload = prepare_stage_repair_artifacts(
manifest_path=stage_manifest_path,
output_root=repo_path(args.output_root),
handoff_path=repo_path(args.handoff) if args.handoff else None,
iteration_id=args.iteration_id,
)
repair_payload = run_stage_repair(
args_with(
args,
plan=str(repo_path(str(prepare_payload["repair_plan"]))),
dry_run=True,
)
)
payload.update(
{
"performed_action": "prepare_repair_and_run_repair_dry_run",
"prepare_repair": prepare_payload,
"run_repair": repair_payload,
"next_action": repair_payload.get("next_action"),
}
)
elif next_action == "execute_repair_without_dry_run_or_review_command":
if bool(args.execute_repair):
repair_payload = run_stage_repair(args_with(args, dry_run=False))
payload.update(
{
"performed_action": "run_repair_execute",
"run_repair": repair_payload,
"next_action": repair_payload.get("next_action"),
}
)
else:
payload.update(
{
"performed_action": "wait_for_explicit_execute_repair",
"next_action": next_action,
"suggested_command": (
"python scripts/stage_agent_loop.py continue "
"--manifest <stage_manifest.json> --execute-repair"
),
}
)
elif next_action == "rerun_same_stage_or_gui_and_ingest_result":
if getattr(args, "run_id", None):
ingest_summary = ingest_gui_run_review(args)
payload.update(
{
"performed_action": "ingest_gui_run_for_repair_validation",
"ingest_summary": ingest_summary,
"next_action": ingest_summary.get("next_action"),
}
)
else:
payload.update(
{
"performed_action": "wait_for_rerun_ingest",
"next_action": next_action,
"suggested_command": (
"python scripts/stage_agent_loop.py continue "
"--manifest <stage_manifest.json> --run-id assistant-stage1-<new_id>"
),
}
)
else:
payload.update(
{
"performed_action": "no_safe_automatic_step",
"next_action": next_action,
}
)
payload["status_after"] = build_stage_status(stage_manifest, stage_dir)
write_json(stage_dir / "stage_continue_result.json", payload)
print(json.dumps(payload, ensure_ascii=False, indent=2))
return 0
def handle_summarize(args: argparse.Namespace) -> int:
stage_manifest_path = repo_path(args.manifest)
stage_manifest = load_stage_manifest(stage_manifest_path)
@ -1260,6 +1385,23 @@ def build_parser() -> argparse.ArgumentParser:
add_common_args(status_parser)
status_parser.set_defaults(func=handle_status)
continue_parser = subparsers.add_parser(
"continue",
help="Execute the next safe stage-loop step derived from status.next_action.",
)
add_common_args(continue_parser)
continue_parser.add_argument("--handoff")
continue_parser.add_argument("--iteration-id")
continue_parser.add_argument("--plan")
continue_parser.add_argument("--coder-schema", default=str(DEFAULT_REPAIR_CODER_SCHEMA))
continue_parser.add_argument("--execute-repair", action="store_true")
continue_parser.add_argument("--run-id")
continue_parser.add_argument("--session-file")
continue_parser.add_argument("--sessions-dir", default=str(gui_review.DEFAULT_SESSIONS_DIR))
continue_parser.add_argument("--reports-dir", default=str(gui_review.DEFAULT_REPORTS_DIR))
continue_parser.add_argument("--review-output-dir")
continue_parser.set_defaults(func=handle_continue)
ingest_parser = subparsers.add_parser(
"ingest-gui-run",
help="Attach an existing assistant-stage1 GUI run review to the stage loop summary.",

View File

@ -592,6 +592,192 @@ class StageAgentLoopTests(unittest.TestCase):
self.assertEqual(status["latest_repair_coder_status"], "patched")
self.assertEqual(status["latest_validation_status"], "failed_p0")
def test_build_stage_status_derives_guidance_from_existing_next_action(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
stage_dir = root / "stage_runs" / "agent_loop"
write_json(
stage_dir / "stage_loop_summary.json",
{
"stage_id": "agent_loop",
"next_action": "continue_repair_from_gui_review_p0",
},
)
status = stage_loop.build_stage_status(
{
"stage_id": "agent_loop",
"module_name": "Agent Loop",
"title": "Agent Loop",
},
stage_dir,
)
self.assertEqual(status["next_action"], "continue_repair_from_gui_review_p0")
self.assertIn("prepare-repair", status["next_step_guidance"]["command_templates"][0])
def test_handle_continue_prepares_repair_and_dry_runs_command(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
manifest_path = root / "stage.json"
output_root = root / "stage_runs"
stage_dir = output_root / "agent_loop"
schema_path = root / "coder.schema.json"
write_json(
manifest_path,
{
"stage_id": "agent_loop",
"module_name": "Agent Loop",
"title": "Agent Loop",
"pack_manifest": "docs/orchestration/demo_pack.json",
},
)
write_json(schema_path, {"type": "object", "additionalProperties": True})
write_json(
stage_dir / "stage_loop_summary.json",
{
"stage_id": "agent_loop",
"next_action": "continue_repair_from_gui_review_p0",
},
)
write_json(
stage_dir / "stage_repair_handoff.json",
{
"run_id": "assistant-stage1-continue",
"next_action": "continue_repair_from_gui_review_p0",
"primary_repair_targets": [
{
"problem_layer": "answer_shape_mismatch",
"issue_code": "business_direct_answer_missing",
"severity": "P0",
"occurrences": 1,
}
],
"sample_findings": [],
},
)
exit_code = stage_loop.handle_continue(
stage_args(
manifest=str(manifest_path),
output_root=str(output_root),
handoff=None,
iteration_id=None,
plan=None,
coder_schema=str(schema_path),
execute_repair=False,
run_id=None,
session_file=None,
sessions_dir=str(root / "sessions"),
reports_dir=str(root / "reports"),
review_output_dir=None,
)
)
result = json.loads((stage_dir / "stage_continue_result.json").read_text(encoding="utf-8"))
summary = json.loads((stage_dir / "stage_loop_summary.json").read_text(encoding="utf-8"))
command_exists = (
stage_dir
/ "repair_iterations"
/ "repair_assistant-stage1-continue"
/ "repair_coder.command.txt"
).exists()
self.assertEqual(exit_code, 0)
self.assertEqual(result["performed_action"], "prepare_repair_and_run_repair_dry_run")
self.assertEqual(result["next_action"], "execute_repair_without_dry_run_or_review_command")
self.assertEqual(summary["next_action"], "execute_repair_without_dry_run_or_review_command")
self.assertTrue(command_exists)
def test_handle_continue_waits_before_real_repair_without_flag(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
manifest_path = root / "stage.json"
output_root = root / "stage_runs"
stage_dir = output_root / "agent_loop"
write_json(
manifest_path,
{
"stage_id": "agent_loop",
"module_name": "Agent Loop",
"title": "Agent Loop",
"pack_manifest": "docs/orchestration/demo_pack.json",
},
)
write_json(
stage_dir / "stage_loop_summary.json",
{
"stage_id": "agent_loop",
"next_action": "execute_repair_without_dry_run_or_review_command",
},
)
exit_code = stage_loop.handle_continue(
stage_args(
manifest=str(manifest_path),
output_root=str(output_root),
handoff=None,
iteration_id=None,
plan=None,
coder_schema=str(root / "coder.schema.json"),
execute_repair=False,
run_id=None,
session_file=None,
sessions_dir=str(root / "sessions"),
reports_dir=str(root / "reports"),
review_output_dir=None,
)
)
result = json.loads((stage_dir / "stage_continue_result.json").read_text(encoding="utf-8"))
self.assertEqual(exit_code, 0)
self.assertEqual(result["performed_action"], "wait_for_explicit_execute_repair")
self.assertEqual(result["next_action"], "execute_repair_without_dry_run_or_review_command")
def test_handle_continue_waits_for_rerun_ingest_without_run_id(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
manifest_path = root / "stage.json"
output_root = root / "stage_runs"
stage_dir = output_root / "agent_loop"
write_json(
manifest_path,
{
"stage_id": "agent_loop",
"module_name": "Agent Loop",
"title": "Agent Loop",
"pack_manifest": "docs/orchestration/demo_pack.json",
},
)
write_json(
stage_dir / "stage_loop_summary.json",
{
"stage_id": "agent_loop",
"next_action": "rerun_same_stage_or_gui_and_ingest_result",
},
)
exit_code = stage_loop.handle_continue(
stage_args(
manifest=str(manifest_path),
output_root=str(output_root),
handoff=None,
iteration_id=None,
plan=None,
coder_schema=str(root / "coder.schema.json"),
execute_repair=False,
run_id=None,
session_file=None,
sessions_dir=str(root / "sessions"),
reports_dir=str(root / "reports"),
review_output_dir=None,
)
)
result = json.loads((stage_dir / "stage_continue_result.json").read_text(encoding="utf-8"))
self.assertEqual(exit_code, 0)
self.assertEqual(result["performed_action"], "wait_for_rerun_ingest")
self.assertEqual(result["next_action"], "rerun_same_stage_or_gui_and_ingest_result")
def test_resolve_stage_repair_iteration_auto_prepares_from_handoff(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)