Связать repair-pass со статусом stage-loop

This commit is contained in:
dctouch 2026-05-09 12:43:36 +03:00
parent fdf70b3d4f
commit b4f50346cc
3 changed files with 327 additions and 0 deletions

View File

@ -94,6 +94,7 @@ python scripts/stage_agent_loop.py plan --manifest docs/orchestration/<stage_loo
python scripts/stage_agent_loop.py run --manifest docs/orchestration/<stage_loop>.json
python scripts/stage_agent_loop.py ingest-gui-run --manifest docs/orchestration/<stage_loop>.json --run-id assistant-stage1-<id>
python scripts/stage_agent_loop.py prepare-repair --manifest docs/orchestration/<stage_loop>.json
python scripts/stage_agent_loop.py run-repair --manifest docs/orchestration/<stage_loop>.json --dry-run
python scripts/stage_agent_loop.py summarize --manifest docs/orchestration/<stage_loop>.json
```
@ -148,6 +149,14 @@ python scripts/stage_agent_loop.py prepare-repair --manifest docs/orchestration/
This writes `repair_iterations/<iteration_id>/repair_iteration_plan.json`, `repair_prompt.md`, and `repair_checklist.md`. The plan enriches GUI repair targets with candidate runtime files and rerun instructions, so the next coder pass can start from a bounded business defect instead of a full transcript archaeology dig.
To materialize or execute the coder command for that repair iteration, run:
```powershell
python scripts/stage_agent_loop.py run-repair --manifest docs/orchestration/<stage_loop>.json --dry-run
```
`--dry-run` writes `repair_coder.command.txt`, records `repair_execution_summary.json`, updates `stage_loop_summary.json`, and prints the exact non-interactive Codex command without changing code. Without `--dry-run`, it executes the coder command with the prepared `repair_prompt.md`, writes `repair_coder_result.json`, captures stdout/stderr, records `repair_execution_summary.json`, and updates the stage next action to rerun/ingest, inspect, or stop for a decision depending on the coder status. After a real coder patch, rerun the same semantic pack or GUI session and ingest the new `assistant-stage1-<id>`.
## Placeholder contract
Scenario questions can reference earlier step outputs with placeholders such as:

View File

@ -16,6 +16,7 @@ import review_assistant_stage1_run as gui_review
REPO_ROOT = Path(__file__).resolve().parents[1]
DEFAULT_STAGE_OUTPUT_ROOT = REPO_ROOT / "artifacts" / "domain_runs" / "stage_agent_loops"
DEFAULT_REPAIR_CODER_SCHEMA = REPO_ROOT / "docs" / "orchestration" / "schemas" / "domain_loop_coder_result.schema.json"
STAGE_LOOP_SCHEMA_VERSION = "stage_agent_loop_manifest_v1"
STAGE_SUMMARY_SCHEMA_VERSION = "stage_agent_loop_summary_v1"
@ -273,6 +274,23 @@ def build_stage_handoff_markdown(summary: dict[str, Any]) -> str:
f"- review_markdown: `{latest_gui_review.get('review_markdown')}`",
]
)
latest_repair_execution = (
summary.get("latest_repair_execution")
if isinstance(summary.get("latest_repair_execution"), dict)
else {}
)
if latest_repair_execution:
lines.extend(
[
"",
"## Latest Repair Execution",
f"- iteration_dir: `{latest_repair_execution.get('iteration_dir')}`",
f"- dry_run: `{latest_repair_execution.get('dry_run')}`",
f"- coder_status: `{latest_repair_execution.get('coder_status') or 'n/a'}`",
f"- changed_files: `{len(latest_repair_execution.get('changed_files') or [])}`",
f"- execution_summary: `{latest_repair_execution.get('repair_execution_summary')}`",
]
)
return "\n".join(lines).strip() + "\n"
@ -281,6 +299,61 @@ def save_stage_summary(stage_dir: Path, summary: dict[str, Any]) -> None:
write_text(stage_dir / "stage_loop_handoff.md", build_stage_handoff_markdown(summary))
def repair_execution_next_action(*, dry_run: bool, coder_status: str | None = None) -> str:
if dry_run:
return "execute_repair_without_dry_run_or_review_command"
if coder_status == "patched":
return "rerun_same_stage_or_gui_and_ingest_result"
if coder_status == "no_changes":
return "rerun_or_inspect_repair_targets"
if coder_status == "blocked":
return "user_or_architecture_decision_required"
return "inspect_repair_execution_result"
def build_repair_execution_stage_summary(
*,
stage_manifest: dict[str, Any],
previous_summary: dict[str, Any] | None,
execution: dict[str, Any],
) -> dict[str, Any]:
base = dict(previous_summary or {})
next_action = str(execution.get("next_action") or repair_execution_next_action(dry_run=bool(execution.get("dry_run"))))
base.update(
{
"schema_version": STAGE_SUMMARY_SCHEMA_VERSION,
"stage_id": stage_manifest["stage_id"],
"module_name": stage_manifest.get("module_name"),
"title": stage_manifest.get("title"),
"global_plan_refs": stage_manifest.get("global_plan_refs") or base.get("global_plan_refs") or [],
"target_score": stage_manifest.get("target_score", base.get("target_score")),
"acceptance_invariants": stage_manifest.get("acceptance_invariants") or base.get("acceptance_invariants") or [],
"accepted_gate": False,
"manual_confirmation_required": False,
"next_action": next_action,
"latest_repair_execution": {
"iteration_dir": execution.get("iteration_dir"),
"repair_prompt": execution.get("repair_prompt"),
"repair_coder_command": execution.get("repair_coder_command"),
"repair_coder_result": execution.get("repair_coder_result"),
"repair_execution_summary": execution.get("repair_execution_summary"),
"candidate_files": execution.get("candidate_files") or [],
"dry_run": bool(execution.get("dry_run")),
"coder_status": execution.get("coder_status"),
"changed_files": execution.get("changed_files") if isinstance(execution.get("changed_files"), list) else [],
"restored_line_collapsed_files": (
execution.get("restored_line_collapsed_files")
if isinstance(execution.get("restored_line_collapsed_files"), list)
else []
),
"next_action": next_action,
},
"updated_at": now_iso(),
}
)
return base
def build_save_autorun_command(args: argparse.Namespace, stage_manifest: dict[str, Any], loop_dir: Path) -> list[str]:
return [
sys.executable,
@ -588,6 +661,148 @@ def save_stage_repair_iteration(stage_dir: Path, plan: dict[str, Any]) -> Path:
return iteration_dir
def latest_repair_iteration_dir(stage_dir: Path) -> Path | None:
root = stage_dir / "repair_iterations"
if not root.exists():
return None
candidates = [path for path in root.iterdir() if path.is_dir() and (path / "repair_iteration_plan.json").exists()]
if not candidates:
return None
return max(candidates, key=lambda path: path.stat().st_mtime)
def resolve_stage_repair_iteration(
*,
args: argparse.Namespace,
stage_manifest: dict[str, Any],
stage_dir: Path,
) -> tuple[Path, dict[str, Any]]:
plan_path = repo_path(args.plan) if getattr(args, "plan", None) else None
if plan_path is not None:
plan = load_json_object(plan_path, "Stage repair iteration plan")
return plan_path.parent, plan
iteration_id = str(getattr(args, "iteration_id", "") or "").strip()
if iteration_id:
iteration_dir = stage_dir / "repair_iterations" / slugify(iteration_id)
plan = load_json_object(iteration_dir / "repair_iteration_plan.json", "Stage repair iteration plan")
return iteration_dir, plan
latest_dir = latest_repair_iteration_dir(stage_dir)
if latest_dir is not None:
return latest_dir, load_json_object(latest_dir / "repair_iteration_plan.json", "Stage repair iteration plan")
handoff_path = stage_dir / "stage_repair_handoff.json"
handoff = load_json_object(handoff_path, "Stage repair handoff")
auto_iteration_id = f"repair_{slugify(str(handoff.get('run_id') or 'gui_run'))}"
plan = build_stage_repair_iteration_plan(
stage_manifest=stage_manifest,
stage_dir=stage_dir,
handoff=handoff,
iteration_id=auto_iteration_id,
)
iteration_dir = save_stage_repair_iteration(stage_dir, plan)
return iteration_dir, plan
def repair_candidate_paths(plan: dict[str, Any]) -> list[Path]:
paths: list[Path] = []
seen: set[Path] = set()
for raw_path in plan.get("candidate_files", []) if isinstance(plan.get("candidate_files"), list) else []:
resolved = dcl.resolve_repo_relative_path(str(raw_path))
if resolved is None or not resolved.exists() or resolved in seen:
continue
seen.add(resolved)
paths.append(resolved)
return paths
def build_stage_repair_coder_command(
args: argparse.Namespace,
*,
iteration_dir: Path,
output_file: Path,
) -> list[str]:
return dcl.build_codex_exec_command(
args,
output_file=output_file,
schema_file=repo_path(args.coder_schema),
sandbox_mode="workspace-write",
model_override=getattr(args, "coder_codex_model", None),
reasoning_effort=getattr(args, "coder_reasoning_effort", None),
)
def handle_run_repair(args: argparse.Namespace) -> int:
stage_manifest_path = repo_path(args.manifest)
stage_manifest = load_stage_manifest(stage_manifest_path)
stage_dir = stage_dir_for(repo_path(args.output_root), stage_manifest["stage_id"])
iteration_dir, plan = resolve_stage_repair_iteration(args=args, stage_manifest=stage_manifest, stage_dir=stage_dir)
prompt_path = iteration_dir / "repair_prompt.md"
prompt = prompt_path.read_text(encoding="utf-8") if prompt_path.exists() else build_stage_repair_prompt(plan)
if not prompt_path.exists():
write_text(prompt_path, prompt)
output_file = iteration_dir / "repair_coder_result.json"
command = build_stage_repair_coder_command(args, iteration_dir=iteration_dir, output_file=output_file)
write_text(iteration_dir / "repair_coder.command.txt", " ".join(command) + "\n")
payload = {
"schema_version": "stage_gui_repair_run_result_v1",
"stage_id": stage_manifest["stage_id"],
"iteration_dir": repo_relative(iteration_dir),
"repair_prompt": repo_relative(prompt_path),
"repair_coder_command": repo_relative(iteration_dir / "repair_coder.command.txt"),
"repair_coder_result": repo_relative(output_file),
"repair_execution_summary": repo_relative(iteration_dir / "repair_execution_summary.json"),
"candidate_files": plan.get("candidate_files") or [],
"dry_run": bool(args.dry_run),
}
if args.dry_run:
payload["next_action"] = repair_execution_next_action(dry_run=True)
payload["command"] = command
write_json(iteration_dir / "repair_execution_summary.json", payload)
summary_path = stage_dir / "stage_loop_summary.json"
previous_summary = load_json_object(summary_path, "Existing stage summary") if summary_path.exists() else None
save_stage_summary(
stage_dir,
build_repair_execution_stage_summary(
stage_manifest=stage_manifest,
previous_summary=previous_summary,
execution=payload,
),
)
print(json.dumps(payload, ensure_ascii=False, indent=2))
return 0
snapshots = dcl.snapshot_coder_candidate_files(repair_candidate_paths(plan))
dcl.run_subprocess_command(
command,
cwd=REPO_ROOT,
timeout_seconds=int(args.codex_timeout_seconds),
input_text=prompt,
stdout_path=iteration_dir / "repair_coder.stdout.log",
stderr_path=iteration_dir / "repair_coder.stderr.log",
)
restored_files = dcl.restore_line_collapsed_files_from_snapshot(snapshots)
coder_result = dcl.read_json_output(output_file)
payload["coder_status"] = str(coder_result.get("status") or "unknown")
payload["changed_files"] = coder_result.get("changed_files") if isinstance(coder_result.get("changed_files"), list) else []
payload["restored_line_collapsed_files"] = restored_files
payload["next_action"] = repair_execution_next_action(dry_run=False, coder_status=payload["coder_status"])
write_json(iteration_dir / "repair_execution_summary.json", payload)
summary_path = stage_dir / "stage_loop_summary.json"
previous_summary = load_json_object(summary_path, "Existing stage summary") if summary_path.exists() else None
save_stage_summary(
stage_dir,
build_repair_execution_stage_summary(
stage_manifest=stage_manifest,
previous_summary=previous_summary,
execution=payload,
),
)
print(json.dumps(payload, ensure_ascii=False, indent=2))
return 0
def handle_ingest_gui_run(args: argparse.Namespace) -> int:
stage_manifest_path = repo_path(args.manifest)
stage_manifest = load_stage_manifest(stage_manifest_path)
@ -785,6 +1000,17 @@ def build_parser() -> argparse.ArgumentParser:
repair_parser.add_argument("--handoff")
repair_parser.add_argument("--iteration-id")
repair_parser.set_defaults(func=handle_prepare_repair)
run_repair_parser = subparsers.add_parser(
"run-repair",
help="Run or dry-run the coder command for a prepared stage repair iteration.",
)
add_common_args(run_repair_parser)
run_repair_parser.add_argument("--plan")
run_repair_parser.add_argument("--iteration-id")
run_repair_parser.add_argument("--coder-schema", default=str(DEFAULT_REPAIR_CODER_SCHEMA))
run_repair_parser.add_argument("--dry-run", action="store_true")
run_repair_parser.set_defaults(func=handle_run_repair)
return parser

View File

@ -347,6 +347,98 @@ class StageAgentLoopTests(unittest.TestCase):
self.assertTrue(checklist_exists)
self.assertIn("business_answer_too_verbose", prompt)
def test_handle_run_repair_dry_run_materializes_coder_command(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
manifest_path = root / "stage.json"
output_root = root / "stage_runs"
stage_dir = output_root / "agent_loop"
iteration_dir = stage_dir / "repair_iterations" / "repair_001"
schema_path = root / "coder.schema.json"
write_json(
manifest_path,
{
"stage_id": "agent_loop",
"module_name": "Agent Loop",
"title": "Agent Loop",
"pack_manifest": "docs/orchestration/demo_pack.json",
},
)
write_json(
schema_path,
{
"type": "object",
"additionalProperties": True,
},
)
write_json(
iteration_dir / "repair_iteration_plan.json",
{
"schema_version": "stage_gui_repair_iteration_plan_v1",
"stage_id": "agent_loop",
"iteration_id": "repair_001",
"candidate_files": ["scripts/stage_agent_loop.py"],
"primary_repair_targets": [],
},
)
(iteration_dir / "repair_prompt.md").write_text("repair prompt\n", encoding="utf-8")
exit_code = stage_loop.handle_run_repair(
stage_args(
manifest=str(manifest_path),
output_root=str(output_root),
plan=None,
iteration_id="repair_001",
coder_schema=str(schema_path),
dry_run=True,
)
)
command_text = (iteration_dir / "repair_coder.command.txt").read_text(encoding="utf-8")
execution_summary = json.loads((iteration_dir / "repair_execution_summary.json").read_text(encoding="utf-8"))
stage_summary = json.loads((stage_dir / "stage_loop_summary.json").read_text(encoding="utf-8"))
self.assertEqual(exit_code, 0)
self.assertIn("codex exec", command_text)
self.assertIn("repair_coder_result.json", command_text)
self.assertEqual(execution_summary["next_action"], "execute_repair_without_dry_run_or_review_command")
self.assertTrue(stage_summary["latest_repair_execution"]["dry_run"])
self.assertEqual(stage_summary["next_action"], "execute_repair_without_dry_run_or_review_command")
def test_resolve_stage_repair_iteration_auto_prepares_from_handoff(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)
stage_dir = root / "stage"
write_json(
stage_dir / "stage_repair_handoff.json",
{
"run_id": "assistant-stage1-auto",
"next_action": "continue_repair_from_gui_review_p0",
"primary_repair_targets": [
{
"problem_layer": "answer_shape_mismatch",
"issue_code": "business_direct_answer_missing",
"severity": "P0",
"occurrences": 1,
}
],
"sample_findings": [],
},
)
iteration_dir, plan = stage_loop.resolve_stage_repair_iteration(
args=stage_args(plan=None, iteration_id=None),
stage_manifest={
"stage_id": "agent_loop",
"module_name": "Agent Loop",
"title": "Agent Loop",
},
stage_dir=stage_dir,
)
plan_exists = (iteration_dir / "repair_iteration_plan.json").exists()
self.assertTrue(plan_exists)
self.assertEqual(plan["run_id"], "assistant-stage1-auto")
def test_handle_ingest_gui_run_materializes_stage_review(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
root = Path(tmp)