From 931251d1eb5dce13f2033da81e9bbb9defb98ea4 Mon Sep 17 00:00:00 2001 From: dctouch Date: Sat, 9 May 2026 11:44:02 +0300 Subject: [PATCH] =?UTF-8?q?=D0=90=D0=B2=D1=82=D0=BE=D0=BC=D0=B0=D1=82?= =?UTF-8?q?=D0=B8=D0=B7=D0=B8=D1=80=D0=BE=D0=B2=D0=B0=D1=82=D1=8C=20=D0=B0?= =?UTF-8?q?=D0=B3=D0=B5=D0=BD=D1=82=D0=BD=D1=83=D1=8E=20=D0=BF=D1=80=D0=BE?= =?UTF-8?q?=D0=B2=D0=B5=D1=80=D0=BA=D1=83=20GUI-=D0=BF=D1=80=D0=BE=D0=B3?= =?UTF-8?q?=D0=BE=D0=BD=D0=BE=D0=B2=20=D0=B8=20stage-loop?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../domain_scenario_loop_repo_adapter.md | 72 ++ .../stage_agent_loop_manifest.schema.json | 72 ++ scripts/domain_case_loop.py | 168 ++++- scripts/domain_truth_harness.py | 152 +++++ scripts/review_assistant_stage1_run.py | 634 ++++++++++++++++++ scripts/save_agent_semantic_run.py | 243 ++++++- scripts/stage_agent_loop.py | 406 +++++++++++ scripts/test_domain_case_loop_step_state.py | 141 ++++ scripts/test_review_assistant_stage1_run.py | 155 +++++ scripts/test_save_agent_semantic_run.py | 241 +++++++ scripts/test_stage_agent_loop.py | 153 +++++ 11 files changed, 2428 insertions(+), 9 deletions(-) create mode 100644 docs/orchestration/schemas/stage_agent_loop_manifest.schema.json create mode 100644 scripts/review_assistant_stage1_run.py create mode 100644 scripts/stage_agent_loop.py create mode 100644 scripts/test_review_assistant_stage1_run.py create mode 100644 scripts/test_save_agent_semantic_run.py create mode 100644 scripts/test_stage_agent_loop.py diff --git a/docs/orchestration/domain_scenario_loop_repo_adapter.md b/docs/orchestration/domain_scenario_loop_repo_adapter.md index 28964c7..8ac4f91 100644 --- a/docs/orchestration/domain_scenario_loop_repo_adapter.md +++ b/docs/orchestration/domain_scenario_loop_repo_adapter.md @@ -53,6 +53,78 @@ Pack artifacts live under: - `final_status.md` - `scenarios//...` +## AGENT autorun save gate + +`scripts/save_agent_semantic_run.py` is a post-validation 
persistence tool, not a replay executor. +The normal path is: + +1. build/update the truth-harness spec; +2. run `python scripts/domain_truth_harness.py run-live --spec ... --output-dir artifacts/domain_runs/<run_id>`; +3. inspect `truth_review.md`, `business_review.md`, `pack_state.json`, and `final_status.md`; +4. save to GUI autoruns only with `python scripts/save_agent_semantic_run.py --spec ... --validated-run-dir artifacts/domain_runs/<run_id>`. + +The save gate requires: +- `pack_state.final_status = accepted`; +- `pack_state.acceptance_gate_passed = true`; +- `truth_review.summary.overall_status = pass`; +- `business_review.overall_business_status = pass`; +- zero unresolved P0 and zero business-answer failures. + +If a pack must be saved as a deliberate manual draft before live acceptance, use +`--allow-unvalidated --unvalidated-reason "<reason>"`. +That path is explicitly marked as unvalidated and must not be treated as semantic proof. + +## Stage-level AGENT loop + +`scripts/stage_agent_loop.py` wraps the domain pack loop into the development-stage workflow: + +1. take the current global/local stage manifest; +2. run `scripts/domain_case_loop.py run-pack-loop` for that stage pack; +3. let the loop iterate through pack replay, business-first analyst verdict, coder patch, and rerun until the objective gate is accepted, blocked, or a real user decision is required; +4. if accepted, persist the validated AGENT pack into GUI autoruns through `scripts/save_agent_semantic_run.py --validated-run-dir`; +5. write `stage_loop_summary.json` and `stage_loop_handoff.md` for the final human visual confirmation. + +The stage manifest schema is `docs/orchestration/schemas/stage_agent_loop_manifest.schema.json`. +The default stage gate is intentionally stricter than a narrow case gate: `target_score = 88`, no unresolved P0/P1 repair targets, accepted analyst verdict, clean business usefulness, direct-answer, temporal-honesty, field-truth, and answer-layering flags. 
+ +Canonical commands: + +```powershell +python scripts/stage_agent_loop.py plan --manifest docs/orchestration/<stage_manifest>.json +python scripts/stage_agent_loop.py run --manifest docs/orchestration/<stage_manifest>.json +python scripts/stage_agent_loop.py summarize --manifest docs/orchestration/<stage_manifest>.json +``` + +This is the intended path for “implement the stage, generate/check stage questions, analyze business answers, patch code, rerun, then ask the user for final visual confirmation”. + +## GUI run review bridge + +When a manual or GUI autorun already exists, `scripts/review_assistant_stage1_run.py` turns the run id into the same machine-readable review surface. + +Canonical command: + +```powershell +python scripts/review_assistant_stage1_run.py assistant-stage1-<id> --print-summary +``` + +The script resolves: +- `llm_normalizer/reports/assistant-stage1-<id>.md`; +- `llm_normalizer/data/assistant_sessions/assistant-stage1-<id>-*.json`. + +It writes: +- `artifacts/domain_runs/gui_run_reviews/assistant-stage1-<id>/run_review.json`; +- `artifacts/domain_runs/gui_run_reviews/assistant-stage1-<id>/run_review.md`; +- `conversation_pairs.json`; +- `question_quality_review.json`; +- `repair_targets.json`. + +This bridge is intentionally business-first: +- the user's question and visible assistant answer are reviewed before route ids and debug fields; +- noisy direct answers, missing first-line answers, technical garbage, and over-broad business answers become findings; +- generated question packs get a deterministic quality review for follow-up density, direct questions, report-style analysis, domain diversity, duplicates, and weak business anchors. + +Use this bridge when the operator would otherwise say “чекни прогон `assistant-stage1-...`”. The expected next step is no longer manual eyeballing first; it is: review by id, inspect `run_review.md`, map `repair_targets.json` into the current stage loop, patch, and rerun. 
+ ## Placeholder contract Scenario questions can reference earlier step outputs with placeholders such as: diff --git a/docs/orchestration/schemas/stage_agent_loop_manifest.schema.json b/docs/orchestration/schemas/stage_agent_loop_manifest.schema.json new file mode 100644 index 0000000..fcbcdff --- /dev/null +++ b/docs/orchestration/schemas/stage_agent_loop_manifest.schema.json @@ -0,0 +1,72 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Stage Agent Loop Manifest", + "type": "object", + "additionalProperties": true, + "required": ["stage_id", "module_name", "title", "pack_manifest"], + "properties": { + "schema_version": { + "type": "string", + "enum": ["stage_agent_loop_manifest_v1"] + }, + "stage_id": { + "type": "string", + "minLength": 1 + }, + "module_name": { + "type": "string", + "minLength": 1 + }, + "title": { + "type": "string", + "minLength": 1 + }, + "architecture_phase": { + "type": "string" + }, + "agent_focus": { + "type": "string" + }, + "current_stage_status": { + "type": "string" + }, + "global_plan_refs": { + "type": "array", + "items": { + "type": "string" + } + }, + "pack_manifest": { + "type": "string", + "description": "Path to a domain_case_loop run-pack manifest with scenarios for the stage gate." 
+ }, + "loop_id": { + "type": "string" + }, + "target_score": { + "type": "integer", + "minimum": 0, + "maximum": 100, + "default": 88 + }, + "max_iterations": { + "type": "integer", + "minimum": 1, + "default": 6 + }, + "acceptance_invariants": { + "type": "array", + "items": { + "type": "string" + } + }, + "save_autorun_on_accept": { + "type": "boolean", + "default": true + }, + "manual_confirmation_required_after_accept": { + "type": "boolean", + "default": true + } + } +} diff --git a/scripts/domain_case_loop.py b/scripts/domain_case_loop.py index cc082a8..69d246e 100644 --- a/scripts/domain_case_loop.py +++ b/scripts/domain_case_loop.py @@ -88,6 +88,48 @@ TOP_LEVEL_NOISE_PATTERNS = ( re.compile(r"^(?:подтверждение|опорные документы|сервисно)\b", re.IGNORECASE), ) +BUSINESS_DIRECT_QUESTION_MARKERS = ( + "\u0441\u043a\u043e\u043b\u044c\u043a\u043e", + "\u0441\u043a\u043e\u043a", + "\u043a\u0430\u043a\u043e\u0439", + "\u043a\u0430\u043a\u0430\u044f", + "\u043a\u0430\u043a\u0438\u0435", + "\u043a\u0442\u043e", + "\u043a\u043e\u043c\u0443", + "\u043a\u043e\u0433\u0434\u0430", + "\u0433\u0434\u0435", + "\u043a\u0443\u0434\u0430", + "\u043f\u043e\u0447\u0435\u043c\u0443", + "\u0437\u0430\u0447\u0435\u043c", + "\u043a\u0430\u043a\u0438\u043c \u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u043e\u043c", + "\u043f\u043e\u043a\u0430\u0436\u0438", +) +BUSINESS_REPORT_REQUEST_MARKERS = ( + "\u043e\u0431\u0437\u043e\u0440", + "\u0430\u043d\u0430\u043b\u0438\u0437", + "\u043f\u043e\u0434\u0440\u043e\u0431", + "\u0440\u0430\u0437\u0432\u0435\u0440\u043d", + "\u043e\u0446\u0435\u043d", + "\u0430\u0443\u0434\u0438\u0442", +) +BUSINESS_TOP_LINE_SCAFFOLD_MARKERS = ( + "\u043e\u0433\u0440\u0430\u043d\u0438\u0447\u0435\u043d\u043d\u044b\u0439 \u0431\u0438\u0437\u043d\u0435\u0441-\u043e\u0431\u0437\u043e\u0440", + "\u0447\u0442\u043e \u043f\u043e\u0434\u0442\u0432\u0435\u0440\u0436\u0434\u0435\u043d\u043e", + "\u043f\u0440\u043e\u0432\u0435\u0440\u0435\u043d\u043d\u044b\u0435 
\u043a\u043e\u043d\u0442\u0443\u0440\u044b", + "\u0431\u043b\u043e\u043a 1", + "\u0441\u0442\u0430\u0442\u0443\u0441", +) +BUSINESS_TECHNICAL_GARBAGE_MARKERS = ( + "mcp_discovery", + "runtime_", + "capability_id", + "selected_chain_id", + "business_overview_route_template_v1", + "query_movements", + "query_documents", +) +BUSINESS_DIRECT_ANSWER_SOFT_LIMIT = 1800 + DEFAULT_INVARIANT_SEVERITY: dict[str, str] = { "wrong_intent": "P0", "wrong_capability": "P0", @@ -104,6 +146,10 @@ DEFAULT_INVARIANT_SEVERITY: dict[str, str] = { "wrong_date_scope_state": "P0", "direct_answer_missing": "P0", "top_level_noise_present": "P0", + "business_direct_answer_missing": "P0", + "technical_garbage_in_answer": "P0", + "answer_layering_noise": "P1", + "business_answer_too_verbose": "P1", } REPAIR_TARGET_SEVERITY_ORDER = {"P0": 0, "P1": 1, "P2": 2} @@ -114,11 +160,12 @@ REPAIR_TARGET_PROBLEM_ORDER = { "object_memory_gap": 3, "route_gap": 4, "answer_shape_mismatch": 5, - "presentation_gap": 6, - "domain_anchor_gap": 7, - "capability_gap": 8, - "evidence_gap": 9, - "other": 10, + "business_utility_gap": 6, + "presentation_gap": 7, + "domain_anchor_gap": 8, + "capability_gap": 9, + "evidence_gap": 10, + "other": 11, } REPAIR_TARGET_FILE_HINTS: dict[str, list[str]] = { @@ -157,6 +204,16 @@ REPAIR_TARGET_FILE_HINTS: dict[str, list[str]] = { "llm_normalizer/backend/src/services/address_runtime/composeStage.ts", "llm_normalizer/backend/src/services/assistantService.ts", ], + "answer_shape_mismatch": [ + "llm_normalizer/backend/src/services/address_runtime/composeStage.ts", + "llm_normalizer/backend/src/services/assistantMcpDiscoveryResponseCandidate.ts", + "llm_normalizer/backend/src/services/assistantService.ts", + ], + "business_utility_gap": [ + "llm_normalizer/backend/src/services/address_runtime/composeStage.ts", + "llm_normalizer/backend/src/services/assistantMcpDiscoveryResponseCandidate.ts", + "llm_normalizer/backend/src/services/assistantService.ts", + ], "evidence_gap": [ 
"llm_normalizer/backend/src/services/addressQueryService.ts", "llm_normalizer/backend/src/services/addressRecipeCatalog.ts", @@ -1526,6 +1583,79 @@ def should_require_direct_answer(step_state: dict[str, Any]) -> bool: return str(step_state.get("node_role") or "").strip() in {"root", "critical_child"} +def _review_text(value: Any) -> str: + return str(value or "").strip().lower() + + +def _marker_hits(text: str, markers: tuple[str, ...]) -> list[str]: + lowered = _review_text(text) + return [marker for marker in markers if marker and marker in lowered] + + +def is_report_style_business_question(question: str) -> bool: + return bool(_marker_hits(question, BUSINESS_REPORT_REQUEST_MARKERS)) + + +def is_direct_style_business_question(question: str) -> bool: + if is_report_style_business_question(question): + return False + return bool(_marker_hits(question, BUSINESS_DIRECT_QUESTION_MARKERS)) + + +def build_business_first_review(step_state: dict[str, Any]) -> dict[str, Any]: + question = str(step_state.get("question_resolved") or step_state.get("question_template") or "").strip() + assistant_text = str(step_state.get("assistant_text") or "") + top_lines = step_state.get("top_non_empty_lines") if isinstance(step_state.get("top_non_empty_lines"), list) else [] + first_line = str(top_lines[0] if top_lines else step_state.get("actual_direct_answer") or "").strip() + direct_answer_required = should_require_direct_answer(step_state) or is_direct_style_business_question(question) + report_style_question = is_report_style_business_question(question) + technical_hits = _marker_hits(assistant_text, BUSINESS_TECHNICAL_GARBAGE_MARKERS) + first_line_technical_hits = _marker_hits(first_line, BUSINESS_TECHNICAL_GARBAGE_MARKERS) + scaffold_hits = _marker_hits(first_line, BUSINESS_TOP_LINE_SCAFFOLD_MARKERS) + top_noise = bool(first_line and is_top_level_noise_line(first_line)) + direct_answer_first_ok = bool(first_line) and not top_noise and not scaffold_hits and not 
first_line_technical_hits + too_verbose_for_direct = bool( + direct_answer_required + and not report_style_question + and len(assistant_text) > BUSINESS_DIRECT_ANSWER_SOFT_LIMIT + ) + issue_codes: list[str] = [] + if technical_hits: + issue_codes.append("technical_garbage_in_answer") + if direct_answer_required and not direct_answer_first_ok: + issue_codes.append("business_direct_answer_missing") + if scaffold_hits or top_noise: + issue_codes.append("answer_layering_noise") + if too_verbose_for_direct: + issue_codes.append("business_answer_too_verbose") + + root_cause_layers: list[str] = [] + if "business_direct_answer_missing" in issue_codes or "answer_layering_noise" in issue_codes: + root_cause_layers.append("answer_shape_mismatch") + if "business_answer_too_verbose" in issue_codes or "technical_garbage_in_answer" in issue_codes: + root_cause_layers.append("business_utility_gap") + + return { + "schema_version": "business_first_step_review_v1", + "question": question, + "direct_answer_required": direct_answer_required, + "report_style_question": report_style_question, + "answer_length_chars": len(assistant_text), + "answer_line_count": len([line for line in assistant_text.splitlines() if line.strip()]), + "actual_direct_answer": first_line or None, + "direct_answer_first_ok": (not direct_answer_required) or direct_answer_first_ok, + "answer_layering_ok": not scaffold_hits and not top_noise, + "technical_garbage_present": bool(technical_hits), + "technical_garbage_hits": technical_hits, + "top_line_scaffold_present": bool(scaffold_hits or top_noise), + "top_line_scaffold_hits": scaffold_hits, + "too_verbose_for_direct_question": too_verbose_for_direct, + "business_usefulness_ok": not issue_codes, + "issue_codes": issue_codes, + "suggested_root_cause_layers": list(dict.fromkeys(root_cause_layers)), + } + + def is_top_level_noise_line(line: str) -> bool: cleaned = str(line or "").strip() if not cleaned: @@ -1563,6 +1693,8 @@ def validate_step_contract(step_state: 
dict[str, Any]) -> dict[str, Any]: date_scope = state.get("date_scope") if isinstance(state.get("date_scope"), dict) else {} violated_invariants: list[str] = [] warnings: list[str] = [] + business_review = build_business_first_review(state) + state["business_first_review"] = business_review expected_intents = normalize_string_list(state.get("expected_intents")) if expected_intents and not identifier_in_list(state.get("detected_intent"), expected_intents): @@ -1645,6 +1777,13 @@ def validate_step_contract(step_state: dict[str, Any]) -> dict[str, Any]: if first_top_line and is_top_level_noise_line(first_top_line): violated_invariants.append("top_level_noise_present") + for issue_code in normalize_string_list(business_review.get("issue_codes")): + if issue_code == "business_answer_too_verbose": + warnings.append(issue_code) + violated_invariants.append(issue_code) + continue + violated_invariants.append(issue_code) + forbidden_answer_patterns = normalize_string_list(state.get("forbidden_answer_patterns")) if forbidden_answer_patterns and top_non_empty_lines: joined_top_block = "\n".join(str(line) for line in top_non_empty_lines) @@ -2697,6 +2836,7 @@ def compact_step_output_for_review(step_output: Any) -> dict[str, Any]: "result_mode": step_output.get("result_mode"), "answer_shape": step_output.get("answer_shape"), "actual_direct_answer": step_output.get("actual_direct_answer"), + "business_first_review": step_output.get("business_first_review"), "violated_invariants": step_output.get("violated_invariants"), "warnings": step_output.get("warnings"), "fallback_type": step_output.get("fallback_type"), @@ -2742,6 +2882,9 @@ def derive_repair_target_severity(step_output: dict[str, Any]) -> str: return "P1" if execution_status in {"partial", "needs_exact_capability"} or reply_type == "partial_coverage": return "P1" + violated_invariants = normalize_string_list(step_output.get("violated_invariants")) + if any(derive_invariant_severity(step_output, code) == "P1" for code in 
violated_invariants): + return "P1" if normalize_string_list(step_output.get("warnings")): return "P2" return "P2" @@ -2772,6 +2915,10 @@ def derive_repair_problem_type(step_output: dict[str, Any]) -> str: "forbidden_recipe_selected", } & violated: return "route_gap" + if {"business_direct_answer_missing", "answer_layering_noise"} & violated: + return "answer_shape_mismatch" + if {"business_answer_too_verbose", "technical_garbage_in_answer"} & violated: + return "business_utility_gap" if {"direct_answer_missing", "top_level_noise_present"} & violated: return "presentation_gap" if mcp_call_status == "materialized_but_not_anchor_matched": @@ -2808,6 +2955,13 @@ def derive_repair_root_cause_layers(step_output: dict[str, Any], problem_type: s layers.append("business_utility_gap") if str(step_output.get("required_answer_shape") or "").strip(): layers.append("answer_shape_mismatch") + elif problem_type == "answer_shape_mismatch": + layers.append("answer_shape_mismatch") + layers.append("business_utility_gap") + elif problem_type == "business_utility_gap": + layers.append("business_utility_gap") + if "answer_layering_noise" in violated: + layers.append("answer_shape_mismatch") elif problem_type == "evidence_gap": layers.append("runtime_capability_gap") elif problem_type == "domain_anchor_gap": @@ -2833,6 +2987,10 @@ def build_repair_fix_goal(step_output: dict[str, Any], problem_type: str) -> str return f"Enable an exact route for `{question}` so the loop no longer falls back to partial or unsupported behavior." if problem_type == "presentation_gap": return f"Make `{question}` answer-first: direct business answer in the first line, proof second, service notes last." + if problem_type == "answer_shape_mismatch": + return f"Make `{question}` start with the exact business answer requested, then put proof and caveats after it." 
+ if problem_type == "business_utility_gap": + return f"Make `{question}` useful for a business reader: remove technical/scaffold noise and keep direct answers compact." if problem_type == "evidence_gap": return f"Return grounded evidence for `{question}` instead of a limited empty response when the correct route already fires." if problem_type == "domain_anchor_gap": diff --git a/scripts/domain_truth_harness.py b/scripts/domain_truth_harness.py index c468b54..48b9f41 100644 --- a/scripts/domain_truth_harness.py +++ b/scripts/domain_truth_harness.py @@ -309,6 +309,46 @@ def append_finding( ) +BUSINESS_REVIEW_FINDING_MESSAGES = { + "technical_garbage_in_answer": "User-facing answer leaked internal runtime or MCP identifiers.", + "business_direct_answer_missing": "The answer did not put the direct business answer first.", + "answer_layering_noise": "The answer opened with scaffolding or report framing instead of a clean business result.", + "business_answer_too_verbose": "The answer is too verbose for a direct business question.", +} + +BUSINESS_REVIEW_FINDING_SEVERITY = { + "technical_garbage_in_answer": "critical", + "business_direct_answer_missing": "critical", + "answer_layering_noise": "critical", + "business_answer_too_verbose": "warning", +} + + +def append_business_review_findings(findings: list[dict[str, Any]], step: dict[str, Any], step_state: dict[str, Any]) -> None: + business_review = step_state.get("business_first_review") + if not isinstance(business_review, dict): + return + for issue_code in dcl.normalize_string_list(business_review.get("issue_codes")): + append_finding( + findings, + step, + f"business_review:{issue_code}", + BUSINESS_REVIEW_FINDING_MESSAGES.get(issue_code, "Business-first answer review detected a semantic quality issue."), + actual={ + "direct_answer": business_review.get("actual_direct_answer"), + "answer_length_chars": business_review.get("answer_length_chars"), + "technical_garbage_hits": 
business_review.get("technical_garbage_hits"), + "top_line_scaffold_hits": business_review.get("top_line_scaffold_hits"), + }, + expected={ + "direct_answer_first_ok": True, + "business_usefulness_ok": True, + "answer_layering_ok": True, + }, + severity=BUSINESS_REVIEW_FINDING_SEVERITY.get(issue_code, step.get("criticality") or DEFAULT_CRITICALITY), + ) + + def matches_any_pattern(text: str, patterns: list[str]) -> bool: return any(re.search(pattern, text, flags=re.IGNORECASE) for pattern in patterns if pattern) @@ -355,6 +395,7 @@ def evaluate_truth_step( extracted_filters = ( step_state.get("extracted_filters") if isinstance(step_state.get("extracted_filters"), dict) else {} ) + append_business_review_findings(findings, step, step_state) if ( catalog_alignment_status in {"selected_lower_rank", "selected_outside_match_set"} @@ -751,6 +792,101 @@ def build_truth_review_summary(spec: dict[str, Any], scenario_state: dict[str, A } +def build_business_review_summary(spec: dict[str, Any], scenario_state: dict[str, Any]) -> dict[str, Any]: + step_outputs = scenario_state.get("step_outputs") if isinstance(scenario_state.get("step_outputs"), dict) else {} + steps: list[dict[str, Any]] = [] + issue_counts: dict[str, int] = {} + for index, step in enumerate(spec["steps"], start=1): + step_state = step_outputs.get(step["step_id"], {}) + business_review = ( + step_state.get("business_first_review") + if isinstance(step_state, dict) and isinstance(step_state.get("business_first_review"), dict) + else {} + ) + issue_codes = dcl.normalize_string_list(business_review.get("issue_codes")) + for issue_code in issue_codes: + issue_counts[issue_code] = issue_counts.get(issue_code, 0) + 1 + steps.append( + { + "index": index, + "step_id": step["step_id"], + "question": step["question_template"], + "review_status": step_state.get("review_status") if isinstance(step_state, dict) else None, + "direct_answer": business_review.get("actual_direct_answer"), + "answer_length_chars": 
business_review.get("answer_length_chars"), + "direct_answer_required": business_review.get("direct_answer_required"), + "direct_answer_first_ok": business_review.get("direct_answer_first_ok"), + "business_usefulness_ok": business_review.get("business_usefulness_ok"), + "answer_layering_ok": business_review.get("answer_layering_ok"), + "technical_garbage_present": business_review.get("technical_garbage_present"), + "too_verbose_for_direct_question": business_review.get("too_verbose_for_direct_question"), + "issue_codes": issue_codes, + "suggested_root_cause_layers": business_review.get("suggested_root_cause_layers") or [], + } + ) + failed = sum( + 1 + for step in steps + if any( + issue in {"technical_garbage_in_answer", "business_direct_answer_missing", "answer_layering_noise"} + for issue in step["issue_codes"] + ) + ) + warnings = sum(1 for step in steps if "business_answer_too_verbose" in step["issue_codes"]) + return { + "schema_version": "business_first_run_review_v1", + "scenario_id": spec["scenario_id"], + "domain": spec["domain"], + "title": spec["title"], + "session_id": scenario_state.get("session_id"), + "steps_total": len(steps), + "steps_with_business_failures": failed, + "steps_with_business_warnings": warnings, + "issue_counts": issue_counts, + "overall_business_status": "fail" if failed else ("warning" if warnings else "pass"), + "steps": steps, + } + + +def build_business_review_markdown(business_review: dict[str, Any]) -> str: + lines = [ + "# Business-first review", + "", + f"- scenario_id: `{business_review.get('scenario_id') or 'n/a'}`", + f"- domain: `{business_review.get('domain') or 'n/a'}`", + f"- title: {business_review.get('title') or 'n/a'}", + f"- session_id: `{business_review.get('session_id') or 'n/a'}`", + f"- overall_business_status: `{business_review.get('overall_business_status') or 'n/a'}`", + f"- steps_total: `{business_review.get('steps_total')}`", + f"- steps_with_business_failures: 
`{business_review.get('steps_with_business_failures')}`", + f"- steps_with_business_warnings: `{business_review.get('steps_with_business_warnings')}`", + f"- issue_counts: `{dump_json(business_review.get('issue_counts') or {})}`", + "", + "## Human Answer Surface", + ] + for step in business_review.get("steps") or []: + if not isinstance(step, dict): + continue + lines.extend( + [ + f"{step.get('index')}. `{step.get('step_id')}` - {step.get('question')}", + f"review_status: `{step.get('review_status') or 'n/a'}`", + f"direct_answer: {step.get('direct_answer') or 'n/a'}", + f"answer_length_chars: `{step.get('answer_length_chars')}`", + f"direct_answer_required: `{step.get('direct_answer_required')}`", + f"direct_answer_first_ok: `{step.get('direct_answer_first_ok')}`", + f"business_usefulness_ok: `{step.get('business_usefulness_ok')}`", + f"answer_layering_ok: `{step.get('answer_layering_ok')}`", + f"technical_garbage_present: `{step.get('technical_garbage_present')}`", + f"too_verbose_for_direct_question: `{step.get('too_verbose_for_direct_question')}`", + f"issue_codes: `{', '.join(step.get('issue_codes') or []) or 'none'}`", + f"suggested_root_cause_layers: `{', '.join(step.get('suggested_root_cause_layers') or []) or 'none'}`", + "", + ] + ) + return "\n".join(lines).strip() + "\n" + + def build_truth_review_markdown(spec: dict[str, Any], scenario_state: dict[str, Any], review_summary: dict[str, Any]) -> str: lines = [ "# Truth harness review", @@ -772,6 +908,11 @@ def build_truth_review_markdown(spec: dict[str, Any], scenario_state: dict[str, for index, step in enumerate(spec["steps"], start=1): step_state = step_outputs.get(step["step_id"], {}) findings = step_state.get("review_findings") if isinstance(step_state.get("review_findings"), list) else [] + business_review = ( + step_state.get("business_first_review") + if isinstance(step_state, dict) and isinstance(step_state.get("business_first_review"), dict) + else {} + ) lines.extend( [ f"{index}. 
`{step['step_id']}` - {step['question_template']}", @@ -786,6 +927,11 @@ def build_truth_review_markdown(spec: dict[str, Any], scenario_state: dict[str, f"limited_reason_category: `{step_state.get('limited_reason_category') or 'n/a'}`", f"filters: `{dump_json(step_state.get('extracted_filters') or {})}`", f"direct_answer: {step_state.get('actual_direct_answer') or 'n/a'}", + f"business_first: status=`{business_review.get('business_usefulness_ok')}`, " + f"direct_first=`{business_review.get('direct_answer_first_ok')}`, " + f"layering=`{business_review.get('answer_layering_ok')}`, " + f"length=`{business_review.get('answer_length_chars')}`, " + f"issues=`{', '.join(business_review.get('issue_codes') or []) or 'none'}`", ] ) if step.get("notes"): @@ -964,9 +1110,12 @@ def review_export(spec: dict[str, Any], export_path: Path, output_dir: Path) -> scenario_state["updated_at"] = datetime.now(timezone.utc).replace(microsecond=0).isoformat() review_summary = build_truth_review_summary(spec, scenario_state, f"export:{export_path}") review_markdown = build_truth_review_markdown(spec, scenario_state, review_summary) + business_review = build_business_review_summary(spec, scenario_state) write_json(output_dir / "scenario_state.json", scenario_state) write_json(output_dir / "truth_review.json", {"summary": review_summary, "steps": scenario_state["step_outputs"]}) write_text(output_dir / "truth_review.md", review_markdown) + write_json(output_dir / "business_review.json", business_review) + write_text(output_dir / "business_review.md", build_business_review_markdown(business_review)) acceptance_bundle = write_acceptance_artifacts(output_dir, spec, scenario_state, review_summary) return { "scenario_state": scenario_state, @@ -1056,10 +1205,13 @@ def run_live(spec: dict[str, Any], output_dir: Path, args: argparse.Namespace) - review_summary = build_truth_review_summary(spec, scenario_state, "live_strict_replay") review_markdown = build_truth_review_markdown(spec, scenario_state, 
review_summary) + business_review = build_business_review_summary(spec, scenario_state) write_text(output_dir / "session_id.txt", f"{scenario_state.get('session_id') or ''}\n") write_json(output_dir / "scenario_state.json", scenario_state) write_json(output_dir / "truth_review.json", {"summary": review_summary, "steps": scenario_state["step_outputs"]}) write_text(output_dir / "truth_review.md", review_markdown) + write_json(output_dir / "business_review.json", business_review) + write_text(output_dir / "business_review.md", build_business_review_markdown(business_review)) acceptance_bundle = write_acceptance_artifacts(output_dir, spec, scenario_state, review_summary) print(f"[truth-harness] saved artifacts to {output_dir}") print(f"[truth-harness] overall_status={review_summary['overall_status']}") diff --git a/scripts/review_assistant_stage1_run.py b/scripts/review_assistant_stage1_run.py new file mode 100644 index 0000000..7fdefc9 --- /dev/null +++ b/scripts/review_assistant_stage1_run.py @@ -0,0 +1,634 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import json +import re +import sys +from collections import Counter +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +import domain_case_loop as dcl + + +REPO_ROOT = Path(__file__).resolve().parents[1] +DEFAULT_SESSIONS_DIR = REPO_ROOT / "llm_normalizer" / "data" / "assistant_sessions" +DEFAULT_REPORTS_DIR = REPO_ROOT / "llm_normalizer" / "reports" +DEFAULT_OUTPUT_ROOT = REPO_ROOT / "artifacts" / "domain_runs" / "gui_run_reviews" +RUN_REVIEW_SCHEMA_VERSION = "assistant_stage1_run_review_v1" +QUESTION_QUALITY_SCHEMA_VERSION = "assistant_stage1_question_quality_v1" + +DOMAIN_MARKERS: dict[str, tuple[str, ...]] = { + "vat": ("ндс", "налог", "вычет", "счет-фактур"), + "money": ("деньг", "заработ", "доход", "выруч", "поступлен", "оплат", "оборот"), + "counterparty": ("контрагент", "клиент", "покупател", "поставщик", "группа свк", "свк", 
"чепурнов", "альтернатива"), + "inventory": ("склад", "товар", "остат", "закуп", "продаж", "номенклатур"), + "debt": ("долг", "должен", "должны", "должн", "дебитор", "кредитор", "счет 60", "счет 62", "хвост"), + "documents": ("документ", "доки", "накладн", "акт", "платеж", "реализац", "поступлени"), +} +SMALLTALK_MARKERS = ("привет", "как дела", "что умеешь", "что можешь", "расскажи что можешь") +FOLLOWUP_MARKERS = ( + "по ней", + "по нему", + "по этой", + "по этому", + "по выбран", + "теперь", + "тогда", + "давай на", + "а еще", + "еще", + "эту", + "его", + "ее", + "этот", + "эта", + "сравни", + "а если", + "а нам", + "почему", + "а кому", + "кому ", +) +DATE_ONLY_FOLLOWUP_PATTERN = re.compile( + r"^\s*(?:давай\s+)?(?:на\s+)?(?:январ[ьяе]|феврал[ьяе]|март[ае]?|апрел[ьяе]|ма[йяе]|июн[ьяе]|июл[ьяе]|август[ае]?|сентябр[ьяе]|октябр[ьяе]|ноябр[ьяе]|декабр[ьяе])\s+\d{4}\s*$", + re.IGNORECASE, +) +FALSE_CATASTROPHE_MARKERS = ( + "все сломалось", + "разъехалось", + "разъеб", + "пиздец", + "хуйня", + "косяк", + "неправильно", +) +BUSINESS_NOUN_MARKERS = tuple(sorted({item for values in DOMAIN_MARKERS.values() for item in values})) + + +def now_iso() -> str: + return datetime.now(timezone.utc).replace(microsecond=0).isoformat() + + +def load_json(path: Path) -> Any: + return json.loads(path.read_text(encoding="utf-8-sig")) + + +def write_text(path: Path, text: str) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(text, encoding="utf-8") + + +def write_json(path: Path, payload: Any) -> None: + write_text(path, json.dumps(payload, ensure_ascii=False, indent=2) + "\n") + + +def repo_relative(path: Path) -> str: + try: + return str(path.resolve().relative_to(REPO_ROOT)) + except ValueError: + return str(path.resolve()) + + +def normalize_text(value: Any) -> str: + return re.sub(r"\s+", " ", str(value or "").strip().lower()) + + +def compact_preview(value: Any, limit: int = 260) -> str: + text = re.sub(r"\s+", " ", str(value or "").strip()) + if 
len(text) <= limit: + return text + return text[: limit - 1].rstrip() + "..." + + +def has_any(text: str, markers: tuple[str, ...]) -> bool: + lowered = normalize_text(text) + return any(marker in lowered for marker in markers) + + +def run_id_from_value(value: str) -> str: + text = str(value or "").strip() + match = re.search(r"(assistant-stage1-[A-Za-z0-9_-]+)", text) + if not match: + raise RuntimeError(f"Cannot parse assistant-stage1 run id from: {value}") + return match.group(1) + + +def parse_report_metadata(report_path: Path) -> dict[str, Any]: + if not report_path.exists(): + return {} + metadata: dict[str, Any] = {"report_path": repo_relative(report_path)} + for line in report_path.read_text(encoding="utf-8-sig").splitlines()[:80]: + match = re.match(r"^-\s*([^:]+):\s*(.*)$", line.strip()) + if match: + metadata[match.group(1).strip()] = match.group(2).strip() + return metadata + + +def resolve_session_files( + *, + run_id: str, + sessions_dir: Path, + explicit_session_file: Path | None = None, +) -> list[Path]: + if explicit_session_file is not None: + if not explicit_session_file.exists(): + raise RuntimeError(f"Session file not found: {explicit_session_file}") + return [explicit_session_file] + candidates = sorted(sessions_dir.glob(f"{run_id}-*.json")) + if not candidates: + raise RuntimeError(f"No assistant session files found for {run_id} in {sessions_dir}") + return candidates + + +def load_session(path: Path) -> dict[str, Any]: + payload = load_json(path) + if not isinstance(payload, dict): + raise RuntimeError(f"Assistant session must be a JSON object: {path}") + conversation = payload.get("conversation") + if not isinstance(conversation, list): + raise RuntimeError(f"Assistant session has no conversation[]: {path}") + return payload + + +def build_conversation_pairs(conversation: list[dict[str, Any]]) -> list[dict[str, Any]]: + pairs: list[dict[str, Any]] = [] + for index, item in enumerate(conversation): + if not isinstance(item, dict) or 
item.get("role") != "user": + continue + assistant_item: dict[str, Any] | None = None + for candidate in conversation[index + 1 :]: + if isinstance(candidate, dict) and candidate.get("role") == "assistant": + assistant_item = candidate + break + if isinstance(candidate, dict) and candidate.get("role") == "user": + break + pairs.append( + { + "pair_index": len(pairs) + 1, + "user": item, + "assistant": assistant_item, + } + ) + return pairs + + +def classify_question(question: str, pair_index: int) -> dict[str, Any]: + normalized = normalize_text(question) + tags: list[str] = [] + domains: list[str] = [] + if has_any(normalized, SMALLTALK_MARKERS): + tags.append("smalltalk_or_meta") + if dcl.is_direct_style_business_question(normalized): + tags.append("direct_business_question") + if dcl.is_report_style_business_question(normalized): + tags.append("report_or_analysis_request") + date_only_followup = pair_index > 1 and bool(DATE_ONLY_FOLLOWUP_PATTERN.match(question)) + if has_any(normalized, FOLLOWUP_MARKERS) or date_only_followup: + tags.append("contextual_followup") + if has_any(normalized, FALSE_CATASTROPHE_MARKERS): + tags.append("false_catastrophe_or_negative_pressure") + for domain, markers in DOMAIN_MARKERS.items(): + if has_any(normalized, markers): + domains.append(domain) + if domains: + tags.append("domain_grounded") + if not tags: + tags.append("unclassified") + + weak_flags: list[str] = [] + if pair_index == 1 and "contextual_followup" in tags and "smalltalk_or_meta" not in tags: + weak_flags.append("root_question_requires_missing_context") + if len(question) > 500: + weak_flags.append("question_too_long") + if ( + "smalltalk_or_meta" not in tags + and "contextual_followup" not in tags + and "report_or_analysis_request" not in tags + and not domains + and not has_any(normalized, BUSINESS_NOUN_MARKERS) + ): + weak_flags.append("low_business_anchor") + + return { + "question": question, + "tags": tags, + "domains": domains, + "weak_flags": weak_flags, + 
"length_chars": len(question), + } + + +def build_question_quality_review(pairs: list[dict[str, Any]]) -> dict[str, Any]: + question_reviews: list[dict[str, Any]] = [] + question_counter: Counter[str] = Counter() + for pair in pairs: + question = str(pair.get("user", {}).get("text") or "") + normalized = normalize_text(question) + if normalized: + question_counter[normalized] += 1 + question_reviews.append(classify_question(question, int(pair["pair_index"]))) + + tag_counts = Counter(tag for item in question_reviews for tag in item["tags"]) + domain_counts = Counter(domain for item in question_reviews for domain in item["domains"]) + weak_flag_counts = Counter(flag for item in question_reviews for flag in item["weak_flags"]) + duplicate_questions = [question for question, count in question_counter.items() if count > 1] + if duplicate_questions: + weak_flag_counts["duplicate_questions"] += len(duplicate_questions) + if tag_counts["contextual_followup"] < 2 and len(question_reviews) >= 8: + weak_flag_counts["too_few_contextual_followups"] += 1 + if tag_counts["direct_business_question"] < 3 and len(question_reviews) >= 8: + weak_flag_counts["too_few_direct_business_questions"] += 1 + if tag_counts["report_or_analysis_request"] < 1 and len(question_reviews) >= 8: + weak_flag_counts["missing_report_or_analysis_request"] += 1 + if len(domain_counts) < 3 and len(question_reviews) >= 8: + weak_flag_counts["low_domain_diversity"] += 1 + + score = 100 + score -= min(30, weak_flag_counts["low_business_anchor"] * 6) + score -= min(20, weak_flag_counts["question_too_long"] * 5) + score -= min(20, weak_flag_counts["duplicate_questions"] * 5) + score -= 12 if weak_flag_counts["too_few_contextual_followups"] else 0 + score -= 12 if weak_flag_counts["too_few_direct_business_questions"] else 0 + score -= 10 if weak_flag_counts["missing_report_or_analysis_request"] else 0 + score -= 10 if weak_flag_counts["low_domain_diversity"] else 0 + score -= 20 if 
weak_flag_counts["root_question_requires_missing_context"] else 0 + score = max(0, min(100, score)) + + if score >= 85: + status = "strong" + elif score >= 70: + status = "usable_with_gaps" + else: + status = "weak" + + return { + "schema_version": QUESTION_QUALITY_SCHEMA_VERSION, + "status": status, + "score": score, + "turns_total": len(question_reviews), + "tag_counts": dict(sorted(tag_counts.items())), + "domain_counts": dict(sorted(domain_counts.items())), + "weak_flag_counts": dict(sorted(weak_flag_counts.items())), + "duplicate_questions": duplicate_questions[:20], + "questions": question_reviews, + } + + +def build_step_for_pair(pair: dict[str, Any]) -> dict[str, Any]: + pair_index = int(pair["pair_index"]) + question = str(pair.get("user", {}).get("text") or "").strip() + title = compact_preview(question, limit=80) or f"Turn {pair_index}" + return { + "step_id": f"turn_{pair_index:03d}", + "title": title, + "depends_on": [], + "question_template": question, + "invariant_severity": { + "answer_layering_noise": "P1", + "business_answer_too_verbose": "P1", + }, + } + + +def build_step_state_for_pair( + *, + run_id: str, + session: dict[str, Any], + pair: dict[str, Any], +) -> dict[str, Any]: + pair_index = int(pair["pair_index"]) + question = str(pair.get("user", {}).get("text") or "").strip() + assistant_item = pair.get("assistant") if isinstance(pair.get("assistant"), dict) else {} + assistant_text = str(assistant_item.get("text") or "") + debug = assistant_item.get("debug") if isinstance(assistant_item.get("debug"), dict) else {} + turn_artifact = { + "schema_version": "assistant_stage1_gui_turn_artifact_v1", + "run_id": run_id, + "session_id": session.get("session_id"), + "pair_index": pair_index, + "user_message": pair.get("user"), + "assistant_message": assistant_item, + "technical_debug_payload": debug, + "session_summary": { + "session_id": session.get("session_id"), + "started_at": session.get("started_at"), + "updated_at": session.get("updated_at"), 
+ "address_navigation_state": session.get("address_navigation_state"), + "investigation_state": session.get("investigation_state"), + "counters": session.get("counters"), + "reply_types": session.get("reply_types"), + }, + } + entries = dcl.extract_structured_entries(assistant_text) + return dcl.build_scenario_step_state( + scenario_id=run_id, + domain="assistant_stage1_gui_run", + step=build_step_for_pair(pair), + step_index=pair_index, + question_resolved=question, + analysis_context={}, + turn_artifact=turn_artifact, + entries=entries, + ) + + +def severity_rank(severity: str) -> int: + return {"P0": 0, "P1": 1, "P2": 2, "WARNING": 3}.get(str(severity or "").upper(), 4) + + +def max_issue_severity(step_state: dict[str, Any], issue_codes: list[str]) -> str: + if not issue_codes: + return "none" + severities = [dcl.derive_invariant_severity(step_state, code) for code in issue_codes] + return sorted(severities, key=severity_rank)[0] + + +def build_finding(step_state: dict[str, Any], session_id: str | None) -> dict[str, Any] | None: + review = step_state.get("business_first_review") if isinstance(step_state.get("business_first_review"), dict) else {} + issue_codes = [str(item) for item in review.get("issue_codes", []) if str(item).strip()] + if not issue_codes: + return None + issue_severities = {code: dcl.derive_invariant_severity(step_state, code) for code in issue_codes} + severity = max_issue_severity(step_state, issue_codes) + return { + "finding_type": "business_answer_quality", + "severity": severity, + "issue_severities": issue_severities, + "session_id": session_id, + "turn_index": step_state.get("step_index"), + "step_id": step_state.get("step_id"), + "question": step_state.get("question_resolved"), + "assistant_first_line": review.get("actual_direct_answer"), + "issue_codes": issue_codes, + "suggested_root_cause_layers": review.get("suggested_root_cause_layers") or [], + "answer_length_chars": review.get("answer_length_chars"), + "reply_type": 
step_state.get("reply_type"), + "trace_id": step_state.get("trace_id"), + "capability_id": step_state.get("capability_id"), + "selected_recipe": step_state.get("selected_recipe"), + } + + +def build_repair_targets(findings: list[dict[str, Any]]) -> list[dict[str, Any]]: + grouped: dict[tuple[str, str], dict[str, Any]] = {} + for finding in findings: + issue_codes = [str(item) for item in finding.get("issue_codes", []) if str(item).strip()] + layers = [str(item) for item in finding.get("suggested_root_cause_layers", []) if str(item).strip()] + if not layers: + layers = ["business_answer_quality_gap"] + for issue_code in issue_codes: + issue_severity = ( + finding.get("issue_severities", {}).get(issue_code) + if isinstance(finding.get("issue_severities"), dict) + else finding.get("severity") + ) + for layer in layers: + key = (layer, issue_code) + target = grouped.setdefault( + key, + { + "problem_layer": layer, + "issue_code": issue_code, + "severity": issue_severity, + "occurrences": 0, + "sample_turns": [], + }, + ) + target["occurrences"] += 1 + if severity_rank(str(issue_severity)) < severity_rank(str(target.get("severity"))): + target["severity"] = issue_severity + if len(target["sample_turns"]) < 5: + target["sample_turns"].append( + { + "session_id": finding.get("session_id"), + "turn_index": finding.get("turn_index"), + "question": finding.get("question"), + "assistant_first_line": finding.get("assistant_first_line"), + } + ) + return sorted( + grouped.values(), + key=lambda item: (severity_rank(str(item.get("severity"))), -int(item.get("occurrences") or 0), str(item.get("issue_code"))), + ) + + +def build_run_review( + *, + run_id: str, + session_files: list[Path], + report_path: Path, +) -> dict[str, Any]: + sessions_review: list[dict[str, Any]] = [] + all_pairs: list[dict[str, Any]] = [] + all_step_states: list[dict[str, Any]] = [] + findings: list[dict[str, Any]] = [] + for session_file in session_files: + session = load_session(session_file) + 
conversation = [item for item in session.get("conversation", []) if isinstance(item, dict)] + pairs = build_conversation_pairs(conversation) + session_step_states: list[dict[str, Any]] = [] + for pair in pairs: + step_state = build_step_state_for_pair(run_id=run_id, session=session, pair=pair) + session_step_states.append(step_state) + all_step_states.append(step_state) + pair_record = { + "session_id": session.get("session_id"), + "pair_index": pair["pair_index"], + "user_text": pair.get("user", {}).get("text"), + "assistant_text": (pair.get("assistant") or {}).get("text") if isinstance(pair.get("assistant"), dict) else None, + "assistant_reply_type": (pair.get("assistant") or {}).get("reply_type") if isinstance(pair.get("assistant"), dict) else None, + "assistant_trace_id": (pair.get("assistant") or {}).get("trace_id") if isinstance(pair.get("assistant"), dict) else None, + } + all_pairs.append(pair_record) + finding = build_finding(step_state, str(session.get("session_id") or "")) + if finding is not None: + findings.append(finding) + sessions_review.append( + { + "session_file": repo_relative(session_file), + "session_id": session.get("session_id"), + "conversation_items": len(conversation), + "pairs_total": len(pairs), + "business_issue_turns": sum( + 1 + for item in session_step_states + if (item.get("business_first_review") or {}).get("issue_codes") + ), + } + ) + + issue_counter = Counter(code for finding in findings for code in finding.get("issue_codes", [])) + severity_counter = Counter(str(finding.get("severity") or "none") for finding in findings) + runtime_status_counts = Counter(str(item.get("execution_status") or "unknown") for item in all_step_states) + p0_findings = [item for item in findings if item.get("severity") == "P0"] + p1_findings = [item for item in findings if item.get("severity") == "P1"] + if p0_findings: + overall_status = "fail" + elif p1_findings: + overall_status = "warning" + else: + overall_status = "pass" + + question_quality = 
build_question_quality_review( + [ + { + "pair_index": item["pair_index"], + "user": {"text": item.get("user_text")}, + } + for item in all_pairs + ] + ) + repair_targets = build_repair_targets(findings) + report_metadata = parse_report_metadata(report_path) + + return { + "schema_version": RUN_REVIEW_SCHEMA_VERSION, + "run_id": run_id, + "reviewed_at": now_iso(), + "source": { + "report_path": repo_relative(report_path) if report_path.exists() else None, + "report_metadata": report_metadata, + "session_files": [repo_relative(path) for path in session_files], + }, + "summary": { + "overall_business_status": overall_status, + "sessions_total": len(session_files), + "turn_pairs_total": len(all_pairs), + "business_issue_turns": len(findings), + "p0_findings": len(p0_findings), + "p1_findings": len(p1_findings), + "issue_counts": dict(sorted(issue_counter.items())), + "severity_counts": dict(sorted(severity_counter.items())), + "runtime_status_counts": dict(sorted(runtime_status_counts.items())), + "question_quality_status": question_quality["status"], + "question_quality_score": question_quality["score"], + }, + "sessions": sessions_review, + "question_quality_review": question_quality, + "findings": findings, + "repair_targets": repair_targets, + "conversation_pairs": all_pairs, + "step_states": all_step_states, + } + + +def build_review_markdown(review: dict[str, Any]) -> str: + summary = review.get("summary") if isinstance(review.get("summary"), dict) else {} + question_quality = ( + review.get("question_quality_review") + if isinstance(review.get("question_quality_review"), dict) + else {} + ) + lines = [ + "# Assistant Stage 1 GUI Run Review", + "", + f"- run_id: `{review.get('run_id')}`", + f"- overall_business_status: `{summary.get('overall_business_status')}`", + f"- turn_pairs_total: `{summary.get('turn_pairs_total')}`", + f"- business_issue_turns: `{summary.get('business_issue_turns')}`", + f"- p0_findings: `{summary.get('p0_findings')}`", + f"- p1_findings: 
`{summary.get('p1_findings')}`", + f"- question_quality: `{summary.get('question_quality_status')}` / `{summary.get('question_quality_score')}`", + "", + "## Question Quality", + "", + f"- status: `{question_quality.get('status')}`", + f"- score: `{question_quality.get('score')}`", + f"- tag_counts: `{json.dumps(question_quality.get('tag_counts') or {}, ensure_ascii=False, sort_keys=True)}`", + f"- domain_counts: `{json.dumps(question_quality.get('domain_counts') or {}, ensure_ascii=False, sort_keys=True)}`", + f"- weak_flag_counts: `{json.dumps(question_quality.get('weak_flag_counts') or {}, ensure_ascii=False, sort_keys=True)}`", + "", + "## Business Findings", + ] + findings = review.get("findings") if isinstance(review.get("findings"), list) else [] + if not findings: + lines.append("") + lines.append("- no business-first answer quality findings") + else: + for finding in findings[:80]: + lines.extend( + [ + "", + f"### Turn {finding.get('turn_index')} - {finding.get('severity')}", + "", + f"- issue_codes: `{', '.join(str(item) for item in finding.get('issue_codes') or [])}`", + f"- root_cause_layers: `{', '.join(str(item) for item in finding.get('suggested_root_cause_layers') or []) or 'n/a'}`", + f"- reply_type: `{finding.get('reply_type') or 'n/a'}`", + f"- capability_id: `{finding.get('capability_id') or 'n/a'}`", + f"- selected_recipe: `{finding.get('selected_recipe') or 'n/a'}`", + f"- question: {compact_preview(finding.get('question'), 500)}", + f"- assistant_first_line: {compact_preview(finding.get('assistant_first_line'), 500) or 'n/a'}", + ] + ) + lines.extend(["", "## Repair Targets"]) + repair_targets = review.get("repair_targets") if isinstance(review.get("repair_targets"), list) else [] + if not repair_targets: + lines.append("") + lines.append("- no repair targets") + else: + for target in repair_targets[:30]: + lines.append( + f"- `{target.get('severity')}` `{target.get('problem_layer')}` / `{target.get('issue_code')}`: " + 
f"{target.get('occurrences')} occurrence(s)" + ) + return "\n".join(lines).strip() + "\n" + + +def save_run_review(review: dict[str, Any], output_dir: Path) -> None: + write_json(output_dir / "run_review.json", review) + write_text(output_dir / "run_review.md", build_review_markdown(review)) + write_json(output_dir / "conversation_pairs.json", review.get("conversation_pairs") or []) + write_json(output_dir / "question_quality_review.json", review.get("question_quality_review") or {}) + write_json(output_dir / "repair_targets.json", review.get("repair_targets") or []) + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description="Review a GUI assistant_stage1 saved-session run by assistant-stage1-* id." + ) + parser.add_argument("run_id", help="Run id or text containing assistant-stage1-...") + parser.add_argument("--session-file", type=Path, default=None, help="Explicit assistant session JSON file.") + parser.add_argument("--sessions-dir", type=Path, default=DEFAULT_SESSIONS_DIR) + parser.add_argument("--reports-dir", type=Path, default=DEFAULT_REPORTS_DIR) + parser.add_argument("--output-root", type=Path, default=DEFAULT_OUTPUT_ROOT) + parser.add_argument("--output-dir", type=Path, default=None) + parser.add_argument("--print-summary", action="store_true") + return parser + + +def main(argv: list[str] | None = None) -> int: + args = build_parser().parse_args(argv) + run_id = run_id_from_value(args.run_id) + report_path = args.reports_dir / f"{run_id}.md" + session_files = resolve_session_files( + run_id=run_id, + sessions_dir=args.sessions_dir, + explicit_session_file=args.session_file, + ) + review = build_run_review(run_id=run_id, session_files=session_files, report_path=report_path) + output_dir = args.output_dir or (args.output_root / run_id) + save_run_review(review, output_dir) + if args.print_summary: + summary = review["summary"] + print( + json.dumps( + { + "run_id": run_id, + "output_dir": 
repo_relative(output_dir), + "overall_business_status": summary["overall_business_status"], + "turn_pairs_total": summary["turn_pairs_total"], + "business_issue_turns": summary["business_issue_turns"], + "question_quality_score": summary["question_quality_score"], + }, + ensure_ascii=False, + indent=2, + ) + ) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main(sys.argv[1:])) diff --git a/scripts/save_agent_semantic_run.py b/scripts/save_agent_semantic_run.py index 531a017..f3e9bb9 100644 --- a/scripts/save_agent_semantic_run.py +++ b/scripts/save_agent_semantic_run.py @@ -14,6 +14,7 @@ REPO_ROOT = Path(__file__).resolve().parents[1] HISTORY_FILE = REPO_ROOT / "llm_normalizer" / "data" / "autorun_generators" / "history.json" SAVED_SESSIONS_DIR = REPO_ROOT / "llm_normalizer" / "data" / "autorun_generators" / "saved_sessions" EVAL_CASES_DIR = REPO_ROOT / "llm_normalizer" / "data" / "eval_cases" +VALIDATED_AGENT_SAVE_SCHEMA_VERSION = "agent_semantic_save_gate_v1" def now_utc() -> datetime: @@ -54,6 +55,188 @@ def write_json(path: Path, payload: Any) -> None: path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8") +def resolve_repo_path(raw_path: str | Path) -> Path: + path = Path(raw_path) + return path if path.is_absolute() else (REPO_ROOT / path).resolve() + + +def repo_relative(path: Path) -> str: + try: + return str(path.resolve().relative_to(REPO_ROOT)) + except ValueError: + return str(path.resolve()) + + +def load_json_object(path: Path, label: str) -> dict[str, Any]: + if not path.exists(): + raise RuntimeError(f"{label} not found: {path}") + parsed = load_json(path) + if not isinstance(parsed, dict): + raise RuntimeError(f"{label} must be a JSON object: {path}") + return parsed + + +def assert_status(value: Any, expected: str, label: str, problems: list[str]) -> None: + actual = str(value or "").strip().lower() + if actual != expected: + problems.append(f"{label}={actual or 'missing'}") + + +def 
validate_truth_harness_run_dir(run_dir: Path) -> dict[str, Any]: + run_dir = run_dir.resolve() + pack_state = load_json_object(run_dir / "pack_state.json", "Validated run pack_state.json") + truth_review = load_json_object(run_dir / "truth_review.json", "Validated run truth_review.json") + business_review = load_json_object(run_dir / "business_review.json", "Validated run business_review.json") + truth_summary = truth_review.get("summary") if isinstance(truth_review.get("summary"), dict) else {} + + problems: list[str] = [] + assert_status(pack_state.get("final_status"), "accepted", "pack_state.final_status", problems) + assert_status(pack_state.get("review_overall_status"), "pass", "pack_state.review_overall_status", problems) + assert_status(truth_summary.get("overall_status"), "pass", "truth_review.summary.overall_status", problems) + assert_status(business_review.get("overall_business_status"), "pass", "business_review.overall_business_status", problems) + if pack_state.get("acceptance_gate_passed") is not True: + problems.append("pack_state.acceptance_gate_passed=false") + if pack_state.get("no_unresolved_p0") is not True: + problems.append("pack_state.no_unresolved_p0=false") + if int(pack_state.get("unresolved_p0_count") or 0) != 0: + problems.append(f"pack_state.unresolved_p0_count={pack_state.get('unresolved_p0_count')}") + if int(business_review.get("steps_with_business_failures") or 0) != 0: + problems.append(f"business_review.steps_with_business_failures={business_review.get('steps_with_business_failures')}") + + if problems: + raise RuntimeError( + "Refusing to save AGENT autorun because the validated run is not clean: " + + ", ".join(problems) + ) + + return { + "schema_version": VALIDATED_AGENT_SAVE_SCHEMA_VERSION, + "validation_status": "accepted_live_replay", + "validated_run_dir": repo_relative(run_dir), + "final_status": pack_state.get("final_status"), + "review_overall_status": pack_state.get("review_overall_status"), + 
"business_overall_status": business_review.get("overall_business_status"), + "steps_total": pack_state.get("steps_total"), + "steps_passed": pack_state.get("steps_passed"), + "steps_failed": pack_state.get("steps_failed"), + "steps_with_business_failures": business_review.get("steps_with_business_failures"), + "steps_with_business_warnings": business_review.get("steps_with_business_warnings"), + "acceptance_gate_passed": pack_state.get("acceptance_gate_passed"), + "saved_after_validated_replay": True, + } + + +def validate_domain_pack_loop_dir(loop_dir: Path) -> dict[str, Any]: + loop_dir = loop_dir.resolve() + loop_state = load_json_object(loop_dir / "loop_state.json", "Validated loop_state.json") + iterations = loop_state.get("iterations") + if not isinstance(iterations, list) or not iterations: + raise RuntimeError("Refusing to save AGENT autorun because the validated loop has no iterations") + accepted_iterations = [ + item for item in iterations if isinstance(item, dict) and bool(item.get("accepted_gate")) + ] + last_iteration = accepted_iterations[-1] if accepted_iterations else iterations[-1] + if not isinstance(last_iteration, dict): + raise RuntimeError("Refusing to save AGENT autorun because the validated loop iteration is invalid") + + analyst_path_raw = str(last_iteration.get("analyst_verdict_path") or "").strip() + repair_targets_path_raw = str(last_iteration.get("repair_targets_path") or "").strip() + analyst_verdict = load_json_object(resolve_repo_path(analyst_path_raw), "Validated loop analyst_verdict.json") + repair_targets = load_json_object(resolve_repo_path(repair_targets_path_raw), "Validated loop repair_targets.json") + severity_counts = repair_targets.get("severity_counts") if isinstance(repair_targets.get("severity_counts"), dict) else {} + + problems: list[str] = [] + assert_status(loop_state.get("final_status"), "accepted", "loop_state.final_status", problems) + if last_iteration.get("accepted_gate") is not True: + 
problems.append("last_iteration.accepted_gate=false") + if last_iteration.get("analyst_accepted_gate") is not True: + problems.append("last_iteration.analyst_accepted_gate=false") + if last_iteration.get("deterministic_gate_ok") is not True: + problems.append("last_iteration.deterministic_gate_ok=false") + if int(last_iteration.get("quality_score") or 0) < int(loop_state.get("target_score") or 80): + problems.append( + f"last_iteration.quality_score={last_iteration.get('quality_score')} dict[str, Any]: + run_dir = run_dir.resolve() + if (run_dir / "loop_state.json").exists(): + return validate_domain_pack_loop_dir(run_dir) + return validate_truth_harness_run_dir(run_dir) + + +def build_save_gate_metadata(args: argparse.Namespace, spec: dict[str, Any], spec_path: Path) -> dict[str, Any]: + raw_run_dir = args.validated_run_dir or spec.get("validated_run_dir") or spec.get("validated_artifact_dir") + if raw_run_dir: + return validate_accepted_run_dir(resolve_repo_path(str(raw_run_dir))) + + if args.dry_run: + return { + "schema_version": VALIDATED_AGENT_SAVE_SCHEMA_VERSION, + "validation_status": "dry_run_unvalidated", + "source_spec_file": repo_relative(spec_path), + "saved_after_validated_replay": False, + } + + if args.allow_unvalidated: + reason = str(args.unvalidated_reason or "").strip() + if not reason: + raise RuntimeError("--unvalidated-reason is required when --allow-unvalidated is used") + return { + "schema_version": VALIDATED_AGENT_SAVE_SCHEMA_VERSION, + "validation_status": "explicitly_unvalidated", + "source_spec_file": repo_relative(spec_path), + "unvalidated_reason": reason, + "saved_after_validated_replay": False, + } + + raise RuntimeError( + "Refusing to save AGENT autorun before a reviewed live replay. " + "Pass --validated-run-dir artifacts/domain_runs/ after run-live/review-export is accepted, " + "or use --allow-unvalidated --unvalidated-reason only for an explicit draft." 
+ ) + + def normalize_questions(raw_questions: list[Any]) -> list[str]: result: list[str] = [] seen: set[str] = set() @@ -90,9 +273,31 @@ def extract_questions_from_spec(spec: dict[str, Any]) -> list[str]: steps = spec.get("steps") if isinstance(steps, list): return normalize_questions( - [step.get("question") for step in steps if isinstance(step, dict) and step.get("question")] + [ + step.get("question") or step.get("question_template") + for step in steps + if isinstance(step, dict) and (step.get("question") or step.get("question_template")) + ] ) - raise RuntimeError("Spec must define either `questions[]` or `steps[].question`") + scenarios = spec.get("scenarios") + if isinstance(scenarios, list): + raw_questions: list[Any] = [] + for scenario in scenarios: + if not isinstance(scenario, dict): + continue + scenario_steps = scenario.get("steps") + if not isinstance(scenario_steps, list): + continue + raw_questions.extend( + step.get("question") or step.get("question_template") + for step in scenario_steps + if isinstance(step, dict) and (step.get("question") or step.get("question_template")) + ) + return normalize_questions(raw_questions) + raise RuntimeError( + "Spec must define `questions[]`, `steps[].question`, `steps[].question_template`, " + "or `scenarios[].steps[]` questions" + ) def build_case_set_payload( @@ -203,6 +408,9 @@ def build_history_record( "source_spec_file": metadata.get("source_spec_file"), "scenario_id": metadata.get("scenario_id"), "semantic_tags": metadata.get("semantic_tags"), + "validation_status": metadata.get("validation_status"), + "validated_run_dir": metadata.get("validated_run_dir"), + "saved_after_validated_replay": metadata.get("saved_after_validated_replay"), } return { "generation_id": generation_id, @@ -218,7 +426,12 @@ def build_history_record( } -def build_metadata(args: argparse.Namespace, spec: dict[str, Any], spec_path: Path | None) -> dict[str, Any]: +def build_metadata( + args: argparse.Namespace, + spec: dict[str, 
Any], + spec_path: Path | None, + save_gate: dict[str, Any], +) -> dict[str, Any]: semantic_tags = extract_semantic_tags(spec) return { "assistant_prompt_version": args.assistant_prompt_version, @@ -229,6 +442,10 @@ def build_metadata(args: argparse.Namespace, spec: dict[str, Any], spec_path: Pa "source_spec_file": str(spec_path.resolve()) if spec_path else None, "scenario_id": str(spec.get("scenario_id") or "").strip() or None, "semantic_tags": semantic_tags, + "validation_status": save_gate.get("validation_status"), + "validated_run_dir": save_gate.get("validated_run_dir"), + "saved_after_validated_replay": save_gate.get("saved_after_validated_replay"), + "save_gate": save_gate, } @@ -242,6 +459,19 @@ def parse_args() -> argparse.Namespace: parser.add_argument("--assistant-prompt-version", help="Optional assistant prompt version metadata.") parser.add_argument("--decomposition-prompt-version", help="Optional decomposition prompt version metadata.") parser.add_argument("--prompt-fingerprint", help="Optional prompt fingerprint metadata.") + parser.add_argument( + "--validated-run-dir", + help="Accepted truth-harness artifact directory containing pack_state.json, truth_review.json, and business_review.json.", + ) + parser.add_argument( + "--allow-unvalidated", + action="store_true", + help="Explicitly save a draft AGENT run without accepted replay artifacts. 
This is not an acceptance proof.", + ) + parser.add_argument( + "--unvalidated-reason", + help="Required explanation when --allow-unvalidated is used.", + ) parser.add_argument("--dry-run", action="store_true", help="Print resulting record metadata without writing files.") return parser.parse_args() @@ -262,10 +492,11 @@ def main() -> int: if not questions: raise RuntimeError("Agent semantic run must contain at least one question") + save_gate = build_save_gate_metadata(args, spec_raw, spec_path) domain = str(spec_raw.get("domain") or "").strip() or None source_title = str(args.title or spec_raw.get("title") or spec_path.stem).strip() title = ensure_agent_title(source_title) - metadata = build_metadata(args, spec_raw, spec_path) + metadata = build_metadata(args, spec_raw, spec_path, save_gate) timestamp = now_utc() generation_id = generate_id(timestamp) @@ -307,6 +538,8 @@ def main() -> int: "case_set_file": case_set_file, "saved_session_file": saved_session_file, "domain": domain, + "validation_status": save_gate.get("validation_status"), + "validated_run_dir": save_gate.get("validated_run_dir"), }, ensure_ascii=False, indent=2, @@ -329,6 +562,8 @@ def main() -> int: "questions_total": len(questions), "case_set_file": case_set_file, "saved_session_file": saved_session_file, + "validation_status": save_gate.get("validation_status"), + "validated_run_dir": save_gate.get("validated_run_dir"), }, ensure_ascii=False, indent=2, diff --git a/scripts/stage_agent_loop.py b/scripts/stage_agent_loop.py new file mode 100644 index 0000000..a2cffa6 --- /dev/null +++ b/scripts/stage_agent_loop.py @@ -0,0 +1,406 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import json +import re +import subprocess +import sys +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + + +REPO_ROOT = Path(__file__).resolve().parents[1] +DEFAULT_STAGE_OUTPUT_ROOT = REPO_ROOT / "artifacts" / "domain_runs" / "stage_agent_loops" 
+STAGE_LOOP_SCHEMA_VERSION = "stage_agent_loop_manifest_v1" +STAGE_SUMMARY_SCHEMA_VERSION = "stage_agent_loop_summary_v1" + + +def now_iso() -> str: + return datetime.now(timezone.utc).replace(microsecond=0).isoformat() + + +def slugify(value: str, fallback: str = "stage_agent_loop") -> str: + normalized = re.sub(r"[^a-zA-Z0-9_.-]+", "_", str(value or "").strip()).strip("_.-") + return normalized or fallback + + +def load_json(path: Path) -> Any: + return json.loads(path.read_text(encoding="utf-8")) + + +def load_json_object(path: Path, label: str) -> dict[str, Any]: + if not path.exists(): + raise RuntimeError(f"{label} not found: {path}") + parsed = load_json(path) + if not isinstance(parsed, dict): + raise RuntimeError(f"{label} must be a JSON object: {path}") + return parsed + + +def write_text(path: Path, text: str) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(text, encoding="utf-8") + + +def write_json(path: Path, payload: Any) -> None: + write_text(path, json.dumps(payload, ensure_ascii=False, indent=2) + "\n") + + +def repo_path(raw_path: str | Path) -> Path: + path = Path(raw_path) + return path if path.is_absolute() else (REPO_ROOT / path).resolve() + + +def repo_relative(path: Path) -> str: + try: + return str(path.resolve().relative_to(REPO_ROOT)) + except ValueError: + return str(path.resolve()) + + +def string_list(value: Any) -> list[str]: + if not isinstance(value, list): + return [] + result: list[str] = [] + for item in value: + text = str(item or "").strip() + if text: + result.append(text) + return result + + +def load_stage_manifest(path: Path) -> dict[str, Any]: + raw = load_json_object(path, "Stage agent loop manifest") + stage_id = slugify(str(raw.get("stage_id") or path.stem), path.stem) + pack_manifest = str(raw.get("pack_manifest") or "").strip() + if not pack_manifest: + raise RuntimeError("Stage manifest must define `pack_manifest` for the autonomous stage loop") + target_score = 
int(raw.get("target_score") or 88) + max_iterations = int(raw.get("max_iterations") or 6) + if target_score < 0 or target_score > 100: + raise RuntimeError("Stage manifest `target_score` must be between 0 and 100") + if max_iterations < 1: + raise RuntimeError("Stage manifest `max_iterations` must be >= 1") + return { + **raw, + "schema_version": str(raw.get("schema_version") or STAGE_LOOP_SCHEMA_VERSION), + "stage_id": stage_id, + "module_name": str(raw.get("module_name") or raw.get("domain") or "unknown_module").strip(), + "title": str(raw.get("title") or stage_id).strip(), + "pack_manifest": pack_manifest, + "target_score": target_score, + "max_iterations": max_iterations, + "global_plan_refs": string_list(raw.get("global_plan_refs")), + "acceptance_invariants": string_list(raw.get("acceptance_invariants")), + "save_autorun_on_accept": bool(raw.get("save_autorun_on_accept", True)), + "manual_confirmation_required_after_accept": bool(raw.get("manual_confirmation_required_after_accept", True)), + } + + +def stage_dir_for(output_root: Path, stage_id: str) -> Path: + return output_root.resolve() / slugify(stage_id) + + +def stage_loop_dir(stage_dir: Path, stage_manifest: dict[str, Any]) -> Path: + loop_id = str(stage_manifest.get("loop_id") or stage_manifest["stage_id"]).strip() + return stage_dir / "domain_loops" / slugify(loop_id) + + +def build_domain_pack_loop_command(args: argparse.Namespace, stage_manifest: dict[str, Any], stage_dir: Path) -> list[str]: + loop_id = str(stage_manifest.get("loop_id") or stage_manifest["stage_id"]).strip() + command = [ + sys.executable, + str(REPO_ROOT / "scripts" / "domain_case_loop.py"), + "run-pack-loop", + "--manifest", + str(repo_path(stage_manifest["pack_manifest"])), + "--loop-id", + loop_id, + "--output-root", + str(stage_dir / "domain_loops"), + "--target-score", + str(int(stage_manifest["target_score"])), + "--max-iterations", + str(int(stage_manifest["max_iterations"])), + "--backend-url", + str(args.backend_url), + 
"--prompt-version", + str(args.prompt_version), + "--llm-provider", + str(args.llm_provider), + "--llm-model", + str(args.llm_model), + "--llm-base-url", + str(args.llm_base_url), + "--llm-api-key", + str(args.llm_api_key), + "--temperature", + str(args.temperature), + "--max-output-tokens", + str(args.max_output_tokens), + "--timeout-seconds", + str(args.timeout_seconds), + "--codex-binary", + str(args.codex_binary), + "--analyst-codex-model", + str(args.analyst_codex_model), + "--coder-codex-model", + str(args.coder_codex_model), + "--analyst-reasoning-effort", + str(args.analyst_reasoning_effort), + "--coder-reasoning-effort", + str(args.coder_reasoning_effort), + "--codex-timeout-seconds", + str(args.codex_timeout_seconds), + ] + if args.codex_profile: + command.extend(["--codex-profile", str(args.codex_profile)]) + if args.codex_model: + command.extend(["--codex-model", str(args.codex_model)]) + if args.analysis_date: + command.extend(["--analysis-date", str(args.analysis_date)]) + if args.max_scenarios is not None: + command.extend(["--max-scenarios", str(int(args.max_scenarios))]) + if args.use_mock: + command.append("--use-mock") + return command + + +def run_command(command: list[str], cwd: Path, stdout_path: Path, stderr_path: Path, timeout_seconds: int) -> None: + result = subprocess.run( + command, + cwd=str(cwd), + text=True, + encoding="utf-8", + errors="replace", + capture_output=True, + timeout=timeout_seconds, + check=False, + ) + write_text(stdout_path, result.stdout) + write_text(stderr_path, result.stderr) + if result.returncode != 0: + raise RuntimeError(f"Command failed with exit code {result.returncode}: {' '.join(command)}") + + +def build_stage_summary(stage_manifest: dict[str, Any], loop_dir: Path) -> dict[str, Any]: + loop_state = load_json_object(loop_dir / "loop_state.json", "Stage domain loop_state.json") + iterations = loop_state.get("iterations") if isinstance(loop_state.get("iterations"), list) else [] + last_iteration = 
iterations[-1] if iterations and isinstance(iterations[-1], dict) else {} + final_status = str(loop_state.get("final_status") or "unknown").strip() + accepted = final_status == "accepted" and bool(last_iteration.get("accepted_gate")) + manual_confirmation_required = bool(stage_manifest.get("manual_confirmation_required_after_accept", True)) and accepted + if accepted and manual_confirmation_required: + next_action = "manual_gui_confirmation" + elif accepted: + next_action = "stage_closed_without_manual_confirmation" + elif bool(loop_state.get("last_user_decision_prompt")): + next_action = "user_decision_required" + else: + next_action = "continue_autonomous_or_fix_blocker" + return { + "schema_version": STAGE_SUMMARY_SCHEMA_VERSION, + "stage_id": stage_manifest["stage_id"], + "module_name": stage_manifest.get("module_name"), + "title": stage_manifest.get("title"), + "global_plan_refs": stage_manifest.get("global_plan_refs") or [], + "target_score": stage_manifest.get("target_score"), + "acceptance_invariants": stage_manifest.get("acceptance_invariants") or [], + "loop_dir": repo_relative(loop_dir), + "loop_final_status": final_status, + "stop_reason": loop_state.get("stop_reason"), + "iterations_ran": len(iterations), + "last_quality_score": last_iteration.get("quality_score"), + "last_analyst_decision": last_iteration.get("loop_decision") or loop_state.get("last_analyst_decision"), + "last_deterministic_gate_ok": last_iteration.get("deterministic_gate_ok"), + "last_deterministic_gate_reason": last_iteration.get("deterministic_gate_reason"), + "accepted_gate": bool(last_iteration.get("accepted_gate")), + "manual_confirmation_required": manual_confirmation_required, + "next_action": next_action, + "save_autorun_on_accept": bool(stage_manifest.get("save_autorun_on_accept", True)), + "updated_at": now_iso(), + } + + +def build_stage_handoff_markdown(summary: dict[str, Any]) -> str: + lines = [ + "# Stage agent loop handoff", + "", + f"- stage_id: 
`{summary.get('stage_id')}`", + f"- module_name: `{summary.get('module_name')}`", + f"- title: {summary.get('title')}", + f"- loop_final_status: `{summary.get('loop_final_status')}`", + f"- target_score: `{summary.get('target_score')}`", + f"- iterations_ran: `{summary.get('iterations_ran')}`", + f"- last_quality_score: `{summary.get('last_quality_score')}`", + f"- accepted_gate: `{summary.get('accepted_gate')}`", + f"- deterministic_gate_ok: `{summary.get('last_deterministic_gate_ok')}`", + f"- deterministic_gate_reason: `{summary.get('last_deterministic_gate_reason') or 'n/a'}`", + f"- manual_confirmation_required: `{summary.get('manual_confirmation_required')}`", + f"- next_action: `{summary.get('next_action')}`", + f"- loop_dir: `{summary.get('loop_dir')}`", + f"- stop_reason: {summary.get('stop_reason') or 'n/a'}", + "", + "## Plan refs", + ] + refs = summary.get("global_plan_refs") or [] + lines.extend([f"- {item}" for item in refs] if refs else ["- none"]) + lines.extend(["", "## Acceptance invariants"]) + invariants = summary.get("acceptance_invariants") or [] + lines.extend([f"- {item}" for item in invariants] if invariants else ["- domain loop gate + analyst verdict"]) + return "\n".join(lines).strip() + "\n" + + +def save_stage_summary(stage_dir: Path, summary: dict[str, Any]) -> None: + write_json(stage_dir / "stage_loop_summary.json", summary) + write_text(stage_dir / "stage_loop_handoff.md", build_stage_handoff_markdown(summary)) + + +def build_save_autorun_command(args: argparse.Namespace, stage_manifest: dict[str, Any], loop_dir: Path) -> list[str]: + return [ + sys.executable, + str(REPO_ROOT / "scripts" / "save_agent_semantic_run.py"), + "--spec", + str(repo_path(stage_manifest["pack_manifest"])), + "--validated-run-dir", + str(loop_dir), + "--title", + f"AGENT | {stage_manifest.get('title') or stage_manifest['stage_id']}", + "--architecture-phase", + str(stage_manifest.get("architecture_phase") or stage_manifest.get("module_name") or 
"stage_agent_loop"), + "--agent-focus", + str(stage_manifest.get("agent_focus") or stage_manifest.get("title") or stage_manifest["stage_id"]), + ] + + +def handle_plan(args: argparse.Namespace) -> int: + stage_manifest_path = repo_path(args.manifest) + stage_manifest = load_stage_manifest(stage_manifest_path) + stage_dir = stage_dir_for(repo_path(args.output_root), stage_manifest["stage_id"]) + command = build_domain_pack_loop_command(args, stage_manifest, stage_dir) + payload = { + "schema_version": STAGE_SUMMARY_SCHEMA_VERSION, + "stage_manifest": repo_relative(stage_manifest_path), + "stage_id": stage_manifest["stage_id"], + "stage_dir": repo_relative(stage_dir), + "loop_dir": repo_relative(stage_loop_dir(stage_dir, stage_manifest)), + "domain_pack_loop_command": command, + } + print(json.dumps(payload, ensure_ascii=False, indent=2)) + return 0 + + +def handle_summarize(args: argparse.Namespace) -> int: + stage_manifest_path = repo_path(args.manifest) + stage_manifest = load_stage_manifest(stage_manifest_path) + stage_dir = stage_dir_for(repo_path(args.output_root), stage_manifest["stage_id"]) + loop_dir = repo_path(args.loop_dir) if args.loop_dir else stage_loop_dir(stage_dir, stage_manifest) + summary = build_stage_summary(stage_manifest, loop_dir) + save_stage_summary(stage_dir, summary) + print(json.dumps(summary, ensure_ascii=False, indent=2)) + return 0 + + +def handle_run(args: argparse.Namespace) -> int: + stage_manifest_path = repo_path(args.manifest) + stage_manifest = load_stage_manifest(stage_manifest_path) + stage_dir = stage_dir_for(repo_path(args.output_root), stage_manifest["stage_id"]) + stage_dir.mkdir(parents=True, exist_ok=True) + write_json(stage_dir / "stage_manifest.json", stage_manifest) + write_text(stage_dir / "stage_manifest_source.txt", repo_relative(stage_manifest_path) + "\n") + + command = build_domain_pack_loop_command(args, stage_manifest, stage_dir) + write_text(stage_dir / "domain_pack_loop.command.txt", " ".join(command) + 
"\n") + if args.dry_run: + print(json.dumps({"dry_run": True, "command": command}, ensure_ascii=False, indent=2)) + return 0 + + run_command( + command, + cwd=REPO_ROOT, + stdout_path=stage_dir / "domain_pack_loop.stdout.log", + stderr_path=stage_dir / "domain_pack_loop.stderr.log", + timeout_seconds=max(3600, int(args.codex_timeout_seconds) * max(1, int(stage_manifest["max_iterations"]))), + ) + loop_dir = stage_loop_dir(stage_dir, stage_manifest) + summary = build_stage_summary(stage_manifest, loop_dir) + save_stage_summary(stage_dir, summary) + + if ( + summary["loop_final_status"] == "accepted" + and bool(stage_manifest.get("save_autorun_on_accept", True)) + and not args.no_save_autorun + ): + save_command = build_save_autorun_command(args, stage_manifest, loop_dir) + write_text(stage_dir / "save_agent_semantic_run.command.txt", " ".join(save_command) + "\n") + run_command( + save_command, + cwd=REPO_ROOT, + stdout_path=stage_dir / "save_agent_semantic_run.stdout.log", + stderr_path=stage_dir / "save_agent_semantic_run.stderr.log", + timeout_seconds=120, + ) + print(json.dumps(summary, ensure_ascii=False, indent=2)) + return 0 + + +def add_common_args(parser: argparse.ArgumentParser) -> None: + parser.add_argument("--manifest", required=True) + parser.add_argument("--output-root", default=str(DEFAULT_STAGE_OUTPUT_ROOT)) + parser.add_argument("--analysis-date") + parser.add_argument("--max-scenarios", type=int) + parser.add_argument("--backend-url", default="http://127.0.0.1:8787") + parser.add_argument("--prompt-version", default="address_query_runtime_v1") + parser.add_argument("--llm-provider", default="local", choices=["openai", "local"]) + parser.add_argument("--llm-model", default="qwen2.5-14b-instruct-1m") + parser.add_argument("--llm-base-url", default="http://127.0.0.1:1234/v1") + parser.add_argument("--llm-api-key", default="") + parser.add_argument("--temperature", type=float, default=0.0) + parser.add_argument("--max-output-tokens", type=int, 
default=2048) + parser.add_argument("--timeout-seconds", type=int, default=180) + parser.add_argument("--use-mock", action="store_true") + parser.add_argument("--codex-binary", default="codex") + parser.add_argument("--codex-profile") + parser.add_argument("--codex-model") + parser.add_argument("--analyst-codex-model", default="gpt-5.4") + parser.add_argument("--coder-codex-model", default="gpt-5.4-mini") + parser.add_argument("--analyst-reasoning-effort", default="medium") + parser.add_argument("--coder-reasoning-effort", default="low") + parser.add_argument("--codex-timeout-seconds", type=int, default=1800) + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="Stage-level AGENT loop wrapper for NDC_1C development phases.") + subparsers = parser.add_subparsers(dest="command", required=True) + + plan_parser = subparsers.add_parser("plan", help="Print the domain pack-loop command for a stage manifest.") + add_common_args(plan_parser) + plan_parser.set_defaults(func=handle_plan) + + run_parser = subparsers.add_parser("run", help="Run stage pack-loop, summarize, and optionally save accepted autorun.") + add_common_args(run_parser) + run_parser.add_argument("--dry-run", action="store_true") + run_parser.add_argument("--no-save-autorun", action="store_true") + run_parser.set_defaults(func=handle_run) + + summarize_parser = subparsers.add_parser("summarize", help="Build stage handoff from an existing loop_dir.") + add_common_args(summarize_parser) + summarize_parser.add_argument("--loop-dir") + summarize_parser.set_defaults(func=handle_summarize) + return parser + + +def main() -> int: + parser = build_parser() + args = parser.parse_args() + try: + return int(args.func(args)) + except Exception as error: # noqa: BLE001 + print(f"[stage-agent-loop] error: {error}", file=sys.stderr) + return 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/test_domain_case_loop_step_state.py 
b/scripts/test_domain_case_loop_step_state.py index 104fae6..9a1260f 100644 --- a/scripts/test_domain_case_loop_step_state.py +++ b/scripts/test_domain_case_loop_step_state.py @@ -114,6 +114,147 @@ class DomainCaseLoopStepStateTests(unittest.TestCase): self.assertEqual(reviewed["critical_findings_count"], 1) self.assertEqual(reviewed["review_findings"][0]["code"], "wrong_catalog_chain_top_match") + def test_business_first_review_flags_dirty_direct_answer_surface(self) -> None: + step_state = dcl.build_scenario_step_state( + scenario_id="business_surface_demo", + domain="business_overview", + step={ + "step_id": "step_01", + "title": "Top year", + "depends_on": [], + "question_template": "какой у нас самый доходный год", + }, + step_index=1, + question_resolved="какой у нас самый доходный год", + analysis_context={}, + turn_artifact={ + "assistant_message": { + "reply_type": "partial_coverage", + "text": "Коротко: Ограниченный бизнес-обзор по подтвержденным строкам 1С. " + ("лишний текст " * 220), + "message_id": "msg-1", + "trace_id": "trace-1", + }, + "technical_debug_payload": {}, + "session_summary": {}, + }, + entries=[], + ) + + review = step_state["business_first_review"] + self.assertFalse(review["direct_answer_first_ok"]) + self.assertFalse(review["business_usefulness_ok"]) + self.assertIn("business_direct_answer_missing", review["issue_codes"]) + self.assertIn("answer_layering_noise", review["issue_codes"]) + self.assertIn("business_answer_too_verbose", review["issue_codes"]) + self.assertIn("business_direct_answer_missing", step_state["violated_invariants"]) + + def test_business_first_review_accepts_compact_direct_answer_surface(self) -> None: + step_state = dcl.build_scenario_step_state( + scenario_id="business_surface_demo", + domain="business_overview", + step={ + "step_id": "step_01", + "title": "Top year", + "depends_on": [], + "question_template": "какой у нас самый доходный год", + }, + step_index=1, + question_resolved="какой у нас самый доходный 
год", + analysis_context={}, + turn_artifact={ + "assistant_message": { + "reply_type": "partial_coverage", + "text": "Коротко: самый доходный год в доступном денежном контуре 1С — 2015: 136 723 459,73 руб.\nМетод: считаю по подтвержденным входящим поступлениям.", + "message_id": "msg-1", + "trace_id": "trace-1", + }, + "technical_debug_payload": {}, + "session_summary": {}, + }, + entries=[], + ) + + review = step_state["business_first_review"] + self.assertTrue(review["direct_answer_first_ok"]) + self.assertTrue(review["business_usefulness_ok"]) + self.assertEqual(review["issue_codes"], []) + + def test_business_first_review_separates_direct_answer_from_later_technical_leak(self) -> None: + question = "\u043a\u0430\u043a\u043e\u0439 \u0443 \u043d\u0430\u0441 \u0441\u0430\u043c\u044b\u0439 \u0434\u043e\u0445\u043e\u0434\u043d\u044b\u0439 \u0433\u043e\u0434" + step_state = dcl.build_scenario_step_state( + scenario_id="business_surface_demo", + domain="business_overview", + step={ + "step_id": "step_01", + "title": "Top year", + "depends_on": [], + "question_template": question, + }, + step_index=1, + question_resolved=question, + analysis_context={}, + turn_artifact={ + "assistant_message": { + "reply_type": "partial_coverage", + "text": "2015 \u2014 \u0441\u0430\u043c\u044b\u0439 \u0434\u043e\u0445\u043e\u0434\u043d\u044b\u0439 \u0433\u043e\u0434 \u043f\u043e \u043f\u043e\u0434\u0442\u0432\u0435\u0440\u0436\u0434\u0435\u043d\u043d\u044b\u043c \u0432\u0445\u043e\u0434\u044f\u0449\u0438\u043c \u0434\u0435\u043d\u044c\u0433\u0430\u043c.\nservice: capability_id=business_overview_route_template_v1", + "message_id": "msg-1", + "trace_id": "trace-1", + }, + "technical_debug_payload": {}, + "session_summary": {}, + }, + entries=[], + ) + + review = step_state["business_first_review"] + self.assertTrue(review["direct_answer_first_ok"]) + self.assertTrue(review["technical_garbage_present"]) + self.assertIn("technical_garbage_in_answer", review["issue_codes"]) + 
self.assertNotIn("business_direct_answer_missing", review["issue_codes"]) + + def test_truth_harness_promotes_business_review_issues_to_findings(self) -> None: + step_state = dcl.build_scenario_step_state( + scenario_id="business_surface_demo", + domain="business_overview", + step={ + "step_id": "step_01", + "title": "Top year", + "depends_on": [], + "question_template": "какой у нас самый доходный год", + }, + step_index=1, + question_resolved="какой у нас самый доходный год", + analysis_context={}, + turn_artifact={ + "assistant_message": { + "reply_type": "partial_coverage", + "text": "Коротко: Ограниченный бизнес-обзор по подтвержденным строкам 1С. " + ("лишний текст " * 220), + "message_id": "msg-1", + "trace_id": "trace-1", + }, + "technical_debug_payload": {}, + "session_summary": {}, + }, + entries=[], + ) + reviewed = dth.evaluate_truth_step( + step={ + "step_id": "step_01", + "question_template": "какой у нас самый доходный год", + "criticality": "critical", + "allowed_reply_types": [], + }, + step_state=step_state, + step_results={}, + bindings={}, + runtime_bindings={}, + ) + + codes = [item["code"] for item in reviewed["review_findings"]] + self.assertIn("business_review:business_direct_answer_missing", codes) + self.assertIn("business_review:answer_layering_noise", codes) + self.assertEqual(reviewed["review_status"], "fail") + if __name__ == "__main__": unittest.main() diff --git a/scripts/test_review_assistant_stage1_run.py b/scripts/test_review_assistant_stage1_run.py new file mode 100644 index 0000000..7e51dfd --- /dev/null +++ b/scripts/test_review_assistant_stage1_run.py @@ -0,0 +1,155 @@ +from __future__ import annotations + +import json +import sys +import tempfile +import unittest +from pathlib import Path + + +sys.path.insert(0, str(Path(__file__).resolve().parent)) + +import review_assistant_stage1_run as reviewer + + +def write_json(path: Path, payload: object) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + 
path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8") + + +def session_payload(conversation: list[dict[str, object]]) -> dict[str, object]: + return { + "schema_version": "assistant_session_v1", + "session_id": "assistant-stage1-test-SAVED-001", + "started_at": "2026-05-09T00:00:00Z", + "updated_at": "2026-05-09T00:01:00Z", + "conversation": conversation, + "address_navigation_state": {"session_context": {}}, + "investigation_state": {}, + "counters": {}, + "reply_types": {}, + } + + +class AssistantStage1RunReviewTests(unittest.TestCase): + def test_builds_conversation_pairs_without_crossing_next_user_turn(self) -> None: + conversation = [ + {"role": "user", "text": "первый вопрос"}, + {"role": "assistant", "text": "первый ответ"}, + {"role": "user", "text": "второй вопрос"}, + ] + + pairs = reviewer.build_conversation_pairs(conversation) + + self.assertEqual(len(pairs), 2) + self.assertEqual(pairs[0]["assistant"]["text"], "первый ответ") + self.assertIsNone(pairs[1]["assistant"]) + + def test_review_flags_dirty_business_answer_and_writes_repair_targets(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + sessions_dir = root / "sessions" + reports_dir = root / "reports" + run_id = "assistant-stage1-test123" + session_file = sessions_dir / f"{run_id}-SAVED-001.json" + report_file = reports_dir / f"{run_id}.md" + write_json( + session_file, + session_payload( + [ + {"role": "user", "text": "какой у нас самый доходный год"}, + { + "role": "assistant", + "text": "Коротко: Ограниченный бизнес-обзор по подтвержденным строкам 1С. 
" + + ("лишний текст " * 220), + "reply_type": "partial_coverage", + "message_id": "a-1", + "trace_id": "trace-1", + "debug": {"capability_id": "business_overview_route_template_v1"}, + }, + {"role": "user", "text": "по нему покажи документы"}, + { + "role": "assistant", + "text": "Документы по выбранному году не найдены в подтвержденном контуре.", + "reply_type": "factual_with_explanation", + "message_id": "a-2", + "trace_id": "trace-2", + "debug": {}, + }, + ] + ), + ) + report_file.parent.mkdir(parents=True, exist_ok=True) + report_file.write_text( + "# Assistant Stage 1 Eval Run\n\n" + f"- run_id: {run_id}\n" + "- suite_id: assistant_saved_session_runtime_job-test\n", + encoding="utf-8", + ) + + review = reviewer.build_run_review( + run_id=run_id, + session_files=[session_file], + report_path=report_file, + ) + + self.assertEqual(review["summary"]["overall_business_status"], "fail") + self.assertEqual(review["summary"]["turn_pairs_total"], 2) + self.assertGreaterEqual(review["summary"]["p0_findings"], 1) + self.assertIn("business_direct_answer_missing", review["summary"]["issue_counts"]) + self.assertTrue(review["repair_targets"]) + target_by_issue = {item["issue_code"]: item for item in review["repair_targets"]} + self.assertEqual(target_by_issue["business_direct_answer_missing"]["severity"], "P0") + self.assertEqual(target_by_issue["business_answer_too_verbose"]["severity"], "P1") + self.assertEqual(review["question_quality_review"]["turns_total"], 2) + self.assertIn("contextual_followup", review["question_quality_review"]["tag_counts"]) + + def test_save_run_review_materializes_machine_and_markdown_artifacts(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + output_dir = Path(tmp) / "review" + review = { + "run_id": "assistant-stage1-test123", + "summary": { + "overall_business_status": "pass", + "turn_pairs_total": 1, + "business_issue_turns": 0, + "p0_findings": 0, + "p1_findings": 0, + "question_quality_status": "strong", + 
"question_quality_score": 95, + }, + "question_quality_review": {"status": "strong", "score": 95}, + "findings": [], + "repair_targets": [], + "conversation_pairs": [], + } + + reviewer.save_run_review(review, output_dir) + + self.assertTrue((output_dir / "run_review.json").exists()) + self.assertTrue((output_dir / "run_review.md").exists()) + markdown = (output_dir / "run_review.md").read_text(encoding="utf-8") + self.assertIn("overall_business_status", markdown) + self.assertIn("Question Quality", markdown) + + def test_question_quality_treats_short_natural_followups_as_contextual(self) -> None: + pairs = [ + {"pair_index": 1, "user": {"text": "приветик - че как там дела"}}, + {"pair_index": 2, "user": {"text": "какие остатки на складе"}}, + {"pair_index": 3, "user": {"text": "давай на июль 2017"}}, + {"pair_index": 4, "user": {"text": "март 2016"}}, + {"pair_index": 5, "user": {"text": "а кому продали?"}}, + {"pair_index": 6, "user": {"text": "кто нам должен денег на май 2017"}}, + {"pair_index": 7, "user": {"text": "а по свк"}}, + ] + + review = reviewer.build_question_quality_review(pairs) + + self.assertNotIn("root_question_requires_missing_context", review["weak_flag_counts"]) + self.assertNotIn("low_business_anchor", review["weak_flag_counts"]) + self.assertGreaterEqual(review["tag_counts"]["contextual_followup"], 3) + self.assertGreaterEqual(review["tag_counts"]["direct_business_question"], 2) + + +if __name__ == "__main__": + unittest.main() diff --git a/scripts/test_save_agent_semantic_run.py b/scripts/test_save_agent_semantic_run.py new file mode 100644 index 0000000..49dd467 --- /dev/null +++ b/scripts/test_save_agent_semantic_run.py @@ -0,0 +1,241 @@ +from __future__ import annotations + +import json +import sys +import tempfile +import unittest +from pathlib import Path +from types import SimpleNamespace + + +sys.path.insert(0, str(Path(__file__).resolve().parent)) + +import save_agent_semantic_run as saver + + +def write_json(path: Path, payload: 
object) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8") + + +class SaveAgentSemanticRunTests(unittest.TestCase): + def test_extract_questions_accepts_truth_harness_question_template(self) -> None: + questions = saver.extract_questions_from_spec( + { + "steps": [ + {"step_id": "step_01", "question_template": "first question"}, + {"step_id": "step_02", "question": "second question"}, + ] + } + ) + + self.assertEqual(questions, ["first question", "second question"]) + + def test_extract_questions_accepts_domain_pack_scenarios(self) -> None: + questions = saver.extract_questions_from_spec( + { + "pack_id": "demo_pack", + "scenarios": [ + { + "scenario_id": "scenario_01", + "steps": [ + {"step_id": "step_01", "question_template": "first question"}, + {"step_id": "step_02", "question": "second question"}, + ], + }, + { + "scenario_id": "scenario_02", + "steps": [ + {"step_id": "step_01", "question": "first question"}, + {"step_id": "step_02", "question": "third question"}, + ], + }, + ], + } + ) + + self.assertEqual(questions, ["first question", "second question", "third question"]) + + def test_validate_accepted_run_dir_accepts_clean_business_review(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + run_dir = Path(tmp) + write_json( + run_dir / "pack_state.json", + { + "final_status": "accepted", + "review_overall_status": "pass", + "acceptance_gate_passed": True, + "no_unresolved_p0": True, + "unresolved_p0_count": 0, + "steps_total": 1, + "steps_passed": 1, + "steps_failed": 0, + }, + ) + write_json(run_dir / "truth_review.json", {"summary": {"overall_status": "pass"}}) + write_json( + run_dir / "business_review.json", + { + "overall_business_status": "pass", + "steps_with_business_failures": 0, + "steps_with_business_warnings": 0, + }, + ) + + metadata = saver.validate_accepted_run_dir(run_dir) + + self.assertEqual(metadata["validation_status"], 
"accepted_live_replay") + self.assertTrue(metadata["saved_after_validated_replay"]) + + def test_validate_accepted_run_dir_rejects_business_review_failures(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + run_dir = Path(tmp) + write_json( + run_dir / "pack_state.json", + { + "final_status": "accepted", + "review_overall_status": "pass", + "acceptance_gate_passed": True, + "no_unresolved_p0": True, + "unresolved_p0_count": 0, + }, + ) + write_json(run_dir / "truth_review.json", {"summary": {"overall_status": "pass"}}) + write_json( + run_dir / "business_review.json", + { + "overall_business_status": "fail", + "steps_with_business_failures": 1, + }, + ) + + with self.assertRaisesRegex(RuntimeError, "business_review"): + saver.validate_accepted_run_dir(run_dir) + + def test_validate_accepted_run_dir_accepts_clean_domain_pack_loop(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + loop_dir = Path(tmp) + iteration_dir = loop_dir / "iterations" / "iteration_00" + analyst_path = iteration_dir / "analyst_verdict.json" + repair_targets_path = iteration_dir / "pack_output" / "pack_run" / "repair_targets.json" + write_json( + loop_dir / "loop_state.json", + { + "loop_id": "stage_demo", + "target_score": 88, + "final_status": "accepted", + "iterations": [ + { + "iteration_id": "iteration_00", + "quality_score": 91, + "accepted_gate": True, + "analyst_accepted_gate": True, + "deterministic_gate_ok": True, + "repair_target_count": 0, + "repair_target_severity_counts": {"P0": 0, "P1": 0, "P2": 0}, + "analyst_verdict_path": str(analyst_path), + "repair_targets_path": str(repair_targets_path), + } + ], + }, + ) + write_json( + analyst_path, + { + "loop_decision": "accepted", + "unresolved_p0_count": 0, + "regression_detected": False, + "direct_answer_ok": True, + "business_usefulness_ok": True, + "temporal_honesty_ok": True, + "field_truth_ok": True, + "answer_layering_ok": True, + }, + ) + write_json(repair_targets_path, {"severity_counts": {"P0": 0, "P1": 
0, "P2": 0}}) + + metadata = saver.validate_accepted_run_dir(loop_dir) + + self.assertEqual(metadata["validation_status"], "accepted_domain_pack_loop") + self.assertEqual(metadata["quality_score"], 91) + + def test_validate_accepted_run_dir_rejects_domain_pack_loop_with_p1_targets(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + loop_dir = Path(tmp) + iteration_dir = loop_dir / "iterations" / "iteration_00" + analyst_path = iteration_dir / "analyst_verdict.json" + repair_targets_path = iteration_dir / "pack_output" / "pack_run" / "repair_targets.json" + write_json( + loop_dir / "loop_state.json", + { + "loop_id": "stage_demo", + "target_score": 88, + "final_status": "accepted", + "iterations": [ + { + "quality_score": 91, + "accepted_gate": True, + "analyst_accepted_gate": True, + "deterministic_gate_ok": True, + "analyst_verdict_path": str(analyst_path), + "repair_targets_path": str(repair_targets_path), + } + ], + }, + ) + write_json( + analyst_path, + { + "loop_decision": "accepted", + "unresolved_p0_count": 0, + "regression_detected": False, + "direct_answer_ok": True, + "business_usefulness_ok": True, + "temporal_honesty_ok": True, + "field_truth_ok": True, + "answer_layering_ok": True, + }, + ) + write_json(repair_targets_path, {"severity_counts": {"P0": 0, "P1": 1, "P2": 0}}) + + with self.assertRaisesRegex(RuntimeError, "repair_targets"): + saver.validate_accepted_run_dir(loop_dir) + + def test_save_gate_refuses_real_write_without_validation(self) -> None: + args = SimpleNamespace( + validated_run_dir=None, + dry_run=False, + allow_unvalidated=False, + unvalidated_reason=None, + ) + + with self.assertRaisesRegex(RuntimeError, "Refusing to save AGENT autorun"): + saver.build_save_gate_metadata(args, {}, Path("demo.json")) + + def test_save_gate_requires_reason_for_unvalidated_draft(self) -> None: + args = SimpleNamespace( + validated_run_dir=None, + dry_run=False, + allow_unvalidated=True, + unvalidated_reason="", + ) + + with 
self.assertRaisesRegex(RuntimeError, "--unvalidated-reason"): + saver.build_save_gate_metadata(args, {}, Path("demo.json")) + + def test_save_gate_marks_explicit_unvalidated_draft(self) -> None: + args = SimpleNamespace( + validated_run_dir=None, + dry_run=False, + allow_unvalidated=True, + unvalidated_reason="manual GUI canary before live replay", + ) + + metadata = saver.build_save_gate_metadata(args, {}, Path("demo.json")) + + self.assertEqual(metadata["validation_status"], "explicitly_unvalidated") + self.assertFalse(metadata["saved_after_validated_replay"]) + + +if __name__ == "__main__": + unittest.main() diff --git a/scripts/test_stage_agent_loop.py b/scripts/test_stage_agent_loop.py new file mode 100644 index 0000000..a64ca86 --- /dev/null +++ b/scripts/test_stage_agent_loop.py @@ -0,0 +1,153 @@ +from __future__ import annotations + +import argparse +import json +import sys +import tempfile +import unittest +from pathlib import Path + + +sys.path.insert(0, str(Path(__file__).resolve().parent)) + +import stage_agent_loop as stage_loop + + +def write_json(path: Path, payload: object) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8") + + +def args() -> argparse.Namespace: + return argparse.Namespace( + backend_url="http://127.0.0.1:8787", + prompt_version="address_query_runtime_v1", + llm_provider="local", + llm_model="qwen2.5-14b-instruct-1m", + llm_base_url="http://127.0.0.1:1234/v1", + llm_api_key="", + temperature=0.0, + max_output_tokens=2048, + timeout_seconds=180, + codex_binary="codex", + codex_profile=None, + codex_model=None, + analyst_codex_model="gpt-5.4", + coder_codex_model="gpt-5.4-mini", + analyst_reasoning_effort="medium", + coder_reasoning_effort="low", + codex_timeout_seconds=1800, + analysis_date=None, + max_scenarios=None, + use_mock=False, + ) + + +class StageAgentLoopTests(unittest.TestCase): + def 
test_load_stage_manifest_defaults_gate_fields(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + manifest_path = Path(tmp) / "stage.json" + write_json( + manifest_path, + { + "stage_id": "open_world_control_gate", + "module_name": "Open-World Bounded Autonomy Breadth", + "title": "Open-world semantic control gate", + "pack_manifest": "docs/orchestration/demo_pack.json", + }, + ) + + manifest = stage_loop.load_stage_manifest(manifest_path) + + self.assertEqual(manifest["target_score"], 88) + self.assertEqual(manifest["max_iterations"], 6) + self.assertTrue(manifest["save_autorun_on_accept"]) + self.assertTrue(manifest["manual_confirmation_required_after_accept"]) + + def test_build_domain_pack_loop_command_uses_stage_gate(self) -> None: + manifest = { + "stage_id": "open_world_control_gate", + "pack_manifest": "docs/orchestration/demo_pack.json", + "target_score": 91, + "max_iterations": 4, + } + command = stage_loop.build_domain_pack_loop_command(args(), manifest, Path("X:/repo/stage")) + + self.assertIn("run-pack-loop", command) + self.assertIn("--target-score", command) + self.assertIn("91", command) + self.assertIn("--max-iterations", command) + self.assertIn("4", command) + self.assertIn("--output-root", command) + + def test_build_stage_summary_requests_manual_confirmation_after_accept(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + loop_dir = Path(tmp) + write_json( + loop_dir / "loop_state.json", + { + "final_status": "accepted", + "target_score": 88, + "stop_reason": "analyst accepted + deterministic gate passed", + "iterations": [ + { + "quality_score": 93, + "loop_decision": "accepted", + "accepted_gate": True, + "deterministic_gate_ok": True, + } + ], + }, + ) + + summary = stage_loop.build_stage_summary( + { + "stage_id": "open_world_control_gate", + "module_name": "Open-World Bounded Autonomy Breadth", + "title": "Open-world semantic control gate", + "target_score": 88, + "manual_confirmation_required_after_accept": True, + }, 
+ loop_dir, + ) + + self.assertEqual(summary["loop_final_status"], "accepted") + self.assertTrue(summary["manual_confirmation_required"]) + self.assertEqual(summary["next_action"], "manual_gui_confirmation") + + def test_build_stage_summary_continues_when_loop_is_partial(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + loop_dir = Path(tmp) + write_json( + loop_dir / "loop_state.json", + { + "final_status": "partial", + "target_score": 88, + "iterations": [ + { + "quality_score": 76, + "loop_decision": "continue", + "accepted_gate": False, + "deterministic_gate_ok": False, + "deterministic_gate_reason": "repair_targets_remaining=P1:1", + } + ], + }, + ) + + summary = stage_loop.build_stage_summary( + { + "stage_id": "open_world_control_gate", + "module_name": "Open-World Bounded Autonomy Breadth", + "title": "Open-world semantic control gate", + "target_score": 88, + }, + loop_dir, + ) + + self.assertFalse(summary["manual_confirmation_required"]) + self.assertEqual(summary["next_action"], "continue_autonomous_or_fix_blocker") + + +if __name__ == "__main__": + unittest.main()