Automate agent review of GUI runs and the stage loop

dctouch 2026-05-09 11:44:02 +03:00
parent 7c77db2c8d
commit 931251d1eb
11 changed files with 2428 additions and 9 deletions


@@ -53,6 +53,78 @@ Pack artifacts live under:
- `final_status.md`
- `scenarios/<scenario_id>/...`
## AGENT autorun save gate
`scripts/save_agent_semantic_run.py` is a post-validation persistence tool, not a replay executor.
The normal path is:
1. build/update the truth-harness spec;
2. run `python scripts/domain_truth_harness.py run-live --spec ... --output-dir artifacts/domain_runs/<run_id>`;
3. inspect `truth_review.md`, `business_review.md`, `pack_state.json`, and `final_status.md`;
4. save to GUI autoruns only with `python scripts/save_agent_semantic_run.py --spec ... --validated-run-dir artifacts/domain_runs/<run_id>`.
The save gate requires:
- `pack_state.final_status = accepted`;
- `pack_state.acceptance_gate_passed = true`;
- `truth_review.summary.overall_status = pass`;
- `business_review.overall_business_status = pass`;
- zero unresolved P0 and zero business-answer failures.
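The gate conditions above amount to one conjunction. A minimal sketch, assuming the three artifacts are already loaded as dictionaries. The helper name `save_gate_passes` and the `unresolved_p0_count` key are hypothetical and only illustrate the rule; they are not taken from `scripts/save_agent_semantic_run.py`. The `steps_with_business_failures` key follows the business review summary written by the truth harness.

```python
from __future__ import annotations

from typing import Any


def save_gate_passes(
    pack_state: dict[str, Any],
    truth_review: dict[str, Any],
    business_review: dict[str, Any],
) -> bool:
    """Return True only when every documented save-gate condition holds."""
    summary = truth_review.get("summary") or {}
    return (
        pack_state.get("final_status") == "accepted"
        and pack_state.get("acceptance_gate_passed") is True
        and summary.get("overall_status") == "pass"
        and business_review.get("overall_business_status") == "pass"
        # Hypothetical key: where the unresolved P0 count is stored is an assumption.
        and int(pack_state.get("unresolved_p0_count") or 0) == 0
        and int(business_review.get("steps_with_business_failures") or 0) == 0
    )
```

Under this reading, a `warning` business status (for example, only verbose answers) would still block the save, because the gate asks for `pass`.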
If a pack must be saved as a deliberate manual draft before live acceptance, use
`--allow-unvalidated --unvalidated-reason "<why this is intentionally not accepted>"`.
That path is explicitly marked as unvalidated and must not be treated as semantic proof.
## Stage-level AGENT loop
`scripts/stage_agent_loop.py` wraps the domain pack loop into the development-stage workflow:
1. take the current global/local stage manifest;
2. run `scripts/domain_case_loop.py run-pack-loop` for that stage pack;
3. let the loop iterate through pack replay, business-first analyst verdict, coder patch, and rerun until the objective gate is accepted, blocked, or a real user decision is required;
4. if accepted, persist the validated AGENT pack into GUI autoruns through `scripts/save_agent_semantic_run.py --validated-run-dir`;
5. write `stage_loop_summary.json` and `stage_loop_handoff.md` for the final human visual confirmation.
The stage manifest schema is `docs/orchestration/schemas/stage_agent_loop_manifest.schema.json`.
The default stage gate is intentionally stricter than a narrow case gate: `target_score = 88`, no unresolved P0/P1 repair targets, accepted analyst verdict, clean business usefulness, direct-answer, temporal-honesty, field-truth, and answer-layering flags.
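To make the default stage gate concrete, here is a rough sketch of how such a check could be expressed. Everything in it except the `target_score = 88` default is an assumption for illustration: the `StageGateInput` type, its field names, the flag names, and the `score >= target_score` comparison are hypothetical and may not match `scripts/stage_agent_loop.py`.

```python
from __future__ import annotations

from dataclasses import dataclass, field

# Hypothetical flag names mirroring the quality dimensions listed above.
REQUIRED_FLAGS = (
    "business_usefulness",
    "direct_answer",
    "temporal_honesty",
    "field_truth",
    "answer_layering",
)


@dataclass
class StageGateInput:
    score: int
    unresolved_p0: int
    unresolved_p1: int
    analyst_verdict: str
    quality_flags: dict[str, bool] = field(default_factory=dict)


def stage_gate_accepted(gate: StageGateInput, target_score: int = 88) -> bool:
    """Accept only if the score, repair targets, verdict, and all flags are clean."""
    return (
        gate.score >= target_score
        and gate.unresolved_p0 == 0
        and gate.unresolved_p1 == 0
        and gate.analyst_verdict == "accepted"
        # A missing flag counts as not clean.
        and all(gate.quality_flags.get(flag) is True for flag in REQUIRED_FLAGS)
    )
```

A narrow case gate would typically drop some of these conditions; the stage gate keeps all of them so that a single open P1 target is enough to keep the loop iterating.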
Canonical commands:
```powershell
python scripts/stage_agent_loop.py plan --manifest docs/orchestration/<stage_loop>.json
python scripts/stage_agent_loop.py run --manifest docs/orchestration/<stage_loop>.json
python scripts/stage_agent_loop.py summarize --manifest docs/orchestration/<stage_loop>.json
```
This is the intended path for “implement the stage, generate/check stage questions, analyze business answers, patch code, rerun, then ask the user for final visual confirmation”.
## GUI run review bridge
When a manual or GUI autorun already exists, `scripts/review_assistant_stage1_run.py` turns the run id into the same machine-readable review surface.
Canonical command:
```powershell
python scripts/review_assistant_stage1_run.py assistant-stage1-<id> --print-summary
```
The script resolves:
- `llm_normalizer/reports/assistant-stage1-<id>.md`;
- `llm_normalizer/data/assistant_sessions/assistant-stage1-<id>-*.json`.
It writes:
- `artifacts/domain_runs/gui_run_reviews/assistant-stage1-<id>/run_review.json`;
- `artifacts/domain_runs/gui_run_reviews/assistant-stage1-<id>/run_review.md`;
- `conversation_pairs.json`;
- `question_quality_review.json`;
- `repair_targets.json`.
This bridge is intentionally business-first:
- the user's question and visible assistant answer are reviewed before route ids and debug fields;
- noisy direct answers, missing first-line answers, technical garbage, and over-broad business answers become findings;
- generated question packs get a deterministic quality review for follow-up density, direct questions, report-style analysis, domain diversity, duplicates, and weak business anchors.
Use this bridge when the operator would otherwise say “check the run `assistant-stage1-...`”. The expected next step is no longer manual eyeballing; it is: review by id, inspect `run_review.md`, map `repair_targets.json` into the current stage loop, patch, and rerun.
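When mapping `repair_targets.json` into the stage loop, it helps to work through the most severe targets first. A small sketch, assuming each target is a dictionary with `severity` and `problem_type` keys (the record shape is an assumption; inspect the real file before relying on it). The severity order matches `REPAIR_TARGET_SEVERITY_ORDER` in `domain_case_loop.py`; the problem order below is only an excerpt of `REPAIR_TARGET_PROBLEM_ORDER`, and `order_repair_targets` is a hypothetical helper.

```python
from __future__ import annotations

from typing import Any

SEVERITY_ORDER = {"P0": 0, "P1": 1, "P2": 2}
# Excerpt only: the full mapping lives in domain_case_loop.py.
PROBLEM_ORDER = {
    "answer_shape_mismatch": 5,
    "business_utility_gap": 6,
    "presentation_gap": 7,
}
UNKNOWN_RANK = 99  # Unknown severities and problem types sort last.


def order_repair_targets(targets: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Sort repair targets by severity first, then by problem type priority."""
    return sorted(
        targets,
        key=lambda target: (
            SEVERITY_ORDER.get(str(target.get("severity")), UNKNOWN_RANK),
            PROBLEM_ORDER.get(str(target.get("problem_type")), UNKNOWN_RANK),
        ),
    )
```

Because `sorted` is stable, targets with the same severity and problem type keep the order in which the review produced them.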
## Placeholder contract
Scenario questions can reference earlier step outputs with placeholders such as:


@@ -0,0 +1,72 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Stage Agent Loop Manifest",
"type": "object",
"additionalProperties": true,
"required": ["stage_id", "module_name", "title", "pack_manifest"],
"properties": {
"schema_version": {
"type": "string",
"enum": ["stage_agent_loop_manifest_v1"]
},
"stage_id": {
"type": "string",
"minLength": 1
},
"module_name": {
"type": "string",
"minLength": 1
},
"title": {
"type": "string",
"minLength": 1
},
"architecture_phase": {
"type": "string"
},
"agent_focus": {
"type": "string"
},
"current_stage_status": {
"type": "string"
},
"global_plan_refs": {
"type": "array",
"items": {
"type": "string"
}
},
"pack_manifest": {
"type": "string",
"description": "Path to a domain_case_loop run-pack manifest with scenarios for the stage gate."
},
"loop_id": {
"type": "string"
},
"target_score": {
"type": "integer",
"minimum": 0,
"maximum": 100,
"default": 88
},
"max_iterations": {
"type": "integer",
"minimum": 1,
"default": 6
},
"acceptance_invariants": {
"type": "array",
"items": {
"type": "string"
}
},
"save_autorun_on_accept": {
"type": "boolean",
"default": true
},
"manual_confirmation_required_after_accept": {
"type": "boolean",
"default": true
}
}
}
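Note that in JSON Schema the `default` keyword is an annotation: a validator does not fill in missing values. A consumer of the manifest therefore has to apply the defaults itself. The following is a minimal sketch of that step using only the standard library; `normalize_stage_manifest` is a hypothetical helper, and whether `scripts/stage_agent_loop.py` does it this way is an assumption. The required fields, defaults, and bounds are copied from the schema above.

```python
from __future__ import annotations

from typing import Any

REQUIRED_FIELDS = ("stage_id", "module_name", "title", "pack_manifest")
DEFAULTS: dict[str, Any] = {
    "target_score": 88,
    "max_iterations": 6,
    "save_autorun_on_accept": True,
    "manual_confirmation_required_after_accept": True,
}


def normalize_stage_manifest(manifest: dict[str, Any]) -> dict[str, Any]:
    """Check required fields and bounds, then fill in the schema defaults."""
    missing = [name for name in REQUIRED_FIELDS if name not in manifest]
    if missing:
        raise ValueError(f"stage manifest is missing required fields: {missing}")
    # Explicit manifest values win over defaults.
    normalized = {**DEFAULTS, **manifest}
    if not 0 <= normalized["target_score"] <= 100:
        raise ValueError("target_score must be between 0 and 100")
    if normalized["max_iterations"] < 1:
        raise ValueError("max_iterations must be at least 1")
    return normalized
```

Since the schema sets `additionalProperties` to `true`, extra keys are passed through unchanged rather than rejected.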


@@ -88,6 +88,48 @@ TOP_LEVEL_NOISE_PATTERNS = (
    re.compile(r"^(?:подтверждение|опорные документы|сервисно)\b", re.IGNORECASE),
)
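# Lowercase Russian markers of a direct business question (escaped in source).
BUSINESS_DIRECT_QUESTION_MARKERS = (
    "\u0441\u043a\u043e\u043b\u044c\u043a\u043e",  # "сколько": how much / how many
    "\u0441\u043a\u043e\u043a",  # "скок": colloquial "how much"
    "\u043a\u0430\u043a\u043e\u0439",  # "какой": which (masculine)
    "\u043a\u0430\u043a\u0430\u044f",  # "какая": which (feminine)
    "\u043a\u0430\u043a\u0438\u0435",  # "какие": which (plural)
    "\u043a\u0442\u043e",  # "кто": who
    "\u043a\u043e\u043c\u0443",  # "кому": to whom
    "\u043a\u043e\u0433\u0434\u0430",  # "когда": when
    "\u0433\u0434\u0435",  # "где": where
    "\u043a\u0443\u0434\u0430",  # "куда": where to
    "\u043f\u043e\u0447\u0435\u043c\u0443",  # "почему": why
    "\u0437\u0430\u0447\u0435\u043c",  # "зачем": what for
    "\u043a\u0430\u043a\u0438\u043c \u0434\u043e\u043a\u0443\u043c\u0435\u043d\u0442\u043e\u043c",  # "каким документом": by which document
    "\u043f\u043e\u043a\u0430\u0436\u0438",  # "покажи": show
)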
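# Word stems that signal a report-style request rather than a direct question.
BUSINESS_REPORT_REQUEST_MARKERS = (
    "\u043e\u0431\u0437\u043e\u0440",  # "обзор": overview
    "\u0430\u043d\u0430\u043b\u0438\u0437",  # "анализ": analysis
    "\u043f\u043e\u0434\u0440\u043e\u0431",  # "подроб": stem of "detailed"
    "\u0440\u0430\u0437\u0432\u0435\u0440\u043d",  # "разверн": stem of "expanded"
    "\u043e\u0446\u0435\u043d",  # "оцен": stem of "assess / estimate"
    "\u0430\u0443\u0434\u0438\u0442",  # "аудит": audit
)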
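# Scaffold phrases that must not open a business answer.
BUSINESS_TOP_LINE_SCAFFOLD_MARKERS = (
    "\u043e\u0433\u0440\u0430\u043d\u0438\u0447\u0435\u043d\u043d\u044b\u0439 \u0431\u0438\u0437\u043d\u0435\u0441-\u043e\u0431\u0437\u043e\u0440",  # "ограниченный бизнес-обзор": limited business overview
    "\u0447\u0442\u043e \u043f\u043e\u0434\u0442\u0432\u0435\u0440\u0436\u0434\u0435\u043d\u043e",  # "что подтверждено": what is confirmed
    "\u043f\u0440\u043e\u0432\u0435\u0440\u0435\u043d\u043d\u044b\u0435 \u043a\u043e\u043d\u0442\u0443\u0440\u044b",  # "проверенные контуры": verified contours
    "\u0431\u043b\u043e\u043a 1",  # "блок 1": block 1
    "\u0441\u0442\u0430\u0442\u0443\u0441",  # "статус": status
)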
BUSINESS_TECHNICAL_GARBAGE_MARKERS = (
    "mcp_discovery",
    "runtime_",
    "capability_id",
    "selected_chain_id",
    "business_overview_route_template_v1",
    "query_movements",
    "query_documents",
)
BUSINESS_DIRECT_ANSWER_SOFT_LIMIT = 1800
DEFAULT_INVARIANT_SEVERITY: dict[str, str] = {
    "wrong_intent": "P0",
    "wrong_capability": "P0",
@@ -104,6 +146,10 @@ DEFAULT_INVARIANT_SEVERITY: dict[str, str] = {
    "wrong_date_scope_state": "P0",
    "direct_answer_missing": "P0",
    "top_level_noise_present": "P0",
    "business_direct_answer_missing": "P0",
    "technical_garbage_in_answer": "P0",
    "answer_layering_noise": "P1",
    "business_answer_too_verbose": "P1",
}
REPAIR_TARGET_SEVERITY_ORDER = {"P0": 0, "P1": 1, "P2": 2}
@@ -114,11 +160,12 @@ REPAIR_TARGET_PROBLEM_ORDER = {
    "object_memory_gap": 3,
    "route_gap": 4,
    "answer_shape_mismatch": 5,
-    "presentation_gap": 6,
-    "domain_anchor_gap": 7,
-    "capability_gap": 8,
-    "evidence_gap": 9,
-    "other": 10,
+    "business_utility_gap": 6,
+    "presentation_gap": 7,
+    "domain_anchor_gap": 8,
+    "capability_gap": 9,
+    "evidence_gap": 10,
+    "other": 11,
}
REPAIR_TARGET_FILE_HINTS: dict[str, list[str]] = {
@@ -157,6 +204,16 @@ REPAIR_TARGET_FILE_HINTS: dict[str, list[str]] = {
        "llm_normalizer/backend/src/services/address_runtime/composeStage.ts",
        "llm_normalizer/backend/src/services/assistantService.ts",
    ],
    "answer_shape_mismatch": [
        "llm_normalizer/backend/src/services/address_runtime/composeStage.ts",
        "llm_normalizer/backend/src/services/assistantMcpDiscoveryResponseCandidate.ts",
        "llm_normalizer/backend/src/services/assistantService.ts",
    ],
    "business_utility_gap": [
        "llm_normalizer/backend/src/services/address_runtime/composeStage.ts",
        "llm_normalizer/backend/src/services/assistantMcpDiscoveryResponseCandidate.ts",
        "llm_normalizer/backend/src/services/assistantService.ts",
    ],
    "evidence_gap": [
        "llm_normalizer/backend/src/services/addressQueryService.ts",
        "llm_normalizer/backend/src/services/addressRecipeCatalog.ts",
@@ -1526,6 +1583,79 @@ def should_require_direct_answer(step_state: dict[str, Any]) -> bool:
    return str(step_state.get("node_role") or "").strip() in {"root", "critical_child"}

def _review_text(value: Any) -> str:
    return str(value or "").strip().lower()

def _marker_hits(text: str, markers: tuple[str, ...]) -> list[str]:
    lowered = _review_text(text)
    return [marker for marker in markers if marker and marker in lowered]

def is_report_style_business_question(question: str) -> bool:
    return bool(_marker_hits(question, BUSINESS_REPORT_REQUEST_MARKERS))

def is_direct_style_business_question(question: str) -> bool:
    if is_report_style_business_question(question):
        return False
    return bool(_marker_hits(question, BUSINESS_DIRECT_QUESTION_MARKERS))
def build_business_first_review(step_state: dict[str, Any]) -> dict[str, Any]:
    question = str(step_state.get("question_resolved") or step_state.get("question_template") or "").strip()
    assistant_text = str(step_state.get("assistant_text") or "")
    top_lines = step_state.get("top_non_empty_lines") if isinstance(step_state.get("top_non_empty_lines"), list) else []
    # The first visible line is what a business reader sees first.
    first_line = str(top_lines[0] if top_lines else step_state.get("actual_direct_answer") or "").strip()
    direct_answer_required = should_require_direct_answer(step_state) or is_direct_style_business_question(question)
    report_style_question = is_report_style_business_question(question)
    # Technical markers are checked in the whole answer; scaffold markers only in the first line.
    technical_hits = _marker_hits(assistant_text, BUSINESS_TECHNICAL_GARBAGE_MARKERS)
    first_line_technical_hits = _marker_hits(first_line, BUSINESS_TECHNICAL_GARBAGE_MARKERS)
    scaffold_hits = _marker_hits(first_line, BUSINESS_TOP_LINE_SCAFFOLD_MARKERS)
    top_noise = bool(first_line and is_top_level_noise_line(first_line))
    direct_answer_first_ok = bool(first_line) and not top_noise and not scaffold_hits and not first_line_technical_hits
    # The length limit applies only to direct questions, never to report-style requests.
    too_verbose_for_direct = bool(
        direct_answer_required
        and not report_style_question
        and len(assistant_text) > BUSINESS_DIRECT_ANSWER_SOFT_LIMIT
    )
    issue_codes: list[str] = []
    if technical_hits:
        issue_codes.append("technical_garbage_in_answer")
    if direct_answer_required and not direct_answer_first_ok:
        issue_codes.append("business_direct_answer_missing")
    if scaffold_hits or top_noise:
        issue_codes.append("answer_layering_noise")
    if too_verbose_for_direct:
        issue_codes.append("business_answer_too_verbose")
    # Map issue codes to the repair layers they most likely point at.
    root_cause_layers: list[str] = []
    if "business_direct_answer_missing" in issue_codes or "answer_layering_noise" in issue_codes:
        root_cause_layers.append("answer_shape_mismatch")
    if "business_answer_too_verbose" in issue_codes or "technical_garbage_in_answer" in issue_codes:
        root_cause_layers.append("business_utility_gap")
    return {
        "schema_version": "business_first_step_review_v1",
        "question": question,
        "direct_answer_required": direct_answer_required,
        "report_style_question": report_style_question,
        "answer_length_chars": len(assistant_text),
        "answer_line_count": len([line for line in assistant_text.splitlines() if line.strip()]),
        "actual_direct_answer": first_line or None,
        "direct_answer_first_ok": (not direct_answer_required) or direct_answer_first_ok,
        "answer_layering_ok": not scaffold_hits and not top_noise,
        "technical_garbage_present": bool(technical_hits),
        "technical_garbage_hits": technical_hits,
        "top_line_scaffold_present": bool(scaffold_hits or top_noise),
        "top_line_scaffold_hits": scaffold_hits,
        "too_verbose_for_direct_question": too_verbose_for_direct,
        "business_usefulness_ok": not issue_codes,
        "issue_codes": issue_codes,
        "suggested_root_cause_layers": list(dict.fromkeys(root_cause_layers)),
    }
def is_top_level_noise_line(line: str) -> bool:
    cleaned = str(line or "").strip()
    if not cleaned:
@@ -1563,6 +1693,8 @@ def validate_step_contract(step_state: dict[str, Any]) -> dict[str, Any]:
    date_scope = state.get("date_scope") if isinstance(state.get("date_scope"), dict) else {}
    violated_invariants: list[str] = []
    warnings: list[str] = []
    business_review = build_business_first_review(state)
    state["business_first_review"] = business_review
    expected_intents = normalize_string_list(state.get("expected_intents"))
    if expected_intents and not identifier_in_list(state.get("detected_intent"), expected_intents):
@@ -1645,6 +1777,13 @@ def validate_step_contract(step_state: dict[str, Any]) -> dict[str, Any]:
    if first_top_line and is_top_level_noise_line(first_top_line):
        violated_invariants.append("top_level_noise_present")
    for issue_code in normalize_string_list(business_review.get("issue_codes")):
        # Verbosity is also surfaced as a warning; every issue code counts as a violated invariant.
        if issue_code == "business_answer_too_verbose":
            warnings.append(issue_code)
        violated_invariants.append(issue_code)
    forbidden_answer_patterns = normalize_string_list(state.get("forbidden_answer_patterns"))
    if forbidden_answer_patterns and top_non_empty_lines:
        joined_top_block = "\n".join(str(line) for line in top_non_empty_lines)
@@ -2697,6 +2836,7 @@ def compact_step_output_for_review(step_output: Any) -> dict[str, Any]:
        "result_mode": step_output.get("result_mode"),
        "answer_shape": step_output.get("answer_shape"),
        "actual_direct_answer": step_output.get("actual_direct_answer"),
        "business_first_review": step_output.get("business_first_review"),
        "violated_invariants": step_output.get("violated_invariants"),
        "warnings": step_output.get("warnings"),
        "fallback_type": step_output.get("fallback_type"),
@@ -2742,6 +2882,9 @@ def derive_repair_target_severity(step_output: dict[str, Any]) -> str:
        return "P1"
    if execution_status in {"partial", "needs_exact_capability"} or reply_type == "partial_coverage":
        return "P1"
    violated_invariants = normalize_string_list(step_output.get("violated_invariants"))
    if any(derive_invariant_severity(step_output, code) == "P1" for code in violated_invariants):
        return "P1"
    if normalize_string_list(step_output.get("warnings")):
        return "P2"
    return "P2"
@@ -2772,6 +2915,10 @@ def derive_repair_problem_type(step_output: dict[str, Any]) -> str:
        "forbidden_recipe_selected",
    } & violated:
        return "route_gap"
    if {"business_direct_answer_missing", "answer_layering_noise"} & violated:
        return "answer_shape_mismatch"
    if {"business_answer_too_verbose", "technical_garbage_in_answer"} & violated:
        return "business_utility_gap"
    if {"direct_answer_missing", "top_level_noise_present"} & violated:
        return "presentation_gap"
    if mcp_call_status == "materialized_but_not_anchor_matched":
@@ -2808,6 +2955,13 @@ def derive_repair_root_cause_layers(step_output: dict[str, Any], problem_type: s
        layers.append("business_utility_gap")
        if str(step_output.get("required_answer_shape") or "").strip():
            layers.append("answer_shape_mismatch")
    elif problem_type == "answer_shape_mismatch":
        layers.append("answer_shape_mismatch")
        layers.append("business_utility_gap")
    elif problem_type == "business_utility_gap":
        layers.append("business_utility_gap")
        if "answer_layering_noise" in violated:
            layers.append("answer_shape_mismatch")
    elif problem_type == "evidence_gap":
        layers.append("runtime_capability_gap")
    elif problem_type == "domain_anchor_gap":
@@ -2833,6 +2987,10 @@ def build_repair_fix_goal(step_output: dict[str, Any], problem_type: str) -> str
        return f"Enable an exact route for `{question}` so the loop no longer falls back to partial or unsupported behavior."
    if problem_type == "presentation_gap":
        return f"Make `{question}` answer-first: direct business answer in the first line, proof second, service notes last."
    if problem_type == "answer_shape_mismatch":
        return f"Make `{question}` start with the exact business answer requested, then put proof and caveats after it."
    if problem_type == "business_utility_gap":
        return f"Make `{question}` useful for a business reader: remove technical/scaffold noise and keep direct answers compact."
    if problem_type == "evidence_gap":
        return f"Return grounded evidence for `{question}` instead of a limited empty response when the correct route already fires."
    if problem_type == "domain_anchor_gap":


@@ -309,6 +309,46 @@ def append_finding(
    )

BUSINESS_REVIEW_FINDING_MESSAGES = {
    "technical_garbage_in_answer": "User-facing answer leaked internal runtime or MCP identifiers.",
    "business_direct_answer_missing": "The answer did not put the direct business answer first.",
    "answer_layering_noise": "The answer opened with scaffolding or report framing instead of a clean business result.",
    "business_answer_too_verbose": "The answer is too verbose for a direct business question.",
}
BUSINESS_REVIEW_FINDING_SEVERITY = {
    "technical_garbage_in_answer": "critical",
    "business_direct_answer_missing": "critical",
    "answer_layering_noise": "critical",
    "business_answer_too_verbose": "warning",
}
def append_business_review_findings(findings: list[dict[str, Any]], step: dict[str, Any], step_state: dict[str, Any]) -> None:
    business_review = step_state.get("business_first_review")
    if not isinstance(business_review, dict):
        return
    for issue_code in dcl.normalize_string_list(business_review.get("issue_codes")):
        append_finding(
            findings,
            step,
            f"business_review:{issue_code}",
            BUSINESS_REVIEW_FINDING_MESSAGES.get(issue_code, "Business-first answer review detected a semantic quality issue."),
            actual={
                "direct_answer": business_review.get("actual_direct_answer"),
                "answer_length_chars": business_review.get("answer_length_chars"),
                "technical_garbage_hits": business_review.get("technical_garbage_hits"),
                "top_line_scaffold_hits": business_review.get("top_line_scaffold_hits"),
            },
            expected={
                "direct_answer_first_ok": True,
                "business_usefulness_ok": True,
                "answer_layering_ok": True,
            },
            severity=BUSINESS_REVIEW_FINDING_SEVERITY.get(issue_code, step.get("criticality") or DEFAULT_CRITICALITY),
        )

def matches_any_pattern(text: str, patterns: list[str]) -> bool:
    return any(re.search(pattern, text, flags=re.IGNORECASE) for pattern in patterns if pattern)
@@ -355,6 +395,7 @@ def evaluate_truth_step(
    extracted_filters = (
        step_state.get("extracted_filters") if isinstance(step_state.get("extracted_filters"), dict) else {}
    )
    append_business_review_findings(findings, step, step_state)
    if (
        catalog_alignment_status in {"selected_lower_rank", "selected_outside_match_set"}
@@ -751,6 +792,101 @@ def build_truth_review_summary(spec: dict[str, Any], scenario_state: dict[str, A
    }
def build_business_review_summary(spec: dict[str, Any], scenario_state: dict[str, Any]) -> dict[str, Any]:
    step_outputs = scenario_state.get("step_outputs") if isinstance(scenario_state.get("step_outputs"), dict) else {}
    steps: list[dict[str, Any]] = []
    issue_counts: dict[str, int] = {}
    for index, step in enumerate(spec["steps"], start=1):
        step_state = step_outputs.get(step["step_id"], {})
        business_review = (
            step_state.get("business_first_review")
            if isinstance(step_state, dict) and isinstance(step_state.get("business_first_review"), dict)
            else {}
        )
        issue_codes = dcl.normalize_string_list(business_review.get("issue_codes"))
        for issue_code in issue_codes:
            issue_counts[issue_code] = issue_counts.get(issue_code, 0) + 1
        steps.append(
            {
                "index": index,
                "step_id": step["step_id"],
                "question": step["question_template"],
                "review_status": step_state.get("review_status") if isinstance(step_state, dict) else None,
                "direct_answer": business_review.get("actual_direct_answer"),
                "answer_length_chars": business_review.get("answer_length_chars"),
                "direct_answer_required": business_review.get("direct_answer_required"),
                "direct_answer_first_ok": business_review.get("direct_answer_first_ok"),
                "business_usefulness_ok": business_review.get("business_usefulness_ok"),
                "answer_layering_ok": business_review.get("answer_layering_ok"),
                "technical_garbage_present": business_review.get("technical_garbage_present"),
                "too_verbose_for_direct_question": business_review.get("too_verbose_for_direct_question"),
                "issue_codes": issue_codes,
                "suggested_root_cause_layers": business_review.get("suggested_root_cause_layers") or [],
            }
        )
    # A step fails on any critical issue; verbosity alone only counts as a warning.
    failed = sum(
        1
        for step in steps
        if any(
            issue in {"technical_garbage_in_answer", "business_direct_answer_missing", "answer_layering_noise"}
            for issue in step["issue_codes"]
        )
    )
    warnings = sum(1 for step in steps if "business_answer_too_verbose" in step["issue_codes"])
    return {
        "schema_version": "business_first_run_review_v1",
        "scenario_id": spec["scenario_id"],
        "domain": spec["domain"],
        "title": spec["title"],
        "session_id": scenario_state.get("session_id"),
        "steps_total": len(steps),
        "steps_with_business_failures": failed,
        "steps_with_business_warnings": warnings,
        "issue_counts": issue_counts,
        "overall_business_status": "fail" if failed else ("warning" if warnings else "pass"),
        "steps": steps,
    }
def build_business_review_markdown(business_review: dict[str, Any]) -> str:
    lines = [
        "# Business-first review",
        "",
        f"- scenario_id: `{business_review.get('scenario_id') or 'n/a'}`",
        f"- domain: `{business_review.get('domain') or 'n/a'}`",
        f"- title: {business_review.get('title') or 'n/a'}",
        f"- session_id: `{business_review.get('session_id') or 'n/a'}`",
        f"- overall_business_status: `{business_review.get('overall_business_status') or 'n/a'}`",
        f"- steps_total: `{business_review.get('steps_total')}`",
        f"- steps_with_business_failures: `{business_review.get('steps_with_business_failures')}`",
        f"- steps_with_business_warnings: `{business_review.get('steps_with_business_warnings')}`",
        f"- issue_counts: `{dump_json(business_review.get('issue_counts') or {})}`",
        "",
        "## Human Answer Surface",
    ]
    for step in business_review.get("steps") or []:
        if not isinstance(step, dict):
            continue
        lines.extend(
            [
                f"{step.get('index')}. `{step.get('step_id')}` - {step.get('question')}",
                f"review_status: `{step.get('review_status') or 'n/a'}`",
                f"direct_answer: {step.get('direct_answer') or 'n/a'}",
                f"answer_length_chars: `{step.get('answer_length_chars')}`",
                f"direct_answer_required: `{step.get('direct_answer_required')}`",
                f"direct_answer_first_ok: `{step.get('direct_answer_first_ok')}`",
                f"business_usefulness_ok: `{step.get('business_usefulness_ok')}`",
                f"answer_layering_ok: `{step.get('answer_layering_ok')}`",
                f"technical_garbage_present: `{step.get('technical_garbage_present')}`",
                f"too_verbose_for_direct_question: `{step.get('too_verbose_for_direct_question')}`",
                f"issue_codes: `{', '.join(step.get('issue_codes') or []) or 'none'}`",
                f"suggested_root_cause_layers: `{', '.join(step.get('suggested_root_cause_layers') or []) or 'none'}`",
                "",
            ]
        )
    return "\n".join(lines).strip() + "\n"
def build_truth_review_markdown(spec: dict[str, Any], scenario_state: dict[str, Any], review_summary: dict[str, Any]) -> str:
    lines = [
        "# Truth harness review",
@@ -772,6 +908,11 @@ def build_truth_review_markdown(spec: dict[str, Any], scenario_state: dict[str,
    for index, step in enumerate(spec["steps"], start=1):
        step_state = step_outputs.get(step["step_id"], {})
        findings = step_state.get("review_findings") if isinstance(step_state.get("review_findings"), list) else []
        business_review = (
            step_state.get("business_first_review")
            if isinstance(step_state, dict) and isinstance(step_state.get("business_first_review"), dict)
            else {}
        )
        lines.extend(
            [
                f"{index}. `{step['step_id']}` - {step['question_template']}",
@@ -786,6 +927,11 @@ def build_truth_review_markdown(spec: dict[str, Any], scenario_state: dict[str,
                f"limited_reason_category: `{step_state.get('limited_reason_category') or 'n/a'}`",
                f"filters: `{dump_json(step_state.get('extracted_filters') or {})}`",
                f"direct_answer: {step_state.get('actual_direct_answer') or 'n/a'}",
                f"business_first: status=`{business_review.get('business_usefulness_ok')}`, "
                f"direct_first=`{business_review.get('direct_answer_first_ok')}`, "
                f"layering=`{business_review.get('answer_layering_ok')}`, "
                f"length=`{business_review.get('answer_length_chars')}`, "
                f"issues=`{', '.join(business_review.get('issue_codes') or []) or 'none'}`",
            ]
        )
        if step.get("notes"):
@@ -964,9 +1110,12 @@ def review_export(spec: dict[str, Any], export_path: Path, output_dir: Path) ->
    scenario_state["updated_at"] = datetime.now(timezone.utc).replace(microsecond=0).isoformat()
    review_summary = build_truth_review_summary(spec, scenario_state, f"export:{export_path}")
    review_markdown = build_truth_review_markdown(spec, scenario_state, review_summary)
    business_review = build_business_review_summary(spec, scenario_state)
    write_json(output_dir / "scenario_state.json", scenario_state)
    write_json(output_dir / "truth_review.json", {"summary": review_summary, "steps": scenario_state["step_outputs"]})
    write_text(output_dir / "truth_review.md", review_markdown)
    write_json(output_dir / "business_review.json", business_review)
    write_text(output_dir / "business_review.md", build_business_review_markdown(business_review))
    acceptance_bundle = write_acceptance_artifacts(output_dir, spec, scenario_state, review_summary)
    return {
        "scenario_state": scenario_state,
@@ -1056,10 +1205,13 @@ def run_live(spec: dict[str, Any], output_dir: Path, args: argparse.Namespace) -
    review_summary = build_truth_review_summary(spec, scenario_state, "live_strict_replay")
    review_markdown = build_truth_review_markdown(spec, scenario_state, review_summary)
    business_review = build_business_review_summary(spec, scenario_state)
    write_text(output_dir / "session_id.txt", f"{scenario_state.get('session_id') or ''}\n")
    write_json(output_dir / "scenario_state.json", scenario_state)
    write_json(output_dir / "truth_review.json", {"summary": review_summary, "steps": scenario_state["step_outputs"]})
    write_text(output_dir / "truth_review.md", review_markdown)
    write_json(output_dir / "business_review.json", business_review)
    write_text(output_dir / "business_review.md", build_business_review_markdown(business_review))
    acceptance_bundle = write_acceptance_artifacts(output_dir, spec, scenario_state, review_summary)
    print(f"[truth-harness] saved artifacts to {output_dir}")
    print(f"[truth-harness] overall_status={review_summary['overall_status']}")

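The hunk above adds `business_review.json` next to `truth_review.json` in the run directory. A minimal sketch of how a downstream tool might read both artifacts back is given here. It assumes only the layout visible in this commit (`truth_review.json` with a top-level `summary`, `business_review.json` with a top-level `overall_business_status`). The helper names `read_review_statuses` and `demo` are hypothetical and not part of the repository.

```python
import json
import tempfile
from pathlib import Path
from typing import Any


def read_review_statuses(run_dir: Path) -> dict[str, Any]:
    """Hypothetical helper: return the two review statuses stored in a run directory."""
    truth_review = json.loads((run_dir / "truth_review.json").read_text(encoding="utf-8"))
    business_review = json.loads((run_dir / "business_review.json").read_text(encoding="utf-8"))
    # truth_review.json wraps its summary; business_review.json keeps the status at the top level.
    summary = truth_review.get("summary") if isinstance(truth_review.get("summary"), dict) else {}
    return {
        "truth_status": summary.get("overall_status"),
        "business_status": business_review.get("overall_business_status"),
    }


def demo() -> dict[str, Any]:
    """Write placeholder artifacts into a temporary directory and read them back."""
    with tempfile.TemporaryDirectory() as tmp:
        run_dir = Path(tmp)
        truth_payload = {"summary": {"overall_status": "pass"}, "steps": []}
        business_payload = {"overall_business_status": "warning"}
        (run_dir / "truth_review.json").write_text(json.dumps(truth_payload), encoding="utf-8")
        (run_dir / "business_review.json").write_text(json.dumps(business_payload), encoding="utf-8")
        return read_review_statuses(run_dir)


if __name__ == "__main__":
    print(demo())
```

The two statuses are kept separate because a run can pass the truth review while the business review still reports a warning or failure.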

@ -0,0 +1,634 @@
#!/usr/bin/env python3
from __future__ import annotations

import argparse
import json
import re
import sys
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

import domain_case_loop as dcl

REPO_ROOT = Path(__file__).resolve().parents[1]
DEFAULT_SESSIONS_DIR = REPO_ROOT / "llm_normalizer" / "data" / "assistant_sessions"
DEFAULT_REPORTS_DIR = REPO_ROOT / "llm_normalizer" / "reports"
DEFAULT_OUTPUT_ROOT = REPO_ROOT / "artifacts" / "domain_runs" / "gui_run_reviews"
RUN_REVIEW_SCHEMA_VERSION = "assistant_stage1_run_review_v1"
QUESTION_QUALITY_SCHEMA_VERSION = "assistant_stage1_question_quality_v1"

DOMAIN_MARKERS: dict[str, tuple[str, ...]] = {
    "vat": ("ндс", "налог", "вычет", "счет-фактур"),
    "money": ("деньг", "заработ", "доход", "выруч", "поступлен", "оплат", "оборот"),
    "counterparty": ("контрагент", "клиент", "покупател", "поставщик", "группа свк", "свк", "чепурнов", "альтернатива"),
    "inventory": ("склад", "товар", "остат", "закуп", "продаж", "номенклатур"),
    "debt": ("долг", "должен", "должны", "должн", "дебитор", "кредитор", "счет 60", "счет 62", "хвост"),
    "documents": ("документ", "доки", "накладн", "акт", "платеж", "реализац", "поступлени"),
}

SMALLTALK_MARKERS = ("привет", "как дела", "что умеешь", "что можешь", "расскажи что можешь")

FOLLOWUP_MARKERS = (
    "по ней",
    "по нему",
    "по этой",
    "по этому",
    "по выбран",
    "теперь",
    "тогда",
    "давай на",
    "а еще",
    "еще",
    "эту",
    "его",
    "ее",
    "этот",
    "эта",
    "сравни",
    "а если",
    "а нам",
    "почему",
    "а кому",
    "кому ",
)

DATE_ONLY_FOLLOWUP_PATTERN = re.compile(
    r"^\s*(?:давай\s+)?(?:на\s+)?(?:январ[ьяе]|феврал[ьяе]|март[ае]?|апрел[ьяе]|ма[йяе]|июн[ьяе]|июл[ьяе]|август[ае]?|сентябр[ьяе]|октябр[ьяе]|ноябр[ьяе]|декабр[ьяе])\s+\d{4}\s*$",
    re.IGNORECASE,
)
FALSE_CATASTROPHE_MARKERS = (
"все сломалось",
"разъехалось",
"разъеб",
"пиздец",
"хуйня",
"косяк",
"неправильно",
)

BUSINESS_NOUN_MARKERS = tuple(sorted({item for values in DOMAIN_MARKERS.values() for item in values}))


def now_iso() -> str:
    return datetime.now(timezone.utc).replace(microsecond=0).isoformat()


def load_json(path: Path) -> Any:
    return json.loads(path.read_text(encoding="utf-8-sig"))


def write_text(path: Path, text: str) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(text, encoding="utf-8")


def write_json(path: Path, payload: Any) -> None:
    write_text(path, json.dumps(payload, ensure_ascii=False, indent=2) + "\n")


def repo_relative(path: Path) -> str:
    try:
        return str(path.resolve().relative_to(REPO_ROOT))
    except ValueError:
        return str(path.resolve())


def normalize_text(value: Any) -> str:
    return re.sub(r"\s+", " ", str(value or "").strip().lower())


def compact_preview(value: Any, limit: int = 260) -> str:
    text = re.sub(r"\s+", " ", str(value or "").strip())
    if len(text) <= limit:
        return text
    # Reserve room for the three-character ellipsis so the preview stays within `limit`.
    return text[: max(0, limit - 3)].rstrip() + "..."


def has_any(text: str, markers: tuple[str, ...]) -> bool:
    lowered = normalize_text(text)
    return any(marker in lowered for marker in markers)


def run_id_from_value(value: str) -> str:
    text = str(value or "").strip()
    match = re.search(r"(assistant-stage1-[A-Za-z0-9_-]+)", text)
    if not match:
        raise RuntimeError(f"Cannot parse assistant-stage1 run id from: {value}")
    return match.group(1)


def parse_report_metadata(report_path: Path) -> dict[str, Any]:
    if not report_path.exists():
        return {}
    metadata: dict[str, Any] = {"report_path": repo_relative(report_path)}
    for line in report_path.read_text(encoding="utf-8-sig").splitlines()[:80]:
        match = re.match(r"^-\s*([^:]+):\s*(.*)$", line.strip())
        if match:
            metadata[match.group(1).strip()] = match.group(2).strip()
    return metadata


def resolve_session_files(
    *,
    run_id: str,
    sessions_dir: Path,
    explicit_session_file: Path | None = None,
) -> list[Path]:
    if explicit_session_file is not None:
        if not explicit_session_file.exists():
            raise RuntimeError(f"Session file not found: {explicit_session_file}")
        return [explicit_session_file]
    candidates = sorted(sessions_dir.glob(f"{run_id}-*.json"))
    if not candidates:
        raise RuntimeError(f"No assistant session files found for {run_id} in {sessions_dir}")
    return candidates


def load_session(path: Path) -> dict[str, Any]:
    payload = load_json(path)
    if not isinstance(payload, dict):
        raise RuntimeError(f"Assistant session must be a JSON object: {path}")
    conversation = payload.get("conversation")
    if not isinstance(conversation, list):
        raise RuntimeError(f"Assistant session has no conversation[]: {path}")
    return payload


def build_conversation_pairs(conversation: list[dict[str, Any]]) -> list[dict[str, Any]]:
    pairs: list[dict[str, Any]] = []
    for index, item in enumerate(conversation):
        if not isinstance(item, dict) or item.get("role") != "user":
            continue
        assistant_item: dict[str, Any] | None = None
        for candidate in conversation[index + 1 :]:
            if isinstance(candidate, dict) and candidate.get("role") == "assistant":
                assistant_item = candidate
                break
            if isinstance(candidate, dict) and candidate.get("role") == "user":
                break
        pairs.append(
            {
                "pair_index": len(pairs) + 1,
                "user": item,
                "assistant": assistant_item,
            }
        )
    return pairs


def classify_question(question: str, pair_index: int) -> dict[str, Any]:
    normalized = normalize_text(question)
    tags: list[str] = []
    domains: list[str] = []
    if has_any(normalized, SMALLTALK_MARKERS):
        tags.append("smalltalk_or_meta")
    if dcl.is_direct_style_business_question(normalized):
        tags.append("direct_business_question")
    if dcl.is_report_style_business_question(normalized):
        tags.append("report_or_analysis_request")
    date_only_followup = pair_index > 1 and bool(DATE_ONLY_FOLLOWUP_PATTERN.match(question))
    if has_any(normalized, FOLLOWUP_MARKERS) or date_only_followup:
        tags.append("contextual_followup")
    if has_any(normalized, FALSE_CATASTROPHE_MARKERS):
        tags.append("false_catastrophe_or_negative_pressure")
    for domain, markers in DOMAIN_MARKERS.items():
        if has_any(normalized, markers):
            domains.append(domain)
    if domains:
        tags.append("domain_grounded")
    if not tags:
        tags.append("unclassified")
    weak_flags: list[str] = []
    if pair_index == 1 and "contextual_followup" in tags and "smalltalk_or_meta" not in tags:
        weak_flags.append("root_question_requires_missing_context")
    if len(question) > 500:
        weak_flags.append("question_too_long")
    if (
        "smalltalk_or_meta" not in tags
        and "contextual_followup" not in tags
        and "report_or_analysis_request" not in tags
        and not domains
        and not has_any(normalized, BUSINESS_NOUN_MARKERS)
    ):
        weak_flags.append("low_business_anchor")
    return {
        "question": question,
        "tags": tags,
        "domains": domains,
        "weak_flags": weak_flags,
        "length_chars": len(question),
    }


def build_question_quality_review(pairs: list[dict[str, Any]]) -> dict[str, Any]:
    question_reviews: list[dict[str, Any]] = []
    question_counter: Counter[str] = Counter()
    for pair in pairs:
        question = str(pair.get("user", {}).get("text") or "")
        normalized = normalize_text(question)
        if normalized:
            question_counter[normalized] += 1
        question_reviews.append(classify_question(question, int(pair["pair_index"])))
    tag_counts = Counter(tag for item in question_reviews for tag in item["tags"])
    domain_counts = Counter(domain for item in question_reviews for domain in item["domains"])
    weak_flag_counts = Counter(flag for item in question_reviews for flag in item["weak_flags"])
    duplicate_questions = [question for question, count in question_counter.items() if count > 1]
    if duplicate_questions:
        weak_flag_counts["duplicate_questions"] += len(duplicate_questions)
    if tag_counts["contextual_followup"] < 2 and len(question_reviews) >= 8:
        weak_flag_counts["too_few_contextual_followups"] += 1
    if tag_counts["direct_business_question"] < 3 and len(question_reviews) >= 8:
        weak_flag_counts["too_few_direct_business_questions"] += 1
    if tag_counts["report_or_analysis_request"] < 1 and len(question_reviews) >= 8:
        weak_flag_counts["missing_report_or_analysis_request"] += 1
    if len(domain_counts) < 3 and len(question_reviews) >= 8:
        weak_flag_counts["low_domain_diversity"] += 1
    score = 100
    score -= min(30, weak_flag_counts["low_business_anchor"] * 6)
    score -= min(20, weak_flag_counts["question_too_long"] * 5)
    score -= min(20, weak_flag_counts["duplicate_questions"] * 5)
    score -= 12 if weak_flag_counts["too_few_contextual_followups"] else 0
    score -= 12 if weak_flag_counts["too_few_direct_business_questions"] else 0
    score -= 10 if weak_flag_counts["missing_report_or_analysis_request"] else 0
    score -= 10 if weak_flag_counts["low_domain_diversity"] else 0
    score -= 20 if weak_flag_counts["root_question_requires_missing_context"] else 0
    score = max(0, min(100, score))
    if score >= 85:
        status = "strong"
    elif score >= 70:
        status = "usable_with_gaps"
    else:
        status = "weak"
    return {
        "schema_version": QUESTION_QUALITY_SCHEMA_VERSION,
        "status": status,
        "score": score,
        "turns_total": len(question_reviews),
        "tag_counts": dict(sorted(tag_counts.items())),
        "domain_counts": dict(sorted(domain_counts.items())),
        "weak_flag_counts": dict(sorted(weak_flag_counts.items())),
        "duplicate_questions": duplicate_questions[:20],
        "questions": question_reviews,
    }


def build_step_for_pair(pair: dict[str, Any]) -> dict[str, Any]:
    pair_index = int(pair["pair_index"])
    question = str(pair.get("user", {}).get("text") or "").strip()
    title = compact_preview(question, limit=80) or f"Turn {pair_index}"
    return {
        "step_id": f"turn_{pair_index:03d}",
        "title": title,
        "depends_on": [],
        "question_template": question,
        "invariant_severity": {
            "answer_layering_noise": "P1",
            "business_answer_too_verbose": "P1",
        },
    }


def build_step_state_for_pair(
    *,
    run_id: str,
    session: dict[str, Any],
    pair: dict[str, Any],
) -> dict[str, Any]:
    pair_index = int(pair["pair_index"])
    question = str(pair.get("user", {}).get("text") or "").strip()
    assistant_item = pair.get("assistant") if isinstance(pair.get("assistant"), dict) else {}
    assistant_text = str(assistant_item.get("text") or "")
    debug = assistant_item.get("debug") if isinstance(assistant_item.get("debug"), dict) else {}
    turn_artifact = {
        "schema_version": "assistant_stage1_gui_turn_artifact_v1",
        "run_id": run_id,
        "session_id": session.get("session_id"),
        "pair_index": pair_index,
        "user_message": pair.get("user"),
        "assistant_message": assistant_item,
        "technical_debug_payload": debug,
        "session_summary": {
            "session_id": session.get("session_id"),
            "started_at": session.get("started_at"),
            "updated_at": session.get("updated_at"),
            "address_navigation_state": session.get("address_navigation_state"),
            "investigation_state": session.get("investigation_state"),
            "counters": session.get("counters"),
            "reply_types": session.get("reply_types"),
        },
    }
    entries = dcl.extract_structured_entries(assistant_text)
    return dcl.build_scenario_step_state(
        scenario_id=run_id,
        domain="assistant_stage1_gui_run",
        step=build_step_for_pair(pair),
        step_index=pair_index,
        question_resolved=question,
        analysis_context={},
        turn_artifact=turn_artifact,
        entries=entries,
    )


def severity_rank(severity: str) -> int:
    return {"P0": 0, "P1": 1, "P2": 2, "WARNING": 3}.get(str(severity or "").upper(), 4)


def max_issue_severity(step_state: dict[str, Any], issue_codes: list[str]) -> str:
    if not issue_codes:
        return "none"
    severities = [dcl.derive_invariant_severity(step_state, code) for code in issue_codes]
    return sorted(severities, key=severity_rank)[0]


def build_finding(step_state: dict[str, Any], session_id: str | None) -> dict[str, Any] | None:
    review = step_state.get("business_first_review") if isinstance(step_state.get("business_first_review"), dict) else {}
    issue_codes = [str(item) for item in review.get("issue_codes", []) if str(item).strip()]
    if not issue_codes:
        return None
    issue_severities = {code: dcl.derive_invariant_severity(step_state, code) for code in issue_codes}
    severity = max_issue_severity(step_state, issue_codes)
    return {
        "finding_type": "business_answer_quality",
        "severity": severity,
        "issue_severities": issue_severities,
        "session_id": session_id,
        "turn_index": step_state.get("step_index"),
        "step_id": step_state.get("step_id"),
        "question": step_state.get("question_resolved"),
        "assistant_first_line": review.get("actual_direct_answer"),
        "issue_codes": issue_codes,
        "suggested_root_cause_layers": review.get("suggested_root_cause_layers") or [],
        "answer_length_chars": review.get("answer_length_chars"),
        "reply_type": step_state.get("reply_type"),
        "trace_id": step_state.get("trace_id"),
        "capability_id": step_state.get("capability_id"),
        "selected_recipe": step_state.get("selected_recipe"),
    }


def build_repair_targets(findings: list[dict[str, Any]]) -> list[dict[str, Any]]:
    grouped: dict[tuple[str, str], dict[str, Any]] = {}
    for finding in findings:
        issue_codes = [str(item) for item in finding.get("issue_codes", []) if str(item).strip()]
        layers = [str(item) for item in finding.get("suggested_root_cause_layers", []) if str(item).strip()]
        if not layers:
            layers = ["business_answer_quality_gap"]
        for issue_code in issue_codes:
            issue_severity = (
                finding.get("issue_severities", {}).get(issue_code)
                if isinstance(finding.get("issue_severities"), dict)
                else finding.get("severity")
            )
            for layer in layers:
                key = (layer, issue_code)
                target = grouped.setdefault(
                    key,
                    {
                        "problem_layer": layer,
                        "issue_code": issue_code,
                        "severity": issue_severity,
                        "occurrences": 0,
                        "sample_turns": [],
                    },
                )
                target["occurrences"] += 1
                if severity_rank(str(issue_severity)) < severity_rank(str(target.get("severity"))):
                    target["severity"] = issue_severity
                if len(target["sample_turns"]) < 5:
                    target["sample_turns"].append(
                        {
                            "session_id": finding.get("session_id"),
                            "turn_index": finding.get("turn_index"),
                            "question": finding.get("question"),
                            "assistant_first_line": finding.get("assistant_first_line"),
                        }
                    )
    return sorted(
        grouped.values(),
        key=lambda item: (severity_rank(str(item.get("severity"))), -int(item.get("occurrences") or 0), str(item.get("issue_code"))),
    )


def build_run_review(
    *,
    run_id: str,
    session_files: list[Path],
    report_path: Path,
) -> dict[str, Any]:
    sessions_review: list[dict[str, Any]] = []
    all_pairs: list[dict[str, Any]] = []
    all_step_states: list[dict[str, Any]] = []
    findings: list[dict[str, Any]] = []
    for session_file in session_files:
        session = load_session(session_file)
        conversation = [item for item in session.get("conversation", []) if isinstance(item, dict)]
        pairs = build_conversation_pairs(conversation)
        session_step_states: list[dict[str, Any]] = []
        for pair in pairs:
            step_state = build_step_state_for_pair(run_id=run_id, session=session, pair=pair)
            session_step_states.append(step_state)
            all_step_states.append(step_state)
            pair_record = {
                "session_id": session.get("session_id"),
                "pair_index": pair["pair_index"],
                "user_text": pair.get("user", {}).get("text"),
                "assistant_text": (pair.get("assistant") or {}).get("text") if isinstance(pair.get("assistant"), dict) else None,
                "assistant_reply_type": (pair.get("assistant") or {}).get("reply_type") if isinstance(pair.get("assistant"), dict) else None,
                "assistant_trace_id": (pair.get("assistant") or {}).get("trace_id") if isinstance(pair.get("assistant"), dict) else None,
            }
            all_pairs.append(pair_record)
            finding = build_finding(step_state, str(session.get("session_id") or ""))
            if finding is not None:
                findings.append(finding)
        sessions_review.append(
            {
                "session_file": repo_relative(session_file),
                "session_id": session.get("session_id"),
                "conversation_items": len(conversation),
                "pairs_total": len(pairs),
                "business_issue_turns": sum(
                    1
                    for item in session_step_states
                    if (item.get("business_first_review") or {}).get("issue_codes")
                ),
            }
        )
    issue_counter = Counter(code for finding in findings for code in finding.get("issue_codes", []))
    severity_counter = Counter(str(finding.get("severity") or "none") for finding in findings)
    runtime_status_counts = Counter(str(item.get("execution_status") or "unknown") for item in all_step_states)
    p0_findings = [item for item in findings if item.get("severity") == "P0"]
    p1_findings = [item for item in findings if item.get("severity") == "P1"]
    if p0_findings:
        overall_status = "fail"
    elif p1_findings:
        overall_status = "warning"
    else:
        overall_status = "pass"
    question_quality = build_question_quality_review(
        [
            {
                "pair_index": item["pair_index"],
                "user": {"text": item.get("user_text")},
            }
            for item in all_pairs
        ]
    )
    repair_targets = build_repair_targets(findings)
    report_metadata = parse_report_metadata(report_path)
    return {
        "schema_version": RUN_REVIEW_SCHEMA_VERSION,
        "run_id": run_id,
        "reviewed_at": now_iso(),
        "source": {
            "report_path": repo_relative(report_path) if report_path.exists() else None,
            "report_metadata": report_metadata,
            "session_files": [repo_relative(path) for path in session_files],
        },
        "summary": {
            "overall_business_status": overall_status,
            "sessions_total": len(session_files),
            "turn_pairs_total": len(all_pairs),
            "business_issue_turns": len(findings),
            "p0_findings": len(p0_findings),
            "p1_findings": len(p1_findings),
            "issue_counts": dict(sorted(issue_counter.items())),
            "severity_counts": dict(sorted(severity_counter.items())),
            "runtime_status_counts": dict(sorted(runtime_status_counts.items())),
            "question_quality_status": question_quality["status"],
            "question_quality_score": question_quality["score"],
        },
        "sessions": sessions_review,
        "question_quality_review": question_quality,
        "findings": findings,
        "repair_targets": repair_targets,
        "conversation_pairs": all_pairs,
        "step_states": all_step_states,
    }


def build_review_markdown(review: dict[str, Any]) -> str:
    summary = review.get("summary") if isinstance(review.get("summary"), dict) else {}
    question_quality = (
        review.get("question_quality_review")
        if isinstance(review.get("question_quality_review"), dict)
        else {}
    )
    lines = [
        "# Assistant Stage 1 GUI Run Review",
        "",
        f"- run_id: `{review.get('run_id')}`",
        f"- overall_business_status: `{summary.get('overall_business_status')}`",
        f"- turn_pairs_total: `{summary.get('turn_pairs_total')}`",
        f"- business_issue_turns: `{summary.get('business_issue_turns')}`",
        f"- p0_findings: `{summary.get('p0_findings')}`",
        f"- p1_findings: `{summary.get('p1_findings')}`",
        f"- question_quality: `{summary.get('question_quality_status')}` / `{summary.get('question_quality_score')}`",
        "",
        "## Question Quality",
        "",
        f"- status: `{question_quality.get('status')}`",
        f"- score: `{question_quality.get('score')}`",
        f"- tag_counts: `{json.dumps(question_quality.get('tag_counts') or {}, ensure_ascii=False, sort_keys=True)}`",
        f"- domain_counts: `{json.dumps(question_quality.get('domain_counts') or {}, ensure_ascii=False, sort_keys=True)}`",
        f"- weak_flag_counts: `{json.dumps(question_quality.get('weak_flag_counts') or {}, ensure_ascii=False, sort_keys=True)}`",
        "",
        "## Business Findings",
    ]
    findings = review.get("findings") if isinstance(review.get("findings"), list) else []
    if not findings:
        lines.append("")
        lines.append("- no business-first answer quality findings")
    else:
        for finding in findings[:80]:
            lines.extend(
                [
                    "",
                    f"### Turn {finding.get('turn_index')} - {finding.get('severity')}",
                    "",
                    f"- issue_codes: `{', '.join(str(item) for item in finding.get('issue_codes') or [])}`",
                    f"- root_cause_layers: `{', '.join(str(item) for item in finding.get('suggested_root_cause_layers') or []) or 'n/a'}`",
                    f"- reply_type: `{finding.get('reply_type') or 'n/a'}`",
                    f"- capability_id: `{finding.get('capability_id') or 'n/a'}`",
                    f"- selected_recipe: `{finding.get('selected_recipe') or 'n/a'}`",
                    f"- question: {compact_preview(finding.get('question'), 500)}",
                    f"- assistant_first_line: {compact_preview(finding.get('assistant_first_line'), 500) or 'n/a'}",
                ]
            )
    lines.extend(["", "## Repair Targets"])
    repair_targets = review.get("repair_targets") if isinstance(review.get("repair_targets"), list) else []
    if not repair_targets:
        lines.append("")
        lines.append("- no repair targets")
    else:
        for target in repair_targets[:30]:
            lines.append(
                f"- `{target.get('severity')}` `{target.get('problem_layer')}` / `{target.get('issue_code')}`: "
                f"{target.get('occurrences')} occurrence(s)"
            )
    return "\n".join(lines).strip() + "\n"


def save_run_review(review: dict[str, Any], output_dir: Path) -> None:
    write_json(output_dir / "run_review.json", review)
    write_text(output_dir / "run_review.md", build_review_markdown(review))
    write_json(output_dir / "conversation_pairs.json", review.get("conversation_pairs") or [])
    write_json(output_dir / "question_quality_review.json", review.get("question_quality_review") or {})
    write_json(output_dir / "repair_targets.json", review.get("repair_targets") or [])


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        description="Review a GUI assistant_stage1 saved-session run by assistant-stage1-* id."
    )
    parser.add_argument("run_id", help="Run id or text containing assistant-stage1-...")
    parser.add_argument("--session-file", type=Path, default=None, help="Explicit assistant session JSON file.")
    parser.add_argument("--sessions-dir", type=Path, default=DEFAULT_SESSIONS_DIR)
    parser.add_argument("--reports-dir", type=Path, default=DEFAULT_REPORTS_DIR)
    parser.add_argument("--output-root", type=Path, default=DEFAULT_OUTPUT_ROOT)
    parser.add_argument("--output-dir", type=Path, default=None)
    parser.add_argument("--print-summary", action="store_true")
    return parser


def main(argv: list[str] | None = None) -> int:
    args = build_parser().parse_args(argv)
    run_id = run_id_from_value(args.run_id)
    report_path = args.reports_dir / f"{run_id}.md"
    session_files = resolve_session_files(
        run_id=run_id,
        sessions_dir=args.sessions_dir,
        explicit_session_file=args.session_file,
    )
    review = build_run_review(run_id=run_id, session_files=session_files, report_path=report_path)
    output_dir = args.output_dir or (args.output_root / run_id)
    save_run_review(review, output_dir)
    if args.print_summary:
        summary = review["summary"]
        print(
            json.dumps(
                {
                    "run_id": run_id,
                    "output_dir": repo_relative(output_dir),
                    "overall_business_status": summary["overall_business_status"],
                    "turn_pairs_total": summary["turn_pairs_total"],
                    "business_issue_turns": summary["business_issue_turns"],
                    "question_quality_score": summary["question_quality_score"],
                },
                ensure_ascii=False,
                indent=2,
            )
        )
    return 0


if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))

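The question-quality score in the review script above starts at 100 and subtracts capped or flat penalties per weak flag. The sketch below isolates that arithmetic so the caps and status thresholds can be checked by hand. It mirrors the penalty table from `build_question_quality_review`, but the function name `score_question_pack` and the sample counters are illustrative assumptions, not part of the commit.

```python
from collections import Counter


def score_question_pack(weak_flag_counts: Counter) -> tuple[int, str]:
    """Illustrative re-statement of the penalty arithmetic; returns (score, status)."""
    score = 100
    # Per-occurrence penalties, each capped so one flag type cannot zero the score alone.
    score -= min(30, weak_flag_counts["low_business_anchor"] * 6)
    score -= min(20, weak_flag_counts["question_too_long"] * 5)
    score -= min(20, weak_flag_counts["duplicate_questions"] * 5)
    # Flat penalties: applied once if the flag is present at all.
    score -= 12 if weak_flag_counts["too_few_contextual_followups"] else 0
    score -= 12 if weak_flag_counts["too_few_direct_business_questions"] else 0
    score -= 10 if weak_flag_counts["missing_report_or_analysis_request"] else 0
    score -= 10 if weak_flag_counts["low_domain_diversity"] else 0
    score -= 20 if weak_flag_counts["root_question_requires_missing_context"] else 0
    score = max(0, min(100, score))
    if score >= 85:
        status = "strong"
    elif score >= 70:
        status = "usable_with_gaps"
    else:
        status = "weak"
    return score, status


if __name__ == "__main__":
    # 100 - (2 * 6) - (1 * 5) - 10 = 73
    sample = Counter({"low_business_anchor": 2, "duplicate_questions": 1, "low_domain_diversity": 1})
    print(score_question_pack(sample))
```

With ten `low_business_anchor` hits the per-occurrence penalty stops at 30, so that flag alone leaves the pack at 70 (`usable_with_gaps`); it takes a second flag type to push it into `weak`.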

@ -14,6 +14,7 @@ REPO_ROOT = Path(__file__).resolve().parents[1]
HISTORY_FILE = REPO_ROOT / "llm_normalizer" / "data" / "autorun_generators" / "history.json"
SAVED_SESSIONS_DIR = REPO_ROOT / "llm_normalizer" / "data" / "autorun_generators" / "saved_sessions"
EVAL_CASES_DIR = REPO_ROOT / "llm_normalizer" / "data" / "eval_cases"
VALIDATED_AGENT_SAVE_SCHEMA_VERSION = "agent_semantic_save_gate_v1"


def now_utc() -> datetime:
@ -54,6 +55,188 @@ def write_json(path: Path, payload: Any) -> None:
    path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")


def resolve_repo_path(raw_path: str | Path) -> Path:
    path = Path(raw_path)
    return path if path.is_absolute() else (REPO_ROOT / path).resolve()


def repo_relative(path: Path) -> str:
    try:
        return str(path.resolve().relative_to(REPO_ROOT))
    except ValueError:
        return str(path.resolve())


def load_json_object(path: Path, label: str) -> dict[str, Any]:
    if not path.exists():
        raise RuntimeError(f"{label} not found: {path}")
    parsed = load_json(path)
    if not isinstance(parsed, dict):
        raise RuntimeError(f"{label} must be a JSON object: {path}")
    return parsed


def assert_status(value: Any, expected: str, label: str, problems: list[str]) -> None:
    actual = str(value or "").strip().lower()
    if actual != expected:
        problems.append(f"{label}={actual or 'missing'}")


def validate_truth_harness_run_dir(run_dir: Path) -> dict[str, Any]:
    run_dir = run_dir.resolve()
    pack_state = load_json_object(run_dir / "pack_state.json", "Validated run pack_state.json")
    truth_review = load_json_object(run_dir / "truth_review.json", "Validated run truth_review.json")
    business_review = load_json_object(run_dir / "business_review.json", "Validated run business_review.json")
    truth_summary = truth_review.get("summary") if isinstance(truth_review.get("summary"), dict) else {}
    problems: list[str] = []
    assert_status(pack_state.get("final_status"), "accepted", "pack_state.final_status", problems)
    assert_status(pack_state.get("review_overall_status"), "pass", "pack_state.review_overall_status", problems)
    assert_status(truth_summary.get("overall_status"), "pass", "truth_review.summary.overall_status", problems)
    assert_status(business_review.get("overall_business_status"), "pass", "business_review.overall_business_status", problems)
    if pack_state.get("acceptance_gate_passed") is not True:
        problems.append("pack_state.acceptance_gate_passed=false")
    if pack_state.get("no_unresolved_p0") is not True:
        problems.append("pack_state.no_unresolved_p0=false")
    if int(pack_state.get("unresolved_p0_count") or 0) != 0:
        problems.append(f"pack_state.unresolved_p0_count={pack_state.get('unresolved_p0_count')}")
    if int(business_review.get("steps_with_business_failures") or 0) != 0:
        problems.append(f"business_review.steps_with_business_failures={business_review.get('steps_with_business_failures')}")
    if problems:
        raise RuntimeError(
            "Refusing to save AGENT autorun because the validated run is not clean: "
            + ", ".join(problems)
        )
    return {
        "schema_version": VALIDATED_AGENT_SAVE_SCHEMA_VERSION,
        "validation_status": "accepted_live_replay",
        "validated_run_dir": repo_relative(run_dir),
        "final_status": pack_state.get("final_status"),
        "review_overall_status": pack_state.get("review_overall_status"),
        "business_overall_status": business_review.get("overall_business_status"),
        "steps_total": pack_state.get("steps_total"),
        "steps_passed": pack_state.get("steps_passed"),
        "steps_failed": pack_state.get("steps_failed"),
        "steps_with_business_failures": business_review.get("steps_with_business_failures"),
        "steps_with_business_warnings": business_review.get("steps_with_business_warnings"),
        "acceptance_gate_passed": pack_state.get("acceptance_gate_passed"),
        "saved_after_validated_replay": True,
    }


def validate_domain_pack_loop_dir(loop_dir: Path) -> dict[str, Any]:
    loop_dir = loop_dir.resolve()
    loop_state = load_json_object(loop_dir / "loop_state.json", "Validated loop_state.json")
    iterations = loop_state.get("iterations")
    if not isinstance(iterations, list) or not iterations:
        raise RuntimeError("Refusing to save AGENT autorun because the validated loop has no iterations")
    accepted_iterations = [
        item for item in iterations if isinstance(item, dict) and bool(item.get("accepted_gate"))
    ]
    last_iteration = accepted_iterations[-1] if accepted_iterations else iterations[-1]
    if not isinstance(last_iteration, dict):
        raise RuntimeError("Refusing to save AGENT autorun because the validated loop iteration is invalid")
    analyst_path_raw = str(last_iteration.get("analyst_verdict_path") or "").strip()
    repair_targets_path_raw = str(last_iteration.get("repair_targets_path") or "").strip()
    # An empty path would resolve to REPO_ROOT itself and fail later with an unrelated I/O error,
    # so reject it here with an explicit message.
    if not analyst_path_raw or not repair_targets_path_raw:
        raise RuntimeError(
            "Refusing to save AGENT autorun because the validated loop iteration has no "
            "analyst_verdict_path or repair_targets_path"
        )
    analyst_verdict = load_json_object(resolve_repo_path(analyst_path_raw), "Validated loop analyst_verdict.json")
    repair_targets = load_json_object(resolve_repo_path(repair_targets_path_raw), "Validated loop repair_targets.json")
    severity_counts = repair_targets.get("severity_counts") if isinstance(repair_targets.get("severity_counts"), dict) else {}
    problems: list[str] = []
    assert_status(loop_state.get("final_status"), "accepted", "loop_state.final_status", problems)
    if last_iteration.get("accepted_gate") is not True:
        problems.append("last_iteration.accepted_gate=false")
    if last_iteration.get("analyst_accepted_gate") is not True:
        problems.append("last_iteration.analyst_accepted_gate=false")
    if last_iteration.get("deterministic_gate_ok") is not True:
        problems.append("last_iteration.deterministic_gate_ok=false")
    if int(last_iteration.get("quality_score") or 0) < int(loop_state.get("target_score") or 80):
        problems.append(
            f"last_iteration.quality_score={last_iteration.get('quality_score')}<target_score={loop_state.get('target_score')}"
        )
    assert_status(analyst_verdict.get("loop_decision"), "accepted", "analyst_verdict.loop_decision", problems)
    if int(analyst_verdict.get("unresolved_p0_count") or 0) != 0:
        problems.append(f"analyst_verdict.unresolved_p0_count={analyst_verdict.get('unresolved_p0_count')}")
    if bool(analyst_verdict.get("regression_detected")):
        problems.append("analyst_verdict.regression_detected=true")
    for field_name in (
        "direct_answer_ok",
        "business_usefulness_ok",
        "temporal_honesty_ok",
        "field_truth_ok",
        "answer_layering_ok",
    ):
        if analyst_verdict.get(field_name) is not True:
            problems.append(f"analyst_verdict.{field_name}=false")
    if int(severity_counts.get("P0") or 0) != 0 or int(severity_counts.get("P1") or 0) != 0:
        problems.append(
            f"repair_targets.severity_counts=P0:{severity_counts.get('P0') or 0},P1:{severity_counts.get('P1') or 0}"
        )
    if problems:
        raise RuntimeError(
            "Refusing to save AGENT autorun because the validated stage/domain loop is not clean: "
            + ", ".join(problems)
        )
    return {
        "schema_version": VALIDATED_AGENT_SAVE_SCHEMA_VERSION,
        "validation_status": "accepted_domain_pack_loop",
        "validated_run_dir": repo_relative(loop_dir),
        "final_status": loop_state.get("final_status"),
        "loop_id": loop_state.get("loop_id"),
        "target_score": loop_state.get("target_score"),
        "iterations_ran": len(iterations),
        "quality_score": last_iteration.get("quality_score"),
        "repair_target_count": last_iteration.get("repair_target_count"),
        "repair_target_severity_counts": last_iteration.get("repair_target_severity_counts"),
        "accepted_gate": last_iteration.get("accepted_gate"),
        "saved_after_validated_replay": True,
    }


def validate_accepted_run_dir(run_dir: Path) -> dict[str, Any]:
    run_dir = run_dir.resolve()
    if (run_dir / "loop_state.json").exists():
        return validate_domain_pack_loop_dir(run_dir)
    return validate_truth_harness_run_dir(run_dir)


def build_save_gate_metadata(args: argparse.Namespace, spec: dict[str, Any], spec_path: Path) -> dict[str, Any]:
    raw_run_dir = args.validated_run_dir or spec.get("validated_run_dir") or spec.get("validated_artifact_dir")
    if raw_run_dir:
        return validate_accepted_run_dir(resolve_repo_path(str(raw_run_dir)))
    if args.dry_run:
        return {
            "schema_version": VALIDATED_AGENT_SAVE_SCHEMA_VERSION,
            "validation_status": "dry_run_unvalidated",
            "source_spec_file": repo_relative(spec_path),
            "saved_after_validated_replay": False,
        }
    if args.allow_unvalidated:
        reason = str(args.unvalidated_reason or "").strip()
        if not reason:
            raise RuntimeError("--unvalidated-reason is required when --allow-unvalidated is used")
        return {
            "schema_version": VALIDATED_AGENT_SAVE_SCHEMA_VERSION,
            "validation_status": "explicitly_unvalidated",
            "source_spec_file": repo_relative(spec_path),
            "unvalidated_reason": reason,
            "saved_after_validated_replay": False,
        }
    raise RuntimeError(
        "Refusing to save AGENT autorun before a reviewed live replay. "
        "Pass --validated-run-dir artifacts/domain_runs/<run_id> after run-live/review-export is accepted, "
        "or use --allow-unvalidated --unvalidated-reason only for an explicit draft."
    )


def normalize_questions(raw_questions: list[Any]) -> list[str]:
    result: list[str] = []
    seen: set[str] = set()
@ -90,9 +273,31 @@ def extract_questions_from_spec(spec: dict[str, Any]) -> list[str]:
    steps = spec.get("steps")
    if isinstance(steps, list):
        return normalize_questions(
[step.get("question") for step in steps if isinstance(step, dict) and step.get("question")] [
step.get("question") or step.get("question_template")
for step in steps
if isinstance(step, dict) and (step.get("question") or step.get("question_template"))
]
) )
raise RuntimeError("Spec must define either `questions[]` or `steps[].question`") scenarios = spec.get("scenarios")
if isinstance(scenarios, list):
raw_questions: list[Any] = []
for scenario in scenarios:
if not isinstance(scenario, dict):
continue
scenario_steps = scenario.get("steps")
if not isinstance(scenario_steps, list):
continue
raw_questions.extend(
step.get("question") or step.get("question_template")
for step in scenario_steps
if isinstance(step, dict) and (step.get("question") or step.get("question_template"))
)
return normalize_questions(raw_questions)
raise RuntimeError(
"Spec must define `questions[]`, `steps[].question`, `steps[].question_template`, "
"or `scenarios[].steps[]` questions"
)
def build_case_set_payload( def build_case_set_payload(
@ -203,6 +408,9 @@ def build_history_record(
         "source_spec_file": metadata.get("source_spec_file"),
         "scenario_id": metadata.get("scenario_id"),
         "semantic_tags": metadata.get("semantic_tags"),
+        "validation_status": metadata.get("validation_status"),
+        "validated_run_dir": metadata.get("validated_run_dir"),
+        "saved_after_validated_replay": metadata.get("saved_after_validated_replay"),
     }
     return {
         "generation_id": generation_id,
@ -218,7 +426,12 @@ def build_history_record(
     }


-def build_metadata(args: argparse.Namespace, spec: dict[str, Any], spec_path: Path | None) -> dict[str, Any]:
+def build_metadata(
+    args: argparse.Namespace,
+    spec: dict[str, Any],
+    spec_path: Path | None,
+    save_gate: dict[str, Any],
+) -> dict[str, Any]:
     semantic_tags = extract_semantic_tags(spec)
     return {
         "assistant_prompt_version": args.assistant_prompt_version,
@ -229,6 +442,10 @@ def build_metadata(args: argparse.Namespace, spec: dict[str, Any], spec_path: Pa
         "source_spec_file": str(spec_path.resolve()) if spec_path else None,
         "scenario_id": str(spec.get("scenario_id") or "").strip() or None,
         "semantic_tags": semantic_tags,
+        "validation_status": save_gate.get("validation_status"),
+        "validated_run_dir": save_gate.get("validated_run_dir"),
+        "saved_after_validated_replay": save_gate.get("saved_after_validated_replay"),
+        "save_gate": save_gate,
     }
@ -242,6 +459,19 @@ def parse_args() -> argparse.Namespace:
     parser.add_argument("--assistant-prompt-version", help="Optional assistant prompt version metadata.")
     parser.add_argument("--decomposition-prompt-version", help="Optional decomposition prompt version metadata.")
     parser.add_argument("--prompt-fingerprint", help="Optional prompt fingerprint metadata.")
+    parser.add_argument(
+        "--validated-run-dir",
+        help="Accepted truth-harness artifact directory containing pack_state.json, truth_review.json, and business_review.json.",
+    )
+    parser.add_argument(
+        "--allow-unvalidated",
+        action="store_true",
+        help="Explicitly save a draft AGENT run without accepted replay artifacts. This is not an acceptance proof.",
+    )
+    parser.add_argument(
+        "--unvalidated-reason",
+        help="Required explanation when --allow-unvalidated is used.",
+    )
     parser.add_argument("--dry-run", action="store_true", help="Print resulting record metadata without writing files.")
     return parser.parse_args()
@ -262,10 +492,11 @@ def main() -> int:
     if not questions:
         raise RuntimeError("Agent semantic run must contain at least one question")

+    save_gate = build_save_gate_metadata(args, spec_raw, spec_path)
     domain = str(spec_raw.get("domain") or "").strip() or None
     source_title = str(args.title or spec_raw.get("title") or spec_path.stem).strip()
     title = ensure_agent_title(source_title)
-    metadata = build_metadata(args, spec_raw, spec_path)
+    metadata = build_metadata(args, spec_raw, spec_path, save_gate)
     timestamp = now_utc()
     generation_id = generate_id(timestamp)
@ -307,6 +538,8 @@ def main() -> int:
                 "case_set_file": case_set_file,
                 "saved_session_file": saved_session_file,
                 "domain": domain,
+                "validation_status": save_gate.get("validation_status"),
+                "validated_run_dir": save_gate.get("validated_run_dir"),
             },
             ensure_ascii=False,
             indent=2,
@ -329,6 +562,8 @@ def main() -> int:
                 "questions_total": len(questions),
                 "case_set_file": case_set_file,
                 "saved_session_file": saved_session_file,
+                "validation_status": save_gate.get("validation_status"),
+                "validated_run_dir": save_gate.get("validated_run_dir"),
             },
             ensure_ascii=False,
             indent=2,
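
The order in which `build_save_gate_metadata` resolves its inputs can be condensed into a small decision function. This is a minimal sketch, not the script's code: `resolve_save_gate` is a hypothetical name, it skips the artifact validation the real script performs on the run directory, and it assumes the accepted path records `accepted_live_replay` (the status asserted by the truth-harness test in this commit; a domain-pack loop directory may record a different label).

```python
from __future__ import annotations


def resolve_save_gate(
    validated_run_dir: str | None,
    dry_run: bool = False,
    allow_unvalidated: bool = False,
    unvalidated_reason: str | None = None,
) -> str:
    """Return the validation_status label the gate would record (simplified sketch)."""
    # 1. A validated run directory always wins. The real script inspects the
    #    artifacts in that directory here; this sketch assumes they are accepted.
    if validated_run_dir:
        return "accepted_live_replay"
    # 2. A dry run writes nothing, so it passes but is labelled as unvalidated.
    if dry_run:
        return "dry_run_unvalidated"
    # 3. An explicit draft is allowed only with a non-empty reason.
    if allow_unvalidated:
        if not (unvalidated_reason or "").strip():
            raise RuntimeError("--unvalidated-reason is required when --allow-unvalidated is used")
        return "explicitly_unvalidated"
    # 4. Anything else is refused.
    raise RuntimeError("Refusing to save AGENT autorun before a reviewed live replay.")
```

Note that `--dry-run` is checked before `--allow-unvalidated`, so a dry run never needs a reason, while a real write without a validated directory always does.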

scripts/stage_agent_loop.py (new file, 406 lines)

@ -0,0 +1,406 @@
#!/usr/bin/env python3
from __future__ import annotations

import argparse
import json
import re
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

REPO_ROOT = Path(__file__).resolve().parents[1]
DEFAULT_STAGE_OUTPUT_ROOT = REPO_ROOT / "artifacts" / "domain_runs" / "stage_agent_loops"
STAGE_LOOP_SCHEMA_VERSION = "stage_agent_loop_manifest_v1"
STAGE_SUMMARY_SCHEMA_VERSION = "stage_agent_loop_summary_v1"


def now_iso() -> str:
    return datetime.now(timezone.utc).replace(microsecond=0).isoformat()


def slugify(value: str, fallback: str = "stage_agent_loop") -> str:
    normalized = re.sub(r"[^a-zA-Z0-9_.-]+", "_", str(value or "").strip()).strip("_.-")
    return normalized or fallback


def load_json(path: Path) -> Any:
    return json.loads(path.read_text(encoding="utf-8"))


def load_json_object(path: Path, label: str) -> dict[str, Any]:
    if not path.exists():
        raise RuntimeError(f"{label} not found: {path}")
    parsed = load_json(path)
    if not isinstance(parsed, dict):
        raise RuntimeError(f"{label} must be a JSON object: {path}")
    return parsed


def write_text(path: Path, text: str) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(text, encoding="utf-8")


def write_json(path: Path, payload: Any) -> None:
    write_text(path, json.dumps(payload, ensure_ascii=False, indent=2) + "\n")


def repo_path(raw_path: str | Path) -> Path:
    path = Path(raw_path)
    return path if path.is_absolute() else (REPO_ROOT / path).resolve()


def repo_relative(path: Path) -> str:
    try:
        return str(path.resolve().relative_to(REPO_ROOT))
    except ValueError:
        return str(path.resolve())


def string_list(value: Any) -> list[str]:
    if not isinstance(value, list):
        return []
    result: list[str] = []
    for item in value:
        text = str(item or "").strip()
        if text:
            result.append(text)
    return result


def load_stage_manifest(path: Path) -> dict[str, Any]:
    raw = load_json_object(path, "Stage agent loop manifest")
    stage_id = slugify(str(raw.get("stage_id") or path.stem), path.stem)
    pack_manifest = str(raw.get("pack_manifest") or "").strip()
    if not pack_manifest:
        raise RuntimeError("Stage manifest must define `pack_manifest` for the autonomous stage loop")
    target_score = int(raw.get("target_score") or 88)
    max_iterations = int(raw.get("max_iterations") or 6)
    if target_score < 0 or target_score > 100:
        raise RuntimeError("Stage manifest `target_score` must be between 0 and 100")
    if max_iterations < 1:
        raise RuntimeError("Stage manifest `max_iterations` must be >= 1")
    return {
        **raw,
        "schema_version": str(raw.get("schema_version") or STAGE_LOOP_SCHEMA_VERSION),
        "stage_id": stage_id,
        "module_name": str(raw.get("module_name") or raw.get("domain") or "unknown_module").strip(),
        "title": str(raw.get("title") or stage_id).strip(),
        "pack_manifest": pack_manifest,
        "target_score": target_score,
        "max_iterations": max_iterations,
        "global_plan_refs": string_list(raw.get("global_plan_refs")),
        "acceptance_invariants": string_list(raw.get("acceptance_invariants")),
        "save_autorun_on_accept": bool(raw.get("save_autorun_on_accept", True)),
        "manual_confirmation_required_after_accept": bool(raw.get("manual_confirmation_required_after_accept", True)),
    }


def stage_dir_for(output_root: Path, stage_id: str) -> Path:
    return output_root.resolve() / slugify(stage_id)


def stage_loop_dir(stage_dir: Path, stage_manifest: dict[str, Any]) -> Path:
    loop_id = str(stage_manifest.get("loop_id") or stage_manifest["stage_id"]).strip()
    return stage_dir / "domain_loops" / slugify(loop_id)


def build_domain_pack_loop_command(args: argparse.Namespace, stage_manifest: dict[str, Any], stage_dir: Path) -> list[str]:
    loop_id = str(stage_manifest.get("loop_id") or stage_manifest["stage_id"]).strip()
    command = [
        sys.executable,
        str(REPO_ROOT / "scripts" / "domain_case_loop.py"),
        "run-pack-loop",
        "--manifest",
        str(repo_path(stage_manifest["pack_manifest"])),
        "--loop-id",
        loop_id,
        "--output-root",
        str(stage_dir / "domain_loops"),
        "--target-score",
        str(int(stage_manifest["target_score"])),
        "--max-iterations",
        str(int(stage_manifest["max_iterations"])),
        "--backend-url",
        str(args.backend_url),
        "--prompt-version",
        str(args.prompt_version),
        "--llm-provider",
        str(args.llm_provider),
        "--llm-model",
        str(args.llm_model),
        "--llm-base-url",
        str(args.llm_base_url),
        "--llm-api-key",
        str(args.llm_api_key),
        "--temperature",
        str(args.temperature),
        "--max-output-tokens",
        str(args.max_output_tokens),
        "--timeout-seconds",
        str(args.timeout_seconds),
        "--codex-binary",
        str(args.codex_binary),
        "--analyst-codex-model",
        str(args.analyst_codex_model),
        "--coder-codex-model",
        str(args.coder_codex_model),
        "--analyst-reasoning-effort",
        str(args.analyst_reasoning_effort),
        "--coder-reasoning-effort",
        str(args.coder_reasoning_effort),
        "--codex-timeout-seconds",
        str(args.codex_timeout_seconds),
    ]
    if args.codex_profile:
        command.extend(["--codex-profile", str(args.codex_profile)])
    if args.codex_model:
        command.extend(["--codex-model", str(args.codex_model)])
    if args.analysis_date:
        command.extend(["--analysis-date", str(args.analysis_date)])
    if args.max_scenarios is not None:
        command.extend(["--max-scenarios", str(int(args.max_scenarios))])
    if args.use_mock:
        command.append("--use-mock")
    return command


def run_command(command: list[str], cwd: Path, stdout_path: Path, stderr_path: Path, timeout_seconds: int) -> None:
    result = subprocess.run(
        command,
        cwd=str(cwd),
        text=True,
        encoding="utf-8",
        errors="replace",
        capture_output=True,
        timeout=timeout_seconds,
        check=False,
    )
    write_text(stdout_path, result.stdout)
    write_text(stderr_path, result.stderr)
    if result.returncode != 0:
        raise RuntimeError(f"Command failed with exit code {result.returncode}: {' '.join(command)}")


def build_stage_summary(stage_manifest: dict[str, Any], loop_dir: Path) -> dict[str, Any]:
    loop_state = load_json_object(loop_dir / "loop_state.json", "Stage domain loop_state.json")
    iterations = loop_state.get("iterations") if isinstance(loop_state.get("iterations"), list) else []
    last_iteration = iterations[-1] if iterations and isinstance(iterations[-1], dict) else {}
    final_status = str(loop_state.get("final_status") or "unknown").strip()
    accepted = final_status == "accepted" and bool(last_iteration.get("accepted_gate"))
    manual_confirmation_required = bool(stage_manifest.get("manual_confirmation_required_after_accept", True)) and accepted
    if accepted and manual_confirmation_required:
        next_action = "manual_gui_confirmation"
    elif accepted:
        next_action = "stage_closed_without_manual_confirmation"
    elif bool(loop_state.get("last_user_decision_prompt")):
        next_action = "user_decision_required"
    else:
        next_action = "continue_autonomous_or_fix_blocker"
    return {
        "schema_version": STAGE_SUMMARY_SCHEMA_VERSION,
        "stage_id": stage_manifest["stage_id"],
        "module_name": stage_manifest.get("module_name"),
        "title": stage_manifest.get("title"),
        "global_plan_refs": stage_manifest.get("global_plan_refs") or [],
        "target_score": stage_manifest.get("target_score"),
        "acceptance_invariants": stage_manifest.get("acceptance_invariants") or [],
        "loop_dir": repo_relative(loop_dir),
        "loop_final_status": final_status,
        "stop_reason": loop_state.get("stop_reason"),
        "iterations_ran": len(iterations),
        "last_quality_score": last_iteration.get("quality_score"),
        "last_analyst_decision": last_iteration.get("loop_decision") or loop_state.get("last_analyst_decision"),
        "last_deterministic_gate_ok": last_iteration.get("deterministic_gate_ok"),
        "last_deterministic_gate_reason": last_iteration.get("deterministic_gate_reason"),
        "accepted_gate": bool(last_iteration.get("accepted_gate")),
        "manual_confirmation_required": manual_confirmation_required,
        "next_action": next_action,
        "save_autorun_on_accept": bool(stage_manifest.get("save_autorun_on_accept", True)),
        "updated_at": now_iso(),
    }


def build_stage_handoff_markdown(summary: dict[str, Any]) -> str:
    lines = [
        "# Stage agent loop handoff",
        "",
        f"- stage_id: `{summary.get('stage_id')}`",
        f"- module_name: `{summary.get('module_name')}`",
        f"- title: {summary.get('title')}",
        f"- loop_final_status: `{summary.get('loop_final_status')}`",
        f"- target_score: `{summary.get('target_score')}`",
        f"- iterations_ran: `{summary.get('iterations_ran')}`",
        f"- last_quality_score: `{summary.get('last_quality_score')}`",
        f"- accepted_gate: `{summary.get('accepted_gate')}`",
        f"- deterministic_gate_ok: `{summary.get('last_deterministic_gate_ok')}`",
        f"- deterministic_gate_reason: `{summary.get('last_deterministic_gate_reason') or 'n/a'}`",
        f"- manual_confirmation_required: `{summary.get('manual_confirmation_required')}`",
        f"- next_action: `{summary.get('next_action')}`",
        f"- loop_dir: `{summary.get('loop_dir')}`",
        f"- stop_reason: {summary.get('stop_reason') or 'n/a'}",
        "",
        "## Plan refs",
    ]
    refs = summary.get("global_plan_refs") or []
    lines.extend([f"- {item}" for item in refs] if refs else ["- none"])
    lines.extend(["", "## Acceptance invariants"])
    invariants = summary.get("acceptance_invariants") or []
    lines.extend([f"- {item}" for item in invariants] if invariants else ["- domain loop gate + analyst verdict"])
    return "\n".join(lines).strip() + "\n"


def save_stage_summary(stage_dir: Path, summary: dict[str, Any]) -> None:
    write_json(stage_dir / "stage_loop_summary.json", summary)
    write_text(stage_dir / "stage_loop_handoff.md", build_stage_handoff_markdown(summary))


def build_save_autorun_command(args: argparse.Namespace, stage_manifest: dict[str, Any], loop_dir: Path) -> list[str]:
    return [
        sys.executable,
        str(REPO_ROOT / "scripts" / "save_agent_semantic_run.py"),
        "--spec",
        str(repo_path(stage_manifest["pack_manifest"])),
        "--validated-run-dir",
        str(loop_dir),
        "--title",
        f"AGENT | {stage_manifest.get('title') or stage_manifest['stage_id']}",
        "--architecture-phase",
        str(stage_manifest.get("architecture_phase") or stage_manifest.get("module_name") or "stage_agent_loop"),
        "--agent-focus",
        str(stage_manifest.get("agent_focus") or stage_manifest.get("title") or stage_manifest["stage_id"]),
    ]


def handle_plan(args: argparse.Namespace) -> int:
    stage_manifest_path = repo_path(args.manifest)
    stage_manifest = load_stage_manifest(stage_manifest_path)
    stage_dir = stage_dir_for(repo_path(args.output_root), stage_manifest["stage_id"])
    command = build_domain_pack_loop_command(args, stage_manifest, stage_dir)
    payload = {
        "schema_version": STAGE_SUMMARY_SCHEMA_VERSION,
        "stage_manifest": repo_relative(stage_manifest_path),
        "stage_id": stage_manifest["stage_id"],
        "stage_dir": repo_relative(stage_dir),
        "loop_dir": repo_relative(stage_loop_dir(stage_dir, stage_manifest)),
        "domain_pack_loop_command": command,
    }
    print(json.dumps(payload, ensure_ascii=False, indent=2))
    return 0


def handle_summarize(args: argparse.Namespace) -> int:
    stage_manifest_path = repo_path(args.manifest)
    stage_manifest = load_stage_manifest(stage_manifest_path)
    stage_dir = stage_dir_for(repo_path(args.output_root), stage_manifest["stage_id"])
    loop_dir = repo_path(args.loop_dir) if args.loop_dir else stage_loop_dir(stage_dir, stage_manifest)
    summary = build_stage_summary(stage_manifest, loop_dir)
    save_stage_summary(stage_dir, summary)
    print(json.dumps(summary, ensure_ascii=False, indent=2))
    return 0


def handle_run(args: argparse.Namespace) -> int:
    stage_manifest_path = repo_path(args.manifest)
    stage_manifest = load_stage_manifest(stage_manifest_path)
    stage_dir = stage_dir_for(repo_path(args.output_root), stage_manifest["stage_id"])
    stage_dir.mkdir(parents=True, exist_ok=True)
    write_json(stage_dir / "stage_manifest.json", stage_manifest)
    write_text(stage_dir / "stage_manifest_source.txt", repo_relative(stage_manifest_path) + "\n")
    command = build_domain_pack_loop_command(args, stage_manifest, stage_dir)
    write_text(stage_dir / "domain_pack_loop.command.txt", " ".join(command) + "\n")
    if args.dry_run:
        print(json.dumps({"dry_run": True, "command": command}, ensure_ascii=False, indent=2))
        return 0
    run_command(
        command,
        cwd=REPO_ROOT,
        stdout_path=stage_dir / "domain_pack_loop.stdout.log",
        stderr_path=stage_dir / "domain_pack_loop.stderr.log",
        timeout_seconds=max(3600, int(args.codex_timeout_seconds) * max(1, int(stage_manifest["max_iterations"]))),
    )
    loop_dir = stage_loop_dir(stage_dir, stage_manifest)
    summary = build_stage_summary(stage_manifest, loop_dir)
    save_stage_summary(stage_dir, summary)
    if (
        summary["loop_final_status"] == "accepted"
        and bool(stage_manifest.get("save_autorun_on_accept", True))
        and not args.no_save_autorun
    ):
        save_command = build_save_autorun_command(args, stage_manifest, loop_dir)
        write_text(stage_dir / "save_agent_semantic_run.command.txt", " ".join(save_command) + "\n")
        run_command(
            save_command,
            cwd=REPO_ROOT,
            stdout_path=stage_dir / "save_agent_semantic_run.stdout.log",
            stderr_path=stage_dir / "save_agent_semantic_run.stderr.log",
            timeout_seconds=120,
        )
    print(json.dumps(summary, ensure_ascii=False, indent=2))
    return 0


def add_common_args(parser: argparse.ArgumentParser) -> None:
    parser.add_argument("--manifest", required=True)
    parser.add_argument("--output-root", default=str(DEFAULT_STAGE_OUTPUT_ROOT))
    parser.add_argument("--analysis-date")
    parser.add_argument("--max-scenarios", type=int)
    parser.add_argument("--backend-url", default="http://127.0.0.1:8787")
    parser.add_argument("--prompt-version", default="address_query_runtime_v1")
    parser.add_argument("--llm-provider", default="local", choices=["openai", "local"])
    parser.add_argument("--llm-model", default="qwen2.5-14b-instruct-1m")
    parser.add_argument("--llm-base-url", default="http://127.0.0.1:1234/v1")
    parser.add_argument("--llm-api-key", default="")
    parser.add_argument("--temperature", type=float, default=0.0)
    parser.add_argument("--max-output-tokens", type=int, default=2048)
    parser.add_argument("--timeout-seconds", type=int, default=180)
    parser.add_argument("--use-mock", action="store_true")
    parser.add_argument("--codex-binary", default="codex")
    parser.add_argument("--codex-profile")
    parser.add_argument("--codex-model")
    parser.add_argument("--analyst-codex-model", default="gpt-5.4")
    parser.add_argument("--coder-codex-model", default="gpt-5.4-mini")
    parser.add_argument("--analyst-reasoning-effort", default="medium")
    parser.add_argument("--coder-reasoning-effort", default="low")
    parser.add_argument("--codex-timeout-seconds", type=int, default=1800)


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Stage-level AGENT loop wrapper for NDC_1C development phases.")
    subparsers = parser.add_subparsers(dest="command", required=True)

    plan_parser = subparsers.add_parser("plan", help="Print the domain pack-loop command for a stage manifest.")
    add_common_args(plan_parser)
    plan_parser.set_defaults(func=handle_plan)

    run_parser = subparsers.add_parser("run", help="Run stage pack-loop, summarize, and optionally save accepted autorun.")
    add_common_args(run_parser)
    run_parser.add_argument("--dry-run", action="store_true")
    run_parser.add_argument("--no-save-autorun", action="store_true")
    run_parser.set_defaults(func=handle_run)

    summarize_parser = subparsers.add_parser("summarize", help="Build stage handoff from an existing loop_dir.")
    add_common_args(summarize_parser)
    summarize_parser.add_argument("--loop-dir")
    summarize_parser.set_defaults(func=handle_summarize)
    return parser


def main() -> int:
    parser = build_parser()
    args = parser.parse_args()
    try:
        return int(args.func(args))
    except Exception as error:  # noqa: BLE001
        print(f"[stage-agent-loop] error: {error}", file=sys.stderr)
        return 1


if __name__ == "__main__":
    raise SystemExit(main())
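
The outcome mapping in `build_stage_summary` and the outer timeout used by `handle_run` are easier to check in isolation. The sketch below restates both as pure functions; `resolve_next_action` and `outer_timeout_seconds` are hypothetical names introduced for illustration and do not exist in the script.

```python
from __future__ import annotations


def resolve_next_action(
    final_status: str,
    accepted_gate: bool,
    manual_confirmation_after_accept: bool = True,
    has_user_decision_prompt: bool = False,
) -> str:
    """Mirror the next_action branches of build_stage_summary (illustrative only)."""
    # A stage counts as accepted only when both the loop status and the
    # last iteration's gate agree.
    accepted = final_status == "accepted" and accepted_gate
    if accepted and manual_confirmation_after_accept:
        return "manual_gui_confirmation"
    if accepted:
        return "stage_closed_without_manual_confirmation"
    if has_user_decision_prompt:
        return "user_decision_required"
    return "continue_autonomous_or_fix_blocker"


def outer_timeout_seconds(codex_timeout_seconds: int, max_iterations: int) -> int:
    """Subprocess timeout for the whole pack loop: at least one hour."""
    return max(3600, codex_timeout_seconds * max(1, max_iterations))
```

With the defaults shown in `add_common_args` and `load_stage_manifest` (1800 seconds per Codex call, 6 iterations), the outer timeout works out to 10800 seconds. A loop that reports `accepted` while its last iteration has `accepted_gate` false falls through to `continue_autonomous_or_fix_blocker`.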


@ -114,6 +114,147 @@ class DomainCaseLoopStepStateTests(unittest.TestCase):
         self.assertEqual(reviewed["critical_findings_count"], 1)
         self.assertEqual(reviewed["review_findings"][0]["code"], "wrong_catalog_chain_top_match")

+    def test_business_first_review_flags_dirty_direct_answer_surface(self) -> None:
+        step_state = dcl.build_scenario_step_state(
+            scenario_id="business_surface_demo",
+            domain="business_overview",
+            step={
+                "step_id": "step_01",
+                "title": "Top year",
+                "depends_on": [],
+                "question_template": "какой у нас самый доходный год",
+            },
+            step_index=1,
+            question_resolved="какой у нас самый доходный год",
+            analysis_context={},
+            turn_artifact={
+                "assistant_message": {
+                    "reply_type": "partial_coverage",
+                    "text": "Коротко: Ограниченный бизнес-обзор по подтвержденным строкам 1С. " + ("лишний текст " * 220),
+                    "message_id": "msg-1",
+                    "trace_id": "trace-1",
+                },
+                "technical_debug_payload": {},
+                "session_summary": {},
+            },
+            entries=[],
+        )
+
+        review = step_state["business_first_review"]
+        self.assertFalse(review["direct_answer_first_ok"])
+        self.assertFalse(review["business_usefulness_ok"])
+        self.assertIn("business_direct_answer_missing", review["issue_codes"])
+        self.assertIn("answer_layering_noise", review["issue_codes"])
+        self.assertIn("business_answer_too_verbose", review["issue_codes"])
+        self.assertIn("business_direct_answer_missing", step_state["violated_invariants"])
+
+    def test_business_first_review_accepts_compact_direct_answer_surface(self) -> None:
+        step_state = dcl.build_scenario_step_state(
+            scenario_id="business_surface_demo",
+            domain="business_overview",
+            step={
+                "step_id": "step_01",
+                "title": "Top year",
+                "depends_on": [],
+                "question_template": "какой у нас самый доходный год",
+            },
+            step_index=1,
+            question_resolved="какой у нас самый доходный год",
+            analysis_context={},
+            turn_artifact={
+                "assistant_message": {
+                    "reply_type": "partial_coverage",
+                    "text": "Коротко: самый доходный год в доступном денежном контуре 1С — 2015: 136 723 459,73 руб.\nМетод: считаю по подтвержденным входящим поступлениям.",
+                    "message_id": "msg-1",
+                    "trace_id": "trace-1",
+                },
+                "technical_debug_payload": {},
+                "session_summary": {},
+            },
+            entries=[],
+        )
+
+        review = step_state["business_first_review"]
+        self.assertTrue(review["direct_answer_first_ok"])
+        self.assertTrue(review["business_usefulness_ok"])
+        self.assertEqual(review["issue_codes"], [])
+
+    def test_business_first_review_separates_direct_answer_from_later_technical_leak(self) -> None:
+        question = "\u043a\u0430\u043a\u043e\u0439 \u0443 \u043d\u0430\u0441 \u0441\u0430\u043c\u044b\u0439 \u0434\u043e\u0445\u043e\u0434\u043d\u044b\u0439 \u0433\u043e\u0434"
+        step_state = dcl.build_scenario_step_state(
+            scenario_id="business_surface_demo",
+            domain="business_overview",
+            step={
+                "step_id": "step_01",
+                "title": "Top year",
+                "depends_on": [],
+                "question_template": question,
+            },
+            step_index=1,
+            question_resolved=question,
+            analysis_context={},
+            turn_artifact={
+                "assistant_message": {
+                    "reply_type": "partial_coverage",
+                    "text": "2015 \u2014 \u0441\u0430\u043c\u044b\u0439 \u0434\u043e\u0445\u043e\u0434\u043d\u044b\u0439 \u0433\u043e\u0434 \u043f\u043e \u043f\u043e\u0434\u0442\u0432\u0435\u0440\u0436\u0434\u0435\u043d\u043d\u044b\u043c \u0432\u0445\u043e\u0434\u044f\u0449\u0438\u043c \u0434\u0435\u043d\u044c\u0433\u0430\u043c.\nservice: capability_id=business_overview_route_template_v1",
+                    "message_id": "msg-1",
+                    "trace_id": "trace-1",
+                },
+                "technical_debug_payload": {},
+                "session_summary": {},
+            },
+            entries=[],
+        )
+
+        review = step_state["business_first_review"]
+        self.assertTrue(review["direct_answer_first_ok"])
+        self.assertTrue(review["technical_garbage_present"])
+        self.assertIn("technical_garbage_in_answer", review["issue_codes"])
+        self.assertNotIn("business_direct_answer_missing", review["issue_codes"])
+
+    def test_truth_harness_promotes_business_review_issues_to_findings(self) -> None:
+        step_state = dcl.build_scenario_step_state(
+            scenario_id="business_surface_demo",
+            domain="business_overview",
+            step={
+                "step_id": "step_01",
+                "title": "Top year",
+                "depends_on": [],
+                "question_template": "какой у нас самый доходный год",
+            },
+            step_index=1,
+            question_resolved="какой у нас самый доходный год",
+            analysis_context={},
+            turn_artifact={
+                "assistant_message": {
+                    "reply_type": "partial_coverage",
+                    "text": "Коротко: Ограниченный бизнес-обзор по подтвержденным строкам 1С. " + ("лишний текст " * 220),
+                    "message_id": "msg-1",
+                    "trace_id": "trace-1",
+                },
+                "technical_debug_payload": {},
+                "session_summary": {},
+            },
+            entries=[],
+        )
+        reviewed = dth.evaluate_truth_step(
+            step={
+                "step_id": "step_01",
+                "question_template": "какой у нас самый доходный год",
+                "criticality": "critical",
+                "allowed_reply_types": [],
+            },
+            step_state=step_state,
+            step_results={},
+            bindings={},
+            runtime_bindings={},
+        )
+
+        codes = [item["code"] for item in reviewed["review_findings"]]
+        self.assertIn("business_review:business_direct_answer_missing", codes)
+        self.assertIn("business_review:answer_layering_noise", codes)
+        self.assertEqual(reviewed["review_status"], "fail")
+


 if __name__ == "__main__":
     unittest.main()


@ -0,0 +1,155 @@
from __future__ import annotations

import json
import sys
import tempfile
import unittest
from pathlib import Path

sys.path.insert(0, str(Path(__file__).resolve().parent))

import review_assistant_stage1_run as reviewer


def write_json(path: Path, payload: object) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")


def session_payload(conversation: list[dict[str, object]]) -> dict[str, object]:
    return {
        "schema_version": "assistant_session_v1",
        "session_id": "assistant-stage1-test-SAVED-001",
        "started_at": "2026-05-09T00:00:00Z",
        "updated_at": "2026-05-09T00:01:00Z",
        "conversation": conversation,
        "address_navigation_state": {"session_context": {}},
        "investigation_state": {},
        "counters": {},
        "reply_types": {},
    }


class AssistantStage1RunReviewTests(unittest.TestCase):
    def test_builds_conversation_pairs_without_crossing_next_user_turn(self) -> None:
        conversation = [
            {"role": "user", "text": "первый вопрос"},
            {"role": "assistant", "text": "первый ответ"},
            {"role": "user", "text": "второй вопрос"},
        ]

        pairs = reviewer.build_conversation_pairs(conversation)

        self.assertEqual(len(pairs), 2)
        self.assertEqual(pairs[0]["assistant"]["text"], "первый ответ")
        self.assertIsNone(pairs[1]["assistant"])

    def test_review_flags_dirty_business_answer_and_writes_repair_targets(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            sessions_dir = root / "sessions"
            reports_dir = root / "reports"
            run_id = "assistant-stage1-test123"
            session_file = sessions_dir / f"{run_id}-SAVED-001.json"
            report_file = reports_dir / f"{run_id}.md"
            write_json(
                session_file,
                session_payload(
                    [
                        {"role": "user", "text": "какой у нас самый доходный год"},
                        {
                            "role": "assistant",
                            "text": "Коротко: Ограниченный бизнес-обзор по подтвержденным строкам 1С. "
                            + ("лишний текст " * 220),
                            "reply_type": "partial_coverage",
                            "message_id": "a-1",
                            "trace_id": "trace-1",
                            "debug": {"capability_id": "business_overview_route_template_v1"},
                        },
                        {"role": "user", "text": "по нему покажи документы"},
                        {
                            "role": "assistant",
                            "text": "Документы по выбранному году не найдены в подтвержденном контуре.",
                            "reply_type": "factual_with_explanation",
                            "message_id": "a-2",
                            "trace_id": "trace-2",
                            "debug": {},
                        },
                    ]
                ),
            )
            report_file.parent.mkdir(parents=True, exist_ok=True)
            report_file.write_text(
                "# Assistant Stage 1 Eval Run\n\n"
                f"- run_id: {run_id}\n"
                "- suite_id: assistant_saved_session_runtime_job-test\n",
                encoding="utf-8",
            )

            review = reviewer.build_run_review(
                run_id=run_id,
                session_files=[session_file],
                report_path=report_file,
            )

            self.assertEqual(review["summary"]["overall_business_status"], "fail")
            self.assertEqual(review["summary"]["turn_pairs_total"], 2)
            self.assertGreaterEqual(review["summary"]["p0_findings"], 1)
            self.assertIn("business_direct_answer_missing", review["summary"]["issue_counts"])
            self.assertTrue(review["repair_targets"])
            target_by_issue = {item["issue_code"]: item for item in review["repair_targets"]}
            self.assertEqual(target_by_issue["business_direct_answer_missing"]["severity"], "P0")
            self.assertEqual(target_by_issue["business_answer_too_verbose"]["severity"], "P1")
            self.assertEqual(review["question_quality_review"]["turns_total"], 2)
            self.assertIn("contextual_followup", review["question_quality_review"]["tag_counts"])

    def test_save_run_review_materializes_machine_and_markdown_artifacts(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            output_dir = Path(tmp) / "review"
            review = {
                "run_id": "assistant-stage1-test123",
                "summary": {
                    "overall_business_status": "pass",
                    "turn_pairs_total": 1,
                    "business_issue_turns": 0,
                    "p0_findings": 0,
                    "p1_findings": 0,
                    "question_quality_status": "strong",
                    "question_quality_score": 95,
                },
                "question_quality_review": {"status": "strong", "score": 95},
                "findings": [],
                "repair_targets": [],
                "conversation_pairs": [],
            }

            reviewer.save_run_review(review, output_dir)

            self.assertTrue((output_dir / "run_review.json").exists())
            self.assertTrue((output_dir / "run_review.md").exists())
            markdown = (output_dir / "run_review.md").read_text(encoding="utf-8")
            self.assertIn("overall_business_status", markdown)
            self.assertIn("Question Quality", markdown)

    def test_question_quality_treats_short_natural_followups_as_contextual(self) -> None:
        pairs = [
            {"pair_index": 1, "user": {"text": "приветик - че как там дела"}},
            {"pair_index": 2, "user": {"text": "какие остатки на складе"}},
            {"pair_index": 3, "user": {"text": "давай на июль 2017"}},
            {"pair_index": 4, "user": {"text": "март 2016"}},
            {"pair_index": 5, "user": {"text": "а кому продали?"}},
            {"pair_index": 6, "user": {"text": "кто нам должен денег на май 2017"}},
            {"pair_index": 7, "user": {"text": "а по свк"}},
        ]

        review = reviewer.build_question_quality_review(pairs)

        self.assertNotIn("root_question_requires_missing_context", review["weak_flag_counts"])
        self.assertNotIn("low_business_anchor", review["weak_flag_counts"])
        self.assertGreaterEqual(review["tag_counts"]["contextual_followup"], 3)
        self.assertGreaterEqual(review["tag_counts"]["direct_business_question"], 2)


if __name__ == "__main__":
    unittest.main()
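
The pairing behaviour pinned down by `test_builds_conversation_pairs_without_crossing_next_user_turn` can be illustrated with a small stand-in. The body of `reviewer.build_conversation_pairs` is not part of this diff, so the function below is only an assumed shape that satisfies the same assertions: each user turn opens a pair, and an assistant reply attaches to the latest open pair only if no later user turn has started.

```python
from __future__ import annotations

from typing import Any


def build_conversation_pairs(conversation: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Pair each user turn with the first assistant reply that follows it (sketch)."""
    pairs: list[dict[str, Any]] = []
    for item in conversation:
        role = item.get("role")
        if role == "user":
            # Every user turn starts a new pair, even if the previous one got no reply.
            pairs.append({"pair_index": len(pairs) + 1, "user": item, "assistant": None})
        elif role == "assistant" and pairs and pairs[-1]["assistant"] is None:
            # Attach only the first reply; an assistant message with no preceding
            # user turn, or a second reply in a row, is ignored in this sketch.
            pairs[-1]["assistant"] = item
    return pairs


conversation = [
    {"role": "user", "text": "first question"},
    {"role": "assistant", "text": "first answer"},
    {"role": "user", "text": "second question"},
]
pairs = build_conversation_pairs(conversation)
```

Because the trailing user turn has no reply, its pair keeps `assistant` as `None` instead of borrowing an answer from a neighbouring turn.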


@ -0,0 +1,241 @@
from __future__ import annotations

import json
import sys
import tempfile
import unittest
from pathlib import Path
from types import SimpleNamespace

sys.path.insert(0, str(Path(__file__).resolve().parent))

import save_agent_semantic_run as saver


def write_json(path: Path, payload: object) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")


class SaveAgentSemanticRunTests(unittest.TestCase):
    def test_extract_questions_accepts_truth_harness_question_template(self) -> None:
        questions = saver.extract_questions_from_spec(
            {
                "steps": [
                    {"step_id": "step_01", "question_template": "first question"},
                    {"step_id": "step_02", "question": "second question"},
                ]
            }
        )

        self.assertEqual(questions, ["first question", "second question"])

    def test_extract_questions_accepts_domain_pack_scenarios(self) -> None:
        questions = saver.extract_questions_from_spec(
            {
                "pack_id": "demo_pack",
                "scenarios": [
                    {
                        "scenario_id": "scenario_01",
                        "steps": [
                            {"step_id": "step_01", "question_template": "first question"},
                            {"step_id": "step_02", "question": "second question"},
                        ],
                    },
                    {
                        "scenario_id": "scenario_02",
                        "steps": [
                            {"step_id": "step_01", "question": "first question"},
                            {"step_id": "step_02", "question": "third question"},
                        ],
                    },
                ],
            }
        )

        self.assertEqual(questions, ["first question", "second question", "third question"])

    def test_validate_accepted_run_dir_accepts_clean_business_review(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            run_dir = Path(tmp)
            write_json(
                run_dir / "pack_state.json",
                {
                    "final_status": "accepted",
                    "review_overall_status": "pass",
                    "acceptance_gate_passed": True,
                    "no_unresolved_p0": True,
                    "unresolved_p0_count": 0,
                    "steps_total": 1,
                    "steps_passed": 1,
                    "steps_failed": 0,
                },
            )
            write_json(run_dir / "truth_review.json", {"summary": {"overall_status": "pass"}})
            write_json(
                run_dir / "business_review.json",
                {
                    "overall_business_status": "pass",
                    "steps_with_business_failures": 0,
                    "steps_with_business_warnings": 0,
                },
            )
            metadata = saver.validate_accepted_run_dir(run_dir)
        self.assertEqual(metadata["validation_status"], "accepted_live_replay")
        self.assertTrue(metadata["saved_after_validated_replay"])

    def test_validate_accepted_run_dir_rejects_business_review_failures(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            run_dir = Path(tmp)
            write_json(
                run_dir / "pack_state.json",
                {
                    "final_status": "accepted",
                    "review_overall_status": "pass",
                    "acceptance_gate_passed": True,
                    "no_unresolved_p0": True,
                    "unresolved_p0_count": 0,
                },
            )
            write_json(run_dir / "truth_review.json", {"summary": {"overall_status": "pass"}})
            write_json(
                run_dir / "business_review.json",
                {
                    "overall_business_status": "fail",
                    "steps_with_business_failures": 1,
                },
            )
            with self.assertRaisesRegex(RuntimeError, "business_review"):
                saver.validate_accepted_run_dir(run_dir)

    def test_validate_accepted_run_dir_accepts_clean_domain_pack_loop(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            loop_dir = Path(tmp)
            iteration_dir = loop_dir / "iterations" / "iteration_00"
            analyst_path = iteration_dir / "analyst_verdict.json"
            repair_targets_path = iteration_dir / "pack_output" / "pack_run" / "repair_targets.json"
            write_json(
                loop_dir / "loop_state.json",
                {
                    "loop_id": "stage_demo",
                    "target_score": 88,
                    "final_status": "accepted",
                    "iterations": [
                        {
                            "iteration_id": "iteration_00",
                            "quality_score": 91,
                            "accepted_gate": True,
                            "analyst_accepted_gate": True,
                            "deterministic_gate_ok": True,
                            "repair_target_count": 0,
                            "repair_target_severity_counts": {"P0": 0, "P1": 0, "P2": 0},
                            "analyst_verdict_path": str(analyst_path),
                            "repair_targets_path": str(repair_targets_path),
                        }
                    ],
                },
            )
            write_json(
                analyst_path,
                {
                    "loop_decision": "accepted",
                    "unresolved_p0_count": 0,
                    "regression_detected": False,
                    "direct_answer_ok": True,
                    "business_usefulness_ok": True,
                    "temporal_honesty_ok": True,
                    "field_truth_ok": True,
                    "answer_layering_ok": True,
                },
            )
            write_json(repair_targets_path, {"severity_counts": {"P0": 0, "P1": 0, "P2": 0}})
            metadata = saver.validate_accepted_run_dir(loop_dir)
        self.assertEqual(metadata["validation_status"], "accepted_domain_pack_loop")
        self.assertEqual(metadata["quality_score"], 91)

    def test_validate_accepted_run_dir_rejects_domain_pack_loop_with_p1_targets(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            loop_dir = Path(tmp)
            iteration_dir = loop_dir / "iterations" / "iteration_00"
            analyst_path = iteration_dir / "analyst_verdict.json"
            repair_targets_path = iteration_dir / "pack_output" / "pack_run" / "repair_targets.json"
            write_json(
                loop_dir / "loop_state.json",
                {
                    "loop_id": "stage_demo",
                    "target_score": 88,
                    "final_status": "accepted",
                    "iterations": [
                        {
                            "quality_score": 91,
                            "accepted_gate": True,
                            "analyst_accepted_gate": True,
                            "deterministic_gate_ok": True,
                            "analyst_verdict_path": str(analyst_path),
                            "repair_targets_path": str(repair_targets_path),
                        }
                    ],
                },
            )
            write_json(
                analyst_path,
                {
                    "loop_decision": "accepted",
                    "unresolved_p0_count": 0,
                    "regression_detected": False,
                    "direct_answer_ok": True,
                    "business_usefulness_ok": True,
                    "temporal_honesty_ok": True,
                    "field_truth_ok": True,
                    "answer_layering_ok": True,
                },
            )
            write_json(repair_targets_path, {"severity_counts": {"P0": 0, "P1": 1, "P2": 0}})
            with self.assertRaisesRegex(RuntimeError, "repair_targets"):
                saver.validate_accepted_run_dir(loop_dir)

    def test_save_gate_refuses_real_write_without_validation(self) -> None:
        args = SimpleNamespace(
            validated_run_dir=None,
            dry_run=False,
            allow_unvalidated=False,
            unvalidated_reason=None,
        )
        with self.assertRaisesRegex(RuntimeError, "Refusing to save AGENT autorun"):
            saver.build_save_gate_metadata(args, {}, Path("demo.json"))

    def test_save_gate_requires_reason_for_unvalidated_draft(self) -> None:
        args = SimpleNamespace(
            validated_run_dir=None,
            dry_run=False,
            allow_unvalidated=True,
            unvalidated_reason="",
        )
        with self.assertRaisesRegex(RuntimeError, "--unvalidated-reason"):
            saver.build_save_gate_metadata(args, {}, Path("demo.json"))

    def test_save_gate_marks_explicit_unvalidated_draft(self) -> None:
        args = SimpleNamespace(
            validated_run_dir=None,
            dry_run=False,
            allow_unvalidated=True,
            unvalidated_reason="manual GUI canary before live replay",
        )
        metadata = saver.build_save_gate_metadata(args, {}, Path("demo.json"))
        self.assertEqual(metadata["validation_status"], "explicitly_unvalidated")
        self.assertFalse(metadata["saved_after_validated_replay"])


if __name__ == "__main__":
    unittest.main()
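
The two single-run tests in this file (clean business review accepted, failing business review rejected) pin down part of the save gate's contract. A minimal sketch of a check that would satisfy those two tests, assuming only the file names and fields that appear in the fixtures: `check_single_run_gate` and `read_json` are hypothetical names, not the real `validate_accepted_run_dir`, which may inspect more fields and also handles the domain-pack-loop layout.

```python
import json
from pathlib import Path


def read_json(path: Path) -> dict:
    """Load one JSON artifact from a validated run directory."""
    return json.loads(path.read_text(encoding="utf-8"))


def check_single_run_gate(run_dir: Path) -> dict:
    """Hypothetical gate: raise RuntimeError unless every review artifact passed."""
    pack_state = read_json(run_dir / "pack_state.json")
    truth_review = read_json(run_dir / "truth_review.json")
    business_review = read_json(run_dir / "business_review.json")

    if pack_state.get("final_status") != "accepted":
        raise RuntimeError("pack_state.final_status is not 'accepted'")
    if pack_state.get("acceptance_gate_passed") is not True:
        raise RuntimeError("pack_state.acceptance_gate_passed is not true")
    if int(pack_state.get("unresolved_p0_count") or 0) != 0:
        raise RuntimeError("pack_state reports unresolved P0 findings")
    if truth_review.get("summary", {}).get("overall_status") != "pass":
        raise RuntimeError("truth_review.summary.overall_status is not 'pass'")

    business_failures = int(business_review.get("steps_with_business_failures") or 0)
    if business_review.get("overall_business_status") != "pass" or business_failures:
        raise RuntimeError("business_review did not pass cleanly")

    # Metadata keys and values mirror what the tests assert on.
    return {
        "validation_status": "accepted_live_replay",
        "saved_after_validated_replay": True,
    }
```

Raising on the first failed condition keeps the error message specific to one artifact, which is what the `assertRaisesRegex(RuntimeError, "business_review")` test relies on.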


@ -0,0 +1,153 @@
from __future__ import annotations

import argparse
import json
import sys
import tempfile
import unittest
from pathlib import Path

sys.path.insert(0, str(Path(__file__).resolve().parent))

import stage_agent_loop as stage_loop


def write_json(path: Path, payload: object) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")


def args() -> argparse.Namespace:
    return argparse.Namespace(
        backend_url="http://127.0.0.1:8787",
        prompt_version="address_query_runtime_v1",
        llm_provider="local",
        llm_model="qwen2.5-14b-instruct-1m",
        llm_base_url="http://127.0.0.1:1234/v1",
        llm_api_key="",
        temperature=0.0,
        max_output_tokens=2048,
        timeout_seconds=180,
        codex_binary="codex",
        codex_profile=None,
        codex_model=None,
        analyst_codex_model="gpt-5.4",
        coder_codex_model="gpt-5.4-mini",
        analyst_reasoning_effort="medium",
        coder_reasoning_effort="low",
        codex_timeout_seconds=1800,
        analysis_date=None,
        max_scenarios=None,
        use_mock=False,
    )


class StageAgentLoopTests(unittest.TestCase):
    def test_load_stage_manifest_defaults_gate_fields(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            manifest_path = Path(tmp) / "stage.json"
            write_json(
                manifest_path,
                {
                    "stage_id": "open_world_control_gate",
                    "module_name": "Open-World Bounded Autonomy Breadth",
                    "title": "Open-world semantic control gate",
                    "pack_manifest": "docs/orchestration/demo_pack.json",
                },
            )
            manifest = stage_loop.load_stage_manifest(manifest_path)
        self.assertEqual(manifest["target_score"], 88)
        self.assertEqual(manifest["max_iterations"], 6)
        self.assertTrue(manifest["save_autorun_on_accept"])
        self.assertTrue(manifest["manual_confirmation_required_after_accept"])

    def test_build_domain_pack_loop_command_uses_stage_gate(self) -> None:
        manifest = {
            "stage_id": "open_world_control_gate",
            "pack_manifest": "docs/orchestration/demo_pack.json",
            "target_score": 91,
            "max_iterations": 4,
        }
        command = stage_loop.build_domain_pack_loop_command(args(), manifest, Path("X:/repo/stage"))
        self.assertIn("run-pack-loop", command)
        self.assertIn("--target-score", command)
        self.assertIn("91", command)
        self.assertIn("--max-iterations", command)
        self.assertIn("4", command)
        self.assertIn("--output-root", command)

    def test_build_stage_summary_requests_manual_confirmation_after_accept(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            loop_dir = Path(tmp)
            write_json(
                loop_dir / "loop_state.json",
                {
                    "final_status": "accepted",
                    "target_score": 88,
                    "stop_reason": "analyst accepted + deterministic gate passed",
                    "iterations": [
                        {
                            "quality_score": 93,
                            "loop_decision": "accepted",
                            "accepted_gate": True,
                            "deterministic_gate_ok": True,
                        }
                    ],
                },
            )
            summary = stage_loop.build_stage_summary(
                {
                    "stage_id": "open_world_control_gate",
                    "module_name": "Open-World Bounded Autonomy Breadth",
                    "title": "Open-world semantic control gate",
                    "target_score": 88,
                    "manual_confirmation_required_after_accept": True,
                },
                loop_dir,
            )
        self.assertEqual(summary["loop_final_status"], "accepted")
        self.assertTrue(summary["manual_confirmation_required"])
        self.assertEqual(summary["next_action"], "manual_gui_confirmation")

    def test_build_stage_summary_continues_when_loop_is_partial(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            loop_dir = Path(tmp)
            write_json(
                loop_dir / "loop_state.json",
                {
                    "final_status": "partial",
                    "target_score": 88,
                    "iterations": [
                        {
                            "quality_score": 76,
                            "loop_decision": "continue",
                            "accepted_gate": False,
                            "deterministic_gate_ok": False,
                            "deterministic_gate_reason": "repair_targets_remaining=P1:1",
                        }
                    ],
                },
            )
            summary = stage_loop.build_stage_summary(
                {
                    "stage_id": "open_world_control_gate",
                    "module_name": "Open-World Bounded Autonomy Breadth",
                    "title": "Open-world semantic control gate",
                    "target_score": 88,
                },
                loop_dir,
            )
        self.assertFalse(summary["manual_confirmation_required"])
        self.assertEqual(summary["next_action"], "continue_autonomous_or_fix_blocker")


if __name__ == "__main__":
    unittest.main()
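
The stage-loop tests above fix the manifest defaults and the two `next_action` outcomes. A rough sketch of logic consistent with those assertions follows, as an illustration only: `apply_stage_defaults` and `decide_next_action` are hypothetical names (the real functions are `load_stage_manifest` and `build_stage_summary`), and the `accepted_without_manual_confirmation` value is an invented placeholder for a case the tests do not cover.

```python
# Default gate fields, mirrored from test_load_stage_manifest_defaults_gate_fields.
STAGE_DEFAULTS = {
    "target_score": 88,
    "max_iterations": 6,
    "save_autorun_on_accept": True,
    "manual_confirmation_required_after_accept": True,
}


def apply_stage_defaults(manifest: dict) -> dict:
    """Fill in gate fields the manifest omits; explicit manifest values win."""
    return {**STAGE_DEFAULTS, **manifest}


def decide_next_action(manifest: dict, loop_state: dict) -> dict:
    """Hypothetical reduction of a finished loop to the handoff decision."""
    accepted = loop_state.get("final_status") == "accepted"
    # Assumption: a missing flag falls back to the default of True.
    wants_manual = bool(manifest.get("manual_confirmation_required_after_accept", True))
    manual_required = accepted and wants_manual

    if manual_required:
        next_action = "manual_gui_confirmation"
    elif accepted:
        # Placeholder value: the tests do not show what the real summary reports here.
        next_action = "accepted_without_manual_confirmation"
    else:
        next_action = "continue_autonomous_or_fix_blocker"

    return {
        "loop_final_status": loop_state.get("final_status"),
        "manual_confirmation_required": manual_required,
        "next_action": next_action,
    }
```

Manual confirmation is only ever requested after an accepted loop, so a partial or blocked loop goes back to the autonomous path whatever the manifest flag says.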